human/utils/async_task_queue.py
2024-11-16 14:05:13 +08:00

62 lines
1.8 KiB
Python

# -*- coding: utf-8 -*-
import logging
from queue import Queue
import threading
logger = logging.getLogger(__name__)
class AsyncTaskQueue:
    """FIFO task queue drained by a fixed pool of worker threads.

    Tasks are ``(func, *args)`` tuples; each worker pulls one tuple at a time
    and calls ``func(*args)``. An exception raised by a task is logged and the
    worker keeps running. A task whose ``func`` is ``None`` (or a bare ``None``
    item) is a stop signal: the worker that receives it exits its loop.
    """

    def __init__(self, name, work_num=1):
        """Create the queue and immediately start ``work_num`` worker threads.

        Args:
            name: Label used in log messages and as the thread-name prefix.
            work_num: Number of worker threads to start.
        """
        self._queue = Queue()
        self._worker_num = work_num
        self._current_worker_num = work_num  # workers that have not exited yet
        self._name = name
        self._threads = []
        self._lock = threading.Lock()  # guards _current_worker_num
        for index in range(work_num):
            thread = threading.Thread(
                target=self._worker,
                name=f'{name}_worker_{index}',
            )
            thread.start()
            self._threads.append(thread)

    def _worker(self):
        """Run tasks from the queue until a stop signal is received."""
        logging.info('%s, _worker', self._name)
        while True:
            # Fetch outside the try block so task_done() is only ever called
            # for an item that was actually taken from the queue.
            task = self._queue.get()
            try:
                if task is None:  # bare None placed directly on the queue
                    break
                func, *args = task
                if func is None:  # stop signal sent through add_task(None)
                    break
                func(*args)
            except Exception as e:
                # Keep the worker alive; log with traceback for diagnosis.
                logging.exception('%s error: %r', self._name, e)
            finally:
                # Runs for stop signals too, keeping the queue's
                # unfinished-task counter balanced.
                self._queue.task_done()
        logging.info('%s, _worker finish', self._name)
        with self._lock:
            # No extra stop signal is queued here: stop_workers() already
            # sends one per worker, and an additional item would be left
            # unconsumed in the queue after the last worker exits.
            self._current_worker_num -= 1

    def add_task(self, func, *args):
        """Queue ``func(*args)`` for execution by a worker thread.

        Passing ``None`` as ``func`` queues a stop signal for one worker.
        """
        self._queue.put((func, *args))

    def stop_workers(self):
        """Queue one stop signal per worker without waiting for them to exit.

        The signals are appended behind any tasks that are already pending.
        """
        for _ in range(self._worker_num):
            self.add_task(None)

    def clear(self):
        """Discard every item still waiting in the queue.

        Tasks already picked up by a worker are not affected. Pending stop
        signals are discarded as well.
        """
        pending = self._queue
        with pending.mutex:
            dropped = len(pending.queue)
            pending.queue.clear()
            # Dropped items will never reach task_done(); account for them so
            # the counter stays consistent and Queue.join() callers can wake.
            pending.unfinished_tasks -= dropped
            if pending.unfinished_tasks == 0:
                pending.all_tasks_done.notify_all()
            pending.not_full.notify_all()

    def stop(self):
        """Send stop signals to all workers and wait for them to exit."""
        self.stop_workers()
        current = threading.current_thread()
        for thread in self._threads:
            # A thread cannot join itself (RuntimeError); this happens when
            # stop() is invoked from inside a task.
            if thread is not current:
                thread.join()