How to support rtmp streaming with multi-channel pipe:?
Friends, I hope to load the mp4 file, separate the video stream and the audio stream, perform stt/tts processing on the audio stream, and then merge and push the rtmp server. But the actual situation is that there are multiple pipes: The channel cannot play anything. output_process.stdin.write(video_frame) alone Or output_process.stdin.write(audio_data) is normal. Thank you for your help
`import numpy as np import ffmpeg import torch
out_url = 'rtmp://localhost:1935/live/stream'
def main(): try: input_stream = ffmpeg.input( './data/test.mp4', re=None) video_input = ( input_stream.video .output('pipe:', format='rawvideo', pix_fmt='yuv420p', s='854x480', r=30, vsync='cfr') .run_async(pipe_stdout=True) ) audio_input = ( input_stream.audio .output('pipe:', format='s16le', acodec='pcm_s16le', ac=1, ar=16000) .run_async(pipe_stdout=True) ) output_process = ( ffmpeg .output( ffmpeg.input('pipe:', format='rawvideo',pix_fmt='yuv420p', s='854x480'), # ffmpeg.input('pipe:', format='s16le', ac=1, ar=16000), 'rtmp://localhost:1935/live/stream', vcodec='libx264', tune='zerolatency', acodec='aac', preset='ultrafast', f='flv', ) .overwrite_output() .run_async(pipe_stdin=True) ) chunk_size_ms = 4000 buffer_size = int(16000 * 2 * chunk_size_ms / 1000) frame_size = 854 * 480 * 3 // 2
while True:
try:
video_frame = video_input.stdout.read(frame_size)
# audio_data = audio_input.stdout.read(buffer_size)
except Exception as e:
print(f"读取数据时出错: {e}")
break
if video_frame:
try:
output_process.stdin.write(video_frame)
except BrokenPipeError as e:
print(f"视频写入失败: {e}")
break
# if audio_data:
# try:
# output_process.stdin.write(audio_data)
# except BrokenPipeError as e:
# print(f"音频写入失败: {e}")
# break
# 终止条件检查
# if not video_frame and not audio_data:
# break
except Exception as e:
print(f"发生异常: {e}")
finally:
# 确保所有进程正确关闭
if 'audio_input' in locals():
audio_input.terminate()
audio_input.wait()
if 'video_input' in locals():
video_input.terminate()
video_input.wait()
if 'output_process' in locals():
try:
output_process.stdin.close()
except Exception as e:
print(f"关闭输出进程输入管道时出错: {e}")
output_process.terminate()
output_process.wait()
if name == 'main': main()`