# encoding: utf-8
"""Run awaitables sequentially on a private event loop in a background thread."""
import asyncio
import threading
from typing import Any, Awaitable, Optional


class AsyncTaskQueue:
    """FIFO queue of awaitables executed one at a time on a dedicated loop.

    The constructor starts a background thread that owns a fresh asyncio
    event loop.  Awaitables submitted with :meth:`add_task` are awaited in
    submission order once :meth:`start_worker` has been called.  Call
    :meth:`stop` to drain the queue and shut the loop and thread down; the
    thread is not a daemon, so the process will not exit until it is stopped.
    """

    def __init__(self) -> None:
        self._queue: Optional[asyncio.Queue] = None
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run_loop)
        # concurrent.futures.Future for the worker coroutine, once started.
        self._worker_task = None
        self._loop_running = threading.Event()
        self._thread.start()
        # Block until the loop thread has created the queue, so every other
        # method can rely on ``self._queue`` being available.
        self._loop_running.wait()

    def _run_loop(self) -> None:
        """Thread target: install the loop, create the queue, run forever."""
        asyncio.set_event_loop(self._loop)
        # Create the queue on the loop's own thread.  Before Python 3.10 an
        # asyncio.Queue binds to the current event loop at construction, so
        # building it in the caller's thread attached it to the wrong loop.
        self._queue = asyncio.Queue()
        self._loop_running.set()
        self._loop.run_forever()

    async def _worker(self) -> None:
        """Await queued items in order until the ``None`` sentinel arrives.

        An exception raised by an item ends the worker and is re-raised by
        :meth:`stop`.
        """
        while True:
            task = await self._queue.get()
            try:
                if task is None:
                    break
                await task
            finally:
                # Keep the queue's unfinished-task counter balanced even
                # when the awaited item raises.
                self._queue.task_done()

    def add_task(self, coro: Optional[Awaitable[Any]]) -> None:
        """Enqueue *coro* to be awaited by the worker (thread-safe).

        ``None`` is reserved as the shutdown sentinel: enqueuing it makes
        the worker exit once it is reached.
        """
        # The queue is unbounded, so put_nowait cannot raise QueueFull;
        # call_soon_threadsafe preserves submission order.
        self._loop.call_soon_threadsafe(self._queue.put_nowait, coro)

    def start_worker(self) -> None:
        """Start the worker coroutine on the loop; later calls are no-ops."""
        if self._worker_task is None:
            self._worker_task = asyncio.run_coroutine_threadsafe(
                self._worker(), self._loop
            )

    def stop(self) -> None:
        """Drain the queue, then stop the loop and join its thread.

        Everything enqueued before this call is awaited before the worker
        exits.  If a queued item raised, that exception is re-raised here,
        but only after the loop has been stopped and the thread joined.
        Calling ``stop`` again after shutdown is a no-op.  Must not be
        called from the loop thread itself, since it blocks on that thread.
        """
        if self._loop.is_closed():
            return
        # The sentinel goes in behind all pending work so the worker drains
        # the queue instead of abandoning it.
        self._loop.call_soon_threadsafe(self._queue.put_nowait, None)
        try:
            if self._worker_task is not None:
                self._worker_task.result()
        finally:
            # Always tear down, otherwise a failing task would leave the
            # non-daemon loop thread running forever.
            self._loop.call_soon_threadsafe(self._loop.stop)
            self._thread.join()
            self._loop_running.clear()
            self._loop.close()