# encoding: utf-8
import asyncio
import threading
import traceback


class AsyncTaskQueue:
    """Run coroutine functions on a private event loop in a background thread.

    A dedicated thread owns an asyncio event loop and ``work_num`` worker
    coroutines. Other threads submit work with :meth:`add_task`; every
    submitted coroutine function is awaited by one of the workers. Call
    :meth:`stop` to let the workers finish the tasks queued so far and then
    shut the loop and the thread down.
    """

    def __init__(self, work_num=1):
        """Start the background thread and wait until its loop is usable.

        Args:
            work_num: Number of worker coroutines consuming the queue.
        """
        self._worker_num = work_num
        self._current_worker_num = work_num
        # Both are created by the background thread (see ``_run_loop``) so
        # that the queue is bound to the loop that actually consumes it.
        self.__loop = None
        self._queue = None
        self._tasks = []
        self._ready = threading.Event()
        self._thread = threading.Thread(target=self._run_loop)
        self._thread.start()
        # Block until the loop and queue exist; otherwise an immediate
        # add_task() could observe ``None`` for either of them.
        self._ready.wait()

    @staticmethod
    def _is_stop_signal(item):
        """Return True if *item* tells a worker to exit.

        Both a bare ``None`` and the ``(None,)`` tuple produced by
        ``add_task(None)`` are accepted as stop signals.
        """
        return item is None or item[0] is None

    def _run_loop(self):
        """Thread target: create the loop, start the workers, run until stopped."""
        print('_run_loop')
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            self.__loop = loop
            self._queue = asyncio.Queue()
            self._tasks = [
                loop.create_task(self._worker())
                for _ in range(self._worker_num)
            ]
        finally:
            # Always release the constructor, even if setup failed.
            self._ready.set()
        try:
            loop.run_forever()
        finally:
            print("exit run")
            if not loop.is_closed():
                loop.close()

    async def _worker(self):
        """Consume queued tasks until a stop signal is received.

        An exception raised by a task is reported and the worker keeps
        going. The last worker to exit stops the event loop.
        """
        print('_worker')
        while True:
            task = await self._queue.get()
            try:
                if self._is_stop_signal(task):
                    break
                func, *args = task  # Unpack task
                print(f"Executing task with args: {args}")
                await func(*args)  # Execute async function
            except Exception:
                # A failing task must not take the worker down with it;
                # otherwise stop() would wait forever for a dead worker.
                traceback.print_exc()
            finally:
                self._queue.task_done()
        print('_worker finish')
        self._current_worker_num -= 1
        if self._current_worker_num == 0:
            print('loop stop')
            self.__loop.stop()

    def add_task(self, func, *args):
        """Schedule ``await func(*args)`` on the background loop.

        Safe to call from any thread. Passing ``None`` as *func* enqueues a
        stop signal for one worker.

        Raises:
            RuntimeError: If the event loop was never created or has
                already been closed.
        """
        loop = self.__loop
        if loop is None:
            raise RuntimeError("event loop is not running")
        loop.call_soon_threadsafe(self._queue.put_nowait, (func, *args))

    def stop_workers(self):
        """Send one stop signal per worker; a no-op once the loop is gone."""
        loop = self.__loop
        if loop is None:
            return
        try:
            for _ in range(self._worker_num):
                # Enqueue a bare None: the worker treats it as "exit".
                loop.call_soon_threadsafe(self._queue.put_nowait, None)
        except RuntimeError:
            # The loop is already closed, so there are no workers left.
            pass

    def _drain(self):
        """Discard queued tasks, keeping any pending stop signals."""
        queue = self._queue
        if queue is None:
            return
        stop_signals = 0
        while True:
            try:
                item = queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            queue.task_done()
            if self._is_stop_signal(item):
                stop_signals += 1
        # Put the stop signals back so a concurrent stop() still terminates.
        for _ in range(stop_signals):
            queue.put_nowait(None)

    def clear(self):
        """Drop every task that has not been picked up by a worker yet.

        asyncio.Queue is not thread-safe, so when called from another thread
        the draining is executed on the loop thread and this call waits for
        it to complete.
        """
        loop = self.__loop
        on_loop_thread = threading.current_thread() is self._thread
        if on_loop_thread or loop is None or not self._thread.is_alive():
            self._drain()
            return

        drained = threading.Event()

        def drain_on_loop():
            try:
                self._drain()
            finally:
                drained.set()

        try:
            loop.call_soon_threadsafe(drain_on_loop)
        except RuntimeError:
            # Loop closed in the meantime: nothing else touches the queue.
            self._drain()
            return
        while not drained.wait(0.05):
            if not self._thread.is_alive():
                # The loop exited before running the callback.
                if not drained.is_set():
                    self._drain()
                return

    def stop(self):
        """Stop all workers after the queued tasks and join the thread.

        Calling this more than once is harmless.
        """
        if self._thread.is_alive():
            self.stop_workers()
        self._thread.join()