diff --git a/asr/sherpa_ncnn_asr.py b/asr/sherpa_ncnn_asr.py index 2e5c257..1cc81d3 100644 --- a/asr/sherpa_ncnn_asr.py +++ b/asr/sherpa_ncnn_asr.py @@ -68,7 +68,7 @@ class SherpaNcnnAsr(AsrBase): self._notify_complete('介绍中国5000年历史文学') logger.info(f'_recognize_loop111') segment_id += 1 - time.sleep(50) + time.sleep(150) logger.info(f'_recognize_loop222') logger.info(f'_recognize_loop exit') ''' diff --git a/tts/tts_audio_handle.py b/tts/tts_audio_handle.py index ed35898..13ba8e4 100644 --- a/tts/tts_audio_handle.py +++ b/tts/tts_audio_handle.py @@ -3,7 +3,8 @@ import heapq import logging import os import shutil -from threading import Lock +import time +from threading import Lock, Thread from eventbus import EventBus from utils import save_wav @@ -62,8 +63,21 @@ class TTSAudioSplitHandle(TTSAudioHandle): self._lock = Lock() self._current = 0 self._is_running = True + self._thread = Thread(target=self._process_loop) + self._thread.start() logger.info("TTSAudioSplitHandle init") + def _process_loop(self): + while self._is_running: + with self._lock: + if self._priority_queue and self._priority_queue[0][0] == self._current: + self._current += 1 + chunks, txt = heapq.heappop(self._priority_queue)[1] + if chunks is not None: + for chunk in chunks: + self.on_next_handle((chunk, txt), 0) + time.sleep(0.01) # Sleep briefly to prevent busy-waiting + def on_handle(self, stream, index): if not self._is_running: logger.info('TTSAudioSplitHandle::on_handle is not running') @@ -71,13 +85,6 @@ class TTSAudioSplitHandle(TTSAudioHandle): logger.info(f'TTSAudioSplitHandle::on_handle {index}') s, txt = stream - current = 0 - with self._lock: - if len(self._priority_queue) != 0: - current = self._priority_queue[0][0] - if current == 0: - self._current = 0 - if s is None: heapq.heappush(self._priority_queue, (index, None)) else: @@ -85,7 +92,6 @@ class TTSAudioSplitHandle(TTSAudioHandle): idx = 0 chunks = [] while stream_len >= self._chunk and self._is_running: - # self.on_next_handle(stream[idx:idx + self._chunk], 0) chunks.append(s[idx:idx + self._chunk]) stream_len -= self._chunk idx += self._chunk @@ -93,31 +99,14 @@ class TTSAudioSplitHandle(TTSAudioHandle): return heapq.heappush(self._priority_queue, (index, (chunks, txt))) - logger.info(f'TTSAudioSplitHandle::on_handle {index}, {current}, {self._current}, {len(self._priority_queue)}') - if current == self._current: - self._current = self._current + 1 - chunks = heapq.heappop(self._priority_queue)[1] - chunks, txt = chunks - - if chunks is not None: - for chunk in chunks: - # logger.info(f'TTSAudioSplitHandle::on_handle push') - self.on_next_handle((chunk, txt), 0) - # logger.info(f'TTSAudioSplitHandle::on_handle push finish') - def stop(self): self._is_running = False + self._thread.join() def on_clear_cache(self, *args, **kwargs): super().on_clear_cache() - if self._priority_queue is None or len(self._priority_queue) == 0: - return - with self._lock: - print('TTSAudioSplitHandle::on_clear_cache', self._current) self._current = 0 - print('TTSAudioSplitHandle::on_clear_cache', self._current) - self._priority_queue.clear() diff --git a/utils/async_task_queue.py b/utils/async_task_queue.py index c796b41..4a4b726 100644 --- a/utils/async_task_queue.py +++ b/utils/async_task_queue.py @@ -59,4 +59,3 @@ class AsyncTaskQueue: self.stop_workers() for thread in self._threads: thread.join() - \ No newline at end of file