human/edge_tts_test.py
2024-09-29 02:47:04 +08:00

106 lines
3.4 KiB
Python

#encoding = utf8
#
# import edge_tts
# import asyncio
# import pyaudio
# from pydub import AudioSegment
# from io import BytesIO
#
# # If used inside a Jupyter Notebook, lift the event-loop restriction
# try:
# import nest_asyncio
# nest_asyncio.apply()
# except ImportError:
# pass
#
# def play_audio(data: bytes, stream: pyaudio.Stream) -> None:
# stream.write(AudioSegment.from_mp3(BytesIO(data)).raw_data)
#
# CHUNK_SIZE = 20 * 1024
# async def play_tts(text, voice):
# communicate = edge_tts.Communicate(text, voice)
#
# # Set up PyAudio
# audio = pyaudio.PyAudio()
# stream = audio.open(format=pyaudio.paInt16, channels=1, rate=24000, output=True)
#
# # async for chunk in communicate.stream(): # use the stream method
# # if chunk['type'] == 'audio': # make sure the chunk is a byte stream
# # stream.write(chunk['data'])
#
# total_data = b''
# for chunk in communicate.stream_sync():
# if chunk["type"] == "audio" and chunk["data"]:
# total_data += chunk["data"]
# if len(total_data) >= CHUNK_SIZE:
# # print(f"Time elapsed: {time.time() - start_time:.2f} seconds") # Print time
# stream.write(AudioSegment.from_mp3(BytesIO(total_data[:CHUNK_SIZE])).raw_data)
# # play_audio(total_data[:CHUNK_SIZE], stream) # Play first CHUNK_SIZE bytes
# total_data = total_data[CHUNK_SIZE:] # Remove played data
# # play_audio(total_data, stream)
# # Stop and close the audio stream
# stream.stop_stream()
# stream.close()
# audio.terminate()
#
#
# async def save_to_file(text, voice, filename):
# communicate = edge_tts.Communicate(text, voice)
#
# with open(filename, "wb") as f:
# async for chunk in communicate.stream():
# if chunk['type'] == 'audio':
# f.write(chunk['data'])
#
# if __name__ == "__main__":
# text = "Hello, this is a test of the Edge TTS service."
# voice = "en-US-JessaNeural"
#
# # Run the async function with asyncio.run()
# asyncio.run(play_tts(text, voice))
# # asyncio.run(save_to_file(text, voice, "output.wav"))
import edge_tts
import pyaudio
from io import BytesIO
from pydub import AudioSegment
import time
# Sentence that main() sends to the TTS service.
TEXT = 'Hello World! How are you guys doing? I hope great, cause I am having fun and honestly it has been a blast'
# Voice name passed to edge_tts.Communicate.
VOICE = "en-US-AndrewMultilingualNeural"
# Number of buffered audio bytes (20 KiB) that main() accumulates before it
# decodes and plays one slice; this is a buffering threshold, not the size of
# the chunks delivered by the TTS stream.
CHUNK_SIZE = 20 * 1024
def main() -> None:
    """Synthesize TEXT with VOICE via Edge TTS and play it while it streams.

    Audio bytes from the TTS stream are buffered; each time at least
    CHUNK_SIZE bytes are available, the first CHUNK_SIZE bytes are decoded
    and played, and the elapsed time since start is printed. Whatever is
    left when the stream ends is played last, unless nothing is left.

    The PyAudio stream and instance are always released, even when
    synthesis, decoding or playback raises.
    """
    start_time = time.time()
    communicator = edge_tts.Communicate(TEXT, VOICE)
    pyaudio_instance = pyaudio.PyAudio()
    try:
        # NOTE(review): the output format is hard-coded to 16-bit mono at
        # 24 kHz; this assumes the decoded TTS audio matches -- confirm.
        audio_stream = pyaudio_instance.open(
            format=pyaudio.paInt16, channels=1, rate=24000, output=True
        )
        try:
            # bytearray lets us append and drop the played prefix in place
            # instead of copying the whole buffer on every chunk.
            pending = bytearray()
            for chunk in communicator.stream_sync():
                if chunk["type"] != "audio" or not chunk["data"]:
                    continue
                pending += chunk["data"]
                # NOTE(review): slices are cut at a fixed byte offset, not at
                # MP3 frame boundaries; the decoder may have to resync at
                # each slice edge -- verify audio quality.
                while len(pending) >= CHUNK_SIZE:
                    print(f"Time elapsed: {time.time() - start_time:.2f} seconds")  # Print time
                    play_audio(bytes(pending[:CHUNK_SIZE]), audio_stream)
                    del pending[:CHUNK_SIZE]
            # Play the remaining audio, but never hand the decoder an empty
            # payload (no audio received, or an exact multiple of CHUNK_SIZE).
            if pending:
                play_audio(bytes(pending), audio_stream)
        finally:
            audio_stream.stop_stream()
            audio_stream.close()
    finally:
        pyaudio_instance.terminate()
def play_audio(data: bytes, stream: pyaudio.Stream) -> None:
    """Decode MP3-encoded bytes and write the raw samples to an audio stream.

    Args:
        data: MP3-encoded audio bytes. An empty payload is ignored.
        stream: Open output stream whose ``write`` receives the decoded
            raw sample data.
    """
    # Nothing to decode; passing zero bytes to the MP3 decoder would fail.
    if not data:
        return
    segment = AudioSegment.from_mp3(BytesIO(data))
    stream.write(segment.raw_data)
# Run the playback demo only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()