From b69dd5a800d96400fe07d382a529dd3661877be7 Mon Sep 17 00:00:00 2001
From: brige
Date: Fri, 1 Nov 2024 20:38:57 +0800
Subject: [PATCH] Modify render sync: named SyncQueues, get() timeout, A/V timing

---
 human/audio_inference_handler.py | 18 ++++++++++--------
 human/audio_mal_handler.py       | 23 ++++++++---------------
 human/human_render.py            |  5 ++++-
 nlp/nlp_doubao.py                |  1 -
 render/base_render.py            |  7 +++----
 render/video_render.py           |  9 +++++----
 render/voice_render.py           |  8 +++++---
 ui.py                            |  2 +-
 utils/sync_queue.py              | 21 +++++++++++----------
 9 files changed, 47 insertions(+), 47 deletions(-)

diff --git a/human/audio_inference_handler.py b/human/audio_inference_handler.py
index dc78aea..f7f2432 100644
--- a/human/audio_inference_handler.py
+++ b/human/audio_inference_handler.py
@@ -20,11 +20,11 @@ class AudioInferenceHandler(AudioHandler):
     def __init__(self, context, handler):
         super().__init__(context, handler)
 
-        self._mal_queue = Queue()
-        self._audio_queue = SyncQueue(context.batch_size * 2)
+        self._mal_queue = SyncQueue(1, "AudioInferenceHandler_Mel")
+        self._audio_queue = SyncQueue(context.batch_size * 2, "AudioInferenceHandler_Audio")
 
         self._exit_event = Event()
-        self._run_thread = Thread(target=self.__on_run)
+        self._run_thread = Thread(target=self.__on_run, name="AudioInferenceHandlerThread")
         self._exit_event.set()
         self._run_thread.start()
         logger.info("AudioInferenceHandler init")
@@ -34,7 +34,7 @@ class AudioInferenceHandler(AudioHandler):
             self._mal_queue.put(stream)
         elif type_ == 0:
             self._audio_queue.put(stream)
-        print('AudioInferenceHandler on_handle', type_, self._audio_queue.size())
+        # print('AudioInferenceHandler on_handle', type_, self._audio_queue.size())
 
     def on_message(self, message):
         super().on_message(message)
@@ -61,16 +61,18 @@ class AudioInferenceHandler(AudioHandler):
                 start_time = time.perf_counter()
                 batch_size = self._context.batch_size
                 try:
-                    mel_batch = self._mal_queue.get(block=True, timeout=1)
+                    mel_batch = self._mal_queue.get()
+                    size = self._audio_queue.size()
+                    # print('AudioInferenceHandler mel_batch:', len(mel_batch), 'size:', size)
                 except queue.Empty:
                     continue
-                print('origin mel_batch:', len(mel_batch))
+                # print('origin mel_batch:', len(mel_batch))
 
                 is_all_silence = True
                 audio_frames = []
                 for _ in range(batch_size * 2):
                     frame, type_ = self._audio_queue.get()
-                    print('AudioInferenceHandler type_', type_)
+                    # print('AudioInferenceHandler type_', type_)
                     audio_frames.append((frame, type_))
                     if type_ == 0:
                         is_all_silence = False
@@ -89,7 +91,7 @@ class AudioInferenceHandler(AudioHandler):
                         face = face_list_cycle[idx]
                         img_batch.append(face)
 
-                    print('orign img_batch:', len(img_batch), 'origin mel_batch:', len(mel_batch))
+                    # print('origin img_batch:', len(img_batch), 'origin mel_batch:', len(mel_batch))
                     img_batch = np.asarray(img_batch)
                     mel_batch = np.asarray(mel_batch)
                     img_masked = img_batch.copy()
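A note on the queue swap above: giving _mal_queue a capacity of 1 adds
backpressure, so AudioMalHandler can run at most one mel batch ahead of
inference instead of piling batches into an unbounded Queue. A minimal sketch
of that behaviour using the SyncQueue from this patch (timings illustrative):

    import threading
    import time

    from utils import SyncQueue  # as modified at the end of this patch

    q = SyncQueue(1, "demo")  # capacity 1, like _mal_queue
    q.put("mel-batch-0")      # returns immediately: one free slot

    def consumer():
        time.sleep(0.1)
        print("consumed:", q.get())  # frees the slot and notifies the producer

    threading.Thread(target=consumer, name="DemoConsumer").start()
    q.put("mel-batch-1")      # blocks ~0.1 s until the consumer drains the queue
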
diff --git a/human/audio_mal_handler.py b/human/audio_mal_handler.py
index 2a08f11..4190692 100644
--- a/human/audio_mal_handler.py
+++ b/human/audio_mal_handler.py
@@ -2,12 +2,11 @@
 import logging
 import queue
 import time
-from queue import Queue
+
 from threading import Thread, Event, Condition
 
 import numpy as np
 
-from human.message_type import MessageType
 from human_handler import AudioHandler
 from utils import melspectrogram, SyncQueue
 
@@ -18,9 +17,9 @@ class AudioMalHandler(AudioHandler):
     def __init__(self, context, handler):
         super().__init__(context, handler)
 
-        self._queue = SyncQueue(context.batch_size)
+        self._queue = SyncQueue(context.batch_size, "AudioMalHandler_queue")
 
         self._exit_event = Event()
-        self._thread = Thread(target=self._on_run)
+        self._thread = Thread(target=self._on_run, name="AudioMalHandlerThread")
         self._exit_event.set()
         self._thread.start()
 
@@ -32,7 +31,7 @@
         super().on_message(message)
 
     def on_handle(self, stream, index):
-        print('AudioMalHandler on_handle', index)
+        # print('AudioMalHandler on_handle', index)
         self._queue.put(stream)
 
     def _on_run(self):
@@ -49,13 +48,12 @@
             frame, _type = self.get_audio_frame()
             self.frames.append(frame)
             self.on_next_handle((frame, _type), 0)
-            print("AudioMalHandler _type", _type)
             count = count + 1
 
         # context not enough, do not run network.
         if len(self.frames) <= self._context.stride_left_size + self._context.stride_right_size:
             return
-        print('AudioMalHandler _run_step', count)
+        # print('AudioMalHandler _run_step', count)
 
         inputs = np.concatenate(self.frames)  # [N * chunk]
         mel = melspectrogram(inputs)
         # print(mel.shape[0],mel.shape,len(mel[0]),len(self.frames))
@@ -70,13 +68,10 @@
             start_idx = int(left + i * mel_idx_multiplier)
             # print(start_idx)
             if start_idx + mel_step_size > len(mel[0]):
-                print("AudioMalHandler start_idx", start_idx)
                 mel_chunks.append(mel[:, len(mel[0]) - mel_step_size:])
             else:
-                print("AudioMalHandler start_idx222", start_idx + mel_step_size - start_idx)
                 mel_chunks.append(mel[:, start_idx: start_idx + mel_step_size])
             i += 1
-        print("AudioMalHandler start_idx333", i)
         self.on_next_handle(mel_chunks, 1)
 
         # discard the old part to save memory
@@ -84,15 +79,13 @@
 
     def get_audio_frame(self):
         try:
-            frame = self._queue.get()
+            # print('AudioMalHandler get_audio_frame')
+            frame = self._queue.get(timeout=0.02)
             type_ = 0
-            if frame is None:
-                frame = np.zeros(self.chunk, dtype=np.float32)
-                type_ = 1
         except queue.Empty:
             frame = np.zeros(self.chunk, dtype=np.float32)
             type_ = 1
-        print('AudioMalHandler get_audio_frame type:', type_)
+        # print('AudioMalHandler get_audio_frame type:', type_)
         return frame, type_
 
     def stop(self):
diff --git a/human/human_render.py b/human/human_render.py
index 6f88673..8bd8537 100644
--- a/human/human_render.py
+++ b/human/human_render.py
@@ -33,7 +33,7 @@ class HumanRender(AudioHandler):
     def on_handle(self, stream, index):
         res_frame, idx, audio_frames = stream
         self._voice_render.put(audio_frames, self._last_audio_ps)
-        self._last_audio_ps = self._last_audio_ps + 0.2
+        self._last_audio_ps = self._last_audio_ps + 0.4
         type_ = 1
         if audio_frames[0][1] != 0 and audio_frames[1][1] != 0:
             type_ = 0
@@ -43,6 +43,9 @@
         if self._voice_render.is_full():
             self._context.notify({'msg_id': MessageType.Video_Render_Queue_Full})
 
+    def get_audio_queue_size(self):
+        return self._voice_render.size()
+
     def pause_talk(self):
         self._voice_render.pause_talk()
         self._video_render.pause_talk()
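The 0.2 -> 0.4 step above keeps _last_audio_ps consistent with what each
on_handle call actually carries: AudioInferenceHandler pairs every mel batch
with batch_size * 2 audio frames. Assuming 20 ms frames and a batch size of 10
(neither value is fixed by this patch), the increment works out as:

    batch_size = 10                      # assumption: context.batch_size
    frame_duration = 0.02                # assumption: 20 ms per audio chunk
    frames_per_handle = batch_size * 2   # matches the loop in AudioInferenceHandler
    step = frames_per_handle * frame_duration
    print(step)                          # 0.4, the new _last_audio_ps increment
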
diff --git a/nlp/nlp_doubao.py b/nlp/nlp_doubao.py
index a8a9a3c..d1b41da 100644
--- a/nlp/nlp_doubao.py
+++ b/nlp/nlp_doubao.py
@@ -44,7 +44,6 @@ class DouBao(NLPBase):
             sec = ''
             async for completion in stream:
                 sec = sec + completion.choices[0].delta.content
-                print('DouBao content:', sec)
                 sec, message = self._split_handle.handle(sec)
                 if len(message) > 0:
                     self._on_callback(message)
diff --git a/render/base_render.py b/render/base_render.py
index c31fedc..e9c80ff 100644
--- a/render/base_render.py
+++ b/render/base_render.py
@@ -11,14 +11,14 @@
 logger = logging.getLogger(__name__)
 
 
 class BaseRender(ABC):
-    def __init__(self, play_clock, context, type_, delay=0.02):
+    def __init__(self, play_clock, context, type_, delay=0.02, thread_name="BaseRenderThread"):
         self._play_clock = play_clock
         self._context = context
         self._type = type_
         self._delay = delay
-        self._queue = SyncQueue(context.batch_size)
+        self._queue = SyncQueue(context.batch_size, f'{type_}RenderQueue')
         self._exit_event = Event()
-        self._thread = Thread(target=self._on_run)
+        self._thread = Thread(target=self._on_run, name=thread_name)
         self._exit_event.set()
         self._thread.start()
 
@@ -31,7 +31,6 @@ class BaseRender(ABC):
         logging.info(f'{self._type} render exit')
 
     def put(self, frame, ps):
-        print('put:', ps)
         self._queue.put((frame, ps))
 
     def size(self):
diff --git a/render/video_render.py b/render/video_render.py
index 9fca442..045ef42 100644
--- a/render/video_render.py
+++ b/render/video_render.py
@@ -13,7 +13,7 @@
 from human.message_type import MessageType
 
 
 class VideoRender(BaseRender):
     def __init__(self, play_clock, context, human_render):
-        super().__init__(play_clock, context, 'Video', 0.038)
+        super().__init__(play_clock, context, 'Video', 0.03, "VideoRenderThread")
         self._human_render = human_render
         self._diff_avg_count = 0
 
@@ -31,7 +31,7 @@ class VideoRender(BaseRender):
             clock_time = self._play_clock.clock_time()
             time_difference = clock_time - ps
             if abs(time_difference) > self._play_clock.audio_diff_threshold:
-                if self._diff_avg_count < 10:
+                if self._diff_avg_count < 5:
                     self._diff_avg_count += 1
                 else:
                     if time_difference < -self._play_clock.audio_diff_threshold:
@@ -47,8 +47,9 @@ class VideoRender(BaseRender):
             else:
                 self._diff_avg_count = 0
 
-            print('video render:', ps, clock_time, time_difference,
-                  'get face', self._queue.size(), self._diff_avg_count)
+            print('video render:',
+                  'get face', self._queue.size(),
+                  'audio queue', self._human_render.get_audio_queue_size())
 
             if type_ == 0:
                 combine_frame = self._context.frame_list_cycle[idx]
diff --git a/render/voice_render.py b/render/voice_render.py
index a62d07f..6ea22f5 100644
--- a/render/voice_render.py
+++ b/render/voice_render.py
@@ -16,7 +16,7 @@ class VoiceRender(BaseRender):
     def __init__(self, play_clock, context):
         self._audio_render = AudioRender()
         self._is_empty = True
-        super().__init__(play_clock, context, 'Voice')
+        super().__init__(play_clock, context, 'Voice', 0.03, "VoiceRenderThread")
 
     def is_full(self):
         return self._queue.size() >= self._context.render_batch * 2
@@ -27,7 +27,7 @@ class VoiceRender(BaseRender):
             if value is None:
                 return
             audio_frames, ps = value
-            # print('voice render queue size', self._queue.qsize())
+            # print('voice render queue size', self._queue.size())
         except Empty:
             self._context.notify({'msg_id': MessageType.Video_Render_Queue_Empty})
             if not self._is_empty:
@@ -55,6 +55,8 @@ class VoiceRender(BaseRender):
 
             if self._audio_render is not None:
                 try:
-                    self._audio_render.write(frame.tobytes(), int(frame.shape[0] * 2))
+                    chunk_len = int(frame.shape[0] * 2)
+                    # print('audio frame:', frame.shape, chunk_len)
+                    self._audio_render.write(frame.tobytes(), chunk_len)
                 except Exception as e:
                     logging.error(f'Error writing audio frame: {e}')
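The chunk_len introduced above is simply the frame's byte length: assuming the
frames reaching AudioRender are 16-bit PCM (two bytes per sample, an assumption
about AudioRender's input format), frame.shape[0] * 2 equals
len(frame.tobytes()):

    import numpy as np

    frame = np.zeros(320, dtype=np.int16)      # assumption: one 20 ms frame at 16 kHz
    chunk_len = int(frame.shape[0] * 2)        # 2 bytes per int16 sample
    assert chunk_len == len(frame.tobytes())   # 640 bytes
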
diff --git a/ui.py b/ui.py
index d3b4dff..6d46fa2 100644
--- a/ui.py
+++ b/ui.py
@@ -57,7 +57,7 @@ class App(customtkinter.CTk):
         # self.main_button_1.grid(row=2, column=2, padx=(20, 20), pady=(20, 20), sticky="nsew")
         background = os.path.join(current_file_path, 'data', 'background', 'background.webp')
         logger.info(f'background: {background}')
-        self._background = ImageTk.PhotoImage(read_image(background))
+        # self._background = ImageTk.PhotoImage(read_image(background))
 
         self._init_image_canvas()
diff --git a/utils/sync_queue.py b/utils/sync_queue.py
index 17f8dc3..8809f24 100644
--- a/utils/sync_queue.py
+++ b/utils/sync_queue.py
@@ -5,35 +5,36 @@
 from queue import Queue
 
 
 class SyncQueue:
-    def __init__(self, maxsize):
+    def __init__(self, maxsize, name):
+        self._name = name
         self._queue = Queue(maxsize)
-        # self._queue = Queue()
         self._condition = threading.Condition()
 
     def put(self, item):
-        # self._queue.put(item)
         with self._condition:
             while self._queue.full():
-                print('put wait')
+                # print(self._name, 'put wait')
                 self._condition.wait()
             self._queue.put(item)
             self._condition.notify()
 
-    def get(self):
-        # return self._queue.get(block=True, timeout=0.01)
+    def get(self, timeout=None):
+        # Allow a timeout so an empty queue cannot deadlock the caller
         with self._condition:
             while self._queue.empty():
-                self._condition.wait()
-            item = self._queue.get()
+                self._condition.wait(timeout=timeout)
+                # print(self._name, 'get wait')
+                if timeout is not None:
+                    break
+            item = self._queue.get(block=False)
             self._condition.notify()
             return item
 
     def clear(self):
-        # self._queue.queue.clear()
        with self._condition:
             while not self._queue.empty():
                 self._queue.queue.clear()
-            self._condition.notify_all()
+            self._condition.notify()
 
     def size(self):
         return self._queue.qsize()
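With the new timeout parameter, SyncQueue.get raises queue.Empty once the wait
expires (the final get(block=False) on a still-empty queue) instead of blocking
forever. That is the contract AudioMalHandler.get_audio_frame relies on to
substitute a silence frame. A minimal sketch (the chunk size is an assumption):

    import queue

    import numpy as np

    from utils import SyncQueue

    q = SyncQueue(4, "demo")

    def next_frame(chunk=320):  # assumption: 320 samples per frame
        try:
            return q.get(timeout=0.02), 0                 # real audio frame, type 0
        except queue.Empty:
            return np.zeros(chunk, dtype=np.float32), 1   # silence frame, type 1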