Files
rtsp-video-analysis-system/ruoyi-video/src/main/java/com/ruoyi/video/thread/MediaTransferFlvByJavacv.java
2025-09-28 12:01:41 +08:00

737 lines
30 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.ruoyi.video.thread;
import com.ruoyi.video.common.ClientType;
import com.ruoyi.video.common.ModelManager;
import com.ruoyi.video.domain.Detection;
import com.ruoyi.video.domain.dto.CameraDto;
import com.ruoyi.video.service.MediaService;
import com.ruoyi.video.thread.detector.CompositeDetector;
import com.ruoyi.video.thread.detector.YoloDetector;
import com.ruoyi.video.utils.Overlay;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.bytedeco.ffmpeg.avcodec.AVPacket;
import org.bytedeco.ffmpeg.global.avcodec;
import org.bytedeco.ffmpeg.global.avutil;
import org.bytedeco.javacv.*;
import org.bytedeco.opencv.opencv_core.Mat;
import org.springframework.util.CollectionUtils;
import java.io.ByteArrayOutputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.Collections;
import static org.bytedeco.opencv.global.opencv_core.CV_8UC3;
/**
* 推流(FLV) + JavaCV 解码/转码 + (可选)YOLO 检测叠框
* - 支持“窗口巡检”:在给定秒数内启用推理与统计,并通过 DetectionListener 回调让上层落库/告警
* - 播放开始可触发 10 秒试跑attachDetectionListener(jobId, deviceId, 10, listener)
*
* 依赖ModelManager / YoloDetector / CompositeDetector / Detection / Overlay / MediaService / CameraDto / ClientType
*
* @author orange
* @since 2025-01-16
*/
@Slf4j
public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable {
/* ===================== 内部回调/统计类型(如已外部定义,可移除) ===================== */
public interface DetectionListener {
/** 每次推理得到 detections 时回调(建议上层做节流) */
void onDetections(Long jobId, Long deviceId, List<Detection> detections, long frameTsMs);
/** 一个“窗口巡检”结束时回调(含统计数据) */
void onWindowFinished(Long jobId, Long deviceId, WindowStats stats);
}
@Data
public static class WindowStats {
private int frames;
private int detectedFrames;
private int objects;
private double maxScore;
private long startMs;
private long endMs;
}
/* ===================== FFmpeg/JavaCV 初始化 ===================== */
static {
avutil.av_log_set_level(avutil.AV_LOG_ERROR);
FFmpegLogCallback.set();
}
/* ===================== 原有字段 ===================== */
private final ConcurrentHashMap<String, ChannelHandlerContext> wsClients = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ChannelHandlerContext> httpClients = new ConcurrentHashMap<>();
private volatile boolean running = false;
private boolean grabberStatus = false;
private boolean recorderStatus = false;
private int hcSize, wcSize = 0;
private int noClient = 0;
private byte[] header = null;
private final ByteArrayOutputStream bos = new ByteArrayOutputStream();
private FFmpegFrameGrabber grabber; // 拉流器
private FFmpegFrameRecorder recorder; // 推流录制器
/** true: 转复用false: 转码。启用检测时强制转码(要在像素上叠框) */
private boolean transferFlag = false;
private final CameraDto cameraDto;
private Thread listenThread;
/* ===================== 推理相关字段 ===================== */
// 外部开关:是否启用检测(默认启用;也可由任务/页面配置动态设置)
private boolean enableDetection = true;
private ModelManager modelManager;
private YoloDetector detector;
// 解码/推理/发送解耦
private final OpenCVFrameConverter.ToMat toMat = new OpenCVFrameConverter.ToMat();
private final OpenCVFrameConverter.ToMat matToFrameConverter = new OpenCVFrameConverter.ToMat();
private final AtomicReference<Mat> latestFrame = new AtomicReference<>();
private final AtomicReference<List<Detection>> latestDetections =
new AtomicReference<>(java.util.Collections.emptyList());
// 窗口巡检控制
private volatile boolean windowMode = false;
private volatile long windowEndMs = 0L;
private Long currentJobId;
private Long currentDeviceId;
private DetectionListener detectionListener;
private final WindowStats stats = new WindowStats();
// 导出最近一次“叠好框的帧”用于截图存证
private final AtomicReference<Mat> latestAnnotatedFrame = new AtomicReference<>();
public MediaTransferFlvByJavacv(CameraDto cameraDto) {
super();
this.cameraDto = cameraDto;
}
public void setRunning(boolean running) {
boolean prev = this.running;
this.running = running;
// 如果是从 true -> false则按“关闭”处理
if (prev && !running) {
try {
closeMedia(); // 内部会 stopWindowIfAny()、关闭连接等
} catch (Exception ignore) {}
}
}
/** 推荐的新接口:显式停止并释放资源 */
public void stop() {
setRunning(false);
}
/* ===================== 外部控制 API ===================== */
public boolean isRunning() { return running; }
public boolean isGrabberStatus() { return grabberStatus; }
public boolean isRecorderStatus() { return recorderStatus; }
public void setEnableDetection(boolean enable) { this.enableDetection = enable; }
/**
* 开启一个“窗口巡检”,持续 windowSeconds 秒;期间每次推理回调 onDetections结束时 onWindowFinished
*/
public void attachDetectionListener(Long jobId, Long deviceId, int windowSeconds, DetectionListener listener) {
if (windowSeconds <= 0 || listener == null) return;
this.currentJobId = jobId;
this.currentDeviceId = deviceId;
this.detectionListener = listener;
this.windowMode = true;
long now = System.currentTimeMillis();
this.stats.setStartMs(now);
this.windowEndMs = now + windowSeconds * 1000L;
this.stats.setFrames(0);
this.stats.setDetectedFrames(0);
this.stats.setObjects(0);
this.stats.setMaxScore(0.0);
log.info("[job:{} device:{}] window started {}s", jobId, deviceId, windowSeconds);
}
/** 主动结束当前窗口(可用于任务被中断的场景) */
public void stopWindowIfAny() {
if (!windowMode) return;
this.windowMode = false;
stats.setEndMs(System.currentTimeMillis());
if (detectionListener != null && currentJobId != null && currentDeviceId != null) {
try {
detectionListener.onWindowFinished(currentJobId, currentDeviceId, cloneStats(stats));
} catch (Exception ignore) {}
}
currentJobId = null;
currentDeviceId = null;
detectionListener = null;
log.info("window finished (stopWindowIfAny)");
}
/** 导出最近一次“叠好框的帧”(深拷贝),用于截图/存证。调用方负责释放 Mat */
public Mat getLatestAnnotatedFrameCopy() {
Mat src = latestAnnotatedFrame.get();
if (src == null || src.empty()) return null;
Mat copy = new Mat(src.rows(), src.cols(), src.type());
src.copyTo(copy);
return copy;
}
/* ===================== 初始化推理 ===================== */
private void initDetectors() throws Exception {
if (!enableDetection) return;
modelManager = new ModelManager();
URL json = getClass().getResource("/models/models.json");
modelManager.load(json);
// 你可按需切换单模型或多模型并行
// detector = modelManager.get("person-helmet");
detector = new CompositeDetector(
"all-models",
java.util.List.of(
modelManager.get("person-helmet"),
modelManager.get("vehicle-plate")
),
2 // 并行度
);
log.info("YOLO detectors ready: {}", detector.name());
// 预热一次,避免前几帧“无框”
try {
Frame warm = grabber != null ? grabber.grabImage() : null;
if (warm != null) {
Mat wm = toMat.convert(warm);
if (wm != null && !wm.empty()) {
long t0 = System.currentTimeMillis();
List<Detection> dets = detector.detect(wm);
long cost = System.currentTimeMillis() - t0;
latestDetections.set(dets);
log.info("Detector warm-up OK, cost={}ms, dets={}", cost,
CollectionUtils.isEmpty(dets) ? 0 : dets.size());
}
}
} catch (Throwable e) {
log.warn("Detector warm-up failed: {}", e.getMessage());
}
}
/* ===================== 拉流/推流 ===================== */
protected boolean createGrabber() {
grabber = new FFmpegFrameGrabber(cameraDto.getUrl());
String fiveSecUs = "5000000";
String oneMb = "1048576";
grabber.setOption("threads", "1");
grabber.setOption("buffer_size", oneMb);
grabber.setOption("rw_timeout", fiveSecUs);
grabber.setOption("stimeout", fiveSecUs);
grabber.setOption("probesize", "1048576");
grabber.setOption("analyzeduration", fiveSecUs);
grabber.setOption("fflags", "nobuffer");
grabber.setOption("flags", "low_delay");
grabber.setOption("loglevel", "error");
if (cameraDto.getUrl().toLowerCase().startsWith("rtsp://")) {
grabber.setOption("rtsp_transport", "tcp");
grabber.setOption("allowed_media_types", "video");
grabber.setOption("max_delay", "500000");
grabber.setOption("user_agent", "Lavf/60");
} else if (cameraDto.getUrl().toLowerCase().startsWith("rtmp://")) {
grabber.setOption("rtmp_buffer", "1000");
} else if ("desktop".equalsIgnoreCase(cameraDto.getUrl())) {
grabber.setFormat("gdigrab");
grabber.setOption("draw_mouse", "1");
grabber.setNumBuffers(0);
grabber.setOption("fflags", "nobuffer");
grabber.setOption("framerate", "25");
grabber.setFrameRate(25);
}
try {
grabber.start();
log.info("启动拉流器成功: {}", cameraDto.getUrl());
return (grabberStatus = true);
} catch (FrameGrabber.Exception e) {
MediaService.cameras.remove(cameraDto.getMediaKey());
log.error("启动拉流器失败: {} ({})", cameraDto.getUrl(), e.getMessage());
return (grabberStatus = false);
}
}
protected boolean createTransterOrRecodeRecorder() {
// 启用检测时必须转码(需要像素级叠框)
if (enableDetection) transferFlag = false;
recorder = new FFmpegFrameRecorder(bos, grabber.getImageWidth(), grabber.getImageHeight(),
grabber.getAudioChannels());
recorder.setFormat("flv");
if (!transferFlag) {
// 转码:低延迟 H.264
recorder.setInterleaved(false);
recorder.setVideoOption("tune", "zerolatency");
recorder.setVideoOption("preset", "ultrafast");
recorder.setVideoOption("crf", "26");
recorder.setVideoOption("threads", "1");
recorder.setFrameRate(25);
recorder.setGopSize(25);
recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264);
recorder.setPixelFormat(avutil.AV_PIX_FMT_YUV420P);
recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC);
recorder.setTrellis(1);
recorder.setMaxDelay(0);
try {
recorder.start();
return recorderStatus = true;
} catch (FrameRecorder.Exception e1) {
log.info("启动转码录制器失败", e1);
MediaService.cameras.remove(cameraDto.getMediaKey());
}
} else {
// 转复用(仅不叠框时)
recorder.setCloseOutputStream(false);
try {
recorder.start(grabber.getFormatContext());
return recorderStatus = true;
} catch (FrameRecorder.Exception e) {
log.warn("{} 启动转复用失败,自动切换转码", cameraDto.getUrl());
transferFlag = false;
try { recorder.stop(); } catch (FrameRecorder.Exception ignored) {}
if (createTransterOrRecodeRecorder()) {
log.error("{} 切换到转码模式", cameraDto.getUrl());
return true;
}
log.error("{} 切换转码模式失败", cameraDto.getUrl(), e);
}
}
return recorderStatus = false;
}
private boolean supportFlvFormatCodec() {
int vcodec = grabber.getVideoCodec();
int acodec = grabber.getAudioCodec();
return (cameraDto.getType() == 0)
&& ("desktop".equals(cameraDto.getUrl()) || avcodec.AV_CODEC_ID_H264 == vcodec
|| avcodec.AV_CODEC_ID_H263 == vcodec)
&& (avcodec.AV_CODEC_ID_AAC == acodec || avcodec.AV_CODEC_ID_AAC_LATM == acodec);
}
/* ===================== 主流程 ===================== */
protected void transferStream2Flv() {
try {
if (enableDetection) initDetectors();
} catch (Exception e) {
log.error("初始化检测模型失败:{}", e.getMessage(), e);
}
if (!createGrabber()) return;
if (!enableDetection) transferFlag = supportFlvFormatCodec();
if (!createTransterOrRecodeRecorder()) return;
try { grabber.flush(); } catch (FrameGrabber.Exception e) { log.debug("flush grabber fail", e); }
if (header == null) {
header = bos.toByteArray();
bos.reset();
}
running = true;
listenClient();
long startTime = 0;
long videoTS;
// 检测频率控制变量
final long DETECTION_INTERVAL_MS = 3000; // 每3秒检测一次
long lastDetectionTime = 0;
List<Detection> currentDetections = Collections.emptyList(); // 当前显示的检测结果
for (; running && grabberStatus && recorderStatus; ) {
try {
if (transferFlag) {
// 仅转复用(未叠框)
long startGrab = System.currentTimeMillis();
AVPacket pkt = grabber.grabPacket();
if ((System.currentTimeMillis() - startGrab) > 5000) {
log.info("{} 网络异常(复用)", cameraDto.getUrl());
closeMedia();
break;
}
if (pkt != null && !pkt.isNull()) {
if (startTime == 0) startTime = System.currentTimeMillis();
videoTS = 1000 * (System.currentTimeMillis() - startTime);
if (videoTS > recorder.getTimestamp()) recorder.setTimestamp(videoTS);
recorder.recordPacket(pkt);
}
} else {
// 转码(可叠框)
long startGrab = System.currentTimeMillis();
Frame frame = grabber.grab();
if ((System.currentTimeMillis() - startGrab) > 5000) {
log.info("{} 网络异常(转码)", cameraDto.getUrl());
closeMedia();
break;
}
if (frame != null && enableDetection) {
// 将Frame转换为Mat以进行处理
Mat mat = toMat.convert(frame);
if (mat != null && !mat.empty()) {
long currentTime = System.currentTimeMillis();
// 每隔DETECTION_INTERVAL_MS执行一次检测
if (currentTime - lastDetectionTime >= DETECTION_INTERVAL_MS) {
try {
log.debug("执行新一轮检测,上次检测时间: {}ms前",
currentTime - lastDetectionTime);
// 创建副本进行检测
Mat detectionMat = new Mat();
mat.copyTo(detectionMat);
// 执行检测
currentDetections = detector.detect(detectionMat);
lastDetectionTime = currentTime;
latestDetections.set(currentDetections);
// 释放检测Mat
detectionMat.release();
// 窗口巡检回调
if (windowMode && detectionListener != null &&
currentJobId != null && currentDeviceId != null) {
detectionListener.onDetections(currentJobId,
currentDeviceId,
currentDetections,
currentTime);
}
log.debug("检测完成,发现 {} 个目标框将保持3秒",
currentDetections == null ? 0 : currentDetections.size());
} catch (Exception e) {
log.debug("检测异常: {}", e.getMessage());
}
}
// 每一帧都使用最新的检测结果绘制框
// 这样框会保持在原位置,直到下一次检测更新
if (currentDetections != null && !currentDetections.isEmpty()) {
try {
// 在当前帧上绘制检测框
Overlay.draw(currentDetections, mat);
} catch (Exception e) {
log.debug("绘制检测框异常: {}", e.getMessage());
}
}
// 更新"最近叠好框的帧"用于存证
updateLatestAnnotated(mat);
// 统计(仅窗口巡检时)
if (windowMode) updateStats(currentDetections);
// 窗口结束判定
if (windowMode && System.currentTimeMillis() >= windowEndMs) {
finishWindow();
}
// 将处理后的Mat转换回Frame
try {
// 创建新的转换器
OpenCVFrameConverter.ToMat converter = new OpenCVFrameConverter.ToMat();
Frame processedFrame = converter.convert(mat);
if (processedFrame != null) {
// 使用处理后的帧替换原始帧
frame = processedFrame;
}
} catch (Exception e) {
log.debug("Mat转Frame异常: {}", e.getMessage());
// 如果转换失败,继续使用原始帧
}
// 释放Mat
mat.release();
}
}
// 记录帧
if (frame != null) {
long now = System.currentTimeMillis();
if (startTime == 0) startTime = now;
videoTS = 1000 * (now - startTime);
if (videoTS > recorder.getTimestamp()) recorder.setTimestamp(videoTS);
recorder.record(frame);
}
}
} catch (FrameGrabber.Exception e) {
log.error("拉流异常: {}", e.getMessage());
grabberStatus = false;
MediaService.cameras.remove(cameraDto.getMediaKey());
} catch (FrameRecorder.Exception e) {
log.error("推流异常: {}", e.getMessage());
recorderStatus = false;
MediaService.cameras.remove(cameraDto.getMediaKey());
} catch (Exception e) {
log.error("其他异常: {}", e.getMessage());
// 不要立即退出,尝试继续处理
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
// 输出缓存到客户端
try {
if (bos.size() > 0) {
byte[] b = bos.toByteArray();
bos.reset();
sendFrameData(b);
}
} catch (Exception e) {
log.error("发送数据异常: {}", e.getMessage());
}
}
// 安全地关闭资源
safeCloseResources();
}
// 将资源关闭逻辑提取到单独的方法
private void safeCloseResources() {
try {
if (detector != null) {
try { detector.close(); } catch (Exception ignored) {}
}
if (modelManager != null) {
try { modelManager.close(); } catch (Exception ignored) {}
}
if (recorder != null) {
try { recorder.close(); } catch (Exception ignored) {}
}
if (grabber != null) {
try { grabber.close(); } catch (Exception ignored) {}
}
try { bos.close(); } catch (Exception ignored) {}
Mat m = latestFrame.getAndSet(null);
if (m != null) {
try { m.release(); } catch (Exception ignored) {}
}
Mat a = latestAnnotatedFrame.getAndSet(null);
if (a != null) {
try { a.release(); } catch (Exception ignored) {}
}
closeMedia();
} catch (Exception e) {
log.error("关闭资源异常: {}", e.getMessage());
}
log.info("关闭媒体流-javacv: {}", cameraDto.getUrl());
}
/* ===================== 统计 / 窗口结束 ===================== */
private void updateStats(List<Detection> dets) {
stats.setFrames(stats.getFrames() + 1);
if (dets != null && !dets.isEmpty()) {
stats.setDetectedFrames(stats.getDetectedFrames() + 1);
stats.setObjects(stats.getObjects() + dets.size());
double localMax = dets.stream().mapToDouble(Detection::conf).max().orElse(0.0);
if (localMax > stats.getMaxScore()) stats.setMaxScore(localMax);
}
}
private void finishWindow() {
windowMode = false;
stats.setEndMs(System.currentTimeMillis());
if (detectionListener != null && currentJobId != null && currentDeviceId != null) {
try { detectionListener.onWindowFinished(currentJobId, currentDeviceId, cloneStats(stats)); }
catch (Exception ignore) {}
}
currentJobId = null;
currentDeviceId = null;
detectionListener = null;
log.info("window finished (timeout)");
}
private static WindowStats cloneStats(WindowStats s) {
WindowStats c = new WindowStats();
c.setFrames(s.getFrames());
c.setDetectedFrames(s.getDetectedFrames());
c.setObjects(s.getObjects());
c.setMaxScore(s.getMaxScore());
c.setStartMs(s.getStartMs());
c.setEndMs(s.getEndMs());
return c;
}
private void updateLatestAnnotated(Mat src) {
if (src == null || src.empty()) return;
Mat copy = new Mat(src.rows(), src.cols(), src.type());
src.copyTo(copy);
Mat old = latestAnnotatedFrame.getAndSet(copy);
if (old != null) old.release();
}
/* ===================== 网络发送/连接管理 ===================== */
private void sendFrameData(byte[] data) {
// ws
for (Map.Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
try {
if (entry.getValue().channel().isWritable()) {
entry.getValue().writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(data)));
} else {
wsClients.remove(entry.getKey());
hasClient();
}
} catch (Exception e) {
wsClients.remove(entry.getKey());
hasClient();
log.debug("ws send err", e);
}
}
// http
for (Map.Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {
try {
if (entry.getValue().channel().isWritable()) {
entry.getValue().writeAndFlush(Unpooled.copiedBuffer(data));
} else {
httpClients.remove(entry.getKey());
hasClient();
}
} catch (Exception e) {
httpClients.remove(entry.getKey());
hasClient();
log.debug("http send err", e);
}
}
}
public void hasClient() {
int newHcSize = httpClients.size();
int newWcSize = wsClients.size();
if (hcSize != newHcSize || wcSize != newWcSize) {
hcSize = newHcSize;
wcSize = newWcSize;
log.info("{} http连接数{}, ws连接数{}", cameraDto.getUrl(), newHcSize, newWcSize);
}
if (!cameraDto.isAutoClose()) return;
if (httpClients.isEmpty() && wsClients.isEmpty()) {
if (noClient > cameraDto.getNoClientsDuration()) {
closeMedia();
} else {
noClient += 1000;
}
} else {
noClient = 0;
}
}
public void listenClient() {
listenThread = new Thread(() -> {
while (running) {
hasClient();
try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
}
});
listenThread.start();
}
private void closeMedia() {
// 结束窗口(如果还在)
stopWindowIfAny();
running = false;
MediaService.cameras.remove(cameraDto.getMediaKey());
for (Map.Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
try { entry.getValue().close(); } catch (Exception ignored) {} finally { wsClients.remove(entry.getKey()); }
}
for (Map.Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {
try { entry.getValue().close(); } catch (Exception ignored) {} finally { httpClients.remove(entry.getKey()); }
}
}
public void addClient(ChannelHandlerContext ctx, ClientType ctype) {
int timeout = 0;
while (true) {
try {
if (header != null) {
try {
if (ctx.channel().isWritable()) {
if (ClientType.HTTP.getType() == ctype.getType()) {
ChannelFuture future = ctx.writeAndFlush(Unpooled.copiedBuffer(header));
future.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) {
if (future.isSuccess()) {
httpClients.put(ctx.channel().id().toString(), ctx);
}
}
});
} else if (ClientType.WEBSOCKET.getType() == ctype.getType()) {
ChannelFuture future = ctx.writeAndFlush(
new BinaryWebSocketFrame(Unpooled.copiedBuffer(header)));
future.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) {
if (future.isSuccess()) {
wsClients.put(ctx.channel().id().toString(), ctx);
}
}
});
}
}
} catch (Exception e) {
log.debug("send header err", e);
}
break;
}
Thread.sleep(50);
timeout += 50;
if (timeout > 30000) break;
} catch (Exception e) {
log.debug("addClient err", e);
}
}
}
@Override
public void run() {
transferStream2Flv();
}
}