diff --git a/asr/asr_base.py b/asr/asr_base.py index 73dcc9e..4eadd41 100644 --- a/asr/asr_base.py +++ b/asr/asr_base.py @@ -17,7 +17,7 @@ class AsrBase: self._stop_event = threading.Event() self._stop_event.set() - self._thread = threading.Thread(target=self._recognize_loop) + self._thread = threading.Thread(target=self._recognize_loop, name="AsrBaseThread") self._thread.start() def __del__(self): diff --git a/asr/sherpa_ncnn_asr.py b/asr/sherpa_ncnn_asr.py index 629f90e..cefe451 100644 --- a/asr/sherpa_ncnn_asr.py +++ b/asr/sherpa_ncnn_asr.py @@ -63,6 +63,15 @@ class SherpaNcnnAsr(AsrBase): logger.info(f'_recognize_loop') print(f'_recognize_loop') + while self._stop_event.is_set(): + logger.info(f'_recognize_loop000') + self._notify_complete('介绍中国5000年历史文学') + logger.info(f'_recognize_loop111') + segment_id += 1 + time.sleep(20) + logger.info(f'_recognize_loop222') + logger.info(f'_recognize_loop exit') +''' with sd.InputStream(channels=1, dtype="float32", samplerate=self._sample_rate) as s: while self._stop_event.is_set(): samples, _ = s.read(self._samples_per_read) # a blocking read @@ -84,12 +93,4 @@ class SherpaNcnnAsr(AsrBase): segment_id += 1 self._recognizer.reset() ''' - while self._stop_event.is_set(): - logger.info(f'_recognize_loop000') - self._notify_complete('介绍中国5000年历史文学') - logger.info(f'_recognize_loop111') - segment_id += 1 - time.sleep(60) - logger.info(f'_recognize_loop222') - logger.info(f'_recognize_loop exit') -''' + diff --git a/nlp/nlp_doubao.py b/nlp/nlp_doubao.py index 5402f62..1a1af84 100644 --- a/nlp/nlp_doubao.py +++ b/nlp/nlp_doubao.py @@ -56,6 +56,15 @@ class DouBaoSDK: await self.__client.close() self.__client = None + def aclose(self): + if self._stream is not None: + self._stream.close() + self._stream = None + logger.info('AsyncArk close') + if self.__client is not None and not self.__client.is_closed(): + self.__client.close() + self.__client = None + class DouBaoHttp: def __init__(self, token): @@ -111,6 +120,11 @@ class DouBaoHttp: if self._response is not None and self._requesting: self._response.close() + def aclose(self): + if self._response is not None and self._requesting: + self._response.close() + logger.info('DouBaoHttp close') + class DouBao(NLPBase): def __init__(self, context, split, callback=None): @@ -138,3 +152,9 @@ class DouBao(NLPBase): if self._dou_bao is not None: await self._dou_bao.close() logger.info('AsyncArk close') + + def on_clear_cache(self, *args, **kwargs): + super().on_clear_cache(*args, **kwargs) + if self._dou_bao is not None: + self._dou_bao.aclose() + logger.info('DouBao clear_cache') diff --git a/tts/tts_audio_handle.py b/tts/tts_audio_handle.py index baf38ca..25d130a 100644 --- a/tts/tts_audio_handle.py +++ b/tts/tts_audio_handle.py @@ -18,13 +18,18 @@ class TTSAudioHandle(AudioHandler): self._index = -1 EventBus().register('stop', self._on_stop) + EventBus().register('clear_cache', self.on_clear_cache) def __del__(self): EventBus().unregister('stop', self._on_stop) + EventBus().unregister('clear_cache', self.on_clear_cache) def _on_stop(self, *args, **kwargs): self.stop() + def on_clear_cache(self, *args, **kwargs): + pass + @property def sample_rate(self): return self._sample_rate @@ -73,10 +78,12 @@ class TTSAudioSplitHandle(TTSAudioHandle): chunks.append(stream[idx:idx + self._chunk]) stream_len -= self._chunk idx += self._chunk + if not self._is_running: + return heapq.heappush(self._priority_queue, (index, chunks)) current = self._priority_queue[0][0] - print('TTSAudioSplitHandle::on_handle', index, current, self._current) - if current == self._current and self._is_running: + print('TTSAudioSplitHandle::on_handle', index, current, self._current, len(self._priority_queue)) + if current == self._current: self._current = self._current + 1 chunks = heapq.heappop(self._priority_queue)[1] if chunks is None: @@ -88,6 +95,13 @@ class TTSAudioSplitHandle(TTSAudioHandle): def stop(self): self._is_running = False + def on_clear_cache(self, *args, **kwargs): + if self._priority_queue is None or len(self._priority_queue) == 0: + return + current = self._priority_queue[0][0] + self._priority_queue.clear() + self._current = current + class TTSAudioSaveHandle(TTSAudioHandle): def __init__(self, context, handler): diff --git a/tts/tts_base.py b/tts/tts_base.py index 5ef0775..d6e21a3 100644 --- a/tts/tts_base.py +++ b/tts/tts_base.py @@ -39,10 +39,14 @@ class TTSBase(NLPCallback): async def _request(self, txt: str, index): # print('_request:', txt) + if not self._is_running: + logger.info('TTSBase::_request is not running') + return + t = time.time() stream = await self._on_request(txt) - if stream is None: - logger.warn(f'-------stream is None') + if stream is None or self._is_running is False: + logger.warn(f'-------stream is None or is_running {self._is_running}') return logger.info(f'-------tts time:{time.time() - t:.4f}s, txt:{txt}') if self._handle is not None and self._is_running: @@ -63,6 +67,9 @@ class TTSBase(NLPCallback): self.message(txt) def message(self, txt): + if not self._is_running: + logger.info('TTSBase::message is not running') + return txt = txt.strip() if len(txt) == 0: # logger.info(f'message is empty')