Files
xgy-oa/klp-wms/src/main/java/com/klp/service/impl/WmsBatchServiceImpl.java
Joshi add788c148 feat(wms-batch):增加死锁检测和批次分配功能
- 在 IWmsBatchService 接口中添加了 checkDeadlock 和 generateNonDeadlockBatches 方法
- 在 WmsBatchController 中添加了对应的控制器方法
- 在 WmsBatchServiceImpl 中实现了死锁检测和批次分配的逻辑
- 新增了构建依赖图、检测环、拓扑排序等辅助方法
2025-08-14 16:35:46 +08:00

364 lines
12 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.klp.service.impl;
import cn.hutool.core.bean.BeanUtil;
import com.klp.common.core.page.TableDataInfo;
import com.klp.common.core.domain.PageQuery;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.klp.common.utils.StringUtils;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import com.klp.domain.bo.WmsBatchBo;
import com.klp.domain.vo.WmsBatchVo;
import com.klp.domain.WmsBatch;
import com.klp.mapper.WmsBatchMapper;
import com.klp.service.IWmsBatchService;
import java.util.*;
import java.util.stream.Collectors;
/**
* 批次合并相同工艺的任务Service业务层处理
*
* @author klp
* @date 2025-08-14
*/
@RequiredArgsConstructor
@Service
public class WmsBatchServiceImpl implements IWmsBatchService {
private final WmsBatchMapper baseMapper;
/**
* 查询批次(合并相同工艺的任务)
*/
@Override
public WmsBatchVo queryById(Long batchId){
return baseMapper.selectVoById(batchId);
}
/**
* 查询批次(合并相同工艺的任务)列表
*/
@Override
public TableDataInfo<WmsBatchVo> queryPageList(WmsBatchBo bo, PageQuery pageQuery) {
LambdaQueryWrapper<WmsBatch> lqw = buildQueryWrapper(bo);
Page<WmsBatchVo> result = baseMapper.selectVoPage(pageQuery.build(), lqw);
return TableDataInfo.build(result);
}
/**
* 查询批次(合并相同工艺的任务)列表
*/
@Override
public List<WmsBatchVo> queryList(WmsBatchBo bo) {
LambdaQueryWrapper<WmsBatch> lqw = buildQueryWrapper(bo);
return baseMapper.selectVoList(lqw);
}
private LambdaQueryWrapper<WmsBatch> buildQueryWrapper(WmsBatchBo bo) {
Map<String, Object> params = bo.getParams();
LambdaQueryWrapper<WmsBatch> lqw = Wrappers.lambdaQuery();
lqw.eq(StringUtils.isNotBlank(bo.getBatchNo()), WmsBatch::getBatchNo, bo.getBatchNo());
lqw.eq(bo.getProcessId() != null, WmsBatch::getProcessId, bo.getProcessId());
lqw.eq(bo.getTotalQuantity() != null, WmsBatch::getTotalQuantity, bo.getTotalQuantity());
lqw.eq(StringUtils.isNotBlank(bo.getMergeSource()), WmsBatch::getMergeSource, bo.getMergeSource());
lqw.eq(bo.getEstimatedStartTime() != null, WmsBatch::getEstimatedStartTime, bo.getEstimatedStartTime());
lqw.eq(bo.getEstimatedEndTime() != null, WmsBatch::getEstimatedEndTime, bo.getEstimatedEndTime());
lqw.eq(StringUtils.isNotBlank(bo.getBatchStatus()), WmsBatch::getBatchStatus, bo.getBatchStatus());
return lqw;
}
/**
* 新增批次(合并相同工艺的任务)
*/
@Override
public Boolean insertByBo(WmsBatchBo bo) {
WmsBatch add = BeanUtil.toBean(bo, WmsBatch.class);
validEntityBeforeSave(add);
boolean flag = baseMapper.insert(add) > 0;
if (flag) {
bo.setBatchId(add.getBatchId());
}
return flag;
}
/**
* 修改批次(合并相同工艺的任务)
*/
@Override
public Boolean updateByBo(WmsBatchBo bo) {
WmsBatch update = BeanUtil.toBean(bo, WmsBatch.class);
validEntityBeforeSave(update);
return baseMapper.updateById(update) > 0;
}
/**
* 保存前的数据校验
*/
private void validEntityBeforeSave(WmsBatch entity){
//TODO 做一些数据校验,如唯一约束
}
/**
* 批量删除批次(合并相同工艺的任务)
*/
@Override
public Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid) {
if(isValid){
//TODO 做一些业务上的校验,判断是否需要校验
}
return baseMapper.deleteBatchIds(ids) > 0;
}
/**
* 检测任务执行是否会产生死锁
*
* @param rows 任务执行顺序数组
* @return 是否存在死锁
*/
@Override
public boolean checkDeadlock(List<List<Map<String, Object>>> rows) {
// 构建进程依赖图
Map<String, Set<String>> graph = buildDependencyGraph(rows);
// 检测是否存在环(死锁)
return hasCycle(graph);
}
/**
* 生成不会产生死锁的批次分配方案
*
* @param rows 任务执行顺序数组
* @return 批次分配方案
*/
@Override
public List<String> generateNonDeadlockBatches(List<List<Map<String, Object>>> rows) {
// 获取所有任务
Set<String> allTasks = new HashSet<>();
for (List<Map<String, Object>> row : rows) {
for (Map<String, Object> task : row) {
allTasks.add(task.get("taskId").toString());
}
}
// 构建进程依赖图
Map<String, Set<String>> processGraph = buildProcessDependencyGraph(rows);
// 构建任务依赖图
Map<String, Set<String>> taskGraph = buildTaskDependencyGraph(rows);
// 使用拓扑排序找出可行的批次分配方案
List<Set<String>> batches = topologicalSort(processGraph, taskGraph, allTasks);
// 将批次转换为字符串格式
return batches.stream()
.map(batch -> String.join(",", batch))
.collect(Collectors.toList());
}
/**
* 构建依赖图
* 如果进程A的任务必须在进程B的任务之前执行则B依赖于A
*/
private Map<String, Set<String>> buildDependencyGraph(List<List<Map<String, Object>>> rows) {
Map<String, Set<String>> graph = new HashMap<>();
Map<String, Integer> processSequence = new HashMap<>();
// 遍历每一行,记录每个进程在每一行的执行顺序
for (List<Map<String, Object>> row : rows) {
// 按sequare排序
row.sort(Comparator.comparingInt(task -> Integer.parseInt(task.get("sequare").toString())));
// 记录每个进程的执行顺序
for (int i = 0; i < row.size(); i++) {
Map<String, Object> task = row.get(i);
String processId = task.get("processId").toString();
// 如果当前进程已经有更早的执行顺序,则保留更早的顺序
processSequence.putIfAbsent(processId, i);
processSequence.put(processId, Math.min(processSequence.get(processId), i));
}
// 构建依赖关系
for (int i = 0; i < row.size() - 1; i++) {
String currentProcess = row.get(i).get("processId").toString();
String nextProcess = row.get(i + 1).get("processId").toString();
// 添加依赖nextProcess依赖于currentProcess
graph.putIfAbsent(nextProcess, new HashSet<>());
graph.get(nextProcess).add(currentProcess);
}
}
return graph;
}
/**
* 构建进程依赖图
*/
private Map<String, Set<String>> buildProcessDependencyGraph(List<List<Map<String, Object>>> rows) {
Map<String, Set<String>> graph = new HashMap<>();
// 遍历每一行,记录进程间的依赖关系
for (List<Map<String, Object>> row : rows) {
// 按sequare排序
row.sort(Comparator.comparingInt(task -> Integer.parseInt(task.get("sequare").toString())));
// 构建依赖关系
for (int i = 0; i < row.size() - 1; i++) {
String currentProcess = row.get(i).get("processId").toString();
String nextProcess = row.get(i + 1).get("processId").toString();
// 添加依赖nextProcess依赖于currentProcess
graph.putIfAbsent(nextProcess, new HashSet<>());
graph.get(nextProcess).add(currentProcess);
}
}
return graph;
}
/**
* 构建任务依赖图
*/
private Map<String, Set<String>> buildTaskDependencyGraph(List<List<Map<String, Object>>> rows) {
Map<String, Set<String>> graph = new HashMap<>();
// 遍历每一行,记录任务间的依赖关系
for (List<Map<String, Object>> row : rows) {
// 按sequare排序
row.sort(Comparator.comparingInt(task -> Integer.parseInt(task.get("sequare").toString())));
// 构建依赖关系
for (int i = 0; i < row.size() - 1; i++) {
String currentTask = row.get(i).get("taskId").toString();
String nextTask = row.get(i + 1).get("taskId").toString();
// 添加依赖nextTask依赖于currentTask
graph.putIfAbsent(nextTask, new HashSet<>());
graph.get(nextTask).add(currentTask);
}
}
return graph;
}
/**
* 检测图中是否存在环(死锁)
*/
private boolean hasCycle(Map<String, Set<String>> graph) {
// 所有节点的状态0=未访问1=访问中2=已访问
Map<String, Integer> visited = new HashMap<>();
// 初始化所有节点为未访问
for (String node : graph.keySet()) {
visited.put(node, 0);
}
// 对每个未访问的节点进行DFS
for (String node : graph.keySet()) {
if (visited.get(node) == 0) {
if (hasCycleDFS(node, graph, visited)) {
return true;
}
}
}
return false;
}
/**
* DFS检测环
*/
private boolean hasCycleDFS(String node, Map<String, Set<String>> graph, Map<String, Integer> visited) {
// 标记当前节点为访问中
visited.put(node, 1);
// 访问所有邻居
if (graph.containsKey(node)) {
for (String neighbor : graph.get(node)) {
// 如果邻居正在被访问,说明存在环
if (visited.getOrDefault(neighbor, 0) == 1) {
return true;
}
// 如果邻居未被访问继续DFS
if (visited.getOrDefault(neighbor, 0) == 0) {
if (hasCycleDFS(neighbor, graph, visited)) {
return true;
}
}
}
}
// 标记当前节点为已访问
visited.put(node, 2);
return false;
}
/**
* 使用拓扑排序生成批次分配方案
*/
private List<Set<String>> topologicalSort(Map<String, Set<String>> processGraph, Map<String, Set<String>> taskGraph, Set<String> allTasks) {
// 计算每个任务的入度
Map<String, Integer> inDegree = new HashMap<>();
for (String task : allTasks) {
inDegree.put(task, 0);
}
// 更新入度
for (Map.Entry<String, Set<String>> entry : taskGraph.entrySet()) {
String task = entry.getKey();
inDegree.put(task, entry.getValue().size());
}
// 创建队列将入度为0的任务加入队列
Queue<String> queue = new LinkedList<>();
for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
if (entry.getValue() == 0) {
queue.add(entry.getKey());
}
}
// 存储批次分配方案
List<Set<String>> batches = new ArrayList<>();
// 进行拓扑排序
while (!queue.isEmpty()) {
// 当前批次的任务
Set<String> currentBatch = new HashSet<>();
// 处理当前队列中的所有任务(这些任务可以并行执行)
int size = queue.size();
for (int i = 0; i < size; i++) {
String task = queue.poll();
currentBatch.add(task);
// 更新依赖于当前任务的任务的入度
for (Map.Entry<String, Set<String>> entry : taskGraph.entrySet()) {
String dependentTask = entry.getKey();
Set<String> dependencies = entry.getValue();
if (dependencies.contains(task)) {
inDegree.put(dependentTask, inDegree.get(dependentTask) - 1);
// 如果入度为0加入队列
if (inDegree.get(dependentTask) == 0) {
queue.add(dependentTask);
}
}
}
}
// 添加当前批次
if (!currentBatch.isEmpty()) {
batches.add(currentBatch);
}
}
return batches;
}
}