human/utils/async_task_queue.py

62 lines
1.8 KiB
Python
Raw Normal View History

#encoding = utf8
2024-11-07 12:43:46 +00:00
import logging
from queue import Queue
import threading
2024-11-07 12:43:46 +00:00
logger = logging.getLogger(__name__)
class AsyncTaskQueue:
2024-11-07 00:26:03 +00:00
def __init__(self, name, work_num=1):
2024-11-07 12:43:46 +00:00
self._queue = Queue()
2024-10-14 10:20:55 +00:00
self._worker_num = work_num
self._current_worker_num = work_num
2024-11-07 00:26:03 +00:00
self._name = name
2024-11-15 13:23:34 +00:00
self._threads = []
self._lock = threading.Lock()
for _ in range(work_num):
thread = threading.Thread(target=self._worker, name=f'{name}_worker_{_}')
thread.start()
self._threads.append(thread)
def _worker(self):
2024-11-07 12:43:46 +00:00
logging.info(f'{self._name}, _worker')
2024-10-13 14:49:17 +00:00
while True:
2024-11-07 12:43:46 +00:00
try:
task = self._queue.get()
if task is None: # None as a stop signal
break
func, *args = task # Unpack task
if func is None: # None as a stop signal
break
2024-11-15 13:23:34 +00:00
func(*args) # Execute function
2024-11-07 12:43:46 +00:00
except Exception as e:
2024-11-13 04:58:56 +00:00
logging.error(f'{self._name} error: {repr(e)}')
2024-11-07 12:43:46 +00:00
finally:
self._queue.task_done()
2024-10-14 10:20:55 +00:00
2024-11-07 12:43:46 +00:00
logging.info(f'{self._name}, _worker finish')
2024-11-15 13:23:34 +00:00
with self._lock:
self._current_worker_num -= 1
if self._current_worker_num == 0:
self._queue.put(None) # Send stop signal to remaining workers
2024-10-14 10:20:55 +00:00
def add_task(self, func, *args):
2024-11-07 12:43:46 +00:00
self._queue.put((func, *args))
2024-10-14 10:20:55 +00:00
def stop_workers(self):
for _ in range(self._worker_num):
2024-11-05 11:40:03 +00:00
self.add_task(None) # Send stop signal
2024-10-19 10:47:34 +00:00
def clear(self):
2024-11-15 13:23:34 +00:00
with self._queue.mutex:
self._queue.queue.clear()
2024-10-19 10:47:34 +00:00
def stop(self):
2024-10-14 10:20:55 +00:00
self.stop_workers()
2024-11-15 13:23:34 +00:00
for thread in self._threads:
thread.join()