diff --git a/asr/sherpa_ncnn_asr.py b/asr/sherpa_ncnn_asr.py index 2e5c257..c47d184 100644 --- a/asr/sherpa_ncnn_asr.py +++ b/asr/sherpa_ncnn_asr.py @@ -58,7 +58,7 @@ class SherpaNcnnAsr(AsrBase): def _recognize_loop(self): segment_id = 0 - time.sleep(3) + time.sleep(9) last_result = "" logger.info(f'_recognize_loop') print(f'_recognize_loop') diff --git a/utils/async_task_queue.py b/utils/async_task_queue.py index 05c5e93..c796b41 100644 --- a/utils/async_task_queue.py +++ b/utils/async_task_queue.py @@ -1,6 +1,5 @@ #encoding = utf8 -import asyncio import logging from queue import Queue import threading @@ -14,24 +13,14 @@ class AsyncTaskQueue: self._worker_num = work_num self._current_worker_num = work_num self._name = name - self._thread = threading.Thread(target=self._run_loop, name=name) - self._thread.start() - self.__loop = None + self._threads = [] + self._lock = threading.Lock() + for _ in range(work_num): + thread = threading.Thread(target=self._worker, name=f'{name}_worker_{_}') + thread.start() + self._threads.append(thread) - def _run_loop(self): - logging.info(f'{self._name}, _run_loop') - self.__loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.__loop) - self._tasks = [self.__loop.create_task(self._worker()) for _ in range(self._worker_num)] - try: - self.__loop.run_forever() - finally: - logging.info(f'{self._name}, exit run') - self.__loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(self.__loop))) - self.__loop.close() - logging.info(f'{self._name}, close loop') - - async def _worker(self): + def _worker(self): logging.info(f'{self._name}, _worker') while True: try: @@ -43,16 +32,17 @@ class AsyncTaskQueue: if func is None: # None as a stop signal break - await func(*args) # Execute async function + func(*args) # Execute function except Exception as e: logging.error(f'{self._name} error: {repr(e)}') finally: self._queue.task_done() logging.info(f'{self._name}, _worker finish') - self._current_worker_num -= 1 - if self._current_worker_num == 0: - self.__loop.call_soon_threadsafe(self.__loop.stop) + with self._lock: + self._current_worker_num -= 1 + if self._current_worker_num == 0: + self._queue.put(None) # Send stop signal to remaining workers def add_task(self, func, *args): self._queue.put((func, *args)) @@ -62,10 +52,11 @@ class AsyncTaskQueue: self.add_task(None) # Send stop signal def clear(self): - while not self._queue.empty(): - self._queue.get_nowait() - self._queue.task_done() + with self._queue.mutex: + self._queue.queue.clear() def stop(self): self.stop_workers() - self._thread.join() + for thread in self._threads: + thread.join() + \ No newline at end of file