human/utils/async_task_queue.py

59 lines
1.8 KiB
Python
Raw Normal View History

# encoding=utf8
import asyncio
import threading
class AsyncTaskQueue:
    """Run coroutine functions on a private event loop in a background thread.

    Tasks are submitted from any thread with :meth:`add_task` and consumed in
    FIFO order by ``work_num`` worker coroutines that all run on one
    dedicated event-loop thread.  :meth:`stop` sends one end-of-work sentinel
    per worker, so every task queued before the call is executed before the
    thread exits.

    The hand-off between threads goes through ``loop.call_soon_threadsafe``:
    ``asyncio.Queue`` is not thread-safe, and blocking the loop thread on a
    ``threading.Condition`` would freeze every other worker while one of
    them is idle.
    """

    def __init__(self, work_num=1):
        """Create the event loop and start the background thread.

        Args:
            work_num: Number of worker coroutines consuming the queue.
                With fewer than one worker no task is ever executed and
                :meth:`stop` never returns, because the loop is only
                stopped by the last worker that exits.
        """
        self._worker_num = work_num
        self._current_worker_num = work_num
        # The queue is created on the loop thread (see _run_loop) so that it
        # is associated with the right loop on every Python version.
        self._queue = None
        self._tasks = []
        # The loop must exist *before* the thread starts; assigning this
        # attribute after start() would race with the thread using it.
        self.__loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run_loop)
        self._thread.start()

    def _run_loop(self):
        """Thread target: start the workers and run the loop until stopped."""
        print('_run_loop')
        loop = self.__loop
        asyncio.set_event_loop(loop)
        self._queue = asyncio.Queue()
        self._tasks = [
            loop.create_task(self._worker()) for _ in range(self._worker_num)
        ]
        try:
            loop.run_forever()
        finally:
            print("exit run")
            if not loop.is_closed():
                loop.close()

    async def _worker(self):
        """Consume queue items until an end-of-work sentinel is received.

        The last worker to exit stops the event loop, which lets the
        background thread finish.
        """
        print('_worker')
        while True:
            func, *args = await self._queue.get()  # unpack the task
            try:
                if func is None:  # None is the end-of-work signal
                    break
                print(f"Executing task with args: {args}")
                await func(*args)  # run the coroutine function
            except Exception as exc:
                # A failing task must not kill the worker: a dead worker
                # never decrements the counter below, the loop would never
                # stop and stop() would block forever on join().
                print(f"Task {func!r} failed: {exc!r}")
            finally:
                # Runs for the sentinel as well, keeping the queue's
                # unfinished-task counter balanced.
                self._queue.task_done()

        print('_worker finish')
        self._current_worker_num -= 1
        if self._current_worker_num == 0:
            print('loop stop')
            self.__loop.stop()

    def _enqueue(self, item):
        """Put *item* on the queue; only ever runs on the event-loop thread."""
        self._queue.put_nowait(item)

    def add_task(self, func, *args):
        """Schedule ``await func(*args)`` on the worker loop.

        Safe to call from any thread.

        Args:
            func: Coroutine function to run, or ``None`` as the end-of-work
                signal for one worker.
            *args: Positional arguments passed to ``func``.

        Returns:
            ``True`` if the task was handed to the loop, ``False`` if the
            loop is already closed (after :meth:`stop`) and the task was
            dropped.
        """
        try:
            self.__loop.call_soon_threadsafe(self._enqueue, (func, *args))
        except RuntimeError:
            # call_soon_threadsafe raises once the loop is closed; nothing
            # can run the task any more, so report that instead of raising.
            return False
        return True

    def stop_workers(self):
        """Send one end-of-work signal per worker."""
        for _ in range(self._worker_num):
            self.add_task(None)  # send the end-of-work signal

    def stop(self):
        """Finish all queued tasks, stop the workers and join the thread.

        Calling it again after the queue has shut down is a no-op.
        """
        self.stop_workers()
        self._thread.join()