human/tts/tts_edge.py
2024-10-11 20:09:54 +08:00

60 lines
1.9 KiB
Python

# -*- coding: utf-8 -*-
from io import BytesIO
import numpy as np
import soundfile as sf
import edge_tts
import resampy
from .tts_base import TTSBase
class TTSEdge(TTSBase):
    """Text-to-speech backend built on ``edge_tts``.

    Synthesized audio is accumulated in an in-memory byte buffer by
    ``_on_request``; ``_on_handle`` then decodes that buffer, converts it to a
    mono ``float32`` array at ``handle.sample_rate`` and passes it to
    ``handle.on_handle``.
    """

    def __init__(self, handle, voice='zh-CN-XiaoyiNeural'):
        """Store the voice name and create the empty audio buffer.

        Args:
            handle: Object forwarded to ``TTSBase.__init__``. This class reads
                ``handle.sample_rate`` and calls ``handle.on_handle(stream)``
                through ``self._handle``.
                NOTE(review): assumes ``TTSBase`` stores it as ``self._handle``
                -- confirm against the base class.
            voice: Voice identifier handed to ``edge_tts.Communicate``.
        """
        super().__init__(handle)
        self._voice = voice
        # Encoded audio received from edge_tts, drained by ``_on_handle``.
        self._byte_stream = BytesIO()

    async def _on_request(self, txt: str):
        """Synthesize ``txt`` and append the encoded audio to the buffer.

        Only chunks whose ``"type"`` is ``"audio"`` are stored; every other
        chunk type (for example ``"WordBoundary"``) is ignored.

        Args:
            txt: Text to synthesize.
        """
        communicate = edge_tts.Communicate(txt, self._voice)
        async for chunk in communicate.stream():
            if chunk["type"] == "audio":
                self._byte_stream.write(chunk["data"])

    async def _on_handle(self):
        """Decode the buffered audio, push it to the handle, and reset the buffer.

        Any ``Exception`` raised while decoding or pushing is printed rather
        than propagated. The buffer is emptied on every exit path so that a
        failed utterance cannot leak into the next one.
        """
        # Rewind so the decoder reads the buffer from its first byte.
        self._byte_stream.seek(0)
        try:
            stream = self.__create_bytes_stream(self._byte_stream)
            print('-------tts start push chunk')
            self._handle.on_handle(stream)
            print('-------tts finish push chunk')
        except Exception as e:
            # Best effort: report the failure and keep the pipeline alive.
            print('-------tts finish error:', e)
        finally:
            # Single cleanup point: discard the consumed (or broken) audio.
            self._byte_stream.seek(0)
            self._byte_stream.truncate()

    def __create_bytes_stream(self, byte_stream):
        """Decode ``byte_stream`` into a mono ``float32`` sample array.

        Args:
            byte_stream: File-like object positioned at the start of encoded
                audio that ``soundfile.read`` can decode.

        Returns:
            One-dimensional ``float32`` array. Multi-channel input is reduced
            to its first channel, and non-empty audio is resampled to
            ``self._handle.sample_rate`` when the decoded rate differs.
        """
        stream, sample_rate = sf.read(byte_stream)  # [T*sample_rate,] float64
        print(f'[INFO]tts audio stream {sample_rate}: {stream.shape}')
        stream = stream.astype(np.float32)

        if stream.ndim > 1:
            print(f'[WARN] audio has {stream.shape[1]} channels, only use the first.')
            stream = stream[:, 0]

        # Resampling is skipped for empty audio, which is returned as is.
        if sample_rate != self._handle.sample_rate and stream.shape[0] > 0:
            print(f'[WARN] audio sample rate is {sample_rate}, resampling into {self._handle.sample_rate}.')
            stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self._handle.sample_rate)

        return stream