diff --git a/face/img00016.jpg.bak b/face/img00016.jpg
similarity index 100%
rename from face/img00016.jpg.bak
rename to face/img00016.jpg
diff --git a/face/img00018.jpg b/face/img00018.jpg.bak
similarity index 100%
rename from face/img00018.jpg
rename to face/img00018.jpg.bak
diff --git a/human/audio_inference_handler.py b/human/audio_inference_handler.py
index dd37dc8..befe88b 100644
--- a/human/audio_inference_handler.py
+++ b/human/audio_inference_handler.py
@@ -35,6 +35,10 @@ class AudioInferenceHandler(AudioHandler):
         elif type_ == 0:
             self._audio_queue.put(stream)
 
+    def on_message(self, message):
+        print('audio inference notify:', message)
+        super().on_message(message)
+
     def __on_run(self):
         wav2lip_path = os.path.join(current_file_path, '..', 'checkpoints', 'wav2lip.pth')
        logger.info(f'AudioInferenceHandler init, path:{wav2lip_path}')
diff --git a/human/audio_mal_handler.py b/human/audio_mal_handler.py
index e0b2912..34cc377 100644
--- a/human/audio_mal_handler.py
+++ b/human/audio_mal_handler.py
@@ -3,10 +3,11 @@
 import logging
 import queue
 import time
 from queue import Queue
-from threading import Thread, Event
+from threading import Thread, Event, Condition
 
 import numpy as np
 
+from human.message_type import MessageType
 from human_handler import AudioHandler
 from utils import melspectrogram
@@ -18,6 +19,8 @@ class AudioMalHandler(AudioHandler):
         super().__init__(context, handler)
 
         self._queue = Queue()
+        self._wait = False
+        self._condition = Condition()
         self._exit_event = Event()
         self._thread = Thread(target=self._on_run)
         self._exit_event.set()
@@ -27,14 +30,32 @@
         self.chunk = context.sample_rate // context.fps
         logger.info("AudioMalHandler init")
 
+    def on_message(self, message):
+        if message['msg_id'] == MessageType.Video_Render_Queue_Empty:
+            with self._condition:
+                if self._wait:
+                    self._wait = False
+                    self._condition.notify()
+                    print('AudioMalHandler notify')
+        elif message['msg_id'] == MessageType.Video_Render_Queue_Full:
+            with self._condition:
+                if not self._wait:
+                    self._wait = True
+                    print('AudioMalHandler wait')
+        else:
+            super().on_message(message)
+
     def on_handle(self, stream, index):
         self._queue.put(stream)
 
     def _on_run(self):
         logging.info('chunk2mal run')
         while self._exit_event.is_set():
+            with self._condition:
+                self._condition.wait_for(lambda: not self._wait)
+                print('AudioMalHandler run')
             self._run_step()
-            time.sleep(0.3)
+            time.sleep(0.02)
 
         logging.info('chunk2mal exit')
diff --git a/human/human_context.py b/human/human_context.py
index b6991a1..265e97f 100644
--- a/human/human_context.py
+++ b/human/human_context.py
@@ -22,6 +22,7 @@ class HumanContext:
         self._sample_rate = 16000
         self._stride_left_size = 10
         self._stride_right_size = 10
+        self._render_batch = 5
 
         self._device = get_device()
         print(f'device:{self._device}')
@@ -61,6 +62,10 @@ class HumanContext:
     def image_size(self):
         return self._image_size
 
+    @property
+    def render_batch(self):
+        return self._render_batch
+
     @property
     def device(self):
         return self._device
@@ -97,6 +102,12 @@ class HumanContext:
     def render_handler(self):
         return self._render_handler
 
+    def notify(self, message):
+        if self._tts_handle is not None:
+            self._tts_handle.on_message(message)
+        else:
+            logger.info(f'notify message:{message}')
+
     def build(self):
         self._render_handler = HumanRender(self, None)
         self._infer_handler = AudioInferenceHandler(self, self._render_handler)
diff --git a/human/human_render.py b/human/human_render.py
index 8837012..a8bc60b 100644
--- a/human/human_render.py
+++ b/human/human_render.py
@@ -9,6 +9,7 @@
 from threading import Thread, Event
 
 import cv2
 import numpy as np
+from human.message_type import MessageType
 from human_handler import AudioHandler
 from render import VoiceRender, VideoRender, PlayClock
@@ -20,10 +21,10 @@ class HumanRender(AudioHandler):
         super().__init__(context, handler)
 
         play_clock = PlayClock()
-        self._voice_render = VoiceRender(play_clock)
+        self._voice_render = VoiceRender(play_clock, context)
         self._video_render = VideoRender(play_clock, context, self)
-        self._queue = Queue(context.batch_size * 2)
         self._image_render = None
+        self._last_ps = 0
 
     def set_image_render(self, render):
         self._image_render = render
@@ -32,17 +33,21 @@
         if self._image_render is not None:
             self._image_render.on_render(image)
 
+    def on_message(self, message):
+        print('human render notify:', message)
+        super().on_message(message)
+
     def on_handle(self, stream, index):
         res_frame, idx, audio_frames = stream
-        self._voice_render.put(audio_frames)
+        self._voice_render.put(audio_frames, self._last_ps)
         type_ = 1
         if audio_frames[0][1] != 0 and audio_frames[1][1] != 0:
             type_ = 0
-        self._video_render.put((res_frame, idx, type_))
+        self._video_render.put((res_frame, idx, type_), self._last_ps)
+        self._last_ps = self._last_ps + 0.2
 
-    def pause_handle(self):
-        if self._video_render.size() > self._context.batch_size * 2:
-            super().pause_handle()
+        if self._voice_render.is_full():
+            self._context.notify({'msg_id': MessageType.Video_Render_Queue_Full})
 
     def pause_talk(self):
         self._voice_render.pause_talk()
diff --git a/human/message_type.py b/human/message_type.py
new file mode 100644
index 0000000..9b845b5
--- /dev/null
+++ b/human/message_type.py
@@ -0,0 +1,9 @@
+#encoding = utf8
+from enum import Enum
+
+
+class MessageType(Enum):
+    Unknown = 0
+    Video_Render_Queue_Empty = 1
+    Video_Render_Queue_Not_Empty = 2
+    Video_Render_Queue_Full = 3
diff --git a/human_handler/audio_handler.py b/human_handler/audio_handler.py
index 2b8df73..2accd5c 100644
--- a/human_handler/audio_handler.py
+++ b/human_handler/audio_handler.py
@@ -14,12 +14,9 @@ class AudioHandler(ABC):
     def on_handle(self, stream, index):
         pass
 
-    @abstractmethod
-    def pause_handle(self):
+    def on_message(self, message):
         if self._handler is not None:
-            self._handler.pause_handle()
-        else:
-            logging.info(f'_handler is None')
+            self._handler.on_message(message)
 
     @abstractmethod
     def stop(self):
diff --git a/render/base_render.py b/render/base_render.py
index 2f71140..cdb4c33 100644
--- a/render/base_render.py
+++ b/render/base_render.py
@@ -9,8 +9,10 @@ logger = logging.getLogger(__name__)
 
 
 class BaseRender(ABC):
-    def __init__(self, play_clock, delay=0.02):
+    def __init__(self, play_clock, context, type_, delay=0.02):
         self._play_clock = play_clock
+        self._context = context
+        self._type = type_
         self._delay = delay
         self._queue = Queue()
         self._exit_event = Event()
@@ -19,15 +21,14 @@
         self._thread.start()
 
     def _on_run(self):
-        logging.info('Audio render run')
+        logging.info(f'{self._type} render run')
         while self._exit_event.is_set():
             self._run_step()
             time.sleep(self._delay)
 
-        logging.info('Audio render exit')
+        logging.info(f'{self._type} render exit')
 
-    def put(self, frame):
-        ps = time.time() - self._play_clock.start_time
+    def put(self, frame, ps):
         self._queue.put_nowait((frame, ps))
 
     def size(self):
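
Note on the AudioMalHandler change above, which is the heart of this patch: the worker thread now parks on a threading.Condition while the render side is full, instead of being stepped through the old abstract pause_handle(). Here is that idiom reduced to a minimal, runnable sketch; Producer, step, on_full, and on_empty are illustrative stand-ins, not identifiers from this codebase:

```python
import threading
import time


class Producer:
    """Minimal stand-in for AudioMalHandler's gated worker loop."""

    def __init__(self):
        self._wait = False                       # True while the consumer reports "full"
        self._condition = threading.Condition()  # guards _wait
        self._running = True
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        while self._running:
            with self._condition:
                # Park here while the consumer is full; wait_for releases
                # the lock while blocked, so on_empty() can still acquire it.
                self._condition.wait_for(lambda: not self._wait)
            self.step()
            time.sleep(0.02)

    def step(self):
        print('produce one chunk')

    def on_full(self):
        # Called from the consumer thread: pause production.
        with self._condition:
            self._wait = True

    def on_empty(self):
        # Called from the consumer thread: resume production.
        with self._condition:
            self._wait = False
            self._condition.notify()


if __name__ == '__main__':
    p = Producer()
    time.sleep(0.1)   # producer runs freely
    p.on_full()       # producer parks in wait_for()
    time.sleep(0.1)
    p.on_empty()      # producer wakes and continues
    time.sleep(0.1)
    p._running = False
```

Both branches that touch the flag take the lock first; mutating _wait outside the lock would race with wait_for's predicate check, which is why the Full branch in on_message also acquires the condition.
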
diff --git a/render/play_clock.py b/render/play_clock.py
index 8bcb767..870aee4 100644
--- a/render/play_clock.py
+++ b/render/play_clock.py
@@ -17,14 +17,14 @@ class PlayClock:
     def current_time(self):
         return self._current_time
 
-    @property
-    def audio_diff_threshold(self):
-        return self._audio_diff_threshold
-
     @current_time.setter
     def current_time(self, v):
         self._current_time = v
 
+    @property
+    def audio_diff_threshold(self):
+        return self._audio_diff_threshold
+
     @property
     def display_time(self):
         return self._display_time
diff --git a/render/video_render.py b/render/video_render.py
index 4df39c9..e6ccb7b 100644
--- a/render/video_render.py
+++ b/render/video_render.py
@@ -2,64 +2,59 @@
 import copy
 import time
 from queue import Empty
+from enum import Enum
 
 import cv2
 import numpy as np
 
 from .base_render import BaseRender
+from human.message_type import MessageType
 
 
 class VideoRender(BaseRender):
     def __init__(self, play_clock, context, human_render):
-        super().__init__(play_clock, 0.02)
-        self._context = context
+        super().__init__(play_clock, context, 'Video')
         self._human_render = human_render
 
     def _run_step(self):
-        try:
-            frame, ps = self._queue.get(block=True, timeout=0.01)
-            res_frame, idx, type_ = frame
-            print('video render queue size', self._queue.qsize())
-        except Empty:
-            return
-
-        if type_ == 0:
-            combine_frame = self._context.frame_list_cycle[idx]
-        else:
-            print('get face', self._queue.qsize())
-            bbox = self._context.coord_list_cycle[idx]
-            combine_frame = copy.deepcopy(self._context.frame_list_cycle[idx])
-            y1, y2, x1, x2 = bbox
+        while self._exit_event.is_set():
             try:
-                res_frame = cv2.resize(res_frame.astype(np.uint8), (x2 - x1, y2 - y1))
-            except:
-                print('resize error')
+                frame, ps = self._queue.get(block=True, timeout=0.01)
+                res_frame, idx, type_ = frame
+            except Empty:
                 return
-            # combine_frame = get_image(ori_frame,res_frame,bbox)
-            # t=time.perf_counter()
-            combine_frame[y1:y2, x1:x2] = res_frame
 
-        clock_time = self._play_clock.clock_time()
-        time_difference = clock_time - ps
+            clock_time = self._play_clock.clock_time()
+            time_difference = clock_time - ps
 
-        print('video render:', ps, ' ', clock_time, ' ', time_difference)
-        if time_difference < -0.01:  # audio is ahead of video by more than 10 ms
-            sleep_time = abs(time_difference + 0.01)
-            print("Video frame waiting to catch up with audio", sleep_time)
-            if sleep_time > 0:
-                time.sleep(sleep_time)  # only sleep for positive values
-            return  # keep waiting
+            print('video render:', ps, ' ', clock_time, ' ', time_difference)
+            if time_difference < -self._play_clock.audio_diff_threshold:
+                sleep_time = abs(time_difference + self._play_clock.audio_diff_threshold)
+                print("Video frame waiting to catch up with audio", sleep_time)
+                if sleep_time > 0:
+                    time.sleep(sleep_time)
 
-        elif time_difference < -0.01:  # video is ahead of audio by more than 10 ms
-            print("Video frame dropped to catch up with audio")
-            return  # drop the frame
-        # if time_difference > self._play_clock.audio_diff_threshold:
-        #     # print('video is slow')
-        #     return
-        # elif time_difference < self._play_clock.audio_diff_threshold:
+            elif time_difference > self._play_clock.audio_diff_threshold:  # video is ahead of audio beyond the threshold
+                print("Video frame dropped to catch up with audio")
+                continue
 
-        image = combine_frame
-        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
-        if self._human_render is not None:
-            self._human_render.put_image(image)
+            print('get face', self._queue.qsize())
+            if type_ == 0:
+                combine_frame = self._context.frame_list_cycle[idx]
+            else:
+                bbox = self._context.coord_list_cycle[idx]
+                combine_frame = copy.deepcopy(self._context.frame_list_cycle[idx])
+                y1, y2, x1, x2 = bbox
+                try:
+                    res_frame = cv2.resize(res_frame.astype(np.uint8), (x2 - x1, y2 - y1))
+                except Exception:
+                    print('resize error')
+                    return
+                combine_frame[y1:y2, x1:x2] = res_frame
+
+            image = combine_frame
+            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
+            if self._human_render is not None:
+                self._human_render.put_image(image)
+            return
diff --git a/render/voice_render.py b/render/voice_render.py
index aa0f283..5e0dde5 100644
--- a/render/voice_render.py
+++ b/render/voice_render.py
@@ -6,23 +6,35 @@
 from queue import Empty
 
 import numpy as np
 
 from audio_render import AudioRender
+from human.message_type import MessageType
 from .base_render import BaseRender
 
 logger = logging.getLogger(__name__)
 
 
 class VoiceRender(BaseRender):
-    def __init__(self, play_clock):
-        super().__init__(play_clock)
+    def __init__(self, play_clock, context):
+        super().__init__(play_clock, context, 'Voice')
         self._audio_render = AudioRender()
 
+    def is_full(self):
+        return self._queue.qsize() >= self._context.render_batch * 2
+
     def _run_step(self):
         try:
             audio_frames, ps = self._queue.get(block=True, timeout=0.01)
             print('voice render queue size', self._queue.qsize())
         except Empty:
+            self._context.notify({'msg_id': MessageType.Video_Render_Queue_Empty})
             return
 
+        status = MessageType.Video_Render_Queue_Not_Empty
+        if self._queue.qsize() < self._context.render_batch:
+            status = MessageType.Video_Render_Queue_Empty
+        elif self._queue.qsize() >= self._context.render_batch * 2:
+            status = MessageType.Video_Render_Queue_Full
+        self._context.notify({'msg_id': status})
+
         self._play_clock.update_display_time()
         self._play_clock.current_time = ps
diff --git a/tts/tts_edge_http.py b/tts/tts_edge_http.py
index 46ba42c..1114269 100644
--- a/tts/tts_edge_http.py
+++ b/tts/tts_edge_http.py
@@ -33,8 +33,6 @@ class TTSEdgeHttp(TTSBase):
             async with session.post(self._url, json=data) as response:
                 if response.status == 200:
                     stream = BytesIO(await response.read())
-
-                    print("Audio data received and saved to output_audio.wav")
                     return stream
                 else:
                     byte_stream = None
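
Taken as a whole, the patch replaces the abstract pause_handle() back-pressure with a message channel: VoiceRender compares its queue depth against the render_batch and render_batch * 2 watermarks, HumanContext.notify() hands the resulting MessageType to the TTS handle, and AudioHandler.on_message() forwards it link by link until AudioMalHandler consumes it. Below is a rough end-to-end sketch of that flow, with the chain reduced to plain objects; Handler, Gate, status_for, and head are illustrative names, not part of the patch:

```python
from enum import Enum


class MessageType(Enum):
    Unknown = 0
    Video_Render_Queue_Empty = 1
    Video_Render_Queue_Not_Empty = 2
    Video_Render_Queue_Full = 3


class Handler:
    """Default AudioHandler behaviour: pass messages down the chain."""

    def __init__(self, handler=None):
        self._handler = handler

    def on_message(self, message):
        if self._handler is not None:
            self._handler.on_message(message)


class Gate(Handler):
    """Stand-in for AudioMalHandler: consumes the queue-state messages."""

    def on_message(self, message):
        if message['msg_id'] == MessageType.Video_Render_Queue_Full:
            print('gate: pause feeding')
        elif message['msg_id'] == MessageType.Video_Render_Queue_Empty:
            print('gate: resume feeding')
        else:
            super().on_message(message)  # anything else keeps flowing down


def status_for(qsize, render_batch=5):
    # Watermark logic mirrored from VoiceRender._run_step.
    if qsize < render_batch:
        return MessageType.Video_Render_Queue_Empty
    if qsize >= render_batch * 2:
        return MessageType.Video_Render_Queue_Full
    return MessageType.Video_Render_Queue_Not_Empty


head = Handler(Gate())  # e.g. TTS handle -> ... -> AudioMalHandler
for depth in (2, 7, 12):
    head.on_message({'msg_id': status_for(depth)})
```

The two watermarks give the pipeline hysteresis: production pauses only once the voice queue holds two render batches and resumes once it drains below one, so the chain is not toggled on every frame.
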