二级后端添加数据快照修正。前端添加数字孪生
This commit is contained in:
@@ -8,7 +8,7 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 设备数据快照定时任务
|
||||
* 每10分钟执行一次设备数据快照
|
||||
* 每5分钟执行一次设备数据快照
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@@ -18,23 +18,23 @@ public class DeviceSnapshotSchedule {
|
||||
private final DeviceSnapshotService deviceSnapshotService;
|
||||
|
||||
/**
|
||||
* 每10分钟执行一次设备数据快照
|
||||
* 使用cron表达式:0 0/10 * * * ?
|
||||
* 表示每10分钟执行一次,在每分钟的0秒触发
|
||||
* 每5分钟执行一次设备数据快照
|
||||
* 使用cron表达式:0 0/5 * * * ?
|
||||
* 表示每5分钟执行一次,在每分钟的0秒触发
|
||||
*/
|
||||
@Scheduled(cron = "0 0/10 * * * ?")
|
||||
@Scheduled(cron = "0 0/5 * * * ?")
|
||||
public void captureDeviceSnapshots() {
|
||||
try {
|
||||
log.info("开始执行设备数据快照任务...");
|
||||
log.info("========== 开始执行设备数据快照任务 ==========");
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
// 创建设备数据快照
|
||||
deviceSnapshotService.createDeviceSnapshots();
|
||||
|
||||
long endTime = System.currentTimeMillis();
|
||||
log.info("设备数据快照任务执行完成,耗时:{}ms", (endTime - startTime));
|
||||
log.info("========== 设备数据快照任务执行完成,耗时:{}ms ==========", (endTime - startTime));
|
||||
} catch (Exception e) {
|
||||
log.error("执行设备数据快照任务失败: {}", e.getMessage(), e);
|
||||
log.error("========== 执行设备数据快照任务失败: {} ==========", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,6 +38,11 @@ public class DeviceSnapshotService {
|
||||
private final DeviceSnapshotMapper deviceSnapshotMapper;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
/**
|
||||
* 最近一次成功创建设备快照的时间(用于兜底触发,避免完全依赖调度器)
|
||||
*/
|
||||
private volatile LocalDateTime lastSnapshotTime;
|
||||
|
||||
/**
|
||||
* 更新最新的测量数据
|
||||
* @param message 测量数据消息
|
||||
@@ -45,6 +50,25 @@ public class DeviceSnapshotService {
|
||||
public void updateLatestMeasurement(OpcMessage message) {
|
||||
if (message != null) {
|
||||
latestMeasurements.put(message.getClass(), message);
|
||||
log.trace("更新测量数据缓存: {}", message.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 由实时测量处理链路兜底触发快照创建:
|
||||
* - 与定时任务互不冲突
|
||||
* - 仅当距离上次快照 >= 5 分钟时才真正落库
|
||||
*/
|
||||
public void captureSnapshotIfNeeded() {
|
||||
try {
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
LocalDateTime last = lastSnapshotTime;
|
||||
if (last == null || java.time.Duration.between(last, now).toMinutes() >= 5) {
|
||||
createDeviceSnapshots();
|
||||
lastSnapshotTime = now;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("兜底创建设备数据快照失败: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,6 +80,7 @@ public class DeviceSnapshotService {
|
||||
try {
|
||||
// 获取当前时间戳
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
log.info("开始创建设备快照,当前时间: {}, latestMeasurements缓存大小: {}", now, latestMeasurements.size());
|
||||
|
||||
// 获取最新的测量数据
|
||||
AppMeasureEntryMessage entry = (AppMeasureEntryMessage) latestMeasurements.get(AppMeasureEntryMessage.class);
|
||||
@@ -63,30 +88,56 @@ public class DeviceSnapshotService {
|
||||
AppMeasureCoatMessage coat = (AppMeasureCoatMessage) latestMeasurements.get(AppMeasureCoatMessage.class);
|
||||
AppMeasureExitMessage exit = (AppMeasureExitMessage) latestMeasurements.get(AppMeasureExitMessage.class);
|
||||
|
||||
log.info("测量数据状态 - entry: {}, furnace: {}, coat: {}, exit: {}",
|
||||
entry != null ? "有数据" : "null",
|
||||
furnace != null ? "有数据" : "null",
|
||||
coat != null ? "有数据" : "null",
|
||||
exit != null ? "有数据" : "null");
|
||||
|
||||
if (entry == null && furnace == null && coat == null && exit == null) {
|
||||
log.warn("没有可用的测量数据,跳过快照创建");
|
||||
log.warn("没有可用的测量数据,跳过快照创建。latestMeasurements缓存内容: {}", latestMeasurements.keySet());
|
||||
return;
|
||||
}
|
||||
|
||||
// 为每个设备创建快照
|
||||
List<DeviceSnapshot> snapshots = new ArrayList<>();
|
||||
int deviceCount = 0;
|
||||
for (DeviceEnum device : DeviceEnum.values()) {
|
||||
deviceCount++;
|
||||
try {
|
||||
DeviceSnapshot snapshot = createDeviceSnapshot(device, now, entry, furnace, coat, exit);
|
||||
if (snapshot != null) {
|
||||
snapshots.add(snapshot);
|
||||
log.debug("设备[{}]快照创建成功", device.name());
|
||||
} else {
|
||||
log.debug("设备[{}]快照返回null(可能没有对应数据)", device.name());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("创建设备[{}]快照失败: {}", device.name(), e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("共处理{}个设备,成功创建{}个快照", deviceCount, snapshots.size());
|
||||
|
||||
// 批量保存快照
|
||||
if (!snapshots.isEmpty()) {
|
||||
int savedCount = 0;
|
||||
for (DeviceSnapshot snapshot : snapshots) {
|
||||
deviceSnapshotMapper.insert(snapshot);
|
||||
try {
|
||||
int result = deviceSnapshotMapper.insert(snapshot);
|
||||
if (result > 0) {
|
||||
savedCount++;
|
||||
log.debug("设备快照保存成功: deviceCode={}, createTime={}",
|
||||
snapshot.getDeviceCode(), snapshot.getCreateTime());
|
||||
} else {
|
||||
log.warn("设备快照保存失败(insert返回0): deviceCode={}", snapshot.getDeviceCode());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("保存设备快照到数据库失败: deviceCode={}, error={}",
|
||||
snapshot.getDeviceCode(), e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
log.info("成功创建{}个设备数据快照", snapshots.size());
|
||||
log.info("成功保存{}个设备数据快照到数据库", savedCount);
|
||||
|
||||
// 推送所有设备的历史趋势数据
|
||||
pushAllDevicesHistoryTrend();
|
||||
@@ -94,8 +145,10 @@ public class DeviceSnapshotService {
|
||||
// 推送设备统计数据
|
||||
pushAllChartData();
|
||||
|
||||
// 推送每个设备每个字段的趋势数据(供前端画“点位趋势小图”)
|
||||
// 推送每个设备每个字段的趋势数据(供前端画"点位趋势小图")
|
||||
pushAllDeviceFieldTrends();
|
||||
} else {
|
||||
log.warn("没有可保存的快照数据,所有设备快照创建都返回null");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("创建设备数据快照失败: {}", e.getMessage(), e);
|
||||
@@ -707,38 +760,55 @@ public class DeviceSnapshotService {
|
||||
snapshot.setSourceType(device.getSourceType().name());
|
||||
snapshot.setBasePosition(device.getBasePosition());
|
||||
|
||||
List<String> paramFields = device.getParamFields();
|
||||
log.debug("设备[{}]参数字段列表: {}", device.name(), paramFields);
|
||||
|
||||
// 根据设备类型获取对应的测量数据
|
||||
ObjectNode dataNode = objectMapper.createObjectNode();
|
||||
switch (device.getSourceType()) {
|
||||
case ENTRY:
|
||||
if (entry != null) {
|
||||
extractData(entry, device.getParamFields(), dataNode);
|
||||
extractData(entry, paramFields, dataNode);
|
||||
log.debug("设备[{}]从ENTRY提取数据,dataNode大小: {}", device.name(), dataNode.size());
|
||||
} else {
|
||||
log.debug("设备[{}]需要ENTRY数据,但entry为null", device.name());
|
||||
}
|
||||
break;
|
||||
case FURNACE:
|
||||
if (furnace != null) {
|
||||
extractData(furnace, device.getParamFields(), dataNode);
|
||||
extractData(furnace, paramFields, dataNode);
|
||||
log.debug("设备[{}]从FURNACE提取数据,dataNode大小: {}", device.name(), dataNode.size());
|
||||
} else {
|
||||
log.debug("设备[{}]需要FURNACE数据,但furnace为null", device.name());
|
||||
}
|
||||
break;
|
||||
case COAT:
|
||||
if (coat != null) {
|
||||
extractData(coat, device.getParamFields(), dataNode);
|
||||
extractData(coat, paramFields, dataNode);
|
||||
log.debug("设备[{}]从COAT提取数据,dataNode大小: {}", device.name(), dataNode.size());
|
||||
} else {
|
||||
log.debug("设备[{}]需要COAT数据,但coat为null", device.name());
|
||||
}
|
||||
break;
|
||||
case EXIT:
|
||||
if (exit != null) {
|
||||
extractData(exit, device.getParamFields(), dataNode);
|
||||
extractData(exit, paramFields, dataNode);
|
||||
log.debug("设备[{}]从EXIT提取数据,dataNode大小: {}", device.name(), dataNode.size());
|
||||
} else {
|
||||
log.debug("设备[{}]需要EXIT数据,但exit为null", device.name());
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// 如果没有数据,则跳过
|
||||
if (dataNode.isEmpty()) {
|
||||
log.debug("设备[{}]快照数据为空,跳过快照创建", device.name());
|
||||
return null;
|
||||
}
|
||||
|
||||
// 设置快照数据
|
||||
snapshot.setSnapshotData(dataNode.toString());
|
||||
log.debug("设备[{}]快照创建成功,数据长度: {}", device.name(), dataNode.toString().length());
|
||||
return snapshot;
|
||||
} catch (Exception e) {
|
||||
log.error("创建设备[{}]快照时发生错误: {}", device.name(), e.getMessage(), e);
|
||||
@@ -751,9 +821,13 @@ public class DeviceSnapshotService {
|
||||
*/
|
||||
private void extractData(Object message, List<String> fields, ObjectNode dataNode) {
|
||||
if (message == null || fields == null || fields.isEmpty()) {
|
||||
log.debug("extractData参数无效: message={}, fields={}",
|
||||
message != null ? message.getClass().getSimpleName() : "null",
|
||||
fields != null ? fields.size() : "null");
|
||||
return;
|
||||
}
|
||||
|
||||
int extractedCount = 0;
|
||||
try {
|
||||
for (String field : fields) {
|
||||
try {
|
||||
@@ -766,19 +840,34 @@ public class DeviceSnapshotService {
|
||||
if (value != null) {
|
||||
if (value instanceof Integer) {
|
||||
dataNode.put(field, (Integer) value);
|
||||
extractedCount++;
|
||||
} else if (value instanceof Long) {
|
||||
dataNode.put(field, (Long) value);
|
||||
extractedCount++;
|
||||
} else if (value instanceof Float) {
|
||||
dataNode.put(field, (Float) value);
|
||||
extractedCount++;
|
||||
} else if (value instanceof Double) {
|
||||
dataNode.put(field, (Double) value);
|
||||
extractedCount++;
|
||||
} else if (value instanceof Boolean) {
|
||||
dataNode.put(field, (Boolean) value);
|
||||
extractedCount++;
|
||||
} else if (value instanceof String) {
|
||||
dataNode.put(field, (String) value);
|
||||
extractedCount++;
|
||||
} else if (value instanceof Enum) {
|
||||
dataNode.put(field, value.toString());
|
||||
extractedCount++;
|
||||
} else if (value instanceof java.math.BigDecimal) {
|
||||
// 处理BigDecimal类型(很多测量字段是BigDecimal)
|
||||
dataNode.put(field, ((java.math.BigDecimal) value).doubleValue());
|
||||
extractedCount++;
|
||||
} else {
|
||||
log.debug("字段[{}]类型[{}]未处理,值: {}", field, value.getClass().getSimpleName(), value);
|
||||
}
|
||||
} else {
|
||||
log.trace("字段[{}]值为null", field);
|
||||
}
|
||||
} catch (NoSuchFieldException e) {
|
||||
log.trace("字段[{}]在消息[{}]中不存在", field, message.getClass().getSimpleName());
|
||||
@@ -786,6 +875,8 @@ public class DeviceSnapshotService {
|
||||
log.warn("获取字段[{}]值失败: {}", field, e.getMessage());
|
||||
}
|
||||
}
|
||||
log.debug("从[{}]提取了{}个字段,共{}个字段",
|
||||
message.getClass().getSimpleName(), extractedCount, fields.size());
|
||||
} catch (Exception e) {
|
||||
log.error("提取数据时发生错误: {}", e.getMessage(), e);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user