From ebbe8f298985fe9448e8ed9819cfe1a93625139d Mon Sep 17 00:00:00 2001 From: brige Date: Wed, 6 Nov 2024 20:31:23 +0800 Subject: [PATCH] modify stop sync --- asr/sherpa_ncnn_asr.py | 2 +- human/audio_inference_handler.py | 23 +++++++++++++++++++++-- human/audio_mal_handler.py | 10 ++++++++-- human/human_render.py | 23 ++++++++++++++++++++--- nlp/nlp_base.py | 2 +- nlp/nlp_doubao.py | 6 ++++-- tts/tts_audio_handle.py | 19 ++++++++++++++++--- utils/async_task_queue.py | 3 ++- 8 files changed, 73 insertions(+), 15 deletions(-) diff --git a/asr/sherpa_ncnn_asr.py b/asr/sherpa_ncnn_asr.py index 8abda87..eca2459 100644 --- a/asr/sherpa_ncnn_asr.py +++ b/asr/sherpa_ncnn_asr.py @@ -65,7 +65,7 @@ class SherpaNcnnAsr(AsrBase): self._notify_complete('介绍中国5000年历史文学') logger.info(f'_recognize_loop111') segment_id += 1 - time.sleep(5) + time.sleep(15) logger.info(f'_recognize_loop222') logger.info(f'_recognize_loop exit') ''' diff --git a/human/audio_inference_handler.py b/human/audio_inference_handler.py index f867614..0a5acbe 100644 --- a/human/audio_inference_handler.py +++ b/human/audio_inference_handler.py @@ -9,6 +9,7 @@ from threading import Event, Thread import numpy as np import torch +from eventbus import EventBus from human_handler import AudioHandler from utils import load_model, mirror_index, get_device, SyncQueue @@ -20,16 +21,28 @@ class AudioInferenceHandler(AudioHandler): def __init__(self, context, handler): super().__init__(context, handler) + EventBus().register('stop', self._on_stop) self._mal_queue = SyncQueue(1, "AudioInferenceHandler_Mel") self._audio_queue = SyncQueue(context.batch_size * 2, "AudioInferenceHandler_Audio") + self._is_running = True self._exit_event = Event() self._run_thread = Thread(target=self.__on_run, name="AudioInferenceHandlerThread") self._exit_event.set() self._run_thread.start() + logger.info("AudioInferenceHandler init") + def __del__(self): + EventBus().unregister('stop', self._on_stop) + + def _on_stop(self, *args, **kwargs): + self.stop() + def on_handle(self, stream, type_): + if not self._is_running: + return + if type_ == 1: self._mal_queue.put(stream) elif type_ == 0: @@ -56,7 +69,7 @@ class AudioInferenceHandler(AudioHandler): device = get_device() logger.info(f'use device:{device}') - while True: + while self._is_running: if self._exit_event.is_set(): start_time = time.perf_counter() batch_size = self._context.batch_size @@ -76,6 +89,10 @@ class AudioInferenceHandler(AudioHandler): audio_frames.append((frame, type_)) if type_ == 0: is_all_silence = False + + if not self._is_running: + return + if is_all_silence: for i in range(batch_size): self.on_next_handle((None, mirror_index(length, index), audio_frames[i * 2:i * 2 + 2]), @@ -129,8 +146,10 @@ class AudioInferenceHandler(AudioHandler): logger.info('AudioInferenceHandler inference processor stop') def stop(self): + logger.info('AudioInferenceHandler stop') + self._is_running = False self._exit_event.clear() - self._run_thread.join() + # self._run_thread.join() def pause_talk(self): print('AudioInferenceHandler pause_talk', self._audio_queue.size(), self._mal_queue.size()) diff --git a/human/audio_mal_handler.py b/human/audio_mal_handler.py index a880b55..4637255 100644 --- a/human/audio_mal_handler.py +++ b/human/audio_mal_handler.py @@ -28,6 +28,7 @@ class AudioMalHandler(AudioHandler): self.frames = [] self.chunk = context.sample_rate // context.fps + self._is_running = True logger.info("AudioMalHandler init") def __del__(self): @@ -45,7 +46,7 @@ class AudioMalHandler(AudioHandler): def _on_run(self): logging.info('chunk2mal run') - while self._exit_event.is_set(): + while self._exit_event.is_set() and self._is_running: self._run_step() time.sleep(0.02) @@ -58,6 +59,9 @@ class AudioMalHandler(AudioHandler): self.frames.append(frame) self.on_next_handle((frame, _type), 0) count = count + 1 + + if self._is_running is False: + return # context not enough, do not run network. if len(self.frames) <= self._context.stride_left_size + self._context.stride_right_size: return @@ -73,7 +77,8 @@ class AudioMalHandler(AudioHandler): mel_step_size = 16 i = 0 mel_chunks = [] - while i < (len(self.frames) - self._context.stride_left_size - self._context.stride_right_size) / 2: + while i < (len(self.frames) - self._context.stride_left_size - self._context.stride_right_size) / 2\ + and self._is_running: start_idx = int(left + i * mel_idx_multiplier) # print(start_idx) if start_idx + mel_step_size > len(mel[0]): @@ -99,6 +104,7 @@ class AudioMalHandler(AudioHandler): def stop(self): logging.info('stop') + self._is_running = False if self._exit_event is None: return diff --git a/human/human_render.py b/human/human_render.py index f0afe8f..eef3f1c 100644 --- a/human/human_render.py +++ b/human/human_render.py @@ -5,6 +5,7 @@ import time from queue import Empty from threading import Event, Thread +from eventbus import EventBus from human.message_type import MessageType from human_handler import AudioHandler from render import VoiceRender, VideoRender, PlayClock @@ -17,9 +18,11 @@ class HumanRender(AudioHandler): def __init__(self, context, handler): super().__init__(context, handler) + EventBus().register('stop', self._on_stop) play_clock = PlayClock() self._voice_render = VoiceRender(play_clock, context) self._video_render = VideoRender(play_clock, context, self) + self._is_running = True self._queue = SyncQueue(context.batch_size, "HumanRender_queue") self._exit_event = Event() self._thread = Thread(target=self._on_run, name="AudioMalHandlerThread") @@ -28,24 +31,37 @@ class HumanRender(AudioHandler): self._image_render = None self._last_audio_ps = 0 self._last_video_ps = 0 + self._empty_log = True + + def __del__(self): + EventBus().unregister('stop', self._on_stop) + + def _on_stop(self, *args, **kwargs): + self.stop() def _on_run(self): logging.info('human render run') - while self._exit_event.is_set(): + while self._exit_event.is_set() and self._is_running: self._run_step() time.sleep(0.038) logging.info('human render exit') def _run_step(self): + try: - value = self._queue.get() + value = self._queue.get(timeout=.005) if value is None: return res_frame, idx, audio_frames = value + if not self._empty_log: + self._empty_log = True + logging.info('render render:') # print('voice render queue size', self._queue.size()) except Empty: - print('render queue.Empty:') + if self._empty_log: + self._empty_log = False + logging.info('render queue.Empty:') return type_ = 1 @@ -92,6 +108,7 @@ class HumanRender(AudioHandler): def stop(self): logging.info('hunan render stop') + self._is_running = False if self._exit_event is None: return diff --git a/nlp/nlp_base.py b/nlp/nlp_base.py index 3dbd4a6..f186254 100644 --- a/nlp/nlp_base.py +++ b/nlp/nlp_base.py @@ -53,7 +53,7 @@ class NLPBase(AsrObserver): def ask(self, question): logger.info(f'ask:{question}') self._is_running = True - self._ask_queue.add_task(self._request, question) + task = self._ask_queue.add_task(self._request, question) logger.info(f'ask:{question} completed') def stop(self): diff --git a/nlp/nlp_doubao.py b/nlp/nlp_doubao.py index d1b41da..7bbc637 100644 --- a/nlp/nlp_doubao.py +++ b/nlp/nlp_doubao.py @@ -30,7 +30,6 @@ class DouBao(NLPBase): async def _request(self, question): t = time.time() - logger.info(f'_request:{question}') logger.info(f'-------dou_bao ask:{question}') try: stream = await self.__client.chat.completions.create( @@ -41,6 +40,10 @@ class DouBao(NLPBase): ], stream=True ) + t1 = time.time() + await stream.close() + logger.info(f'-------dou_bao close time:{time.time() - t1:.4f}s') + return sec = '' async for completion in stream: sec = sec + completion.choices[0].delta.content @@ -59,7 +62,6 @@ class DouBao(NLPBase): # self._on_callback(sec) except Exception as e: print(e) - logger.info(f'_request:{question}, time:{time.time() - t:.4f}s') logger.info(f'-------dou_bao nlp time:{time.time() - t:.4f}s') async def _on_close(self): diff --git a/tts/tts_audio_handle.py b/tts/tts_audio_handle.py index badbe7f..baf38ca 100644 --- a/tts/tts_audio_handle.py +++ b/tts/tts_audio_handle.py @@ -4,6 +4,7 @@ import logging import os import shutil +from eventbus import EventBus from utils import save_wav from human_handler import AudioHandler @@ -16,6 +17,14 @@ class TTSAudioHandle(AudioHandler): self._sample_rate = 16000 self._index = -1 + EventBus().register('stop', self._on_stop) + + def __del__(self): + EventBus().unregister('stop', self._on_stop) + + def _on_stop(self, *args, **kwargs): + self.stop() + @property def sample_rate(self): return self._sample_rate @@ -45,9 +54,13 @@ class TTSAudioSplitHandle(TTSAudioHandle): self._chunk = self.sample_rate // self._context.fps self._priority_queue = [] self._current = 0 + self._is_running = True logger.info("TTSAudioSplitHandle init") def on_handle(self, stream, index): + if not self._is_running: + logger.info('TTSAudioSplitHandle::on_handle is not running') + return # heapq.heappush(self._priority_queue, (index, stream)) if stream is None: heapq.heappush(self._priority_queue, (index, None)) @@ -55,7 +68,7 @@ class TTSAudioSplitHandle(TTSAudioHandle): stream_len = stream.shape[0] idx = 0 chunks = [] - while stream_len >= self._chunk: + while stream_len >= self._chunk and self._is_running: # self.on_next_handle(stream[idx:idx + self._chunk], 0) chunks.append(stream[idx:idx + self._chunk]) stream_len -= self._chunk @@ -63,7 +76,7 @@ class TTSAudioSplitHandle(TTSAudioHandle): heapq.heappush(self._priority_queue, (index, chunks)) current = self._priority_queue[0][0] print('TTSAudioSplitHandle::on_handle', index, current, self._current) - if current == self._current: + if current == self._current and self._is_running: self._current = self._current + 1 chunks = heapq.heappop(self._priority_queue)[1] if chunks is None: @@ -73,7 +86,7 @@ class TTSAudioSplitHandle(TTSAudioHandle): self.on_next_handle(chunk, 0) def stop(self): - pass + self._is_running = False class TTSAudioSaveHandle(TTSAudioHandle): diff --git a/utils/async_task_queue.py b/utils/async_task_queue.py index fc03d4e..e3d395a 100644 --- a/utils/async_task_queue.py +++ b/utils/async_task_queue.py @@ -27,6 +27,7 @@ class AsyncTaskQueue: print('_worker') while True: task = await self._queue.get() + print(f"Get task size: {self._queue.qsize()}") if task is None: # None as a stop signal break @@ -42,7 +43,7 @@ class AsyncTaskQueue: self.__loop.stop() def add_task(self, func, *args): - self.__loop.call_soon_threadsafe(self._queue.put_nowait, (func, *args)) + return self.__loop.call_soon_threadsafe(self._queue.put_nowait, (func, *args)) def stop_workers(self): for _ in range(self._worker_num):