94 lines
2.5 KiB
Python
94 lines
2.5 KiB
Python
#encoding = utf8
|
|
import heapq
|
|
import logging
|
|
import os
|
|
import shutil
|
|
|
|
from utils import save_wav
|
|
from human_handler import AudioHandler
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TTSAudioHandle(AudioHandler):
    """Base TTS audio handler holding a sample rate and a running index.

    ``on_handle`` and ``stop`` do nothing at this level; they are hooks
    meant to be overridden by subclasses.
    """

    def __init__(self, context, handler):
        """Initialise the parent handler, then the local index and sample rate.

        Args:
            context: Forwarded unchanged to ``AudioHandler.__init__``.
            handler: Forwarded unchanged to ``AudioHandler.__init__``.
        """
        super().__init__(context, handler)
        self._index = 1
        # Default rate; replaceable through the ``sample_rate`` setter.
        self._sample_rate = 16000

    @property
    def sample_rate(self):
        """Sample rate currently stored on this handler."""
        return self._sample_rate

    @sample_rate.setter
    def sample_rate(self, value):
        self._sample_rate = value

    def get_index(self):
        """Advance the internal index by one and return the new value.

        The index starts at 1 and is incremented before being returned,
        so the first call yields 2.
        """
        advanced = self._index + 1
        self._index = advanced
        return advanced

    def on_handle(self, stream, index):
        """No-op hook; accepts ``stream`` and ``index`` and returns ``None``."""

    def stop(self):
        """No-op hook; returns ``None``."""
|
|
|
|
|
|
class TTSAudioSplitHandle(TTSAudioHandle):
    """Cut incoming audio into fixed-size chunks and pass each one on.

    The chunk length is ``sample_rate // fps`` samples, with both values
    read from the context object.
    """

    def __init__(self, context, handler):
        """Set up the chunk size from the context.

        Args:
            context: Forwarded to the parent initialiser. Its ``sample_rate``
                and ``fps`` attributes are read (via ``self._context``) to
                derive the chunk length.
            handler: Forwarded to the parent initialiser.
        """
        super().__init__(context, handler)
        self.sample_rate = self._context.sample_rate
        self._chunk = self.sample_rate // self._context.fps
        self._priority_queue = []
        logger.info("TTSAudioSplitHandle init")

    def on_handle(self, stream, index):
        """Split ``stream`` into ``self._chunk``-sized slices and forward them.

        Args:
            stream: Audio data exposing ``shape[0]`` and slice indexing, or
                ``None``. A ``None`` stream is recorded in the priority queue
                together with ``index`` and nothing is forwarded.
            index: Ordering key stored alongside a ``None`` stream.

        Raises:
            ValueError: If the chunk size is 0 (``fps`` larger than the
                sample rate); ``range()`` rejects a zero step, which avoids
                looping forever.
        """
        if stream is None:
            heapq.heappush(self._priority_queue, (index, None))
            # Nothing to split; without this return the code below would
            # dereference ``None``.
            return

        chunk = self._chunk
        total = stream.shape[0]

        # Only full chunks are forwarded; a trailing remainder shorter than
        # ``chunk`` is dropped.
        for start in range(0, total - chunk + 1, chunk):
            # NOTE(review): every chunk is forwarded with index 0 rather than
            # ``index`` -- confirm this is intended.
            self.on_next_handle(stream[start:start + chunk], 0)

    def stop(self):
        """No-op; this handler holds nothing that needs to be released."""
|
|
|
|
|
|
class TTSAudioSaveHandle(TTSAudioHandle):
    """Save every handled stream as ``<index>.wav`` in a fixed directory.

    The output directory is emptied when the handler is constructed.
    """

    def __init__(self, context, handler):
        """Initialise the parent handler and clear the output directory.

        Args:
            context: Forwarded to the parent initialiser.
            handler: Forwarded to the parent initialiser.
        """
        super().__init__(context, handler)
        # Relative path: it resolves against the process working directory.
        self._save_path_dir = '../temp/audio/'
        self._clean()

    def _clean(self):
        """Delete everything inside the output directory, keeping the directory.

        If the directory does not exist this logs a warning and returns; the
        directory is not created here.
        """
        directory = self._save_path_dir
        if not os.path.exists(directory):
            logger.warning("The directory %s does not exist.", directory)
            return

        for filename in os.listdir(directory):
            file_path = os.path.join(directory, filename)

            # Files and symlinks are unlinked. Symlinks are checked first
            # because ``shutil.rmtree`` refuses to operate on a symlink that
            # points to a directory.
            if os.path.islink(file_path) or os.path.isfile(file_path):
                os.remove(file_path)
                logger.info("Deleted file: %s", file_path)
            # Real directories are removed together with their contents.
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
                logger.info(
                    "Deleted directory and its contents: %s", file_path
                )

    def on_handle(self, stream, index):
        """Write ``stream`` to ``<save dir>/<index>.wav`` via ``save_wav``.

        Args:
            stream: Audio data handed to ``save_wav`` unchanged.
            index: Used, via ``str()`` formatting, as the file's base name.
        """
        file_name = os.path.join(self._save_path_dir, f"{index}.wav")
        save_wav(stream, file_name, self.sample_rate)

    def stop(self):
        """No-op; this handler holds nothing that needs to be released."""
|
|
|