#encoding = utf8
import logging
import asyncio
import time

import edge_tts
import numpy as np
import pyaudio
import soundfile
import sounddevice
import resampy

import queue
from io import BytesIO
from queue import Queue
from threading import Thread, Event

from IPython.core.display_functions import display
from pydub import AudioSegment

import audio

logger = logging.getLogger(__name__)


class TTSBase:
    """Background text-to-speech worker built on edge-tts.

    Text pushed with :meth:`push_txt` is consumed by a worker thread, which
    synthesises it with edge-tts, decodes the result, resamples it to the
    sample rate reported by ``human`` and hands it over in fixed-size chunks
    through ``human.push_audio_chunk``.

    ``human`` must provide ``get_audio_sample_rate()``, ``get_fps()`` and
    ``push_audio_chunk(chunk)``; those are the only members used here.
    """

    def __init__(self, human):
        """Create the worker and start its thread immediately.

        Args:
            human: Audio consumer; see the class docstring for the methods
                it has to provide.
        """
        self._human = human
        self._queue = Queue()
        self._io_stream = BytesIO()
        # Samples per chunk: one chunk per frame at the consumer's frame rate.
        self._chunk_len = self._human.get_audio_sample_rate() // self._human.get_fps()

        # NOTE: despite its name this event means "keep running" while it is
        # set; stop() clears it to ask the worker to exit.
        self._exit_event = Event()
        self._exit_event.set()
        self._thread = Thread(target=self._on_run)
        self._thread.start()
        logger.info('tts start')

    def _on_run(self):
        """Worker loop: take queued text and synthesise it until stopped."""
        logger.info('tts run')
        while self._exit_event.is_set():
            try:
                # The timeout lets the loop re-check the exit flag regularly.
                txt = self._queue.get(block=True, timeout=1)
            except queue.Empty:
                continue
            try:
                self._request(txt)
            except Exception:
                # Thread boundary: a failed request (network error, audio
                # that cannot be decoded, ...) must not kill the worker.
                logger.exception('tts request failed')
        logger.info('tts exit')

    def _request(self, txt):
        """Synthesise ``txt`` and push the audio to ``human`` chunk by chunk.

        Only whole chunks of ``self._chunk_len`` samples are pushed; a
        shorter remainder at the end of the utterance is dropped.
        """
        voice = 'zh-CN-XiaoyiNeural'
        t = time.time()
        try:
            # asyncio.run() creates a fresh loop and closes it afterwards, so
            # no event loop is leaked per request.
            asyncio.run(self.__on_request(voice, txt))
            logger.info(f'edge tts time:{time.time() - t : 0.4f}s')

            self._io_stream.seek(0)
            stream = self.__create_bytes_stream(self._io_stream)
            stream_len = stream.shape[0]
            logger.info('stream_len: %s  _chunk_len: %s', stream_len, self._chunk_len)

            segment = stream_len // self._chunk_len
            for i in range(segment):
                start = i * self._chunk_len
                self._human.push_audio_chunk(stream[start:start + self._chunk_len])
            logger.info('segment: %s', segment)
        finally:
            # Always reset the shared buffer, so a failed request cannot leak
            # partial audio into the next one.
            self._io_stream.seek(0)
            self._io_stream.truncate()

    def __create_bytes_stream(self, io_stream):
        """Decode ``io_stream`` into mono float32 samples at the target rate.

        Args:
            io_stream: Seekable file-like object positioned at the start of
                the encoded audio.

        Returns:
            A 1-D ``numpy.float32`` array, resampled to
            ``human.get_audio_sample_rate()`` when the source rate differs.
        """
        stream, sample_rate = soundfile.read(io_stream)
        logger.info(f'tts audio stream {sample_rate} : {stream.shape}')
        stream = stream.astype(np.float32)

        if stream.ndim > 1:
            logger.warning(f'tts audio has {stream.shape[1]} channels, only use the first')
            # Column 0 is the first channel, as the warning says.
            stream = stream[:, 0]

        target_rate = self._human.get_audio_sample_rate()
        if sample_rate != target_rate and stream.shape[0] > 0:
            logger.warning(f'tts audio sample rate is {sample_rate}, resample to {target_rate}')
            stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=target_rate)

        return stream

    async def __on_request(self, voice, txt):
        """Stream synthesised audio for ``txt`` into ``self._io_stream``.

        Only non-empty ``"audio"`` chunks are written; every other chunk type
        (for example ``"WordBoundary"``) is ignored.
        """
        communicate = edge_tts.Communicate(txt, voice)
        async for chunk in communicate.stream():
            if chunk["type"] == "audio" and chunk["data"]:
                self._io_stream.write(chunk["data"])

    def stop(self):
        """Stop the worker thread and wait for it to finish.

        A PCM output stream and player are closed as well, but only if some
        other code attached them as ``_pcm_stream`` / ``_pcm_player``; this
        class never creates them.
        """
        pcm_stream = getattr(self, '_pcm_stream', None)
        pcm_player = getattr(self, '_pcm_player', None)
        if pcm_stream is not None:
            pcm_stream.stop_stream()
            if pcm_player is not None:
                pcm_player.close(pcm_stream)
        if pcm_player is not None:
            pcm_player.terminate()

        self._exit_event.clear()
        self._thread.join()
        logger.info('tts stop')

    def clear(self):
        """Discard all text that is queued but not yet picked up."""
        # Hold the queue's own lock: the worker thread may be inside get().
        with self._queue.mutex:
            self._queue.queue.clear()

    def push_txt(self, txt):
        """Queue ``txt`` for synthesis by the worker thread."""
        self._queue.put(txt)