feat(wms): 优化批次分配策略以解决死锁和进程冲突问题

- 新增 checkDeadlock 方法检测任务执行是否会产生死锁
- 改进 generateNonDeadlockBatches 方法,增加对死锁和进程冲突的处理
- 实现增强版 DFS 检测环,并收集环中的节点- 添加进程依赖图构建和冲突进程对查找功能
- 优化批次分组算法,确保所有任务都被合理分配
This commit is contained in:
2025-08-15 11:25:48 +08:00
parent f3b2ac1f29
commit 59bd751cd7
3 changed files with 363 additions and 117 deletions

View File

@@ -5,6 +5,7 @@ import java.util.List;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import com.klp.domain.vo.BatchGroupVo;
import lombok.RequiredArgsConstructor;
@@ -107,39 +108,49 @@ public class WmsBatchController extends BaseController {
* 生成不会产生死锁的批次分配方案
* 相同processId的任务会合并到一个批次组中
* 不同processId的任务会放在不同的批次组中
* 对于存在死锁的情况,会避免将冲突的进程合并到一起
*
* @param rows 任务执行顺序数组
* @return 批次分配方案
*/
@PostMapping("/generate")
public R<List<BatchGroupVo>> generateNonDeadlockBatches(@RequestBody List<List<Map<String, Object>>> rows) {
// 先获取原始的批次分配方案
List<BatchGroupVo> originalBatches = iWmsBatchService.generateNonDeadlockBatches(rows);
// 检查是否存在死锁
boolean hasDeadlock = iWmsBatchService.checkDeadlock(rows);
// 获取批次分配方案
List<BatchGroupVo> batches = iWmsBatchService.generateNonDeadlockBatches(rows);
// 如果不存在死锁,才进行进一步合并
if (!hasDeadlock) {
// 使用Java 8 Stream API按processId分组并合并任务
Map<String, List<BatchGroupVo>> groupedByProcessId = batches.stream()
.collect(Collectors.groupingBy(BatchGroupVo::getProcessId));
// 使用Java 8 Stream API按processId分组并合并任务
Map<String, List<BatchGroupVo>> groupedByProcessId = originalBatches.stream()
.collect(java.util.stream.Collectors.groupingBy(BatchGroupVo::getProcessId));
// 合并相同processId的批次组
List<BatchGroupVo> mergedBatches = new ArrayList<>();
AtomicInteger groupCounter = new AtomicInteger(1);
// 合并相同processId的批次组
List<BatchGroupVo> mergedBatches = new ArrayList<>();
AtomicInteger groupCounter = new AtomicInteger(1);
groupedByProcessId.forEach((processId, groups) -> {
// 创建一个新的合并后的批次组
BatchGroupVo mergedGroup = new BatchGroupVo();
mergedGroup.setGroupId("Merged-Group-" + groupCounter.getAndIncrement());
mergedGroup.setProcessId(processId);
groupedByProcessId.forEach((processId, groups) -> {
// 创建一个新的合并后的批次组
BatchGroupVo mergedGroup = new BatchGroupVo();
mergedGroup.setGroupId("Merged-Group-" + groupCounter.getAndIncrement());
mergedGroup.setProcessId(processId);
// 合并所有taskIds
List<String> allTaskIds = groups.stream()
.flatMap(group -> group.getTaskIds().stream())
.collect(Collectors.toList());
mergedGroup.setTaskIds(allTaskIds);
// 合并所有taskIds
List<String> allTaskIds = groups.stream()
.flatMap(group -> group.getTaskIds().stream())
.collect(java.util.stream.Collectors.toList());
mergedGroup.setTaskIds(allTaskIds);
mergedBatches.add(mergedGroup);
});
return R.ok(mergedBatches);
mergedBatches.add(mergedGroup);
});
return R.ok(mergedBatches);
} else {
// 如果存在死锁直接返回Service层的结果不进行额外合并
return R.ok(batches);
}
}

View File

@@ -49,6 +49,14 @@ public interface IWmsBatchService {
Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid);
/**
* 检测任务执行是否会产生死锁
*
* @param rows 任务执行顺序数组
* @return 是否存在死锁
*/
boolean checkDeadlock(List<List<Map<String, Object>>> rows);
/**
* 生成不会产生死锁的批次分配方案
*

View File

@@ -116,12 +116,37 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
return baseMapper.deleteBatchIds(ids) > 0;
}
/**
* 构建任务依赖图
*/
private Map<String, Set<String>> buildTaskDependencyGraph(List<List<Map<String, Object>>> rows) {
Map<String, Set<String>> graph = new HashMap<>();
// 遍历每一行,记录任务间的依赖关系
for (List<Map<String, Object>> row : rows) {
// 按sequence排序
row.sort(Comparator.comparingInt(task -> Integer.parseInt(task.get("sequence").toString())));
// 构建依赖关系
for (int i = 0; i < row.size() - 1; i++) {
String currentTask = row.get(i).get("taskId").toString();
String nextTask = row.get(i + 1).get("taskId").toString();
// 添加依赖nextTask依赖于currentTask
graph.computeIfAbsent(nextTask, k -> new HashSet<>()).add(currentTask);
}
}
return graph;
}
/**
* 检测任务执行是否会产生死锁
*
* @param rows 任务执行顺序数组
* @return 是否存在死锁
*/
@Override
public boolean checkDeadlock(List<List<Map<String, Object>>> rows) {
// 保存任务执行顺序数组
this.rows = rows;
@@ -139,15 +164,86 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
// 使用DFS检测是否存在环死锁
Map<String, Integer> visited = new HashMap<>();
Set<String> currentPath = new HashSet<>();
List<List<String>> cycles = new ArrayList<>();
for (String task : allTasks) {
if (visited.getOrDefault(task, 0) == 0) {
if (hasCycleDFS(task, taskGraph, visited)) {
List<String> path = new ArrayList<>();
if (detectCycleDFS(task, taskGraph, visited, currentPath, path, cycles)) {
return true; // 存在死锁
}
}
}
return false; // 不存在死锁
// 检查进程冲突
Map<String, String> taskToProcess = new HashMap<>();
for (List<Map<String, Object>> row : rows) {
for (Map<String, Object> task : row) {
String taskId = task.get("taskId").toString();
String processId = task.get("processId").toString();
taskToProcess.put(taskId, processId);
}
}
Map<String, Set<String>> processGraph = buildProcessDependencyGraph(rows);
Set<Set<String>> conflictProcessPairs = findConflictProcessPairs(processGraph);
return !conflictProcessPairs.isEmpty(); // 如果存在进程冲突,也视为死锁
}
/**
* 增强版DFS检测环并收集环中的节点
*/
private boolean detectCycleDFS(String node, Map<String, Set<String>> graph,
Map<String, Integer> visited, Set<String> currentPath,
List<String> path, List<List<String>> cycles) {
if (node == null) {
return false;
}
// 标记当前节点为访问中
visited.put(node, 1);
currentPath.add(node);
path.add(node);
// 访问所有邻居
if (graph != null && graph.containsKey(node)) {
Set<String> neighbors = graph.get(node);
if (neighbors != null) {
for (String neighbor : neighbors) {
if (neighbor == null) {
continue;
}
// 如果邻居在当前路径中,说明存在环
if (currentPath.contains(neighbor)) {
// 找到环的起始位置
int startIdx = path.indexOf(neighbor);
if (startIdx >= 0) {
List<String> cycle = new ArrayList<>(path.subList(startIdx, path.size()));
cycles.add(cycle);
return true;
}
}
// 如果邻居未被访问继续DFS
if (visited.getOrDefault(neighbor, 0) == 0) {
if (detectCycleDFS(neighbor, graph, visited, currentPath, path, cycles)) {
return true;
}
}
}
}
}
// 回溯,移除当前节点
currentPath.remove(node);
path.remove(path.size() - 1);
// 标记当前节点为已访问
visited.put(node, 2);
return false;
}
/**
@@ -172,22 +268,126 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
for (Map.Entry<String, Set<String>> entry : taskGraph.entrySet()) {
String taskId = entry.getKey();
Set<String> dependencies = entry.getValue();
if (dependencies == null) {
continue;
}
String processId = taskToProcess.get(taskId);
if (processId == null) {
continue;
}
for (String depTaskId : dependencies) {
String depProcessId = taskToProcess.get(depTaskId);
if (depProcessId == null) {
continue;
}
// 如果依赖的是不同进程的任务,则建立进程间依赖
if (!processId.equals(depProcessId)) {
processGraph.putIfAbsent(processId, new HashSet<>());
processGraph.get(processId).add(depProcessId);
processGraph.computeIfAbsent(processId, k -> new HashSet<>()).add(depProcessId);
}
}
}
return processGraph;
}
/**
* 检查两个进程之间是否存在冲突
* 如果进程A的任务必须在进程B的任务之前执行同时进程B的任务也必须在进程A的任务之前执行则存在冲突
*/
private boolean checkProcessConflict(String processA, String processB, Map<String, Set<String>> processGraph) {
if (processA == null || processB == null || processGraph == null) {
return false;
}
boolean aDepB = isProcessDependent(processA, processB, processGraph, new HashSet<>());
boolean bDepA = isProcessDependent(processB, processA, processGraph, new HashSet<>());
return aDepB && bDepA;
}
/**
* 检查进程A是否依赖于进程B
*/
private boolean isProcessDependent(String processA, String processB,
Map<String, Set<String>> processGraph, Set<String> visited) {
if (processA == null || processB == null || processGraph == null || visited == null) {
return false;
}
if (processA.equals(processB)) {
return true;
}
if (visited.contains(processA)) {
return false;
}
visited.add(processA);
if (processGraph.containsKey(processA)) {
Set<String> dependencies = processGraph.get(processA);
if (dependencies != null) {
for (String dep : dependencies) {
if (dep != null && (dep.equals(processB) || isProcessDependent(dep, processB, processGraph, visited))) {
return true;
}
}
}
}
return false;
}
/**
* 找出所有冲突的进程对
*/
private Set<Set<String>> findConflictProcessPairs(Map<String, Set<String>> processGraph) {
Set<Set<String>> conflictPairs = new HashSet<>();
Set<String> processes = new HashSet<>();
if (processGraph == null) {
return conflictPairs;
}
// 收集所有进程
for (String process : processGraph.keySet()) {
if (process != null) {
processes.add(process);
}
}
for (Map.Entry<String, Set<String>> entry : processGraph.entrySet()) {
Set<String> deps = entry.getValue();
if (deps != null) {
for (String dep : deps) {
if (dep != null) {
processes.add(dep);
}
}
}
}
// 检查每对进程是否存在冲突
List<String> processList = new ArrayList<>(processes);
for (int i = 0; i < processList.size(); i++) {
for (int j = i + 1; j < processList.size(); j++) {
String processA = processList.get(i);
String processB = processList.get(j);
if (processA != null && processB != null &&
checkProcessConflict(processA, processB, processGraph)) {
Set<String> pair = new HashSet<>();
pair.add(processA);
pair.add(processB);
conflictPairs.add(pair);
}
}
}
return conflictPairs;
}
/**
* 生成不会产生死锁的批次分配方案
@@ -210,11 +410,11 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
}
}
// // 构建进程依赖图
// Map<String, Set<String>> processGraph = buildProcessDependencyGraph(rows);
// 构建任务依赖图
Map<String, Set<String>> taskGraph = buildTaskDependencyGraph(rows);
// 构建进程依赖图
Map<String, Set<String>> processGraph = buildProcessDependencyGraph(rows);
// 获取任务与进程的映射关系
Map<String, String> taskToProcess = new HashMap<>();
@@ -231,9 +431,14 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
// 检查是否存在死锁
boolean hasDeadlock = checkDeadlock(rows);
if (hasDeadlock) {
// 如果存在死锁,使用拓扑排序找出可行的批次分配方案
List<List<String>> batchGroups = generateBatchGroups(taskGraph, allTasks, taskToProcess);
// 找出所有冲突的进程对
Set<Set<String>> conflictProcessPairs = findConflictProcessPairs(processGraph);
if (hasDeadlock || !conflictProcessPairs.isEmpty()) {
// 如果存在死锁或进程冲突,使用改进的算法找出可行的批次分配方案
List<List<String>> batchGroups = generateBatchGroupsWithCycleHandling(
taskGraph, allTasks, taskToProcess, conflictProcessPairs);
// 将批次转换为BatchGroupVo格式
List<BatchGroupVo> result = new ArrayList<>();
@@ -264,7 +469,9 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
Map<String, List<String>> processTasks = new HashMap<>();
for (String taskId : allTasks) {
String processId = taskToProcess.get(taskId);
processTasks.computeIfAbsent(processId, k -> new ArrayList<>()).add(taskId);
if (processId != null) {
processTasks.computeIfAbsent(processId, k -> new ArrayList<>()).add(taskId);
}
}
// 创建批次组
@@ -274,78 +481,28 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
String processId = entry.getKey();
List<String> tasks = entry.getValue();
BatchGroupVo batchGroup = new BatchGroupVo();
batchGroup.setGroupId("Group-" + groupId++);
batchGroup.setProcessId(processId);
batchGroup.setTaskIds(tasks);
result.add(batchGroup);
if (!tasks.isEmpty()) {
BatchGroupVo batchGroup = new BatchGroupVo();
batchGroup.setGroupId("Group-" + groupId++);
batchGroup.setProcessId(processId);
batchGroup.setTaskIds(tasks);
result.add(batchGroup);
}
}
return result;
}
/**
* 构建任务依赖图
*/
private Map<String, Set<String>> buildTaskDependencyGraph(List<List<Map<String, Object>>> rows) {
Map<String, Set<String>> graph = new HashMap<>();
// 遍历每一行,记录任务间的依赖关系
for (List<Map<String, Object>> row : rows) {
// 按sequare排序
row.sort(Comparator.comparingInt(task -> Integer.parseInt(task.get("sequence").toString())));
// 构建依赖关系
for (int i = 0; i < row.size() - 1; i++) {
String currentTask = row.get(i).get("taskId").toString();
String nextTask = row.get(i + 1).get("taskId").toString();
// 添加依赖nextTask依赖于currentTask
graph.putIfAbsent(nextTask, new HashSet<>());
graph.get(nextTask).add(currentTask);
}
}
return graph;
}
/**
* DFS检测环
*/
private boolean hasCycleDFS(String node, Map<String, Set<String>> graph, Map<String, Integer> visited) {
// 标记当前节点为访问中
visited.put(node, 1);
// 访问所有邻居
if (graph.containsKey(node)) {
for (String neighbor : graph.get(node)) {
// 如果邻居正在被访问,说明存在环
if (visited.getOrDefault(neighbor, 0) == 1) {
return true;
}
// 如果邻居未被访问继续DFS
if (visited.getOrDefault(neighbor, 0) == 0) {
if (hasCycleDFS(neighbor, graph, visited)) {
return true;
}
}
}
}
// 标记当前节点为已访问
visited.put(node, 2);
return false;
}
/**
* 生成批次分组
* 生成批次分组,处理环结构
* 相同processId的任务会合并到一个列表中
* 不同processId的任务会放在不同的列表中
* 处理环结构,确保所有任务都被分配
*/
private List<List<String>> generateBatchGroups( Map<String, Set<String>> taskGraph,
Set<String> allTasks, Map<String, String> taskToProcess) {
private List<List<String>> generateBatchGroupsWithCycleHandling(
Map<String, Set<String>> taskGraph, Set<String> allTasks,
Map<String, String> taskToProcess, Set<Set<String>> conflictProcessPairs) {
// 计算每个任务的入度
Map<String, Integer> inDegree = new HashMap<>();
for (String task : allTasks) {
@@ -355,7 +512,10 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
// 更新入度
for (Map.Entry<String, Set<String>> entry : taskGraph.entrySet()) {
String task = entry.getKey();
inDegree.put(task, entry.getValue().size());
Set<String> dependencies = entry.getValue();
if (dependencies != null) {
inDegree.put(task, dependencies.size());
}
}
// 创建队列将入度为0的任务加入队列
@@ -369,6 +529,9 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
// 存储批次分组方案
List<List<String>> batchGroups = new ArrayList<>();
// 记录已经分配到批次的任务
Set<String> processedTasks = new HashSet<>();
// 记录已经分配到批次的进程ID
Set<String> processedProcessIds = new HashSet<>();
@@ -379,52 +542,67 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
int size = queue.size();
for (int i = 0; i < size; i++) {
currentLevelTasks.add(queue.poll());
String task = queue.poll();
if (task != null) {
currentLevelTasks.add(task);
processedTasks.add(task);
}
}
// 按进程ID分组
// 按进程ID分组,但避免将冲突的进程合并
Map<String, List<String>> processBatches = new HashMap<>();
for (String task : currentLevelTasks) {
String processId = taskToProcess.get(task);
processBatches.computeIfAbsent(processId, k -> new ArrayList<>()).add(task);
if (processId != null) {
processBatches.computeIfAbsent(processId, k -> new ArrayList<>()).add(task);
}
// 更新依赖于当前任务的任务的入度
for (Map.Entry<String, Set<String>> entry : taskGraph.entrySet()) {
String dependentTask = entry.getKey();
Set<String> dependencies = entry.getValue();
if (dependencies.contains(task)) {
inDegree.put(dependentTask, inDegree.get(dependentTask) - 1);
if (dependencies != null && dependencies.contains(task)) {
int newDegree = inDegree.get(dependentTask) - 1;
inDegree.put(dependentTask, newDegree);
// 如果入度为0加入队列
if (inDegree.get(dependentTask) == 0) {
if (newDegree == 0) {
queue.add(dependentTask);
}
}
}
}
// 将每个进程的任务作为一个批次添加
// 将每个进程的任务作为一个批次添加,但避免合并冲突的进程
for (Map.Entry<String, List<String>> entry : processBatches.entrySet()) {
String processId = entry.getKey();
List<String> tasks = entry.getValue();
if (!tasks.isEmpty()) {
// 检查该进程是否已经有批次
boolean merged = false;
// 如果该进程ID已经处理过则不能再合并
if (processedProcessIds.contains(processId)) {
batchGroups.add(tasks);
} else {
// 尝试合并到现有批次
// 尝试合并到现有批次,但避免合并冲突的进程
boolean merged = false;
for (List<String> existingBatch : batchGroups) {
// 获取批次中第一个任务的进程ID
if (!existingBatch.isEmpty()) {
String existingProcessId = taskToProcess.get(existingBatch.get(0));
// 如果进程ID相同则合并
if (processId.equals(existingProcessId)) {
// 检查是否存在进程冲突
boolean hasConflict = false;
if (existingProcessId != null && conflictProcessPairs != null) {
for (Set<String> conflictPair : conflictProcessPairs) {
if (conflictPair.contains(processId) && conflictPair.contains(existingProcessId)) {
hasConflict = true;
break;
}
}
}
// 如果进程ID相同且没有冲突则合并
if (processId.equals(existingProcessId) && !hasConflict) {
existingBatch.addAll(tasks);
merged = true;
break;
@@ -443,10 +621,59 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
}
}
}
// 处理剩余未分配的任务(可能在环中)
Set<String> remainingTasks = new HashSet<>(allTasks);
remainingTasks.removeAll(processedTasks);
if (!remainingTasks.isEmpty()) {
// 对于环中的任务,我们需要打破环
// 策略:将每个未处理的任务单独作为一个批次
Map<String, List<String>> remainingProcessBatches = new HashMap<>();
for (String task : remainingTasks) {
String processId = taskToProcess.get(task);
if (processId != null) {
// 按进程ID分组但不合并冲突的进程
boolean canMerge = true;
// 检查是否与已处理的进程冲突
for (String processedId : processedProcessIds) {
boolean hasConflict = false;
if (conflictProcessPairs != null) {
for (Set<String> conflictPair : conflictProcessPairs) {
if (conflictPair.contains(processId) && conflictPair.contains(processedId)) {
hasConflict = true;
break;
}
}
}
if (hasConflict) {
canMerge = false;
break;
}
}
if (canMerge) {
remainingProcessBatches.computeIfAbsent(processId, k -> new ArrayList<>()).add(task);
} else {
// 如果与已处理的进程冲突,则单独作为一个批次
List<String> singleTaskBatch = new ArrayList<>();
singleTaskBatch.add(task);
batchGroups.add(singleTaskBatch);
}
}
}
// 将剩余的按进程分组的任务添加到批次中
for (List<String> tasks : remainingProcessBatches.values()) {
if (!tasks.isEmpty()) {
batchGroups.add(tasks);
}
}
}
return batchGroups;
}
}
}