# -*- coding: utf-8 -*-
|
|
|
|
import edge_tts
|
|
import asyncio
|
|
import pyaudio
|
|
from pydub import AudioSegment
|
|
from io import BytesIO
|
|
|
|
# When running inside Jupyter Notebook, lift the "event loop is already
# running" restriction so asyncio.run() can be called from a cell.
# nest_asyncio is optional: if it is not installed, carry on without it.
try:
    import nest_asyncio
except ImportError:
    pass
else:
    # Kept outside the try body so that an ImportError raised while
    # patching is not silently mistaken for "package not installed".
    nest_asyncio.apply()
|
|
|
|
def play_audio(data: bytes, stream: pyaudio.Stream) -> None:
    """Decode MP3 bytes with pydub and write the decoded audio to ``stream``.

    Args:
        data: MP3-encoded audio bytes.
        stream: An open PyAudio output stream that receives the decoded
            segment's ``raw_data``.
    """
    mp3_buffer = BytesIO(data)
    segment = AudioSegment.from_mp3(mp3_buffer)
    stream.write(segment.raw_data)
|
|
|
|
# Number of buffered MP3 bytes (20 KiB) that play_tts accumulates before it
# decodes and plays one slice.
CHUNK_SIZE = 20 * 1024
|
|
async def play_tts(text, voice):
    """Synthesize ``text`` with Edge TTS and play it through PyAudio.

    MP3 data is pulled from ``edge_tts.Communicate.stream_sync()`` and
    buffered. Each time at least ``CHUNK_SIZE`` bytes are available, a slice
    of that size is decoded and written to the output stream. Whatever is
    left in the buffer once the stream ends is played as well, so utterances
    shorter than one full chunk are not silently dropped.

    Args:
        text: The text to speak.
        voice: Voice name passed to ``edge_tts.Communicate``.
    """
    communicate = edge_tts.Communicate(text, voice)

    # Set up PyAudio: 16-bit mono output at 24 kHz.
    # NOTE(review): these settings are assumed to match the decoded MP3
    # produced by the service -- confirm.
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(format=pyaudio.paInt16, channels=1, rate=24000, output=True)
        try:
            # bytearray lets played data be removed in place instead of
            # re-allocating the whole buffer on every slice.
            pending = bytearray()

            # NOTE(review): this is a synchronous loop inside a coroutine,
            # so the calling event loop is blocked while audio is fetched
            # and played.
            for chunk in communicate.stream_sync():
                if chunk["type"] != "audio" or not chunk["data"]:
                    continue
                pending += chunk["data"]
                # Drain every complete slice, not just one per incoming chunk.
                while len(pending) >= CHUNK_SIZE:
                    play_audio(bytes(pending[:CHUNK_SIZE]), stream)
                    del pending[:CHUNK_SIZE]

            # Play the remaining tail; skip it when empty because there is
            # nothing to decode.
            if pending:
                play_audio(bytes(pending), stream)
        finally:
            # Stop and close the audio stream even if streaming/decoding failed.
            stream.stop_stream()
            stream.close()
    finally:
        audio.terminate()
|
|
|
|
|
|
async def save_to_file(text, voice, filename):
    """Synthesize ``text`` with Edge TTS and write the audio to ``filename``.

    The file is opened in binary write mode. Only chunks whose ``type`` is
    ``"audio"`` are written; every other chunk type is skipped.

    Args:
        text: The text to speak.
        voice: Voice name passed to ``edge_tts.Communicate``.
        filename: Path of the output file.
    """
    tts = edge_tts.Communicate(text, voice)

    with open(filename, "wb") as out_file:
        async for piece in tts.stream():
            if piece["type"] != "audio":
                continue
            out_file.write(piece["data"])
|
|
|
|
if __name__ == "__main__":
    # Demo input: a short English sentence and the voice used to speak it.
    # NOTE(review): confirm this voice name is still offered by the service
    # (e.g. via `edge-tts --list-voices`).
    demo_text = "Hello, this is a test of the Edge TTS service."
    demo_voice = "en-US-JessaNeural"

    # Run the async playback function to completion with asyncio.run().
    asyncio.run(play_tts(demo_text, demo_voice))
    # Alternative: write the audio to a file instead of playing it.
    # asyncio.run(save_to_file(demo_text, demo_voice, "output.wav"))
|
|
|
|
#
|
|
# import edge_tts
|
|
# import pyaudio
|
|
# from io import BytesIO
|
|
# from pydub import AudioSegment
|
|
# import time
|
|
#
|
|
# TEXT = 'Hello World! How are you guys doing? I hope great, cause I am having fun and honestly it has been a blast'
|
|
# VOICE = "en-US-AndrewMultilingualNeural"
|
|
# CHUNK_SIZE = 20 * 1024 # Assuming around 1024 bytes per chunk (adjust based on format)
|
|
#
|
|
# def main() -> None:
|
|
# start_time = time.time()
|
|
# communicator = edge_tts.Communicate(TEXT, VOICE)
|
|
#
|
|
# pyaudio_instance = pyaudio.PyAudio()
|
|
# audio_stream = pyaudio_instance.open(format=pyaudio.paInt16, channels=1, rate=16000, output=True)
|
|
#
|
|
# total_data = b'' # Store audio data instead of chunks
|
|
#
|
|
# for chunk in communicator.stream_sync():
|
|
# if chunk["type"] == "audio" and chunk["data"]:
|
|
# total_data += chunk["data"]
|
|
# if len(total_data) >= CHUNK_SIZE:
|
|
# print(f"Time elapsed: {time.time() - start_time:.2f} seconds") # Print time
|
|
# play_audio(total_data[:CHUNK_SIZE], audio_stream) # Play first CHUNK_SIZE bytes
|
|
# total_data = total_data[CHUNK_SIZE:] # Remove played data
|
|
#
|
|
# # Play remaining audio
|
|
# play_audio(total_data, audio_stream)
|
|
#
|
|
# audio_stream.stop_stream()
|
|
# audio_stream.close()
|
|
# pyaudio_instance.terminate()
|
|
#
|
|
# def play_audio(data: bytes, stream: pyaudio.Stream) -> None:
|
|
# stream.write(AudioSegment.from_mp3(BytesIO(data)).raw_data)
|
|
#
|
|
# if __name__ == "__main__":
|
|
# main() |