Files
rtsp-video-analysis-system/ruoyi-video/src/main/java/com/ruoyi/video/thread/MediaTransferFlvByFFmpeg.java
砂糖 7f07552ecc init
2025-09-26 11:55:38 +08:00

588 lines
19 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.ruoyi.video.thread;
import cn.hutool.core.collection.CollUtil;
import com.ruoyi.video.common.ClientType;
import com.ruoyi.video.common.MediaConstant;
import com.ruoyi.video.domain.dto.CameraDto;
import com.ruoyi.video.service.MediaService;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.bytedeco.javacv.FrameGrabber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @Author: orange
* @CreateTime: 2025-01-16
*/
@Slf4j
public class MediaTransferFlvByFFmpeg extends MediaTransfer {
/**
* ws客户端
*/
private ConcurrentHashMap<String, ChannelHandlerContext> wsClients = new ConcurrentHashMap<>();
/**
* http客户端
*/
private ConcurrentHashMap<String, ChannelHandlerContext> httpClients = new ConcurrentHashMap<>();
/**
* flv header
*/
private byte[] header = null;
/**
* 相机
*/
private CameraDto cameraDto;
private List<String> command = new ArrayList<>();
private ServerSocket tcpServer = null;
private Process process;
private Thread inputThread;
private Thread errThread;
private Thread outputThread;
private Thread listenThread;
private boolean running = false; // 启动
private boolean enableLog = true;
private int hcSize, wcSize = 0;
// 记录当前
long currentTimeMillis = System.currentTimeMillis();
/**
* 用于没有客户端时候的计时
*/
private int noClient = 0;
public MediaTransferFlvByFFmpeg(final String executable) {
command.add(executable);
buildCommand();
}
public MediaTransferFlvByFFmpeg(CameraDto cameraDto) {
command.add(System.getProperty(MediaConstant.ffmpegPathKey));
this.cameraDto = cameraDto;
buildCommand();
}
public MediaTransferFlvByFFmpeg(final String executable, CameraDto cameraDto) {
command.add(executable);
this.cameraDto = cameraDto;
buildCommand();
}
public MediaTransferFlvByFFmpeg(final String executable, CameraDto cameraDto, boolean enableLog) {
command.add(executable);
this.cameraDto = cameraDto;
this.enableLog = enableLog;
buildCommand();
}
public boolean isEnableLog() {
return enableLog;
}
public void setEnableLog(boolean enableLog) {
this.enableLog = enableLog;
}
public boolean isRunning() {
return running;
}
public void setRunning(boolean running) {
this.running = running;
}
private MediaTransferFlvByFFmpeg addArgument(String argument) {
command.add(argument);
return this;
}
/**
* 构建ffmpeg命令
*/
private void buildCommand() {
this.addArgument("-rtsp_transport").addArgument("tcp").addArgument("-i").addArgument(cameraDto.getUrl())
.addArgument("-max_delay").addArgument("1")
// .addArgument("-strict").addArgument("experimental")
.addArgument("-g").addArgument("25").addArgument("-r").addArgument("25")
// .addArgument("-b").addArgument("200000")
// .addArgument("-filter_complex").addArgument("setpts='(RTCTIME - RTCSTART) / (TB * 1000000)'")
.addArgument("-c:v").addArgument("libx264").addArgument("-preset:v").addArgument("ultrafast")
// .addArgument("-preset:v").addArgument("fast")
.addArgument("-tune:v").addArgument("zerolatency")
// .addArgument("-crf").addArgument("26")
.addArgument("-c:a").addArgument("aac")
// .addArgument("-qmin").addArgument("28")
// .addArgument("-qmax").addArgument("32")
// .addArgument("-b:v").addArgument("448k")
// .addArgument("-b:a").addArgument("64k")
.addArgument("-f").addArgument("flv");
}
// private void buildCommand() {
// this
//// .addArgument("-rtsp_transport").addArgument("tcp")
// .addArgument("-i").addArgument(camera.getUrl())
// .addArgument("-max_delay").addArgument("100")
//// .addArgument("-strict").addArgument("experimental")
// .addArgument("-g").addArgument("10")
//// .addArgument("-r").addArgument("25")
//// .addArgument("-b").addArgument("200000")
//// .addArgument("-filter_complex").addArgument("setpts='(RTCTIME - RTCSTART) / (TB * 1000000)'")
// .addArgument("-c:v").addArgument("libx264")
// .addArgument("-preset:v").addArgument("ultrafast")
// .addArgument("-tune:v").addArgument("zerolatency")
//// .addArgument("-crf").addArgument("26")
// .addArgument("-c:a").addArgument("aac")
// .addArgument("-qmin").addArgument("28")
// .addArgument("-qmax").addArgument("32")
// .addArgument("-b:v").addArgument("448k")
// .addArgument("-b:a").addArgument("64k")
// .addArgument("-f").addArgument("flv");
// }
/**
* 执行推流
*
* @return
*/
public MediaTransferFlvByFFmpeg execute() {
String output = getOutput();
command.add(output);
String join = CollUtil.join(command, " ");
log.info(join);
try {
process = new ProcessBuilder(command).start();
running = true;
listenNetTimeout();
dealStream(process);
outputData();
listenClient();
} catch (IOException e) {
e.printStackTrace();
}
return this;
}
/**
* flv数据
*/
private void outputData() {
outputThread = new Thread(new Runnable() {
public void run() {
Socket client = null;
try {
client = tcpServer.accept();
DataInputStream input = new DataInputStream(client.getInputStream());
byte[] buffer = new byte[1024];
int len = 0;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
while (running) {
len = input.read(buffer);
if (len == -1) {
break;
}
bos.write(buffer, 0, len);
if (header == null) {
header = bos.toByteArray();
// System.out.println(HexUtil.encodeHexStr(header));
bos.reset();
continue;
}
// 帧数据
byte[] data = bos.toByteArray();
bos.reset();
// 发送到前端
sendFrameData(data);
}
try {
client.close();
} catch (java.lang.Exception e) {
}
try {
input.close();
} catch (java.lang.Exception e) {
}
try {
bos.close();
} catch (java.lang.Exception e) {
}
log.info("关闭媒体流-ffmpeg{} ", cameraDto.getUrl());
} catch (SocketTimeoutException e1) {
// e1.printStackTrace();
// 超时关闭
} catch (IOException e) {
// e.printStackTrace();
} finally {
MediaService.cameras.remove(cameraDto.getMediaKey());
running = false;
process.destroy();
try {
if (null != client) {
client.close();
}
} catch (IOException e) {
}
try {
if (null != tcpServer) {
tcpServer.close();
}
} catch (IOException e) {
}
}
}
});
outputThread.start();
}
/**
* 监听客户端
*/
public void listenClient() {
listenThread = new Thread(new Runnable() {
public void run() {
while (running) {
hasClient();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
});
listenThread.start();
}
/**
* 监听网络异常超时
*/
public void listenNetTimeout() {
Thread listenNetTimeoutThread = new Thread(new Runnable() {
public void run() {
while (true) {
if ((System.currentTimeMillis() - currentTimeMillis) > 15000) {
log.info("网络异常超时");
MediaService.cameras.remove(cameraDto.getMediaKey());
stopFFmpeg();
break;
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
}
}
});
listenNetTimeoutThread.setDaemon(true);
listenNetTimeoutThread.start();
}
public static MediaTransferFlvByFFmpeg atPath() {
return atPath(null);
}
public static MediaTransferFlvByFFmpeg atPath(final String absPath) {
final String executable;
if (absPath != null) {
executable = absPath;
} else {
executable = System.getProperty(MediaConstant.ffmpegPathKey);
}
return new MediaTransferFlvByFFmpeg(executable);
}
/**
* 控制台输出
*
* @param process
*/
private void dealStream(Process process) {
if (process == null) {
return;
}
// 处理InputStream的线程
inputThread = new Thread() {
@Override
public void run() {
BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line = null;
try {
while (running) {
line = in.readLine();
currentTimeMillis = System.currentTimeMillis();
if (line == null) {
break;
}
if (enableLog) {
log.info("output: " + line);
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
running = false;
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
};
// 处理ErrorStream的线程
errThread = new Thread() {
@Override
public void run() {
BufferedReader err = new BufferedReader(new InputStreamReader(process.getErrorStream()));
String line = null;
try {
while (running) {
line = err.readLine();
currentTimeMillis = System.currentTimeMillis();
if (line == null) {
break;
}
if (enableLog) {
log.info("err: " + line);
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
running = false;
err.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
};
inputThread.start();
errThread.start();
}
/**
* 输出到tcp
*
* @return
*/
private String getOutput() {
try {
tcpServer = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
StringBuffer sb = new StringBuffer();
sb.append("tcp://");
sb.append(tcpServer.getInetAddress().getHostAddress());
sb.append(":");
sb.append(tcpServer.getLocalPort());
tcpServer.setSoTimeout(10000);
return sb.toString();
} catch (IOException e) {
e.printStackTrace();
}
new RuntimeException("无法启用端口");
return "";
}
/**
* 关闭
*/
public void stopFFmpeg() {
this.running = false;
try {
this.process.destroy();
log.info("关闭媒体流-ffmpeg{} ", cameraDto.getUrl());
} catch (java.lang.Exception e) {
process.destroyForcibly();
}
// 媒体异常时,主动断开前端长连接
for (Map.Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
try {
entry.getValue().close();
} catch (java.lang.Exception e) {
} finally {
wsClients.remove(entry.getKey());
}
}
for (Map.Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {
try {
entry.getValue().close();
} catch (java.lang.Exception e) {
} finally {
httpClients.remove(entry.getKey());
}
}
}
/**
* 关闭流
*
* @return
*/
public void hasClient() {
int newHcSize = httpClients.size();
int newWcSize = wsClients.size();
if (hcSize != newHcSize || wcSize != newWcSize) {
hcSize = newHcSize;
wcSize = newWcSize;
log.info("\r\n{}\r\nhttp连接数{}, ws连接数{} \r\n", cameraDto.getUrl(), newHcSize, newWcSize);
}
// 无需自动关闭
if (!cameraDto.isAutoClose()) {
return;
}
if (httpClients.isEmpty() && wsClients.isEmpty()) {
// 等待20秒还没有客户端则关闭推流
if (noClient > cameraDto.getNoClientsDuration()) {
running = false;
MediaService.cameras.remove(cameraDto.getMediaKey());
} else {
noClient += 1000;
}
} else {
// 重置计时
noClient = 0;
}
}
/**
* 发送帧数据
*
* @param data
*/
private void sendFrameData(byte[] data) {
// ws
for (Map.Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
try {
if (entry.getValue().channel().isWritable()) {
entry.getValue().writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(data)));
} else {
wsClients.remove(entry.getKey());
hasClient();
}
} catch (java.lang.Exception e) {
wsClients.remove(entry.getKey());
hasClient();
e.printStackTrace();
}
}
// http
for (Map.Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {
try {
if (entry.getValue().channel().isWritable()) {
entry.getValue().writeAndFlush(Unpooled.copiedBuffer(data));
} else {
httpClients.remove(entry.getKey());
hasClient();
}
} catch (java.lang.Exception e) {
httpClients.remove(entry.getKey());
hasClient();
e.printStackTrace();
}
}
}
/**
* 新增客户端
*
* @param ctx netty client
* @param ctype enum,ClientType
*/
public void addClient(ChannelHandlerContext ctx, ClientType ctype) {
int timeout = 0;
while (true) {
try {
if (header != null) {
try {
if (ctx.channel().isWritable()) {
// 发送帧前先发送header
if (ClientType.HTTP.getType() == ctype.getType()) {
ChannelFuture future = ctx.writeAndFlush(Unpooled.copiedBuffer(header));
future.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws FrameGrabber.Exception {
if (future.isSuccess()) {
httpClients.put(ctx.channel().id().toString(), ctx);
}
}
});
} else if (ClientType.WEBSOCKET.getType() == ctype.getType()) {
ChannelFuture future = ctx
.writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(header)));
future.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws FrameGrabber.Exception {
if (future.isSuccess()) {
wsClients.put(ctx.channel().id().toString(), ctx);
}
}
});
}
}
} catch (java.lang.Exception e) {
e.printStackTrace();
}
break;
}
// 等待推拉流启动
Thread.sleep(50);
// 启动录制器失败
timeout += 50;
if (timeout > 30000) {
break;
}
} catch (java.lang.Exception e) {
e.printStackTrace();
}
}
}
}