# encoding: utf-8
import threading

from eventbus import EventBus

from .asr_observer import AsrObserver


class AsrBase:
    """Base class for speech recognizers driven by a background thread.

    Construction registers a handler for the ``'stop'`` EventBus event and
    immediately starts a thread running :meth:`_recognize_loop`.  Results are
    delivered to observers added with :meth:`attach`.

    NOTE(review): the thread is started inside ``__init__``, so a subclass
    that sets attributes after calling ``super().__init__()`` may have its
    ``_recognize_loop`` running before those attributes exist.
    """

    def __init__(self) -> None:
        """Initialise defaults, subscribe to ``'stop'`` and start the thread."""
        # Configuration defaults; not read anywhere in this class
        # (presumably consumed by subclasses -- confirm).
        self._hot_words_file = ''
        self._sample_rate = 32000
        self._samples_per_read = 100

        self._observers = []
        EventBus().register('stop', self._on_stop)

        # The event is set before the thread starts and cleared by stop(),
        # i.e. "set" means "keep running".  Subclasses are presumably expected
        # to loop while it is set -- confirm against implementations.
        self._stop_event = threading.Event()
        self._stop_event.set()

        self._thread = threading.Thread(
            target=self._recognize_loop,
            name="AsrBaseThread",
        )
        self._thread.start()

    def __del__(self) -> None:
        """Remove the ``'stop'`` handler registered in ``__init__``."""
        EventBus().unregister('stop', self._on_stop)

    def _on_stop(self, *args, **kwargs) -> None:
        """EventBus callback for ``'stop'``; ignores its arguments and stops."""
        self.stop()

    def _recognize_loop(self) -> None:
        """Thread target; a no-op here, meant to be overridden by subclasses."""

    def _notify_process(self, message: str) -> None:
        """Call ``process(message)`` on every attached observer, in order."""
        # Iterate over a snapshot so an observer may attach/detach from inside
        # its callback without causing other observers to be skipped.
        for observer in tuple(self._observers):
            observer.process(message)

    def _notify_complete(self, message: str) -> None:
        """Post ``'clear_cache'`` then call ``completed(message)`` on observers."""
        EventBus().post('clear_cache')
        # Snapshot for the same reason as in _notify_process.
        for observer in tuple(self._observers):
            observer.completed(message)

    def stop(self) -> None:
        """Clear the run flag and wait for the recognizer thread to finish.

        When called from the recognizer thread itself (for example because the
        ``'stop'`` event was delivered on that thread) the join is skipped:
        ``Thread.join()`` raises ``RuntimeError`` if a thread joins itself.
        """
        self._stop_event.clear()
        if self._thread is not threading.current_thread():
            self._thread.join()

    def attach(self, observer: AsrObserver) -> None:
        """Add *observer* to the end of the notification list."""
        self._observers.append(observer)

    def detach(self, observer: AsrObserver) -> None:
        """Remove the first occurrence of *observer*.

        Raises:
            ValueError: if *observer* is not currently attached.
        """
        self._observers.remove(observer)