# -*- coding: utf-8 -*-
"""Pygame front end that displays frames produced by the human render pipeline."""

import copy
import logging
import os
import time
from queue import Queue

import cv2
import numpy as np
import pygame
from pygame.locals import *

from human import HumanContext
from ipc import IPCUtil
from utils import config_logging

logger = logging.getLogger(__name__)

current_file_path = os.path.dirname(os.path.abspath(__file__))

ipc = IPCUtil('ipc_sender', 'ipc_sender')


def send_image(image):
    """Serialize ``image`` and push it through the module-level IPC channel.

    The payload is a one-byte identifier (``0x01``) followed by the width,
    the height and the channel count, each as a 4-byte little-endian integer,
    followed by the raw bytes of ``image``.

    Args:
        image: Array whose ``shape`` unpacks to ``(height, width, channels)``
            and which provides ``tobytes()``.
    """
    identifier = b'\x01'
    height, width, channels = image.shape
    header = (
        width.to_bytes(4, byteorder='little'),
        height.to_bytes(4, byteorder='little'),
        channels.to_bytes(4, byteorder='little'),
    )
    data = b''.join((identifier, *header, image.tobytes()))
    ipc.send_binary(data, len(data))


def cal_box(inv_m, p):
    """Apply the 2x3 affine matrix ``inv_m`` to the point ``p``.

    Args:
        inv_m: Matrix indexable as ``inv_m[row][col]`` with 2 rows, 3 columns.
        p: Point indexable as ``(x, y)``.

    Returns:
        The transformed ``(x, y)`` tuple.
    """
    x = inv_m[0][0] * p[0] + inv_m[0][1] * p[1] + inv_m[0][2]
    y = inv_m[1][0] * p[0] + inv_m[1][1] * p[1] + inv_m[1][2]
    return x, y


def img_warp_back_inv_m(img, img_to, inv_m):
    """Warp ``img`` with ``inv_m`` and paste the result onto ``img_to``.

    An all-ones mask is warped with the same matrix; only positions where the
    warped mask is exactly 1 are copied, so interpolated border pixels are
    left untouched. ``img_to`` is modified in place.

    Args:
        img: Source image to warp.
        img_to: Destination image with 3 or 4 channels. With 4 channels only
            the first three channels are overwritten.
        inv_m: Affine matrix passed to ``cv2.warpAffine``.

    Returns:
        ``img_to`` (the same object that was passed in).
    """
    h_up, w_up, c = img_to.shape
    mask = np.ones_like(img).astype(np.float32)
    inv_mask = cv2.warpAffine(mask, inv_m, (w_up, h_up))
    inv_img = cv2.warpAffine(img, inv_m, (w_up, h_up))
    mask_indices = inv_mask == 1
    if 4 == c:
        # NOTE(review): the frame is sent over IPC *before* the warped pixels
        # are pasted in -- confirm this ordering is intended.
        send_image(img_to)
        # Basic slicing yields a view, so this writes through to img_to.
        img_to[:, :, :3][mask_indices] = inv_img[mask_indices]
    else:
        img_to[mask_indices] = inv_img[mask_indices]
    # NOTE(review): looks like a debug dump written on every call; its
    # placement relative to the if/else was reconstructed -- confirm.
    cv2.imwrite('./full.png', img_to)
    return img_to

    # Earlier implementation, kept for reference:
    #
    # h_up, w_up, _ = img.shape
    # _, _, c = img_to.shape
    #
    # cv2.imwrite('./face.png', img)
    # tx = int(inv_m[0][2])
    # ty = int(inv_m[1][2])
    #
    # inv_m[0][2] = 0
    # inv_m[1][2] = 0
    #
    # p0 = cal_box(inv_m, (0, 0))
    # p1 = cal_box(inv_m, (w_up, 0))
    # p2 = cal_box(inv_m, (w_up, h_up))
    # p3 = cal_box(inv_m, (0, h_up))
    # lp = (min(p0[0], p3[0]), min(p0[1], p1[1]))
    # rp = (max(p2[0], p1[0]), min(p2[1], p3[1]))
    #
    # w_up = int(rp[0] - lp[0])
    # h_up = int(rp[1] - lp[1])
    #
    # # print(f'src_x:{w_up}, src_y:{h_up}')
    # inv_m[0][2] = 0
    # inv_m[1][2] = abs(lp[1])
    #
    # mask = np.ones_like(img, dtype=np.float32)
    # inv_mask = cv2.warpAffine(mask, inv_m, (w_up, h_up))
    # inv_img = cv2.warpAffine(img, inv_m, (w_up, h_up))
    #
    # if c == 4:
    #     # img_to[30:h, 30:w][:, :, :3] = img
    #     img_to[ty:(h_up + ty), tx:(w_up + tx)][:, :, :3][inv_mask == 1] = inv_img[inv_mask == 1]
    # else:
    #     img_to[inv_mask == 1] = inv_img[inv_mask == 1]
    #
    # # cv2.imwrite('./full1.png', img_to)
    # return img_to


def render_image(context, frame):
    """Build the full image to display for one queued frame.

    Args:
        context: Object exposing ``frame_list_cycle``, ``coord_list_cycle``,
            ``align_frames`` and ``inv_m_frames``, all indexable by ``idx``.
        frame: Tuple ``(res_frame, idx, type_)``.

    Returns:
        ``context.frame_list_cycle[idx]`` unchanged when ``type_ == 0``.
        Otherwise a deep copy of that frame with ``res_frame`` resized to the
        bounding box, pasted into ``context.align_frames[idx]`` (which is
        modified in place) and warped back. ``None`` if compositing raised.
    """
    res_frame, idx, type_ = frame

    if type_ == 0:
        return context.frame_list_cycle[idx]

    bbox = context.coord_list_cycle[idx]
    combine_frame = copy.deepcopy(context.frame_list_cycle[idx])
    af = context.align_frames[idx]
    inv_m = context.inv_m_frames[idx]
    y1, y2, x1, x2 = bbox
    try:
        res_frame = cv2.resize(res_frame.astype(np.uint8), (x2 - x1, y2 - y1))
        af[y1:y2, x1:x2] = res_frame
        combine_frame = img_warp_back_inv_m(af, combine_frame, inv_m)
    except Exception as e:
        # Best effort: a bad frame is dropped rather than stopping the UI.
        logger.error('resize error:%s', e)
        return None

    return combine_frame


class PyGameUI:
    """Pygame window that draws a background plus the latest rendered frame."""

    # Initial window size, also used for the first background scaling.
    WINDOW_SIZE = (920, 860)
    # Position at which the human image is blitted onto the window.
    HUMAN_OFFSET = (0, -300)

    def __init__(self):
        """Create the window and load the background image from disk."""
        self._human_context = None
        self._queue = None
        self.screen_ = pygame.display.set_mode(
            self.WINDOW_SIZE, HWSURFACE | DOUBLEBUF | RESIZABLE)
        self.clock = pygame.time.Clock()

        background = os.path.join(
            current_file_path, '..', 'data', 'background', 'background.jpg')
        logger.info(f'background: {background}')
        self._background = pygame.image.load(background).convert()
        self.background_display_ = pygame.transform.scale(
            self._background, self.WINDOW_SIZE)
        self._human_image = None
        self.running = True

    def start(self):
        """Create the frame queue and the human context, and register this
        object as the image renderer on the context's render handler."""
        self._queue = Queue()
        self._human_context = HumanContext()
        self._human_context.build()
        render = self._human_context.render_handler
        render.set_image_render(self)

    def run(self):
        """Run the event/draw loop until a QUIT event, then shut down.

        ``stop()`` and ``pygame.quit()`` run even if the loop raises.
        """
        self.start()
        try:
            while self.running:
                self.clock.tick(60)
                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        self.running = False
                    elif event.type == VIDEORESIZE:
                        self.background_display_ = pygame.transform.scale(
                            self._background, event.dict['size'])

                self.screen_.blit(self.background_display_, (0, 0))
                self._update_human()
                if self._human_image is not None:
                    self.screen_.blit(self._human_image, self.HUMAN_OFFSET)

                fps = self.clock.get_fps()
                pygame.display.set_caption('fps:{:.2f}'.format(fps))
                pygame.display.flip()
        finally:
            self.stop()
            pygame.quit()

    def _update_human(self):
        """Consume at most one queued frame and refresh ``_human_image``.

        If the frame cannot be rendered, the previous image is kept.
        """
        if self._queue.empty():
            return

        if self._queue.qsize() > 5:
            logger.warning('render queue is slower')

        frame = self._queue.get()
        image = render_image(self._human_context, frame)
        if image is None:
            # render_image already logged the failure; without this guard the
            # shape access below would raise and end the UI loop.
            return

        # OpenCV channel order is converted to the order pygame is told about.
        if 4 == image.shape[2]:
            color_format = "RGBA"
            image = cv2.cvtColor(image, cv2.COLOR_BGRA2RGBA)
        else:
            color_format = "RGB"
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        self._human_image = pygame.image.frombuffer(
            image.tobytes(), image.shape[1::-1], color_format)

    def stop(self):
        """Stop the human context if one was created."""
        logger.info('stop')
        if self._human_context is not None:
            # self._human_context.pause_talk()
            self._human_context.stop()

    def on_render(self, image):
        """Queue a frame for display; consumed by ``_update_human``.

        NOTE(review): presumably invoked by the render handler registered in
        ``start()`` -- confirm the calling thread.
        """
        self._queue.put(image)


if __name__ == '__main__':
    config_logging('../logs/info.log', logging.INFO, logging.INFO)

    logger.info('------------start------------')
    ui = PyGameUI()
    ui.run()
    logger.info('------------finish------------')