From 8173d5356ed0f6b98711a2130ddd0f3657b98d66 Mon Sep 17 00:00:00 2001 From: brige Date: Thu, 10 Oct 2024 11:31:09 +0800 Subject: [PATCH] modify async task queue --- utils/async_task_queue.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/utils/async_task_queue.py b/utils/async_task_queue.py index fb7cc83..21216ab 100644 --- a/utils/async_task_queue.py +++ b/utils/async_task_queue.py @@ -9,17 +9,17 @@ class AsyncTaskQueue: self._queue = asyncio.Queue() self._loop = asyncio.new_event_loop() self._thread = threading.Thread(target=self._run_loop) - self.worker_task = None - self.loop_running = threading.Event() + self._worker_task = None + self._loop_running = threading.Event() self._thread.start() def _run_loop(self): asyncio.set_event_loop(self._loop) - self.loop_running.set() # 设置事件,表明事件循环正在运行 - self._loop.run_forever() # 启动事件循环 + self._loop_running.set() + self._loop.run_forever() async def _worker(self): - while True: + while self._loop_running.is_set(): task = await self._queue.get() if task is None: break @@ -30,13 +30,14 @@ class AsyncTaskQueue: asyncio.run_coroutine_threadsafe(self._queue.put(coro), self._loop) def start_worker(self): - if not self.worker_task: - self.worker_task = asyncio.run_coroutine_threadsafe(self._worker(), self._loop) + if not self._worker_task: + self._worker_task = asyncio.run_coroutine_threadsafe(self._worker(), self._loop) def stop(self): + self._loop_running.clear() asyncio.run_coroutine_threadsafe(self._queue.put(None), self._loop).result() - if self.worker_task: - self.worker_task.result() + if self._worker_task: + self._worker_task.result() self._loop.call_soon_threadsafe(self._loop.stop) - self._thread.join() # 等待线程结束 - self._loop.close() # 关闭事件循环 + self._thread.join() + self._loop.close()