二级和app修改

This commit is contained in:
2026-01-05 14:29:32 +08:00
parent d48028a3b4
commit 53f8ccbadd
21 changed files with 1741 additions and 49 deletions

View File

@@ -14,6 +14,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static com.fizz.business.service.manager.OpcMessageIdsManager.findIdentifierByFieldName;
import static com.fizz.business.service.manager.OpcMessageIdsManager.pdiSetupIds;
@Service
@@ -62,56 +63,116 @@ public class OpcMessageSend {
}
/**
* 通用写入方法,用于向指定 OPC 节点写入个值
* @param address OPC 节点地址 (e.g., "ns=2;s=ProcessCGL.PLCLine.ExitCut.cutLength")
* 通用写数据方法:通过字段名向指定点位写入个值
* @param fieldName 字段名对象属性名例如coilId
* @param value 要写入的值
* @return 是否写入成功
*/
public void writeNode(String address, Object value) {
public boolean writeDataByFieldName(String fieldName, Object value) {
try {
String identifier = findIdentifierByFieldName(fieldName);
if (identifier == null) {
log.error("未找到字段名对应的 OPC 节点路径fieldName={}", fieldName);
return false;
}
List<ReadWriteEntity> entities = new ArrayList<>();
entities.add(ReadWriteEntity.builder()
.identifier(address)
.identifier(identifier)
.value(value)
.build());
miloService.writeToOpcUa(entities);
log.info("写入 OPC 成功, node={}, value={}", address, value);
log.info("写入 OPC 数据成功fieldName={}, identifier={}, value={}", fieldName, identifier, value);
return true;
} catch (Exception e) {
log.error("写入 OPC 失败, node={}, value={}, 原因: {}", address, value, e.getMessage(), e);
// 抛出运行时异常,以便上层调用者(如 SendJobServiceImpl可以捕获并处理失败状态
throw new RuntimeException("写入 OPC 失败: " + e.getMessage(), e);
log.error("写入 OPC 数据失败fieldName={}, value={},原因:{}", fieldName, value, e.getMessage(), e);
return false;
}
}
/**
* 写入 OPC 节点(增强版):根据字符串值尝试转换为数值/布尔,再写入
* 规则:
* 1) 先尝试转 Integer
* 2) 再尝试转 Double
* 3) 再尝试转 Booleantrue/false/1/0
* 4) 都不行则按原字符串写入
* 通用写数据方法:向指定点位写入单个值(直接使用节点路径)
* @param identifier OPC 点位标识符(节点路径)
* @param value 要写入的值
* @return 是否写入成功
*/
public void writeNode(String address, String valueRaw) {
Object v = valueRaw;
if (valueRaw != null) {
String s = valueRaw.trim();
// boolean
if ("1".equals(s) || "0".equals(s) || "true".equalsIgnoreCase(s) || "false".equalsIgnoreCase(s)) {
v = ("1".equals(s) || "true".equalsIgnoreCase(s));
} else {
// int
try {
v = Integer.parseInt(s);
} catch (Exception ignore) {
// double
try {
v = Double.parseDouble(s);
} catch (Exception ignore2) {
v = valueRaw;
}
}
}
public boolean writeData(String identifier, Object value) {
try {
List<ReadWriteEntity> entities = new ArrayList<>();
entities.add(ReadWriteEntity.builder()
.identifier(identifier)
.value(value)
.build());
miloService.writeToOpcUa(entities);
log.info("写入 OPC 数据成功identifier={}, value={}", identifier, value);
return true;
} catch (Exception e) {
log.error("写入 OPC 数据失败identifier={}, value={},原因:{}", identifier, value, e.getMessage(), e);
return false;
}
}
/**
* 批量写数据方法:通过字段名向多个点位写入数据
* @param fieldDataMap 字段名和值的映射key 为 fieldNamevalue 为要写入的值
* @return 是否全部写入成功
*/
public boolean batchWriteDataByFieldName(Map<String, Object> fieldDataMap) {
try {
if (fieldDataMap == null || fieldDataMap.isEmpty()) {
log.warn("批量写数据:数据映射为空,跳过写入");
return false;
}
List<ReadWriteEntity> entities = new ArrayList<>();
for (Map.Entry<String, Object> entry : fieldDataMap.entrySet()) {
String fieldName = entry.getKey();
String identifier = findIdentifierByFieldName(fieldName);
if (identifier == null) {
log.warn("未找到字段名对应的 OPC 节点路径fieldName={},跳过该字段", fieldName);
continue;
}
entities.add(ReadWriteEntity.builder()
.identifier(identifier)
.value(entry.getValue())
.build());
}
if (entities.isEmpty()) {
log.warn("批量写数据:没有有效的字段映射,跳过写入");
return false;
}
miloService.writeToOpcUa(entities);
log.info("批量写入 OPC 数据成功,共 {} 个点位", entities.size());
return true;
} catch (Exception e) {
log.error("批量写入 OPC 数据失败,原因:{}", e.getMessage(), e);
return false;
}
}
/**
* 批量写数据方法:向多个点位写入数据(直接使用节点路径)
* @param dataMap 点位标识符和值的映射key 为 identifiervalue 为要写入的值
* @return 是否全部写入成功
*/
public boolean batchWriteData(Map<String, Object> dataMap) {
try {
if (dataMap == null || dataMap.isEmpty()) {
log.warn("批量写数据:数据映射为空,跳过写入");
return false;
}
List<ReadWriteEntity> entities = new ArrayList<>();
for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
entities.add(ReadWriteEntity.builder()
.identifier(entry.getKey())
.value(entry.getValue())
.build());
}
miloService.writeToOpcUa(entities);
log.info("批量写入 OPC 数据成功,共 {} 个点位", entities.size());
return true;
} catch (Exception e) {
log.error("批量写入 OPC 数据失败,原因:{}", e.getMessage(), e);
return false;
}
writeNode(address, v);
}
private List<ReadWriteEntity> getWriteEntities(OpcMessage msg, Map<String,String> msgIds) {
@@ -129,7 +190,7 @@ public class OpcMessageSend {
} catch (NoSuchFieldException | IllegalAccessException e) {
// 处理字段不存在或访问异常,可记录日志或设置默认值
e.printStackTrace();
return new ArrayList<>();
return new ArrayList<>();
}
ReadWriteEntity entity = ReadWriteEntity.builder()

View File

@@ -17,7 +17,7 @@ import java.util.Map;
@Getter
@AllArgsConstructor
public enum WsTypeEnum {
alarm, track_position, track_measure, track_signal, track_matmap,calc_setup_result;
alarm, track_position, track_measure, track_signal, track_matmap, calc_setup_result, device_history_trend, device_chart_data, device_field_trend;
private static final Map<String, WsTypeEnum> MAP = new HashMap<>(8);

View File

@@ -0,0 +1,62 @@
package com.fizz.business.controller;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fizz.business.domain.DeviceSnapshot;
import com.fizz.business.mapper.DeviceSnapshotMapper;
import com.ruoyi.common.annotation.Anonymous;
import com.ruoyi.common.core.domain.R;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.util.List;
@RestController
@RequestMapping("/api/deviceSnapshot")
@Tag(name = "设备数据快照")
@Anonymous
@RequiredArgsConstructor
public class DeviceSnapshotController {
private final DeviceSnapshotMapper deviceSnapshotMapper;
@GetMapping("/latest")
@Operation(description = "获取最新N条设备快照可按deviceCode过滤")
public R<List<DeviceSnapshot>> latest(@RequestParam(value = "limit", defaultValue = "60") Integer limit,
@RequestParam(value = "deviceCode", required = false) String deviceCode) {
LambdaQueryWrapper<DeviceSnapshot> qw = new LambdaQueryWrapper<DeviceSnapshot>()
.orderByDesc(DeviceSnapshot::getCreateTime)
.last("limit " + Math.max(1, Math.min(limit, 500)));
if (deviceCode != null && !deviceCode.isEmpty()) {
qw.eq(DeviceSnapshot::getDeviceCode, deviceCode);
}
return R.ok(deviceSnapshotMapper.selectList(qw));
}
@GetMapping("/range")
@Operation(description = "按时间范围查询设备快照可按deviceCode过滤")
public R<List<DeviceSnapshot>> range(@RequestParam("start") String start,
@RequestParam("end") String end,
@RequestParam(value = "deviceCode", required = false) String deviceCode) {
LocalDateTime startTime = LocalDateTime.parse(start);
LocalDateTime endTime = LocalDateTime.parse(end);
LambdaQueryWrapper<DeviceSnapshot> qw = new LambdaQueryWrapper<DeviceSnapshot>()
.between(DeviceSnapshot::getCreateTime, startTime, endTime)
.orderByAsc(DeviceSnapshot::getCreateTime);
if (deviceCode != null && !deviceCode.isEmpty()) {
qw.eq(DeviceSnapshot::getDeviceCode, deviceCode);
}
return R.ok(deviceSnapshotMapper.selectList(qw));
}
}

View File

@@ -0,0 +1,69 @@
package com.fizz.business.controller;
import com.fizz.business.form.OpcBatchWriteDataForm;
import com.fizz.business.form.OpcWriteDataForm;
import com.fizz.business.service.OpcDataService;
import com.ruoyi.common.annotation.Anonymous;
import com.ruoyi.common.core.domain.R;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.web.bind.annotation.*;
import javax.validation.Valid;
/**
* OPC 数据读写控制器
*/
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/opc/data")
@Tag(name = "OPC 数据读写")
@ConditionalOnProperty(prefix = "kangaroohy.milo", name = "enabled", havingValue = "true")
@Anonymous
public class OpcDataController {
private final OpcDataService opcDataService;
/**
* 向单个点位写入数据(通过字段名)
*/
@Operation(summary = "向单个点位写入数据", description = "通过字段名向指定的 OPC 点位写入数据")
@PostMapping("/write")
public R<?> writeData(@Valid @RequestBody OpcWriteDataForm form) {
try {
boolean success = opcDataService.writeData(form);
if (success) {
return R.ok("写入成功");
} else {
return R.fail("写入失败,可能是字段名未找到对应的 OPC 节点路径,请查看日志");
}
} catch (Exception e) {
log.error("写入 OPC 数据异常", e);
return R.fail("写入异常:" + e.getMessage());
}
}
/**
* 批量向多个点位写入数据(通过字段名)
*/
@Operation(summary = "批量向多个点位写入数据", description = "通过字段名向多个 OPC 点位批量写入数据")
@PostMapping("/batchWrite")
public R<?> batchWriteData(@Valid @RequestBody OpcBatchWriteDataForm form) {
try {
boolean success = opcDataService.batchWriteData(form);
if (success) {
return R.ok("批量写入成功,共 " + form.getDataList().size() + " 个点位");
} else {
return R.fail("批量写入失败,可能是部分字段名未找到对应的 OPC 节点路径,请查看日志");
}
} catch (Exception e) {
log.error("批量写入 OPC 数据异常", e);
return R.fail("批量写入异常:" + e.getMessage());
}
}
}

View File

@@ -0,0 +1,48 @@
package com.fizz.business.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;
import java.time.LocalDateTime;
@Data
@TableName("device_snapshot")
@Schema(description = "设备数据快照")
public class DeviceSnapshot {
@TableId(type = IdType.ASSIGN_ID)
@Schema(description = "主键")
private Long id;
@Schema(description = "快照时间")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime createTime;
@Schema(description = "设备idx对应DeviceEnum.idx")
private Integer deviceId;
@Schema(description = "设备枚举名对应DeviceEnum.name()")
private String deviceCode;
@Schema(description = "设备描述")
private String deviceName;
@Schema(description = "段类型 ENTRY/PROCESS/EXIT")
private String sectionType;
@Schema(description = "来源类型 ENTRY/FURNACE/COAT/EXIT")
private String sourceType;
@Schema(description = "基准位置(m)")
private Double basePosition;
@Schema(description = "快照数据JSON")
private String snapshotData;
}

View File

@@ -0,0 +1,153 @@
package com.fizz.business.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
/**
* 设备图表数据 DTO
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "设备图表数据")
public class DeviceChartDataDTO implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "设备类型分布饼图数据")
private PieChartData deviceTypePieChart;
@Schema(description = "数据来源分布条形图数据")
private BarChartData sourceTypeBarChart;
@Schema(description = "同类型设备对比折线图数据(单位相同)")
private LineChartData sameTypeDeviceCompareChart;
@Schema(description = "设备统计数据")
private List<DeviceStatistics> deviceStatistics;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "饼图数据")
public static class PieChartData implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "数据系列")
private List<PieSeries> series;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "饼图系列")
public static class PieSeries implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "名称")
private String name;
@Schema(description = "数值")
private Double data;
@Schema(description = "颜色")
private String color;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "条形图数据")
public static class BarChartData implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "分类轴")
private List<String> categories;
@Schema(description = "数据系列")
private List<BarSeries> series;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "条形图系列")
public static class BarSeries implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "系列名称")
private String name;
@Schema(description = "数据值列表")
private List<Double> data;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "折线图数据")
public static class LineChartData implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "时间轴")
private List<String> categories;
@Schema(description = "数据系列")
private List<LineSeries> series;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "折线图系列")
public static class LineSeries implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "系列名称")
private String name;
@Schema(description = "数据值列表")
private List<Double> data;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "设备统计数据")
public static class DeviceStatistics implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "设备代码")
private String deviceCode;
@Schema(description = "设备名称")
private String deviceName;
@Schema(description = "平均值")
private String avg;
@Schema(description = "最大值")
private String max;
@Schema(description = "最小值")
private String min;
@Schema(description = "当前值")
private String current;
}
}

View File

@@ -0,0 +1,55 @@
package com.fizz.business.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.List;
/**
* 单设备-单字段的趋势与统计
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "设备字段趋势数据")
public class DeviceFieldTrendDTO implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "设备代码")
private String deviceCode;
@Schema(description = "设备名称")
private String deviceName;
@Schema(description = "字段名")
private String fieldName;
@Schema(description = "字段中文名(可选)")
private String fieldLabel;
@Schema(description = "单位(可选)")
private String unit;
@Schema(description = "时间轴(HH:mm)")
private List<String> categories;
@Schema(description = "数据点")
private List<Double> data;
@Schema(description = "统计-平均值")
private Double avg;
@Schema(description = "统计-最大值")
private Double max;
@Schema(description = "统计-最小值")
private Double min;
@Schema(description = "统计-最新值")
private Double last;
}

View File

@@ -0,0 +1,26 @@
package com.fizz.business.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* WS 订阅:设备字段趋势
*/
@Data
@Schema(description = "设备字段趋势订阅请求")
public class DeviceFieldTrendSubscribeReq implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "设备代码列表,为空表示不变")
private List<String> deviceCodes;
@Schema(description = "字段名列表,为空表示不变")
private List<String> fieldNames;
@Schema(description = "是否只推送可见范围(用于懒加载)true=只推 deviceCodes/fieldNames 命中的数据")
private Boolean lazy;
}

View File

@@ -0,0 +1,54 @@
package com.fizz.business.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
/**
* 设备历史趋势数据 DTO
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "设备历史趋势数据")
public class DeviceHistoryTrendDTO implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "设备代码")
private String deviceCode;
@Schema(description = "设备描述")
private String deviceName;
@Schema(description = "时间轴格式HH:mm")
private List<String> categories;
@Schema(description = "数据系列key为字段名value为数据值列表")
private List<SeriesData> series;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "数据系列")
public static class SeriesData implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "系列名称(字段标签)")
private String name;
@Schema(description = "字段名")
private String fieldName;
@Schema(description = "数据值列表")
private List<Double> data;
}
}

View File

@@ -0,0 +1,27 @@
package com.fizz.business.form;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import javax.validation.Valid;
import javax.validation.constraints.NotEmpty;
import java.io.Serializable;
import java.util.List;
/**
* OPC 批量写数据请求表单
*/
@Data
@Schema(description = "OPC 批量写数据请求")
public class OpcBatchWriteDataForm implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 批量写数据列表
*/
@Schema(description = "批量写数据列表")
@NotEmpty(message = "写数据列表不能为空")
@Valid
private List<OpcWriteDataForm> dataList;
}

View File

@@ -0,0 +1,35 @@
package com.fizz.business.form;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
/**
* OPC 写数据请求表单
*/
@Data
@Schema(description = "OPC 写数据请求")
public class OpcWriteDataForm implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 字段名对象属性名例如coilId
* 系统会自动查找对应的 OPC 节点路径
*/
@Schema(description = "字段名(对象属性名)", example = "coilId")
@NotBlank(message = "字段名不能为空")
private String fieldName;
/**
* 要写入的值支持多种类型String, Integer, Double, Boolean 等)
*/
@Schema(description = "要写入的值", example = "COIL001")
@NotNull(message = "写入值不能为空")
private Object value;
}

View File

@@ -0,0 +1,10 @@
package com.fizz.business.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.fizz.business.domain.DeviceSnapshot;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface DeviceSnapshotMapper extends BaseMapper<DeviceSnapshot> {
}

View File

@@ -0,0 +1,40 @@
package com.fizz.business.schedule;
import com.fizz.business.service.DeviceSnapshotService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* 设备数据快照定时任务
* 每10分钟执行一次设备数据快照
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class DeviceSnapshotSchedule {
private final DeviceSnapshotService deviceSnapshotService;
/**
* 每10分钟执行一次设备数据快照
* 使用cron表达式0 0/10 * * * ?
* 表示每10分钟执行一次在每分钟的0秒触发
*/
@Scheduled(cron = "0 0/10 * * * ?")
public void captureDeviceSnapshots() {
try {
log.info("开始执行设备数据快照任务...");
long startTime = System.currentTimeMillis();
// 创建设备数据快照
deviceSnapshotService.createDeviceSnapshots();
long endTime = System.currentTimeMillis();
log.info("设备数据快照任务执行完成,耗时:{}ms", (endTime - startTime));
} catch (Exception e) {
log.error("执行设备数据快照任务失败: {}", e.getMessage(), e);
}
}
}

View File

@@ -0,0 +1,787 @@
package com.fizz.business.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fizz.business.constants.enums.DeviceEnum;
import com.fizz.business.constants.enums.WsTypeEnum;
import com.fizz.business.domain.DeviceSnapshot;
import com.fizz.business.domain.msg.*;
import com.fizz.business.dto.DeviceChartDataDTO;
import com.fizz.business.dto.DeviceFieldTrendDTO;
import com.fizz.business.dto.DeviceHistoryTrendDTO;
import com.fizz.business.mapper.DeviceSnapshotMapper;
import com.fizz.business.utils.WebSocketUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* 设备数据快照服务
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class DeviceSnapshotService {
// 使用ConcurrentHashMap存储最新的测量数据
private final Map<Class<?>, OpcMessage> latestMeasurements = new ConcurrentHashMap<>();
private final DeviceSnapshotMapper deviceSnapshotMapper;
private final ObjectMapper objectMapper;
/**
* 更新最新的测量数据
* @param message 测量数据消息
*/
public void updateLatestMeasurement(OpcMessage message) {
if (message != null) {
latestMeasurements.put(message.getClass(), message);
}
}
/**
* 创建当前所有设备的数据快照
*/
@Transactional(rollbackFor = Exception.class)
public void createDeviceSnapshots() {
try {
// 获取当前时间戳
LocalDateTime now = LocalDateTime.now();
// 获取最新的测量数据
AppMeasureEntryMessage entry = (AppMeasureEntryMessage) latestMeasurements.get(AppMeasureEntryMessage.class);
AppMeasureFurnaceMessage furnace = (AppMeasureFurnaceMessage) latestMeasurements.get(AppMeasureFurnaceMessage.class);
AppMeasureCoatMessage coat = (AppMeasureCoatMessage) latestMeasurements.get(AppMeasureCoatMessage.class);
AppMeasureExitMessage exit = (AppMeasureExitMessage) latestMeasurements.get(AppMeasureExitMessage.class);
if (entry == null && furnace == null && coat == null && exit == null) {
log.warn("没有可用的测量数据,跳过快照创建");
return;
}
// 为每个设备创建快照
List<DeviceSnapshot> snapshots = new ArrayList<>();
for (DeviceEnum device : DeviceEnum.values()) {
try {
DeviceSnapshot snapshot = createDeviceSnapshot(device, now, entry, furnace, coat, exit);
if (snapshot != null) {
snapshots.add(snapshot);
}
} catch (Exception e) {
log.error("创建设备[{}]快照失败: {}", device.name(), e.getMessage(), e);
}
}
// 批量保存快照
if (!snapshots.isEmpty()) {
for (DeviceSnapshot snapshot : snapshots) {
deviceSnapshotMapper.insert(snapshot);
}
log.info("成功创建{}个设备数据快照", snapshots.size());
// 推送所有设备的历史趋势数据
pushAllDevicesHistoryTrend();
// 推送设备统计数据
pushAllChartData();
// 推送每个设备每个字段的趋势数据(供前端画“点位趋势小图”)
pushAllDeviceFieldTrends();
}
} catch (Exception e) {
log.error("创建设备数据快照失败: {}", e.getMessage(), e);
throw e;
}
}
/**
* 推送所有设备的历史趋势数据今天一天的数据10分钟间隔
*/
public void pushAllDevicesHistoryTrend() {
try {
for (DeviceEnum device : DeviceEnum.values()) {
DeviceHistoryTrendDTO trendData = getDeviceHistoryTrend(device.name());
if (trendData != null && trendData.getSeries() != null && !trendData.getSeries().isEmpty()) {
WebSocketUtil.sendHistoryTrendMessage(trendData);
}
}
} catch (Exception e) {
log.error("推送设备历史趋势数据失败: {}", e.getMessage(), e);
}
}
/**
* 推送设备统计数据
*/
public void pushAllChartData() {
try {
DeviceChartDataDTO chartData = buildAllChartData();
if (chartData != null) {
WebSocketUtil.sendChartDataMessage(chartData);
}
} catch (Exception e) {
log.error("推送统计数据失败: {}", e.getMessage(), e);
}
}
/**
* 推送每个设备每个字段的趋势数据今天范围10分钟间隔采样
*/
public void pushAllDeviceFieldTrends() {
try {
for (DeviceEnum device : DeviceEnum.values()) {
List<String> fields = device.getParamFields();
if (fields == null || fields.isEmpty()) continue;
for (String field : fields) {
DeviceFieldTrendDTO dto = getDeviceFieldTrend(device.name(), field);
if (dto != null) {
WebSocketUtil.sendDeviceFieldTrendMessage(dto);
}
}
}
} catch (Exception e) {
log.error("推送设备字段趋势失败: {}", e.getMessage(), e);
}
}
/**
* 获取 单设备-单字段 今日趋势与统计10分钟采样
*/
public DeviceFieldTrendDTO getDeviceFieldTrend(String deviceCode, String fieldName) {
try {
DeviceEnum device = DeviceEnum.fromName(deviceCode);
if (device == null) return null;
// 今天范围
LocalDate today = LocalDate.now();
LocalDateTime startTime = today.atStartOfDay();
LocalDateTime endTime = today.atTime(LocalTime.MAX);
// 查询今天的快照
LambdaQueryWrapper<DeviceSnapshot> qw = new LambdaQueryWrapper<DeviceSnapshot>()
.eq(DeviceSnapshot::getDeviceCode, deviceCode)
.between(DeviceSnapshot::getCreateTime, startTime, endTime)
.orderByAsc(DeviceSnapshot::getCreateTime);
List<DeviceSnapshot> rows = deviceSnapshotMapper.selectList(qw);
if (rows == null || rows.isEmpty()) return null;
// 10分钟采样
List<DeviceSnapshot> sampled = sampleDataByInterval(rows, 10);
DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm");
List<String> categories = new ArrayList<>(sampled.size());
List<Double> data = new ArrayList<>(sampled.size());
for (DeviceSnapshot s : sampled) {
categories.add(s.getCreateTime().format(timeFormatter));
double v = 0.0;
try {
Map<String, Object> json = objectMapper.readValue(s.getSnapshotData(), Map.class);
Object raw = json.get(fieldName);
if (raw instanceof Number) v = ((Number) raw).doubleValue();
} catch (Exception ignore) {
}
data.add(v);
}
// 统计忽略全0
List<Double> valid = new ArrayList<>();
for (Double v : data) {
if (v != null && v != 0.0) valid.add(v);
}
Double avg = null, max = null, min = null;
if (!valid.isEmpty()) {
double sum = 0;
max = valid.get(0);
min = valid.get(0);
for (Double v : valid) {
sum += v;
if (v > max) max = v;
if (v < min) min = v;
}
avg = sum / valid.size();
}
Double last = data.isEmpty() ? null : data.get(data.size() - 1);
return DeviceFieldTrendDTO.builder()
.deviceCode(deviceCode)
.deviceName(device.getDesc())
.fieldName(fieldName)
// label/unit 目前后端没有统一来源,这里先留空,前端可用 fieldMeta 补齐
.fieldLabel(null)
.unit(null)
.categories(categories)
.data(data)
.avg(avg)
.max(max)
.min(min)
.last(last)
.build();
} catch (Exception e) {
log.error("获取设备字段趋势失败, deviceCode={}, fieldName={}, err={}", deviceCode, fieldName, e.getMessage(), e);
return null;
}
}
/**
* 构建设备图表数据
*/
private DeviceChartDataDTO buildAllChartData() {
try {
// 1. 设备类型分布饼图
DeviceChartDataDTO.PieChartData pieChartData = buildDeviceTypePieChart();
// 2. 数据来源分布条形图
DeviceChartDataDTO.BarChartData barChartData = buildSourceTypeBarChart();
// 3. 同类型设备对比折线图(只对比单位相同的设备)
DeviceChartDataDTO.LineChartData lineChartData = buildSameTypeDeviceCompareChart();
// 4. 设备统计数据
List<DeviceChartDataDTO.DeviceStatistics> statistics = buildDeviceStatistics();
return DeviceChartDataDTO.builder()
.deviceTypePieChart(pieChartData)
.sourceTypeBarChart(barChartData)
.sameTypeDeviceCompareChart(lineChartData)
.deviceStatistics(statistics)
.build();
} catch (Exception e) {
log.error("构建图表数据失败: {}", e.getMessage(), e);
return null;
}
}
/**
* 构建设备类型分布饼图数据
*/
private DeviceChartDataDTO.PieChartData buildDeviceTypePieChart() {
Map<String, Integer> distribution = new HashMap<>();
distribution.put("入口段", 0);
distribution.put("处理段", 0);
distribution.put("出口段", 0);
distribution.put("其他", 0);
for (DeviceEnum device : DeviceEnum.values()) {
String sectionType = device.getSectionType().name();
if ("ENTRY".equals(sectionType)) {
distribution.put("入口段", distribution.get("入口段") + 1);
} else if ("PROCESS".equals(sectionType)) {
distribution.put("处理段", distribution.get("处理段") + 1);
} else if ("EXIT".equals(sectionType)) {
distribution.put("出口段", distribution.get("出口段") + 1);
} else {
distribution.put("其他", distribution.get("其他") + 1);
}
}
String[] colors = {"#0066cc", "#409eff", "#66b1ff", "#a0cfff"};
List<DeviceChartDataDTO.PieSeries> series = new ArrayList<>();
int colorIndex = 0;
for (Map.Entry<String, Integer> entry : distribution.entrySet()) {
if (entry.getValue() > 0) {
series.add(DeviceChartDataDTO.PieSeries.builder()
.name(entry.getKey())
.data(entry.getValue().doubleValue())
.color(colors[colorIndex % colors.length])
.build());
colorIndex++;
}
}
return DeviceChartDataDTO.PieChartData.builder()
.series(series)
.build();
}
/**
* 构建数据来源分布条形图数据
*/
private DeviceChartDataDTO.BarChartData buildSourceTypeBarChart() {
Map<String, Integer> distribution = new HashMap<>();
distribution.put("ENTRY", 0);
distribution.put("FURNACE", 0);
distribution.put("COAT", 0);
distribution.put("EXIT", 0);
for (DeviceEnum device : DeviceEnum.values()) {
String sourceType = device.getSourceType().name();
if (distribution.containsKey(sourceType)) {
distribution.put(sourceType, distribution.get(sourceType) + 1);
}
}
List<String> categories = new ArrayList<>();
List<Double> data = new ArrayList<>();
for (Map.Entry<String, Integer> entry : distribution.entrySet()) {
if (entry.getValue() > 0) {
categories.add(entry.getKey());
data.add(entry.getValue().doubleValue());
}
}
return DeviceChartDataDTO.BarChartData.builder()
.categories(categories)
.series(Collections.singletonList(
DeviceChartDataDTO.BarSeries.builder()
.name("设备数量")
.data(data)
.build()
))
.build();
}
/**
* 构建同类型设备对比折线图数据(只对比单位相同的设备)
*/
private DeviceChartDataDTO.LineChartData buildSameTypeDeviceCompareChart() {
// 获取今天的数据
LocalDate today = LocalDate.now();
LocalDateTime startTime = today.atStartOfDay();
LocalDateTime endTime = today.atTime(LocalTime.MAX);
// 按数据来源分组,同一来源的设备单位相同,可以对比
Map<String, List<DeviceEnum>> devicesBySource = new HashMap<>();
for (DeviceEnum device : DeviceEnum.values()) {
if (device.getParamFields() == null || device.getParamFields().isEmpty()) {
continue;
}
String sourceType = device.getSourceType().name();
devicesBySource.computeIfAbsent(sourceType, k -> new ArrayList<>()).add(device);
}
// 选择设备数量最多的数据源取前5个设备进行对比
String selectedSource = devicesBySource.entrySet().stream()
.max(Map.Entry.comparingByValue((a, b) -> Integer.compare(a.size(), b.size())))
.map(Map.Entry::getKey)
.orElse(null);
if (selectedSource == null || devicesBySource.get(selectedSource).isEmpty()) {
return null;
}
List<DeviceEnum> selectedDevices = devicesBySource.get(selectedSource).stream()
.limit(5)
.collect(Collectors.toList());
List<String> categories = new ArrayList<>();
List<DeviceChartDataDTO.LineSeries> series = new ArrayList<>();
// 生成时间轴最近10个时间点每10分钟一个
DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm");
LocalDateTime now = LocalDateTime.now();
for (int i = 9; i >= 0; i--) {
LocalDateTime time = now.minusMinutes(i * 10L);
categories.add(time.format(timeFormatter));
}
// 为每个设备生成数据系列
for (DeviceEnum device : selectedDevices) {
if (device.getParamFields() == null || device.getParamFields().isEmpty()) {
continue;
}
// 获取该设备今天的历史数据
LambdaQueryWrapper<DeviceSnapshot> qw = new LambdaQueryWrapper<DeviceSnapshot>()
.eq(DeviceSnapshot::getDeviceCode, device.name())
.between(DeviceSnapshot::getCreateTime, startTime, endTime)
.orderByAsc(DeviceSnapshot::getCreateTime);
List<DeviceSnapshot> rows = deviceSnapshotMapper.selectList(qw);
List<DeviceSnapshot> sampledRows = sampleDataByInterval(rows, 10);
// 使用第一个字段的数据
String firstField = device.getParamFields().get(0);
List<Double> data = new ArrayList<>();
// 填充数据如果历史数据不足用0填充
for (int i = 0; i < 10; i++) {
if (i < sampledRows.size()) {
try {
Map<String, Object> jsonData = objectMapper.readValue(
sampledRows.get(i).getSnapshotData(), Map.class);
Object value = jsonData.get(firstField);
double numValue = 0.0;
if (value instanceof Number) {
numValue = ((Number) value).doubleValue();
}
data.add(numValue);
} catch (Exception e) {
data.add(0.0);
}
} else {
data.add(0.0);
}
}
series.add(DeviceChartDataDTO.LineSeries.builder()
.name(device.getDesc())
.data(data)
.build());
}
if (series.isEmpty()) {
return null;
}
return DeviceChartDataDTO.LineChartData.builder()
.categories(categories)
.series(series)
.build();
}
/**
* 构建设备统计数据
*/
private List<DeviceChartDataDTO.DeviceStatistics> buildDeviceStatistics() {
List<DeviceChartDataDTO.DeviceStatistics> statistics = new ArrayList<>();
LocalDate today = LocalDate.now();
LocalDateTime startTime = today.atStartOfDay();
LocalDateTime endTime = today.atTime(LocalTime.MAX);
// 获取最新的测量数据
AppMeasureEntryMessage entry = (AppMeasureEntryMessage) latestMeasurements.get(AppMeasureEntryMessage.class);
AppMeasureFurnaceMessage furnace = (AppMeasureFurnaceMessage) latestMeasurements.get(AppMeasureFurnaceMessage.class);
AppMeasureCoatMessage coat = (AppMeasureCoatMessage) latestMeasurements.get(AppMeasureCoatMessage.class);
AppMeasureExitMessage exit = (AppMeasureExitMessage) latestMeasurements.get(AppMeasureExitMessage.class);
for (DeviceEnum device : DeviceEnum.values()) {
if (device.getParamFields() == null || device.getParamFields().isEmpty()) {
continue;
}
String firstField = device.getParamFields().get(0);
// 获取当前值
double currentValue = 0.0;
switch (device.getSourceType()) {
case ENTRY:
if (entry != null) {
currentValue = getFieldValue(entry, firstField);
}
break;
case FURNACE:
if (furnace != null) {
currentValue = getFieldValue(furnace, firstField);
}
break;
case COAT:
if (coat != null) {
currentValue = getFieldValue(coat, firstField);
}
break;
case EXIT:
if (exit != null) {
currentValue = getFieldValue(exit, firstField);
}
break;
}
// 从历史数据中计算统计值
LambdaQueryWrapper<DeviceSnapshot> qw = new LambdaQueryWrapper<DeviceSnapshot>()
.eq(DeviceSnapshot::getDeviceCode, device.name())
.between(DeviceSnapshot::getCreateTime, startTime, endTime)
.orderByAsc(DeviceSnapshot::getCreateTime);
List<DeviceSnapshot> rows = deviceSnapshotMapper.selectList(qw);
List<Double> values = new ArrayList<>();
for (DeviceSnapshot snapshot : rows) {
try {
Map<String, Object> jsonData = objectMapper.readValue(
snapshot.getSnapshotData(), Map.class);
Object value = jsonData.get(firstField);
if (value instanceof Number) {
double numValue = ((Number) value).doubleValue();
if (numValue > 0) {
values.add(numValue);
}
}
} catch (Exception e) {
// ignore
}
}
String avg = "";
String max = "";
String min = "";
if (!values.isEmpty()) {
double avgValue = values.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
double maxValue = values.stream().mapToDouble(Double::doubleValue).max().orElse(0.0);
double minValue = values.stream().mapToDouble(Double::doubleValue).min().orElse(0.0);
avg = String.format("%.2f", avgValue);
max = String.format("%.2f", maxValue);
min = String.format("%.2f", minValue);
}
statistics.add(DeviceChartDataDTO.DeviceStatistics.builder()
.deviceCode(device.name())
.deviceName(device.getDesc())
.avg(avg)
.max(max)
.min(min)
.current(currentValue > 0 ? String.format("%.2f", currentValue) : "")
.build());
}
return statistics;
}
/**
* 获取字段值
*/
private double getFieldValue(Object message, String fieldName) {
try {
java.lang.reflect.Field field = message.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
Object value = field.get(message);
if (value instanceof Number) {
return ((Number) value).doubleValue();
}
} catch (Exception e) {
// ignore
}
return 0.0;
}
/**
* 获取设备今天一天的历史趋势数据10分钟间隔采样
* @param deviceCode 设备代码
* @return 历史趋势数据
*/
public DeviceHistoryTrendDTO getDeviceHistoryTrend(String deviceCode) {
try {
// 获取今天开始和结束时间
LocalDate today = LocalDate.now();
LocalDateTime startTime = today.atStartOfDay();
LocalDateTime endTime = today.atTime(LocalTime.MAX);
// 查询今天的所有快照数据
LambdaQueryWrapper<DeviceSnapshot> qw = new LambdaQueryWrapper<DeviceSnapshot>()
.eq(DeviceSnapshot::getDeviceCode, deviceCode)
.between(DeviceSnapshot::getCreateTime, startTime, endTime)
.orderByAsc(DeviceSnapshot::getCreateTime);
List<DeviceSnapshot> rows = deviceSnapshotMapper.selectList(qw);
if (rows == null || rows.isEmpty()) {
return null;
}
// 按10分钟间隔采样数据
List<DeviceSnapshot> sampledRows = sampleDataByInterval(rows, 10);
// 获取设备信息
DeviceEnum device = DeviceEnum.valueOf(deviceCode);
if (device == null) {
return null;
}
// 构建历史趋势数据
List<String> categories = new ArrayList<>();
Map<String, List<Double>> seriesMap = new HashMap<>();
List<String> fields = device.getParamFields();
if (fields == null || fields.isEmpty()) {
return null;
}
// 初始化系列数据
for (String field : fields) {
seriesMap.put(field, new ArrayList<>());
}
// 处理采样后的数据
DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm");
for (DeviceSnapshot snapshot : sampledRows) {
categories.add(snapshot.getCreateTime().format(timeFormatter));
Map<String, Object> jsonData = new HashMap<>();
try {
jsonData = objectMapper.readValue(snapshot.getSnapshotData(), Map.class);
} catch (Exception e) {
log.warn("解析快照数据失败: {}", snapshot.getSnapshotData());
}
for (String field : fields) {
Object value = jsonData.get(field);
double numValue = 0.0;
if (value instanceof Number) {
numValue = ((Number) value).doubleValue();
}
seriesMap.get(field).add(numValue);
}
}
// 构建系列数据最多6个字段
List<DeviceHistoryTrendDTO.SeriesData> series = fields.stream()
.limit(6)
.map(field -> DeviceHistoryTrendDTO.SeriesData.builder()
.fieldName(field)
.name(field) // 前端可以根据字段元数据替换为标签
.data(seriesMap.get(field))
.build())
.collect(Collectors.toList());
return DeviceHistoryTrendDTO.builder()
.deviceCode(deviceCode)
.deviceName(device.getDesc())
.categories(categories)
.series(series)
.build();
} catch (Exception e) {
log.error("获取设备[{}]历史趋势数据失败: {}", deviceCode, e.getMessage(), e);
return null;
}
}
/**
* 按时间间隔采样数据每N分钟取一条
* @param rows 原始数据列表
* @param intervalMinutes 间隔分钟数
* @return 采样后的数据列表
*/
private List<DeviceSnapshot> sampleDataByInterval(List<DeviceSnapshot> rows, int intervalMinutes) {
if (rows == null || rows.isEmpty()) {
return new ArrayList<>();
}
List<DeviceSnapshot> sampled = new ArrayList<>();
LocalDateTime lastTime = null;
for (DeviceSnapshot row : rows) {
LocalDateTime rowTime = row.getCreateTime();
if (lastTime == null) {
// 第一条数据
sampled.add(row);
lastTime = rowTime;
} else {
// 计算时间差(分钟)
long diffMinutes = java.time.Duration.between(lastTime, rowTime).toMinutes();
// 如果时间差大于等于间隔,则采样这条数据
if (diffMinutes >= intervalMinutes) {
sampled.add(row);
lastTime = rowTime;
}
}
}
return sampled;
}
/**
* 创建设备快照
*/
private DeviceSnapshot createDeviceSnapshot(DeviceEnum device, LocalDateTime timestamp,
AppMeasureEntryMessage entry,
AppMeasureFurnaceMessage furnace,
AppMeasureCoatMessage coat,
AppMeasureExitMessage exit) {
try {
// 创建快照对象
DeviceSnapshot snapshot = new DeviceSnapshot();
snapshot.setCreateTime(timestamp);
snapshot.setDeviceId(device.getIdx());
snapshot.setDeviceCode(device.name());
snapshot.setDeviceName(device.getDesc());
snapshot.setSectionType(device.getSectionType().name());
snapshot.setSourceType(device.getSourceType().name());
snapshot.setBasePosition(device.getBasePosition());
// 根据设备类型获取对应的测量数据
ObjectNode dataNode = objectMapper.createObjectNode();
switch (device.getSourceType()) {
case ENTRY:
if (entry != null) {
extractData(entry, device.getParamFields(), dataNode);
}
break;
case FURNACE:
if (furnace != null) {
extractData(furnace, device.getParamFields(), dataNode);
}
break;
case COAT:
if (coat != null) {
extractData(coat, device.getParamFields(), dataNode);
}
break;
case EXIT:
if (exit != null) {
extractData(exit, device.getParamFields(), dataNode);
}
break;
}
// 如果没有数据,则跳过
if (dataNode.isEmpty()) {
return null;
}
// 设置快照数据
snapshot.setSnapshotData(dataNode.toString());
return snapshot;
} catch (Exception e) {
log.error("创建设备[{}]快照时发生错误: {}", device.name(), e.getMessage(), e);
return null;
}
}
/**
* 从消息对象中提取指定字段的数据
*/
private void extractData(Object message, List<String> fields, ObjectNode dataNode) {
if (message == null || fields == null || fields.isEmpty()) {
return;
}
try {
for (String field : fields) {
try {
// 使用反射获取字段值
java.lang.reflect.Field declaredField = message.getClass().getDeclaredField(field);
declaredField.setAccessible(true);
Object value = declaredField.get(message);
// 处理不同类型的值
if (value != null) {
if (value instanceof Number) {
dataNode.put(field, (Number) value);
} else if (value instanceof Boolean) {
dataNode.put(field, (Boolean) value);
} else if (value instanceof String) {
dataNode.put(field, (String) value);
} else if (value instanceof Enum) {
dataNode.put(field, value.toString());
}
}
} catch (NoSuchFieldException e) {
log.trace("字段[{}]在消息[{}]中不存在", field, message.getClass().getSimpleName());
} catch (Exception e) {
log.warn("获取字段[{}]值失败: {}", field, e.getMessage());
}
}
} catch (Exception e) {
log.error("提取数据时发生错误: {}", e.getMessage(), e);
}
}
}

View File

@@ -0,0 +1,25 @@
package com.fizz.business.service;
import com.fizz.business.form.OpcBatchWriteDataForm;
import com.fizz.business.form.OpcWriteDataForm;
/**
* OPC 数据读写服务接口
*/
public interface OpcDataService {
/**
* 向单个点位写入数据(通过字段名)
* @param form 写数据请求表单
* @return 是否写入成功
*/
boolean writeData(OpcWriteDataForm form);
/**
* 批量向多个点位写入数据(通过字段名)
* @param form 批量写数据请求表单
* @return 是否全部写入成功
*/
boolean batchWriteData(OpcBatchWriteDataForm form);
}

View File

@@ -0,0 +1,45 @@
package com.fizz.business.service.client;
import lombok.Data;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* 每个 WS session 的订阅状态(懒加载用)
*/
@Data
public class DeviceFieldTrendSessionState {
/**
* 订阅的 deviceCode 集合;为空代表未订阅任何(不推送)
*/
private Set<String> deviceCodes = new HashSet<>();
/**
* 订阅的 fieldName 集合;为空代表未订阅任何(不推送)
*/
private Set<String> fieldNames = new HashSet<>();
/**
* 是否启用懒加载过滤
*/
private boolean lazy = true;
public boolean match(String deviceCode, String fieldName) {
if (!lazy) return true;
if (deviceCodes != null && !deviceCodes.isEmpty() && !deviceCodes.contains(deviceCode)) return false;
if (fieldNames != null && !fieldNames.isEmpty() && !fieldNames.contains(fieldName)) return false;
return true;
}
public static DeviceFieldTrendSessionState emptyLazyOn() {
DeviceFieldTrendSessionState s = new DeviceFieldTrendSessionState();
s.setLazy(true);
s.setDeviceCodes(Collections.emptySet());
s.setFieldNames(Collections.emptySet());
return s;
}
}

View File

@@ -1,5 +1,8 @@
package com.fizz.business.service.client;
import cn.hutool.json.JSONUtil;
import com.fizz.business.constants.enums.WsTypeEnum;
import com.fizz.business.dto.DeviceFieldTrendSubscribeReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
@@ -7,8 +10,7 @@ import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -19,8 +21,22 @@ import java.util.concurrent.ConcurrentHashMap;
@Component
public class TrackWsHandler extends AbstractWebSocketHandler {
public static DeviceFieldTrendSessionState getFieldTrendState(String sessionId) {
return fieldTrendStates.get(sessionId);
}
public static Map<String, DeviceFieldTrendSessionState> getAllFieldTrendStates() {
return fieldTrendStates;
}
private static final ConcurrentHashMap<String, Map<String, WebSocketSession>> clients = new ConcurrentHashMap<>();
/**
* 懒加载订阅状态:只用于 device_field_trend
* sessionId -> state
*/
private static final ConcurrentHashMap<String, DeviceFieldTrendSessionState> fieldTrendStates = new ConcurrentHashMap<>();
/**
* socket 建立成功事件
*
@@ -38,6 +54,12 @@ public class TrackWsHandler extends AbstractWebSocketHandler {
sessionMap.put(sid, session);
// 添加 session
clients.put(type, sessionMap);
// device_field_trend 默认启用懒加载:不主动推送,等前端发送订阅消息
if (WsTypeEnum.device_field_trend.name().equals(type)) {
fieldTrendStates.put(sid, DeviceFieldTrendSessionState.emptyLazyOn());
}
log.info("[websocket]建立连接:{}", type + sid);
} catch (Exception e) {
log.error("[websocket]建立连接错误:{}", e.getMessage(), e);
@@ -58,6 +80,35 @@ public class TrackWsHandler extends AbstractWebSocketHandler {
// 心跳
if (payload.equals("ping")) {
session.sendMessage(new TextMessage("pong"));
return;
}
// device_field_trend 订阅(懒加载)
String type = (String) session.getAttributes().get("type");
if (WsTypeEnum.device_field_trend.name().equals(type)) {
try {
DeviceFieldTrendSubscribeReq req = JSONUtil.toBean(payload, DeviceFieldTrendSubscribeReq.class);
DeviceFieldTrendSessionState state = fieldTrendStates.get(session.getId());
if (state == null) {
state = DeviceFieldTrendSessionState.emptyLazyOn();
fieldTrendStates.put(session.getId(), state);
}
if (req.getLazy() != null) {
state.setLazy(req.getLazy());
}
if (req.getDeviceCodes() != null) {
state.setDeviceCodes(new HashSet<>(req.getDeviceCodes()));
}
if (req.getFieldNames() != null) {
state.setFieldNames(new HashSet<>(req.getFieldNames()));
}
log.info("[websocket]device_field_trend订阅更新 sid={}, lazy={}, deviceCodes.size={}, fieldNames.size={}",
session.getId(), state.isLazy(),
state.getDeviceCodes() == null ? 0 : state.getDeviceCodes().size(),
state.getFieldNames() == null ? 0 : state.getFieldNames().size());
} catch (Exception e) {
log.error("[websocket]解析device_field_trend订阅失败 sid={}, payload={}", session.getId(), payload, e);
}
}
}
@@ -92,6 +143,10 @@ public class TrackWsHandler extends AbstractWebSocketHandler {
if (webSocketSession != null) {
webSocketSession.close();
}
// 如果是 field_trend 连接,清除订阅状态
if (WsTypeEnum.device_field_trend.name().equals(type)) {
fieldTrendStates.remove(sid);
}
}
log.info("[websocket]连接关闭:{}", type + "-" + sid);
} catch (Exception e) {

View File

@@ -4,6 +4,7 @@ import com.fizz.business.anno.OpcMessageHandlerType;
import com.fizz.business.constants.enums.OpcMessageType;
import com.fizz.business.domain.msg.*;
import com.fizz.business.dto.MatmapDTO;
import com.fizz.business.service.DeviceSnapshotService;
import com.fizz.business.service.LogDataService;
import com.fizz.business.service.OpcMessageHandler;
import com.fizz.business.service.strip.SegmentTrackerService;
@@ -27,6 +28,7 @@ public class AppMeasureHandler implements OpcMessageHandler<AppMeasureMessage> {
private final SegmentTrackerService tracker;
private final LogDataService logDataService;
private final Executor coilTrackExecutor; // 注入线程池接口
private final DeviceSnapshotService deviceSnapshotService;
private static final AtomicInteger LOG_COUNTER = new AtomicInteger(0);
@@ -39,6 +41,12 @@ public class AppMeasureHandler implements OpcMessageHandler<AppMeasureMessage> {
AppMeasureCoatMessage coat = message.getAppMeasureCoatMessage();
AppMeasureExitMessage exit = message.getAppMeasureExitMessage();
// 缓存最新测量数据供10分钟快照任务使用
if (entry != null) deviceSnapshotService.updateLatestMeasurement(entry);
if (furnace != null) deviceSnapshotService.updateLatestMeasurement(furnace);
if (coat != null) deviceSnapshotService.updateLatestMeasurement(coat);
if (exit != null) deviceSnapshotService.updateLatestMeasurement(exit);
WebSocketUtil.sendMeasureMsg(message);

View File

@@ -0,0 +1,52 @@
package com.fizz.business.service.impl;
import com.fizz.business.comm.OPC.OpcMessageSend;
import com.fizz.business.form.OpcBatchWriteDataForm;
import com.fizz.business.form.OpcWriteDataForm;
import com.fizz.business.service.OpcDataService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.stream.Collectors;
/**
* OPC 数据读写服务实现类
*/
@Slf4j
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "kangaroohy.milo", name = "enabled", havingValue = "true")
public class OpcDataServiceImpl implements OpcDataService {
private final OpcMessageSend opcMessageSend;
@Override
public boolean writeData(OpcWriteDataForm form) {
try {
return opcMessageSend.writeDataByFieldName(form.getFieldName(), form.getValue());
} catch (Exception e) {
log.error("写入 OPC 数据异常fieldName={}, value={}", form.getFieldName(), form.getValue(), e);
return false;
}
}
@Override
public boolean batchWriteData(OpcBatchWriteDataForm form) {
try {
Map<String, Object> fieldDataMap = form.getDataList().stream()
.collect(Collectors.toMap(
OpcWriteDataForm::getFieldName,
OpcWriteDataForm::getValue,
(v1, v2) -> v2 // 如果有重复的 key保留后面的值
));
return opcMessageSend.batchWriteDataByFieldName(fieldDataMap);
} catch (Exception e) {
log.error("批量写入 OPC 数据异常", e);
return false;
}
}
}

View File

@@ -14,16 +14,6 @@ import java.util.Map;
@Component
public class OpcMessageIdsManager {
/**
* 生成业务/消息唯一ID用于发送批次 bizKey 等)
* 规则PREFIX_时间戳_8位随机
*/
public String generateMessageId(String prefix) {
String p = (prefix == null || prefix.trim().isEmpty()) ? "MSG" : prefix.trim();
String random = java.util.UUID.randomUUID().toString().replace("-", "").substring(0, 8);
return (p + "_" + System.currentTimeMillis() + "_" + random).toUpperCase();
}
public static List<String> msgTriggers = Lists.newArrayList();
public static Map<String,String> lineMeasureIds = Maps.newHashMap();
@@ -404,4 +394,43 @@ public class OpcMessageIdsManager {
msgTriggers.add(DEVICE_NAME+exitMoveIds.get("trigger"));
msgTriggers.add(DEVICE_NAME+exitMeasureIds.get("trigger"));
}
/**
* 通过字段名查找对应的 OPC 节点路径
* @param fieldName 字段名例如coilId
* @return OPC 节点路径,如果未找到返回 null
*/
public static String findIdentifierByFieldName(String fieldName) {
if (fieldName == null || fieldName.isEmpty()) {
return null;
}
// 在所有映射中查找按优先级顺序查找pdiSetupIds 优先,因为这是写数据的主要映射)
Map<String, String>[] allMaps = new Map[]{
pdiSetupIds,
entryLineMeasureIds,
procLineMeasureIds,
furLineMeasureIds,
exitLineMeasureIds,
entryMoveIds,
exitCutIds,
exitMoveIds,
exitMeasureIds
};
for (Map<String, String> map : allMaps) {
for (Map.Entry<String, String> entry : map.entrySet()) {
// 跳过 trigger 键
if ("trigger".equals(entry.getKey())) {
continue;
}
// 如果字段名匹配,返回对应的节点路径
if (fieldName.equals(entry.getValue())) {
return entry.getKey();
}
}
}
return null;
}
}

View File

@@ -9,6 +9,9 @@ import com.fizz.business.constants.enums.WsTypeEnum;
import com.fizz.business.domain.msg.AppMeasureEntryMessage;
import com.fizz.business.domain.msg.AppMeasureMessage;
import com.fizz.business.dto.CoilPositionDTO;
import com.fizz.business.dto.DeviceChartDataDTO;
import com.fizz.business.dto.DeviceHistoryTrendDTO;
import com.fizz.business.dto.DeviceFieldTrendDTO;
import com.fizz.business.dto.L1CoilLineMeasureDTO;
import com.fizz.business.dto.MatmapDTO;
import com.fizz.business.dto.WsSignalMsgDTO;
@@ -93,4 +96,52 @@ public class WebSocketUtil {
if (CollUtil.isEmpty(list)) return;
sendMessage(WsTypeEnum.track_matmap, JSONUtil.toJsonStr(list));
}
/**
* 推送设备历史趋势数据
* @param trendData 历史趋势数据
*/
public static void sendHistoryTrendMessage(DeviceHistoryTrendDTO trendData) {
if (trendData == null) return;
sendMessage(WsTypeEnum.device_history_trend, JSONUtil.toJsonStr(trendData));
}
/**
* 推送图表数据(饼图、条形图、折线图、统计数据)
* @param chartData 图表数据
*/
public static void sendChartDataMessage(DeviceChartDataDTO chartData) {
if (chartData == null) return;
sendMessage(WsTypeEnum.device_chart_data, JSONUtil.toJsonStr(chartData));
}
/**
* 推送设备-字段趋势数据
*/
public static void sendDeviceFieldTrendMessage(DeviceFieldTrendDTO dto) {
if (dto == null) return;
// 懒加载:仅推送给订阅了该 deviceCode/fieldName 的客户端
Map<String, WebSocketSession> clients = trackWsHandler.getClients().getOrDefault(WsTypeEnum.device_field_trend.name(), Maps.newConcurrentMap());
if (clients.isEmpty()) return;
String payload = JSONUtil.toJsonStr(dto);
TextMessage message = new TextMessage(payload);
clients.values().forEach(s -> {
try {
var state = TrackWsHandler.getFieldTrendState(s.getId());
// 未建立订阅状态时默认不推送(避免全量)
if (state == null) {
return;
}
if (!state.match(dto.getDeviceCode(), dto.getFieldName())) {
return;
}
s.sendMessage(message);
} catch (IOException e) {
log.error("[websocket]向客户端[{}]推送device_field_trend异常", s.getId(), e);
}
});
}
}