#encoding = utf8 import asyncio import threading class AsyncTaskQueue: def __init__(self, name, work_num=1): self._queue = asyncio.Queue() self._worker_num = work_num self._current_worker_num = work_num self._name = name self._thread = threading.Thread(target=self._run_loop, name=name) self._thread.start() self.__loop = None def _run_loop(self): print(self._name, '_run_loop') self.__loop = asyncio.new_event_loop() asyncio.set_event_loop(self.__loop) self._tasks = [self.__loop.create_task(self._worker()) for _ in range(self._worker_num)] self.__loop.run_forever() print(self._name, "exit run") if not self.__loop.is_closed(): self.__loop.close() async def _worker(self): print(self._name, '_worker') while True: print(f'{self._name} get queue') task = await self._queue.get() print(f'{self._name} get queue11') print(f"{self._name} Get task size: {self._queue.qsize()}") if task is None: # None as a stop signal break func, *args = task # Unpack task print(f"{self._name}, Executing task with args: {args}") await func(*args) # Execute async function self._queue.task_done() print(self._name, '_worker finish') self._current_worker_num -= 1 if self._current_worker_num == 0: print(self._name, 'loop stop') self.__loop.stop() def add_task(self, func, *args): # return self.__loop.call_soon_threadsafe(self._queue.put_nowait, (func, *args)) self._queue.put_nowait((func, *args)) def stop_workers(self): for _ in range(self._worker_num): self.add_task(None) # Send stop signal def clear(self): while not self._queue.empty(): self._queue.get_nowait() self._queue.task_done() def stop(self): self.stop_workers() self._thread.join()