This commit is contained in:
砂糖
2025-10-07 14:28:17 +08:00
3 changed files with 193 additions and 1 deletions

View File

@@ -18,6 +18,12 @@
</properties>
<dependencies>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
<!-- 通用工具 -->
<dependency>
<groupId>com.ruoyi</groupId>

View File

@@ -0,0 +1,186 @@
package com.ruoyi.video.scheduler;
import com.ruoyi.common.utils.DateUtils;
import com.ruoyi.video.domain.InspectionTask;
import com.ruoyi.video.mapper.InspectionTaskMapper;
import com.ruoyi.video.service.InspectionTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.support.CronExpression;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 内部巡检任务调度器不依赖若依Quartz
* - 周期性轮询 v_inspection_task 中启用(status='0')的任务
* - 使用 Spring CronExpression 计算下一次执行
* - 到点调用 InspectionTaskService.executeInspectionTask(taskId)
*/
@Slf4j
@Component
public class InspectionCronScheduler {
@Autowired
private InspectionTaskMapper inspectionTaskMapper;
@Autowired
private InspectionTaskService inspectionTaskService;
// 轮询调度线程池
private final ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "inspection-cron-poller");
t.setDaemon(true);
return t;
});
// 触发去抖:记录最近一次触发时间,避免同一窗口重复触发
private final Map<Long, Long> lastTriggeredAt = new ConcurrentHashMap<>();
// 最小触发间隔(毫秒),防重入
private static final long MIN_TRIGGER_GAP_MS = 5_000L;
@PostConstruct
public void start() {
// 延迟3秒启动之后每10秒轮询一次
poller.scheduleWithFixedDelay(this::tick, 3, 10, TimeUnit.SECONDS);
log.info("InspectionCronScheduler started.");
}
@PreDestroy
public void shutdown() {
try {
poller.shutdownNow();
} catch (Exception ignore) {
}
log.info("InspectionCronScheduler stopped.");
}
private void tick() {
try {
// 查询所有任务通过通用查询传空对象获取全量避免被服务把status改为“2”后无法再次调度
List<InspectionTask> tasks = inspectionTaskMapper.selectInspectionTaskList(new InspectionTask());
if (tasks == null || tasks.isEmpty()) {
return;
}
LocalDateTime now = LocalDateTime.now();
for (InspectionTask task : tasks) {
Long taskId = task.getTaskId();
String cronStr = task.getCronExpression();
String status = task.getStatus();
if (taskId == null || cronStr == null || cronStr.trim().isEmpty()) {
continue;
}
// 跳过停用状态1=停用)。其余状态(0=启用, 2=服务执行后置状态)都按启用处理
if ("1".equals(status)) {
continue;
}
// 解析 cron
CronExpression cron;
try {
cron = CronExpression.parse(cronStr);
} catch (Exception e) {
log.warn("巡检任务cron无效跳过: taskId={}, cron={}", taskId, cronStr);
continue;
}
// 计算下次执行时间(基于当前时刻)
LocalDateTime next = cron.next(now);
if (next == null) {
// 无后续执行时间,跳过
continue;
}
Date nextDate = Date.from(next.atZone(ZoneId.systemDefault()).toInstant());
// 如果数据库中的 nextExecuteTime 为空或不同步,则回填
try {
if (task.getNextExecuteTime() == null || !Objects.equals(task.getNextExecuteTime(), nextDate)) {
InspectionTask patch = new InspectionTask();
patch.setTaskId(taskId);
patch.setNextExecuteTime(nextDate);
patch.setUpdateTime(DateUtils.getNowDate());
inspectionTaskMapper.updateInspectionTask(patch);
}
} catch (Exception e) {
log.debug("更新nextExecuteTime失败: taskId={}, err={}", taskId, e.getMessage());
}
// 若发现状态为“2”服务执行完成后置状态将其规范化回“0”以维持启用
if ("2".equals(status)) {
try {
InspectionTask patch = new InspectionTask();
patch.setTaskId(taskId);
patch.setStatus("0");
patch.setUpdateTime(DateUtils.getNowDate());
inspectionTaskMapper.updateInspectionTask(patch);
task.setStatus("0");
} catch (Exception e) {
log.debug("规范化任务状态失败: taskId={}, err={}", taskId, e.getMessage());
}
}
// 到点触发判定:如果 next <= now + 10s 窗口内,就触发一次
long nowMs = System.currentTimeMillis();
long nextMs = nextDate.getTime();
if (nextMs - nowMs <= 10_000L) {
// 防抖:避免同一时间窗口重复触发
Long last = lastTriggeredAt.get(taskId);
if (last != null && (nowMs - last) < MIN_TRIGGER_GAP_MS) {
continue;
}
lastTriggeredAt.put(taskId, nowMs);
// 回填最后执行时间(预写),并异步执行任务
try {
InspectionTask patch = new InspectionTask();
patch.setTaskId(taskId);
patch.setLastExecuteTime(new Date());
patch.setUpdateTime(DateUtils.getNowDate());
inspectionTaskMapper.updateInspectionTask(patch);
} catch (Exception e) {
log.debug("更新lastExecuteTime失败: taskId={}, err={}", taskId, e.getMessage());
}
log.info("触发巡检任务执行: taskId={}, cron={}, next={}", taskId, cronStr, next);
try {
// 交给服务的异步方法执行
inspectionTaskService.executeInspectionTask(taskId);
} catch (Exception e) {
log.error("执行巡检任务触发失败: taskId={}, err={}", taskId, e.getMessage(), e);
}
// 延迟规范化状态为启用服务执行期间可能将其改为1或2
try {
poller.schedule(() -> {
try {
InspectionTask patch2 = new InspectionTask();
patch2.setTaskId(taskId);
patch2.setStatus("0");
patch2.setUpdateTime(DateUtils.getNowDate());
inspectionTaskMapper.updateInspectionTask(patch2);
} catch (Exception ignored) {}
}, 3, TimeUnit.SECONDS);
} catch (Exception ignore) {}
}
}
} catch (Exception e) {
log.error("InspectionCronScheduler tick 异常: {}", e.getMessage(), e);
}
}
}

View File

@@ -66,7 +66,7 @@ public class VideoAnalysisService {
private com.ruoyi.video.mapper.InspectionTaskRecordMapper inspectionTaskRecordMapper;
// 检测器配置 - 使用容器名而不是localhost
private static final String PYTHON_API_URL = "http://rtsp-python-service:8000/api/detect/file";
private static final String PYTHON_API_URL = "http://rtsp-python-service:10083/api/detect/file";
private static final String MODEL_NAME = "yolov8_detector";
/**