Compare commits
No commits in common. "c4eb191e1800e2d8c42d27534e1f2b43bdcfcc00" and "740655228962385b7ba157f19398c0383efd6841" have entirely different histories.
c4eb191e18
...
7406552289
@ -17,7 +17,7 @@ class AsrBase:
|
|||||||
|
|
||||||
self._stop_event = threading.Event()
|
self._stop_event = threading.Event()
|
||||||
self._stop_event.set()
|
self._stop_event.set()
|
||||||
self._thread = threading.Thread(target=self._recognize_loop, name="AsrBaseThread")
|
self._thread = threading.Thread(target=self._recognize_loop)
|
||||||
self._thread.start()
|
self._thread.start()
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
@ -34,7 +34,6 @@ class AsrBase:
|
|||||||
observer.process(message)
|
observer.process(message)
|
||||||
|
|
||||||
def _notify_complete(self, message: str):
|
def _notify_complete(self, message: str):
|
||||||
EventBus().post('clear_cache')
|
|
||||||
for observer in self._observers:
|
for observer in self._observers:
|
||||||
observer.completed(message)
|
observer.completed(message)
|
||||||
|
|
||||||
|
@ -63,15 +63,6 @@ class SherpaNcnnAsr(AsrBase):
|
|||||||
logger.info(f'_recognize_loop')
|
logger.info(f'_recognize_loop')
|
||||||
print(f'_recognize_loop')
|
print(f'_recognize_loop')
|
||||||
|
|
||||||
while self._stop_event.is_set():
|
|
||||||
logger.info(f'_recognize_loop000')
|
|
||||||
self._notify_complete('介绍中国5000年历史文学')
|
|
||||||
logger.info(f'_recognize_loop111')
|
|
||||||
segment_id += 1
|
|
||||||
time.sleep(150)
|
|
||||||
logger.info(f'_recognize_loop222')
|
|
||||||
logger.info(f'_recognize_loop exit')
|
|
||||||
'''
|
|
||||||
with sd.InputStream(channels=1, dtype="float32", samplerate=self._sample_rate) as s:
|
with sd.InputStream(channels=1, dtype="float32", samplerate=self._sample_rate) as s:
|
||||||
while self._stop_event.is_set():
|
while self._stop_event.is_set():
|
||||||
samples, _ = s.read(self._samples_per_read) # a blocking read
|
samples, _ = s.read(self._samples_per_read) # a blocking read
|
||||||
@ -93,4 +84,12 @@ class SherpaNcnnAsr(AsrBase):
|
|||||||
segment_id += 1
|
segment_id += 1
|
||||||
self._recognizer.reset()
|
self._recognizer.reset()
|
||||||
'''
|
'''
|
||||||
|
while self._stop_event.is_set():
|
||||||
|
logger.info(f'_recognize_loop000')
|
||||||
|
self._notify_complete('介绍中国5000年历史文学')
|
||||||
|
logger.info(f'_recognize_loop111')
|
||||||
|
segment_id += 1
|
||||||
|
time.sleep(60)
|
||||||
|
logger.info(f'_recognize_loop222')
|
||||||
|
logger.info(f'_recognize_loop exit')
|
||||||
|
'''
|
||||||
|
@ -22,7 +22,6 @@ class AudioInferenceHandler(AudioHandler):
|
|||||||
super().__init__(context, handler)
|
super().__init__(context, handler)
|
||||||
|
|
||||||
EventBus().register('stop', self._on_stop)
|
EventBus().register('stop', self._on_stop)
|
||||||
EventBus().register('clear_cache', self.on_clear_cache)
|
|
||||||
self._mal_queue = SyncQueue(1, "AudioInferenceHandler_Mel")
|
self._mal_queue = SyncQueue(1, "AudioInferenceHandler_Mel")
|
||||||
self._audio_queue = SyncQueue(context.batch_size * 2, "AudioInferenceHandler_Audio")
|
self._audio_queue = SyncQueue(context.batch_size * 2, "AudioInferenceHandler_Audio")
|
||||||
|
|
||||||
@ -36,15 +35,10 @@ class AudioInferenceHandler(AudioHandler):
|
|||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
EventBus().unregister('stop', self._on_stop)
|
EventBus().unregister('stop', self._on_stop)
|
||||||
EventBus().unregister('clear_cache', self.on_clear_cache)
|
|
||||||
|
|
||||||
def _on_stop(self, *args, **kwargs):
|
def _on_stop(self, *args, **kwargs):
|
||||||
self.stop()
|
self.stop()
|
||||||
|
|
||||||
def on_clear_cache(self, *args, **kwargs):
|
|
||||||
self._mal_queue.clear()
|
|
||||||
self._audio_queue.clear()
|
|
||||||
|
|
||||||
def on_handle(self, stream, type_):
|
def on_handle(self, stream, type_):
|
||||||
if not self._is_running:
|
if not self._is_running:
|
||||||
return
|
return
|
||||||
@ -88,11 +82,9 @@ class AudioInferenceHandler(AudioHandler):
|
|||||||
# print('origin mel_batch:', len(mel_batch))
|
# print('origin mel_batch:', len(mel_batch))
|
||||||
is_all_silence = True
|
is_all_silence = True
|
||||||
audio_frames = []
|
audio_frames = []
|
||||||
current_text = ''
|
|
||||||
for _ in range(batch_size * 2):
|
for _ in range(batch_size * 2):
|
||||||
frame, type_ = self._audio_queue.get()
|
frame, type_ = self._audio_queue.get()
|
||||||
# print('AudioInferenceHandler type_', type_)
|
# print('AudioInferenceHandler type_', type_)
|
||||||
current_text = frame[1]
|
|
||||||
audio_frames.append((frame, type_))
|
audio_frames.append((frame, type_))
|
||||||
if type_ == 0:
|
if type_ == 0:
|
||||||
is_all_silence = False
|
is_all_silence = False
|
||||||
@ -109,7 +101,7 @@ class AudioInferenceHandler(AudioHandler):
|
|||||||
0)
|
0)
|
||||||
index = index + 1
|
index = index + 1
|
||||||
else:
|
else:
|
||||||
logger.info(f'infer======= {current_text}')
|
logger.info('infer=======')
|
||||||
t = time.perf_counter()
|
t = time.perf_counter()
|
||||||
img_batch = []
|
img_batch = []
|
||||||
# for i in range(batch_size):
|
# for i in range(batch_size):
|
||||||
|
@ -19,7 +19,6 @@ class AudioMalHandler(AudioHandler):
|
|||||||
super().__init__(context, handler)
|
super().__init__(context, handler)
|
||||||
|
|
||||||
EventBus().register('stop', self._on_stop)
|
EventBus().register('stop', self._on_stop)
|
||||||
EventBus().register('clear_cache', self.on_clear_cache)
|
|
||||||
|
|
||||||
self._is_running = True
|
self._is_running = True
|
||||||
self._queue = SyncQueue(context.batch_size * 2, "AudioMalHandler_queue")
|
self._queue = SyncQueue(context.batch_size * 2, "AudioMalHandler_queue")
|
||||||
@ -35,20 +34,15 @@ class AudioMalHandler(AudioHandler):
|
|||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
EventBus().unregister('stop', self._on_stop)
|
EventBus().unregister('stop', self._on_stop)
|
||||||
EventBus().unregister('clear_cache', self.on_clear_cache)
|
|
||||||
|
|
||||||
def _on_stop(self, *args, **kwargs):
|
def _on_stop(self, *args, **kwargs):
|
||||||
self.stop()
|
self.stop()
|
||||||
|
|
||||||
def on_clear_cache(self, *args, **kwargs):
|
|
||||||
self.frames.clear()
|
|
||||||
self._queue.clear()
|
|
||||||
|
|
||||||
def on_message(self, message):
|
def on_message(self, message):
|
||||||
super().on_message(message)
|
super().on_message(message)
|
||||||
|
|
||||||
def on_handle(self, stream, index):
|
def on_handle(self, stream, index):
|
||||||
# logging.info(f'AudioMalHandler on_handle {index}')
|
# print('AudioMalHandler on_handle', index)
|
||||||
self._queue.put(stream)
|
self._queue.put(stream)
|
||||||
|
|
||||||
def _on_run(self):
|
def _on_run(self):
|
||||||
@ -63,8 +57,7 @@ class AudioMalHandler(AudioHandler):
|
|||||||
count = 0
|
count = 0
|
||||||
for _ in range(self._context.batch_size * 2):
|
for _ in range(self._context.batch_size * 2):
|
||||||
frame, _type = self.get_audio_frame()
|
frame, _type = self.get_audio_frame()
|
||||||
chunk, txt = frame
|
self.frames.append(frame)
|
||||||
self.frames.append(chunk)
|
|
||||||
self.on_next_handle((frame, _type), 0)
|
self.on_next_handle((frame, _type), 0)
|
||||||
count = count + 1
|
count = count + 1
|
||||||
|
|
||||||
@ -104,10 +97,9 @@ class AudioMalHandler(AudioHandler):
|
|||||||
frame = self._queue.get()
|
frame = self._queue.get()
|
||||||
type_ = 0
|
type_ = 0
|
||||||
else:
|
else:
|
||||||
chunk = np.zeros(self.chunk, dtype=np.float32)
|
frame = np.zeros(self.chunk, dtype=np.float32)
|
||||||
frame = (chunk, '')
|
|
||||||
type_ = 1
|
type_ = 1
|
||||||
# logging.info(f'AudioMalHandler get_audio_frame type:{type_}')
|
# print('AudioMalHandler get_audio_frame type:', type_)
|
||||||
return frame, type_
|
return frame, type_
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
@ -19,7 +19,6 @@ class HumanRender(AudioHandler):
|
|||||||
super().__init__(context, handler)
|
super().__init__(context, handler)
|
||||||
|
|
||||||
EventBus().register('stop', self._on_stop)
|
EventBus().register('stop', self._on_stop)
|
||||||
EventBus().register('clear_cache', self.on_clear_cache)
|
|
||||||
play_clock = PlayClock()
|
play_clock = PlayClock()
|
||||||
self._voice_render = VoiceRender(play_clock, context)
|
self._voice_render = VoiceRender(play_clock, context)
|
||||||
self._video_render = VideoRender(play_clock, context, self)
|
self._video_render = VideoRender(play_clock, context, self)
|
||||||
@ -36,21 +35,17 @@ class HumanRender(AudioHandler):
|
|||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
EventBus().unregister('stop', self._on_stop)
|
EventBus().unregister('stop', self._on_stop)
|
||||||
EventBus().unregister('clear_cache', self.on_clear_cache)
|
|
||||||
|
|
||||||
def _on_stop(self, *args, **kwargs):
|
def _on_stop(self, *args, **kwargs):
|
||||||
self.stop()
|
self.stop()
|
||||||
|
|
||||||
def on_clear_cache(self, *args, **kwargs):
|
|
||||||
self._queue.clear()
|
|
||||||
|
|
||||||
def _on_run(self):
|
def _on_run(self):
|
||||||
logging.info('human render run')
|
logging.info('human render run')
|
||||||
while self._exit_event.is_set() and self._is_running:
|
while self._exit_event.is_set() and self._is_running:
|
||||||
# t = time.time()
|
# t = time.time()
|
||||||
self._run_step()
|
self._run_step()
|
||||||
# delay = time.time() - t
|
# delay = time.time() - t
|
||||||
delay = 0.038 # - delay
|
delay = 0.03805 # - delay
|
||||||
# print(delay)
|
# print(delay)
|
||||||
# if delay <= 0.0:
|
# if delay <= 0.0:
|
||||||
# continue
|
# continue
|
||||||
@ -121,4 +116,52 @@ class HumanRender(AudioHandler):
|
|||||||
# self._video_render.stop()
|
# self._video_render.stop()
|
||||||
# self._exit_event.clear()
|
# self._exit_event.clear()
|
||||||
# self._thread.join()
|
# self._thread.join()
|
||||||
|
'''
|
||||||
|
self._exit_event = Event()
|
||||||
|
self._thread = Thread(target=self._on_run)
|
||||||
|
self._exit_event.set()
|
||||||
|
self._thread.start()
|
||||||
|
|
||||||
|
def _on_run(self):
|
||||||
|
logging.info('human render run')
|
||||||
|
while self._exit_event.is_set():
|
||||||
|
self._run_step()
|
||||||
|
time.sleep(0.02)
|
||||||
|
|
||||||
|
logging.info('human render exit')
|
||||||
|
|
||||||
|
def _run_step(self):
|
||||||
|
try:
|
||||||
|
res_frame, idx, audio_frames = self._queue.get(block=True, timeout=.002)
|
||||||
|
except queue.Empty:
|
||||||
|
# print('render queue.Empty:')
|
||||||
|
return None
|
||||||
|
if audio_frames[0][1] != 0 and audio_frames[1][1] != 0:
|
||||||
|
combine_frame = self._context.frame_list_cycle[idx]
|
||||||
|
else:
|
||||||
|
bbox = self._context.coord_list_cycle[idx]
|
||||||
|
combine_frame = copy.deepcopy(self._context.frame_list_cycle[idx])
|
||||||
|
y1, y2, x1, x2 = bbox
|
||||||
|
try:
|
||||||
|
res_frame = cv2.resize(res_frame.astype(np.uint8), (x2 - x1, y2 - y1))
|
||||||
|
except:
|
||||||
|
return
|
||||||
|
# combine_frame = get_image(ori_frame,res_frame,bbox)
|
||||||
|
# t=time.perf_counter()
|
||||||
|
combine_frame[y1:y2, x1:x2] = res_frame
|
||||||
|
|
||||||
|
image = combine_frame
|
||||||
|
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
||||||
|
|
||||||
|
if self._image_render is not None:
|
||||||
|
self._image_render.on_render(image)
|
||||||
|
|
||||||
|
for audio_frame in audio_frames:
|
||||||
|
frame, type_ = audio_frame
|
||||||
|
frame = (frame * 32767).astype(np.int16)
|
||||||
|
if self._audio_render is not None:
|
||||||
|
self._audio_render.write(frame.tobytes(), int(frame.shape[0]*2))
|
||||||
|
# new_frame = AudioFrame(format='s16', layout='mono', samples=frame.shape[0])
|
||||||
|
# new_frame.planes[0].update(frame.tobytes())
|
||||||
|
# new_frame.sample_rate = 16000
|
||||||
|
'''
|
||||||
|
@ -17,19 +17,13 @@ class NLPBase(AsrObserver):
|
|||||||
self._is_running = True
|
self._is_running = True
|
||||||
|
|
||||||
EventBus().register('stop', self.on_stop)
|
EventBus().register('stop', self.on_stop)
|
||||||
EventBus().register('clear_cache', self.on_clear_cache)
|
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
EventBus().unregister('stop', self.on_stop)
|
EventBus().unregister('stop', self.on_stop)
|
||||||
EventBus().unregister('clear_cache', self.on_clear_cache)
|
|
||||||
|
|
||||||
def on_stop(self, *args, **kwargs):
|
def on_stop(self, *args, **kwargs):
|
||||||
self.stop()
|
self.stop()
|
||||||
|
|
||||||
def on_clear_cache(self, *args, **kwargs):
|
|
||||||
logger.info('NLPBase clear_cache')
|
|
||||||
self._ask_queue.clear()
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def callback(self):
|
def callback(self):
|
||||||
return self._callback
|
return self._callback
|
||||||
@ -42,10 +36,10 @@ class NLPBase(AsrObserver):
|
|||||||
if self._callback is not None and self._is_running:
|
if self._callback is not None and self._is_running:
|
||||||
self._callback.on_message(txt)
|
self._callback.on_message(txt)
|
||||||
|
|
||||||
def _request(self, question):
|
async def _request(self, question):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _on_close(self):
|
async def _on_close(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def process(self, message: str):
|
def process(self, message: str):
|
||||||
|
@ -17,13 +17,13 @@ class DouBaoSDK:
|
|||||||
self.__client = AsyncArk(api_key=token)
|
self.__client = AsyncArk(api_key=token)
|
||||||
self._stream = None
|
self._stream = None
|
||||||
|
|
||||||
def request(self, question, handle, callback):
|
async def request(self, question, handle, callback):
|
||||||
if self.__client is None:
|
if self.__client is None:
|
||||||
self.__client = AsyncArk(api_key=self._token)
|
self.__client = AsyncArk(api_key=self._token)
|
||||||
t = time.time()
|
t = time.time()
|
||||||
logger.info(f'-------dou_bao ask:{question}')
|
logger.info(f'-------dou_bao ask:{question}')
|
||||||
try:
|
try:
|
||||||
self._stream = self.__client.chat.completions.create(
|
self._stream = await self.__client.chat.completions.create(
|
||||||
model="ep-20241008152048-fsgzf",
|
model="ep-20241008152048-fsgzf",
|
||||||
messages=[
|
messages=[
|
||||||
{"role": "system", "content": "你是测试客服,是由字节跳动开发的 AI 人工智能助手"},
|
{"role": "system", "content": "你是测试客服,是由字节跳动开发的 AI 人工智能助手"},
|
||||||
@ -33,36 +33,27 @@ class DouBaoSDK:
|
|||||||
)
|
)
|
||||||
|
|
||||||
sec = ''
|
sec = ''
|
||||||
for completion in self._stream:
|
async for completion in self._stream:
|
||||||
sec = sec + completion.choices[0].delta.content
|
sec = sec + completion.choices[0].delta.content
|
||||||
sec, message = handle.handle(sec)
|
sec, message = handle.handle(sec)
|
||||||
if len(message) > 0:
|
if len(message) > 0:
|
||||||
# logger.info(f'-------dou_bao nlp time:{time.time() - t:.4f}s')
|
logger.info(f'-------dou_bao nlp time:{time.time() - t:.4f}s')
|
||||||
callback(message)
|
callback(message)
|
||||||
callback(sec)
|
callback(sec)
|
||||||
# logger.info(f'-------dou_bao nlp time:{time.time() - t:.4f}s')
|
logger.info(f'-------dou_bao nlp time:{time.time() - t:.4f}s')
|
||||||
self._stream.close()
|
await self._stream.close()
|
||||||
self._stream = None
|
self._stream = None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f'-------dou_bao error:{e}')
|
print(e)
|
||||||
# logger.info(f'-------dou_bao nlp time:{time.time() - t:.4f}s')
|
logger.info(f'-------dou_bao nlp time:{time.time() - t:.4f}s')
|
||||||
|
|
||||||
def close(self):
|
async def close(self):
|
||||||
if self._stream is not None:
|
if self._stream is not None:
|
||||||
self._stream.close()
|
await self._stream.close()
|
||||||
self._stream = None
|
self._stream = None
|
||||||
logger.info('AsyncArk close')
|
logger.info('AsyncArk close')
|
||||||
if self.__client is not None and not self.__client.is_closed():
|
if self.__client is not None and not self.__client.is_closed():
|
||||||
self.__client.close()
|
await self.__client.close()
|
||||||
self.__client = None
|
|
||||||
|
|
||||||
def aclose(self):
|
|
||||||
if self._stream is not None:
|
|
||||||
self._stream.close()
|
|
||||||
self._stream = None
|
|
||||||
logger.info('AsyncArk close')
|
|
||||||
if self.__client is not None and not self.__client.is_closed():
|
|
||||||
self.__client.close()
|
|
||||||
self.__client = None
|
self.__client = None
|
||||||
|
|
||||||
|
|
||||||
@ -88,7 +79,7 @@ class DouBaoHttp:
|
|||||||
response = requests.post(url, headers=headers, json=data, stream=True)
|
response = requests.post(url, headers=headers, json=data, stream=True)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def request(self, question, handle, callback):
|
async def request(self, question, handle, callback):
|
||||||
t = time.time()
|
t = time.time()
|
||||||
self._requesting = True
|
self._requesting = True
|
||||||
logger.info(f'-------dou_bao ask:{question}')
|
logger.info(f'-------dou_bao ask:{question}')
|
||||||
@ -98,7 +89,7 @@ class DouBaoHttp:
|
|||||||
]
|
]
|
||||||
self._response = self.__request(msg_list)
|
self._response = self.__request(msg_list)
|
||||||
if not self._response.ok:
|
if not self._response.ok:
|
||||||
logger.error(f"请求失败,状态码:{self._response.status_code}")
|
logger.info(f"请求失败,状态码:{self._response.status_code}")
|
||||||
return
|
return
|
||||||
sec = ''
|
sec = ''
|
||||||
for chunk in self._response.iter_lines():
|
for chunk in self._response.iter_lines():
|
||||||
@ -106,35 +97,20 @@ class DouBaoHttp:
|
|||||||
if len(content) < 1:
|
if len(content) < 1:
|
||||||
continue
|
continue
|
||||||
content = content[5:]
|
content = content[5:]
|
||||||
content = content.strip()
|
|
||||||
if content == '[DONE]':
|
|
||||||
break
|
|
||||||
|
|
||||||
try:
|
|
||||||
content = json.loads(content)
|
content = json.loads(content)
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"json解析失败,错误信息:{e, content}")
|
|
||||||
continue
|
|
||||||
sec = sec + content["choices"][0]["delta"]["content"]
|
sec = sec + content["choices"][0]["delta"]["content"]
|
||||||
sec, message = handle.handle(sec)
|
sec, message = handle.handle(sec)
|
||||||
if len(message) > 0:
|
if len(message) > 0:
|
||||||
logger.info(f'-------dou_bao nlp time:{time.time() - t:.4f}s')
|
logger.info(f'-------dou_bao nlp time:{time.time() - t:.4f}s')
|
||||||
callback(message)
|
callback(message)
|
||||||
if len(sec) > 0:
|
|
||||||
callback(sec)
|
callback(sec)
|
||||||
|
|
||||||
self._requesting = False
|
self._requesting = False
|
||||||
logger.info(f'-------dou_bao nlp time:{time.time() - t:.4f}s')
|
logger.info(f'-------dou_bao nlp time:{time.time() - t:.4f}s')
|
||||||
|
|
||||||
def close(self):
|
async def close(self):
|
||||||
if self._response is not None and self._requesting:
|
if self._response is not None and self._requesting:
|
||||||
self._response.close()
|
self._response.close()
|
||||||
|
|
||||||
def aclose(self):
|
|
||||||
if self._response is not None and self._requesting:
|
|
||||||
self._response.close()
|
|
||||||
logger.info('DouBaoHttp close')
|
|
||||||
|
|
||||||
|
|
||||||
class DouBao(NLPBase):
|
class DouBao(NLPBase):
|
||||||
def __init__(self, context, split, callback=None):
|
def __init__(self, context, split, callback=None):
|
||||||
@ -155,16 +131,10 @@ class DouBao(NLPBase):
|
|||||||
self.__token = 'c9635f9e-0f9e-4ca1-ac90-8af25a541b74'
|
self.__token = 'c9635f9e-0f9e-4ca1-ac90-8af25a541b74'
|
||||||
self._dou_bao = DouBaoHttp(self.__token)
|
self._dou_bao = DouBaoHttp(self.__token)
|
||||||
|
|
||||||
def _request(self, question):
|
async def _request(self, question):
|
||||||
self._dou_bao.request(question, self._split_handle, self._on_callback)
|
await self._dou_bao.request(question, self._split_handle, self._on_callback)
|
||||||
|
|
||||||
def _on_close(self):
|
async def _on_close(self):
|
||||||
if self._dou_bao is not None:
|
if self._dou_bao is not None:
|
||||||
self._dou_bao.close()
|
await self._dou_bao.close()
|
||||||
logger.info('AsyncArk close')
|
logger.info('AsyncArk close')
|
||||||
|
|
||||||
def on_clear_cache(self, *args, **kwargs):
|
|
||||||
super().on_clear_cache(*args, **kwargs)
|
|
||||||
if self._dou_bao is not None:
|
|
||||||
self._dou_bao.aclose()
|
|
||||||
logger.info('DouBao clear_cache')
|
|
||||||
|
@ -16,7 +16,6 @@ class VoiceRender(BaseRender):
|
|||||||
def __init__(self, play_clock, context):
|
def __init__(self, play_clock, context):
|
||||||
self._audio_render = AudioRender()
|
self._audio_render = AudioRender()
|
||||||
super().__init__(play_clock, context, 'Voice')
|
super().__init__(play_clock, context, 'Voice')
|
||||||
self._current_text = ''
|
|
||||||
|
|
||||||
def render(self, frame, ps):
|
def render(self, frame, ps):
|
||||||
self._play_clock.update_display_time()
|
self._play_clock.update_display_time()
|
||||||
@ -24,16 +23,12 @@ class VoiceRender(BaseRender):
|
|||||||
|
|
||||||
for audio_frame in frame:
|
for audio_frame in frame:
|
||||||
frame, type_ = audio_frame
|
frame, type_ = audio_frame
|
||||||
chunk, txt = frame
|
frame = (frame * 32767).astype(np.int16)
|
||||||
if txt != self._current_text:
|
|
||||||
self._current_text = txt
|
|
||||||
logging.info(f'VoiceRender: {txt}')
|
|
||||||
chunk = (chunk * 32767).astype(np.int16)
|
|
||||||
|
|
||||||
if self._audio_render is not None:
|
if self._audio_render is not None:
|
||||||
try:
|
try:
|
||||||
chunk_len = int(chunk.shape[0] * 2)
|
chunk_len = int(frame.shape[0] * 2)
|
||||||
# print('audio frame:', frame.shape, chunk_len)
|
# print('audio frame:', frame.shape, chunk_len)
|
||||||
self._audio_render.write(chunk.tobytes(), chunk_len)
|
self._audio_render.write(frame.tobytes(), chunk_len)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f'Error writing audio frame: {e}')
|
logging.error(f'Error writing audio frame: {e}')
|
||||||
|
@ -3,8 +3,6 @@ import heapq
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
import time
|
|
||||||
from threading import Lock, Thread
|
|
||||||
|
|
||||||
from eventbus import EventBus
|
from eventbus import EventBus
|
||||||
from utils import save_wav
|
from utils import save_wav
|
||||||
@ -20,18 +18,13 @@ class TTSAudioHandle(AudioHandler):
|
|||||||
self._index = -1
|
self._index = -1
|
||||||
|
|
||||||
EventBus().register('stop', self._on_stop)
|
EventBus().register('stop', self._on_stop)
|
||||||
EventBus().register('clear_cache', self.on_clear_cache)
|
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
EventBus().unregister('stop', self._on_stop)
|
EventBus().unregister('stop', self._on_stop)
|
||||||
EventBus().unregister('clear_cache', self.on_clear_cache)
|
|
||||||
|
|
||||||
def _on_stop(self, *args, **kwargs):
|
def _on_stop(self, *args, **kwargs):
|
||||||
self.stop()
|
self.stop()
|
||||||
|
|
||||||
def on_clear_cache(self, *args, **kwargs):
|
|
||||||
self._index = -1
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def sample_rate(self):
|
def sample_rate(self):
|
||||||
return self._sample_rate
|
return self._sample_rate
|
||||||
@ -60,54 +53,40 @@ class TTSAudioSplitHandle(TTSAudioHandle):
|
|||||||
self.sample_rate = self._context.sample_rate
|
self.sample_rate = self._context.sample_rate
|
||||||
self._chunk = self.sample_rate // self._context.fps
|
self._chunk = self.sample_rate // self._context.fps
|
||||||
self._priority_queue = []
|
self._priority_queue = []
|
||||||
self._lock = Lock()
|
|
||||||
self._current = 0
|
self._current = 0
|
||||||
self._is_running = True
|
self._is_running = True
|
||||||
self._thread = Thread(target=self._process_loop)
|
|
||||||
self._thread.start()
|
|
||||||
logger.info("TTSAudioSplitHandle init")
|
logger.info("TTSAudioSplitHandle init")
|
||||||
|
|
||||||
def _process_loop(self):
|
|
||||||
while self._is_running:
|
|
||||||
with self._lock:
|
|
||||||
if self._priority_queue and self._priority_queue[0][0] == self._current:
|
|
||||||
self._current += 1
|
|
||||||
chunks, txt = heapq.heappop(self._priority_queue)[1]
|
|
||||||
if chunks is not None:
|
|
||||||
for chunk in chunks:
|
|
||||||
self.on_next_handle((chunk, txt), 0)
|
|
||||||
time.sleep(0.01) # Sleep briefly to prevent busy-waiting
|
|
||||||
|
|
||||||
def on_handle(self, stream, index):
|
def on_handle(self, stream, index):
|
||||||
if not self._is_running:
|
if not self._is_running:
|
||||||
logger.info('TTSAudioSplitHandle::on_handle is not running')
|
logger.info('TTSAudioSplitHandle::on_handle is not running')
|
||||||
return
|
return
|
||||||
|
# heapq.heappush(self._priority_queue, (index, stream))
|
||||||
logger.info(f'TTSAudioSplitHandle::on_handle {index}')
|
if stream is None:
|
||||||
s, txt = stream
|
|
||||||
if s is None:
|
|
||||||
heapq.heappush(self._priority_queue, (index, None))
|
heapq.heappush(self._priority_queue, (index, None))
|
||||||
else:
|
else:
|
||||||
stream_len = s.shape[0]
|
stream_len = stream.shape[0]
|
||||||
idx = 0
|
idx = 0
|
||||||
chunks = []
|
chunks = []
|
||||||
while stream_len >= self._chunk and self._is_running:
|
while stream_len >= self._chunk and self._is_running:
|
||||||
chunks.append(s[idx:idx + self._chunk])
|
# self.on_next_handle(stream[idx:idx + self._chunk], 0)
|
||||||
|
chunks.append(stream[idx:idx + self._chunk])
|
||||||
stream_len -= self._chunk
|
stream_len -= self._chunk
|
||||||
idx += self._chunk
|
idx += self._chunk
|
||||||
if not self._is_running:
|
heapq.heappush(self._priority_queue, (index, chunks))
|
||||||
return
|
current = self._priority_queue[0][0]
|
||||||
heapq.heappush(self._priority_queue, (index, (chunks, txt)))
|
print('TTSAudioSplitHandle::on_handle', index, current, self._current)
|
||||||
|
if current == self._current and self._is_running:
|
||||||
|
self._current = self._current + 1
|
||||||
|
chunks = heapq.heappop(self._priority_queue)[1]
|
||||||
|
if chunks is None:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
for chunk in chunks:
|
||||||
|
self.on_next_handle(chunk, 0)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._is_running = False
|
self._is_running = False
|
||||||
self._thread.join()
|
|
||||||
|
|
||||||
def on_clear_cache(self, *args, **kwargs):
|
|
||||||
super().on_clear_cache()
|
|
||||||
with self._lock:
|
|
||||||
self._current = 0
|
|
||||||
self._priority_queue.clear()
|
|
||||||
|
|
||||||
|
|
||||||
class TTSAudioSaveHandle(TTSAudioHandle):
|
class TTSAudioSaveHandle(TTSAudioHandle):
|
||||||
|
@ -16,19 +16,13 @@ class TTSBase(NLPCallback):
|
|||||||
self._message_queue = AsyncTaskQueue('TTSBaseQueue', 5)
|
self._message_queue = AsyncTaskQueue('TTSBaseQueue', 5)
|
||||||
self._is_running = True
|
self._is_running = True
|
||||||
EventBus().register('stop', self.on_stop)
|
EventBus().register('stop', self.on_stop)
|
||||||
EventBus().register('clear_cache', self.on_clear_cache)
|
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
EventBus().unregister('stop', self.on_stop)
|
EventBus().unregister('stop', self.on_stop)
|
||||||
EventBus().unregister('clear_cache', self.on_clear_cache)
|
|
||||||
|
|
||||||
def on_stop(self, *args, **kwargs):
|
def on_stop(self, *args, **kwargs):
|
||||||
self.stop()
|
self.stop()
|
||||||
|
|
||||||
def on_clear_cache(self, *args, **kwargs):
|
|
||||||
logger.info('TTSBase clear_cache')
|
|
||||||
self._message_queue.clear()
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def handle(self):
|
def handle(self):
|
||||||
return self._handle
|
return self._handle
|
||||||
@ -37,39 +31,32 @@ class TTSBase(NLPCallback):
|
|||||||
def handle(self, value):
|
def handle(self, value):
|
||||||
self._handle = value
|
self._handle = value
|
||||||
|
|
||||||
def _request(self, txt: str, index):
|
async def _request(self, txt: str, index):
|
||||||
if not self._is_running:
|
# print('_request:', txt)
|
||||||
logger.info('TTSBase::_request is not running')
|
|
||||||
return
|
|
||||||
|
|
||||||
t = time.time()
|
t = time.time()
|
||||||
stream = self._on_request(txt)
|
stream = await self._on_request(txt)
|
||||||
logger.info(f'-------tts request time:{time.time() - t:.4f}s, txt:{txt}')
|
if stream is None:
|
||||||
if stream is None or self._is_running is False:
|
logger.warn(f'-------stream is None')
|
||||||
logger.warning(f'-------stream is None or is_running {self._is_running}')
|
|
||||||
return
|
return
|
||||||
|
logger.info(f'-------tts time:{time.time() - t:.4f}s, txt:{txt}')
|
||||||
if self._handle is not None and self._is_running:
|
if self._handle is not None and self._is_running:
|
||||||
self._on_handle((stream, txt), index)
|
await self._on_handle(stream, index)
|
||||||
else:
|
else:
|
||||||
logger.info(f'handle is None, running:{self._is_running}')
|
logger.info(f'handle is None, running:{self._is_running}')
|
||||||
logger.info(f'-------tts finish time:{time.time() - t:.4f}s, txt:{txt}')
|
|
||||||
|
|
||||||
def _on_request(self, text: str):
|
async def _on_request(self, text: str):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _on_handle(self, stream, index):
|
async def _on_handle(self, stream, index):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _on_close(self):
|
async def _on_close(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def on_message(self, txt: str):
|
def on_message(self, txt: str):
|
||||||
self.message(txt)
|
self.message(txt)
|
||||||
|
|
||||||
def message(self, txt):
|
def message(self, txt):
|
||||||
if not self._is_running:
|
|
||||||
logger.info('TTSBase::message is not running')
|
|
||||||
return
|
|
||||||
txt = txt.strip()
|
txt = txt.strip()
|
||||||
if len(txt) == 0:
|
if len(txt) == 0:
|
||||||
# logger.info(f'message is empty')
|
# logger.info(f'message is empty')
|
||||||
@ -79,7 +66,6 @@ class TTSBase(NLPCallback):
|
|||||||
if self._handle is not None:
|
if self._handle is not None:
|
||||||
index = self._handle.get_index()
|
index = self._handle.get_index()
|
||||||
# print(f'message txt-index:{txt}, index {index}')
|
# print(f'message txt-index:{txt}, index {index}')
|
||||||
logger.info(f'TTSBase::message request:{txt}, index:{index}')
|
|
||||||
self._message_queue.add_task(self._request, txt, index)
|
self._message_queue.add_task(self._request, txt, index)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
#encoding = utf8
|
#encoding = utf8
|
||||||
import logging
|
import logging
|
||||||
import time
|
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
@ -22,14 +21,13 @@ class TTSEdgeHttp(TTSBase):
|
|||||||
# self._url = 'http://localhost:8082/v1/audio/speech'
|
# self._url = 'http://localhost:8082/v1/audio/speech'
|
||||||
self._url = 'https://tts.mzzsfy.eu.org/v1/audio/speech'
|
self._url = 'https://tts.mzzsfy.eu.org/v1/audio/speech'
|
||||||
logger.info(f"TTSEdge init, {voice}")
|
logger.info(f"TTSEdge init, {voice}")
|
||||||
self._response_list = []
|
|
||||||
|
|
||||||
def _on_async_request(self, data):
|
async def _on_async_request(self, data):
|
||||||
with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
with session.post(self._url, json=data) as response:
|
async with session.post(self._url, json=data) as response:
|
||||||
print('TTSEdgeHttp, _on_request, response:', response)
|
print('TTSEdgeHttp, _on_request, response:', response)
|
||||||
if response.status == 200:
|
if response.status == 200:
|
||||||
stream = BytesIO(response.read())
|
stream = BytesIO(await response.read())
|
||||||
return stream
|
return stream
|
||||||
else:
|
else:
|
||||||
byte_stream = None
|
byte_stream = None
|
||||||
@ -37,14 +35,13 @@ class TTSEdgeHttp(TTSBase):
|
|||||||
|
|
||||||
def _on_sync_request(self, data):
|
def _on_sync_request(self, data):
|
||||||
response = requests.post(self._url, json=data)
|
response = requests.post(self._url, json=data)
|
||||||
self._response_list.append(response)
|
|
||||||
stream = None
|
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
stream = BytesIO(response.content)
|
stream = BytesIO(response.content)
|
||||||
self._response_list.remove(response)
|
|
||||||
return stream
|
return stream
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
def _on_request(self, txt: str):
|
async def _on_request(self, txt: str):
|
||||||
logger.info(f'TTSEdgeHttp, _on_request, txt:{txt}')
|
logger.info(f'TTSEdgeHttp, _on_request, txt:{txt}')
|
||||||
data = {
|
data = {
|
||||||
"model": "tts-1",
|
"model": "tts-1",
|
||||||
@ -57,25 +54,23 @@ class TTSEdgeHttp(TTSBase):
|
|||||||
# return self._on_async_request(data)
|
# return self._on_async_request(data)
|
||||||
return self._on_sync_request(data)
|
return self._on_sync_request(data)
|
||||||
|
|
||||||
def _on_handle(self, stream, index):
|
async def _on_handle(self, stream, index):
|
||||||
st, txt = stream
|
print('-------tts _on_handle')
|
||||||
try:
|
try:
|
||||||
st.seek(0)
|
stream.seek(0)
|
||||||
t = time.time()
|
byte_stream = self.__create_bytes_stream(stream)
|
||||||
byte_stream = self.__create_bytes_stream(st)
|
print('-------tts start push chunk', index)
|
||||||
logger.info(f'-------tts resample time:{time.time() - t:.4f}s, txt:{txt}')
|
self._handle.on_handle(byte_stream, index)
|
||||||
t = time.time()
|
stream.seek(0)
|
||||||
self._handle.on_handle((byte_stream, txt), index)
|
stream.truncate()
|
||||||
logger.info(f'-------tts handle time:{time.time() - t:.4f}s')
|
print('-------tts finish push chunk')
|
||||||
st.seek(0)
|
|
||||||
st.truncate()
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self._handle.on_handle(None, index)
|
self._handle.on_handle(None, index)
|
||||||
st.seek(0)
|
stream.seek(0)
|
||||||
st.truncate()
|
stream.truncate()
|
||||||
logger.error(f'-------tts finish error:{e}')
|
print('-------tts finish error:', e)
|
||||||
st.close()
|
stream.close()
|
||||||
|
|
||||||
def __create_bytes_stream(self, byte_stream):
|
def __create_bytes_stream(self, byte_stream):
|
||||||
stream, sample_rate = sf.read(byte_stream) # [T*sample_rate,] float64
|
stream, sample_rate = sf.read(byte_stream) # [T*sample_rate,] float64
|
||||||
@ -92,13 +87,7 @@ class TTSEdgeHttp(TTSBase):
|
|||||||
|
|
||||||
return stream
|
return stream
|
||||||
|
|
||||||
def _on_close(self):
|
async def _on_close(self):
|
||||||
print('TTSEdge close')
|
print('TTSEdge close')
|
||||||
# if self._byte_stream is not None and not self._byte_stream.closed:
|
# if self._byte_stream is not None and not self._byte_stream.closed:
|
||||||
# self._byte_stream.close()
|
# self._byte_stream.close()
|
||||||
|
|
||||||
def on_clear_cache(self, *args, **kwargs):
|
|
||||||
logger.info('TTSEdgeHttp clear_cache')
|
|
||||||
super().on_clear_cache(*args, **kwargs)
|
|
||||||
for response in self._response_list:
|
|
||||||
response.close()
|
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
#encoding = utf8
|
#encoding = utf8
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from queue import Queue
|
from queue import Queue
|
||||||
import threading
|
import threading
|
||||||
@ -13,14 +14,24 @@ class AsyncTaskQueue:
|
|||||||
self._worker_num = work_num
|
self._worker_num = work_num
|
||||||
self._current_worker_num = work_num
|
self._current_worker_num = work_num
|
||||||
self._name = name
|
self._name = name
|
||||||
self._threads = []
|
self._thread = threading.Thread(target=self._run_loop, name=name)
|
||||||
self._lock = threading.Lock()
|
self._thread.start()
|
||||||
for _ in range(work_num):
|
self.__loop = None
|
||||||
thread = threading.Thread(target=self._worker, name=f'{name}_worker_{_}')
|
|
||||||
thread.start()
|
|
||||||
self._threads.append(thread)
|
|
||||||
|
|
||||||
def _worker(self):
|
def _run_loop(self):
|
||||||
|
logging.info(f'{self._name}, _run_loop')
|
||||||
|
self.__loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(self.__loop)
|
||||||
|
self._tasks = [self.__loop.create_task(self._worker()) for _ in range(self._worker_num)]
|
||||||
|
try:
|
||||||
|
self.__loop.run_forever()
|
||||||
|
finally:
|
||||||
|
logging.info(f'{self._name}, exit run')
|
||||||
|
self.__loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(self.__loop)))
|
||||||
|
self.__loop.close()
|
||||||
|
logging.info(f'{self._name}, close loop')
|
||||||
|
|
||||||
|
async def _worker(self):
|
||||||
logging.info(f'{self._name}, _worker')
|
logging.info(f'{self._name}, _worker')
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
@ -32,17 +43,16 @@ class AsyncTaskQueue:
|
|||||||
if func is None: # None as a stop signal
|
if func is None: # None as a stop signal
|
||||||
break
|
break
|
||||||
|
|
||||||
func(*args) # Execute function
|
await func(*args) # Execute async function
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f'{self._name} error: {repr(e)}')
|
logging.error(f'{self._name} error: {e}')
|
||||||
finally:
|
finally:
|
||||||
self._queue.task_done()
|
self._queue.task_done()
|
||||||
|
|
||||||
logging.info(f'{self._name}, _worker finish')
|
logging.info(f'{self._name}, _worker finish')
|
||||||
with self._lock:
|
|
||||||
self._current_worker_num -= 1
|
self._current_worker_num -= 1
|
||||||
if self._current_worker_num == 0:
|
if self._current_worker_num == 0:
|
||||||
self._queue.put(None) # Send stop signal to remaining workers
|
self.__loop.call_soon_threadsafe(self.__loop.stop)
|
||||||
|
|
||||||
def add_task(self, func, *args):
|
def add_task(self, func, *args):
|
||||||
self._queue.put((func, *args))
|
self._queue.put((func, *args))
|
||||||
@ -52,10 +62,10 @@ class AsyncTaskQueue:
|
|||||||
self.add_task(None) # Send stop signal
|
self.add_task(None) # Send stop signal
|
||||||
|
|
||||||
def clear(self):
|
def clear(self):
|
||||||
with self._queue.mutex:
|
while not self._queue.empty():
|
||||||
self._queue.queue.clear()
|
self._queue.get_nowait()
|
||||||
|
self._queue.task_done()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self.stop_workers()
|
self.stop_workers()
|
||||||
for thread in self._threads:
|
self._thread.join()
|
||||||
thread.join()
|
|
Loading…
Reference in New Issue
Block a user