#encoding = utf8

import asyncio
import threading


class AsyncTaskQueue:
    def __init__(self, work_num=1):
        self._queue = asyncio.Queue()
        self._worker_num = work_num
        self._current_worker_num = work_num
        self._condition = threading.Condition()
        self._thread = threading.Thread(target=self._run_loop)
        self._thread.start()
        self.__loop = None

    def _run_loop(self):
        print('_run_loop')
        self.__loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.__loop)
        self._tasks = [self.__loop.create_task(self._worker()) for _ in range(self._worker_num)]
        self.__loop.run_forever()
        print("exit run")
        if not self.__loop.is_closed():
            self.__loop.close()

    async def _worker(self):
        print('_worker')
        while True:
            with self._condition:
                self._condition.wait_for(lambda: not self._queue.empty())
                task = await self._queue.get()
                func, *args = task  # 解包任务
                if func is None:  # None 作为结束信号
                    break

                print(f"Executing task with args: {args}")
                await func(*args)  # 执行异步函数
                self._queue.task_done()

        print('_worker finish')
        self._current_worker_num = self._current_worker_num - 1
        if self._current_worker_num == 0:
            print('loop stop')
            self.__loop.stop()

    def add_task(self, func, *args):
        with self._condition:
            self._queue.put_nowait((func, *args))
            self._condition.notify()

    def stop_workers(self):
        for _ in range(self._worker_num):
            self.add_task(None)  # 发送结束信号

    def stop(self):
        self.stop_workers()
        self._thread.join()