diff --git a/ruoyi-video/pom.xml b/ruoyi-video/pom.xml index 248cc58..79ebf93 100644 --- a/ruoyi-video/pom.xml +++ b/ruoyi-video/pom.xml @@ -18,6 +18,12 @@ + + javax.annotation + javax.annotation-api + 1.3.2 + + com.ruoyi diff --git a/ruoyi-video/src/main/java/com/ruoyi/video/scheduler/InspectionCronScheduler.java b/ruoyi-video/src/main/java/com/ruoyi/video/scheduler/InspectionCronScheduler.java new file mode 100644 index 0000000..5eec258 --- /dev/null +++ b/ruoyi-video/src/main/java/com/ruoyi/video/scheduler/InspectionCronScheduler.java @@ -0,0 +1,186 @@ +package com.ruoyi.video.scheduler; + +import com.ruoyi.common.utils.DateUtils; +import com.ruoyi.video.domain.InspectionTask; +import com.ruoyi.video.mapper.InspectionTaskMapper; +import com.ruoyi.video.service.InspectionTaskService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.support.CronExpression; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * 内部巡检任务调度器(不依赖若依Quartz) + * - 周期性轮询 v_inspection_task 中启用(status='0')的任务 + * - 使用 Spring CronExpression 计算下一次执行 + * - 到点调用 InspectionTaskService.executeInspectionTask(taskId) + */ +@Slf4j +@Component +public class InspectionCronScheduler { + + @Autowired + private InspectionTaskMapper inspectionTaskMapper; + + @Autowired + private InspectionTaskService inspectionTaskService; + + // 轮询调度线程池 + private final ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "inspection-cron-poller"); + t.setDaemon(true); + return t; + }); + + // 触发去抖:记录最近一次触发时间,避免同一窗口重复触发 + private final Map lastTriggeredAt = new ConcurrentHashMap<>(); + + // 最小触发间隔(毫秒),防重入 + private static final long MIN_TRIGGER_GAP_MS = 5_000L; + + @PostConstruct + public void start() { + // 延迟3秒启动,之后每10秒轮询一次 + poller.scheduleWithFixedDelay(this::tick, 3, 10, TimeUnit.SECONDS); + log.info("InspectionCronScheduler started."); + } + + @PreDestroy + public void shutdown() { + try { + poller.shutdownNow(); + } catch (Exception ignore) { + } + log.info("InspectionCronScheduler stopped."); + } + + private void tick() { + try { + // 查询所有任务(通过通用查询,传空对象获取全量),避免被服务把status改为“2”后无法再次调度 + List tasks = inspectionTaskMapper.selectInspectionTaskList(new InspectionTask()); + if (tasks == null || tasks.isEmpty()) { + return; + } + + LocalDateTime now = LocalDateTime.now(); + for (InspectionTask task : tasks) { + Long taskId = task.getTaskId(); + String cronStr = task.getCronExpression(); + String status = task.getStatus(); + if (taskId == null || cronStr == null || cronStr.trim().isEmpty()) { + continue; + } + + // 跳过停用状态(1=停用)。其余状态(0=启用, 2=服务执行后置状态)都按启用处理 + if ("1".equals(status)) { + continue; + } + + // 解析 cron + CronExpression cron; + try { + cron = CronExpression.parse(cronStr); + } catch (Exception e) { + log.warn("巡检任务cron无效,跳过: taskId={}, cron={}", taskId, cronStr); + continue; + } + + // 计算下次执行时间(基于当前时刻) + LocalDateTime next = cron.next(now); + if (next == null) { + // 无后续执行时间,跳过 + continue; + } + + Date nextDate = Date.from(next.atZone(ZoneId.systemDefault()).toInstant()); + + // 如果数据库中的 nextExecuteTime 为空或不同步,则回填 + try { + if (task.getNextExecuteTime() == null || !Objects.equals(task.getNextExecuteTime(), nextDate)) { + InspectionTask patch = new InspectionTask(); + patch.setTaskId(taskId); + patch.setNextExecuteTime(nextDate); + patch.setUpdateTime(DateUtils.getNowDate()); + inspectionTaskMapper.updateInspectionTask(patch); + } + } catch (Exception e) { + log.debug("更新nextExecuteTime失败: taskId={}, err={}", taskId, e.getMessage()); + } + + // 若发现状态为“2”(服务执行完成后置状态),将其规范化回“0”以维持启用 + if ("2".equals(status)) { + try { + InspectionTask patch = new InspectionTask(); + patch.setTaskId(taskId); + patch.setStatus("0"); + patch.setUpdateTime(DateUtils.getNowDate()); + inspectionTaskMapper.updateInspectionTask(patch); + task.setStatus("0"); + } catch (Exception e) { + log.debug("规范化任务状态失败: taskId={}, err={}", taskId, e.getMessage()); + } + } + + // 到点触发判定:如果 next <= now + 10s 窗口内,就触发一次 + long nowMs = System.currentTimeMillis(); + long nextMs = nextDate.getTime(); + if (nextMs - nowMs <= 10_000L) { + // 防抖:避免同一时间窗口重复触发 + Long last = lastTriggeredAt.get(taskId); + if (last != null && (nowMs - last) < MIN_TRIGGER_GAP_MS) { + continue; + } + + lastTriggeredAt.put(taskId, nowMs); + + // 回填最后执行时间(预写),并异步执行任务 + try { + InspectionTask patch = new InspectionTask(); + patch.setTaskId(taskId); + patch.setLastExecuteTime(new Date()); + patch.setUpdateTime(DateUtils.getNowDate()); + inspectionTaskMapper.updateInspectionTask(patch); + } catch (Exception e) { + log.debug("更新lastExecuteTime失败: taskId={}, err={}", taskId, e.getMessage()); + } + + log.info("触发巡检任务执行: taskId={}, cron={}, next={}", taskId, cronStr, next); + try { + // 交给服务的异步方法执行 + inspectionTaskService.executeInspectionTask(taskId); + } catch (Exception e) { + log.error("执行巡检任务触发失败: taskId={}, err={}", taskId, e.getMessage(), e); + } + + // 延迟规范化状态为启用(服务执行期间可能将其改为1或2) + try { + poller.schedule(() -> { + try { + InspectionTask patch2 = new InspectionTask(); + patch2.setTaskId(taskId); + patch2.setStatus("0"); + patch2.setUpdateTime(DateUtils.getNowDate()); + inspectionTaskMapper.updateInspectionTask(patch2); + } catch (Exception ignored) {} + }, 3, TimeUnit.SECONDS); + } catch (Exception ignore) {} + } + } + } catch (Exception e) { + log.error("InspectionCronScheduler tick 异常: {}", e.getMessage(), e); + } + } +}