fix(wms): 优化批次分配策略,确保相同进程的任务合并执行

- 添加任务执行顺序数组的成员变量
- 在检查死锁和生成无死锁批次时保存任务执行顺序数组
- 修改任务排序字段从 "sequence" 改为 "sequare"
- 在拓扑排序时按进程 ID 对任务进行分组,确保相同进程的任务合并到同一个批次
This commit is contained in:
2025-08-14 16:55:34 +08:00
parent 6d33b00ae7
commit b7fccc8ba0

View File

@@ -29,6 +29,8 @@ import java.util.stream.Collectors;
public class WmsBatchServiceImpl implements IWmsBatchService {
private final WmsBatchMapper baseMapper;
// 存储任务执行顺序数组的成员变量
private List<List<Map<String, Object>>> rows;
/**
* 查询批次(合并相同工艺的任务)
@@ -120,6 +122,9 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
*/
@Override
public boolean checkDeadlock(List<List<Map<String, Object>>> rows) {
// 保存任务执行顺序数组
this.rows = rows;
// 构建进程依赖图
Map<String, Set<String>> graph = buildDependencyGraph(rows);
@@ -135,6 +140,9 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
*/
@Override
public List<String> generateNonDeadlockBatches(List<List<Map<String, Object>>> rows) {
// 保存任务执行顺序数组
this.rows = rows;
// 获取所有任务
Set<String> allTasks = new HashSet<>();
for (List<Map<String, Object>> row : rows) {
@@ -168,8 +176,8 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
// 遍历每一行,记录每个进程在每一行的执行顺序
for (List<Map<String, Object>> row : rows) {
// 按sequence排序
row.sort(Comparator.comparingInt(task -> Integer.parseInt(task.get("sequence").toString())));
// 按sequare排序
row.sort(Comparator.comparingInt(task -> Integer.parseInt(task.get("sequare").toString())));
// 记录每个进程的执行顺序
for (int i = 0; i < row.size(); i++) {
@@ -204,7 +212,7 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
// 遍历每一行,记录进程间的依赖关系
for (List<Map<String, Object>> row : rows) {
// 按sequare排序
row.sort(Comparator.comparingInt(task -> Integer.parseInt(task.get("sequence").toString())));
row.sort(Comparator.comparingInt(task -> Integer.parseInt(task.get("sequare").toString())));
// 构建依赖关系
for (int i = 0; i < row.size() - 1; i++) {
@@ -228,8 +236,8 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
// 遍历每一行,记录任务间的依赖关系
for (List<Map<String, Object>> row : rows) {
// 按sequence排序
row.sort(Comparator.comparingInt(task -> Integer.parseInt(task.get("sequence").toString())));
// 按sequare排序
row.sort(Comparator.comparingInt(task -> Integer.parseInt(task.get("sequare").toString())));
// 构建依赖关系
for (int i = 0; i < row.size() - 1; i++) {
@@ -300,8 +308,19 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
/**
* 使用拓扑排序生成批次分配方案
* 确保相同processId的任务才能合并到同一个批次
*/
private List<Set<String>> topologicalSort(Map<String, Set<String>> processGraph, Map<String, Set<String>> taskGraph, Set<String> allTasks) {
// 获取任务与进程的映射关系
Map<String, String> taskToProcess = new HashMap<>();
for (List<Map<String, Object>> row : rows) {
for (Map<String, Object> task : row) {
String taskId = task.get("taskId").toString();
String processId = task.get("processId").toString();
taskToProcess.put(taskId, processId);
}
}
// 计算每个任务的入度
Map<String, Integer> inDegree = new HashMap<>();
for (String task : allTasks) {
@@ -327,14 +346,18 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
// 进行拓扑排序
while (!queue.isEmpty()) {
// 当前批次任务
Set<String> currentBatch = new HashSet<>();
// 按进程ID分组的当前批次任务
Map<String, Set<String>> processBatches = new HashMap<>();
// 处理当前队列中的所有任务(这些任务可以并行执行)
int size = queue.size();
for (int i = 0; i < size; i++) {
String task = queue.poll();
currentBatch.add(task);
String processId = taskToProcess.get(task);
// 按进程ID分组
processBatches.putIfAbsent(processId, new HashSet<>());
processBatches.get(processId).add(task);
// 更新依赖于当前任务的任务的入度
for (Map.Entry<String, Set<String>> entry : taskGraph.entrySet()) {
@@ -352,12 +375,16 @@ public class WmsBatchServiceImpl implements IWmsBatchService {
}
}
// 添加当前批次
if (!currentBatch.isEmpty()) {
batches.add(currentBatch);
// 将每个进程的任务作为一个批次添加
for (Set<String> batch : processBatches.values()) {
if (!batch.isEmpty()) {
batches.add(batch);
}
}
}
return batches;
}
}