# encoding: utf-8
import asyncio
import threading
import traceback


class AsyncTaskQueue:
    """Run submitted coroutine functions on a private event-loop thread.

    A background thread owns an asyncio event loop on which ``work_num``
    worker coroutines consume a FIFO queue of ``(func, *args)`` items and
    await ``func(*args)``.  Every public method may be called from any
    thread: queue mutations are forwarded to the loop thread with
    ``loop.call_soon_threadsafe`` because ``asyncio.Queue`` is not
    thread-safe.
    """

    def __init__(self, work_num=1):
        """Create the event loop and start the background thread.

        Args:
            work_num: Number of worker coroutines consuming the queue.
        """
        self._queue = None  # created on the loop thread, see _run_loop
        self._tasks = []
        self._worker_num = work_num
        self._current_worker_num = work_num
        # The loop must exist *before* the thread starts; assigning it
        # afterwards could overwrite the value the thread relies on.
        self.__loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run_loop)
        self._thread.start()

    def _run_loop(self):
        """Thread target: run the event loop until all workers have exited."""
        print('_run_loop')
        loop = self.__loop
        asyncio.set_event_loop(loop)
        try:
            # Created here so that, on Python versions where the queue binds
            # to the current loop at construction, it binds to this one.
            self._queue = asyncio.Queue()
            self._tasks = [
                loop.create_task(self._worker())
                for _ in range(self._worker_num)
            ]
            loop.run_forever()
            print("exit run")
        finally:
            if not loop.is_closed():
                loop.close()

    async def _worker(self):
        """Consume queue items until a ``None`` stop signal is received.

        A task that raises is reported and skipped so one failure cannot
        kill the worker; the last worker to exit stops the event loop.
        """
        print('_worker')
        try:
            while True:
                func, *args = await self._queue.get()  # unpack the task
                try:
                    if func is None:  # None is the stop signal
                        break
                    print(f"Executing task with args: {args}")
                    await func(*args)  # run the asynchronous function
                except Exception:
                    # Keep the worker alive; just report the failed task.
                    traceback.print_exc()
                finally:
                    self._queue.task_done()
        finally:
            print('_worker finish')
            self._current_worker_num -= 1
            if self._current_worker_num == 0:
                print('loop stop')
                self.__loop.stop()

    def _call_in_loop(self, callback, *args):
        """Schedule ``callback(*args)`` on the loop thread.

        Returns:
            False when the loop is already closed, True otherwise.
        """
        try:
            self.__loop.call_soon_threadsafe(callback, *args)
        except RuntimeError:  # raised when the event loop is closed
            return False
        return True

    def _enqueue(self, item):
        """Append ``item`` to the queue; must run on the loop thread."""
        self._queue.put_nowait(item)

    def _drain(self):
        """Discard pending tasks; must run on the loop thread.

        Stop signals that were already queued are put back so a concurrent
        shutdown request is not lost.
        """
        stop_signals = 0
        while True:
            try:
                item = self._queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            self._queue.task_done()
            if item[0] is None:
                stop_signals += 1
        for _ in range(stop_signals):
            self._queue.put_nowait((None,))

    def add_task(self, func, *args):
        """Queue ``func(*args)`` for execution by a worker.

        Args:
            func: Coroutine function to call, or ``None`` to make one
                worker exit.
            *args: Positional arguments passed to ``func``.

        Tasks submitted after the queue has shut down are ignored.
        """
        accepted = self._call_in_loop(self._enqueue, (func, *args))
        if not accepted and func is not None:
            print('task ignored: queue already stopped')

    def stop_workers(self):
        """Send one stop signal per worker; already queued tasks run first."""
        for _ in range(self._worker_num):
            self.add_task(None)  # send the stop signal

    def clear(self):
        """Discard every task that has not been picked up by a worker yet.

        Blocks until the queue has been drained, unless it is called from
        the loop thread itself, in which case it drains immediately.
        """
        if threading.current_thread() is self._thread:
            self._drain()
            return

        drained = threading.Event()

        def drain_and_signal():
            try:
                self._drain()
            finally:
                drained.set()

        if not self._call_in_loop(drain_and_signal):
            return
        # Poll instead of waiting forever: if the loop stops before the
        # callback runs, the event would never be set.
        while not drained.wait(timeout=0.05):
            if not self._thread.is_alive():
                break

    def stop(self):
        """Stop all workers and wait for the background thread to finish."""
        self.stop_workers()
        if self._worker_num < 1:
            # Without workers nobody would ever stop the loop.
            self._call_in_loop(self.__loop.stop)
        self._thread.join()