2024-10-21 11:55:04 +00:00
|
|
|
|
'''
|
2024-10-20 17:37:45 +00:00
|
|
|
|
import pygame
|
|
|
|
|
import time
|
|
|
|
|
from pydub import AudioSegment
|
|
|
|
|
from collections import deque
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AudioVisualSync:
|
|
|
|
|
def __init__(self, audio_file, image_files):
|
|
|
|
|
# 初始化 pygame
|
|
|
|
|
pygame.init()
|
|
|
|
|
|
|
|
|
|
# 加载音频
|
|
|
|
|
self.audio = AudioSegment.from_file(audio_file)
|
|
|
|
|
self.audio_length = len(self.audio) / 1000.0 # 音频总时长(秒)
|
|
|
|
|
|
|
|
|
|
# 切分音频为 20 毫秒的片段
|
|
|
|
|
self.audio_segments = []
|
|
|
|
|
segment_duration = 20 # 每个片段 20 毫秒
|
|
|
|
|
|
|
|
|
|
for start in range(0, len(self.audio), segment_duration):
|
|
|
|
|
end = min(start + segment_duration, len(self.audio))
|
|
|
|
|
segment = self.audio[start:end]
|
|
|
|
|
self.audio_segments.append(segment)
|
|
|
|
|
|
|
|
|
|
# 加载图像并创建图像队列
|
|
|
|
|
self.image_queue = deque()
|
|
|
|
|
frame_duration = 0.020 # 每帧 20 毫秒(0.020 秒)
|
|
|
|
|
|
|
|
|
|
for index in range(len(self.audio_segments)):
|
|
|
|
|
timestamp = index * frame_duration # 计算每帧的时间戳
|
|
|
|
|
img_index = index % len(image_files) # 循环使用图像
|
|
|
|
|
frame = pygame.image.load(image_files[img_index])
|
|
|
|
|
self.image_queue.append((timestamp, frame))
|
|
|
|
|
|
|
|
|
|
self.current_frame = None
|
|
|
|
|
|
|
|
|
|
# 创建窗口
|
|
|
|
|
self.screen = pygame.display.set_mode((800, 600))
|
|
|
|
|
pygame.display.set_caption("Audio Visual Sync")
|
|
|
|
|
|
|
|
|
|
# 播放音频
|
|
|
|
|
self.play_audio()
|
|
|
|
|
|
|
|
|
|
def play_audio(self):
|
|
|
|
|
pygame.mixer.init()
|
|
|
|
|
pygame.mixer.music.load(audio_file)
|
|
|
|
|
pygame.mixer.music.play()
|
|
|
|
|
|
|
|
|
|
# 开始同步
|
|
|
|
|
self.run()
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
|
clock = pygame.time.Clock()
|
|
|
|
|
|
|
|
|
|
while pygame.mixer.music.get_busy(): # 当音乐正在播放时
|
|
|
|
|
audio_position = pygame.mixer.music.get_pos() / 1000.0 # 当前音频播放时间(秒)
|
|
|
|
|
self.update_image(audio_position)
|
|
|
|
|
|
|
|
|
|
# 处理 pygame 事件
|
|
|
|
|
for event in pygame.event.get():
|
|
|
|
|
if event.type == pygame.QUIT:
|
|
|
|
|
pygame.quit()
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
# 更新显示
|
|
|
|
|
pygame.display.flip()
|
|
|
|
|
clock.tick(60) # 控制帧率为 60 FPS
|
|
|
|
|
|
|
|
|
|
# 音频播放完毕后关闭窗口
|
|
|
|
|
pygame.quit()
|
|
|
|
|
|
|
|
|
|
def update_image(self, audio_position):
|
|
|
|
|
# 查找应该显示的图像
|
|
|
|
|
while self.image_queue:
|
|
|
|
|
timestamp, frame = self.image_queue[0] # 获取队列中的第一个元素
|
|
|
|
|
time_difference = audio_position - timestamp
|
|
|
|
|
|
|
|
|
|
if time_difference >= 0: # 当前音频时间已到该帧
|
|
|
|
|
self.current_frame = frame
|
|
|
|
|
self.image_queue.popleft() # 移除已显示的帧
|
|
|
|
|
else:
|
|
|
|
|
break # 当前音频时间未到该帧,退出循环
|
|
|
|
|
|
|
|
|
|
# 如果当前帧不为空,则更新显示
|
|
|
|
|
if self.current_frame is not None:
|
|
|
|
|
# 清屏并绘制当前图像
|
|
|
|
|
self.screen.fill((0, 0, 0)) # 填充黑色背景
|
|
|
|
|
self.screen.blit(self.current_frame, (0, 0))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 使用示例
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
audio_file = "your_audio_file.mp3" # 替换为你的音频文件
|
|
|
|
|
image_files = ["image1.png", "image2.png", "image3.png"] # 替换为你的图像文件
|
|
|
|
|
sync_controller = AudioVisualSync(audio_file, image_files)
|
2024-10-21 11:55:04 +00:00
|
|
|
|
'''
|
|
|
|
|
|
|
|
|
|
import threading
|
|
|
|
|
import time
|
|
|
|
|
import queue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MediaPlayer:
|
|
|
|
|
def __init__(self, audio_queue, video_queue):
|
|
|
|
|
self.audio_queue = audio_queue
|
|
|
|
|
self.video_queue = video_queue
|
|
|
|
|
self.sync_threshold = 0.01 # 10ms 同步阈值
|
|
|
|
|
self.audio_playing = True
|
|
|
|
|
self.video_playing = True
|
|
|
|
|
|
|
|
|
|
def play_audio(self):
|
|
|
|
|
while self.audio_playing:
|
|
|
|
|
if not self.audio_queue.empty():
|
|
|
|
|
audio_frame = self.audio_queue.get() # 获取音频帧
|
|
|
|
|
audio_timestamp = audio_frame['timestamp'] # 获取音频时间戳
|
|
|
|
|
print(f"Playing audio frame with timestamp: {audio_timestamp}")
|
|
|
|
|
time.sleep(0.02) # 假设每帧播放时间20ms
|
|
|
|
|
|
|
|
|
|
def play_video(self):
|
|
|
|
|
while self.video_playing:
|
|
|
|
|
if not self.video_queue.empty():
|
|
|
|
|
video_frame = self.video_queue.queue[0] # 获取当前视频帧但不出队
|
|
|
|
|
video_timestamp = video_frame['timestamp']
|
|
|
|
|
|
|
|
|
|
if not self.audio_queue.empty():
|
|
|
|
|
audio_frame = self.audio_queue.queue[0] # 获取音频队列中的第一个音频帧
|
|
|
|
|
audio_timestamp = audio_frame['timestamp']
|
|
|
|
|
|
|
|
|
|
# 视频快了,等待
|
|
|
|
|
if video_timestamp - audio_timestamp > self.sync_threshold:
|
|
|
|
|
time.sleep(0.01)
|
|
|
|
|
|
|
|
|
|
# 视频慢了,丢弃帧
|
|
|
|
|
elif audio_timestamp - video_timestamp > self.sync_threshold:
|
|
|
|
|
print(f"Dropping video frame with timestamp: {video_timestamp}")
|
|
|
|
|
self.video_queue.get() # 丢弃当前帧
|
|
|
|
|
else:
|
|
|
|
|
self.video_queue.get() # 播放当前帧
|
|
|
|
|
print(f"Playing video frame with timestamp: {video_timestamp}")
|
|
|
|
|
time.sleep(0.02) # 假设每帧播放时间20ms
|
|
|
|
|
|
|
|
|
|
def start(self):
|
|
|
|
|
audio_thread = threading.Thread(target=self.play_audio)
|
|
|
|
|
video_thread = threading.Thread(target=self.play_video)
|
|
|
|
|
audio_thread.start()
|
|
|
|
|
video_thread.start()
|
|
|
|
|
audio_thread.join()
|
|
|
|
|
video_thread.join()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
# 初始化音频和视频队列,每个帧包含时间戳
|
|
|
|
|
audio_queue = queue.Queue()
|
|
|
|
|
video_queue = queue.Queue()
|
|
|
|
|
|
|
|
|
|
# 填充一些模拟数据
|
|
|
|
|
for i in range(100):
|
|
|
|
|
audio_queue.put({'timestamp': i * 0.02})
|
|
|
|
|
video_queue.put({'timestamp': i * 0.02})
|
|
|
|
|
|
|
|
|
|
player = MediaPlayer(audio_queue, video_queue)
|
|
|
|
|
player.start()
|