#encoding = utf8 import asyncio import logging from queue import Queue import threading logger = logging.getLogger(__name__) class AsyncTaskQueue: def __init__(self, name, work_num=1): self._queue = Queue() self._worker_num = work_num self._current_worker_num = work_num self._name = name self._thread = threading.Thread(target=self._run_loop, name=name) self._thread.start() self.__loop = None def _run_loop(self): logging.info(f'{self._name}, _run_loop') self.__loop = asyncio.new_event_loop() asyncio.set_event_loop(self.__loop) self._tasks = [self.__loop.create_task(self._worker()) for _ in range(self._worker_num)] try: self.__loop.run_forever() finally: logging.info(f'{self._name}, exit run') self.__loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(self.__loop))) self.__loop.close() logging.info(f'{self._name}, close loop') async def _worker(self): logging.info(f'{self._name}, _worker') while True: try: task = self._queue.get() if task is None: # None as a stop signal break func, *args = task # Unpack task if func is None: # None as a stop signal break await func(*args) # Execute async function except Exception as e: logging.error(f'{self._name} error:', e) finally: self._queue.task_done() logging.info(f'{self._name}, _worker finish') self._current_worker_num -= 1 if self._current_worker_num == 0: self.__loop.call_soon_threadsafe(self.__loop.stop) def add_task(self, func, *args): self._queue.put((func, *args)) def stop_workers(self): for _ in range(self._worker_num): self.add_task(None) # Send stop signal def clear(self): while not self._queue.empty(): self._queue.get_nowait() self._queue.task_done() def stop(self): self.stop_workers() self._thread.join()