2024-10-08 12:15:04 +00:00
|
|
|
#encoding = utf8
|
|
|
|
|
2024-11-07 12:43:46 +00:00
|
|
|
import logging
|
|
|
|
from queue import Queue
|
2024-10-08 12:15:04 +00:00
|
|
|
import threading
|
|
|
|
|
2024-11-07 12:43:46 +00:00
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
2024-10-08 12:15:04 +00:00
|
|
|
|
|
|
|
class AsyncTaskQueue:
    """Run queued callables on a fixed set of background worker threads.

    Tasks are stored in a ``queue.Queue`` as ``(func, *args)`` tuples and are
    picked up by the worker threads in FIFO order.  A ``None`` task, or a task
    whose callable is ``None``, is a stop signal: the worker that receives it
    leaves its loop and terminates.
    """

    # Same logger object as a module-level ``logging.getLogger(__name__)``.
    _logger = logging.getLogger(__name__)

    def __init__(self, name: str, work_num: int = 1) -> None:
        """Create the queue and immediately start ``work_num`` worker threads.

        Args:
            name: Label used in log messages and as the thread-name prefix.
            work_num: Number of worker threads to start.
        """
        self._queue = Queue()
        self._worker_num = work_num
        # Number of workers that have not yet left their loop.
        self._current_worker_num = work_num
        self._name = name
        self._threads = []
        # Guards ``_current_worker_num``, which every exiting worker updates.
        self._lock = threading.Lock()
        for index in range(work_num):
            thread = threading.Thread(
                target=self._worker, name=f'{name}_worker_{index}'
            )
            thread.start()
            self._threads.append(thread)

    def _worker(self) -> None:
        """Worker loop: execute queued tasks until a stop signal arrives.

        An exception raised by a task is logged (with traceback) and does not
        terminate the worker.
        """
        self._logger.info('%s, _worker', self._name)
        while True:
            # Fetch outside the ``try`` so that ``task_done()`` below is only
            # ever called for an item that was actually retrieved.
            task = self._queue.get()
            try:
                if task is None:  # None as a stop signal
                    break
                func, *args = task  # Unpack task
                if func is None:  # None as a stop signal
                    break
                func(*args)  # Execute function
            except Exception:
                self._logger.exception('%s error', self._name)
            finally:
                # Runs on ``break`` as well, so stop signals are accounted for.
                self._queue.task_done()

        self._logger.info('%s, _worker finish', self._name)
        with self._lock:
            # No stop signal is re-queued here: ``stop_workers`` already sends
            # one per worker, and an extra one would stay in the queue forever
            # as an unfinished task.
            self._current_worker_num -= 1

    def add_task(self, func, *args) -> None:
        """Queue ``func(*args)`` for execution by a worker thread.

        Passing ``None`` as ``func`` queues a stop signal for one worker.
        """
        self._queue.put((func, *args))

    def stop_workers(self) -> None:
        """Queue one stop signal per worker without waiting for them to exit.

        The signals are placed behind any tasks already queued, so those tasks
        are handed to workers first.
        """
        for _ in range(self._worker_num):
            self.add_task(None)  # Send stop signal

    def clear(self) -> None:
        """Discard every item still waiting in the queue.

        Tasks already taken by a worker are not affected.  Pending stop
        signals are discarded together with pending tasks.
        """
        pending = self._queue
        with pending.mutex:
            dropped = len(pending.queue)
            pending.queue.clear()
            # Keep the unfinished-task counter in step with the items removed,
            # otherwise ``Queue.join()`` could never return afterwards.
            pending.unfinished_tasks -= dropped
            if pending.unfinished_tasks == 0:
                pending.all_tasks_done.notify_all()

    def stop(self) -> None:
        """Signal all workers to stop and block until every thread has exited."""
        self.stop_workers()
        for thread in self._threads:
            thread.join()
|