# -*- coding: utf-8 -*-
"""Stream text-to-speech audio from edge-tts and play it through PyAudio.

The script requests speech for ``TEXT`` using ``VOICE``, buffers the encoded
audio bytes as they arrive, and decodes/plays them in segments of
``CHUNK_SIZE`` bytes so that playback can begin before the whole response
has been received.
"""

# ---------------------------------------------------------------------------
# Commented-out asyncio-based variant (not executed), retained as found.
# ---------------------------------------------------------------------------
#
# import edge_tts
# import asyncio
# import pyaudio
# from pydub import AudioSegment
# from io import BytesIO
#
# # When running in a Jupyter Notebook, lift the event-loop restriction
# try:
#     import nest_asyncio
#     nest_asyncio.apply()
# except ImportError:
#     pass
#
# def play_audio(data: bytes, stream: pyaudio.Stream) -> None:
#     stream.write(AudioSegment.from_mp3(BytesIO(data)).raw_data)
#
# CHUNK_SIZE = 20 * 1024
# async def play_tts(text, voice):
#     communicate = edge_tts.Communicate(text, voice)
#
#     # Set up PyAudio
#     audio = pyaudio.PyAudio()
#     stream = audio.open(format=pyaudio.paInt16, channels=1, rate=24000, output=True)
#
#     # async for chunk in communicate.stream():  # use the stream method
#     #     if chunk['type'] == 'audio':  # make sure the chunk is a byte stream
#     #         stream.write(chunk['data'])
#
#     total_data = b''
#     for chunk in communicate.stream_sync():
#         if chunk["type"] == "audio" and chunk["data"]:
#             total_data += chunk["data"]
#             if len(total_data) >= CHUNK_SIZE:
#                 # print(f"Time elapsed: {time.time() - start_time:.2f} seconds")  # Print time
#                 stream.write(AudioSegment.from_mp3(BytesIO(total_data[:CHUNK_SIZE])).raw_data)
#                 # play_audio(total_data[:CHUNK_SIZE], stream)  # Play first CHUNK_SIZE bytes
#                 total_data = total_data[CHUNK_SIZE:]  # Remove played data
#     # play_audio(total_data, stream)
#     # Stop and close the audio stream
#     stream.stop_stream()
#     stream.close()
#     audio.terminate()
#
#
# async def save_to_file(text, voice, filename):
#     communicate = edge_tts.Communicate(text, voice)
#
#     with open(filename, "wb") as f:
#         async for chunk in communicate.stream():
#             if chunk['type'] == 'audio':
#                 f.write(chunk['data'])
#
# if __name__ == "__main__":
#     text = "Hello, this is a test of the Edge TTS service."
#     voice = "en-US-JessaNeural"
#
#     # Run the async function with asyncio.run()
#     asyncio.run(play_tts(text, voice))
#     # asyncio.run(save_to_file(text, voice, "output.wav"))

import time
from io import BytesIO

import edge_tts
import pyaudio
from pydub import AudioSegment

# Text that is sent to the TTS service.
TEXT = (
    'Hello World! How are you guys doing? '
    'I hope great, cause I am having fun and honestly it has been a blast'
)

# Voice name passed to edge_tts.Communicate.
VOICE = "en-US-AndrewMultilingualNeural"

# Number of encoded audio bytes to accumulate before a segment is decoded
# and played (20 KiB). Smaller values start playback sooner but decode more
# often; adjust based on the audio format.
CHUNK_SIZE = 20 * 1024


def main() -> None:
    """Synthesize ``TEXT`` with ``VOICE`` and play it while it streams in.

    Audio bytes yielded by ``Communicate.stream_sync()`` are appended to a
    buffer. Whenever the buffer holds at least ``CHUNK_SIZE`` bytes, the
    first ``CHUNK_SIZE`` bytes are decoded and played and the elapsed time
    since start is printed. Whatever remains after the stream ends is
    played last.

    The output stream and the PyAudio instance are released even if
    streaming or decoding raises.
    """
    start_time = time.time()
    communicator = edge_tts.Communicate(TEXT, VOICE)

    pyaudio_instance = pyaudio.PyAudio()
    try:
        # NOTE(review): 16-bit / mono / 24 kHz presumably matches what pydub
        # decodes from the service's default MP3 output - confirm if the
        # output format is ever changed.
        audio_stream = pyaudio_instance.open(
            format=pyaudio.paInt16, channels=1, rate=24000, output=True
        )
        try:
            pending = b''  # encoded bytes received but not yet played
            for chunk in communicator.stream_sync():
                if chunk["type"] == "audio" and chunk["data"]:
                    pending += chunk["data"]

                    if len(pending) >= CHUNK_SIZE:
                        print(f"Time elapsed: {time.time() - start_time:.2f} seconds")
                        # NOTE(review): a fixed-size byte slice may cut
                        # through an encoded frame, which could produce
                        # glitches at segment boundaries - confirm.
                        play_audio(pending[:CHUNK_SIZE], audio_stream)
                        pending = pending[CHUNK_SIZE:]

            # Play the tail; skip it when nothing is left so the decoder is
            # never handed an empty buffer.
            if pending:
                play_audio(pending, audio_stream)
        finally:
            audio_stream.stop_stream()
            audio_stream.close()
    finally:
        pyaudio_instance.terminate()


def play_audio(data: bytes, stream: pyaudio.Stream) -> None:
    """Decode *data* as MP3 and write the decoded samples to *stream*.

    Args:
        data: Encoded MP3 bytes. Empty input is ignored because there is
            nothing to decode.
        stream: An open PyAudio output stream to write to.
    """
    if not data:
        return
    stream.write(AudioSegment.from_mp3(BytesIO(data)).raw_data)


if __name__ == "__main__":
    main()