#encoding = utf8

import heapq
import logging
import os
import shutil

from utils import save_wav
from human_handler import AudioHandler

logger = logging.getLogger(__name__)


class TTSAudioHandle(AudioHandler):
    """Base audio handler that carries a sample rate and a running index.

    Subclasses override :meth:`on_handle` to consume audio; the hooks
    defined here are intentionally no-ops.
    """

    def __init__(self, context, handler):
        """Forward ``context`` and ``handler`` to ``AudioHandler`` and set defaults."""
        super().__init__(context, handler)
        self._sample_rate = 16000
        # Starts at -1 so that the first get_index() call returns 0.
        self._index = -1

    @property
    def sample_rate(self):
        """Sample rate used by this handler (defaults to 16000)."""
        return self._sample_rate

    @sample_rate.setter
    def sample_rate(self, value):
        self._sample_rate = value

    def get_index(self):
        """Increment the internal counter and return its new value (0, 1, 2, ...)."""
        self._index += 1
        return self._index

    def on_handle(self, stream, index):
        """Consume one audio segment; no-op in the base class."""

    def stop(self):
        """Stop hook; no-op in the base class."""

    def pause_talk(self):
        """Pause hook; no-op in the base class."""


class TTSAudioSplitHandle(TTSAudioHandle):
    """Split audio segments into fixed-size chunks and forward them in index order.

    Segments may arrive out of order. They are buffered in a min-heap keyed
    by ``index`` and released to ``on_next_handle`` only once every lower
    index (counting from 0) has been released.
    """

    def __init__(self, context, handler):
        """Read the sample rate and fps from the context and derive the chunk size."""
        super().__init__(context, handler)
        self.sample_rate = self._context.sample_rate
        # Number of samples per chunk: one chunk per frame at the context fps.
        self._chunk = self.sample_rate // self._context.fps
        self._priority_queue = []
        # Index of the next segment that is allowed to be forwarded.
        self._current = 0
        logger.info("TTSAudioSplitHandle init")

    def _split(self, stream):
        """Return the full ``self._chunk``-sized slices of ``stream``, in order.

        A trailing remainder shorter than one chunk is not included.
        ``range`` raises ``ValueError`` for a zero chunk size, so a
        misconfigured fps fails loudly instead of looping forever.
        """
        size = self._chunk
        total = stream.shape[0]
        return [stream[start:start + size]
                for start in range(0, total - size + 1, size)]

    def on_handle(self, stream, index):
        """Queue one segment and forward every segment that is now in order.

        Args:
            stream: Audio samples exposing ``.shape`` and slicing (presumably
                a NumPy array -- confirm with the caller), or ``None`` for a
                segment that carries no audio but still occupies its index.
            index: Position of this segment in the output order.
        """
        chunks = None if stream is None else self._split(stream)
        heapq.heappush(self._priority_queue, (index, chunks))

        logger.debug('TTSAudioSplitHandle::on_handle %s %s %s',
                     index, self._priority_queue[0][0], self._current)

        # Drain every consecutive ready segment, not just one: a segment that
        # arrived early must be released as soon as its predecessor shows up,
        # otherwise it would stay stuck until some later call.
        while (self._priority_queue
               and self._priority_queue[0][0] == self._current):
            _, ready = heapq.heappop(self._priority_queue)
            self._current += 1
            if ready is None:
                continue
            for chunk in ready:
                self.on_next_handle(chunk, 0)

    def stop(self):
        """Stop hook; nothing to release."""


class TTSAudioSaveHandle(TTSAudioHandle):
    """Write each received audio segment to ``<save dir>/<index>.wav``."""

    def __init__(self, context, handler):
        """Set the save directory and empty it of any previous content."""
        super().__init__(context, handler)
        self._save_path_dir = '../temp/audio/'
        self._clean()

    def _clean(self):
        """Delete every file and sub-directory inside the save directory.

        Does nothing (besides logging) when the directory does not exist.
        """
        directory = self._save_path_dir
        if not os.path.exists(directory):
            logger.info("The directory %s does not exist.", directory)
            return

        for filename in os.listdir(directory):
            file_path = os.path.join(directory, filename)
            # Files are removed directly. Symlinks are unlinked as well,
            # because shutil.rmtree refuses to operate on a symbolic link.
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.remove(file_path)
                logger.info("Deleted file: %s", file_path)
            # Directories are removed recursively with all their contents.
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
                logger.info("Deleted directory and its contents: %s", file_path)

    def on_handle(self, stream, index):
        """Save ``stream`` as ``<index>.wav`` at this handler's sample rate.

        Args:
            stream: Audio samples passed through to ``save_wav``, or ``None``
                when the segment carries no audio (nothing is written).
            index: Segment index, used as the file name.
        """
        if stream is None:
            # The split handler accepts None segments too; there is no audio
            # to write for them.
            return
        # _clean() tolerates a missing directory, so make sure it exists
        # before writing into it.
        os.makedirs(self._save_path_dir, exist_ok=True)
        file_name = os.path.join(self._save_path_dir, f'{index}.wav')
        save_wav(stream, file_name, self.sample_rate)

    def stop(self):
        """Stop hook; nothing to release."""