''' import pygame import time from pydub import AudioSegment from collections import deque class AudioVisualSync: def __init__(self, audio_file, image_files): # 初始化 pygame pygame.init() # 加载音频 self.audio = AudioSegment.from_file(audio_file) self.audio_length = len(self.audio) / 1000.0 # 音频总时长(秒) # 切分音频为 20 毫秒的片段 self.audio_segments = [] segment_duration = 20 # 每个片段 20 毫秒 for start in range(0, len(self.audio), segment_duration): end = min(start + segment_duration, len(self.audio)) segment = self.audio[start:end] self.audio_segments.append(segment) # 加载图像并创建图像队列 self.image_queue = deque() frame_duration = 0.020 # 每帧 20 毫秒(0.020 秒) for index in range(len(self.audio_segments)): timestamp = index * frame_duration # 计算每帧的时间戳 img_index = index % len(image_files) # 循环使用图像 frame = pygame.image.load(image_files[img_index]) self.image_queue.append((timestamp, frame)) self.current_frame = None # 创建窗口 self.screen = pygame.display.set_mode((800, 600)) pygame.display.set_caption("Audio Visual Sync") # 播放音频 self.play_audio() def play_audio(self): pygame.mixer.init() pygame.mixer.music.load(audio_file) pygame.mixer.music.play() # 开始同步 self.run() def run(self): clock = pygame.time.Clock() while pygame.mixer.music.get_busy(): # 当音乐正在播放时 audio_position = pygame.mixer.music.get_pos() / 1000.0 # 当前音频播放时间(秒) self.update_image(audio_position) # 处理 pygame 事件 for event in pygame.event.get(): if event.type == pygame.QUIT: pygame.quit() return # 更新显示 pygame.display.flip() clock.tick(60) # 控制帧率为 60 FPS # 音频播放完毕后关闭窗口 pygame.quit() def update_image(self, audio_position): # 查找应该显示的图像 while self.image_queue: timestamp, frame = self.image_queue[0] # 获取队列中的第一个元素 time_difference = audio_position - timestamp if time_difference >= 0: # 当前音频时间已到该帧 self.current_frame = frame self.image_queue.popleft() # 移除已显示的帧 else: break # 当前音频时间未到该帧,退出循环 # 如果当前帧不为空,则更新显示 if self.current_frame is not None: # 清屏并绘制当前图像 self.screen.fill((0, 0, 0)) # 填充黑色背景 self.screen.blit(self.current_frame, (0, 0)) # 使用示例 if __name__ == "__main__": audio_file = "your_audio_file.mp3" # 替换为你的音频文件 image_files = ["image1.png", "image2.png", "image3.png"] # 替换为你的图像文件 sync_controller = AudioVisualSync(audio_file, image_files) ''' import threading import time import queue class MediaPlayer: def __init__(self, audio_queue, video_queue): self.audio_queue = audio_queue self.video_queue = video_queue self.sync_threshold = 0.01 # 10ms 同步阈值 self.audio_playing = True self.video_playing = True def play_audio(self): while self.audio_playing: if not self.audio_queue.empty(): audio_frame = self.audio_queue.get() # 获取音频帧 audio_timestamp = audio_frame['timestamp'] # 获取音频时间戳 print(f"Playing audio frame with timestamp: {audio_timestamp}") time.sleep(0.02) # 假设每帧播放时间20ms def play_video(self): while self.video_playing: if not self.video_queue.empty(): video_frame = self.video_queue.queue[0] # 获取当前视频帧但不出队 video_timestamp = video_frame['timestamp'] if not self.audio_queue.empty(): audio_frame = self.audio_queue.queue[0] # 获取音频队列中的第一个音频帧 audio_timestamp = audio_frame['timestamp'] # 视频快了,等待 if video_timestamp - audio_timestamp > self.sync_threshold: time.sleep(0.01) # 视频慢了,丢弃帧 elif audio_timestamp - video_timestamp > self.sync_threshold: print(f"Dropping video frame with timestamp: {video_timestamp}") self.video_queue.get() # 丢弃当前帧 else: self.video_queue.get() # 播放当前帧 print(f"Playing video frame with timestamp: {video_timestamp}") time.sleep(0.02) # 假设每帧播放时间20ms def start(self): audio_thread = threading.Thread(target=self.play_audio) video_thread = threading.Thread(target=self.play_video) audio_thread.start() video_thread.start() audio_thread.join() video_thread.join() if __name__ == "__main__": # 初始化音频和视频队列,每个帧包含时间戳 audio_queue = queue.Queue() video_queue = queue.Queue() # 填充一些模拟数据 for i in range(100): audio_queue.put({'timestamp': i * 0.02}) video_queue.put({'timestamp': i * 0.02}) player = MediaPlayer(audio_queue, video_queue) player.start()