Files
klp-oa/klp-wms/src/main/java/com/klp/service/impl/WmsBatchServiceImpl.java
Joshi 59bd751cd7 feat(wms): 优化批次分配策略以解决死锁和进程冲突问题
- 新增 checkDeadlock 方法检测任务执行是否会产生死锁
- 改进 generateNonDeadlockBatches 方法,增加对死锁和进程冲突的处理
- 实现增强版 DFS 检测环,并收集环中的节点- 添加进程依赖图构建和冲突进程对查找功能
- 优化批次分组算法,确保所有任务都被合理分配
2025-08-15 11:25:48 +08:00

679 lines
26 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.klp.service.impl;
import cn.hutool.core.bean.BeanUtil;
import com.klp.common.core.page.TableDataInfo;
import com.klp.common.core.domain.PageQuery;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.klp.common.utils.StringUtils;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import com.klp.domain.bo.WmsBatchBo;
import com.klp.domain.vo.BatchGroupVo;
import com.klp.domain.vo.WmsBatchVo;
import com.klp.domain.WmsBatch;
import com.klp.mapper.WmsBatchMapper;
import com.klp.service.IWmsBatchService;
import java.util.*;
import java.util.stream.Collectors;
/**
* 批次合并相同工艺的任务Service业务层处理
*
* @author klp
* @date 2025-08-14
*/
@RequiredArgsConstructor
@Service
public class WmsBatchServiceImpl implements IWmsBatchService {
private final WmsBatchMapper baseMapper;
// 存储任务执行顺序数组的成员变量
private List<List<Map<String, Object>>> rows;
/**
* 查询批次(合并相同工艺的任务)
*/
@Override
public WmsBatchVo queryById(Long batchId){
return baseMapper.selectVoById(batchId);
}
/**
* 查询批次(合并相同工艺的任务)列表
*/
@Override
public TableDataInfo<WmsBatchVo> queryPageList(WmsBatchBo bo, PageQuery pageQuery) {
LambdaQueryWrapper<WmsBatch> lqw = buildQueryWrapper(bo);
Page<WmsBatchVo> result = baseMapper.selectVoPage(pageQuery.build(), lqw);
return TableDataInfo.build(result);
}
/**
* 查询批次(合并相同工艺的任务)列表
*/
@Override
public List<WmsBatchVo> queryList(WmsBatchBo bo) {
LambdaQueryWrapper<WmsBatch> lqw = buildQueryWrapper(bo);
return baseMapper.selectVoList(lqw);
}
private LambdaQueryWrapper<WmsBatch> buildQueryWrapper(WmsBatchBo bo) {
Map<String, Object> params = bo.getParams();
LambdaQueryWrapper<WmsBatch> lqw = Wrappers.lambdaQuery();
lqw.eq(StringUtils.isNotBlank(bo.getBatchNo()), WmsBatch::getBatchNo, bo.getBatchNo());
lqw.eq(bo.getProcessId() != null, WmsBatch::getProcessId, bo.getProcessId());
lqw.eq(bo.getTotalQuantity() != null, WmsBatch::getTotalQuantity, bo.getTotalQuantity());
lqw.eq(StringUtils.isNotBlank(bo.getMergeSource()), WmsBatch::getMergeSource, bo.getMergeSource());
lqw.eq(bo.getEstimatedStartTime() != null, WmsBatch::getEstimatedStartTime, bo.getEstimatedStartTime());
lqw.eq(bo.getEstimatedEndTime() != null, WmsBatch::getEstimatedEndTime, bo.getEstimatedEndTime());
lqw.eq(StringUtils.isNotBlank(bo.getBatchStatus()), WmsBatch::getBatchStatus, bo.getBatchStatus());
lqw.eq(bo.getPlanId() != null, WmsBatch::getPlanId, bo.getPlanId());
return lqw;
}
/**
* 新增批次(合并相同工艺的任务)
*/
@Override
public Boolean insertByBo(WmsBatchBo bo) {
WmsBatch add = BeanUtil.toBean(bo, WmsBatch.class);
validEntityBeforeSave(add);
boolean flag = baseMapper.insert(add) > 0;
if (flag) {
bo.setBatchId(add.getBatchId());
}
return flag;
}
/**
* 修改批次(合并相同工艺的任务)
*/
@Override
public Boolean updateByBo(WmsBatchBo bo) {
WmsBatch update = BeanUtil.toBean(bo, WmsBatch.class);
validEntityBeforeSave(update);
return baseMapper.updateById(update) > 0;
}
/**
* 保存前的数据校验
*/
private void validEntityBeforeSave(WmsBatch entity){
//TODO 做一些数据校验,如唯一约束
}
/**
* 批量删除批次(合并相同工艺的任务)
*/
@Override
public Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid) {
if(isValid){
//TODO 做一些业务上的校验,判断是否需要校验
}
return baseMapper.deleteBatchIds(ids) > 0;
}
/**
* 构建任务依赖图
*/
private Map<String, Set<String>> buildTaskDependencyGraph(List<List<Map<String, Object>>> rows) {
Map<String, Set<String>> graph = new HashMap<>();
// 遍历每一行,记录任务间的依赖关系
for (List<Map<String, Object>> row : rows) {
// 按sequence排序
row.sort(Comparator.comparingInt(task -> Integer.parseInt(task.get("sequence").toString())));
// 构建依赖关系
for (int i = 0; i < row.size() - 1; i++) {
String currentTask = row.get(i).get("taskId").toString();
String nextTask = row.get(i + 1).get("taskId").toString();
// 添加依赖nextTask依赖于currentTask
graph.computeIfAbsent(nextTask, k -> new HashSet<>()).add(currentTask);
}
}
return graph;
}
/**
* 检测任务执行是否会产生死锁
*
* @param rows 任务执行顺序数组
* @return 是否存在死锁
*/
@Override
public boolean checkDeadlock(List<List<Map<String, Object>>> rows) {
// 保存任务执行顺序数组
this.rows = rows;
// 构建任务依赖图
Map<String, Set<String>> taskGraph = buildTaskDependencyGraph(rows);
// 获取所有任务
Set<String> allTasks = new HashSet<>();
for (List<Map<String, Object>> row : rows) {
for (Map<String, Object> task : row) {
allTasks.add(task.get("taskId").toString());
}
}
// 使用DFS检测是否存在环死锁
Map<String, Integer> visited = new HashMap<>();
Set<String> currentPath = new HashSet<>();
List<List<String>> cycles = new ArrayList<>();
for (String task : allTasks) {
if (visited.getOrDefault(task, 0) == 0) {
List<String> path = new ArrayList<>();
if (detectCycleDFS(task, taskGraph, visited, currentPath, path, cycles)) {
return true; // 存在死锁
}
}
}
// 检查进程冲突
Map<String, String> taskToProcess = new HashMap<>();
for (List<Map<String, Object>> row : rows) {
for (Map<String, Object> task : row) {
String taskId = task.get("taskId").toString();
String processId = task.get("processId").toString();
taskToProcess.put(taskId, processId);
}
}
Map<String, Set<String>> processGraph = buildProcessDependencyGraph(rows);
Set<Set<String>> conflictProcessPairs = findConflictProcessPairs(processGraph);
return !conflictProcessPairs.isEmpty(); // 如果存在进程冲突,也视为死锁
}
/**
* 增强版DFS检测环并收集环中的节点
*/
private boolean detectCycleDFS(String node, Map<String, Set<String>> graph,
Map<String, Integer> visited, Set<String> currentPath,
List<String> path, List<List<String>> cycles) {
if (node == null) {
return false;
}
// 标记当前节点为访问中
visited.put(node, 1);
currentPath.add(node);
path.add(node);
// 访问所有邻居
if (graph != null && graph.containsKey(node)) {
Set<String> neighbors = graph.get(node);
if (neighbors != null) {
for (String neighbor : neighbors) {
if (neighbor == null) {
continue;
}
// 如果邻居在当前路径中,说明存在环
if (currentPath.contains(neighbor)) {
// 找到环的起始位置
int startIdx = path.indexOf(neighbor);
if (startIdx >= 0) {
List<String> cycle = new ArrayList<>(path.subList(startIdx, path.size()));
cycles.add(cycle);
return true;
}
}
// 如果邻居未被访问继续DFS
if (visited.getOrDefault(neighbor, 0) == 0) {
if (detectCycleDFS(neighbor, graph, visited, currentPath, path, cycles)) {
return true;
}
}
}
}
}
// 回溯,移除当前节点
currentPath.remove(node);
path.remove(path.size() - 1);
// 标记当前节点为已访问
visited.put(node, 2);
return false;
}
/**
* 构建进程依赖图
* 如果进程A的任务必须在进程B的任务之前执行则进程B依赖于进程A
*/
private Map<String, Set<String>> buildProcessDependencyGraph(List<List<Map<String, Object>>> rows) {
Map<String, Set<String>> processGraph = new HashMap<>();
Map<String, Set<String>> taskGraph = buildTaskDependencyGraph(rows);
// 获取任务与进程的映射关系
Map<String, String> taskToProcess = new HashMap<>();
for (List<Map<String, Object>> row : rows) {
for (Map<String, Object> task : row) {
String taskId = task.get("taskId").toString();
String processId = task.get("processId").toString();
taskToProcess.put(taskId, processId);
}
}
// 根据任务依赖关系构建进程依赖关系
for (Map.Entry<String, Set<String>> entry : taskGraph.entrySet()) {
String taskId = entry.getKey();
Set<String> dependencies = entry.getValue();
if (dependencies == null) {
continue;
}
String processId = taskToProcess.get(taskId);
if (processId == null) {
continue;
}
for (String depTaskId : dependencies) {
String depProcessId = taskToProcess.get(depTaskId);
if (depProcessId == null) {
continue;
}
// 如果依赖的是不同进程的任务,则建立进程间依赖
if (!processId.equals(depProcessId)) {
processGraph.computeIfAbsent(processId, k -> new HashSet<>()).add(depProcessId);
}
}
}
return processGraph;
}
/**
* 检查两个进程之间是否存在冲突
* 如果进程A的任务必须在进程B的任务之前执行同时进程B的任务也必须在进程A的任务之前执行则存在冲突
*/
private boolean checkProcessConflict(String processA, String processB, Map<String, Set<String>> processGraph) {
if (processA == null || processB == null || processGraph == null) {
return false;
}
boolean aDepB = isProcessDependent(processA, processB, processGraph, new HashSet<>());
boolean bDepA = isProcessDependent(processB, processA, processGraph, new HashSet<>());
return aDepB && bDepA;
}
/**
* 检查进程A是否依赖于进程B
*/
private boolean isProcessDependent(String processA, String processB,
Map<String, Set<String>> processGraph, Set<String> visited) {
if (processA == null || processB == null || processGraph == null || visited == null) {
return false;
}
if (processA.equals(processB)) {
return true;
}
if (visited.contains(processA)) {
return false;
}
visited.add(processA);
if (processGraph.containsKey(processA)) {
Set<String> dependencies = processGraph.get(processA);
if (dependencies != null) {
for (String dep : dependencies) {
if (dep != null && (dep.equals(processB) || isProcessDependent(dep, processB, processGraph, visited))) {
return true;
}
}
}
}
return false;
}
/**
* 找出所有冲突的进程对
*/
private Set<Set<String>> findConflictProcessPairs(Map<String, Set<String>> processGraph) {
Set<Set<String>> conflictPairs = new HashSet<>();
Set<String> processes = new HashSet<>();
if (processGraph == null) {
return conflictPairs;
}
// 收集所有进程
for (String process : processGraph.keySet()) {
if (process != null) {
processes.add(process);
}
}
for (Map.Entry<String, Set<String>> entry : processGraph.entrySet()) {
Set<String> deps = entry.getValue();
if (deps != null) {
for (String dep : deps) {
if (dep != null) {
processes.add(dep);
}
}
}
}
// 检查每对进程是否存在冲突
List<String> processList = new ArrayList<>(processes);
for (int i = 0; i < processList.size(); i++) {
for (int j = i + 1; j < processList.size(); j++) {
String processA = processList.get(i);
String processB = processList.get(j);
if (processA != null && processB != null &&
checkProcessConflict(processA, processB, processGraph)) {
Set<String> pair = new HashSet<>();
pair.add(processA);
pair.add(processB);
conflictPairs.add(pair);
}
}
}
return conflictPairs;
}
/**
* 生成不会产生死锁的批次分配方案
* 相同processId的任务会合并到一个批次组中
* 不同processId的任务会放在不同的批次组中
*
* @param rows 任务执行顺序数组
* @return 批次分配方案
*/
@Override
public List<BatchGroupVo> generateNonDeadlockBatches(List<List<Map<String, Object>>> rows) {
// 保存任务执行顺序数组
this.rows = rows;
// 获取所有任务
Set<String> allTasks = new HashSet<>();
for (List<Map<String, Object>> row : rows) {
for (Map<String, Object> task : row) {
allTasks.add(task.get("taskId").toString());
}
}
// 构建任务依赖图
Map<String, Set<String>> taskGraph = buildTaskDependencyGraph(rows);
// 构建进程依赖图
Map<String, Set<String>> processGraph = buildProcessDependencyGraph(rows);
// 获取任务与进程的映射关系
Map<String, String> taskToProcess = new HashMap<>();
// 获取任务详细信息
Map<String, Map<String, Object>> taskDetails = new HashMap<>();
for (List<Map<String, Object>> row : rows) {
for (Map<String, Object> task : row) {
String taskId = task.get("taskId").toString();
String processId = task.get("processId").toString();
taskToProcess.put(taskId, processId);
taskDetails.put(taskId, task);
}
}
// 检查是否存在死锁
boolean hasDeadlock = checkDeadlock(rows);
// 找出所有冲突的进程对
Set<Set<String>> conflictProcessPairs = findConflictProcessPairs(processGraph);
if (hasDeadlock || !conflictProcessPairs.isEmpty()) {
// 如果存在死锁或进程冲突,使用改进的算法找出可行的批次分配方案
List<List<String>> batchGroups = generateBatchGroupsWithCycleHandling(
taskGraph, allTasks, taskToProcess, conflictProcessPairs);
// 将批次转换为BatchGroupVo格式
List<BatchGroupVo> result = new ArrayList<>();
int groupId = 1;
for (List<String> group : batchGroups) {
if (!group.isEmpty()) {
String processId = taskToProcess.get(group.get(0));
BatchGroupVo batchGroup = new BatchGroupVo();
batchGroup.setGroupId("Group-" + groupId++);
batchGroup.setProcessId(processId);
batchGroup.setTaskIds(group);
result.add(batchGroup);
}
}
return result;
} else {
// 如果不存在死锁按processId合并任务
return mergeTasksByProcessId(allTasks, taskToProcess);
}
}
/**
* 按processId合并任务
*/
private List<BatchGroupVo> mergeTasksByProcessId(Set<String> allTasks, Map<String, String> taskToProcess) {
// 按processId分组任务
Map<String, List<String>> processTasks = new HashMap<>();
for (String taskId : allTasks) {
String processId = taskToProcess.get(taskId);
if (processId != null) {
processTasks.computeIfAbsent(processId, k -> new ArrayList<>()).add(taskId);
}
}
// 创建批次组
List<BatchGroupVo> result = new ArrayList<>();
int groupId = 1;
for (Map.Entry<String, List<String>> entry : processTasks.entrySet()) {
String processId = entry.getKey();
List<String> tasks = entry.getValue();
if (!tasks.isEmpty()) {
BatchGroupVo batchGroup = new BatchGroupVo();
batchGroup.setGroupId("Group-" + groupId++);
batchGroup.setProcessId(processId);
batchGroup.setTaskIds(tasks);
result.add(batchGroup);
}
}
return result;
}
/**
* 生成批次分组,处理环结构
* 相同processId的任务会合并到一个列表中
* 不同processId的任务会放在不同的列表中
* 处理环结构,确保所有任务都被分配
*/
private List<List<String>> generateBatchGroupsWithCycleHandling(
Map<String, Set<String>> taskGraph, Set<String> allTasks,
Map<String, String> taskToProcess, Set<Set<String>> conflictProcessPairs) {
// 计算每个任务的入度
Map<String, Integer> inDegree = new HashMap<>();
for (String task : allTasks) {
inDegree.put(task, 0);
}
// 更新入度
for (Map.Entry<String, Set<String>> entry : taskGraph.entrySet()) {
String task = entry.getKey();
Set<String> dependencies = entry.getValue();
if (dependencies != null) {
inDegree.put(task, dependencies.size());
}
}
// 创建队列将入度为0的任务加入队列
Queue<String> queue = new LinkedList<>();
for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
if (entry.getValue() == 0) {
queue.add(entry.getKey());
}
}
// 存储批次分组方案
List<List<String>> batchGroups = new ArrayList<>();
// 记录已经分配到批次的任务
Set<String> processedTasks = new HashSet<>();
// 记录已经分配到批次的进程ID
Set<String> processedProcessIds = new HashSet<>();
// 进行拓扑排序
while (!queue.isEmpty()) {
// 当前层级可以并行执行的任务
List<String> currentLevelTasks = new ArrayList<>();
int size = queue.size();
for (int i = 0; i < size; i++) {
String task = queue.poll();
if (task != null) {
currentLevelTasks.add(task);
processedTasks.add(task);
}
}
// 按进程ID分组但避免将冲突的进程合并
Map<String, List<String>> processBatches = new HashMap<>();
for (String task : currentLevelTasks) {
String processId = taskToProcess.get(task);
if (processId != null) {
processBatches.computeIfAbsent(processId, k -> new ArrayList<>()).add(task);
}
// 更新依赖于当前任务的任务的入度
for (Map.Entry<String, Set<String>> entry : taskGraph.entrySet()) {
String dependentTask = entry.getKey();
Set<String> dependencies = entry.getValue();
if (dependencies != null && dependencies.contains(task)) {
int newDegree = inDegree.get(dependentTask) - 1;
inDegree.put(dependentTask, newDegree);
// 如果入度为0加入队列
if (newDegree == 0) {
queue.add(dependentTask);
}
}
}
}
// 将每个进程的任务作为一个批次添加,但避免合并冲突的进程
for (Map.Entry<String, List<String>> entry : processBatches.entrySet()) {
String processId = entry.getKey();
List<String> tasks = entry.getValue();
if (!tasks.isEmpty()) {
// 如果该进程ID已经处理过则不能再合并
if (processedProcessIds.contains(processId)) {
batchGroups.add(tasks);
} else {
// 尝试合并到现有批次,但避免合并冲突的进程
boolean merged = false;
for (List<String> existingBatch : batchGroups) {
if (!existingBatch.isEmpty()) {
String existingProcessId = taskToProcess.get(existingBatch.get(0));
// 检查是否存在进程冲突
boolean hasConflict = false;
if (existingProcessId != null && conflictProcessPairs != null) {
for (Set<String> conflictPair : conflictProcessPairs) {
if (conflictPair.contains(processId) && conflictPair.contains(existingProcessId)) {
hasConflict = true;
break;
}
}
}
// 如果进程ID相同且没有冲突则合并
if (processId.equals(existingProcessId) && !hasConflict) {
existingBatch.addAll(tasks);
merged = true;
break;
}
}
}
// 如果没有合并到现有批次,则创建新批次
if (!merged) {
batchGroups.add(tasks);
}
// 标记该进程ID已处理
processedProcessIds.add(processId);
}
}
}
}
// 处理剩余未分配的任务(可能在环中)
Set<String> remainingTasks = new HashSet<>(allTasks);
remainingTasks.removeAll(processedTasks);
if (!remainingTasks.isEmpty()) {
// 对于环中的任务,我们需要打破环
// 策略:将每个未处理的任务单独作为一个批次
Map<String, List<String>> remainingProcessBatches = new HashMap<>();
for (String task : remainingTasks) {
String processId = taskToProcess.get(task);
if (processId != null) {
// 按进程ID分组但不合并冲突的进程
boolean canMerge = true;
// 检查是否与已处理的进程冲突
for (String processedId : processedProcessIds) {
boolean hasConflict = false;
if (conflictProcessPairs != null) {
for (Set<String> conflictPair : conflictProcessPairs) {
if (conflictPair.contains(processId) && conflictPair.contains(processedId)) {
hasConflict = true;
break;
}
}
}
if (hasConflict) {
canMerge = false;
break;
}
}
if (canMerge) {
remainingProcessBatches.computeIfAbsent(processId, k -> new ArrayList<>()).add(task);
} else {
// 如果与已处理的进程冲突,则单独作为一个批次
List<String> singleTaskBatch = new ArrayList<>();
singleTaskBatch.add(task);
batchGroups.add(singleTaskBatch);
}
}
}
// 将剩余的按进程分组的任务添加到批次中
for (List<String> tasks : remainingProcessBatches.values()) {
if (!tasks.isEmpty()) {
batchGroups.add(tasks);
}
}
}
return batchGroups;
}
}