package com.ruoyi.video.thread; import com.ruoyi.video.common.ClientType; import com.ruoyi.video.common.ModelManager; import com.ruoyi.video.domain.Detection; import com.ruoyi.video.domain.dto.CameraDto; import com.ruoyi.video.service.MediaService; import com.ruoyi.video.thread.detector.CompositeDetector; import com.ruoyi.video.thread.detector.YoloDetector; import com.ruoyi.video.utils.Overlay; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.bytedeco.ffmpeg.avcodec.AVPacket; import org.bytedeco.ffmpeg.global.avcodec; import org.bytedeco.ffmpeg.global.avutil; import org.bytedeco.javacv.*; import org.bytedeco.opencv.opencv_core.Mat; import org.springframework.util.CollectionUtils; import java.io.ByteArrayOutputStream; import java.net.URL; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; import static org.bytedeco.opencv.global.opencv_core.CV_8UC3; /** * 推流(FLV) + JavaCV 解码/转码 + (可选)YOLO 检测叠框 * - 支持“窗口巡检”:在给定秒数内启用推理与统计,并通过 DetectionListener 回调让上层落库/告警 * - 播放开始可触发 10 秒试跑:attachDetectionListener(jobId, deviceId, 10, listener) * * 依赖:ModelManager / YoloDetector / CompositeDetector / Detection / Overlay / MediaService / CameraDto / ClientType * * @author orange * @since 2025-01-16 */ @Slf4j public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable { /* ===================== 内部回调/统计类型(如已外部定义,可移除) ===================== */ public interface DetectionListener { /** 每次推理得到 detections 时回调(建议上层做节流) */ void onDetections(Long jobId, Long deviceId, List detections, long frameTsMs); /** 一个“窗口巡检”结束时回调(含统计数据) */ void onWindowFinished(Long jobId, Long deviceId, WindowStats stats); } @Data public static class WindowStats { private int frames; private int detectedFrames; private int objects; private double maxScore; private long startMs; private long endMs; } /* ===================== FFmpeg/JavaCV 初始化 ===================== */ static { avutil.av_log_set_level(avutil.AV_LOG_ERROR); FFmpegLogCallback.set(); } /* ===================== 原有字段 ===================== */ private final ConcurrentHashMap wsClients = new ConcurrentHashMap<>(); private final ConcurrentHashMap httpClients = new ConcurrentHashMap<>(); private volatile boolean running = false; private boolean grabberStatus = false; private boolean recorderStatus = false; private int hcSize, wcSize = 0; private int noClient = 0; private byte[] header = null; private final ByteArrayOutputStream bos = new ByteArrayOutputStream(); private FFmpegFrameGrabber grabber; // 拉流器 private FFmpegFrameRecorder recorder; // 推流录制器 /** true: 转复用;false: 转码。启用检测时强制转码(要在像素上叠框) */ private boolean transferFlag = false; private final CameraDto cameraDto; private Thread listenThread; /* ===================== 推理相关字段 ===================== */ // 外部开关:是否启用检测(默认启用;也可由任务/页面配置动态设置) private boolean enableDetection = true; private ModelManager modelManager; private YoloDetector detector; // 解码/推理/发送解耦 private final OpenCVFrameConverter.ToMat toMat = new OpenCVFrameConverter.ToMat(); private final AtomicReference latestFrame = new AtomicReference<>(); private final AtomicReference> latestDetections = new AtomicReference<>(java.util.Collections.emptyList()); // 窗口巡检控制 private volatile boolean windowMode = false; private volatile long windowEndMs = 0L; private Long currentJobId; private Long currentDeviceId; private DetectionListener detectionListener; private final WindowStats stats = new WindowStats(); // 导出最近一次“叠好框的帧”用于截图存证 private final AtomicReference latestAnnotatedFrame = new AtomicReference<>(); public MediaTransferFlvByJavacv(CameraDto cameraDto) { super(); this.cameraDto = cameraDto; } public void setRunning(boolean running) { boolean prev = this.running; this.running = running; // 如果是从 true -> false,则按“关闭”处理 if (prev && !running) { try { closeMedia(); // 内部会 stopWindowIfAny()、关闭连接等 } catch (Exception ignore) {} } } /** 推荐的新接口:显式停止并释放资源 */ public void stop() { setRunning(false); } /* ===================== 外部控制 API ===================== */ public boolean isRunning() { return running; } public boolean isGrabberStatus() { return grabberStatus; } public boolean isRecorderStatus() { return recorderStatus; } public void setEnableDetection(boolean enable) { this.enableDetection = enable; } /** * 开启一个“窗口巡检”,持续 windowSeconds 秒;期间每次推理回调 onDetections,结束时 onWindowFinished */ public void attachDetectionListener(Long jobId, Long deviceId, int windowSeconds, DetectionListener listener) { if (windowSeconds <= 0 || listener == null) return; this.currentJobId = jobId; this.currentDeviceId = deviceId; this.detectionListener = listener; this.windowMode = true; long now = System.currentTimeMillis(); this.stats.setStartMs(now); this.windowEndMs = now + windowSeconds * 1000L; this.stats.setFrames(0); this.stats.setDetectedFrames(0); this.stats.setObjects(0); this.stats.setMaxScore(0.0); log.info("[job:{} device:{}] window started {}s", jobId, deviceId, windowSeconds); } /** 主动结束当前窗口(可用于任务被中断的场景) */ public void stopWindowIfAny() { if (!windowMode) return; this.windowMode = false; stats.setEndMs(System.currentTimeMillis()); if (detectionListener != null && currentJobId != null && currentDeviceId != null) { try { detectionListener.onWindowFinished(currentJobId, currentDeviceId, cloneStats(stats)); } catch (Exception ignore) {} } currentJobId = null; currentDeviceId = null; detectionListener = null; log.info("window finished (stopWindowIfAny)"); } /** 导出最近一次“叠好框的帧”(深拷贝),用于截图/存证。调用方负责释放 Mat */ public Mat getLatestAnnotatedFrameCopy() { Mat src = latestAnnotatedFrame.get(); if (src == null || src.empty()) return null; Mat copy = new Mat(src.rows(), src.cols(), src.type()); src.copyTo(copy); return copy; } /* ===================== 初始化推理 ===================== */ private void initDetectors() throws Exception { if (!enableDetection) return; modelManager = new ModelManager(); URL json = getClass().getResource("/models/models.json"); modelManager.load(json); // 你可按需切换单模型或多模型并行 // detector = modelManager.get("person-helmet"); detector = new CompositeDetector( "all-models", java.util.List.of( modelManager.get("person-helmet"), modelManager.get("vehicle-plate") ), 2 // 并行度 ); log.info("YOLO detectors ready: {}", detector.name()); // 预热一次,避免前几帧“无框” try { Frame warm = grabber != null ? grabber.grabImage() : null; if (warm != null) { Mat wm = toMat.convert(warm); if (wm != null && !wm.empty()) { long t0 = System.currentTimeMillis(); List dets = detector.detect(wm); long cost = System.currentTimeMillis() - t0; latestDetections.set(dets); log.info("Detector warm-up OK, cost={}ms, dets={}", cost, CollectionUtils.isEmpty(dets) ? 0 : dets.size()); } } } catch (Throwable e) { log.warn("Detector warm-up failed: {}", e.getMessage()); } } /* ===================== 拉流/推流 ===================== */ protected boolean createGrabber() { grabber = new FFmpegFrameGrabber(cameraDto.getUrl()); String fiveSecUs = "5000000"; String oneMb = "1048576"; grabber.setOption("threads", "1"); grabber.setOption("buffer_size", oneMb); grabber.setOption("rw_timeout", fiveSecUs); grabber.setOption("stimeout", fiveSecUs); grabber.setOption("probesize", "1048576"); grabber.setOption("analyzeduration", fiveSecUs); grabber.setOption("fflags", "nobuffer"); grabber.setOption("flags", "low_delay"); grabber.setOption("loglevel", "error"); if (cameraDto.getUrl().toLowerCase().startsWith("rtsp://")) { grabber.setOption("rtsp_transport", "tcp"); grabber.setOption("allowed_media_types", "video"); grabber.setOption("max_delay", "500000"); grabber.setOption("user_agent", "Lavf/60"); } else if (cameraDto.getUrl().toLowerCase().startsWith("rtmp://")) { grabber.setOption("rtmp_buffer", "1000"); } else if ("desktop".equalsIgnoreCase(cameraDto.getUrl())) { grabber.setFormat("gdigrab"); grabber.setOption("draw_mouse", "1"); grabber.setNumBuffers(0); grabber.setOption("fflags", "nobuffer"); grabber.setOption("framerate", "25"); grabber.setFrameRate(25); } try { grabber.start(); log.info("启动拉流器成功: {}", cameraDto.getUrl()); return (grabberStatus = true); } catch (FrameGrabber.Exception e) { MediaService.cameras.remove(cameraDto.getMediaKey()); log.error("启动拉流器失败: {} ({})", cameraDto.getUrl(), e.getMessage()); return (grabberStatus = false); } } protected boolean createTransterOrRecodeRecorder() { // 启用检测时必须转码(需要像素级叠框) if (enableDetection) transferFlag = false; recorder = new FFmpegFrameRecorder(bos, grabber.getImageWidth(), grabber.getImageHeight(), grabber.getAudioChannels()); recorder.setFormat("flv"); if (!transferFlag) { // 转码:低延迟 H.264 recorder.setInterleaved(false); recorder.setVideoOption("tune", "zerolatency"); recorder.setVideoOption("preset", "ultrafast"); recorder.setVideoOption("crf", "26"); recorder.setVideoOption("threads", "1"); recorder.setFrameRate(25); recorder.setGopSize(25); recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264); recorder.setPixelFormat(avutil.AV_PIX_FMT_YUV420P); recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC); recorder.setTrellis(1); recorder.setMaxDelay(0); try { recorder.start(); return recorderStatus = true; } catch (FrameRecorder.Exception e1) { log.info("启动转码录制器失败", e1); MediaService.cameras.remove(cameraDto.getMediaKey()); } } else { // 转复用(仅不叠框时) recorder.setCloseOutputStream(false); try { recorder.start(grabber.getFormatContext()); return recorderStatus = true; } catch (FrameRecorder.Exception e) { log.warn("{} 启动转复用失败,自动切换转码", cameraDto.getUrl()); transferFlag = false; try { recorder.stop(); } catch (FrameRecorder.Exception ignored) {} if (createTransterOrRecodeRecorder()) { log.error("{} 切换到转码模式", cameraDto.getUrl()); return true; } log.error("{} 切换转码模式失败", cameraDto.getUrl(), e); } } return recorderStatus = false; } private boolean supportFlvFormatCodec() { int vcodec = grabber.getVideoCodec(); int acodec = grabber.getAudioCodec(); return (cameraDto.getType() == 0) && ("desktop".equals(cameraDto.getUrl()) || avcodec.AV_CODEC_ID_H264 == vcodec || avcodec.AV_CODEC_ID_H263 == vcodec) && (avcodec.AV_CODEC_ID_AAC == acodec || avcodec.AV_CODEC_ID_AAC_LATM == acodec); } /* ===================== 主流程 ===================== */ protected void transferStream2Flv() { try { if (enableDetection) initDetectors(); } catch (Exception e) { log.error("初始化检测模型失败:{}", e.getMessage(), e); enableDetection = false; // 模型失败不影响推流 } if (!createGrabber()) return; if (!enableDetection) transferFlag = supportFlvFormatCodec(); if (!createTransterOrRecodeRecorder()) return; try { grabber.flush(); } catch (FrameGrabber.Exception e) { log.debug("flush grabber fail", e); } if (header == null) { header = bos.toByteArray(); bos.reset(); } running = true; listenClient(); long startTime = 0; long videoTS; // === 解耦线程:解码 / 推理 === Thread tDecode = null, tInfer = null; if (enableDetection) { tDecode = new Thread(() -> { while (running && grabberStatus) { try { Frame f = grabber.grabImage(); if (f == null) continue; Mat m = toMat.convert(f); if (m == null || m.empty()) continue; Mat copy = new Mat(m.rows(), m.cols(), CV_8UC3); m.copyTo(copy); Mat old = latestFrame.getAndSet(copy); if (old != null) old.release(); } catch (Exception e) { log.debug("decode err: {}", e.getMessage()); } } }, "det-decode"); int inferFps = 15; long period = 1_000_000_000L / inferFps; tInfer = new Thread(() -> { long next = System.nanoTime(); while (running && grabberStatus) { long now = System.nanoTime(); if (now < next) { LockSupport.parkNanos(next - now); continue; } next += period; Mat src = latestFrame.get(); if (src == null || src.empty()) continue; Mat snap = new Mat(); src.copyTo(snap); try { List dets = detector.detect(snap); latestDetections.set(dets); // 窗口巡检期间:回调 onDetections if (windowMode && detectionListener != null && currentJobId != null && currentDeviceId != null) { long ts = System.currentTimeMillis(); try { detectionListener.onDetections(currentJobId, currentDeviceId, dets, ts); } catch (Exception ignore) {} } } catch (Throwable e) { log.debug("infer err: {}", e.getMessage()); } finally { snap.release(); } } }, "det-infer"); tDecode.start(); tInfer.start(); } // === 主发送循环 === for (; running && grabberStatus && recorderStatus; ) { try { if (transferFlag) { // 仅转复用(未叠框) long startGrab = System.currentTimeMillis(); AVPacket pkt = grabber.grabPacket(); if ((System.currentTimeMillis() - startGrab) > 5000) { log.info("{} 网络异常(复用)", cameraDto.getUrl()); closeMedia(); break; } if (pkt != null && !pkt.isNull()) { if (startTime == 0) startTime = System.currentTimeMillis(); videoTS = 1000 * (System.currentTimeMillis() - startTime); if (videoTS > recorder.getTimestamp()) recorder.setTimestamp(videoTS); recorder.recordPacket(pkt); } } else { // 转码(可叠框) long startGrab = System.currentTimeMillis(); Frame frame; if (enableDetection) { Mat src = latestFrame.get(); if (src == null || src.empty()) continue; // 叠加最近一次检测结果 List dets = latestDetections.get(); if (!CollectionUtils.isEmpty(dets)) { Overlay.draw(dets, src); } // 更新“最近叠好框的帧”用于存证 updateLatestAnnotated(src); // 统计(仅窗口巡检时) if (windowMode) updateStats(dets); // 窗口结束判定 if (windowMode && System.currentTimeMillis() >= windowEndMs) { finishWindow(); } frame = toMat.convert(src); } else { frame = grabber.grab(); } if ((System.currentTimeMillis() - startGrab) > 5000) { log.info("{} 网络异常(转码)", cameraDto.getUrl()); closeMedia(); break; } if (frame != null) { long now = System.currentTimeMillis(); if (startTime == 0) startTime = now; videoTS = 1000 * (now - startTime); if (videoTS > recorder.getTimestamp()) recorder.setTimestamp(videoTS); recorder.record(frame); } } } catch (FrameGrabber.Exception e) { grabberStatus = false; MediaService.cameras.remove(cameraDto.getMediaKey()); } catch (FrameRecorder.Exception e) { recorderStatus = false; MediaService.cameras.remove(cameraDto.getMediaKey()); } // 输出缓存到客户端 if (bos.size() > 0) { byte[] b = bos.toByteArray(); bos.reset(); sendFrameData(b); } } // === 收尾 === try { if (detector != null) try { detector.close(); } catch (Exception ignored) {} if (modelManager != null) try { modelManager.close(); } catch (Exception ignored) {} if (recorder != null) recorder.close(); if (grabber != null) grabber.close(); bos.close(); } catch (Exception ignored) { } finally { Mat m = latestFrame.getAndSet(null); if (m != null) m.release(); Mat a = latestAnnotatedFrame.getAndSet(null); if (a != null) a.release(); closeMedia(); } log.info("关闭媒体流-javacv: {}", cameraDto.getUrl()); } /* ===================== 统计 / 窗口结束 ===================== */ private void updateStats(List dets) { stats.setFrames(stats.getFrames() + 1); if (dets != null && !dets.isEmpty()) { stats.setDetectedFrames(stats.getDetectedFrames() + 1); stats.setObjects(stats.getObjects() + dets.size()); double localMax = dets.stream().mapToDouble(Detection::conf).max().orElse(0.0); if (localMax > stats.getMaxScore()) stats.setMaxScore(localMax); } } private void finishWindow() { windowMode = false; stats.setEndMs(System.currentTimeMillis()); if (detectionListener != null && currentJobId != null && currentDeviceId != null) { try { detectionListener.onWindowFinished(currentJobId, currentDeviceId, cloneStats(stats)); } catch (Exception ignore) {} } currentJobId = null; currentDeviceId = null; detectionListener = null; log.info("window finished (timeout)"); } private static WindowStats cloneStats(WindowStats s) { WindowStats c = new WindowStats(); c.setFrames(s.getFrames()); c.setDetectedFrames(s.getDetectedFrames()); c.setObjects(s.getObjects()); c.setMaxScore(s.getMaxScore()); c.setStartMs(s.getStartMs()); c.setEndMs(s.getEndMs()); return c; } private void updateLatestAnnotated(Mat src) { if (src == null || src.empty()) return; Mat copy = new Mat(src.rows(), src.cols(), src.type()); src.copyTo(copy); Mat old = latestAnnotatedFrame.getAndSet(copy); if (old != null) old.release(); } /* ===================== 网络发送/连接管理 ===================== */ private void sendFrameData(byte[] data) { // ws for (Map.Entry entry : wsClients.entrySet()) { try { if (entry.getValue().channel().isWritable()) { entry.getValue().writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(data))); } else { wsClients.remove(entry.getKey()); hasClient(); } } catch (Exception e) { wsClients.remove(entry.getKey()); hasClient(); log.debug("ws send err", e); } } // http for (Map.Entry entry : httpClients.entrySet()) { try { if (entry.getValue().channel().isWritable()) { entry.getValue().writeAndFlush(Unpooled.copiedBuffer(data)); } else { httpClients.remove(entry.getKey()); hasClient(); } } catch (Exception e) { httpClients.remove(entry.getKey()); hasClient(); log.debug("http send err", e); } } } public void hasClient() { int newHcSize = httpClients.size(); int newWcSize = wsClients.size(); if (hcSize != newHcSize || wcSize != newWcSize) { hcSize = newHcSize; wcSize = newWcSize; log.info("{} http连接数:{}, ws连接数:{}", cameraDto.getUrl(), newHcSize, newWcSize); } if (!cameraDto.isAutoClose()) return; if (httpClients.isEmpty() && wsClients.isEmpty()) { if (noClient > cameraDto.getNoClientsDuration()) { closeMedia(); } else { noClient += 1000; } } else { noClient = 0; } } public void listenClient() { listenThread = new Thread(() -> { while (running) { hasClient(); try { Thread.sleep(1000); } catch (InterruptedException ignored) {} } }); listenThread.start(); } private void closeMedia() { // 结束窗口(如果还在) stopWindowIfAny(); running = false; MediaService.cameras.remove(cameraDto.getMediaKey()); for (Map.Entry entry : wsClients.entrySet()) { try { entry.getValue().close(); } catch (Exception ignored) {} finally { wsClients.remove(entry.getKey()); } } for (Map.Entry entry : httpClients.entrySet()) { try { entry.getValue().close(); } catch (Exception ignored) {} finally { httpClients.remove(entry.getKey()); } } } public void addClient(ChannelHandlerContext ctx, ClientType ctype) { int timeout = 0; while (true) { try { if (header != null) { try { if (ctx.channel().isWritable()) { if (ClientType.HTTP.getType() == ctype.getType()) { ChannelFuture future = ctx.writeAndFlush(Unpooled.copiedBuffer(header)); future.addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) { if (future.isSuccess()) { httpClients.put(ctx.channel().id().toString(), ctx); } } }); } else if (ClientType.WEBSOCKET.getType() == ctype.getType()) { ChannelFuture future = ctx.writeAndFlush( new BinaryWebSocketFrame(Unpooled.copiedBuffer(header))); future.addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) { if (future.isSuccess()) { wsClients.put(ctx.channel().id().toString(), ctx); } } }); } } } catch (Exception e) { log.debug("send header err", e); } break; } Thread.sleep(50); timeout += 50; if (timeout > 30000) break; } catch (Exception e) { log.debug("addClient err", e); } } } @Override public void run() { transferStream2Flv(); } }