#encoding = utf8

import asyncio
import threading


class AsyncTaskQueue:
    def __init__(self, work_num=1):
        self._queue = asyncio.Queue()
        self._worker_num = work_num
        self._current_worker_num = work_num
        self._thread = threading.Thread(target=self._run_loop)
        self._thread.start()
        self.__loop = None

    def _run_loop(self):
        print('_run_loop')
        self.__loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.__loop)
        self._tasks = [self.__loop.create_task(self._worker()) for _ in range(self._worker_num)]
        self.__loop.run_forever()
        print("exit run")
        if not self.__loop.is_closed():
            self.__loop.close()

    async def _worker(self):
        print('_worker')
        while True:
            task = await self._queue.get()
            if task is None:  # None as a stop signal
                break

            func, *args = task  # Unpack task
            print(f"Executing task with args: {args}")
            await func(*args)  # Execute async function
            self._queue.task_done()

        print('_worker finish')
        self._current_worker_num -= 1
        if self._current_worker_num == 0:
            print('loop stop')
            self.__loop.stop()

    def add_task(self, func, *args):
        self.__loop.call_soon_threadsafe(self._queue.put_nowait, (func, *args))

    def stop_workers(self):
        for _ in range(self._worker_num):
            self.add_task(None)  # Send stop signal

    def clear(self):
        while not self._queue.empty():
            self._queue.get_nowait()
            self._queue.task_done()

    def stop(self):
        self.stop_workers()
        self._thread.join()