human/test/test_render_queue.py
2024-10-21 19:55:04 +08:00

164 lines
5.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

'''
import pygame
import time
from pydub import AudioSegment
from collections import deque
class AudioVisualSync:
def __init__(self, audio_file, image_files):
# 初始化 pygame
pygame.init()
# 加载音频
self.audio = AudioSegment.from_file(audio_file)
self.audio_length = len(self.audio) / 1000.0 # 音频总时长(秒)
# 切分音频为 20 毫秒的片段
self.audio_segments = []
segment_duration = 20 # 每个片段 20 毫秒
for start in range(0, len(self.audio), segment_duration):
end = min(start + segment_duration, len(self.audio))
segment = self.audio[start:end]
self.audio_segments.append(segment)
# 加载图像并创建图像队列
self.image_queue = deque()
frame_duration = 0.020 # 每帧 20 毫秒0.020 秒)
for index in range(len(self.audio_segments)):
timestamp = index * frame_duration # 计算每帧的时间戳
img_index = index % len(image_files) # 循环使用图像
frame = pygame.image.load(image_files[img_index])
self.image_queue.append((timestamp, frame))
self.current_frame = None
# 创建窗口
self.screen = pygame.display.set_mode((800, 600))
pygame.display.set_caption("Audio Visual Sync")
# 播放音频
self.play_audio()
def play_audio(self):
pygame.mixer.init()
pygame.mixer.music.load(audio_file)
pygame.mixer.music.play()
# 开始同步
self.run()
def run(self):
clock = pygame.time.Clock()
while pygame.mixer.music.get_busy(): # 当音乐正在播放时
audio_position = pygame.mixer.music.get_pos() / 1000.0 # 当前音频播放时间(秒)
self.update_image(audio_position)
# 处理 pygame 事件
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
return
# 更新显示
pygame.display.flip()
clock.tick(60) # 控制帧率为 60 FPS
# 音频播放完毕后关闭窗口
pygame.quit()
def update_image(self, audio_position):
# 查找应该显示的图像
while self.image_queue:
timestamp, frame = self.image_queue[0] # 获取队列中的第一个元素
time_difference = audio_position - timestamp
if time_difference >= 0: # 当前音频时间已到该帧
self.current_frame = frame
self.image_queue.popleft() # 移除已显示的帧
else:
break # 当前音频时间未到该帧,退出循环
# 如果当前帧不为空,则更新显示
if self.current_frame is not None:
# 清屏并绘制当前图像
self.screen.fill((0, 0, 0)) # 填充黑色背景
self.screen.blit(self.current_frame, (0, 0))
# 使用示例
if __name__ == "__main__":
audio_file = "your_audio_file.mp3" # 替换为你的音频文件
image_files = ["image1.png", "image2.png", "image3.png"] # 替换为你的图像文件
sync_controller = AudioVisualSync(audio_file, image_files)
'''
import threading
import time
import queue
class MediaPlayer:
def __init__(self, audio_queue, video_queue):
self.audio_queue = audio_queue
self.video_queue = video_queue
self.sync_threshold = 0.01 # 10ms 同步阈值
self.audio_playing = True
self.video_playing = True
def play_audio(self):
while self.audio_playing:
if not self.audio_queue.empty():
audio_frame = self.audio_queue.get() # 获取音频帧
audio_timestamp = audio_frame['timestamp'] # 获取音频时间戳
print(f"Playing audio frame with timestamp: {audio_timestamp}")
time.sleep(0.02) # 假设每帧播放时间20ms
def play_video(self):
while self.video_playing:
if not self.video_queue.empty():
video_frame = self.video_queue.queue[0] # 获取当前视频帧但不出队
video_timestamp = video_frame['timestamp']
if not self.audio_queue.empty():
audio_frame = self.audio_queue.queue[0] # 获取音频队列中的第一个音频帧
audio_timestamp = audio_frame['timestamp']
# 视频快了,等待
if video_timestamp - audio_timestamp > self.sync_threshold:
time.sleep(0.01)
# 视频慢了,丢弃帧
elif audio_timestamp - video_timestamp > self.sync_threshold:
print(f"Dropping video frame with timestamp: {video_timestamp}")
self.video_queue.get() # 丢弃当前帧
else:
self.video_queue.get() # 播放当前帧
print(f"Playing video frame with timestamp: {video_timestamp}")
time.sleep(0.02) # 假设每帧播放时间20ms
def start(self):
audio_thread = threading.Thread(target=self.play_audio)
video_thread = threading.Thread(target=self.play_video)
audio_thread.start()
video_thread.start()
audio_thread.join()
video_thread.join()
if __name__ == "__main__":
# 初始化音频和视频队列,每个帧包含时间戳
audio_queue = queue.Queue()
video_queue = queue.Queue()
# 填充一些模拟数据
for i in range(100):
audio_queue.put({'timestamp': i * 0.02})
video_queue.put({'timestamp': i * 0.02})
player = MediaPlayer(audio_queue, video_queue)
player.start()