package com.ruoyi.video.thread;

import com.ruoyi.video.common.ClientType;
import com.ruoyi.video.common.ModelManager;
import com.ruoyi.video.domain.Detection;
import com.ruoyi.video.domain.dto.CameraDto;
import com.ruoyi.video.service.MediaService;
import com.ruoyi.video.thread.detector.CompositeDetector;
import com.ruoyi.video.thread.detector.YoloDetector;
import com.ruoyi.video.utils.Overlay;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.bytedeco.ffmpeg.avcodec.AVPacket;
import org.bytedeco.ffmpeg.global.avcodec;
import org.bytedeco.ffmpeg.global.avutil;
import org.bytedeco.javacv.*;
import org.bytedeco.opencv.opencv_core.Mat;
import org.springframework.scheduling.annotation.Async;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

import static org.bytedeco.opencv.global.opencv_core.CV_8UC3;

/**
 * Pulls a camera stream (RTSP/RTMP/desktop capture) with JavaCV, optionally runs
 * YOLO object detection on decoded frames, overlays the detections, re-encodes to
 * FLV and fans the FLV byte stream out to connected HTTP and WebSocket clients.
 *
 * <p>When detection is enabled the pipeline is split into three cooperating
 * threads: decode (keeps only the newest frame), inference (rate-limited), and
 * the main render/send loop. Frame hand-off uses a copy-under-lock scheme so no
 * thread ever reads a {@link Mat} whose native buffer has been released.
 *
 * @Author: orange
 * @CreateTime: 2025-01-16
 */
@Slf4j
public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable {

    static {
        avutil.av_log_set_level(avutil.AV_LOG_ERROR);
        FFmpegLogCallback.set();
    }

    /*** ====== Streaming fields ====== ***/

    /** Connected WebSocket clients, keyed by Netty channel id. */
    private final ConcurrentHashMap<String, ChannelHandlerContext> wsClients = new ConcurrentHashMap<>();
    /** Connected HTTP-FLV clients, keyed by Netty channel id. */
    private final ConcurrentHashMap<String, ChannelHandlerContext> httpClients = new ConcurrentHashMap<>();

    private volatile boolean running = false;
    // volatile: also read by the detection decode/inference threads as their stop condition
    private volatile boolean grabberStatus = false;
    private volatile boolean recorderStatus = false;

    /** Last logged HTTP / WS client counts (to log only on change). */
    private int hcSize = 0;
    private int wcSize = 0;
    /** Milliseconds accumulated with zero clients; drives auto-close. */
    private int noClient = 0;
    /** FLV file header bytes, captured once and replayed to every new client. */
    private byte[] header = null;
    /** Recorder output sink; drained and sent to clients each loop iteration. */
    private final ByteArrayOutputStream bos = new ByteArrayOutputStream();

    private FFmpegFrameGrabber grabber;       // stream puller
    private FFmpegFrameRecorder recorder;     // FLV encoder / remuxer
    /** true: remux (packet copy), false: transcode. Detection forces transcode. */
    private boolean transferFlag = false;
    private final CameraDto cameraDto;
    private Thread listenThread;

    /*** ====== Detection fields ====== ***/

    /** Whether YOLO detection/overlay is enabled for this stream. */
    private boolean enableDetection = true;
    private ModelManager modelManager;
    private YoloDetector detector;

    // Converter used only on the main (render) thread; JavaCV converters are
    // not thread-safe, so the decode thread creates its own instance.
    private final OpenCVFrameConverter.ToMat toMat = new OpenCVFrameConverter.ToMat();

    /**
     * Guards the swap-and-release of {@link #latestFrame}: readers copy the Mat
     * while holding this lock so the decode thread can never release a native
     * buffer out from under them.
     */
    private final Object frameLock = new Object();
    /** Newest decoded frame (overwrite semantics; old frame released on swap). */
    private final AtomicReference<Mat> latestFrame = new AtomicReference<>();
    /** Most recent detection results, consumed by the render loop. */
    private final AtomicReference<List<Detection>> latestDetections =
            new AtomicReference<>(java.util.Collections.emptyList());

    /** Decode / inference worker threads (non-null only while detection runs). */
    private Thread detDecodeThread;
    private Thread detInferThread;

    public MediaTransferFlvByJavacv(CameraDto cameraDto) {
        super();
        this.cameraDto = cameraDto;
    }

    public boolean isRunning() {
        return running;
    }

    public void setRunning(boolean running) {
        this.running = running;
    }

    public boolean isGrabberStatus() {
        return grabberStatus;
    }

    public void setGrabberStatus(boolean grabberStatus) {
        this.grabberStatus = grabberStatus;
    }

    public boolean isRecorderStatus() {
        return recorderStatus;
    }

    public void setRecorderStatus(boolean recorderStatus) {
        this.recorderStatus = recorderStatus;
    }

    public void setEnableDetection(boolean enable) {
        this.enableDetection = enable;
    }

    /*** ====== Detector initialization ====== ***/

    /**
     * Loads the model registry from {@code /models/models.json} and builds the
     * composite (parallel multi-model) detector. No-op when detection is off.
     *
     * @throws Exception if the registry or a model fails to load
     */
    private void initDetectors() throws Exception {
        if (!enableDetection) {
            return;
        }
        modelManager = new ModelManager();
        URL json = getClass().getResource("/models/models.json");
        modelManager.load(json);
        // Single model:
        detector = modelManager.get("person-helmet");
        // Parallel multi-model (overwrites the single-model detector above);
        // tune parallelism to CPU cores / model size:
        detector = new CompositeDetector(
                "all-models",
                java.util.List.of(modelManager.get("person-helmet"), modelManager.get("vehicle-plate")),
                2
        );
        log.info("YOLO detectors ready: {}", detector.name());
    }

    /*** ====== Grabber ====== ***/

    /**
     * Creates and starts the FFmpeg grabber with low-latency options tuned per
     * protocol (RTSP/RTMP/desktop capture).
     *
     * @return true when the grabber started; false removes the camera entry
     */
    protected boolean createGrabber() {
        grabber = new FFmpegFrameGrabber(cameraDto.getUrl());
        // NOTE: these option values are microsecond strings
        String fiveSecUs = "5000000";
        String oneMb = "1048576";
        grabber.setOption("threads", "1");
        grabber.setOption("buffer_size", oneMb);
        grabber.setOption("rw_timeout", fiveSecUs);
        grabber.setOption("stimeout", fiveSecUs);
        grabber.setOption("probesize", "1048576");       // probesize is in BYTES
        grabber.setOption("analyzeduration", fiveSecUs);
        grabber.setOption("fflags", "nobuffer");
        grabber.setOption("flags", "low_delay");
        grabber.setOption("loglevel", "error");          // quiet once stable
        if (cameraDto.getUrl().toLowerCase().startsWith("rtsp://")) {
            grabber.setOption("rtsp_transport", "tcp");  // switch to UDP here if needed
            grabber.setOption("allowed_media_types", "video");
            grabber.setOption("max_delay", "500000");
            grabber.setOption("user_agent", "Lavf/60");
        } else if (cameraDto.getUrl().toLowerCase().startsWith("rtmp://")) {
            grabber.setOption("rtmp_buffer", "1000");
        } else if ("desktop".equalsIgnoreCase(cameraDto.getUrl())) {
            grabber.setFormat("gdigrab");
            grabber.setOption("draw_mouse", "1");
            grabber.setNumBuffers(0);
            grabber.setOption("fflags", "nobuffer");
            grabber.setOption("framerate", "25");
            grabber.setFrameRate(25);
        }
        try {
            grabber.start();
            log.info("\n{}\n启动拉流器成功", cameraDto.getUrl());
            return (grabberStatus = true);
        } catch (FrameGrabber.Exception e) {
            MediaService.cameras.remove(cameraDto.getMediaKey());
            log.error("\n{}\n启动拉流器失败,网络超时或视频源不可用({})", cameraDto.getUrl(), e.getMessage());
            return (grabberStatus = false);
        }
    }

    /*** ====== Recorder (transcode / remux) ====== ***/

    /**
     * Creates and starts the FLV recorder. Detection requires pixel access, so
     * it forces the transcode path; otherwise remux is attempted and falls back
     * to transcoding on failure.
     *
     * @return true when a recorder started successfully
     */
    protected boolean createTransterOrRecodeRecorder() {
        // Detection draws on pixels, so transcoding is mandatory
        if (enableDetection) {
            transferFlag = false;
        }
        recorder = new FFmpegFrameRecorder(bos, grabber.getImageWidth(), grabber.getImageHeight(),
                grabber.getAudioChannels());
        recorder.setFormat("flv");
        if (!transferFlag) {
            // Transcode: low-latency H.264
            recorder.setInterleaved(false);
            recorder.setVideoOption("tune", "zerolatency");
            recorder.setVideoOption("preset", "ultrafast");
            recorder.setVideoOption("crf", "26");
            recorder.setVideoOption("threads", "1");
            recorder.setFrameRate(25);
            recorder.setGopSize(25);
            recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264);
            recorder.setPixelFormat(avutil.AV_PIX_FMT_YUV420P);
            recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC);
            recorder.setTrellis(1);
            recorder.setMaxDelay(0);
            try {
                recorder.start();
                return recorderStatus = true;
            } catch (FrameRecorder.Exception e1) {
                log.error("启动转码录制器失败", e1);
                MediaService.cameras.remove(cameraDto.getMediaKey());
            }
        } else {
            // Remux (usable only when not drawing overlays)
            recorder.setCloseOutputStream(false);
            try {
                recorder.start(grabber.getFormatContext());
                return recorderStatus = true;
            } catch (FrameRecorder.Exception e) {
                log.warn("\r\n{}\r\n启动转复用录制器失败,自动切换转码", cameraDto.getUrl());
                transferFlag = false;
                try {
                    recorder.stop();
                } catch (FrameRecorder.Exception ignored) {
                    // best effort: recorder never fully started
                }
                if (createTransterOrRecodeRecorder()) {
                    log.error("\r\n{}\r\n切换到转码模式", cameraDto.getUrl());
                    return true;
                }
                log.error("\r\n{}\r\n切换转码模式失败", cameraDto.getUrl(), e);
            }
        }
        return recorderStatus = false;
    }

    /**
     * @return true when the source codecs can be copied straight into FLV
     *         (H.264/H.263 video + AAC audio), i.e. remux is possible
     */
    private boolean supportFlvFormatCodec() {
        int vcodec = grabber.getVideoCodec();
        int acodec = grabber.getAudioCodec();
        return (cameraDto.getType() == 0)
                && ("desktop".equals(cameraDto.getUrl())
                        || avcodec.AV_CODEC_ID_H264 == vcodec
                        || avcodec.AV_CODEC_ID_H263 == vcodec)
                && (avcodec.AV_CODEC_ID_AAC == acodec || avcodec.AV_CODEC_ID_AAC_LATM == acodec);
    }

    /*** ====== Frame hand-off helpers ====== ***/

    /**
     * Takes a defensive copy of the latest decoded frame under {@link #frameLock}
     * so the decode thread cannot release the source Mat mid-read.
     *
     * @return a Mat the caller owns (must release), or null when no frame yet
     */
    private Mat copyLatestFrame() {
        synchronized (frameLock) {
            Mat src = latestFrame.get();
            if (src == null || src.empty()) {
                return null;
            }
            Mat snap = new Mat();
            src.copyTo(snap);
            return snap;
        }
    }

    /**
     * Starts the decode and inference worker threads. Decode overwrites
     * {@link #latestFrame} (never blocks); inference is throttled to
     * {@code inferFps} and publishes into {@link #latestDetections}.
     */
    private void startDetectionThreads() {
        // Decode thread: only refreshes latestFrame (overwrite, non-blocking)
        detDecodeThread = new Thread(() -> {
            // Own converter instance: JavaCV converters are not thread-safe
            OpenCVFrameConverter.ToMat localConv = new OpenCVFrameConverter.ToMat();
            while (running && grabberStatus) {
                try {
                    Frame f = grabber.grabImage();
                    if (f == null) {
                        continue;
                    }
                    Mat m = localConv.convert(f);
                    if (m == null || m.empty()) {
                        continue;
                    }
                    Mat copy = new Mat(m.rows(), m.cols(), CV_8UC3);
                    m.copyTo(copy);
                    synchronized (frameLock) {
                        Mat old = latestFrame.getAndSet(copy);
                        if (old != null) {
                            old.release();
                        }
                    }
                } catch (Exception e) {
                    log.debug("decode err: {}", e.getMessage());
                }
            }
        }, "det-decode");

        // Inference thread: rate-limited (default 15 FPS), updates latestDetections
        int inferFps = 15;
        long period = 1_000_000_000L / inferFps;
        detInferThread = new Thread(() -> {
            long next = System.nanoTime();
            while (running && grabberStatus) {
                long now = System.nanoTime();
                if (now < next) {
                    LockSupport.parkNanos(next - now);
                    continue;
                }
                next += period;
                Mat snap = copyLatestFrame();
                if (snap == null) {
                    continue;
                }
                try {
                    List<Detection> dets = detector.detect(snap);
                    latestDetections.set(dets);
                } catch (Throwable e) {
                    log.debug("infer err: {}", e.getMessage());
                } finally {
                    snap.release();
                }
            }
        }, "det-infer");

        detDecodeThread.start();
        detInferThread.start();
    }

    /*** ====== Main pipeline: convert to FLV and fan out ====== ***/

    /**
     * End-to-end pipeline: init detectors, start grabber and recorder, capture
     * the FLV header, run the send loop (remux or transcode+overlay), then
     * release every native resource in order.
     */
    protected void transferStream2Flv() {
        try {
            if (enableDetection) {
                initDetectors();
            }
        } catch (Exception e) {
            log.error("初始化检测模型失败:{}", e.getMessage(), e);
            // Model failure never kills the stream — we just skip the overlay
            enableDetection = false;
        }

        if (!createGrabber()) {
            return;
        }
        // Without detection, remux when codecs already fit FLV (cheaper)
        if (!enableDetection) {
            transferFlag = supportFlvFormatCodec();
        }
        if (!createTransterOrRecodeRecorder()) {
            return;
        }
        try {
            grabber.flush();
        } catch (FrameGrabber.Exception e) {
            log.info("清空拉流器缓存失败", e);
        }
        if (header == null) {
            // recorder.start() wrote the FLV header into bos — keep it for new clients
            header = bos.toByteArray();
            bos.reset();
        }
        running = true;
        listenClient();

        long startTime = 0;
        long videoTS = 0;

        // === With detection: start decoupled decode→infer threads ===
        if (enableDetection) {
            // Warm up once so the first rendered frames already have boxes
            try {
                Frame warm = grabber.grabImage();
                if (warm != null) {
                    Mat wm = toMat.convert(warm);
                    if (wm != null && !wm.empty() && detector != null) {
                        latestDetections.set(detector.detect(wm));
                    }
                }
            } catch (Exception ignored) {
                // warm-up is best effort only
            }
            startDetectionThreads();
        }

        // === Main send loop (remux / transcode paths) ===
        while (running && grabberStatus && recorderStatus) {
            try {
                if (transferFlag) {
                    // ---- Remux (no overlay) ----
                    long startGrab = System.currentTimeMillis();
                    AVPacket pkt = grabber.grabPacket();
                    if ((System.currentTimeMillis() - startGrab) > 5000) {
                        log.info("\r\n{}\r\n视频流网络异常>>>", cameraDto.getUrl());
                        closeMedia();
                        break;
                    }
                    if (pkt != null && !pkt.isNull()) {
                        if (startTime == 0) {
                            startTime = System.currentTimeMillis();
                        }
                        videoTS = 1000 * (System.currentTimeMillis() - startTime);
                        if (videoTS > recorder.getTimestamp()) {
                            recorder.setTimestamp(videoTS);
                        }
                        recorder.recordPacket(pkt);
                    }
                } else {
                    // ---- Transcode (overlay possible) ----
                    long startGrab = System.currentTimeMillis();
                    Frame frame;
                    Mat canvas = null;
                    if (enableDetection) {
                        // Decode thread already runs; copy its newest frame so we
                        // never draw boxes onto the inference input
                        canvas = copyLatestFrame();
                        if (canvas == null) {
                            // no frame yet — brief park instead of busy-spin
                            LockSupport.parkNanos(2_000_000L);
                            continue;
                        }
                        // Stamp the most recent detections onto our private copy
                        Overlay.draw(latestDetections.get(), canvas);
                        frame = toMat.convert(canvas);
                    } else {
                        // Detection off: grab and transcode directly
                        frame = grabber.grab();
                    }
                    if ((System.currentTimeMillis() - startGrab) > 5000) {
                        if (canvas != null) {
                            canvas.release();
                        }
                        log.info("\r\n{}\r\n视频流网络异常>>>", cameraDto.getUrl());
                        closeMedia();
                        break;
                    }
                    if (frame != null) {
                        if (startTime == 0) {
                            startTime = System.currentTimeMillis();
                        }
                        videoTS = 1000 * (System.currentTimeMillis() - startTime);
                        if (videoTS > recorder.getTimestamp()) {
                            recorder.setTimestamp(videoTS);
                        }
                        recorder.record(frame);
                    }
                    if (canvas != null) {
                        // frame wraps canvas's buffer — release only after record()
                        canvas.release();
                    }
                }
            } catch (FrameGrabber.Exception e) {
                grabberStatus = false;
                MediaService.cameras.remove(cameraDto.getMediaKey());
            } catch (FrameRecorder.Exception e) {
                recorderStatus = false;
                MediaService.cameras.remove(cameraDto.getMediaKey());
            }
            if (bos.size() > 0) {
                byte[] b = bos.toByteArray();
                bos.reset();
                sendFrameData(b);
            }
        }

        // === Shutdown: stop workers BEFORE closing native resources they use ===
        running = false;
        try {
            if (detDecodeThread != null) {
                detDecodeThread.join(3000);
            }
            if (detInferThread != null) {
                detInferThread.join(3000);
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        try {
            if (detector != null) {
                try {
                    detector.close();
                } catch (Exception ignored) {
                }
            }
            if (modelManager != null) {
                try {
                    modelManager.close();
                } catch (Exception ignored) {
                }
            }
            if (recorder != null) {
                recorder.close();
            }
            if (grabber != null) {
                grabber.close();
            }
            bos.close();
        } catch (Exception ignored) {
        } finally {
            synchronized (frameLock) {
                Mat m = latestFrame.getAndSet(null);
                if (m != null) {
                    m.release();
                }
            }
            closeMedia();
        }
        log.info("关闭媒体流-javacv,{} ", cameraDto.getUrl());
    }

    /*** ====== Network fan-out ====== ***/

    /**
     * Sends one FLV chunk to every writable client; unwritable or failing
     * clients are evicted and the client count re-evaluated.
     */
    private void sendFrameData(byte[] data) {
        // WebSocket clients
        for (Map.Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
            try {
                if (entry.getValue().channel().isWritable()) {
                    entry.getValue().writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(data)));
                } else {
                    wsClients.remove(entry.getKey());
                    hasClient();
                }
            } catch (Exception e) {
                wsClients.remove(entry.getKey());
                hasClient();
                log.warn("ws send failed, client evicted", e);
            }
        }
        // HTTP clients
        for (Map.Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {
            try {
                if (entry.getValue().channel().isWritable()) {
                    entry.getValue().writeAndFlush(Unpooled.copiedBuffer(data));
                } else {
                    httpClients.remove(entry.getKey());
                    hasClient();
                }
            } catch (Exception e) {
                httpClients.remove(entry.getKey());
                hasClient();
                log.warn("http send failed, client evicted", e);
            }
        }
    }

    /**
     * Logs client-count changes and, when auto-close is enabled, accumulates
     * no-client time and closes the stream once it exceeds the configured limit.
     */
    public void hasClient() {
        int newHcSize = httpClients.size();
        int newWcSize = wsClients.size();
        if (hcSize != newHcSize || wcSize != newWcSize) {
            hcSize = newHcSize;
            wcSize = newWcSize;
            log.info("\r\n{}\r\nhttp连接数:{}, ws连接数:{} \r\n", cameraDto.getUrl(), newHcSize, newWcSize);
        }
        if (!cameraDto.isAutoClose()) {
            return;
        }
        if (httpClients.isEmpty() && wsClients.isEmpty()) {
            if (noClient > cameraDto.getNoClientsDuration()) {
                closeMedia();
            } else {
                // polled every second by listenClient() — count in milliseconds
                noClient += 1000;
            }
        } else {
            noClient = 0;
        }
    }

    /** Spawns the once-per-second watchdog that drives {@link #hasClient()}. */
    public void listenClient() {
        listenThread = new Thread(() -> {
            while (running) {
                hasClient();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ignored) {
                    // watchdog exits via the running flag, not interruption
                }
            }
        });
        listenThread.start();
    }

    /** Stops the stream, unregisters the camera and closes every client channel. */
    private void closeMedia() {
        running = false;
        MediaService.cameras.remove(cameraDto.getMediaKey());
        for (Map.Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
            try {
                entry.getValue().close();
            } catch (Exception ignored) {
            } finally {
                wsClients.remove(entry.getKey());
            }
        }
        for (Map.Entry<String, ChannelHandlerContext> entry : httpClients.entrySet()) {
            try {
                entry.getValue().close();
            } catch (Exception ignored) {
            } finally {
                httpClients.remove(entry.getKey());
            }
        }
    }

    /**
     * Registers a new client: waits (up to 30 s) for the FLV header to exist,
     * sends it, and on successful write adds the channel to the matching pool.
     *
     * @param ctx   the client's Netty channel context
     * @param ctype HTTP or WEBSOCKET
     */
    public void addClient(ChannelHandlerContext ctx, ClientType ctype) {
        int timeout = 0;
        while (true) {
            try {
                if (header != null) {
                    try {
                        if (ctx.channel().isWritable()) {
                            if (ClientType.HTTP.getType() == ctype.getType()) {
                                ChannelFuture future = ctx.writeAndFlush(Unpooled.copiedBuffer(header));
                                future.addListener(new GenericFutureListener<Future<? super Void>>() {
                                    @Override
                                    public void operationComplete(Future<? super Void> future) {
                                        if (future.isSuccess()) {
                                            httpClients.put(ctx.channel().id().toString(), ctx);
                                        }
                                    }
                                });
                            } else if (ClientType.WEBSOCKET.getType() == ctype.getType()) {
                                ChannelFuture future = ctx.writeAndFlush(
                                        new BinaryWebSocketFrame(Unpooled.copiedBuffer(header)));
                                future.addListener(new GenericFutureListener<Future<? super Void>>() {
                                    @Override
                                    public void operationComplete(Future<? super Void> future) {
                                        if (future.isSuccess()) {
                                            wsClients.put(ctx.channel().id().toString(), ctx);
                                        }
                                    }
                                });
                            }
                        }
                    } catch (Exception e) {
                        log.warn("send FLV header to new client failed", e);
                    }
                    break;
                }
                Thread.sleep(50);
                timeout += 50;
                if (timeout > 30000) {
                    break;
                }
            } catch (Exception e) {
                log.warn("addClient interrupted/failed", e);
            }
        }
    }

    @Override
    public void run() {
        transferStream2Flv();
    }
}