#encoding = utf8
import logging
from queue import Queue
import threading

logger = logging.getLogger(__name__)


class AsyncTaskQueue:
    """Run queued callables on a fixed set of background worker threads.

    Tasks are stored as ``(func, *args)`` tuples in a FIFO ``queue.Queue``.
    Each worker loops forever pulling tasks and only exits when it receives a
    stop signal (a bare ``None`` item, or a task whose ``func`` is ``None``),
    so ``stop()`` / ``stop_workers()`` must be called to let the threads end.
    """

    def __init__(self, name: str, work_num: int = 1) -> None:
        """Create the queue and immediately start ``work_num`` worker threads.

        Args:
            name: Prefix used in log messages and in the worker thread names.
            work_num: Number of worker threads to start.
        """
        self._queue = Queue()
        self._worker_num = work_num
        # Number of workers that have not yet left their loop.
        self._current_worker_num = work_num
        self._name = name
        self._threads = []
        # Guards _current_worker_num, which every worker decrements on exit.
        self._lock = threading.Lock()
        for index in range(work_num):
            thread = threading.Thread(
                target=self._worker,
                name=f'{name}_worker_{index}',
            )
            thread.start()
            self._threads.append(thread)

    def _worker(self) -> None:
        """Worker loop: execute tasks until a stop signal is received.

        An exception raised by a task is logged (with traceback) and does not
        terminate the worker.
        """
        logger.info('%s, _worker', self._name)
        while True:
            # Fetch outside the try block so task_done() is only ever called
            # for an item that was actually retrieved.
            task = self._queue.get()
            try:
                if task is None:  # Bare None as a stop signal
                    break
                func, *args = task  # Unpack task
                if func is None:  # (None,) as queued by stop_workers()
                    break
                func(*args)  # Execute function
            except Exception as e:
                logger.error('%s error: %r', self._name, e, exc_info=True)
            finally:
                # Runs on the break paths too, keeping the queue's
                # unfinished-task counter balanced.
                self._queue.task_done()

        logger.info('%s, _worker finish', self._name)
        with self._lock:
            # No sentinel is re-queued here: once the count reaches zero no
            # worker is left to consume it, and it would only leave a
            # permanently unfinished item in the queue.
            self._current_worker_num -= 1

    def add_task(self, func, *args) -> None:
        """Enqueue ``func(*args)`` for execution by a worker thread.

        Passing ``None`` as ``func`` enqueues a stop signal for one worker.
        """
        self._queue.put((func, *args))

    def stop_workers(self) -> None:
        """Enqueue one stop signal per configured worker, without waiting.

        The signals go to the back of the queue, so tasks queued earlier are
        dequeued first.
        """
        for _ in range(self._worker_num):
            self.add_task(None)  # Send stop signal

    def clear(self) -> None:
        """Discard every item still waiting in the queue.

        Stop signals that are still queued are discarded as well. The queue's
        unfinished-task counter is reduced by the number of dropped items so
        it keeps matching the work that is really outstanding.
        """
        pending = self._queue
        with pending.mutex:
            dropped = len(pending.queue)
            pending.queue.clear()
            # Dropped items will never reach task_done(); account for them
            # here and wake anything blocked in Queue.join().
            pending.unfinished_tasks -= dropped
            if pending.unfinished_tasks == 0:
                pending.all_tasks_done.notify_all()

    def stop(self) -> None:
        """Signal all workers to stop and block until every thread exits."""
        self.stop_workers()
        for thread in self._threads:
            thread.join()