# encoding: utf-8
import logging
import time
from threading import Thread, Event

import numpy as np

from eventbus import EventBus
from human_handler import AudioHandler
from utils import melspectrogram, SyncQueue

logger = logging.getLogger(__name__)


class AudioMalHandler(AudioHandler):
    """Pulls PCM audio chunks from a queue, forwards them downstream, and
    periodically converts the accumulated chunks into mel-spectrogram windows
    for the next handler in the pipeline."""

    def __init__(self, context, handler):
        super().__init__(context, handler)

        EventBus().register('stop', self._on_stop)
        EventBus().register('clear_cache', self.on_clear_cache)

        self._is_running = True
        self._queue = SyncQueue(context.batch_size * 2, "AudioMalHandler_queue")

        self.frames = []
        # Samples per video frame, e.g. 16000 // 50 = 320 samples (20 ms).
        self.chunk = context.sample_rate // context.fps

        self._exit_event = Event()
        self._exit_event.set()
        self._thread = Thread(target=self._on_run, name="AudioMalHandlerThread")
        self._thread.start()
        logger.info("AudioMalHandler init")

    def __del__(self):
        EventBus().unregister('stop', self._on_stop)
        EventBus().unregister('clear_cache', self.on_clear_cache)

    def _on_stop(self, *args, **kwargs):
        self.stop()

    def on_clear_cache(self, *args, **kwargs):
        self.frames.clear()
        self._queue.clear()

    def on_message(self, message):
        super().on_message(message)

    def on_handle(self, stream, index):
        # print('AudioMalHandler on_handle', index)
        self._queue.put(stream)

    def _on_run(self):
        logger.info('chunk2mal run')
        while self._exit_event.is_set() and self._is_running:
            self._run_step()
            time.sleep(0.02)
        logger.info('chunk2mal exit')

    def _run_step(self):
        # Collect a batch of audio frames and forward each one downstream.
        count = 0
        for _ in range(self._context.batch_size * 2):
            frame, _type = self.get_audio_frame()
            chunk, txt = frame
            self.frames.append(chunk)
            self.on_next_handle((frame, _type), 0)
            count += 1
            if not self._is_running:
                return

        # Not enough context yet, do not run the network.
        if len(self.frames) <= self._context.stride_left_size + self._context.stride_right_size:
            return
        # print('AudioMalHandler _run_step', count)

        inputs = np.concatenate(self.frames)  # [N * chunk]
        mel = melspectrogram(inputs)
        # print(mel.shape[0], mel.shape, len(mel[0]), len(self.frames))

        # Cut off the stride context (80 mel frames per second, 50 fps assumed
        # here); `right` is computed for symmetry but not used below.
        left = max(0, self._context.stride_left_size * 80 / 50)
        right = min(len(mel[0]), len(mel[0]) - self._context.stride_right_size * 80 / 50)
        mel_idx_multiplier = 80. * 2 / self._context.fps
        mel_step_size = 16

        # Slice the spectrogram into fixed-size windows, one per two audio frames.
        i = 0
        mel_chunks = []
        while (i < (len(self.frames) - self._context.stride_left_size - self._context.stride_right_size) / 2
               and self._is_running):
            start_idx = int(left + i * mel_idx_multiplier)
            # print(start_idx)
            if start_idx + mel_step_size > len(mel[0]):
                mel_chunks.append(mel[:, len(mel[0]) - mel_step_size:])
            else:
                mel_chunks.append(mel[:, start_idx: start_idx + mel_step_size])
            i += 1
        self.on_next_handle(mel_chunks, 1)

        # Discard the old part to save memory, keeping only the stride context.
        self.frames = self.frames[-(self._context.stride_left_size + self._context.stride_right_size):]

    def get_audio_frame(self):
        # type 0: real audio from the queue; type 1: silence filler.
        if not self._queue.is_empty():
            frame = self._queue.get()
            type_ = 0
        else:
            chunk = np.zeros(self.chunk, dtype=np.float32)
            frame = (chunk, '')
            type_ = 1
        # print('AudioMalHandler get_audio_frame type:', type_)
        return frame, type_

    def stop(self):
        logger.info('stop')
        self._is_running = False
        if self._exit_event is None:
            return
        self._exit_event.clear()
        if self._thread.is_alive():
            self._thread.join()
        logger.info('chunk2mal stop')

    def pause_talk(self):
        logger.info('AudioMalHandler pause_talk, queue size: %s', self._queue.size())
        self._queue.clear()
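
# ---------------------------------------------------------------------------
# Minimal usage sketch (an assumption, not part of the original module): the
# real `context` and downstream handler come from the surrounding application.
# Here a SimpleNamespace stands in for the context (only the attributes this
# module reads are set) and no downstream handler is attached, so what
# on_next_handle does with the forwarded frames depends on the AudioHandler
# base class. It only illustrates how 20 ms PCM chunks are fed in and how the
# worker thread is stopped.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)

    # Hypothetical context with the fields AudioMalHandler actually uses.
    ctx = SimpleNamespace(
        batch_size=16,
        sample_rate=16000,
        fps=50,
        stride_left_size=10,
        stride_right_size=10,
    )

    handler = AudioMalHandler(ctx, None)  # no downstream handler in this sketch
    try:
        # Feed one second of silence, one (chunk, text) frame at a time.
        for i in range(ctx.fps):
            frame = (np.zeros(ctx.sample_rate // ctx.fps, dtype=np.float32), '')
            handler.on_handle(frame, i)
        time.sleep(1.0)
    finally:
        handler.stop()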