human/edge_tts_test.py
2024-09-27 01:34:52 +08:00

102 lines
3.4 KiB
Python

# -*- coding: utf-8 -*-
import edge_tts
import asyncio
import pyaudio
from pydub import AudioSegment
from io import BytesIO
# When used inside Jupyter Notebook, lift the event-loop restriction so that
# asyncio.run() can be called while the notebook's own loop is running.
try:
    import nest_asyncio

    nest_asyncio.apply()
except ImportError:
    # nest_asyncio is optional; without it the script runs unpatched.
    pass
def play_audio(data: bytes, stream: pyaudio.Stream) -> None:
    """Decode MP3 bytes and write the resulting samples to an output stream.

    Args:
        data: MP3-encoded audio. Empty input is ignored.
        stream: Open PyAudio output stream that receives the decoded samples.
    """
    if not data:
        # An empty buffer is not a decodable MP3; there is nothing to play.
        return
    segment = AudioSegment.from_mp3(BytesIO(data))
    stream.write(segment.raw_data)
# Number of buffered MP3 bytes (20 KiB) collected before a slice is decoded
# and handed to the audio output.
CHUNK_SIZE = 1024 * 20
def _mp3_to_pcm(data: bytes, rate: int, channels: int, sample_width: int) -> bytes:
    """Decode MP3 bytes and convert them to raw PCM in the given output format."""
    segment = AudioSegment.from_mp3(BytesIO(data))
    # Convert to the playback format so the samples are interpreted correctly
    # whatever sample rate / channel count the MP3 itself was encoded with.
    segment = (
        segment.set_frame_rate(rate)
        .set_channels(channels)
        .set_sample_width(sample_width)
    )
    return segment.raw_data


async def play_tts(text: str, voice: str) -> None:
    """Synthesize *text* with Edge TTS and play it as the audio arrives.

    Incoming MP3 data is buffered and played in slices of ``CHUNK_SIZE``
    bytes; whatever is left in the buffer once the service has finished is
    played as a final, shorter slice.

    Note that both ``stream_sync()`` and the PyAudio write are blocking calls,
    so the event loop running this coroutine is blocked until playback ends.

    Args:
        text: Text to be spoken.
        voice: Name of the Edge TTS voice to use.
    """
    sample_format = pyaudio.paInt16
    sample_width = pyaudio.get_sample_size(sample_format)
    channels = 1
    rate = 16000

    communicate = edge_tts.Communicate(text, voice)

    # Set up PyAudio.
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            format=sample_format, channels=channels, rate=rate, output=True
        )
        try:
            pending = bytearray()
            for chunk in communicate.stream_sync():
                if chunk["type"] != "audio" or not chunk["data"]:
                    continue
                pending += chunk["data"]
                # Drain every complete slice, not just the first one, so a
                # large incoming chunk cannot build up a backlog.
                while len(pending) >= CHUNK_SIZE:
                    piece = bytes(pending[:CHUNK_SIZE])
                    del pending[:CHUNK_SIZE]
                    stream.write(_mp3_to_pcm(piece, rate, channels, sample_width))
            # Play the tail that never reached CHUNK_SIZE; without this, short
            # utterances would produce no sound at all.
            if pending:
                stream.write(
                    _mp3_to_pcm(bytes(pending), rate, channels, sample_width)
                )
        finally:
            # Stop and close the audio stream even if streaming or decoding failed.
            stream.stop_stream()
            stream.close()
    finally:
        audio.terminate()
async def save_to_file(text, voice, filename):
    """Stream synthesized speech for *text* and write its audio bytes to *filename*.

    Only chunks whose type is ``"audio"`` are written; every other chunk the
    service yields is skipped. The file is opened in binary write mode, so an
    existing file of the same name is overwritten.
    """
    tts = edge_tts.Communicate(text, voice)
    with open(filename, "wb") as out_file:
        async for piece in tts.stream():
            if piece["type"] != "audio":
                continue
            out_file.write(piece["data"])
if __name__ == "__main__":
    # NOTE(review): confirm this voice name is still offered by the service
    # (see `edge-tts --list-voices`) — TODO verify.
    voice = "en-US-JessaNeural"
    text = "Hello, this is a test of the Edge TTS service."

    # Run the async function with asyncio.run().
    asyncio.run(play_tts(text=text, voice=voice))

    # Alternative: write the audio to disk instead of playing it.
    # NOTE(review): play_tts decodes these bytes as MP3, so a ".wav" name
    # looks misleading — confirm the intended extension.
    # asyncio.run(save_to_file(text, voice, "output.wav"))
#
# import edge_tts
# import pyaudio
# from io import BytesIO
# from pydub import AudioSegment
# import time
#
# TEXT = 'Hello World! How are you guys doing? I hope great, cause I am having fun and honestly it has been a blast'
# VOICE = "en-US-AndrewMultilingualNeural"
# CHUNK_SIZE = 20 * 1024 # Assuming around 1024 bytes per chunk (adjust based on format)
#
# def main() -> None:
# start_time = time.time()
# communicator = edge_tts.Communicate(TEXT, VOICE)
#
# pyaudio_instance = pyaudio.PyAudio()
# audio_stream = pyaudio_instance.open(format=pyaudio.paInt16, channels=1, rate=16000, output=True)
#
# total_data = b'' # Store audio data instead of chunks
#
# for chunk in communicator.stream_sync():
# if chunk["type"] == "audio" and chunk["data"]:
# total_data += chunk["data"]
# if len(total_data) >= CHUNK_SIZE:
# print(f"Time elapsed: {time.time() - start_time:.2f} seconds") # Print time
# play_audio(total_data[:CHUNK_SIZE], audio_stream) # Play first CHUNK_SIZE bytes
# total_data = total_data[CHUNK_SIZE:] # Remove played data
#
# # Play remaining audio
# play_audio(total_data, audio_stream)
#
# audio_stream.stop_stream()
# audio_stream.close()
# pyaudio_instance.terminate()
#
# def play_audio(data: bytes, stream: pyaudio.Stream) -> None:
# stream.write(AudioSegment.from_mp3(BytesIO(data)).raw_data)
#
# if __name__ == "__main__":
# main()