diff --git a/ruoyi-video/src/main/java/com/ruoyi/video/service/ImageStoreService.java b/ruoyi-video/src/main/java/com/ruoyi/video/service/ImageStoreService.java new file mode 100644 index 0000000..815ed0e --- /dev/null +++ b/ruoyi-video/src/main/java/com/ruoyi/video/service/ImageStoreService.java @@ -0,0 +1,17 @@ +package com.ruoyi.video.service; + +import org.bytedeco.opencv.opencv_core.Mat; + +/** + * 截取“叠好框的最新一帧”并存证(文件系统 / 数据库BLOB) + */ +public interface ImageStoreService { + + /** + * 从指定 device 的推流实例中,读取“叠好框”的最新一帧并保存。 + * @param deviceId 设备ID + * @return 文件路径,或 "db://image/{id}" + */ + String saveLastAnnotatedFrame(Long deviceId); + +} diff --git a/ruoyi-video/src/main/java/com/ruoyi/video/service/MediaService.java b/ruoyi-video/src/main/java/com/ruoyi/video/service/MediaService.java index 0d9a20d..14f7c04 100644 --- a/ruoyi-video/src/main/java/com/ruoyi/video/service/MediaService.java +++ b/ruoyi-video/src/main/java/com/ruoyi/video/service/MediaService.java @@ -8,9 +8,11 @@ import com.ruoyi.video.thread.MediaTransfer; import com.ruoyi.video.thread.MediaTransferFlvByFFmpeg; import com.ruoyi.video.thread.MediaTransferFlvByJavacv; import io.netty.channel.ChannelHandlerContext; -import java.util.concurrent.ConcurrentHashMap; import org.springframework.stereotype.Service; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + /** * 媒体服务,支持全局网络超时、读写超时、无人拉流持续时长自动关闭流等配置 * @Author: orange @@ -24,20 +26,19 @@ public class MediaService { */ public static ConcurrentHashMap cameras = new ConcurrentHashMap<>(); - /** * http-flv播放 - * @param cameraDto - * @param ctx + * @param cameraDto 摄像头配置 + * @param ctx Netty上下文 */ public void playForHttp(CameraDto cameraDto, ChannelHandlerContext ctx) { if (cameras.containsKey(cameraDto.getMediaKey())) { MediaTransfer mediaConvert = cameras.get(cameraDto.getMediaKey()); - if(mediaConvert instanceof MediaTransferFlvByJavacv) { + if (mediaConvert instanceof MediaTransferFlvByJavacv) { MediaTransferFlvByJavacv mediaTransferFlvByJavacv = (MediaTransferFlvByJavacv) mediaConvert; //如果当前已经用ffmpeg,则重新拉流 - if(cameraDto.isEnabledFFmpeg()) { + if (cameraDto.isEnabledFFmpeg()) { mediaTransferFlvByJavacv.setRunning(false); cameras.remove(cameraDto.getMediaKey()); this.playForHttp(cameraDto, ctx); @@ -47,7 +48,7 @@ public class MediaService { } else if (mediaConvert instanceof MediaTransferFlvByFFmpeg) { MediaTransferFlvByFFmpeg mediaTransferFlvByFFmpeg = (MediaTransferFlvByFFmpeg) mediaConvert; //如果当前已经用javacv,则关闭再重新拉流 - if(!cameraDto.isEnabledFFmpeg()) { + if (!cameraDto.isEnabledFFmpeg()) { mediaTransferFlvByFFmpeg.stopFFmpeg(); cameras.remove(cameraDto.getMediaKey()); this.playForHttp(cameraDto, ctx); @@ -57,7 +58,7 @@ public class MediaService { } } else { - if(cameraDto.isEnabledFFmpeg()) { + if (cameraDto.isEnabledFFmpeg()) { MediaTransferFlvByFFmpeg mediaft = new MediaTransferFlvByFFmpeg(cameraDto); mediaft.execute(); cameras.put(cameraDto.getMediaKey(), mediaft); @@ -74,17 +75,17 @@ public class MediaService { /** * ws-flv播放 - * @param cameraDto - * @param ctx + * @param cameraDto 摄像头配置 + * @param ctx Netty上下文 */ public void playForWs(CameraDto cameraDto, ChannelHandlerContext ctx) { if (cameras.containsKey(cameraDto.getMediaKey())) { MediaTransfer mediaConvert = cameras.get(cameraDto.getMediaKey()); - if(mediaConvert instanceof MediaTransferFlvByJavacv) { + if (mediaConvert instanceof MediaTransferFlvByJavacv) { MediaTransferFlvByJavacv mediaTransferFlvByJavacv = (MediaTransferFlvByJavacv) mediaConvert; //如果当前已经用ffmpeg,则重新拉流 - if(cameraDto.isEnabledFFmpeg()) { + if (cameraDto.isEnabledFFmpeg()) { mediaTransferFlvByJavacv.setRunning(false); cameras.remove(cameraDto.getMediaKey()); this.playForWs(cameraDto, ctx); @@ -94,7 +95,7 @@ public class MediaService { } else if (mediaConvert instanceof MediaTransferFlvByFFmpeg) { MediaTransferFlvByFFmpeg mediaTransferFlvByFFmpeg = (MediaTransferFlvByFFmpeg) mediaConvert; //如果当前已经用javacv,则关闭再重新拉流 - if(!cameraDto.isEnabledFFmpeg()) { + if (!cameraDto.isEnabledFFmpeg()) { mediaTransferFlvByFFmpeg.stopFFmpeg(); cameras.remove(cameraDto.getMediaKey()); this.playForWs(cameraDto, ctx); @@ -103,7 +104,7 @@ public class MediaService { } } } else { - if(cameraDto.isEnabledFFmpeg()) { + if (cameraDto.isEnabledFFmpeg()) { MediaTransferFlvByFFmpeg mediaft = new MediaTransferFlvByFFmpeg(cameraDto); mediaft.execute(); cameras.put(cameraDto.getMediaKey(), mediaft); @@ -119,8 +120,8 @@ public class MediaService { /** * api播放 - * @param cameraDto - * @return + * @param cameraDto 摄像头配置 + * @return 是否启动成功 */ public boolean playForApi(CameraDto cameraDto) { // 区分不同媒体 @@ -130,7 +131,7 @@ public class MediaService { MediaTransfer mediaTransfer = cameras.get(cameraDto.getMediaKey()); if (null == mediaTransfer) { - if(cameraDto.isEnabledFFmpeg()) { + if (cameraDto.isEnabledFFmpeg()) { MediaTransferFlvByFFmpeg mediaft = new MediaTransferFlvByFFmpeg(cameraDto); mediaft.execute(); cameras.put(cameraDto.getMediaKey(), mediaft); @@ -143,7 +144,7 @@ public class MediaService { mediaTransfer = cameras.get(cameraDto.getMediaKey()); //同步等待 - if(mediaTransfer instanceof MediaTransferFlvByJavacv) { + if (mediaTransfer instanceof MediaTransferFlvByJavacv) { MediaTransferFlvByJavacv mediaft = (MediaTransferFlvByJavacv) mediaTransfer; // 30秒还没true认为启动不了 for (int i = 0; i < 60; i++) { @@ -153,6 +154,7 @@ public class MediaService { try { Thread.sleep(500); } catch (InterruptedException e) { + // ignore } } } else if (mediaTransfer instanceof MediaTransferFlvByFFmpeg) { @@ -165,6 +167,7 @@ public class MediaService { try { Thread.sleep(500); } catch (InterruptedException e) { + // ignore } } } @@ -173,14 +176,14 @@ public class MediaService { /** * 关闭流 - * @param cameraDto + * @param cameraDto 摄像头配置 */ public void closeForApi(CameraDto cameraDto) { cameraDto.setEnabledFlv(false); if (cameras.containsKey(cameraDto.getMediaKey())) { MediaTransfer mediaConvert = cameras.get(cameraDto.getMediaKey()); - if(mediaConvert instanceof MediaTransferFlvByJavacv) { + if (mediaConvert instanceof MediaTransferFlvByJavacv) { MediaTransferFlvByJavacv mediaTransferFlvByJavacv = (MediaTransferFlvByJavacv) mediaConvert; mediaTransferFlvByJavacv.setRunning(false); cameras.remove(cameraDto.getMediaKey()); @@ -192,4 +195,62 @@ public class MediaService { } } + /* =========================== 新增便捷方法 =========================== */ + + /** 直接从缓存取 MediaTransfer(可能是 FFmpeg 或 JavaCV)。不存在返回 null。 */ + public MediaTransfer getMedia(String mediaKey) { + return cameras.get(mediaKey); + } + + /** 只取 JavaCV 实例;如果不是 JavaCV 或不存在则返回 null。 */ + public MediaTransferFlvByJavacv getJavacv(String mediaKey) { + MediaTransfer mt = cameras.get(mediaKey); + return (mt instanceof MediaTransferFlvByJavacv) ? (MediaTransferFlvByJavacv) mt : null; + } + + /** + * 取或启动 JavaCV 实例: + * - 已有 JavaCV:直接返回 + * - 已有 FFmpeg:先停止 FFmpeg,再切换 JavaCV + * - 不存在:启动 JavaCV + * + * @param cameraDto 需包含 url / mediaKey(mediaKey 为空则用 url 的 MD5 生成) + * @param beforeStart 启动前对 cameraDto 做一次定制(可 null),例如 dto -> dto.setEnableDetection(true) + */ + public MediaTransferFlvByJavacv getOrStartJavacv(CameraDto cameraDto, Consumer beforeStart) { + // 兜底 mediaKey + if (cameraDto.getMediaKey() == null || cameraDto.getMediaKey().isEmpty()) { + String mediaKey = MD5.create().digestHex(cameraDto.getUrl()); + cameraDto.setMediaKey(mediaKey); + } + + MediaTransfer mt = cameras.get(cameraDto.getMediaKey()); + if (mt instanceof MediaTransferFlvByJavacv) { + return (MediaTransferFlvByJavacv) mt; + } + + // 若已存在 FFmpeg 实例,先停掉 + if (mt instanceof MediaTransferFlvByFFmpeg) { + ((MediaTransferFlvByFFmpeg) mt).stopFFmpeg(); + cameras.remove(cameraDto.getMediaKey()); + } + + // 启动 JavaCV + if (beforeStart != null) beforeStart.accept(cameraDto); + MediaTransferFlvByJavacv mediaConvert = new MediaTransferFlvByJavacv(cameraDto); + cameras.put(cameraDto.getMediaKey(), mediaConvert); + ThreadUtil.execute(mediaConvert); + return mediaConvert; + } + + /** 可选:根据 mediaKey 强制停止并移除(两种实现都兼容) */ + public void stopByMediaKey(String mediaKey) { + MediaTransfer mt = cameras.get(mediaKey); + if (mt instanceof MediaTransferFlvByJavacv) { + ((MediaTransferFlvByJavacv) mt).setRunning(false); + } else if (mt instanceof MediaTransferFlvByFFmpeg) { + ((MediaTransferFlvByFFmpeg) mt).stopFFmpeg(); + } + cameras.remove(mediaKey); + } } diff --git a/ruoyi-video/src/main/java/com/ruoyi/video/service/impl/FileImageStoreServiceImpl.java b/ruoyi-video/src/main/java/com/ruoyi/video/service/impl/FileImageStoreServiceImpl.java new file mode 100644 index 0000000..dc8a73d --- /dev/null +++ b/ruoyi-video/src/main/java/com/ruoyi/video/service/impl/FileImageStoreServiceImpl.java @@ -0,0 +1,126 @@ +package com.ruoyi.video.service.impl; + +import com.ruoyi.common.config.RuoYiConfig; +import com.ruoyi.common.utils.file.FileUploadUtils; +import com.ruoyi.video.domain.Device; +import com.ruoyi.video.service.IDeviceService; +import com.ruoyi.video.service.ImageStoreService; +import com.ruoyi.video.service.MediaService; +import com.ruoyi.video.thread.MediaTransferFlvByJavacv; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.bytedeco.javacpp.BytePointer; +import org.bytedeco.opencv.opencv_core.Mat; +import org.springframework.stereotype.Service; +import org.springframework.web.multipart.MultipartFile; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; + +import static org.bytedeco.opencv.global.opencv_imgcodecs.imencode; + +/** + * 后端从视频流抓帧 -> JPEG -> 包成 MultipartFile -> 交给 FileUploadUtils.upload 存储。 + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class FileImageStoreServiceImpl implements ImageStoreService { + + private final IDeviceService deviceService; + private final MediaService mediaService; + + @Override + public String saveLastAnnotatedFrame(Long deviceId) { + // 1) 定位设备 & JavaCV 实例 + Device device = deviceService.selectDeviceByDeviceId(deviceId); + if (device == null) throw new IllegalArgumentException("device not found: " + deviceId); + + MediaTransferFlvByJavacv mt = mediaService.getJavacv(device.getMediaKey()); + if (mt == null) { + throw new IllegalStateException("media (javacv) not running for mediaKey=" + device.getMediaKey()); + } + + // 2) 取“叠好框”的最近一帧 + Mat mat = mt.getLatestAnnotatedFrameCopy(); + if (mat == null || mat.empty()) { + throw new IllegalStateException("no annotated frame available currently."); + } + + try { + // 3) 编码为 JPEG 字节 + BytePointer buf = new BytePointer(); + if (!imencode(".jpg", mat, buf) || buf.isNull() || buf.limit() <= 0) { + throw new IllegalStateException("encode jpeg failed."); + } + byte[] bytes = new byte[(int) buf.limit()]; + buf.get(bytes); + + // 4) 计算保存目录:{profile}/snapshots/device-{deviceId}/ + String profile = RuoYiConfig.getProfile(); + String uploadBaseDir = Paths.get(profile, "snapshots", "device-" + deviceId).toString(); + + // 5) 生成文件名(给 MultipartFile 用;最终 FileUploadUtils 会做统一命名与日期分桶) + String fileName = buildFileName(deviceId); + + // 6) 把字节包成 MultipartFile,走若依工具存储 + MultipartFile multipart = new InMemoryMultipartFile( + "file", + fileName, + "image/jpeg", + bytes + ); + + String stored = FileUploadUtils.upload(uploadBaseDir, multipart); + log.info("snapshot saved by FileUploadUtils: {}", stored); + return stored; // 形如 /profile/snapshots/device-x/20250927/xxx.jpg + } catch (Exception e) { + log.error("saveLastAnnotatedFrame failed: {}", e.getMessage(), e); + throw new RuntimeException("save snapshot failed", e); + } finally { + try { mat.release(); } catch (Exception ignore) {} + } + } + + private String buildFileName(Long deviceId) { + ZonedDateTime now = ZonedDateTime.now(ZoneId.systemDefault()); + String ts = now.format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss_SSS")); + return "cam" + deviceId + "_" + ts + ".jpg"; + } + + /** + * 轻量内存 MultipartFile,不依赖 spring-test。 + */ + static class InMemoryMultipartFile implements MultipartFile { + private final String name; + private final String originalFilename; + private final String contentType; + private final byte[] content; + + InMemoryMultipartFile(String name, String originalFilename, String contentType, byte[] content) { + this.name = name; + this.originalFilename = originalFilename; + this.contentType = contentType; + this.content = content != null ? content : new byte[0]; + } + + @Override public String getName() { return name; } + @Override public String getOriginalFilename() { return originalFilename; } + @Override public String getContentType() { return contentType; } + @Override public boolean isEmpty() { return content.length == 0; } + @Override public long getSize() { return content.length; } + @Override public byte[] getBytes() { return content; } + @Override public InputStream getInputStream() { return new ByteArrayInputStream(content); } + @Override public void transferTo(java.io.File dest) throws IOException { + try (var in = getInputStream(); var out = new java.io.FileOutputStream(dest)) { + in.transferTo(out); + } + } + } +} diff --git a/ruoyi-video/src/main/java/com/ruoyi/video/thread/MediaTransferFlvByJavacv.java b/ruoyi-video/src/main/java/com/ruoyi/video/thread/MediaTransferFlvByJavacv.java index e76400a..34d147f 100644 --- a/ruoyi-video/src/main/java/com/ruoyi/video/thread/MediaTransferFlvByJavacv.java +++ b/ruoyi-video/src/main/java/com/ruoyi/video/thread/MediaTransferFlvByJavacv.java @@ -1,29 +1,29 @@ package com.ruoyi.video.thread; import com.ruoyi.video.common.ClientType; -import com.ruoyi.video.common.ModelManager; // ★ 新增:多模型管理(见前面提供的类) -import com.ruoyi.video.domain.Detection; // ★ 新增:检测结果(见前面提供的类) +import com.ruoyi.video.common.ModelManager; +import com.ruoyi.video.domain.Detection; import com.ruoyi.video.domain.dto.CameraDto; import com.ruoyi.video.service.MediaService; -import com.ruoyi.video.thread.detector.CompositeDetector; // ★ 新增:并行多模型 -import com.ruoyi.video.thread.detector.YoloDetector; // ★ 新增:检测接口 -import com.ruoyi.video.utils.Overlay; // ★ 新增:画框工具 +import com.ruoyi.video.thread.detector.CompositeDetector; +import com.ruoyi.video.thread.detector.YoloDetector; +import com.ruoyi.video.utils.Overlay; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.bytedeco.ffmpeg.avcodec.AVPacket; import org.bytedeco.ffmpeg.global.avcodec; import org.bytedeco.ffmpeg.global.avutil; import org.bytedeco.javacv.*; import org.bytedeco.opencv.opencv_core.Mat; -import org.springframework.scheduling.annotation.Async; +import org.springframework.util.CollectionUtils; import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.net.URL; import java.util.List; import java.util.Map; @@ -34,18 +34,46 @@ import java.util.concurrent.locks.LockSupport; import static org.bytedeco.opencv.global.opencv_core.CV_8UC3; /** - * @Author: orange - * @CreateTime: 2025-01-16 + * 推流(FLV) + JavaCV 解码/转码 + (可选)YOLO 检测叠框 + * - 支持“窗口巡检”:在给定秒数内启用推理与统计,并通过 DetectionListener 回调让上层落库/告警 + * - 播放开始可触发 10 秒试跑:attachDetectionListener(jobId, deviceId, 10, listener) + * + * 依赖:ModelManager / YoloDetector / CompositeDetector / Detection / Overlay / MediaService / CameraDto / ClientType + * + * @author orange + * @since 2025-01-16 */ @Slf4j public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable { + /* ===================== 内部回调/统计类型(如已外部定义,可移除) ===================== */ + + public interface DetectionListener { + /** 每次推理得到 detections 时回调(建议上层做节流) */ + void onDetections(Long jobId, Long deviceId, List detections, long frameTsMs); + /** 一个“窗口巡检”结束时回调(含统计数据) */ + void onWindowFinished(Long jobId, Long deviceId, WindowStats stats); + } + + @Data + public static class WindowStats { + private int frames; + private int detectedFrames; + private int objects; + private double maxScore; + private long startMs; + private long endMs; + } + + /* ===================== FFmpeg/JavaCV 初始化 ===================== */ + static { avutil.av_log_set_level(avutil.AV_LOG_ERROR); FFmpegLogCallback.set(); } - /*** ====== 原有字段 ====== ***/ + /* ===================== 原有字段 ===================== */ + private final ConcurrentHashMap wsClients = new ConcurrentHashMap<>(); private final ConcurrentHashMap httpClients = new ConcurrentHashMap<>(); @@ -61,76 +89,169 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable private FFmpegFrameGrabber grabber; // 拉流器 private FFmpegFrameRecorder recorder; // 推流录制器 - /** true:转复用,false:转码 */ - private boolean transferFlag = false; // 默认转码 + /** true: 转复用;false: 转码。启用检测时强制转码(要在像素上叠框) */ + private boolean transferFlag = false; private final CameraDto cameraDto; private Thread listenThread; - /*** ====== 新增:推理相关字段 ====== ***/ - // 开关:是否启用检测(可对外提供 setter) + /* ===================== 推理相关字段 ===================== */ + + // 外部开关:是否启用检测(默认启用;也可由任务/页面配置动态设置) private boolean enableDetection = true; - // 模型与推理 private ModelManager modelManager; private YoloDetector detector; - // 三线程解耦所需 + // 解码/推理/发送解耦 private final OpenCVFrameConverter.ToMat toMat = new OpenCVFrameConverter.ToMat(); private final AtomicReference latestFrame = new AtomicReference<>(); private final AtomicReference> latestDetections = new AtomicReference<>(java.util.Collections.emptyList()); + // 窗口巡检控制 + private volatile boolean windowMode = false; + private volatile long windowEndMs = 0L; + private Long currentJobId; + private Long currentDeviceId; + private DetectionListener detectionListener; + private final WindowStats stats = new WindowStats(); + + // 导出最近一次“叠好框的帧”用于截图存证 + private final AtomicReference latestAnnotatedFrame = new AtomicReference<>(); + public MediaTransferFlvByJavacv(CameraDto cameraDto) { super(); this.cameraDto = cameraDto; } + public void setRunning(boolean running) { + boolean prev = this.running; + this.running = running; + // 如果是从 true -> false,则按“关闭”处理 + if (prev && !running) { + try { + closeMedia(); // 内部会 stopWindowIfAny()、关闭连接等 + } catch (Exception ignore) {} + } + } + + /** 推荐的新接口:显式停止并释放资源 */ + public void stop() { + setRunning(false); + } + + /* ===================== 外部控制 API ===================== */ + public boolean isRunning() { return running; } - public void setRunning(boolean running) { this.running = running; } public boolean isGrabberStatus() { return grabberStatus; } - public void setGrabberStatus(boolean grabberStatus) { this.grabberStatus = grabberStatus; } public boolean isRecorderStatus() { return recorderStatus; } - public void setRecorderStatus(boolean recorderStatus) { this.recorderStatus = recorderStatus; } public void setEnableDetection(boolean enable) { this.enableDetection = enable; } - /*** ====== 推理初始化 ====== ***/ + /** + * 开启一个“窗口巡检”,持续 windowSeconds 秒;期间每次推理回调 onDetections,结束时 onWindowFinished + */ + public void attachDetectionListener(Long jobId, Long deviceId, int windowSeconds, DetectionListener listener) { + if (windowSeconds <= 0 || listener == null) return; + this.currentJobId = jobId; + this.currentDeviceId = deviceId; + this.detectionListener = listener; + this.windowMode = true; + long now = System.currentTimeMillis(); + this.stats.setStartMs(now); + this.windowEndMs = now + windowSeconds * 1000L; + this.stats.setFrames(0); + this.stats.setDetectedFrames(0); + this.stats.setObjects(0); + this.stats.setMaxScore(0.0); + log.info("[job:{} device:{}] window started {}s", jobId, deviceId, windowSeconds); + } + + /** 主动结束当前窗口(可用于任务被中断的场景) */ + public void stopWindowIfAny() { + if (!windowMode) return; + this.windowMode = false; + stats.setEndMs(System.currentTimeMillis()); + if (detectionListener != null && currentJobId != null && currentDeviceId != null) { + try { + detectionListener.onWindowFinished(currentJobId, currentDeviceId, cloneStats(stats)); + } catch (Exception ignore) {} + } + currentJobId = null; + currentDeviceId = null; + detectionListener = null; + log.info("window finished (stopWindowIfAny)"); + } + + /** 导出最近一次“叠好框的帧”(深拷贝),用于截图/存证。调用方负责释放 Mat */ + public Mat getLatestAnnotatedFrameCopy() { + Mat src = latestAnnotatedFrame.get(); + if (src == null || src.empty()) return null; + Mat copy = new Mat(src.rows(), src.cols(), src.type()); + src.copyTo(copy); + return copy; + } + + /* ===================== 初始化推理 ===================== */ + private void initDetectors() throws Exception { if (!enableDetection) return; + modelManager = new ModelManager(); URL json = getClass().getResource("/models/models.json"); modelManager.load(json); - // 单模型: detector = modelManager.get("person-helmet"); - // 多模型并行(示例),并行度按CPU核数/模型大小调整: + // 你可按需切换单模型或多模型并行 + // detector = modelManager.get("person-helmet"); detector = new CompositeDetector( "all-models", - java.util.List.of(modelManager.get("person-helmet"), modelManager.get("vehicle-plate")), - 2 + java.util.List.of( + modelManager.get("person-helmet"), + modelManager.get("vehicle-plate") + ), + 2 // 并行度 ); log.info("YOLO detectors ready: {}", detector.name()); + + // 预热一次,避免前几帧“无框” + try { + Frame warm = grabber != null ? grabber.grabImage() : null; + if (warm != null) { + Mat wm = toMat.convert(warm); + if (wm != null && !wm.empty()) { + long t0 = System.currentTimeMillis(); + List dets = detector.detect(wm); + long cost = System.currentTimeMillis() - t0; + latestDetections.set(dets); + log.info("Detector warm-up OK, cost={}ms, dets={}", cost, + CollectionUtils.isEmpty(dets) ? 0 : dets.size()); + } + } + } catch (Throwable e) { + log.warn("Detector warm-up failed: {}", e.getMessage()); + } } - /*** ====== 拉流器 ====== ***/ + /* ===================== 拉流/推流 ===================== */ + protected boolean createGrabber() { grabber = new FFmpegFrameGrabber(cameraDto.getUrl()); - // 注意:这些是微秒字符串 String fiveSecUs = "5000000"; String oneMb = "1048576"; grabber.setOption("threads", "1"); grabber.setOption("buffer_size", oneMb); grabber.setOption("rw_timeout", fiveSecUs); - grabber.setOption("stimeout", fiveSecUs); - grabber.setOption("probesize", "1048576"); // ← 修正:probesize 是“字节” + grabber.setOption("stimeout", fiveSecUs); + grabber.setOption("probesize", "1048576"); grabber.setOption("analyzeduration", fiveSecUs); grabber.setOption("fflags", "nobuffer"); - grabber.setOption("flags", "low_delay"); - grabber.setOption("loglevel", "error"); // 稳定后压低日志 + grabber.setOption("flags", "low_delay"); + grabber.setOption("loglevel", "error"); if (cameraDto.getUrl().toLowerCase().startsWith("rtsp://")) { - grabber.setOption("rtsp_transport", "tcp"); // 你要测 UDP 再改 + grabber.setOption("rtsp_transport", "tcp"); grabber.setOption("allowed_media_types", "video"); grabber.setOption("max_delay", "500000"); grabber.setOption("user_agent", "Lavf/60"); @@ -147,19 +268,17 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable try { grabber.start(); - log.info("\n{}\n启动拉流器成功", cameraDto.getUrl()); + log.info("启动拉流器成功: {}", cameraDto.getUrl()); return (grabberStatus = true); } catch (FrameGrabber.Exception e) { MediaService.cameras.remove(cameraDto.getMediaKey()); - log.error("\n{}\n启动拉流器失败,网络超时或视频源不可用({})", - cameraDto.getUrl(), e.getMessage()); + log.error("启动拉流器失败: {} ({})", cameraDto.getUrl(), e.getMessage()); return (grabberStatus = false); } } - /*** ====== 录制器(转码/转复用) ====== ***/ protected boolean createTransterOrRecodeRecorder() { - // 若启用检测,必须转码(因为需要在像素上画框) + // 启用检测时必须转码(需要像素级叠框) if (enableDetection) transferFlag = false; recorder = new FFmpegFrameRecorder(bos, grabber.getImageWidth(), grabber.getImageHeight(), @@ -167,7 +286,7 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable recorder.setFormat("flv"); if (!transferFlag) { - // 转码(低延迟 H.264) + // 转码:低延迟 H.264 recorder.setInterleaved(false); recorder.setVideoOption("tune", "zerolatency"); recorder.setVideoOption("preset", "ultrafast"); @@ -188,20 +307,20 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable MediaService.cameras.remove(cameraDto.getMediaKey()); } } else { - // 转复用(不画框时可用) + // 转复用(仅不叠框时) recorder.setCloseOutputStream(false); try { recorder.start(grabber.getFormatContext()); return recorderStatus = true; } catch (FrameRecorder.Exception e) { - log.warn("\r\n{}\r\n启动转复用录制器失败,自动切换转码", cameraDto.getUrl()); + log.warn("{} 启动转复用失败,自动切换转码", cameraDto.getUrl()); transferFlag = false; try { recorder.stop(); } catch (FrameRecorder.Exception ignored) {} if (createTransterOrRecodeRecorder()) { - log.error("\r\n{}\r\n切换到转码模式", cameraDto.getUrl()); + log.error("{} 切换到转码模式", cameraDto.getUrl()); return true; } - log.error("\r\n{}\r\n切换转码模式失败", cameraDto.getUrl(), e); + log.error("{} 切换转码模式失败", cameraDto.getUrl(), e); } } return recorderStatus = false; @@ -216,23 +335,21 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable && (avcodec.AV_CODEC_ID_AAC == acodec || avcodec.AV_CODEC_ID_AAC_LATM == acodec); } - /*** ====== 主流程:转换为 FLV 并输出 ====== ***/ + /* ===================== 主流程 ===================== */ + protected void transferStream2Flv() { try { if (enableDetection) initDetectors(); } catch (Exception e) { log.error("初始化检测模型失败:{}", e.getMessage(), e); - // 模型失败也不中断推流,只是不画框 - enableDetection = false; + enableDetection = false; // 模型失败不影响推流 } if (!createGrabber()) return; - // 如果未启用检测,且编解码本身支持 FLV,可以转复用提升性能 if (!enableDetection) transferFlag = supportFlvFormatCodec(); - if (!createTransterOrRecodeRecorder()) return; - try { grabber.flush(); } catch (FrameGrabber.Exception e) { log.info("清空拉流器缓存失败", e); } + try { grabber.flush(); } catch (FrameGrabber.Exception e) { log.debug("flush grabber fail", e); } if (header == null) { header = bos.toByteArray(); bos.reset(); @@ -242,12 +359,11 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable listenClient(); long startTime = 0; - long videoTS = 0; + long videoTS; - // === 若启用检测,启动“解码→推理→渲染”解耦线程 === + // === 解耦线程:解码 / 推理 === Thread tDecode = null, tInfer = null; if (enableDetection) { - // 解码线程:仅更新 latestFrame(覆盖式,不阻塞) tDecode = new Thread(() -> { while (running && grabberStatus) { try { @@ -265,7 +381,6 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable } }, "det-decode"); - // 推理线程:限速(默认 15 FPS),更新 latestDetections int inferFps = 15; long period = 1_000_000_000L / inferFps; tInfer = new Thread(() -> { @@ -277,10 +392,19 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable Mat src = latestFrame.get(); if (src == null || src.empty()) continue; - Mat snap = new Mat(); src.copyTo(snap); + + Mat snap = new Mat(); + src.copyTo(snap); try { List dets = detector.detect(snap); latestDetections.set(dets); + + // 窗口巡检期间:回调 onDetections + if (windowMode && detectionListener != null && currentJobId != null && currentDeviceId != null) { + long ts = System.currentTimeMillis(); + try { detectionListener.onDetections(currentJobId, currentDeviceId, dets, ts); } + catch (Exception ignore) {} + } } catch (Throwable e) { log.debug("infer err: {}", e.getMessage()); } finally { @@ -289,30 +413,19 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable } }, "det-infer"); - // 抢先预热一次,避免前几帧无框 - try { - Frame warm = grabber.grabImage(); - if (warm != null) { - Mat wm = toMat.convert(warm); - if (wm != null && !wm.empty() && detector != null) { - latestDetections.set(detector.detect(wm)); - } - } - } catch (Exception ignored) { } - tDecode.start(); tInfer.start(); } - // === 主发送循环(转复用/转码两种路径) === + // === 主发送循环 === for (; running && grabberStatus && recorderStatus; ) { try { if (transferFlag) { - // ---- 转复用(不画框)---- + // 仅转复用(未叠框) long startGrab = System.currentTimeMillis(); AVPacket pkt = grabber.grabPacket(); if ((System.currentTimeMillis() - startGrab) > 5000) { - log.info("\r\n{}\r\n视频流网络异常>>>", cameraDto.getUrl()); + log.info("{} 网络异常(复用)", cameraDto.getUrl()); closeMedia(); break; } @@ -323,30 +436,44 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable recorder.recordPacket(pkt); } } else { - // ---- 转码(可画框)---- + // 转码(可叠框) long startGrab = System.currentTimeMillis(); Frame frame; + if (enableDetection) { - // 如果启用检测,解码线程已在跑;这里直接从 latestFrame 取,减少重复解码 Mat src = latestFrame.get(); if (src == null || src.empty()) continue; // 叠加最近一次检测结果 - Overlay.draw(latestDetections.get(), src); + List dets = latestDetections.get(); + if (!CollectionUtils.isEmpty(dets)) { + Overlay.draw(dets, src); + } + // 更新“最近叠好框的帧”用于存证 + updateLatestAnnotated(src); + + // 统计(仅窗口巡检时) + if (windowMode) updateStats(dets); + + // 窗口结束判定 + if (windowMode && System.currentTimeMillis() >= windowEndMs) { + finishWindow(); + } + frame = toMat.convert(src); } else { - // 未开启检测:直接 grab 并转码 frame = grabber.grab(); } if ((System.currentTimeMillis() - startGrab) > 5000) { - log.info("\r\n{}\r\n视频流网络异常>>>", cameraDto.getUrl()); + log.info("{} 网络异常(转码)", cameraDto.getUrl()); closeMedia(); break; } if (frame != null) { - if (startTime == 0) startTime = System.currentTimeMillis(); - videoTS = 1000 * (System.currentTimeMillis() - startTime); + long now = System.currentTimeMillis(); + if (startTime == 0) startTime = now; + videoTS = 1000 * (now - startTime); if (videoTS > recorder.getTimestamp()) recorder.setTimestamp(videoTS); recorder.record(frame); } @@ -359,6 +486,7 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable MediaService.cameras.remove(cameraDto.getMediaKey()); } + // 输出缓存到客户端 if (bos.size() > 0) { byte[] b = bos.toByteArray(); bos.reset(); @@ -371,18 +499,65 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable if (detector != null) try { detector.close(); } catch (Exception ignored) {} if (modelManager != null) try { modelManager.close(); } catch (Exception ignored) {} if (recorder != null) recorder.close(); - if (grabber != null) grabber.close(); + if (grabber != null) grabber.close(); bos.close(); } catch (Exception ignored) { } finally { Mat m = latestFrame.getAndSet(null); if (m != null) m.release(); + Mat a = latestAnnotatedFrame.getAndSet(null); + if (a != null) a.release(); closeMedia(); } - log.info("关闭媒体流-javacv,{} ", cameraDto.getUrl()); + log.info("关闭媒体流-javacv: {}", cameraDto.getUrl()); } - /*** ====== 网络发送(原样保留) ====== ***/ + /* ===================== 统计 / 窗口结束 ===================== */ + + private void updateStats(List dets) { + stats.setFrames(stats.getFrames() + 1); + if (dets != null && !dets.isEmpty()) { + stats.setDetectedFrames(stats.getDetectedFrames() + 1); + stats.setObjects(stats.getObjects() + dets.size()); + double localMax = dets.stream().mapToDouble(Detection::conf).max().orElse(0.0); + if (localMax > stats.getMaxScore()) stats.setMaxScore(localMax); + } + } + + private void finishWindow() { + windowMode = false; + stats.setEndMs(System.currentTimeMillis()); + if (detectionListener != null && currentJobId != null && currentDeviceId != null) { + try { detectionListener.onWindowFinished(currentJobId, currentDeviceId, cloneStats(stats)); } + catch (Exception ignore) {} + } + currentJobId = null; + currentDeviceId = null; + detectionListener = null; + log.info("window finished (timeout)"); + } + + private static WindowStats cloneStats(WindowStats s) { + WindowStats c = new WindowStats(); + c.setFrames(s.getFrames()); + c.setDetectedFrames(s.getDetectedFrames()); + c.setObjects(s.getObjects()); + c.setMaxScore(s.getMaxScore()); + c.setStartMs(s.getStartMs()); + c.setEndMs(s.getEndMs()); + return c; + } + + private void updateLatestAnnotated(Mat src) { + if (src == null || src.empty()) return; + Mat copy = new Mat(src.rows(), src.cols(), src.type()); + src.copyTo(copy); + Mat old = latestAnnotatedFrame.getAndSet(copy); + if (old != null) old.release(); + } + + /* ===================== 网络发送/连接管理 ===================== */ + private void sendFrameData(byte[] data) { // ws for (Map.Entry entry : wsClients.entrySet()) { @@ -396,7 +571,7 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable } catch (Exception e) { wsClients.remove(entry.getKey()); hasClient(); - e.printStackTrace(); + log.debug("ws send err", e); } } // http @@ -411,7 +586,7 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable } catch (Exception e) { httpClients.remove(entry.getKey()); hasClient(); - e.printStackTrace(); + log.debug("http send err", e); } } } @@ -422,7 +597,7 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable if (hcSize != newHcSize || wcSize != newWcSize) { hcSize = newHcSize; wcSize = newWcSize; - log.info("\r\n{}\r\nhttp连接数:{}, ws连接数:{} \r\n", cameraDto.getUrl(), newHcSize, newWcSize); + log.info("{} http连接数:{}, ws连接数:{}", cameraDto.getUrl(), newHcSize, newWcSize); } if (!cameraDto.isAutoClose()) return; @@ -448,15 +623,16 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable } private void closeMedia() { + // 结束窗口(如果还在) + stopWindowIfAny(); + running = false; MediaService.cameras.remove(cameraDto.getMediaKey()); for (Map.Entry entry : wsClients.entrySet()) { - try { entry.getValue().close(); } catch (Exception ignored) {} - finally { wsClients.remove(entry.getKey()); } + try { entry.getValue().close(); } catch (Exception ignored) {} finally { wsClients.remove(entry.getKey()); } } for (Map.Entry entry : httpClients.entrySet()) { - try { entry.getValue().close(); } catch (Exception ignored) {} - finally { httpClients.remove(entry.getKey()); } + try { entry.getValue().close(); } catch (Exception ignored) {} finally { httpClients.remove(entry.getKey()); } } } @@ -491,7 +667,7 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable } } } catch (Exception e) { - e.printStackTrace(); + log.debug("send header err", e); } break; } @@ -499,10 +675,13 @@ public class MediaTransferFlvByJavacv extends MediaTransfer implements Runnable timeout += 50; if (timeout > 30000) break; } catch (Exception e) { - e.printStackTrace(); + log.debug("addClient err", e); } } } - @Override public void run() { transferStream2Flv(); } + @Override + public void run() { + transferStream2Flv(); + } }