human/ui/ipc_render.py
2024-12-09 01:20:48 +08:00

74 lines
2.3 KiB
Python

#encoding = utf8
import os
import logging
import time
import numpy as np
from human import HumanRender, RenderStatus
from ipc import IPCUtil
from utils import render_image
logger = logging.getLogger(__name__)
current_file_path = os.path.dirname(os.path.abspath(__file__))
class IpcRender(HumanRender):
def __init__(self, context):
super().__init__(context, None)
self._ipc = IPCUtil('human_product', 'human_render')
self._current_text = ''
def _send_image(self, image):
height, width, channels = image.shape
t = time.perf_counter()
width_bytes = width.to_bytes(4, byteorder='little')
height_bytes = height.to_bytes(4, byteorder='little')
bit_depth_bytes = channels.to_bytes(4, byteorder='little')
img_bytes = image.tobytes()
identifier = b'\x01'
data = identifier + width_bytes + height_bytes + bit_depth_bytes + img_bytes
self._ipc.send_binary(data, len(data))
def _send_voice(self, voice):
voice_identifier = b'\x02'
data = voice_identifier
for audio_frame in voice:
frame, type_ = audio_frame
chunk, txt = frame
if txt != self._current_text:
self._current_text = txt
logging.info(f'VoiceRender: {txt}')
chunk = (chunk * 32767).astype(np.int16)
voice_bytes = chunk.tobytes()
data = data + voice_bytes
self._ipc.send_binary(data, len(data))
def _on_reader_callback(self, data_str, size):
data_str = data_str.decode('utf-8')
print(f'on_reader_callback: {data_str}, size:{size}')
if 'quit' == data_str:
self._context.stop()
elif 'heartbeat' == data_str:
pass
elif 'full' == data_str:
self._render_status = RenderStatus.E_Full
elif 'empty' == data_str:
self._render_status = RenderStatus.E_Empty
elif 'normal' == data_str:
self._render_status = RenderStatus.E_Normal
def run(self):
self._ipc.set_reader_callback(self._on_reader_callback)
logger.info(f'ipc listen:{self._ipc.listen()}')
super().run()
def _render(self, video_frame, voice_frame):
image = render_image(self._context, video_frame)
self._send_image(image)
self._send_voice(voice_frame)