feat(video): 添加内部巡检任务调度器当点击启动的时候应该执行定时任务,此处不借用若依自带的定时任务 因为需要在系统监控里面加入定时任务才能使用多了一个步骤 除了status为1(停止)的其余都需要加入轮询
- 引入 javax.annotation-api 依赖以支持注解生命周期管理- 实现 InspectionCronScheduler 调度器组件 - 使用 ScheduledExecutorService 每10秒轮询一次巡检任务- 支持解析 Cron 表达式并计算下次执行时间 - 自动更新任务的下次执行时间和状态 - 添加触发去抖机制防止重复执行 - 异步调用 inspectionTaskService 执行具体任务 -任务执行后自动规范化状态为启用- 增加详细的日志记录和异常处理机制
This commit is contained in:
@@ -18,6 +18,12 @@
|
|||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>javax.annotation</groupId>
|
||||||
|
<artifactId>javax.annotation-api</artifactId>
|
||||||
|
<version>1.3.2</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- 通用工具 -->
|
<!-- 通用工具 -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.ruoyi</groupId>
|
<groupId>com.ruoyi</groupId>
|
||||||
|
|||||||
@@ -0,0 +1,186 @@
|
|||||||
|
package com.ruoyi.video.scheduler;
|
||||||
|
|
||||||
|
import com.ruoyi.common.utils.DateUtils;
|
||||||
|
import com.ruoyi.video.domain.InspectionTask;
|
||||||
|
import com.ruoyi.video.mapper.InspectionTaskMapper;
|
||||||
|
import com.ruoyi.video.service.InspectionTaskService;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.scheduling.support.CronExpression;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
import javax.annotation.PreDestroy;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.ZoneId;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 内部巡检任务调度器(不依赖若依Quartz)
|
||||||
|
* - 周期性轮询 v_inspection_task 中启用(status='0')的任务
|
||||||
|
* - 使用 Spring CronExpression 计算下一次执行
|
||||||
|
* - 到点调用 InspectionTaskService.executeInspectionTask(taskId)
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class InspectionCronScheduler {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private InspectionTaskMapper inspectionTaskMapper;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private InspectionTaskService inspectionTaskService;
|
||||||
|
|
||||||
|
// 轮询调度线程池
|
||||||
|
private final ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor(r -> {
|
||||||
|
Thread t = new Thread(r, "inspection-cron-poller");
|
||||||
|
t.setDaemon(true);
|
||||||
|
return t;
|
||||||
|
});
|
||||||
|
|
||||||
|
// 触发去抖:记录最近一次触发时间,避免同一窗口重复触发
|
||||||
|
private final Map<Long, Long> lastTriggeredAt = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
// 最小触发间隔(毫秒),防重入
|
||||||
|
private static final long MIN_TRIGGER_GAP_MS = 5_000L;
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void start() {
|
||||||
|
// 延迟3秒启动,之后每10秒轮询一次
|
||||||
|
poller.scheduleWithFixedDelay(this::tick, 3, 10, TimeUnit.SECONDS);
|
||||||
|
log.info("InspectionCronScheduler started.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
public void shutdown() {
|
||||||
|
try {
|
||||||
|
poller.shutdownNow();
|
||||||
|
} catch (Exception ignore) {
|
||||||
|
}
|
||||||
|
log.info("InspectionCronScheduler stopped.");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void tick() {
|
||||||
|
try {
|
||||||
|
// 查询所有任务(通过通用查询,传空对象获取全量),避免被服务把status改为“2”后无法再次调度
|
||||||
|
List<InspectionTask> tasks = inspectionTaskMapper.selectInspectionTaskList(new InspectionTask());
|
||||||
|
if (tasks == null || tasks.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
LocalDateTime now = LocalDateTime.now();
|
||||||
|
for (InspectionTask task : tasks) {
|
||||||
|
Long taskId = task.getTaskId();
|
||||||
|
String cronStr = task.getCronExpression();
|
||||||
|
String status = task.getStatus();
|
||||||
|
if (taskId == null || cronStr == null || cronStr.trim().isEmpty()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 跳过停用状态(1=停用)。其余状态(0=启用, 2=服务执行后置状态)都按启用处理
|
||||||
|
if ("1".equals(status)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 解析 cron
|
||||||
|
CronExpression cron;
|
||||||
|
try {
|
||||||
|
cron = CronExpression.parse(cronStr);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("巡检任务cron无效,跳过: taskId={}, cron={}", taskId, cronStr);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 计算下次执行时间(基于当前时刻)
|
||||||
|
LocalDateTime next = cron.next(now);
|
||||||
|
if (next == null) {
|
||||||
|
// 无后续执行时间,跳过
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
Date nextDate = Date.from(next.atZone(ZoneId.systemDefault()).toInstant());
|
||||||
|
|
||||||
|
// 如果数据库中的 nextExecuteTime 为空或不同步,则回填
|
||||||
|
try {
|
||||||
|
if (task.getNextExecuteTime() == null || !Objects.equals(task.getNextExecuteTime(), nextDate)) {
|
||||||
|
InspectionTask patch = new InspectionTask();
|
||||||
|
patch.setTaskId(taskId);
|
||||||
|
patch.setNextExecuteTime(nextDate);
|
||||||
|
patch.setUpdateTime(DateUtils.getNowDate());
|
||||||
|
inspectionTaskMapper.updateInspectionTask(patch);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("更新nextExecuteTime失败: taskId={}, err={}", taskId, e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
// 若发现状态为“2”(服务执行完成后置状态),将其规范化回“0”以维持启用
|
||||||
|
if ("2".equals(status)) {
|
||||||
|
try {
|
||||||
|
InspectionTask patch = new InspectionTask();
|
||||||
|
patch.setTaskId(taskId);
|
||||||
|
patch.setStatus("0");
|
||||||
|
patch.setUpdateTime(DateUtils.getNowDate());
|
||||||
|
inspectionTaskMapper.updateInspectionTask(patch);
|
||||||
|
task.setStatus("0");
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("规范化任务状态失败: taskId={}, err={}", taskId, e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 到点触发判定:如果 next <= now + 10s 窗口内,就触发一次
|
||||||
|
long nowMs = System.currentTimeMillis();
|
||||||
|
long nextMs = nextDate.getTime();
|
||||||
|
if (nextMs - nowMs <= 10_000L) {
|
||||||
|
// 防抖:避免同一时间窗口重复触发
|
||||||
|
Long last = lastTriggeredAt.get(taskId);
|
||||||
|
if (last != null && (nowMs - last) < MIN_TRIGGER_GAP_MS) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
lastTriggeredAt.put(taskId, nowMs);
|
||||||
|
|
||||||
|
// 回填最后执行时间(预写),并异步执行任务
|
||||||
|
try {
|
||||||
|
InspectionTask patch = new InspectionTask();
|
||||||
|
patch.setTaskId(taskId);
|
||||||
|
patch.setLastExecuteTime(new Date());
|
||||||
|
patch.setUpdateTime(DateUtils.getNowDate());
|
||||||
|
inspectionTaskMapper.updateInspectionTask(patch);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("更新lastExecuteTime失败: taskId={}, err={}", taskId, e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("触发巡检任务执行: taskId={}, cron={}, next={}", taskId, cronStr, next);
|
||||||
|
try {
|
||||||
|
// 交给服务的异步方法执行
|
||||||
|
inspectionTaskService.executeInspectionTask(taskId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("执行巡检任务触发失败: taskId={}, err={}", taskId, e.getMessage(), e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 延迟规范化状态为启用(服务执行期间可能将其改为1或2)
|
||||||
|
try {
|
||||||
|
poller.schedule(() -> {
|
||||||
|
try {
|
||||||
|
InspectionTask patch2 = new InspectionTask();
|
||||||
|
patch2.setTaskId(taskId);
|
||||||
|
patch2.setStatus("0");
|
||||||
|
patch2.setUpdateTime(DateUtils.getNowDate());
|
||||||
|
inspectionTaskMapper.updateInspectionTask(patch2);
|
||||||
|
} catch (Exception ignored) {}
|
||||||
|
}, 3, TimeUnit.SECONDS);
|
||||||
|
} catch (Exception ignore) {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("InspectionCronScheduler tick 异常: {}", e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user