diff --git a/utils/log.py b/utils/log.py new file mode 100644 index 0000000..a531729 --- /dev/null +++ b/utils/log.py @@ -0,0 +1,119 @@ +import logging +import os +import sys +import redis +from loguru import logger as logurulogger +import redis.exceptions +from app.config import config +import json +from redis.retry import Retry +from redis.backoff import ExponentialBackoff + +LOG_FORMAT = ( + "{level: <8} " + "{process.name} | " # 进程名 + "{thread.name} | " + "{time:YYYY-MM-DD HH:mm:ss.SSS} - " + "{process} " + "{module}.{function}:{line} - " + "{message}" +) +LOG_NAME = ["uvicorn", "uvicorn.access", "uvicorn.error", "flask"] + +# 配置 Redis 连接池 +redis_pool = redis.ConnectionPool( + host=config.LOG_REDIS_HOST, # Redis 服务器地址 + port=config.LOG_REDIS_PORT, # Redis 服务器端口 + db=config.LOG_REDIS_DB, # 数据库编号 + password=config.LOG_REDIS_AUTH, # 密码 + max_connections=config.max_connections, # 最大连接数 + socket_connect_timeout=config.socket_connect_timeout, # 连接超时时间 + socket_timeout=config.socket_timeout, # 等待超时时间 +) + + +class InterceptHandler(logging.Handler): + def emit(self, record): + try: + level = logurulogger.level(record.levelname).name + except AttributeError: + level = logging._levelToName[record.levelno] + + frame, depth = logging.currentframe(), 2 + while frame.f_code.co_filename == logging.__file__: + frame = frame.f_back + depth += 1 + + logurulogger.opt(depth=depth, exception=record.exc_info).log( + level, record.getMessage() + ) + +class Logging: + """自定义日志""" + + def __init__(self): + self.log_path = "logs" + self._connect_redis() + if config.IS_LOCAL: + os.makedirs(self.log_path, exist_ok=True) + self._initlogger() + self._reset_log_handler() + + def _connect_redis(self): + retry = Retry(ExponentialBackoff(), 3) # 重试3次,指数退避 + self.redis_client = redis.Redis(connection_pool=redis_pool,retry=retry) # 使用连接池 + + def _initlogger(self): + """初始化loguru配置""" + logurulogger.remove() + if config.IS_LOCAL: + logurulogger.add( + os.path.join(self.log_path, "error.log.{time:YYYY-MM-DD}"), + format=LOG_FORMAT, + level=logging.ERROR, + rotation="00:00", + retention="1 week", + backtrace=True, + diagnose=True, + enqueue=True + ) + logurulogger.add( + os.path.join(self.log_path, "info.log.{time:YYYY-MM-DD}"), + format=LOG_FORMAT, + level=logging.INFO, + rotation="00:00", + retention="1 week", + enqueue=True + ) + logurulogger.add( + sys.stdout, + format=LOG_FORMAT, + level=logging.DEBUG, + colorize=True, + ) + + logurulogger.add(self._log_to_redis, level="INFO", format=LOG_FORMAT) + self.logger = logurulogger + + + def _log_to_redis(self, message): + """将日志写入 Redis 列表""" + try: + self.redis_client.rpush(f"nlp.logger.{config.env_version}.log", json.dumps({"message": message})) + except redis.exceptions.ConnectionError as e: + logger.error(f"write {message} Redis connection error: {e}") + except redis.exceptions.TimeoutError as e: + logger.error(f"write {message} Redis operation timed out: {e}") + except Exception as e: + logger.error(f"write {message} Unexpected error: {e}") + + def _reset_log_handler(self): + for log in LOG_NAME: + logger = logging.getLogger(log) + logger.handlers = [InterceptHandler()] + + def getlogger(self): + return self.logger + +logger = Logging().getlogger() + diff --git a/utils/loop_frame_tool.py b/utils/loop_frame_tool.py new file mode 100644 index 0000000..9343cab --- /dev/null +++ b/utils/loop_frame_tool.py @@ -0,0 +1,318 @@ +from utils.log import logger + + +def play_in_loop_v2( + segments, + startfrom, + batch_num, + last_direction, + is_silent, + is_silent_, + first_speak, + last_speak, +): + """ + batch_num: 初始和结束,每一帧都这么判断 + 1、静默时,在静默段循环, 左边界正向,右边界反向, 根据上一次方向和位置,给出新的方向和位置 + 2、静默转说话: 就近到说话段,pre_falg, post_flag, 都为true VS 其中一个为true + 3、说话转静默: 动作段播完,再进入静默(如果还在持续说话,静默段不循环) + 4、在整个视频左端点: 开始端只能正向,静默时循环,说话时走2 + 5、在整个视频右端点: 开始时只能反向,静默时循环,说话时走2 + 6、根据方向获取batch_num 数量的视频帧,return batch_idxes, current_direction + Args: + segments: 循环帧配置 [[st, ed, True], ...] + startfrom: cur_pos + batch_num: 5 + last_direction: 0反向1正向 + is_silent: 0说话态1动作态 + is_silent_: 目前不明确,后面可能废弃 + first_speak: 记录是不是第一次讲话 + last_speak: 记录是不是讲话结束那一刻 + """ + frames = [] + cur_pos = startfrom + cur_direction = last_direction + is_first_speak_frame = first_speak + is_last_speak_frame = True if last_speak and batch_num == 1 else False + while batch_num != 0: + # 获取当前帧的所在子分割区间 + sub_seg_idx = subseg_judge(cur_pos, segments) + # 获取移动方向 + next_direction, next_pos = get_next_direction( + segments, + cur_pos, + cur_direction, + is_silent, + sub_seg_idx, + is_first_speak_frame, + is_last_speak_frame, + ) + # 获取指定方向的帧 + next_pos = get_next_frame(next_pos, next_direction) + frames.append(next_pos) + batch_num -= 1 + is_first_speak_frame = ( + True if first_speak and batch_num == config.batch_size else False + ) + is_last_speak_frame = True if last_speak and batch_num == 1 else False + + cur_direction = next_direction + cur_pos = next_pos + return frames, next_direction + + +def subseg_judge(cur_pos, segments): + for idx, frame_seg in enumerate(segments): + if cur_pos >= frame_seg[0] and cur_pos <= frame_seg[1]: + return idx + if cur_pos == 0: + return 0 + +def get_next_direction( + segments, + cur_pos, + cur_direction, + is_silent, + sub_seg_idx, + is_first_speak_frame: bool = False, + is_last_speak_frame: bool = False, +): + """ + 3.3.0 循环帧需求,想尽快走到预期状态 + if 动作段: + if 开始说话: + if 边界: + if 正向: + pass + else: + pass + else: + if 正向: + pass + else: + pass + elif 静默: + 同上 + elif 说话中: + 同上 + elif 说话结束: + 同上 + elif 静默段: + 同上 + Args: + is_first_speak_frame: 开始说话flag + is_last_speak_frame: 说话结束flag + """ + left, right, loop_flag = segments[sub_seg_idx] + if loop_flag: + if is_silent == 1: + next_direct, next_pos = pure_silent( + segments, left, right, cur_pos, cur_direction, sub_seg_idx + ) + logger.debug( + f"cur_pos:{cur_pos}, next_direct:{next_direct}, is_first_speak_frame:{is_first_speak_frame}" + ) + elif is_silent == 0: + next_direct, next_pos = silent2action( + segments, + left, + right, + cur_pos, + cur_direction, + sub_seg_idx, + is_first_speak_frame, + ) + logger.debug( + f"cur_pos:{cur_pos}, next_direct:{next_direct}, is_first_speak_frame{is_first_speak_frame}" + ) + else: + if is_silent == 1: + next_direct, next_pos = action2silent( + segments, + left, + right, + cur_pos, + cur_direction, + sub_seg_idx, + is_last_speak_frame, + ) + logger.debug( + f"cur_pos{cur_pos}, next_direct:{next_direct},is_first_speak_frame{is_first_speak_frame},is_last_speak_frame:{is_last_speak_frame}" + ) + elif is_silent == 0: + next_direct, next_pos = pure_action( + segments, + left, + right, + cur_pos, + cur_direction, + sub_seg_idx, + is_last_speak_frame, + ) + logger.debug( + f"cur_pos:{cur_pos}, next_direct:{next_direct},is_first_speak_frame{is_first_speak_frame}, is_last_speak_frame:{is_last_speak_frame}" + ) + return next_direct, next_pos + +def get_next_frame(cur_pos, cur_direction): + """根据当前帧和方向,获取下一帧,这里应该保证方向上的帧是一定能取到的 + 不需要再做额外的边界判断 + """ + # 正向 + if cur_direction == 1: + return cur_pos + 1 + # 反向 + elif cur_direction == 0: + return cur_pos - 1 + +def pure_silent(segments, left, right, cur_pos, cur_direction, sub_seg_idx): + """ + loop_flag == True and is_silent==1 + whether border + whether forward + Return: + next_direction + """ + # 左边界正向,右边界反向 + if cur_pos == segments[0][0]: + return 1, cur_pos + if cur_pos == segments[-1][1]: + return 0, cur_pos + # 右边界,反向 + if cur_pos == right: + return 0, cur_pos + # 左边界,正向 + if cur_pos == left: + return 1, cur_pos + # 非边界,之前正向,则继续正向,否则反向 + if cur_pos > left and cur_direction == 1: + return 1, cur_pos + else: + return 0, cur_pos + + +def pure_action( + segments, left, right, cur_pos, cur_direction, sub_seg_idx, is_last_speak_frame +): + """ + loop_flag ==False and is_silent == 0 + 动作播完,正向到静默段 (存在跳段行为) + whether border + whether forward # 正播反播 + Args: + is_last_speak_frame: 最后说话结束时刻 + Return: next_direction + """ + if cur_pos == segments[0][0]: + return 1, cur_pos + if cur_pos == segments[-1][1]: + return 0, cur_pos + + if is_last_speak_frame: + # 动作段在末尾,向前找静默 + if sub_seg_idx == len(segments) - 1: + return 0, cur_pos + # 动作段在开始, 向后 + if sub_seg_idx == 0: + return 1, cur_pos + # 动作段在中间,就近原则 + mid = left + (right - left + 1) // 2 + # 就近原则优先 + if cur_pos < mid: + return 0, cur_pos + else: + return 1, cur_pos + + else: + # 其他情况,播放方向一致 + if cur_direction == 1: + return 1, cur_pos + else: + return 0, cur_pos + + +def silent2action( + segments, + left, + right, + cur_pos, + cur_direction, + sub_seg_idx, + is_first_speak_frame: bool = False, +): + """ + 在静默区间但是在讲话 + loop_flag=True and is_silent == 0 + whether border + whether forward + + Return: next_direction + """ + # 向最近的动作段移动, 如果左面没有动作段 + # TODO: 确认下面逻辑是否正确 + if ( + cur_pos == segments[0][0] + ): # 如果发生过跳跃,新段无论是不是动作段,仍然都向后执行 + return 1, cur_pos + if cur_pos == segments[-1][1]: + return 0, cur_pos + # 在静默左边界处,且仍在讲话 + if cur_pos == left: + if cur_direction == 1: + return 1, cur_pos + else: + return 0, cur_pos + # 在静默右边界处,且仍在讲话 + elif cur_pos == right: + if cur_direction == 1: + return 1, cur_pos + else: + return 0, cur_pos + else: + mid = left + (right - left + 1) // 2 + # !!就近原则只对第一次说话有效,其他情况遵循上一次状态 + if is_first_speak_frame: + # 如果第一段 + if sub_seg_idx == 0 and segments[0][2]: + return 1, cur_pos + # 如果最后一段 + elif sub_seg_idx == len(segments) - 1 and segments[-1][2]: + return 0, cur_pos + + if cur_pos < mid: + return 0, cur_pos + else: + return 1, cur_pos + else: + if cur_direction == 1: + return 1, cur_pos + elif cur_direction == 0: + return 0, cur_pos + + +def action2silent( + segments, left, right, cur_pos, cur_direction, sub_seg_idx, is_last_speak_frame +): + """ + loop_flag=False and is_silent==1 + whether border + Return: next_direction + """ + if cur_pos == segments[0][0]: + return 1, cur_pos + if cur_pos == segments[-1][1]: + return 0, cur_pos + # 动作段,说话结束转静默情况下,就近原则,进入静默 + if is_last_speak_frame: + mid = left + (right - left + 1) // 2 + if cur_pos < mid: + return 0, cur_pos + else: + return 1, cur_pos + + else: + if cur_direction == 1: + return 1, cur_pos + else: + return 0, cur_pos +