diff --git a/business/src/main/java/com/fizz/business/comm/OPC/OpcMessageSend.java b/business/src/main/java/com/fizz/business/comm/OPC/OpcMessageSend.java index 92e25b9..334cb30 100644 --- a/business/src/main/java/com/fizz/business/comm/OPC/OpcMessageSend.java +++ b/business/src/main/java/com/fizz/business/comm/OPC/OpcMessageSend.java @@ -14,6 +14,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import static com.fizz.business.service.manager.OpcMessageIdsManager.findIdentifierByFieldName; import static com.fizz.business.service.manager.OpcMessageIdsManager.pdiSetupIds; @Service @@ -62,56 +63,116 @@ public class OpcMessageSend { } /** - * 通用写入方法,用于向指定 OPC 节点写入一个值 - * @param address OPC 节点地址 (e.g., "ns=2;s=ProcessCGL.PLCLine.ExitCut.cutLength") + * 通用写数据方法:通过字段名向指定点位写入单个值 + * @param fieldName 字段名(对象属性名),例如:coilId * @param value 要写入的值 + * @return 是否写入成功 */ - public void writeNode(String address, Object value) { + public boolean writeDataByFieldName(String fieldName, Object value) { try { + String identifier = findIdentifierByFieldName(fieldName); + if (identifier == null) { + log.error("未找到字段名对应的 OPC 节点路径,fieldName={}", fieldName); + return false; + } List entities = new ArrayList<>(); entities.add(ReadWriteEntity.builder() - .identifier(address) + .identifier(identifier) .value(value) .build()); miloService.writeToOpcUa(entities); - log.info("写入 OPC 成功, node={}, value={}", address, value); + log.info("写入 OPC 数据成功,fieldName={}, identifier={}, value={}", fieldName, identifier, value); + return true; } catch (Exception e) { - log.error("写入 OPC 失败, node={}, value={}, 原因: {}", address, value, e.getMessage(), e); - // 抛出运行时异常,以便上层调用者(如 SendJobServiceImpl)可以捕获并处理失败状态 - throw new RuntimeException("写入 OPC 失败: " + e.getMessage(), e); + log.error("写入 OPC 数据失败,fieldName={}, value={},原因:{}", fieldName, value, e.getMessage(), e); + return false; } } /** - * 写入 OPC 节点(增强版):根据字符串值尝试转换为数值/布尔,再写入 - * 规则: - * 1) 先尝试转 Integer - * 2) 再尝试转 Double - * 3) 再尝试转 Boolean(true/false/1/0) - * 4) 都不行则按原字符串写入 + * 通用写数据方法:向指定点位写入单个值(直接使用节点路径) + * @param identifier OPC 点位标识符(节点路径) + * @param value 要写入的值 + * @return 是否写入成功 */ - public void writeNode(String address, String valueRaw) { - Object v = valueRaw; - if (valueRaw != null) { - String s = valueRaw.trim(); - // boolean - if ("1".equals(s) || "0".equals(s) || "true".equalsIgnoreCase(s) || "false".equalsIgnoreCase(s)) { - v = ("1".equals(s) || "true".equalsIgnoreCase(s)); - } else { - // int - try { - v = Integer.parseInt(s); - } catch (Exception ignore) { - // double - try { - v = Double.parseDouble(s); - } catch (Exception ignore2) { - v = valueRaw; - } - } - } + public boolean writeData(String identifier, Object value) { + try { + List entities = new ArrayList<>(); + entities.add(ReadWriteEntity.builder() + .identifier(identifier) + .value(value) + .build()); + miloService.writeToOpcUa(entities); + log.info("写入 OPC 数据成功,identifier={}, value={}", identifier, value); + return true; + } catch (Exception e) { + log.error("写入 OPC 数据失败,identifier={}, value={},原因:{}", identifier, value, e.getMessage(), e); + return false; + } + } + + /** + * 批量写数据方法:通过字段名向多个点位写入数据 + * @param fieldDataMap 字段名和值的映射,key 为 fieldName,value 为要写入的值 + * @return 是否全部写入成功 + */ + public boolean batchWriteDataByFieldName(Map fieldDataMap) { + try { + if (fieldDataMap == null || fieldDataMap.isEmpty()) { + log.warn("批量写数据:数据映射为空,跳过写入"); + return false; + } + List entities = new ArrayList<>(); + for (Map.Entry entry : fieldDataMap.entrySet()) { + String fieldName = entry.getKey(); + String identifier = findIdentifierByFieldName(fieldName); + if (identifier == null) { + log.warn("未找到字段名对应的 OPC 节点路径,fieldName={},跳过该字段", fieldName); + continue; + } + entities.add(ReadWriteEntity.builder() + .identifier(identifier) + .value(entry.getValue()) + .build()); + } + if (entities.isEmpty()) { + log.warn("批量写数据:没有有效的字段映射,跳过写入"); + return false; + } + miloService.writeToOpcUa(entities); + log.info("批量写入 OPC 数据成功,共 {} 个点位", entities.size()); + return true; + } catch (Exception e) { + log.error("批量写入 OPC 数据失败,原因:{}", e.getMessage(), e); + return false; + } + } + + /** + * 批量写数据方法:向多个点位写入数据(直接使用节点路径) + * @param dataMap 点位标识符和值的映射,key 为 identifier,value 为要写入的值 + * @return 是否全部写入成功 + */ + public boolean batchWriteData(Map dataMap) { + try { + if (dataMap == null || dataMap.isEmpty()) { + log.warn("批量写数据:数据映射为空,跳过写入"); + return false; + } + List entities = new ArrayList<>(); + for (Map.Entry entry : dataMap.entrySet()) { + entities.add(ReadWriteEntity.builder() + .identifier(entry.getKey()) + .value(entry.getValue()) + .build()); + } + miloService.writeToOpcUa(entities); + log.info("批量写入 OPC 数据成功,共 {} 个点位", entities.size()); + return true; + } catch (Exception e) { + log.error("批量写入 OPC 数据失败,原因:{}", e.getMessage(), e); + return false; } - writeNode(address, v); } private List getWriteEntities(OpcMessage msg, Map msgIds) { @@ -129,7 +190,7 @@ public class OpcMessageSend { } catch (NoSuchFieldException | IllegalAccessException e) { // 处理字段不存在或访问异常,可记录日志或设置默认值 e.printStackTrace(); - return new ArrayList<>(); + return new ArrayList<>(); } ReadWriteEntity entity = ReadWriteEntity.builder() diff --git a/business/src/main/java/com/fizz/business/constants/enums/WsTypeEnum.java b/business/src/main/java/com/fizz/business/constants/enums/WsTypeEnum.java index 51b264a..47582fe 100644 --- a/business/src/main/java/com/fizz/business/constants/enums/WsTypeEnum.java +++ b/business/src/main/java/com/fizz/business/constants/enums/WsTypeEnum.java @@ -17,7 +17,7 @@ import java.util.Map; @Getter @AllArgsConstructor public enum WsTypeEnum { - alarm, track_position, track_measure, track_signal, track_matmap,calc_setup_result; + alarm, track_position, track_measure, track_signal, track_matmap, calc_setup_result, device_history_trend, device_chart_data, device_field_trend; private static final Map MAP = new HashMap<>(8); diff --git a/business/src/main/java/com/fizz/business/controller/DeviceSnapshotController.java b/business/src/main/java/com/fizz/business/controller/DeviceSnapshotController.java new file mode 100644 index 0000000..cd1652d --- /dev/null +++ b/business/src/main/java/com/fizz/business/controller/DeviceSnapshotController.java @@ -0,0 +1,62 @@ +package com.fizz.business.controller; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.fizz.business.domain.DeviceSnapshot; +import com.fizz.business.mapper.DeviceSnapshotMapper; +import com.ruoyi.common.annotation.Anonymous; +import com.ruoyi.common.core.domain.R; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.time.LocalDateTime; +import java.util.List; + +@RestController +@RequestMapping("/api/deviceSnapshot") +@Tag(name = "设备数据快照") +@Anonymous +@RequiredArgsConstructor +public class DeviceSnapshotController { + + private final DeviceSnapshotMapper deviceSnapshotMapper; + + @GetMapping("/latest") + @Operation(description = "获取最新N条设备快照(可按deviceCode过滤)") + public R> latest(@RequestParam(value = "limit", defaultValue = "60") Integer limit, + @RequestParam(value = "deviceCode", required = false) String deviceCode) { + LambdaQueryWrapper qw = new LambdaQueryWrapper() + .orderByDesc(DeviceSnapshot::getCreateTime) + .last("limit " + Math.max(1, Math.min(limit, 500))); + + if (deviceCode != null && !deviceCode.isEmpty()) { + qw.eq(DeviceSnapshot::getDeviceCode, deviceCode); + } + + return R.ok(deviceSnapshotMapper.selectList(qw)); + } + + @GetMapping("/range") + @Operation(description = "按时间范围查询设备快照(可按deviceCode过滤)") + public R> range(@RequestParam("start") String start, + @RequestParam("end") String end, + @RequestParam(value = "deviceCode", required = false) String deviceCode) { + LocalDateTime startTime = LocalDateTime.parse(start); + LocalDateTime endTime = LocalDateTime.parse(end); + + LambdaQueryWrapper qw = new LambdaQueryWrapper() + .between(DeviceSnapshot::getCreateTime, startTime, endTime) + .orderByAsc(DeviceSnapshot::getCreateTime); + + if (deviceCode != null && !deviceCode.isEmpty()) { + qw.eq(DeviceSnapshot::getDeviceCode, deviceCode); + } + + return R.ok(deviceSnapshotMapper.selectList(qw)); + } +} + diff --git a/business/src/main/java/com/fizz/business/controller/OpcDataController.java b/business/src/main/java/com/fizz/business/controller/OpcDataController.java new file mode 100644 index 0000000..29d6016 --- /dev/null +++ b/business/src/main/java/com/fizz/business/controller/OpcDataController.java @@ -0,0 +1,69 @@ +package com.fizz.business.controller; + +import com.fizz.business.form.OpcBatchWriteDataForm; +import com.fizz.business.form.OpcWriteDataForm; +import com.fizz.business.service.OpcDataService; +import com.ruoyi.common.annotation.Anonymous; +import com.ruoyi.common.core.domain.R; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.web.bind.annotation.*; + +import javax.validation.Valid; + +/** + * OPC 数据读写控制器 + */ +@Slf4j +@RestController +@RequiredArgsConstructor +@RequestMapping("/api/opc/data") +@Tag(name = "OPC 数据读写") +@ConditionalOnProperty(prefix = "kangaroohy.milo", name = "enabled", havingValue = "true") +@Anonymous +public class OpcDataController { + + private final OpcDataService opcDataService; + + /** + * 向单个点位写入数据(通过字段名) + */ + @Operation(summary = "向单个点位写入数据", description = "通过字段名向指定的 OPC 点位写入数据") + @PostMapping("/write") + public R writeData(@Valid @RequestBody OpcWriteDataForm form) { + try { + boolean success = opcDataService.writeData(form); + if (success) { + return R.ok("写入成功"); + } else { + return R.fail("写入失败,可能是字段名未找到对应的 OPC 节点路径,请查看日志"); + } + } catch (Exception e) { + log.error("写入 OPC 数据异常", e); + return R.fail("写入异常:" + e.getMessage()); + } + } + + /** + * 批量向多个点位写入数据(通过字段名) + */ + @Operation(summary = "批量向多个点位写入数据", description = "通过字段名向多个 OPC 点位批量写入数据") + @PostMapping("/batchWrite") + public R batchWriteData(@Valid @RequestBody OpcBatchWriteDataForm form) { + try { + boolean success = opcDataService.batchWriteData(form); + if (success) { + return R.ok("批量写入成功,共 " + form.getDataList().size() + " 个点位"); + } else { + return R.fail("批量写入失败,可能是部分字段名未找到对应的 OPC 节点路径,请查看日志"); + } + } catch (Exception e) { + log.error("批量写入 OPC 数据异常", e); + return R.fail("批量写入异常:" + e.getMessage()); + } + } +} + diff --git a/business/src/main/java/com/fizz/business/domain/DeviceSnapshot.java b/business/src/main/java/com/fizz/business/domain/DeviceSnapshot.java new file mode 100644 index 0000000..786f14d --- /dev/null +++ b/business/src/main/java/com/fizz/business/domain/DeviceSnapshot.java @@ -0,0 +1,48 @@ +package com.fizz.business.domain; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import org.springframework.format.annotation.DateTimeFormat; + +import java.time.LocalDateTime; + +@Data +@TableName("device_snapshot") +@Schema(description = "设备数据快照") +public class DeviceSnapshot { + + @TableId(type = IdType.ASSIGN_ID) + @Schema(description = "主键") + private Long id; + + @Schema(description = "快照时间") + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private LocalDateTime createTime; + + @Schema(description = "设备idx,对应DeviceEnum.idx") + private Integer deviceId; + + @Schema(description = "设备枚举名,对应DeviceEnum.name()") + private String deviceCode; + + @Schema(description = "设备描述") + private String deviceName; + + @Schema(description = "段类型 ENTRY/PROCESS/EXIT") + private String sectionType; + + @Schema(description = "来源类型 ENTRY/FURNACE/COAT/EXIT") + private String sourceType; + + @Schema(description = "基准位置(m)") + private Double basePosition; + + @Schema(description = "快照数据JSON") + private String snapshotData; +} + diff --git a/business/src/main/java/com/fizz/business/dto/DeviceChartDataDTO.java b/business/src/main/java/com/fizz/business/dto/DeviceChartDataDTO.java new file mode 100644 index 0000000..03e8bfc --- /dev/null +++ b/business/src/main/java/com/fizz/business/dto/DeviceChartDataDTO.java @@ -0,0 +1,153 @@ +package com.fizz.business.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * 设备图表数据 DTO + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Schema(description = "设备图表数据") +public class DeviceChartDataDTO implements Serializable { + private static final long serialVersionUID = 1L; + + @Schema(description = "设备类型分布饼图数据") + private PieChartData deviceTypePieChart; + + @Schema(description = "数据来源分布条形图数据") + private BarChartData sourceTypeBarChart; + + @Schema(description = "同类型设备对比折线图数据(单位相同)") + private LineChartData sameTypeDeviceCompareChart; + + @Schema(description = "设备统计数据") + private List deviceStatistics; + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @Schema(description = "饼图数据") + public static class PieChartData implements Serializable { + private static final long serialVersionUID = 1L; + + @Schema(description = "数据系列") + private List series; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @Schema(description = "饼图系列") + public static class PieSeries implements Serializable { + private static final long serialVersionUID = 1L; + + @Schema(description = "名称") + private String name; + + @Schema(description = "数值") + private Double data; + + @Schema(description = "颜色") + private String color; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @Schema(description = "条形图数据") + public static class BarChartData implements Serializable { + private static final long serialVersionUID = 1L; + + @Schema(description = "分类轴") + private List categories; + + @Schema(description = "数据系列") + private List series; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @Schema(description = "条形图系列") + public static class BarSeries implements Serializable { + private static final long serialVersionUID = 1L; + + @Schema(description = "系列名称") + private String name; + + @Schema(description = "数据值列表") + private List data; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @Schema(description = "折线图数据") + public static class LineChartData implements Serializable { + private static final long serialVersionUID = 1L; + + @Schema(description = "时间轴") + private List categories; + + @Schema(description = "数据系列") + private List series; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @Schema(description = "折线图系列") + public static class LineSeries implements Serializable { + private static final long serialVersionUID = 1L; + + @Schema(description = "系列名称") + private String name; + + @Schema(description = "数据值列表") + private List data; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @Schema(description = "设备统计数据") + public static class DeviceStatistics implements Serializable { + private static final long serialVersionUID = 1L; + + @Schema(description = "设备代码") + private String deviceCode; + + @Schema(description = "设备名称") + private String deviceName; + + @Schema(description = "平均值") + private String avg; + + @Schema(description = "最大值") + private String max; + + @Schema(description = "最小值") + private String min; + + @Schema(description = "当前值") + private String current; + } +} + diff --git a/business/src/main/java/com/fizz/business/dto/DeviceFieldTrendDTO.java b/business/src/main/java/com/fizz/business/dto/DeviceFieldTrendDTO.java new file mode 100644 index 0000000..fff8dc6 --- /dev/null +++ b/business/src/main/java/com/fizz/business/dto/DeviceFieldTrendDTO.java @@ -0,0 +1,55 @@ +package com.fizz.business.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.util.List; + +/** + * 单设备-单字段的趋势与统计 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Schema(description = "设备字段趋势数据") +public class DeviceFieldTrendDTO implements Serializable { + private static final long serialVersionUID = 1L; + + @Schema(description = "设备代码") + private String deviceCode; + + @Schema(description = "设备名称") + private String deviceName; + + @Schema(description = "字段名") + private String fieldName; + + @Schema(description = "字段中文名(可选)") + private String fieldLabel; + + @Schema(description = "单位(可选)") + private String unit; + + @Schema(description = "时间轴(HH:mm)") + private List categories; + + @Schema(description = "数据点") + private List data; + + @Schema(description = "统计-平均值") + private Double avg; + + @Schema(description = "统计-最大值") + private Double max; + + @Schema(description = "统计-最小值") + private Double min; + + @Schema(description = "统计-最新值") + private Double last; +} diff --git a/business/src/main/java/com/fizz/business/dto/DeviceFieldTrendSubscribeReq.java b/business/src/main/java/com/fizz/business/dto/DeviceFieldTrendSubscribeReq.java new file mode 100644 index 0000000..3b7ea61 --- /dev/null +++ b/business/src/main/java/com/fizz/business/dto/DeviceFieldTrendSubscribeReq.java @@ -0,0 +1,26 @@ +package com.fizz.business.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +/** + * WS 订阅:设备字段趋势 + */ +@Data +@Schema(description = "设备字段趋势订阅请求") +public class DeviceFieldTrendSubscribeReq implements Serializable { + private static final long serialVersionUID = 1L; + + @Schema(description = "设备代码列表,为空表示不变") + private List deviceCodes; + + @Schema(description = "字段名列表,为空表示不变") + private List fieldNames; + + @Schema(description = "是否只推送可见范围(用于懒加载),true=只推 deviceCodes/fieldNames 命中的数据") + private Boolean lazy; +} + diff --git a/business/src/main/java/com/fizz/business/dto/DeviceHistoryTrendDTO.java b/business/src/main/java/com/fizz/business/dto/DeviceHistoryTrendDTO.java new file mode 100644 index 0000000..e4555b6 --- /dev/null +++ b/business/src/main/java/com/fizz/business/dto/DeviceHistoryTrendDTO.java @@ -0,0 +1,54 @@ +package com.fizz.business.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * 设备历史趋势数据 DTO + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Schema(description = "设备历史趋势数据") +public class DeviceHistoryTrendDTO implements Serializable { + private static final long serialVersionUID = 1L; + + @Schema(description = "设备代码") + private String deviceCode; + + @Schema(description = "设备描述") + private String deviceName; + + @Schema(description = "时间轴(格式:HH:mm)") + private List categories; + + @Schema(description = "数据系列,key为字段名,value为数据值列表") + private List series; + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @Schema(description = "数据系列") + public static class SeriesData implements Serializable { + private static final long serialVersionUID = 1L; + + @Schema(description = "系列名称(字段标签)") + private String name; + + @Schema(description = "字段名") + private String fieldName; + + @Schema(description = "数据值列表") + private List data; + } +} + diff --git a/business/src/main/java/com/fizz/business/form/OpcBatchWriteDataForm.java b/business/src/main/java/com/fizz/business/form/OpcBatchWriteDataForm.java new file mode 100644 index 0000000..594c8d7 --- /dev/null +++ b/business/src/main/java/com/fizz/business/form/OpcBatchWriteDataForm.java @@ -0,0 +1,27 @@ +package com.fizz.business.form; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import javax.validation.Valid; +import javax.validation.constraints.NotEmpty; +import java.io.Serializable; +import java.util.List; + +/** + * OPC 批量写数据请求表单 + */ +@Data +@Schema(description = "OPC 批量写数据请求") +public class OpcBatchWriteDataForm implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * 批量写数据列表 + */ + @Schema(description = "批量写数据列表") + @NotEmpty(message = "写数据列表不能为空") + @Valid + private List dataList; +} + diff --git a/business/src/main/java/com/fizz/business/form/OpcWriteDataForm.java b/business/src/main/java/com/fizz/business/form/OpcWriteDataForm.java new file mode 100644 index 0000000..12f03e9 --- /dev/null +++ b/business/src/main/java/com/fizz/business/form/OpcWriteDataForm.java @@ -0,0 +1,35 @@ +package com.fizz.business.form; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * OPC 写数据请求表单 + */ +@Data +@Schema(description = "OPC 写数据请求") +public class OpcWriteDataForm implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * 字段名(对象属性名),例如:coilId + * 系统会自动查找对应的 OPC 节点路径 + */ + @Schema(description = "字段名(对象属性名)", example = "coilId") + @NotBlank(message = "字段名不能为空") + private String fieldName; + + /** + * 要写入的值(支持多种类型:String, Integer, Double, Boolean 等) + */ + @Schema(description = "要写入的值", example = "COIL001") + @NotNull(message = "写入值不能为空") + private Object value; +} + diff --git a/business/src/main/java/com/fizz/business/mapper/DeviceSnapshotMapper.java b/business/src/main/java/com/fizz/business/mapper/DeviceSnapshotMapper.java new file mode 100644 index 0000000..4a3f717 --- /dev/null +++ b/business/src/main/java/com/fizz/business/mapper/DeviceSnapshotMapper.java @@ -0,0 +1,10 @@ +package com.fizz.business.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.fizz.business.domain.DeviceSnapshot; +import org.apache.ibatis.annotations.Mapper; + +@Mapper +public interface DeviceSnapshotMapper extends BaseMapper { +} + diff --git a/business/src/main/java/com/fizz/business/schedule/DeviceSnapshotSchedule.java b/business/src/main/java/com/fizz/business/schedule/DeviceSnapshotSchedule.java new file mode 100644 index 0000000..2a6fe69 --- /dev/null +++ b/business/src/main/java/com/fizz/business/schedule/DeviceSnapshotSchedule.java @@ -0,0 +1,40 @@ +package com.fizz.business.schedule; + +import com.fizz.business.service.DeviceSnapshotService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +/** + * 设备数据快照定时任务 + * 每10分钟执行一次设备数据快照 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class DeviceSnapshotSchedule { + + private final DeviceSnapshotService deviceSnapshotService; + + /** + * 每10分钟执行一次设备数据快照 + * 使用cron表达式:0 0/10 * * * ? + * 表示每10分钟执行一次,在每分钟的0秒触发 + */ + @Scheduled(cron = "0 0/10 * * * ?") + public void captureDeviceSnapshots() { + try { + log.info("开始执行设备数据快照任务..."); + long startTime = System.currentTimeMillis(); + + // 创建设备数据快照 + deviceSnapshotService.createDeviceSnapshots(); + + long endTime = System.currentTimeMillis(); + log.info("设备数据快照任务执行完成,耗时:{}ms", (endTime - startTime)); + } catch (Exception e) { + log.error("执行设备数据快照任务失败: {}", e.getMessage(), e); + } + } +} diff --git a/business/src/main/java/com/fizz/business/service/DeviceSnapshotService.java b/business/src/main/java/com/fizz/business/service/DeviceSnapshotService.java new file mode 100644 index 0000000..99f58a6 --- /dev/null +++ b/business/src/main/java/com/fizz/business/service/DeviceSnapshotService.java @@ -0,0 +1,787 @@ +package com.fizz.business.service; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.fizz.business.constants.enums.DeviceEnum; +import com.fizz.business.constants.enums.WsTypeEnum; +import com.fizz.business.domain.DeviceSnapshot; +import com.fizz.business.domain.msg.*; +import com.fizz.business.dto.DeviceChartDataDTO; +import com.fizz.business.dto.DeviceFieldTrendDTO; +import com.fizz.business.dto.DeviceHistoryTrendDTO; +import com.fizz.business.mapper.DeviceSnapshotMapper; +import com.fizz.business.utils.WebSocketUtil; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * 设备数据快照服务 + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class DeviceSnapshotService { + + // 使用ConcurrentHashMap存储最新的测量数据 + private final Map, OpcMessage> latestMeasurements = new ConcurrentHashMap<>(); + private final DeviceSnapshotMapper deviceSnapshotMapper; + private final ObjectMapper objectMapper; + + /** + * 更新最新的测量数据 + * @param message 测量数据消息 + */ + public void updateLatestMeasurement(OpcMessage message) { + if (message != null) { + latestMeasurements.put(message.getClass(), message); + } + } + + /** + * 创建当前所有设备的数据快照 + */ + @Transactional(rollbackFor = Exception.class) + public void createDeviceSnapshots() { + try { + // 获取当前时间戳 + LocalDateTime now = LocalDateTime.now(); + + // 获取最新的测量数据 + AppMeasureEntryMessage entry = (AppMeasureEntryMessage) latestMeasurements.get(AppMeasureEntryMessage.class); + AppMeasureFurnaceMessage furnace = (AppMeasureFurnaceMessage) latestMeasurements.get(AppMeasureFurnaceMessage.class); + AppMeasureCoatMessage coat = (AppMeasureCoatMessage) latestMeasurements.get(AppMeasureCoatMessage.class); + AppMeasureExitMessage exit = (AppMeasureExitMessage) latestMeasurements.get(AppMeasureExitMessage.class); + + if (entry == null && furnace == null && coat == null && exit == null) { + log.warn("没有可用的测量数据,跳过快照创建"); + return; + } + + // 为每个设备创建快照 + List snapshots = new ArrayList<>(); + for (DeviceEnum device : DeviceEnum.values()) { + try { + DeviceSnapshot snapshot = createDeviceSnapshot(device, now, entry, furnace, coat, exit); + if (snapshot != null) { + snapshots.add(snapshot); + } + } catch (Exception e) { + log.error("创建设备[{}]快照失败: {}", device.name(), e.getMessage(), e); + } + } + + // 批量保存快照 + if (!snapshots.isEmpty()) { + for (DeviceSnapshot snapshot : snapshots) { + deviceSnapshotMapper.insert(snapshot); + } + log.info("成功创建{}个设备数据快照", snapshots.size()); + + // 推送所有设备的历史趋势数据 + pushAllDevicesHistoryTrend(); + + // 推送设备统计数据 + pushAllChartData(); + + // 推送每个设备每个字段的趋势数据(供前端画“点位趋势小图”) + pushAllDeviceFieldTrends(); + } + } catch (Exception e) { + log.error("创建设备数据快照失败: {}", e.getMessage(), e); + throw e; + } + } + + /** + * 推送所有设备的历史趋势数据(今天一天的数据,10分钟间隔) + */ + public void pushAllDevicesHistoryTrend() { + try { + for (DeviceEnum device : DeviceEnum.values()) { + DeviceHistoryTrendDTO trendData = getDeviceHistoryTrend(device.name()); + if (trendData != null && trendData.getSeries() != null && !trendData.getSeries().isEmpty()) { + WebSocketUtil.sendHistoryTrendMessage(trendData); + } + } + } catch (Exception e) { + log.error("推送设备历史趋势数据失败: {}", e.getMessage(), e); + } + } + + /** + * 推送设备统计数据 + */ + public void pushAllChartData() { + try { + DeviceChartDataDTO chartData = buildAllChartData(); + if (chartData != null) { + WebSocketUtil.sendChartDataMessage(chartData); + } + } catch (Exception e) { + log.error("推送统计数据失败: {}", e.getMessage(), e); + } + } + + /** + * 推送每个设备每个字段的趋势数据(今天范围,10分钟间隔采样) + */ + public void pushAllDeviceFieldTrends() { + try { + for (DeviceEnum device : DeviceEnum.values()) { + List fields = device.getParamFields(); + if (fields == null || fields.isEmpty()) continue; + for (String field : fields) { + DeviceFieldTrendDTO dto = getDeviceFieldTrend(device.name(), field); + if (dto != null) { + WebSocketUtil.sendDeviceFieldTrendMessage(dto); + } + } + } + } catch (Exception e) { + log.error("推送设备字段趋势失败: {}", e.getMessage(), e); + } + } + + /** + * 获取 单设备-单字段 今日趋势与统计(10分钟采样) + */ + public DeviceFieldTrendDTO getDeviceFieldTrend(String deviceCode, String fieldName) { + try { + DeviceEnum device = DeviceEnum.fromName(deviceCode); + if (device == null) return null; + + // 今天范围 + LocalDate today = LocalDate.now(); + LocalDateTime startTime = today.atStartOfDay(); + LocalDateTime endTime = today.atTime(LocalTime.MAX); + + // 查询今天的快照 + LambdaQueryWrapper qw = new LambdaQueryWrapper() + .eq(DeviceSnapshot::getDeviceCode, deviceCode) + .between(DeviceSnapshot::getCreateTime, startTime, endTime) + .orderByAsc(DeviceSnapshot::getCreateTime); + + List rows = deviceSnapshotMapper.selectList(qw); + if (rows == null || rows.isEmpty()) return null; + + // 10分钟采样 + List sampled = sampleDataByInterval(rows, 10); + + DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm"); + List categories = new ArrayList<>(sampled.size()); + List data = new ArrayList<>(sampled.size()); + + for (DeviceSnapshot s : sampled) { + categories.add(s.getCreateTime().format(timeFormatter)); + double v = 0.0; + try { + Map json = objectMapper.readValue(s.getSnapshotData(), Map.class); + Object raw = json.get(fieldName); + if (raw instanceof Number) v = ((Number) raw).doubleValue(); + } catch (Exception ignore) { + } + data.add(v); + } + + // 统计(忽略全0) + List valid = new ArrayList<>(); + for (Double v : data) { + if (v != null && v != 0.0) valid.add(v); + } + + Double avg = null, max = null, min = null; + if (!valid.isEmpty()) { + double sum = 0; + max = valid.get(0); + min = valid.get(0); + for (Double v : valid) { + sum += v; + if (v > max) max = v; + if (v < min) min = v; + } + avg = sum / valid.size(); + } + + Double last = data.isEmpty() ? null : data.get(data.size() - 1); + + return DeviceFieldTrendDTO.builder() + .deviceCode(deviceCode) + .deviceName(device.getDesc()) + .fieldName(fieldName) + // label/unit 目前后端没有统一来源,这里先留空,前端可用 fieldMeta 补齐 + .fieldLabel(null) + .unit(null) + .categories(categories) + .data(data) + .avg(avg) + .max(max) + .min(min) + .last(last) + .build(); + + } catch (Exception e) { + log.error("获取设备字段趋势失败, deviceCode={}, fieldName={}, err={}", deviceCode, fieldName, e.getMessage(), e); + return null; + } + } + + /** + * 构建设备图表数据 + */ + private DeviceChartDataDTO buildAllChartData() { + try { + // 1. 设备类型分布饼图 + DeviceChartDataDTO.PieChartData pieChartData = buildDeviceTypePieChart(); + + // 2. 数据来源分布条形图 + DeviceChartDataDTO.BarChartData barChartData = buildSourceTypeBarChart(); + + // 3. 同类型设备对比折线图(只对比单位相同的设备) + DeviceChartDataDTO.LineChartData lineChartData = buildSameTypeDeviceCompareChart(); + + // 4. 设备统计数据 + List statistics = buildDeviceStatistics(); + + return DeviceChartDataDTO.builder() + .deviceTypePieChart(pieChartData) + .sourceTypeBarChart(barChartData) + .sameTypeDeviceCompareChart(lineChartData) + .deviceStatistics(statistics) + .build(); + } catch (Exception e) { + log.error("构建图表数据失败: {}", e.getMessage(), e); + return null; + } + } + + /** + * 构建设备类型分布饼图数据 + */ + private DeviceChartDataDTO.PieChartData buildDeviceTypePieChart() { + Map distribution = new HashMap<>(); + distribution.put("入口段", 0); + distribution.put("处理段", 0); + distribution.put("出口段", 0); + distribution.put("其他", 0); + + for (DeviceEnum device : DeviceEnum.values()) { + String sectionType = device.getSectionType().name(); + if ("ENTRY".equals(sectionType)) { + distribution.put("入口段", distribution.get("入口段") + 1); + } else if ("PROCESS".equals(sectionType)) { + distribution.put("处理段", distribution.get("处理段") + 1); + } else if ("EXIT".equals(sectionType)) { + distribution.put("出口段", distribution.get("出口段") + 1); + } else { + distribution.put("其他", distribution.get("其他") + 1); + } + } + + String[] colors = {"#0066cc", "#409eff", "#66b1ff", "#a0cfff"}; + List series = new ArrayList<>(); + int colorIndex = 0; + + for (Map.Entry entry : distribution.entrySet()) { + if (entry.getValue() > 0) { + series.add(DeviceChartDataDTO.PieSeries.builder() + .name(entry.getKey()) + .data(entry.getValue().doubleValue()) + .color(colors[colorIndex % colors.length]) + .build()); + colorIndex++; + } + } + + return DeviceChartDataDTO.PieChartData.builder() + .series(series) + .build(); + } + + /** + * 构建数据来源分布条形图数据 + */ + private DeviceChartDataDTO.BarChartData buildSourceTypeBarChart() { + Map distribution = new HashMap<>(); + distribution.put("ENTRY", 0); + distribution.put("FURNACE", 0); + distribution.put("COAT", 0); + distribution.put("EXIT", 0); + + for (DeviceEnum device : DeviceEnum.values()) { + String sourceType = device.getSourceType().name(); + if (distribution.containsKey(sourceType)) { + distribution.put(sourceType, distribution.get(sourceType) + 1); + } + } + + List categories = new ArrayList<>(); + List data = new ArrayList<>(); + + for (Map.Entry entry : distribution.entrySet()) { + if (entry.getValue() > 0) { + categories.add(entry.getKey()); + data.add(entry.getValue().doubleValue()); + } + } + + return DeviceChartDataDTO.BarChartData.builder() + .categories(categories) + .series(Collections.singletonList( + DeviceChartDataDTO.BarSeries.builder() + .name("设备数量") + .data(data) + .build() + )) + .build(); + } + + /** + * 构建同类型设备对比折线图数据(只对比单位相同的设备) + */ + private DeviceChartDataDTO.LineChartData buildSameTypeDeviceCompareChart() { + // 获取今天的数据 + LocalDate today = LocalDate.now(); + LocalDateTime startTime = today.atStartOfDay(); + LocalDateTime endTime = today.atTime(LocalTime.MAX); + + // 按数据来源分组,同一来源的设备单位相同,可以对比 + Map> devicesBySource = new HashMap<>(); + for (DeviceEnum device : DeviceEnum.values()) { + if (device.getParamFields() == null || device.getParamFields().isEmpty()) { + continue; + } + String sourceType = device.getSourceType().name(); + devicesBySource.computeIfAbsent(sourceType, k -> new ArrayList<>()).add(device); + } + + // 选择设备数量最多的数据源,取前5个设备进行对比 + String selectedSource = devicesBySource.entrySet().stream() + .max(Map.Entry.comparingByValue((a, b) -> Integer.compare(a.size(), b.size()))) + .map(Map.Entry::getKey) + .orElse(null); + + if (selectedSource == null || devicesBySource.get(selectedSource).isEmpty()) { + return null; + } + + List selectedDevices = devicesBySource.get(selectedSource).stream() + .limit(5) + .collect(Collectors.toList()); + + List categories = new ArrayList<>(); + List series = new ArrayList<>(); + + // 生成时间轴(最近10个时间点,每10分钟一个) + DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm"); + LocalDateTime now = LocalDateTime.now(); + for (int i = 9; i >= 0; i--) { + LocalDateTime time = now.minusMinutes(i * 10L); + categories.add(time.format(timeFormatter)); + } + + // 为每个设备生成数据系列 + for (DeviceEnum device : selectedDevices) { + if (device.getParamFields() == null || device.getParamFields().isEmpty()) { + continue; + } + + // 获取该设备今天的历史数据 + LambdaQueryWrapper qw = new LambdaQueryWrapper() + .eq(DeviceSnapshot::getDeviceCode, device.name()) + .between(DeviceSnapshot::getCreateTime, startTime, endTime) + .orderByAsc(DeviceSnapshot::getCreateTime); + + List rows = deviceSnapshotMapper.selectList(qw); + List sampledRows = sampleDataByInterval(rows, 10); + + // 使用第一个字段的数据 + String firstField = device.getParamFields().get(0); + List data = new ArrayList<>(); + + // 填充数据(如果历史数据不足,用0填充) + for (int i = 0; i < 10; i++) { + if (i < sampledRows.size()) { + try { + Map jsonData = objectMapper.readValue( + sampledRows.get(i).getSnapshotData(), Map.class); + Object value = jsonData.get(firstField); + double numValue = 0.0; + if (value instanceof Number) { + numValue = ((Number) value).doubleValue(); + } + data.add(numValue); + } catch (Exception e) { + data.add(0.0); + } + } else { + data.add(0.0); + } + } + + series.add(DeviceChartDataDTO.LineSeries.builder() + .name(device.getDesc()) + .data(data) + .build()); + } + + if (series.isEmpty()) { + return null; + } + + return DeviceChartDataDTO.LineChartData.builder() + .categories(categories) + .series(series) + .build(); + } + + /** + * 构建设备统计数据 + */ + private List buildDeviceStatistics() { + List statistics = new ArrayList<>(); + LocalDate today = LocalDate.now(); + LocalDateTime startTime = today.atStartOfDay(); + LocalDateTime endTime = today.atTime(LocalTime.MAX); + + // 获取最新的测量数据 + AppMeasureEntryMessage entry = (AppMeasureEntryMessage) latestMeasurements.get(AppMeasureEntryMessage.class); + AppMeasureFurnaceMessage furnace = (AppMeasureFurnaceMessage) latestMeasurements.get(AppMeasureFurnaceMessage.class); + AppMeasureCoatMessage coat = (AppMeasureCoatMessage) latestMeasurements.get(AppMeasureCoatMessage.class); + AppMeasureExitMessage exit = (AppMeasureExitMessage) latestMeasurements.get(AppMeasureExitMessage.class); + + for (DeviceEnum device : DeviceEnum.values()) { + if (device.getParamFields() == null || device.getParamFields().isEmpty()) { + continue; + } + + String firstField = device.getParamFields().get(0); + + // 获取当前值 + double currentValue = 0.0; + switch (device.getSourceType()) { + case ENTRY: + if (entry != null) { + currentValue = getFieldValue(entry, firstField); + } + break; + case FURNACE: + if (furnace != null) { + currentValue = getFieldValue(furnace, firstField); + } + break; + case COAT: + if (coat != null) { + currentValue = getFieldValue(coat, firstField); + } + break; + case EXIT: + if (exit != null) { + currentValue = getFieldValue(exit, firstField); + } + break; + } + + // 从历史数据中计算统计值 + LambdaQueryWrapper qw = new LambdaQueryWrapper() + .eq(DeviceSnapshot::getDeviceCode, device.name()) + .between(DeviceSnapshot::getCreateTime, startTime, endTime) + .orderByAsc(DeviceSnapshot::getCreateTime); + + List rows = deviceSnapshotMapper.selectList(qw); + List values = new ArrayList<>(); + + for (DeviceSnapshot snapshot : rows) { + try { + Map jsonData = objectMapper.readValue( + snapshot.getSnapshotData(), Map.class); + Object value = jsonData.get(firstField); + if (value instanceof Number) { + double numValue = ((Number) value).doubleValue(); + if (numValue > 0) { + values.add(numValue); + } + } + } catch (Exception e) { + // ignore + } + } + + String avg = "—"; + String max = "—"; + String min = "—"; + + if (!values.isEmpty()) { + double avgValue = values.stream().mapToDouble(Double::doubleValue).average().orElse(0.0); + double maxValue = values.stream().mapToDouble(Double::doubleValue).max().orElse(0.0); + double minValue = values.stream().mapToDouble(Double::doubleValue).min().orElse(0.0); + + avg = String.format("%.2f", avgValue); + max = String.format("%.2f", maxValue); + min = String.format("%.2f", minValue); + } + + statistics.add(DeviceChartDataDTO.DeviceStatistics.builder() + .deviceCode(device.name()) + .deviceName(device.getDesc()) + .avg(avg) + .max(max) + .min(min) + .current(currentValue > 0 ? String.format("%.2f", currentValue) : "—") + .build()); + } + + return statistics; + } + + /** + * 获取字段值 + */ + private double getFieldValue(Object message, String fieldName) { + try { + java.lang.reflect.Field field = message.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + Object value = field.get(message); + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + } catch (Exception e) { + // ignore + } + return 0.0; + } + + /** + * 获取设备今天一天的历史趋势数据(10分钟间隔采样) + * @param deviceCode 设备代码 + * @return 历史趋势数据 + */ + public DeviceHistoryTrendDTO getDeviceHistoryTrend(String deviceCode) { + try { + // 获取今天开始和结束时间 + LocalDate today = LocalDate.now(); + LocalDateTime startTime = today.atStartOfDay(); + LocalDateTime endTime = today.atTime(LocalTime.MAX); + + // 查询今天的所有快照数据 + LambdaQueryWrapper qw = new LambdaQueryWrapper() + .eq(DeviceSnapshot::getDeviceCode, deviceCode) + .between(DeviceSnapshot::getCreateTime, startTime, endTime) + .orderByAsc(DeviceSnapshot::getCreateTime); + + List rows = deviceSnapshotMapper.selectList(qw); + if (rows == null || rows.isEmpty()) { + return null; + } + + // 按10分钟间隔采样数据 + List sampledRows = sampleDataByInterval(rows, 10); + + // 获取设备信息 + DeviceEnum device = DeviceEnum.valueOf(deviceCode); + if (device == null) { + return null; + } + + // 构建历史趋势数据 + List categories = new ArrayList<>(); + Map> seriesMap = new HashMap<>(); + List fields = device.getParamFields(); + + if (fields == null || fields.isEmpty()) { + return null; + } + + // 初始化系列数据 + for (String field : fields) { + seriesMap.put(field, new ArrayList<>()); + } + + // 处理采样后的数据 + DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm"); + for (DeviceSnapshot snapshot : sampledRows) { + categories.add(snapshot.getCreateTime().format(timeFormatter)); + + Map jsonData = new HashMap<>(); + try { + jsonData = objectMapper.readValue(snapshot.getSnapshotData(), Map.class); + } catch (Exception e) { + log.warn("解析快照数据失败: {}", snapshot.getSnapshotData()); + } + + for (String field : fields) { + Object value = jsonData.get(field); + double numValue = 0.0; + if (value instanceof Number) { + numValue = ((Number) value).doubleValue(); + } + seriesMap.get(field).add(numValue); + } + } + + // 构建系列数据(最多6个字段) + List series = fields.stream() + .limit(6) + .map(field -> DeviceHistoryTrendDTO.SeriesData.builder() + .fieldName(field) + .name(field) // 前端可以根据字段元数据替换为标签 + .data(seriesMap.get(field)) + .build()) + .collect(Collectors.toList()); + + return DeviceHistoryTrendDTO.builder() + .deviceCode(deviceCode) + .deviceName(device.getDesc()) + .categories(categories) + .series(series) + .build(); + + } catch (Exception e) { + log.error("获取设备[{}]历史趋势数据失败: {}", deviceCode, e.getMessage(), e); + return null; + } + } + + /** + * 按时间间隔采样数据(每N分钟取一条) + * @param rows 原始数据列表 + * @param intervalMinutes 间隔分钟数 + * @return 采样后的数据列表 + */ + private List sampleDataByInterval(List rows, int intervalMinutes) { + if (rows == null || rows.isEmpty()) { + return new ArrayList<>(); + } + + List sampled = new ArrayList<>(); + LocalDateTime lastTime = null; + + for (DeviceSnapshot row : rows) { + LocalDateTime rowTime = row.getCreateTime(); + + if (lastTime == null) { + // 第一条数据 + sampled.add(row); + lastTime = rowTime; + } else { + // 计算时间差(分钟) + long diffMinutes = java.time.Duration.between(lastTime, rowTime).toMinutes(); + + // 如果时间差大于等于间隔,则采样这条数据 + if (diffMinutes >= intervalMinutes) { + sampled.add(row); + lastTime = rowTime; + } + } + } + + return sampled; + } + + /** + * 创建设备快照 + */ + private DeviceSnapshot createDeviceSnapshot(DeviceEnum device, LocalDateTime timestamp, + AppMeasureEntryMessage entry, + AppMeasureFurnaceMessage furnace, + AppMeasureCoatMessage coat, + AppMeasureExitMessage exit) { + try { + // 创建快照对象 + DeviceSnapshot snapshot = new DeviceSnapshot(); + snapshot.setCreateTime(timestamp); + snapshot.setDeviceId(device.getIdx()); + snapshot.setDeviceCode(device.name()); + snapshot.setDeviceName(device.getDesc()); + snapshot.setSectionType(device.getSectionType().name()); + snapshot.setSourceType(device.getSourceType().name()); + snapshot.setBasePosition(device.getBasePosition()); + + // 根据设备类型获取对应的测量数据 + ObjectNode dataNode = objectMapper.createObjectNode(); + switch (device.getSourceType()) { + case ENTRY: + if (entry != null) { + extractData(entry, device.getParamFields(), dataNode); + } + break; + case FURNACE: + if (furnace != null) { + extractData(furnace, device.getParamFields(), dataNode); + } + break; + case COAT: + if (coat != null) { + extractData(coat, device.getParamFields(), dataNode); + } + break; + case EXIT: + if (exit != null) { + extractData(exit, device.getParamFields(), dataNode); + } + break; + } + + // 如果没有数据,则跳过 + if (dataNode.isEmpty()) { + return null; + } + + // 设置快照数据 + snapshot.setSnapshotData(dataNode.toString()); + return snapshot; + } catch (Exception e) { + log.error("创建设备[{}]快照时发生错误: {}", device.name(), e.getMessage(), e); + return null; + } + } + + /** + * 从消息对象中提取指定字段的数据 + */ + private void extractData(Object message, List fields, ObjectNode dataNode) { + if (message == null || fields == null || fields.isEmpty()) { + return; + } + + try { + for (String field : fields) { + try { + // 使用反射获取字段值 + java.lang.reflect.Field declaredField = message.getClass().getDeclaredField(field); + declaredField.setAccessible(true); + Object value = declaredField.get(message); + + // 处理不同类型的值 + if (value != null) { + if (value instanceof Number) { + dataNode.put(field, (Number) value); + } else if (value instanceof Boolean) { + dataNode.put(field, (Boolean) value); + } else if (value instanceof String) { + dataNode.put(field, (String) value); + } else if (value instanceof Enum) { + dataNode.put(field, value.toString()); + } + } + } catch (NoSuchFieldException e) { + log.trace("字段[{}]在消息[{}]中不存在", field, message.getClass().getSimpleName()); + } catch (Exception e) { + log.warn("获取字段[{}]值失败: {}", field, e.getMessage()); + } + } + } catch (Exception e) { + log.error("提取数据时发生错误: {}", e.getMessage(), e); + } + } +} diff --git a/business/src/main/java/com/fizz/business/service/OpcDataService.java b/business/src/main/java/com/fizz/business/service/OpcDataService.java new file mode 100644 index 0000000..eb04b8f --- /dev/null +++ b/business/src/main/java/com/fizz/business/service/OpcDataService.java @@ -0,0 +1,25 @@ +package com.fizz.business.service; + +import com.fizz.business.form.OpcBatchWriteDataForm; +import com.fizz.business.form.OpcWriteDataForm; + +/** + * OPC 数据读写服务接口 + */ +public interface OpcDataService { + + /** + * 向单个点位写入数据(通过字段名) + * @param form 写数据请求表单 + * @return 是否写入成功 + */ + boolean writeData(OpcWriteDataForm form); + + /** + * 批量向多个点位写入数据(通过字段名) + * @param form 批量写数据请求表单 + * @return 是否全部写入成功 + */ + boolean batchWriteData(OpcBatchWriteDataForm form); +} + diff --git a/business/src/main/java/com/fizz/business/service/client/DeviceFieldTrendSessionState.java b/business/src/main/java/com/fizz/business/service/client/DeviceFieldTrendSessionState.java new file mode 100644 index 0000000..c2303b6 --- /dev/null +++ b/business/src/main/java/com/fizz/business/service/client/DeviceFieldTrendSessionState.java @@ -0,0 +1,45 @@ +package com.fizz.business.service.client; + +import lombok.Data; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * 每个 WS session 的订阅状态(懒加载用) + */ +@Data +public class DeviceFieldTrendSessionState { + + /** + * 订阅的 deviceCode 集合;为空代表未订阅任何(不推送) + */ + private Set deviceCodes = new HashSet<>(); + + /** + * 订阅的 fieldName 集合;为空代表未订阅任何(不推送) + */ + private Set fieldNames = new HashSet<>(); + + /** + * 是否启用懒加载过滤 + */ + private boolean lazy = true; + + public boolean match(String deviceCode, String fieldName) { + if (!lazy) return true; + if (deviceCodes != null && !deviceCodes.isEmpty() && !deviceCodes.contains(deviceCode)) return false; + if (fieldNames != null && !fieldNames.isEmpty() && !fieldNames.contains(fieldName)) return false; + return true; + } + + public static DeviceFieldTrendSessionState emptyLazyOn() { + DeviceFieldTrendSessionState s = new DeviceFieldTrendSessionState(); + s.setLazy(true); + s.setDeviceCodes(Collections.emptySet()); + s.setFieldNames(Collections.emptySet()); + return s; + } +} + diff --git a/business/src/main/java/com/fizz/business/service/client/TrackWsHandler.java b/business/src/main/java/com/fizz/business/service/client/TrackWsHandler.java index b067a94..b5a48ff 100644 --- a/business/src/main/java/com/fizz/business/service/client/TrackWsHandler.java +++ b/business/src/main/java/com/fizz/business/service/client/TrackWsHandler.java @@ -1,5 +1,8 @@ package com.fizz.business.service.client; +import cn.hutool.json.JSONUtil; +import com.fizz.business.constants.enums.WsTypeEnum; +import com.fizz.business.dto.DeviceFieldTrendSubscribeReq; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; @@ -7,8 +10,7 @@ import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.AbstractWebSocketHandler; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** @@ -19,8 +21,22 @@ import java.util.concurrent.ConcurrentHashMap; @Component public class TrackWsHandler extends AbstractWebSocketHandler { + public static DeviceFieldTrendSessionState getFieldTrendState(String sessionId) { + return fieldTrendStates.get(sessionId); + } + + public static Map getAllFieldTrendStates() { + return fieldTrendStates; + } + private static final ConcurrentHashMap> clients = new ConcurrentHashMap<>(); + /** + * 懒加载订阅状态:只用于 device_field_trend + * sessionId -> state + */ + private static final ConcurrentHashMap fieldTrendStates = new ConcurrentHashMap<>(); + /** * socket 建立成功事件 * @@ -38,6 +54,12 @@ public class TrackWsHandler extends AbstractWebSocketHandler { sessionMap.put(sid, session); // 添加 session clients.put(type, sessionMap); + + // device_field_trend 默认启用懒加载:不主动推送,等前端发送订阅消息 + if (WsTypeEnum.device_field_trend.name().equals(type)) { + fieldTrendStates.put(sid, DeviceFieldTrendSessionState.emptyLazyOn()); + } + log.info("[websocket]建立连接:{}", type + sid); } catch (Exception e) { log.error("[websocket]建立连接错误:{}", e.getMessage(), e); @@ -58,6 +80,35 @@ public class TrackWsHandler extends AbstractWebSocketHandler { // 心跳 if (payload.equals("ping")) { session.sendMessage(new TextMessage("pong")); + return; + } + + // device_field_trend 订阅(懒加载) + String type = (String) session.getAttributes().get("type"); + if (WsTypeEnum.device_field_trend.name().equals(type)) { + try { + DeviceFieldTrendSubscribeReq req = JSONUtil.toBean(payload, DeviceFieldTrendSubscribeReq.class); + DeviceFieldTrendSessionState state = fieldTrendStates.get(session.getId()); + if (state == null) { + state = DeviceFieldTrendSessionState.emptyLazyOn(); + fieldTrendStates.put(session.getId(), state); + } + if (req.getLazy() != null) { + state.setLazy(req.getLazy()); + } + if (req.getDeviceCodes() != null) { + state.setDeviceCodes(new HashSet<>(req.getDeviceCodes())); + } + if (req.getFieldNames() != null) { + state.setFieldNames(new HashSet<>(req.getFieldNames())); + } + log.info("[websocket]device_field_trend订阅更新 sid={}, lazy={}, deviceCodes.size={}, fieldNames.size={}", + session.getId(), state.isLazy(), + state.getDeviceCodes() == null ? 0 : state.getDeviceCodes().size(), + state.getFieldNames() == null ? 0 : state.getFieldNames().size()); + } catch (Exception e) { + log.error("[websocket]解析device_field_trend订阅失败 sid={}, payload={}", session.getId(), payload, e); + } } } @@ -92,6 +143,10 @@ public class TrackWsHandler extends AbstractWebSocketHandler { if (webSocketSession != null) { webSocketSession.close(); } + // 如果是 field_trend 连接,清除订阅状态 + if (WsTypeEnum.device_field_trend.name().equals(type)) { + fieldTrendStates.remove(sid); + } } log.info("[websocket]连接关闭:{}", type + "-" + sid); } catch (Exception e) { diff --git a/business/src/main/java/com/fizz/business/service/hanle/AppMeasureHandler.java b/business/src/main/java/com/fizz/business/service/hanle/AppMeasureHandler.java index eaf01f7..4c9a6f0 100644 --- a/business/src/main/java/com/fizz/business/service/hanle/AppMeasureHandler.java +++ b/business/src/main/java/com/fizz/business/service/hanle/AppMeasureHandler.java @@ -4,6 +4,7 @@ import com.fizz.business.anno.OpcMessageHandlerType; import com.fizz.business.constants.enums.OpcMessageType; import com.fizz.business.domain.msg.*; import com.fizz.business.dto.MatmapDTO; +import com.fizz.business.service.DeviceSnapshotService; import com.fizz.business.service.LogDataService; import com.fizz.business.service.OpcMessageHandler; import com.fizz.business.service.strip.SegmentTrackerService; @@ -27,6 +28,7 @@ public class AppMeasureHandler implements OpcMessageHandler { private final SegmentTrackerService tracker; private final LogDataService logDataService; private final Executor coilTrackExecutor; // 注入线程池接口 + private final DeviceSnapshotService deviceSnapshotService; private static final AtomicInteger LOG_COUNTER = new AtomicInteger(0); @@ -39,6 +41,12 @@ public class AppMeasureHandler implements OpcMessageHandler { AppMeasureCoatMessage coat = message.getAppMeasureCoatMessage(); AppMeasureExitMessage exit = message.getAppMeasureExitMessage(); + // 缓存最新测量数据,供10分钟快照任务使用 + if (entry != null) deviceSnapshotService.updateLatestMeasurement(entry); + if (furnace != null) deviceSnapshotService.updateLatestMeasurement(furnace); + if (coat != null) deviceSnapshotService.updateLatestMeasurement(coat); + if (exit != null) deviceSnapshotService.updateLatestMeasurement(exit); + WebSocketUtil.sendMeasureMsg(message); diff --git a/business/src/main/java/com/fizz/business/service/impl/OpcDataServiceImpl.java b/business/src/main/java/com/fizz/business/service/impl/OpcDataServiceImpl.java new file mode 100644 index 0000000..db4d49f --- /dev/null +++ b/business/src/main/java/com/fizz/business/service/impl/OpcDataServiceImpl.java @@ -0,0 +1,52 @@ +package com.fizz.business.service.impl; + +import com.fizz.business.comm.OPC.OpcMessageSend; +import com.fizz.business.form.OpcBatchWriteDataForm; +import com.fizz.business.form.OpcWriteDataForm; +import com.fizz.business.service.OpcDataService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +import java.util.Map; +import java.util.stream.Collectors; + +/** + * OPC 数据读写服务实现类 + */ +@Slf4j +@Service +@RequiredArgsConstructor +@ConditionalOnProperty(prefix = "kangaroohy.milo", name = "enabled", havingValue = "true") +public class OpcDataServiceImpl implements OpcDataService { + + private final OpcMessageSend opcMessageSend; + + @Override + public boolean writeData(OpcWriteDataForm form) { + try { + return opcMessageSend.writeDataByFieldName(form.getFieldName(), form.getValue()); + } catch (Exception e) { + log.error("写入 OPC 数据异常,fieldName={}, value={}", form.getFieldName(), form.getValue(), e); + return false; + } + } + + @Override + public boolean batchWriteData(OpcBatchWriteDataForm form) { + try { + Map fieldDataMap = form.getDataList().stream() + .collect(Collectors.toMap( + OpcWriteDataForm::getFieldName, + OpcWriteDataForm::getValue, + (v1, v2) -> v2 // 如果有重复的 key,保留后面的值 + )); + return opcMessageSend.batchWriteDataByFieldName(fieldDataMap); + } catch (Exception e) { + log.error("批量写入 OPC 数据异常", e); + return false; + } + } +} + diff --git a/business/src/main/java/com/fizz/business/service/manager/OpcMessageIdsManager.java b/business/src/main/java/com/fizz/business/service/manager/OpcMessageIdsManager.java index d2a858a..c601e79 100644 --- a/business/src/main/java/com/fizz/business/service/manager/OpcMessageIdsManager.java +++ b/business/src/main/java/com/fizz/business/service/manager/OpcMessageIdsManager.java @@ -14,16 +14,6 @@ import java.util.Map; @Component public class OpcMessageIdsManager { - /** - * 生成业务/消息唯一ID(用于发送批次 bizKey 等) - * 规则:PREFIX_时间戳_8位随机 - */ - public String generateMessageId(String prefix) { - String p = (prefix == null || prefix.trim().isEmpty()) ? "MSG" : prefix.trim(); - String random = java.util.UUID.randomUUID().toString().replace("-", "").substring(0, 8); - return (p + "_" + System.currentTimeMillis() + "_" + random).toUpperCase(); - } - public static List msgTriggers = Lists.newArrayList(); public static Map lineMeasureIds = Maps.newHashMap(); @@ -404,4 +394,43 @@ public class OpcMessageIdsManager { msgTriggers.add(DEVICE_NAME+exitMoveIds.get("trigger")); msgTriggers.add(DEVICE_NAME+exitMeasureIds.get("trigger")); } + + /** + * 通过字段名查找对应的 OPC 节点路径 + * @param fieldName 字段名,例如:coilId + * @return OPC 节点路径,如果未找到返回 null + */ + public static String findIdentifierByFieldName(String fieldName) { + if (fieldName == null || fieldName.isEmpty()) { + return null; + } + + // 在所有映射中查找,按优先级顺序查找(pdiSetupIds 优先,因为这是写数据的主要映射) + Map[] allMaps = new Map[]{ + pdiSetupIds, + entryLineMeasureIds, + procLineMeasureIds, + furLineMeasureIds, + exitLineMeasureIds, + entryMoveIds, + exitCutIds, + exitMoveIds, + exitMeasureIds + }; + + for (Map map : allMaps) { + for (Map.Entry entry : map.entrySet()) { + // 跳过 trigger 键 + if ("trigger".equals(entry.getKey())) { + continue; + } + // 如果字段名匹配,返回对应的节点路径 + if (fieldName.equals(entry.getValue())) { + return entry.getKey(); + } + } + } + + return null; + } } diff --git a/business/src/main/java/com/fizz/business/utils/WebSocketUtil.java b/business/src/main/java/com/fizz/business/utils/WebSocketUtil.java index 6ac1a62..3f1885c 100644 --- a/business/src/main/java/com/fizz/business/utils/WebSocketUtil.java +++ b/business/src/main/java/com/fizz/business/utils/WebSocketUtil.java @@ -9,6 +9,9 @@ import com.fizz.business.constants.enums.WsTypeEnum; import com.fizz.business.domain.msg.AppMeasureEntryMessage; import com.fizz.business.domain.msg.AppMeasureMessage; import com.fizz.business.dto.CoilPositionDTO; +import com.fizz.business.dto.DeviceChartDataDTO; +import com.fizz.business.dto.DeviceHistoryTrendDTO; +import com.fizz.business.dto.DeviceFieldTrendDTO; import com.fizz.business.dto.L1CoilLineMeasureDTO; import com.fizz.business.dto.MatmapDTO; import com.fizz.business.dto.WsSignalMsgDTO; @@ -93,4 +96,52 @@ public class WebSocketUtil { if (CollUtil.isEmpty(list)) return; sendMessage(WsTypeEnum.track_matmap, JSONUtil.toJsonStr(list)); } + + /** + * 推送设备历史趋势数据 + * @param trendData 历史趋势数据 + */ + public static void sendHistoryTrendMessage(DeviceHistoryTrendDTO trendData) { + if (trendData == null) return; + sendMessage(WsTypeEnum.device_history_trend, JSONUtil.toJsonStr(trendData)); + } + + /** + * 推送图表数据(饼图、条形图、折线图、统计数据) + * @param chartData 图表数据 + */ + public static void sendChartDataMessage(DeviceChartDataDTO chartData) { + if (chartData == null) return; + sendMessage(WsTypeEnum.device_chart_data, JSONUtil.toJsonStr(chartData)); + } + + /** + * 推送设备-字段趋势数据 + */ + public static void sendDeviceFieldTrendMessage(DeviceFieldTrendDTO dto) { + if (dto == null) return; + + // 懒加载:仅推送给订阅了该 deviceCode/fieldName 的客户端 + Map clients = trackWsHandler.getClients().getOrDefault(WsTypeEnum.device_field_trend.name(), Maps.newConcurrentMap()); + if (clients.isEmpty()) return; + + String payload = JSONUtil.toJsonStr(dto); + TextMessage message = new TextMessage(payload); + + clients.values().forEach(s -> { + try { + var state = TrackWsHandler.getFieldTrendState(s.getId()); + // 未建立订阅状态时默认不推送(避免全量) + if (state == null) { + return; + } + if (!state.match(dto.getDeviceCode(), dto.getFieldName())) { + return; + } + s.sendMessage(message); + } catch (IOException e) { + log.error("[websocket]向客户端[{}]推送device_field_trend异常", s.getId(), e); + } + }); + } }