"""Frame-index scheduling for ping-pong playback over silent/action segments.

A video is described by ``segments``: a list of ``[start, end, loop_flag]``
entries with inclusive frame bounds. Segments with ``loop_flag`` true are
"silent" (idle) segments that may be looped back and forth; the others are
"action" segments played while speaking. Directions are encoded as
``1`` (forward) and ``0`` (reverse).
"""
import logging

try:
    from utils.log import logger
except ImportError:  # project logger unavailable (module used standalone)
    logger = logging.getLogger(__name__)


def play_in_loop_v2(
    segments,
    startfrom,
    batch_num,
    last_direction,
    is_silent,
    first_speak,
    last_speak,
    batch_size=None,
):
    """Produce the next ``batch_num`` frame indexes and the final direction.

    The decision is re-evaluated for every frame of the batch:

    1. While silent, loop inside the silent segment: forward at its left
       edge, reverse at its right edge, otherwise keep the last direction.
    2. Silent -> speaking: head for the nearest action segment.
    3. Speaking -> silent: finish the action segment, then enter silence.
    4. At the left end of the whole video only forward is possible.
    5. At the right end of the whole video only reverse is possible.

    Args:
        segments: Loop configuration ``[[start, end, loop_flag], ...]``.
        startfrom: Current position (last frame of the previous batch).
        batch_num: Number of frames to produce; values <= 0 produce none.
        last_direction: Direction of the previous step, 0 reverse / 1 forward.
        is_silent: 1 when the batch is silent, 0 when it is speaking.
        first_speak: Truthy when this batch is where speaking starts.
        last_speak: Truthy when this batch is where speaking ends.
        batch_size: Remaining-frame count at which the first-speak flag is
            re-armed after the first frame. Defaults to the initial
            ``batch_num``, in which case only the first frame of the batch
            is treated as the first-speak frame. (The original code read an
            undefined ``config.batch_size`` here.)

    Returns:
        ``(frames, direction)``: the list of frame indexes and the direction
        of the last step (``last_direction`` unchanged if no frame was
        produced).

    Raises:
        ValueError: If the current position lies outside every segment.
    """
    remaining = batch_num
    if batch_size is None:
        batch_size = batch_num

    frames = []
    cur_pos = startfrom
    cur_direction = last_direction
    is_first_speak_frame = first_speak
    is_last_speak_frame = bool(last_speak and remaining == 1)

    # `> 0` rather than `!= 0` so a negative count cannot loop forever.
    while remaining > 0:
        # Which segment does the current frame belong to?
        sub_seg_idx = subseg_judge(cur_pos, segments)
        if sub_seg_idx is None:
            raise ValueError(
                f"position {cur_pos} is not covered by any segment"
            )

        # Decide the direction, then step one frame that way.
        next_direction, next_pos = get_next_direction(
            segments,
            cur_pos,
            cur_direction,
            is_silent,
            sub_seg_idx,
            is_first_speak_frame,
            is_last_speak_frame,
        )
        next_pos = get_next_frame(next_pos, next_direction)
        frames.append(next_pos)

        remaining -= 1
        is_first_speak_frame = bool(first_speak and remaining == batch_size)
        # The last frame of the batch is the moment speaking ends.
        is_last_speak_frame = bool(last_speak and remaining == 1)
        cur_direction = next_direction
        cur_pos = next_pos

    # Return cur_direction so an empty batch does not hit an unbound name.
    return frames, cur_direction


def subseg_judge(cur_pos, segments):
    """Return the index of the segment containing ``cur_pos``.

    Position 0 is mapped to the first segment even when that segment starts
    later. Returns ``None`` when no segment matches.
    """
    for idx, (seg_start, seg_end, *_rest) in enumerate(segments):
        if seg_start <= cur_pos <= seg_end:
            return idx
    if cur_pos == 0:
        return 0
    return None


def get_next_direction(
    segments,
    cur_pos,
    cur_direction,
    is_silent,
    sub_seg_idx,
    is_first_speak_frame: bool = False,
    is_last_speak_frame: bool = False,
):
    """Pick the playback direction for the frame following ``cur_pos``.

    Dispatches on the kind of the current segment (loop/silent vs action)
    and on whether the batch is silent, so that playback reaches the
    expected state as quickly as possible.

    Args:
        segments: Loop configuration ``[[start, end, loop_flag], ...]``.
        cur_pos: Current frame index.
        cur_direction: Direction of the previous step, 0 reverse / 1 forward.
        is_silent: 1 when silent, 0 when speaking.
        sub_seg_idx: Index of the segment containing ``cur_pos``.
        is_first_speak_frame: Flag marking the frame where speaking starts.
        is_last_speak_frame: Flag marking the frame where speaking ends.

    Returns:
        ``(direction, position)``; the position is returned unchanged.

    Raises:
        ValueError: If ``is_silent`` is neither 0 nor 1.
    """
    left, right, loop_flag = segments[sub_seg_idx]
    if is_silent not in (0, 1):
        raise ValueError(f"is_silent must be 0 or 1, got {is_silent!r}")

    if loop_flag:
        if is_silent == 1:
            next_direct, next_pos = pure_silent(
                segments, left, right, cur_pos, cur_direction, sub_seg_idx
            )
            logger.debug(
                f"cur_pos:{cur_pos}, next_direct:{next_direct}, is_first_speak_frame:{is_first_speak_frame}"
            )
        else:
            next_direct, next_pos = silent2action(
                segments,
                left,
                right,
                cur_pos,
                cur_direction,
                sub_seg_idx,
                is_first_speak_frame,
            )
            logger.debug(
                f"cur_pos:{cur_pos}, next_direct:{next_direct}, is_first_speak_frame{is_first_speak_frame}"
            )
    else:
        if is_silent == 1:
            next_direct, next_pos = action2silent(
                segments,
                left,
                right,
                cur_pos,
                cur_direction,
                sub_seg_idx,
                is_last_speak_frame,
            )
            logger.debug(
                f"cur_pos{cur_pos}, next_direct:{next_direct},is_first_speak_frame{is_first_speak_frame},is_last_speak_frame:{is_last_speak_frame}"
            )
        else:
            next_direct, next_pos = pure_action(
                segments,
                left,
                right,
                cur_pos,
                cur_direction,
                sub_seg_idx,
                is_last_speak_frame,
            )
            logger.debug(
                f"cur_pos:{cur_pos}, next_direct:{next_direct},is_first_speak_frame{is_first_speak_frame}, is_last_speak_frame:{is_last_speak_frame}"
            )
    return next_direct, next_pos


def get_next_frame(cur_pos, cur_direction):
    """Return the frame adjacent to ``cur_pos`` in the given direction.

    The direction logic is expected to guarantee that the neighbouring frame
    exists, so no boundary check is done here.

    Raises:
        ValueError: If ``cur_direction`` is neither 1 (forward) nor 0
            (reverse); the original silently returned ``None`` in that case.
    """
    if cur_direction == 1:
        return cur_pos + 1
    if cur_direction == 0:
        return cur_pos - 1
    raise ValueError(f"direction must be 0 or 1, got {cur_direction!r}")


def _global_border_direction(segments, cur_pos):
    """Return the forced direction at the ends of the whole video, else None.

    At (or before) the very first frame only forward is possible; at (or
    past) the very last frame only reverse is possible. `<=` / `>=` are used
    because subseg_judge maps position 0 to the first segment even when that
    segment starts later.
    """
    if cur_pos <= segments[0][0]:
        return 1
    if cur_pos >= segments[-1][1]:
        return 0
    return None


def _keep_direction(cur_direction):
    """Normalise the previous direction: 1 stays forward, anything else is reverse."""
    return 1 if cur_direction == 1 else 0


def _nearest_exit_direction(left, right, cur_pos):
    """Return the direction toward the closer edge of ``[left, right]``."""
    mid = left + (right - left + 1) // 2
    return 0 if cur_pos < mid else 1


def pure_silent(segments, left, right, cur_pos, cur_direction, sub_seg_idx):
    """Direction while silent inside a loop segment.

    Bounce at the segment edges (forward at the left edge, reverse at the
    right edge); otherwise keep going the way playback was already going.

    Returns:
        ``(direction, cur_pos)``.
    """
    forced = _global_border_direction(segments, cur_pos)
    if forced is not None:
        return forced, cur_pos
    # Right edge: turn back.
    if cur_pos == right:
        return 0, cur_pos
    # Left edge: go forward.
    if cur_pos == left:
        return 1, cur_pos
    # Interior: forward only if already going forward, otherwise reverse.
    if cur_pos > left and cur_direction == 1:
        return 1, cur_pos
    return 0, cur_pos


def pure_action(
    segments, left, right, cur_pos, cur_direction, sub_seg_idx, is_last_speak_frame
):
    """Direction while speaking inside an action (non-loop) segment.

    When speaking ends, head for a silent segment; this may leave the
    current segment. Otherwise keep the previous direction.

    Args:
        is_last_speak_frame: True at the moment speaking ends.

    Returns:
        ``(direction, cur_pos)``.
    """
    forced = _global_border_direction(segments, cur_pos)
    if forced is not None:
        return forced, cur_pos
    if not is_last_speak_frame:
        return _keep_direction(cur_direction), cur_pos
    # Last segment: silence can only be found by going back.
    if sub_seg_idx == len(segments) - 1:
        return 0, cur_pos
    # First segment: silence can only be found by going forward.
    if sub_seg_idx == 0:
        return 1, cur_pos
    # Middle segment: leave through the nearer edge.
    return _nearest_exit_direction(left, right, cur_pos), cur_pos


def silent2action(
    segments,
    left,
    right,
    cur_pos,
    cur_direction,
    sub_seg_idx,
    is_first_speak_frame: bool = False,
):
    """Direction while speaking but still inside a loop (silent) segment.

    Move toward the nearest action segment. The nearest-edge rule applies
    only on the first speaking frame; afterwards, and on the segment edges,
    the previous direction is kept.

    Returns:
        ``(direction, cur_pos)``.
    """
    # TODO: confirm the border handling below. After a jump into a new
    # segment, playback keeps going forward whether or not it is an action
    # segment.
    forced = _global_border_direction(segments, cur_pos)
    if forced is not None:
        return forced, cur_pos
    # On either edge of the silent segment while still speaking, or past the
    # first speaking frame: continue in the previous direction.
    if cur_pos in (left, right) or not is_first_speak_frame:
        return _keep_direction(cur_direction), cur_pos
    # First (loop) segment: action can only lie ahead.
    if sub_seg_idx == 0 and segments[0][2]:
        return 1, cur_pos
    # Last (loop) segment: action can only lie behind.
    if sub_seg_idx == len(segments) - 1 and segments[-1][2]:
        return 0, cur_pos
    return _nearest_exit_direction(left, right, cur_pos), cur_pos


def action2silent(
    segments, left, right, cur_pos, cur_direction, sub_seg_idx, is_last_speak_frame
):
    """Direction while silent but still inside an action (non-loop) segment.

    When speaking has just ended, leave through the nearer edge to reach
    silence; otherwise keep the previous direction.

    Returns:
        ``(direction, cur_pos)``.
    """
    forced = _global_border_direction(segments, cur_pos)
    if forced is not None:
        return forced, cur_pos
    if is_last_speak_frame:
        return _nearest_exit_direction(left, right, cur_pos), cur_pos
    return _keep_direction(cur_direction), cur_pos


def _demo():
    """Run a few consecutive silent batches over a single loop segment."""
    frame_config = [[1, 200, True]]
    # Last frame of the previous batch, clamped into the first segment.
    startfrom = max(0, frame_config[0][0])
    # Demo batch size. The original read len(mel_chunks), a name that is not
    # defined in this module. TODO: confirm whether it should be batch_size.
    audio_frame_length = 5
    first_speak, last_speak = True, False
    is_silent = 1  # the current batch is silent
    last_direction = 1  # 1 forward, 0 reverse

    for _ in range(11):
        start_idx_list, last_direction = play_in_loop_v2(
            frame_config,
            startfrom,
            audio_frame_length,
            last_direction,
            is_silent,
            first_speak,
            last_speak,
        )
        startfrom = start_idx_list[-1]


if __name__ == "__main__":
    _demo()