human/nlp/nlp_base.py
2025-01-07 21:52:15 +08:00

93 lines
2.6 KiB
Python

# -*- coding: utf-8 -*-
import logging
import os
from asr import AsrObserver
from eventbus import EventBus
from utils import AsyncTaskQueue
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Absolute path of the directory that contains this file; resource paths
# elsewhere in the module are built relative to it.
_this_file = os.path.abspath(__file__)
current_file_path = os.path.dirname(_this_file)
class NLPBase(AsrObserver):
    """Base class that queues questions on an ``AsyncTaskQueue`` for ``_request``.

    The base implementation wires itself to the ``'stop'`` and
    ``'clear_cache'`` events on ``EventBus`` and forwards answers to an
    optional callback object. ``_request``, ``_on_close`` and ``process``
    are no-op hooks here.
    """

    def __init__(self, context, split, callback=None):
        """Store collaborators, create the task queue and subscribe to events.

        Args:
            context: Stored as ``self._context``; not used by this base class.
            split: Stored as ``self._split_handle``; not used by this base class.
            callback: Optional object whose ``on_message`` / ``on_clear``
                methods are invoked by this class. May be ``None``.
        """
        self._ask_queue = AsyncTaskQueue('NLPBaseQueue')
        self._context = context
        self._split_handle = split
        self._callback = callback
        self._is_running = True
        # Counter used by ``completed`` to step through the PPT pages.
        self._test_page = 1

        subscriptions = (
            ('stop', self.on_stop),
            ('clear_cache', self.on_clear_cache),
        )
        for event_name, handler in subscriptions:
            EventBus().register(event_name, handler)

    def __del__(self):
        """Unsubscribe the handlers that ``__init__`` registered."""
        subscriptions = (
            ('stop', self.on_stop),
            ('clear_cache', self.on_clear_cache),
        )
        for event_name, handler in subscriptions:
            EventBus().unregister(event_name, handler)

    def on_stop(self, *args, **kwargs):
        """Handle the ``'stop'`` event by delegating to ``stop``; arguments are ignored."""
        self.stop()

    def on_clear_cache(self, *args, **kwargs):
        """Handle the ``'clear_cache'`` event.

        Clears the pending questions and, when a callback is set, calls its
        ``on_clear`` method. Event arguments are ignored.
        """
        logger.info('NLPBase clear_cache')
        self.pause_talk()
        if self._callback is None:
            return
        self._callback.on_clear()

    @property
    def callback(self):
        """The callback object that receives messages, or ``None``."""
        return self._callback

    @callback.setter
    def callback(self, value):
        self._callback = value

    def _on_callback(self, txt: str):
        """Forward ``txt`` to ``callback.on_message`` if a callback is set and we are running."""
        if self._callback is None or not self._is_running:
            return
        self._callback.on_message(txt)

    def _request(self, question):
        """Hook run on the task queue for each asked question; no-op in the base class."""

    def _on_close(self):
        """Hook queued by ``stop``; no-op in the base class."""

    def process(self, message: str):
        """Hook taking a message; no-op in the base class."""

    def completed(self, message: str):
        """Ask the next scripted question and move the PPT to the matching page.

        The incoming ``message`` is not used: it is replaced by a scripted
        prompt derived from ``self._test_page``. Does nothing once the
        instance has been stopped.

        NOTE(review): this looks like test/demo scaffolding (hard-coded
        .pptx path and prompts) -- confirm whether it is meant to ship.
        """
        if not self._is_running:
            return

        page = self._test_page
        if page == 1:
            # First call: open the deck and ask the opening question.
            message = '讲解一下汉代的女童教育'
            ppt_path = f'{current_file_path}/../data/ppt/test_20250103213237A001.pptx'
            EventBus().post('ppt_open', ppt_path)
        else:
            # Later calls: jump to the current page and ask about it.
            message = f'讲解第{page}'
            EventBus().post('ppt_goto', page)
        self._test_page = page + 1

        logger.info(f'complete:{message}')
        self.ask(message)

    def ask(self, question):
        """Mark the instance as running and queue ``_request(question)``."""
        logger.info(f'ask:{question}')
        self._is_running = True
        self._ask_queue.add_task(self._request, question)
        logger.info(f'ask:{question} completed')

    def stop(self):
        """Stop processing: drop pending tasks, queue ``_on_close``, stop the queue."""
        logger.info('NLPBase stop')
        self._is_running = False
        # Order matters: clear first so that ``_on_close`` is the only task
        # left when the queue is told to stop.
        self._ask_queue.clear()
        self._ask_queue.add_task(self._on_close)
        logger.info('NLPBase add close')
        self._ask_queue.stop()
        logger.info('NLPBase _ask_queue stop')

    def pause_talk(self):
        """Clear the task queue; ``_is_running`` is left unchanged."""
        logger.info('NLPBase pause_talk')
        self._ask_queue.clear()