@@ -0,0 +1,186 @@
package com.ruoyi.video.scheduler ;
import com.ruoyi.common.utils.DateUtils ;
import com.ruoyi.video.domain.InspectionTask ;
import com.ruoyi.video.mapper.InspectionTaskMapper ;
import com.ruoyi.video.service.InspectionTaskService ;
import lombok.extern.slf4j.Slf4j ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.scheduling.support.CronExpression ;
import org.springframework.stereotype.Component ;
import javax.annotation.PostConstruct ;
import javax.annotation.PreDestroy ;
import java.time.LocalDateTime ;
import java.time.ZoneId ;
import java.util.Date ;
import java.util.List ;
import java.util.Map ;
import java.util.Objects ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.Executors ;
import java.util.concurrent.ScheduledExecutorService ;
import java.util.concurrent.TimeUnit ;
/**
* 内部巡检任务调度器( 不依赖若依Quartz)
* - 周期性轮询 v_inspection_task 中启用(status='0')的任务
* - 使用 Spring CronExpression 计算下一次执行
* - 到点调用 InspectionTaskService.executeInspectionTask(taskId)
*/
@Slf4j
@Component
public class InspectionCronScheduler {
@Autowired
private InspectionTaskMapper inspectionTaskMapper ;
@Autowired
private InspectionTaskService inspectionTaskService ;
// 轮询调度线程池
private final ScheduledExecutorService poller = Executors . newSingleThreadScheduledExecutor ( r - > {
Thread t = new Thread ( r , " inspection-cron-poller " ) ;
t . setDaemon ( true ) ;
return t ;
} ) ;
// 触发去抖:记录最近一次触发时间,避免同一窗口重复触发
private final Map < Long , Long > lastTriggeredAt = new ConcurrentHashMap < > ( ) ;
// 最小触发间隔(毫秒),防重入
private static final long MIN_TRIGGER_GAP_MS = 5_000L ;
@PostConstruct
public void start ( ) {
// 延迟3秒启动, 之后每10秒轮询一次
poller . scheduleWithFixedDelay ( this : : tick , 3 , 10 , TimeUnit . SECONDS ) ;
log . info ( " InspectionCronScheduler started. " ) ;
}
@PreDestroy
public void shutdown ( ) {
try {
poller . shutdownNow ( ) ;
} catch ( Exception ignore ) {
}
log . info ( " InspectionCronScheduler stopped. " ) ;
}
private void tick ( ) {
try {
// 查询所有任务( 通过通用查询, 传空对象获取全量) , 避免被服务把status改为“2”后无法再次调度
List < InspectionTask > tasks = inspectionTaskMapper . selectInspectionTaskList ( new InspectionTask ( ) ) ;
if ( tasks = = null | | tasks . isEmpty ( ) ) {
return ;
}
LocalDateTime now = LocalDateTime . now ( ) ;
for ( InspectionTask task : tasks ) {
Long taskId = task . getTaskId ( ) ;
String cronStr = task . getCronExpression ( ) ;
String status = task . getStatus ( ) ;
if ( taskId = = null | | cronStr = = null | | cronStr . trim ( ) . isEmpty ( ) ) {
continue ;
}
// 跳过停用状态( 1=停用)。其余状态(0=启用, 2=服务执行后置状态)都按启用处理
if ( " 1 " . equals ( status ) ) {
continue ;
}
// 解析 cron
CronExpression cron ;
try {
cron = CronExpression . parse ( cronStr ) ;
} catch ( Exception e ) {
log . warn ( " 巡检任务cron无效, 跳过: taskId={}, cron={} " , taskId , cronStr ) ;
continue ;
}
// 计算下次执行时间(基于当前时刻)
LocalDateTime next = cron . next ( now ) ;
if ( next = = null ) {
// 无后续执行时间,跳过
continue ;
}
Date nextDate = Date . from ( next . atZone ( ZoneId . systemDefault ( ) ) . toInstant ( ) ) ;
// 如果数据库中的 nextExecuteTime 为空或不同步,则回填
try {
if ( task . getNextExecuteTime ( ) = = null | | ! Objects . equals ( task . getNextExecuteTime ( ) , nextDate ) ) {
InspectionTask patch = new InspectionTask ( ) ;
patch . setTaskId ( taskId ) ;
patch . setNextExecuteTime ( nextDate ) ;
patch . setUpdateTime ( DateUtils . getNowDate ( ) ) ;
inspectionTaskMapper . updateInspectionTask ( patch ) ;
}
} catch ( Exception e ) {
log . debug ( " 更新nextExecuteTime失败: taskId={}, err={} " , taskId , e . getMessage ( ) ) ;
}
// 若发现状态为“2”( 服务执行完成后置状态) , 将其规范化回“0”以维持启用
if ( " 2 " . equals ( status ) ) {
try {
InspectionTask patch = new InspectionTask ( ) ;
patch . setTaskId ( taskId ) ;
patch . setStatus ( " 0 " ) ;
patch . setUpdateTime ( DateUtils . getNowDate ( ) ) ;
inspectionTaskMapper . updateInspectionTask ( patch ) ;
task . setStatus ( " 0 " ) ;
} catch ( Exception e ) {
log . debug ( " 规范化任务状态失败: taskId={}, err={} " , taskId , e . getMessage ( ) ) ;
}
}
// 到点触发判定:如果 next <= now + 10s 窗口内,就触发一次
long nowMs = System . currentTimeMillis ( ) ;
long nextMs = nextDate . getTime ( ) ;
if ( nextMs - nowMs < = 10_000L ) {
// 防抖:避免同一时间窗口重复触发
Long last = lastTriggeredAt . get ( taskId ) ;
if ( last ! = null & & ( nowMs - last ) < MIN_TRIGGER_GAP_MS ) {
continue ;
}
lastTriggeredAt . put ( taskId , nowMs ) ;
// 回填最后执行时间(预写),并异步执行任务
try {
InspectionTask patch = new InspectionTask ( ) ;
patch . setTaskId ( taskId ) ;
patch . setLastExecuteTime ( new Date ( ) ) ;
patch . setUpdateTime ( DateUtils . getNowDate ( ) ) ;
inspectionTaskMapper . updateInspectionTask ( patch ) ;
} catch ( Exception e ) {
log . debug ( " 更新lastExecuteTime失败: taskId={}, err={} " , taskId , e . getMessage ( ) ) ;
}
log . info ( " 触发巡检任务执行: taskId={}, cron={}, next={} " , taskId , cronStr , next ) ;
try {
// 交给服务的异步方法执行
inspectionTaskService . executeInspectionTask ( taskId ) ;
} catch ( Exception e ) {
log . error ( " 执行巡检任务触发失败: taskId={}, err={} " , taskId , e . getMessage ( ) , e ) ;
}
// 延迟规范化状态为启用( 服务执行期间可能将其改为1或2)
try {
poller . schedule ( ( ) - > {
try {
InspectionTask patch2 = new InspectionTask ( ) ;
patch2 . setTaskId ( taskId ) ;
patch2 . setStatus ( " 0 " ) ;
patch2 . setUpdateTime ( DateUtils . getNowDate ( ) ) ;
inspectionTaskMapper . updateInspectionTask ( patch2 ) ;
} catch ( Exception ignored ) { }
} , 3 , TimeUnit . SECONDS ) ;
} catch ( Exception ignore ) { }
}
}
} catch ( Exception e ) {
log . error ( " InspectionCronScheduler tick 异常: {} " , e . getMessage ( ) , e ) ;
}
}
}