# -*- coding: utf-8 -*-
from io import BytesIO

import edge_tts
import numpy as np
import resampy
import soundfile as sf

from .tts_base import TTSBase


class TTSEdge(TTSBase):
    """Text-to-speech backend built on ``edge_tts``.

    Audio bytes streamed by ``edge_tts`` for a request are accumulated in an
    in-memory buffer; ``_on_handle`` then decodes that buffer into a mono
    float32 waveform at the handle's sample rate and passes it to
    ``self._handle.on_handle``.
    """

    def __init__(self, handle, voice='zh-CN-XiaoyiNeural'):
        """Store the voice name and create the empty audio buffer.

        Args:
            handle: Consumer object forwarded to ``TTSBase.__init__``. This
                class reads ``handle.sample_rate`` and calls
                ``handle.on_handle(stream)``.
            voice: Voice name passed to ``edge_tts.Communicate``.
        """
        super().__init__(handle)
        self._voice = voice
        # Encoded audio received for the request currently being synthesized.
        self._byte_stream = BytesIO()

    async def _on_request(self, txt: str) -> None:
        """Synthesize ``txt`` and append the encoded audio to the buffer.

        Only chunks whose ``"type"`` is ``"audio"`` are stored; every other
        chunk type (e.g. ``"WordBoundary"``) is ignored.
        """
        communicate = edge_tts.Communicate(txt, self._voice)
        async for chunk in communicate.stream():
            if chunk["type"] == "audio":
                self._byte_stream.write(chunk["data"])

    async def _on_handle(self) -> None:
        """Decode the buffered audio, push it to the handle, reset the buffer.

        Any exception raised while decoding or pushing is printed rather than
        propagated. The buffer is emptied on both the success and the error
        path so the next request starts clean.
        """
        try:
            # Rewind so the decoder reads from the start of the buffer.
            self._byte_stream.seek(0)
            stream = self.__create_bytes_stream(self._byte_stream)
            print('-------tts start push chunk')
            self._handle.on_handle(stream)
            print('-------tts finish push chunk')
        except Exception as e:
            print('-------tts finish error:', e)
        finally:
            # Single reset point shared by the success and failure paths.
            self._byte_stream.seek(0)
            self._byte_stream.truncate()

    def __create_bytes_stream(self, byte_stream) -> np.ndarray:
        """Decode ``byte_stream`` into a mono float32 array.

        Args:
            byte_stream: File-like object positioned at the start of encoded
                audio that ``soundfile.read`` can parse.

        Returns:
            A 1-D ``np.float32`` array, resampled to
            ``self._handle.sample_rate`` when the decoded rate differs and
            the audio is non-empty.
        """
        stream, sample_rate = sf.read(byte_stream)  # [T*sample_rate,] float64
        print(f'[INFO]tts audio stream {sample_rate}: {stream.shape}')
        stream = stream.astype(np.float32)

        if stream.ndim > 1:
            print(f'[WARN] audio has {stream.shape[1]} channels, only use the first.')
            stream = stream[:, 0]

        # Skip resampling for empty audio; there is nothing to convert.
        if sample_rate != self._handle.sample_rate and stream.shape[0] > 0:
            print(f'[WARN] audio sample rate is {sample_rate}, resampling into {self._handle.sample_rate}.')
            stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self._handle.sample_rate)

        return stream