diff --git a/klp-wms/src/main/java/com/klp/controller/WmsBatchController.java b/klp-wms/src/main/java/com/klp/controller/WmsBatchController.java index 8b6cfc36..0a002b97 100644 --- a/klp-wms/src/main/java/com/klp/controller/WmsBatchController.java +++ b/klp-wms/src/main/java/com/klp/controller/WmsBatchController.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.Arrays; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import com.klp.domain.vo.BatchGroupVo; import lombok.RequiredArgsConstructor; @@ -107,39 +108,49 @@ public class WmsBatchController extends BaseController { * 生成不会产生死锁的批次分配方案 * 相同processId的任务会合并到一个批次组中 * 不同processId的任务会放在不同的批次组中 + * 对于存在死锁的情况,会避免将冲突的进程合并到一起 * * @param rows 任务执行顺序数组 * @return 批次分配方案 */ @PostMapping("/generate") public R> generateNonDeadlockBatches(@RequestBody List>> rows) { - // 先获取原始的批次分配方案 - List originalBatches = iWmsBatchService.generateNonDeadlockBatches(rows); + // 检查是否存在死锁 + boolean hasDeadlock = iWmsBatchService.checkDeadlock(rows); + + // 获取批次分配方案 + List batches = iWmsBatchService.generateNonDeadlockBatches(rows); + + // 如果不存在死锁,才进行进一步合并 + if (!hasDeadlock) { + // 使用Java 8 Stream API按processId分组并合并任务 + Map> groupedByProcessId = batches.stream() + .collect(Collectors.groupingBy(BatchGroupVo::getProcessId)); - // 使用Java 8 Stream API按processId分组并合并任务 - Map> groupedByProcessId = originalBatches.stream() - .collect(java.util.stream.Collectors.groupingBy(BatchGroupVo::getProcessId)); + // 合并相同processId的批次组 + List mergedBatches = new ArrayList<>(); + AtomicInteger groupCounter = new AtomicInteger(1); - // 合并相同processId的批次组 - List mergedBatches = new ArrayList<>(); - AtomicInteger groupCounter = new AtomicInteger(1); + groupedByProcessId.forEach((processId, groups) -> { + // 创建一个新的合并后的批次组 + BatchGroupVo mergedGroup = new BatchGroupVo(); + mergedGroup.setGroupId("Merged-Group-" + groupCounter.getAndIncrement()); + mergedGroup.setProcessId(processId); - groupedByProcessId.forEach((processId, groups) -> { - // 创建一个新的合并后的批次组 - BatchGroupVo mergedGroup = new BatchGroupVo(); - mergedGroup.setGroupId("Merged-Group-" + groupCounter.getAndIncrement()); - mergedGroup.setProcessId(processId); + // 合并所有taskIds + List allTaskIds = groups.stream() + .flatMap(group -> group.getTaskIds().stream()) + .collect(Collectors.toList()); + mergedGroup.setTaskIds(allTaskIds); - // 合并所有taskIds - List allTaskIds = groups.stream() - .flatMap(group -> group.getTaskIds().stream()) - .collect(java.util.stream.Collectors.toList()); - mergedGroup.setTaskIds(allTaskIds); - - mergedBatches.add(mergedGroup); - }); - - return R.ok(mergedBatches); + mergedBatches.add(mergedGroup); + }); + + return R.ok(mergedBatches); + } else { + // 如果存在死锁,直接返回Service层的结果,不进行额外合并 + return R.ok(batches); + } } diff --git a/klp-wms/src/main/java/com/klp/service/IWmsBatchService.java b/klp-wms/src/main/java/com/klp/service/IWmsBatchService.java index 919fd0ce..fb1ea7ca 100644 --- a/klp-wms/src/main/java/com/klp/service/IWmsBatchService.java +++ b/klp-wms/src/main/java/com/klp/service/IWmsBatchService.java @@ -49,6 +49,14 @@ public interface IWmsBatchService { Boolean deleteWithValidByIds(Collection ids, Boolean isValid); + /** + * 检测任务执行是否会产生死锁 + * + * @param rows 任务执行顺序数组 + * @return 是否存在死锁 + */ + boolean checkDeadlock(List>> rows); + /** * 生成不会产生死锁的批次分配方案 * diff --git a/klp-wms/src/main/java/com/klp/service/impl/WmsBatchServiceImpl.java b/klp-wms/src/main/java/com/klp/service/impl/WmsBatchServiceImpl.java index 073ba8c8..794264de 100644 --- a/klp-wms/src/main/java/com/klp/service/impl/WmsBatchServiceImpl.java +++ b/klp-wms/src/main/java/com/klp/service/impl/WmsBatchServiceImpl.java @@ -116,12 +116,37 @@ public class WmsBatchServiceImpl implements IWmsBatchService { return baseMapper.deleteBatchIds(ids) > 0; } + /** + * 构建任务依赖图 + */ + private Map> buildTaskDependencyGraph(List>> rows) { + Map> graph = new HashMap<>(); + + // 遍历每一行,记录任务间的依赖关系 + for (List> row : rows) { + // 按sequence排序 + row.sort(Comparator.comparingInt(task -> Integer.parseInt(task.get("sequence").toString()))); + + // 构建依赖关系 + for (int i = 0; i < row.size() - 1; i++) { + String currentTask = row.get(i).get("taskId").toString(); + String nextTask = row.get(i + 1).get("taskId").toString(); + + // 添加依赖:nextTask依赖于currentTask + graph.computeIfAbsent(nextTask, k -> new HashSet<>()).add(currentTask); + } + } + + return graph; + } + /** * 检测任务执行是否会产生死锁 * * @param rows 任务执行顺序数组 * @return 是否存在死锁 */ + @Override public boolean checkDeadlock(List>> rows) { // 保存任务执行顺序数组 this.rows = rows; @@ -139,15 +164,86 @@ public class WmsBatchServiceImpl implements IWmsBatchService { // 使用DFS检测是否存在环(死锁) Map visited = new HashMap<>(); + Set currentPath = new HashSet<>(); + List> cycles = new ArrayList<>(); + for (String task : allTasks) { if (visited.getOrDefault(task, 0) == 0) { - if (hasCycleDFS(task, taskGraph, visited)) { + List path = new ArrayList<>(); + if (detectCycleDFS(task, taskGraph, visited, currentPath, path, cycles)) { return true; // 存在死锁 } } } - return false; // 不存在死锁 + // 检查进程冲突 + Map taskToProcess = new HashMap<>(); + for (List> row : rows) { + for (Map task : row) { + String taskId = task.get("taskId").toString(); + String processId = task.get("processId").toString(); + taskToProcess.put(taskId, processId); + } + } + + Map> processGraph = buildProcessDependencyGraph(rows); + Set> conflictProcessPairs = findConflictProcessPairs(processGraph); + + return !conflictProcessPairs.isEmpty(); // 如果存在进程冲突,也视为死锁 + } + + /** + * 增强版DFS检测环,并收集环中的节点 + */ + private boolean detectCycleDFS(String node, Map> graph, + Map visited, Set currentPath, + List path, List> cycles) { + if (node == null) { + return false; + } + + // 标记当前节点为访问中 + visited.put(node, 1); + currentPath.add(node); + path.add(node); + + // 访问所有邻居 + if (graph != null && graph.containsKey(node)) { + Set neighbors = graph.get(node); + if (neighbors != null) { + for (String neighbor : neighbors) { + if (neighbor == null) { + continue; + } + + // 如果邻居在当前路径中,说明存在环 + if (currentPath.contains(neighbor)) { + // 找到环的起始位置 + int startIdx = path.indexOf(neighbor); + if (startIdx >= 0) { + List cycle = new ArrayList<>(path.subList(startIdx, path.size())); + cycles.add(cycle); + return true; + } + } + + // 如果邻居未被访问,继续DFS + if (visited.getOrDefault(neighbor, 0) == 0) { + if (detectCycleDFS(neighbor, graph, visited, currentPath, path, cycles)) { + return true; + } + } + } + } + } + + // 回溯,移除当前节点 + currentPath.remove(node); + path.remove(path.size() - 1); + + // 标记当前节点为已访问 + visited.put(node, 2); + return false; } /** @@ -172,22 +268,126 @@ public class WmsBatchServiceImpl implements IWmsBatchService { for (Map.Entry> entry : taskGraph.entrySet()) { String taskId = entry.getKey(); Set dependencies = entry.getValue(); + if (dependencies == null) { + continue; + } String processId = taskToProcess.get(taskId); + if (processId == null) { + continue; + } for (String depTaskId : dependencies) { String depProcessId = taskToProcess.get(depTaskId); + if (depProcessId == null) { + continue; + } // 如果依赖的是不同进程的任务,则建立进程间依赖 if (!processId.equals(depProcessId)) { - processGraph.putIfAbsent(processId, new HashSet<>()); - processGraph.get(processId).add(depProcessId); + processGraph.computeIfAbsent(processId, k -> new HashSet<>()).add(depProcessId); } } } return processGraph; } + + /** + * 检查两个进程之间是否存在冲突 + * 如果进程A的任务必须在进程B的任务之前执行,同时进程B的任务也必须在进程A的任务之前执行,则存在冲突 + */ + private boolean checkProcessConflict(String processA, String processB, Map> processGraph) { + if (processA == null || processB == null || processGraph == null) { + return false; + } + boolean aDepB = isProcessDependent(processA, processB, processGraph, new HashSet<>()); + boolean bDepA = isProcessDependent(processB, processA, processGraph, new HashSet<>()); + + return aDepB && bDepA; + } + + /** + * 检查进程A是否依赖于进程B + */ + private boolean isProcessDependent(String processA, String processB, + Map> processGraph, Set visited) { + if (processA == null || processB == null || processGraph == null || visited == null) { + return false; + } + + if (processA.equals(processB)) { + return true; + } + + if (visited.contains(processA)) { + return false; + } + + visited.add(processA); + + if (processGraph.containsKey(processA)) { + Set dependencies = processGraph.get(processA); + if (dependencies != null) { + for (String dep : dependencies) { + if (dep != null && (dep.equals(processB) || isProcessDependent(dep, processB, processGraph, visited))) { + return true; + } + } + } + } + + return false; + } + + /** + * 找出所有冲突的进程对 + */ + private Set> findConflictProcessPairs(Map> processGraph) { + Set> conflictPairs = new HashSet<>(); + Set processes = new HashSet<>(); + + if (processGraph == null) { + return conflictPairs; + } + + // 收集所有进程 + for (String process : processGraph.keySet()) { + if (process != null) { + processes.add(process); + } + } + + for (Map.Entry> entry : processGraph.entrySet()) { + Set deps = entry.getValue(); + if (deps != null) { + for (String dep : deps) { + if (dep != null) { + processes.add(dep); + } + } + } + } + + // 检查每对进程是否存在冲突 + List processList = new ArrayList<>(processes); + for (int i = 0; i < processList.size(); i++) { + for (int j = i + 1; j < processList.size(); j++) { + String processA = processList.get(i); + String processB = processList.get(j); + + if (processA != null && processB != null && + checkProcessConflict(processA, processB, processGraph)) { + Set pair = new HashSet<>(); + pair.add(processA); + pair.add(processB); + conflictPairs.add(pair); + } + } + } + + return conflictPairs; + } /** * 生成不会产生死锁的批次分配方案 @@ -210,11 +410,11 @@ public class WmsBatchServiceImpl implements IWmsBatchService { } } -// // 构建进程依赖图 -// Map> processGraph = buildProcessDependencyGraph(rows); - // 构建任务依赖图 Map> taskGraph = buildTaskDependencyGraph(rows); + + // 构建进程依赖图 + Map> processGraph = buildProcessDependencyGraph(rows); // 获取任务与进程的映射关系 Map taskToProcess = new HashMap<>(); @@ -231,9 +431,14 @@ public class WmsBatchServiceImpl implements IWmsBatchService { // 检查是否存在死锁 boolean hasDeadlock = checkDeadlock(rows); - if (hasDeadlock) { - // 如果存在死锁,使用拓扑排序找出可行的批次分配方案 - List> batchGroups = generateBatchGroups(taskGraph, allTasks, taskToProcess); + + // 找出所有冲突的进程对 + Set> conflictProcessPairs = findConflictProcessPairs(processGraph); + + if (hasDeadlock || !conflictProcessPairs.isEmpty()) { + // 如果存在死锁或进程冲突,使用改进的算法找出可行的批次分配方案 + List> batchGroups = generateBatchGroupsWithCycleHandling( + taskGraph, allTasks, taskToProcess, conflictProcessPairs); // 将批次转换为BatchGroupVo格式 List result = new ArrayList<>(); @@ -264,7 +469,9 @@ public class WmsBatchServiceImpl implements IWmsBatchService { Map> processTasks = new HashMap<>(); for (String taskId : allTasks) { String processId = taskToProcess.get(taskId); - processTasks.computeIfAbsent(processId, k -> new ArrayList<>()).add(taskId); + if (processId != null) { + processTasks.computeIfAbsent(processId, k -> new ArrayList<>()).add(taskId); + } } // 创建批次组 @@ -274,78 +481,28 @@ public class WmsBatchServiceImpl implements IWmsBatchService { String processId = entry.getKey(); List tasks = entry.getValue(); - BatchGroupVo batchGroup = new BatchGroupVo(); - batchGroup.setGroupId("Group-" + groupId++); - batchGroup.setProcessId(processId); - batchGroup.setTaskIds(tasks); - result.add(batchGroup); + if (!tasks.isEmpty()) { + BatchGroupVo batchGroup = new BatchGroupVo(); + batchGroup.setGroupId("Group-" + groupId++); + batchGroup.setProcessId(processId); + batchGroup.setTaskIds(tasks); + result.add(batchGroup); + } } return result; } /** - * 构建任务依赖图 - */ - private Map> buildTaskDependencyGraph(List>> rows) { - Map> graph = new HashMap<>(); - - // 遍历每一行,记录任务间的依赖关系 - for (List> row : rows) { - // 按sequare排序 - row.sort(Comparator.comparingInt(task -> Integer.parseInt(task.get("sequence").toString()))); - - // 构建依赖关系 - for (int i = 0; i < row.size() - 1; i++) { - String currentTask = row.get(i).get("taskId").toString(); - String nextTask = row.get(i + 1).get("taskId").toString(); - - // 添加依赖:nextTask依赖于currentTask - graph.putIfAbsent(nextTask, new HashSet<>()); - graph.get(nextTask).add(currentTask); - } - } - - return graph; - } - - - /** - * DFS检测环 - */ - private boolean hasCycleDFS(String node, Map> graph, Map visited) { - // 标记当前节点为访问中 - visited.put(node, 1); - - // 访问所有邻居 - if (graph.containsKey(node)) { - for (String neighbor : graph.get(node)) { - // 如果邻居正在被访问,说明存在环 - if (visited.getOrDefault(neighbor, 0) == 1) { - return true; - } - - // 如果邻居未被访问,继续DFS - if (visited.getOrDefault(neighbor, 0) == 0) { - if (hasCycleDFS(neighbor, graph, visited)) { - return true; - } - } - } - } - - // 标记当前节点为已访问 - visited.put(node, 2); - return false; - } - - /** - * 生成批次分组 + * 生成批次分组,处理环结构 * 相同processId的任务会合并到一个列表中 * 不同processId的任务会放在不同的列表中 + * 处理环结构,确保所有任务都被分配 */ - private List> generateBatchGroups( Map> taskGraph, - Set allTasks, Map taskToProcess) { + private List> generateBatchGroupsWithCycleHandling( + Map> taskGraph, Set allTasks, + Map taskToProcess, Set> conflictProcessPairs) { + // 计算每个任务的入度 Map inDegree = new HashMap<>(); for (String task : allTasks) { @@ -355,7 +512,10 @@ public class WmsBatchServiceImpl implements IWmsBatchService { // 更新入度 for (Map.Entry> entry : taskGraph.entrySet()) { String task = entry.getKey(); - inDegree.put(task, entry.getValue().size()); + Set dependencies = entry.getValue(); + if (dependencies != null) { + inDegree.put(task, dependencies.size()); + } } // 创建队列,将入度为0的任务加入队列 @@ -369,6 +529,9 @@ public class WmsBatchServiceImpl implements IWmsBatchService { // 存储批次分组方案 List> batchGroups = new ArrayList<>(); + // 记录已经分配到批次的任务 + Set processedTasks = new HashSet<>(); + // 记录已经分配到批次的进程ID Set processedProcessIds = new HashSet<>(); @@ -379,52 +542,67 @@ public class WmsBatchServiceImpl implements IWmsBatchService { int size = queue.size(); for (int i = 0; i < size; i++) { - currentLevelTasks.add(queue.poll()); + String task = queue.poll(); + if (task != null) { + currentLevelTasks.add(task); + processedTasks.add(task); + } } - // 按进程ID分组 + // 按进程ID分组,但避免将冲突的进程合并 Map> processBatches = new HashMap<>(); for (String task : currentLevelTasks) { String processId = taskToProcess.get(task); - processBatches.computeIfAbsent(processId, k -> new ArrayList<>()).add(task); + if (processId != null) { + processBatches.computeIfAbsent(processId, k -> new ArrayList<>()).add(task); + } // 更新依赖于当前任务的任务的入度 for (Map.Entry> entry : taskGraph.entrySet()) { String dependentTask = entry.getKey(); Set dependencies = entry.getValue(); - if (dependencies.contains(task)) { - inDegree.put(dependentTask, inDegree.get(dependentTask) - 1); + if (dependencies != null && dependencies.contains(task)) { + int newDegree = inDegree.get(dependentTask) - 1; + inDegree.put(dependentTask, newDegree); // 如果入度为0,加入队列 - if (inDegree.get(dependentTask) == 0) { + if (newDegree == 0) { queue.add(dependentTask); } } } } - // 将每个进程的任务作为一个批次添加 + // 将每个进程的任务作为一个批次添加,但避免合并冲突的进程 for (Map.Entry> entry : processBatches.entrySet()) { String processId = entry.getKey(); List tasks = entry.getValue(); if (!tasks.isEmpty()) { - // 检查该进程是否已经有批次 - boolean merged = false; - // 如果该进程ID已经处理过,则不能再合并 if (processedProcessIds.contains(processId)) { batchGroups.add(tasks); } else { - // 尝试合并到现有批次 + // 尝试合并到现有批次,但避免合并冲突的进程 + boolean merged = false; for (List existingBatch : batchGroups) { - // 获取批次中第一个任务的进程ID if (!existingBatch.isEmpty()) { String existingProcessId = taskToProcess.get(existingBatch.get(0)); - - // 如果进程ID相同,则合并 - if (processId.equals(existingProcessId)) { + + // 检查是否存在进程冲突 + boolean hasConflict = false; + if (existingProcessId != null && conflictProcessPairs != null) { + for (Set conflictPair : conflictProcessPairs) { + if (conflictPair.contains(processId) && conflictPair.contains(existingProcessId)) { + hasConflict = true; + break; + } + } + } + + // 如果进程ID相同且没有冲突,则合并 + if (processId.equals(existingProcessId) && !hasConflict) { existingBatch.addAll(tasks); merged = true; break; @@ -443,10 +621,59 @@ public class WmsBatchServiceImpl implements IWmsBatchService { } } } + + // 处理剩余未分配的任务(可能在环中) + Set remainingTasks = new HashSet<>(allTasks); + remainingTasks.removeAll(processedTasks); + + if (!remainingTasks.isEmpty()) { + // 对于环中的任务,我们需要打破环 + // 策略:将每个未处理的任务单独作为一个批次 + Map> remainingProcessBatches = new HashMap<>(); + + for (String task : remainingTasks) { + String processId = taskToProcess.get(task); + if (processId != null) { + // 按进程ID分组,但不合并冲突的进程 + boolean canMerge = true; + + // 检查是否与已处理的进程冲突 + for (String processedId : processedProcessIds) { + boolean hasConflict = false; + if (conflictProcessPairs != null) { + for (Set conflictPair : conflictProcessPairs) { + if (conflictPair.contains(processId) && conflictPair.contains(processedId)) { + hasConflict = true; + break; + } + } + } + + if (hasConflict) { + canMerge = false; + break; + } + } + + if (canMerge) { + remainingProcessBatches.computeIfAbsent(processId, k -> new ArrayList<>()).add(task); + } else { + // 如果与已处理的进程冲突,则单独作为一个批次 + List singleTaskBatch = new ArrayList<>(); + singleTaskBatch.add(task); + batchGroups.add(singleTaskBatch); + } + } + } + + // 将剩余的按进程分组的任务添加到批次中 + for (List tasks : remainingProcessBatches.values()) { + if (!tasks.isEmpty()) { + batchGroups.add(tasks); + } + } + } return batchGroups; } - - - -} +} \ No newline at end of file