# -*- coding: utf-8 -*-

import asyncio
import logging
import threading
from queue import Empty, Queue

logger = logging.getLogger(__name__)


class AsyncTaskQueue:
    """Run queued coroutine functions on a dedicated event-loop thread.

    Constructing the queue starts a thread that owns a private asyncio event
    loop and ``work_num`` worker coroutines.  Any thread may enqueue work
    with :meth:`add_task`; each worker takes one item at a time and awaits
    ``func(*args)``.  Call :meth:`stop` to shut the workers down and join
    the thread.
    """

    def __init__(self, name, work_num=1):
        """Create the queue and immediately start its worker thread.

        Args:
            name: Used as the thread name and as the prefix of log messages.
            work_num: Number of worker coroutines consuming the queue.
        """
        self._queue = Queue()
        self._worker_num = work_num
        self._current_worker_num = work_num
        self._name = name
        self._tasks = []
        # Initialise the loop attribute *before* starting the thread;
        # otherwise this assignment can overwrite the loop the thread has
        # already stored.
        self.__loop = None
        self._thread = threading.Thread(target=self._run_loop, name=name)
        self._thread.start()

    def _run_loop(self):
        """Thread target: create the loop, run the workers, then clean up."""
        logger.info('%s, _run_loop', self._name)
        loop = asyncio.new_event_loop()
        self.__loop = loop
        asyncio.set_event_loop(loop)
        self._tasks = [loop.create_task(self._worker()) for _ in range(self._worker_num)]
        try:
            # With no workers nothing would ever stop the loop, so do not
            # start it; stop() can then join the thread right away.
            if self._tasks:
                loop.run_forever()
        finally:
            logger.info('%s, exit run', self._name)
            try:
                # Let any task still pending on this loop run to completion.
                pending = asyncio.all_tasks(loop)
                if pending:
                    results = loop.run_until_complete(
                        asyncio.gather(*pending, return_exceptions=True)
                    )
                    for result in results:
                        if isinstance(result, BaseException):
                            logger.error('%s error: %s', self._name, result)
            finally:
                # Close the loop even if draining the leftovers failed.
                loop.close()
                logger.info('%s, close loop', self._name)

    async def _worker(self):
        """Consume queue items until a stop signal arrives.

        An item is a tuple ``(func, *args)``; ``func(*args)`` is awaited.
        ``None`` as the item, or as ``func``, is the stop signal.  Exceptions
        raised by a task are logged and the worker continues.  The last
        worker to finish schedules the loop to stop.
        """
        logger.info('%s, _worker', self._name)
        loop = asyncio.get_running_loop()
        while True:
            # queue.Queue.get() blocks the calling thread; run it in the
            # executor so the event loop keeps servicing the other workers
            # while this one waits for an item.
            task = await loop.run_in_executor(None, self._queue.get)
            try:
                if task is None:  # None as a stop signal
                    break

                func, *args = task  # Unpack task
                if func is None:  # None as a stop signal
                    break

                await func(*args)  # Execute async function
            except Exception as e:
                logger.exception('%s error: %s', self._name, e)
            finally:
                # Exactly one task_done() per successful get().
                self._queue.task_done()

        logger.info('%s, _worker finish', self._name)
        # Workers all run on the loop thread, so no lock is needed here.
        self._current_worker_num -= 1
        if self._current_worker_num == 0:
            loop.call_soon(loop.stop)

    def add_task(self, func, *args):
        """Enqueue ``func`` (a coroutine function) to be awaited as ``func(*args)``."""
        self._queue.put((func, *args))

    def stop_workers(self):
        """Enqueue one stop signal per worker; already queued tasks run first."""
        for _ in range(self._worker_num):
            self.add_task(None)  # Send stop signal

    def clear(self):
        """Discard every item currently in the queue.

        Pending stop signals are discarded as well.
        """
        while True:
            try:
                self._queue.get_nowait()
            except Empty:
                # A worker may take the last item between any check and the
                # get, so treat Empty as the normal exit condition.
                break
            self._queue.task_done()

    def stop(self):
        """Signal all workers to stop and block until the thread exits."""
        self.stop_workers()
        self._thread.join()