modify stop sync

This commit is contained in:
brige 2024-11-06 20:31:23 +08:00
parent 2f180f0f65
commit ebbe8f2989
8 changed files with 73 additions and 15 deletions

View File

@ -65,7 +65,7 @@ class SherpaNcnnAsr(AsrBase):
self._notify_complete('介绍中国5000年历史文学') self._notify_complete('介绍中国5000年历史文学')
logger.info(f'_recognize_loop111') logger.info(f'_recognize_loop111')
segment_id += 1 segment_id += 1
time.sleep(5) time.sleep(15)
logger.info(f'_recognize_loop222') logger.info(f'_recognize_loop222')
logger.info(f'_recognize_loop exit') logger.info(f'_recognize_loop exit')
''' '''

View File

@ -9,6 +9,7 @@ from threading import Event, Thread
import numpy as np import numpy as np
import torch import torch
from eventbus import EventBus
from human_handler import AudioHandler from human_handler import AudioHandler
from utils import load_model, mirror_index, get_device, SyncQueue from utils import load_model, mirror_index, get_device, SyncQueue
@ -20,16 +21,28 @@ class AudioInferenceHandler(AudioHandler):
def __init__(self, context, handler): def __init__(self, context, handler):
super().__init__(context, handler) super().__init__(context, handler)
EventBus().register('stop', self._on_stop)
self._mal_queue = SyncQueue(1, "AudioInferenceHandler_Mel") self._mal_queue = SyncQueue(1, "AudioInferenceHandler_Mel")
self._audio_queue = SyncQueue(context.batch_size * 2, "AudioInferenceHandler_Audio") self._audio_queue = SyncQueue(context.batch_size * 2, "AudioInferenceHandler_Audio")
self._is_running = True
self._exit_event = Event() self._exit_event = Event()
self._run_thread = Thread(target=self.__on_run, name="AudioInferenceHandlerThread") self._run_thread = Thread(target=self.__on_run, name="AudioInferenceHandlerThread")
self._exit_event.set() self._exit_event.set()
self._run_thread.start() self._run_thread.start()
logger.info("AudioInferenceHandler init") logger.info("AudioInferenceHandler init")
def __del__(self):
EventBus().unregister('stop', self._on_stop)
def _on_stop(self, *args, **kwargs):
self.stop()
def on_handle(self, stream, type_): def on_handle(self, stream, type_):
if not self._is_running:
return
if type_ == 1: if type_ == 1:
self._mal_queue.put(stream) self._mal_queue.put(stream)
elif type_ == 0: elif type_ == 0:
@ -56,7 +69,7 @@ class AudioInferenceHandler(AudioHandler):
device = get_device() device = get_device()
logger.info(f'use device:{device}') logger.info(f'use device:{device}')
while True: while self._is_running:
if self._exit_event.is_set(): if self._exit_event.is_set():
start_time = time.perf_counter() start_time = time.perf_counter()
batch_size = self._context.batch_size batch_size = self._context.batch_size
@ -76,6 +89,10 @@ class AudioInferenceHandler(AudioHandler):
audio_frames.append((frame, type_)) audio_frames.append((frame, type_))
if type_ == 0: if type_ == 0:
is_all_silence = False is_all_silence = False
if not self._is_running:
return
if is_all_silence: if is_all_silence:
for i in range(batch_size): for i in range(batch_size):
self.on_next_handle((None, mirror_index(length, index), audio_frames[i * 2:i * 2 + 2]), self.on_next_handle((None, mirror_index(length, index), audio_frames[i * 2:i * 2 + 2]),
@ -129,8 +146,10 @@ class AudioInferenceHandler(AudioHandler):
logger.info('AudioInferenceHandler inference processor stop') logger.info('AudioInferenceHandler inference processor stop')
def stop(self): def stop(self):
logger.info('AudioInferenceHandler stop')
self._is_running = False
self._exit_event.clear() self._exit_event.clear()
self._run_thread.join() # self._run_thread.join()
def pause_talk(self): def pause_talk(self):
print('AudioInferenceHandler pause_talk', self._audio_queue.size(), self._mal_queue.size()) print('AudioInferenceHandler pause_talk', self._audio_queue.size(), self._mal_queue.size())

View File

@ -28,6 +28,7 @@ class AudioMalHandler(AudioHandler):
self.frames = [] self.frames = []
self.chunk = context.sample_rate // context.fps self.chunk = context.sample_rate // context.fps
self._is_running = True
logger.info("AudioMalHandler init") logger.info("AudioMalHandler init")
def __del__(self): def __del__(self):
@ -45,7 +46,7 @@ class AudioMalHandler(AudioHandler):
def _on_run(self): def _on_run(self):
logging.info('chunk2mal run') logging.info('chunk2mal run')
while self._exit_event.is_set(): while self._exit_event.is_set() and self._is_running:
self._run_step() self._run_step()
time.sleep(0.02) time.sleep(0.02)
@ -58,6 +59,9 @@ class AudioMalHandler(AudioHandler):
self.frames.append(frame) self.frames.append(frame)
self.on_next_handle((frame, _type), 0) self.on_next_handle((frame, _type), 0)
count = count + 1 count = count + 1
if self._is_running is False:
return
# context not enough, do not run network. # context not enough, do not run network.
if len(self.frames) <= self._context.stride_left_size + self._context.stride_right_size: if len(self.frames) <= self._context.stride_left_size + self._context.stride_right_size:
return return
@ -73,7 +77,8 @@ class AudioMalHandler(AudioHandler):
mel_step_size = 16 mel_step_size = 16
i = 0 i = 0
mel_chunks = [] mel_chunks = []
while i < (len(self.frames) - self._context.stride_left_size - self._context.stride_right_size) / 2: while i < (len(self.frames) - self._context.stride_left_size - self._context.stride_right_size) / 2\
and self._is_running:
start_idx = int(left + i * mel_idx_multiplier) start_idx = int(left + i * mel_idx_multiplier)
# print(start_idx) # print(start_idx)
if start_idx + mel_step_size > len(mel[0]): if start_idx + mel_step_size > len(mel[0]):
@ -99,6 +104,7 @@ class AudioMalHandler(AudioHandler):
def stop(self): def stop(self):
logging.info('stop') logging.info('stop')
self._is_running = False
if self._exit_event is None: if self._exit_event is None:
return return

View File

@ -5,6 +5,7 @@ import time
from queue import Empty from queue import Empty
from threading import Event, Thread from threading import Event, Thread
from eventbus import EventBus
from human.message_type import MessageType from human.message_type import MessageType
from human_handler import AudioHandler from human_handler import AudioHandler
from render import VoiceRender, VideoRender, PlayClock from render import VoiceRender, VideoRender, PlayClock
@ -17,9 +18,11 @@ class HumanRender(AudioHandler):
def __init__(self, context, handler): def __init__(self, context, handler):
super().__init__(context, handler) super().__init__(context, handler)
EventBus().register('stop', self._on_stop)
play_clock = PlayClock() play_clock = PlayClock()
self._voice_render = VoiceRender(play_clock, context) self._voice_render = VoiceRender(play_clock, context)
self._video_render = VideoRender(play_clock, context, self) self._video_render = VideoRender(play_clock, context, self)
self._is_running = True
self._queue = SyncQueue(context.batch_size, "HumanRender_queue") self._queue = SyncQueue(context.batch_size, "HumanRender_queue")
self._exit_event = Event() self._exit_event = Event()
self._thread = Thread(target=self._on_run, name="AudioMalHandlerThread") self._thread = Thread(target=self._on_run, name="AudioMalHandlerThread")
@ -28,24 +31,37 @@ class HumanRender(AudioHandler):
self._image_render = None self._image_render = None
self._last_audio_ps = 0 self._last_audio_ps = 0
self._last_video_ps = 0 self._last_video_ps = 0
self._empty_log = True
def __del__(self):
EventBus().unregister('stop', self._on_stop)
def _on_stop(self, *args, **kwargs):
self.stop()
def _on_run(self): def _on_run(self):
logging.info('human render run') logging.info('human render run')
while self._exit_event.is_set(): while self._exit_event.is_set() and self._is_running:
self._run_step() self._run_step()
time.sleep(0.038) time.sleep(0.038)
logging.info('human render exit') logging.info('human render exit')
def _run_step(self): def _run_step(self):
try: try:
value = self._queue.get() value = self._queue.get(timeout=.005)
if value is None: if value is None:
return return
res_frame, idx, audio_frames = value res_frame, idx, audio_frames = value
if not self._empty_log:
self._empty_log = True
logging.info('render render:')
# print('voice render queue size', self._queue.size()) # print('voice render queue size', self._queue.size())
except Empty: except Empty:
print('render queue.Empty:') if self._empty_log:
self._empty_log = False
logging.info('render queue.Empty:')
return return
type_ = 1 type_ = 1
@ -92,6 +108,7 @@ class HumanRender(AudioHandler):
def stop(self): def stop(self):
logging.info('hunan render stop') logging.info('hunan render stop')
self._is_running = False
if self._exit_event is None: if self._exit_event is None:
return return

View File

@ -53,7 +53,7 @@ class NLPBase(AsrObserver):
def ask(self, question): def ask(self, question):
logger.info(f'ask:{question}') logger.info(f'ask:{question}')
self._is_running = True self._is_running = True
self._ask_queue.add_task(self._request, question) task = self._ask_queue.add_task(self._request, question)
logger.info(f'ask:{question} completed') logger.info(f'ask:{question} completed')
def stop(self): def stop(self):

View File

@ -30,7 +30,6 @@ class DouBao(NLPBase):
async def _request(self, question): async def _request(self, question):
t = time.time() t = time.time()
logger.info(f'_request:{question}')
logger.info(f'-------dou_bao ask:{question}') logger.info(f'-------dou_bao ask:{question}')
try: try:
stream = await self.__client.chat.completions.create( stream = await self.__client.chat.completions.create(
@ -41,6 +40,10 @@ class DouBao(NLPBase):
], ],
stream=True stream=True
) )
t1 = time.time()
await stream.close()
logger.info(f'-------dou_bao close time:{time.time() - t1:.4f}s')
return
sec = '' sec = ''
async for completion in stream: async for completion in stream:
sec = sec + completion.choices[0].delta.content sec = sec + completion.choices[0].delta.content
@ -59,7 +62,6 @@ class DouBao(NLPBase):
# self._on_callback(sec) # self._on_callback(sec)
except Exception as e: except Exception as e:
print(e) print(e)
logger.info(f'_request:{question}, time:{time.time() - t:.4f}s')
logger.info(f'-------dou_bao nlp time:{time.time() - t:.4f}s') logger.info(f'-------dou_bao nlp time:{time.time() - t:.4f}s')
async def _on_close(self): async def _on_close(self):

View File

@ -4,6 +4,7 @@ import logging
import os import os
import shutil import shutil
from eventbus import EventBus
from utils import save_wav from utils import save_wav
from human_handler import AudioHandler from human_handler import AudioHandler
@ -16,6 +17,14 @@ class TTSAudioHandle(AudioHandler):
self._sample_rate = 16000 self._sample_rate = 16000
self._index = -1 self._index = -1
EventBus().register('stop', self._on_stop)
def __del__(self):
EventBus().unregister('stop', self._on_stop)
def _on_stop(self, *args, **kwargs):
self.stop()
@property @property
def sample_rate(self): def sample_rate(self):
return self._sample_rate return self._sample_rate
@ -45,9 +54,13 @@ class TTSAudioSplitHandle(TTSAudioHandle):
self._chunk = self.sample_rate // self._context.fps self._chunk = self.sample_rate // self._context.fps
self._priority_queue = [] self._priority_queue = []
self._current = 0 self._current = 0
self._is_running = True
logger.info("TTSAudioSplitHandle init") logger.info("TTSAudioSplitHandle init")
def on_handle(self, stream, index): def on_handle(self, stream, index):
if not self._is_running:
logger.info('TTSAudioSplitHandle::on_handle is not running')
return
# heapq.heappush(self._priority_queue, (index, stream)) # heapq.heappush(self._priority_queue, (index, stream))
if stream is None: if stream is None:
heapq.heappush(self._priority_queue, (index, None)) heapq.heappush(self._priority_queue, (index, None))
@ -55,7 +68,7 @@ class TTSAudioSplitHandle(TTSAudioHandle):
stream_len = stream.shape[0] stream_len = stream.shape[0]
idx = 0 idx = 0
chunks = [] chunks = []
while stream_len >= self._chunk: while stream_len >= self._chunk and self._is_running:
# self.on_next_handle(stream[idx:idx + self._chunk], 0) # self.on_next_handle(stream[idx:idx + self._chunk], 0)
chunks.append(stream[idx:idx + self._chunk]) chunks.append(stream[idx:idx + self._chunk])
stream_len -= self._chunk stream_len -= self._chunk
@ -63,7 +76,7 @@ class TTSAudioSplitHandle(TTSAudioHandle):
heapq.heappush(self._priority_queue, (index, chunks)) heapq.heappush(self._priority_queue, (index, chunks))
current = self._priority_queue[0][0] current = self._priority_queue[0][0]
print('TTSAudioSplitHandle::on_handle', index, current, self._current) print('TTSAudioSplitHandle::on_handle', index, current, self._current)
if current == self._current: if current == self._current and self._is_running:
self._current = self._current + 1 self._current = self._current + 1
chunks = heapq.heappop(self._priority_queue)[1] chunks = heapq.heappop(self._priority_queue)[1]
if chunks is None: if chunks is None:
@ -73,7 +86,7 @@ class TTSAudioSplitHandle(TTSAudioHandle):
self.on_next_handle(chunk, 0) self.on_next_handle(chunk, 0)
def stop(self): def stop(self):
pass self._is_running = False
class TTSAudioSaveHandle(TTSAudioHandle): class TTSAudioSaveHandle(TTSAudioHandle):

View File

@ -27,6 +27,7 @@ class AsyncTaskQueue:
print('_worker') print('_worker')
while True: while True:
task = await self._queue.get() task = await self._queue.get()
print(f"Get task size: {self._queue.qsize()}")
if task is None: # None as a stop signal if task is None: # None as a stop signal
break break
@ -42,7 +43,7 @@ class AsyncTaskQueue:
self.__loop.stop() self.__loop.stop()
def add_task(self, func, *args): def add_task(self, func, *args):
self.__loop.call_soon_threadsafe(self._queue.put_nowait, (func, *args)) return self.__loop.call_soon_threadsafe(self._queue.put_nowait, (func, *args))
def stop_workers(self): def stop_workers(self):
for _ in range(self._worker_num): for _ in range(self._worker_num):