#encoding = utf8 import logging import asyncio import time import edge_tts import numpy as np import pyaudio import soundfile import sounddevice import resampy import queue from io import BytesIO from queue import Queue from threading import Thread, Event from pydub import AudioSegment logger = logging.getLogger(__name__) class TTSBase: def __init__(self, human): self._human = human self._thread = None self._queue = Queue() self._io_stream = BytesIO() self._chunk_len = self._human.get_audio_sample_rate() // self._human.get_fps() self._exit_event = Event() self._thread = Thread(target=self._on_run) self._exit_event.set() self._thread.start() # self._pcm_player = pyaudio.PyAudio() # self._pcm_stream = self._pcm_player.open(format=pyaudio.paInt16, # channels=1, rate=24000, output=True) logging.info('tts start') def _on_run(self): logging.info('tts run') while self._exit_event.is_set(): try: txt = self._queue.get(block=True, timeout=1) except queue.Empty: continue self._request(txt) logging.info('tts exit') def _request(self, txt): voice = 'zh-CN-XiaoyiNeural' t = time.time() asyncio.new_event_loop().run_until_complete(self.__on_request(voice, txt)) logger.info(f'edge tts time:{time.time() - t : 0.4f}s') self._io_stream.seek(0) stream = self.__create_bytes_stream(self._io_stream) stream_len = stream.shape[0] # try: # sounddevice.play(stream, samplerate=self._human.get_audio_sample_rate()) # sounddevice.wait() # 等待音频播放完毕 # except Exception as e: # logger.error(f"播放音频出错: {e}") playrec index = 0 while stream_len >= self._chunk_len: audio_chunk = stream[index:index + self._chunk_len] # sounddevice.play(audio_chunk, samplerate=self._human.get_audio_sample_rate()) # self._pcm_stream.write(audio_chunk) # self._pcm_stream.write(audio_chunk.tobytes()) # self._human.push_audio_chunk(audio_chunk) # self._human.push_mel_chunks_queue(audio_chunk) self._human.push_audio_chunk(audio_chunk) stream_len -= self._chunk_len index += self._chunk_len self._io_stream.seek(0) self._io_stream.truncate() def __create_bytes_stream(self, io_stream): stream, sample_rate = soundfile.read(io_stream) logger.info(f'tts audio stream {sample_rate} : {stream.shape}') stream = stream.astype(np.float32) if stream.ndim > 1: logger.warning(f'tts audio has {stream.shape[1]} channels, only use the first') stream = stream[:, 1] if sample_rate != self._human.get_audio_sample_rate() and stream.shape[0] > 0: logger.warning(f'tts audio sample rate is {sample_rate}, resample to {self._human.get_audio_sample_rate() }') stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self._human.get_audio_sample_rate() ) return stream async def __on_request(self, voice, txt): communicate = edge_tts.Communicate(txt, voice) first = True total_data = b'' CHUNK_SIZE = self._chunk_len async for chunk in communicate.stream(): if chunk["type"] == "audio" and chunk["data"]: data = chunk['data'] self._io_stream.write(data) elif chunk["type"] == "WordBoundary": pass ''' total_data += chunk["data"] if len(total_data) >= CHUNK_SIZE: # print(f"Time elapsed: {time.time() - start_time:.2f} seconds") # Print time audio_data = AudioSegment.from_mp3(BytesIO(total_data[:CHUNK_SIZE])) #.raw_data audio_data = audio_data.set_frame_rate(self._human.get_audio_sample_rate()) # self._human.push_audio_chunk(audio_data) self._pcm_stream.write(audio_data.raw_data) # play_audio(total_data[:CHUNK_SIZE], stream) # Play first CHUNK_SIZE bytes total_data = total_data[CHUNK_SIZE:] # Remove played data ''' # if first: # first = False # if chuck['type'] == 'audio': # # self._io_stream.write(chuck['data']) # self._io_stream.write(AudioSegment.from_mp3(BytesIO(total_data[:CHUNK_SIZE])).raw_data) # if len(total_data) > 0: # self._pcm_stream.write(AudioSegment.from_mp3(BytesIO(total_data)).raw_data) # audio_data = AudioSegment.from_mp3(BytesIO(total_data)) # .raw_data # audio_data = audio_data.set_frame_rate(self._human.get_audio_sample_rate()) # self._pcm_stream.write(audio_data.raw_data) # self._human.push_audio_chunk(audio_data) # self._io_stream.write(AudioSegment.from_mp3(BytesIO(total_data)).raw_data) def stop(self): self._pcm_stream.stop_stream() self._pcm_player.close(self._pcm_stream) self._pcm_player.terminate() if self._exit_event is None: return self._exit_event.clear() self._thread.join() logging.info('tts stop') def clear(self): self._queue.queue.clear() def push_txt(self, txt): self._queue.put(txt)