#encoding = utf8
import logging
import queue
import time
from queue import Queue
from threading import Thread, Event, Condition

import numpy as np

from human.message_type import MessageType
from human_handler import AudioHandler
from utils import melspectrogram

logger = logging.getLogger(__name__)


class AudioMalHandler(AudioHandler):
    """Audio handler that buffers PCM chunks and turns them into mel-spectrogram
    batches for the next handler in the pipeline.

    A background thread pulls audio frames (or synthesizes silence when none are
    queued), forwards each raw frame downstream on channel 0, and — once enough
    context has accumulated — emits mel chunks downstream on channel 1.
    Back-pressure from the video render queue is applied via a Condition:
    the worker pauses on Video_Render_Queue_Full and resumes on
    Video_Render_Queue_Empty.
    """

    def __init__(self, context, handler):
        super().__init__(context, handler)

        self._queue = Queue()
        # _wait is the back-pressure flag; it is always read/written under
        # _condition so the worker's wait_for predicate stays race-free.
        self._wait = False
        self._condition = Condition()

        # _exit_event doubles as the run flag: set == running, cleared == stop.
        self._exit_event = Event()
        self._exit_event.set()

        # Per-frame sample count (samples per video frame).
        self.frames = []
        self.chunk = context.sample_rate // context.fps

        # Start the worker only AFTER all state above is initialized;
        # starting earlier raced the thread against attribute assignment
        # (the worker touches self.frames / self.chunk immediately).
        self._thread = Thread(target=self._on_run)
        self._thread.start()
        logger.info("AudioMalHandler init")

    def on_message(self, message):
        """React to render-queue pressure messages; pass everything else up."""
        if message['msg_id'] == MessageType.Video_Render_Queue_Empty:
            with self._condition:
                if self._wait:
                    self._wait = False
                    self._condition.notify()
                    logger.info('AudioMalHandler notify')
        elif message['msg_id'] == MessageType.Video_Render_Queue_Full:
            # Mutate the flag under the lock — the worker's wait_for predicate
            # reads it; an unlocked write here was a data race.
            with self._condition:
                if not self._wait:
                    self._wait = True
                    logger.info('AudioMalHandler wait')
        else:
            super().on_message(message)

    def on_handle(self, stream, index):
        """Queue an incoming audio stream chunk for the worker thread."""
        self._queue.put(stream)

    def _on_run(self):
        """Worker loop: honor back-pressure, then process one step per tick."""
        logger.info('chunk2mal run')
        while self._exit_event.is_set():
            # Block while the video render queue is full.
            with self._condition:
                self._condition.wait_for(lambda: not self._wait)
            logger.info('AudioMalHandler run')
            self._run_step()
            time.sleep(0.02)
        logger.info('chunk2mal exit')

    def _run_step(self):
        """Consume batch_size*2 audio frames, forward them raw, and emit mel chunks.

        Keeps a rolling window of frames; stride_left/right frames of context are
        required on each side before the mel computation runs.
        """
        for _ in range(self._context.batch_size * 2):
            frame, _type = self.get_audio_frame()
            self.frames.append(frame)
            # Forward the raw frame (and its type flag) downstream on channel 0.
            self.on_next_handle((frame, _type), 0)

        # Not enough context yet — do not run the network.
        if len(self.frames) <= self._context.stride_left_size + self._context.stride_right_size:
            return

        inputs = np.concatenate(self.frames)  # [N * chunk]
        mel = melspectrogram(inputs)

        # Cut off the left stride context (80 mel frames per 50 video frames;
        # presumably tied to the mel hop length — TODO confirm against utils).
        left = max(0, self._context.stride_left_size * 80 / 50)
        mel_idx_multiplier = 80. * 2 / self._context.fps
        mel_step_size = 16
        i = 0
        mel_chunks = []
        while i < (len(self.frames) - self._context.stride_left_size - self._context.stride_right_size) / 2:
            start_idx = int(left + i * mel_idx_multiplier)
            if start_idx + mel_step_size > len(mel[0]):
                # Clamp the final window to the end of the spectrogram.
                mel_chunks.append(mel[:, len(mel[0]) - mel_step_size:])
            else:
                mel_chunks.append(mel[:, start_idx: start_idx + mel_step_size])
            i += 1
        self.on_next_handle(mel_chunks, 1)

        # Discard the old part to save memory, keeping only the stride context.
        self.frames = self.frames[-(self._context.stride_left_size + self._context.stride_right_size):]

    def get_audio_frame(self):
        """Return (frame, type): a queued frame with type 0, or silence with type 1.

        Silence frames keep the pipeline running at a steady rate when no audio
        has been queued within the 10 ms poll window.
        """
        try:
            frame = self._queue.get(block=True, timeout=0.01)
            type_ = 0
        except queue.Empty:
            frame = np.zeros(self.chunk, dtype=np.float32)
            type_ = 1
        return frame, type_

    def stop(self):
        """Signal the worker loop to exit and join the thread."""
        logger.info('stop')
        if self._exit_event is None:
            return
        self._exit_event.clear()
        if self._thread.is_alive():
            self._thread.join()
        logger.info('chunk2mal stop')

    def pause_talk(self):
        """Drop all queued audio so playback stops immediately."""
        self._queue.queue.clear()