// NOTE(review): removed a scraped file-viewer header (repo path, "453 lines", "16 KiB",
// "Raw Normal View History") that preceded the package declaration and would not compile.
package com.klp.service.impl;
import cn.hutool.core.bean.BeanUtil;
import com.klp.common.core.page.TableDataInfo;
import com.klp.common.core.domain.PageQuery;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.klp.common.utils.StringUtils;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import com.klp.domain.bo.WmsBatchBo;
import com.klp.domain.vo.BatchGroupVo;
import com.klp.domain.vo.WmsBatchVo;
import com.klp.domain.WmsBatch;
import com.klp.mapper.WmsBatchMapper;
import com.klp.service.IWmsBatchService;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Service implementation for batches that merge tasks sharing the same process (工艺).
 * <p>
 * Besides plain CRUD, this service can detect deadlocks (cyclic dependencies) in a set of
 * task-execution rows and produce a deadlock-free batch grouping via topological sorting.
 * <p>
 * NOTE(review): task rows are expected to be maps containing at least the keys
 * {@code taskId}, {@code processId} and {@code sequence} (numeric string) — confirm
 * against the callers.
 *
 * @author klp
 * @date 2025-08-14
 */
@RequiredArgsConstructor
@Service
public class WmsBatchServiceImpl implements IWmsBatchService {

    private final WmsBatchMapper baseMapper;

    // NOTE(review): the former instance field "rows" was removed. It stored per-call request
    // state in a singleton Spring bean (a thread-safety hazard) and was never read.

    /**
     * Query a single batch by its primary key.
     *
     * @param batchId primary key of the batch
     * @return the batch VO, or {@code null} when no row matches
     */
    @Override
    public WmsBatchVo queryById(Long batchId) {
        return baseMapper.selectVoById(batchId);
    }

    /**
     * Query one page of batches matching the given filter conditions.
     *
     * @param bo        filter conditions (null/blank fields are ignored)
     * @param pageQuery paging parameters
     * @return one page of matching batch VOs
     */
    @Override
    public TableDataInfo<WmsBatchVo> queryPageList(WmsBatchBo bo, PageQuery pageQuery) {
        LambdaQueryWrapper<WmsBatch> lqw = buildQueryWrapper(bo);
        Page<WmsBatchVo> result = baseMapper.selectVoPage(pageQuery.build(), lqw);
        return TableDataInfo.build(result);
    }

    /**
     * Query the full list of batches matching the given filter conditions.
     *
     * @param bo filter conditions (null/blank fields are ignored)
     * @return all matching batch VOs
     */
    @Override
    public List<WmsBatchVo> queryList(WmsBatchBo bo) {
        LambdaQueryWrapper<WmsBatch> lqw = buildQueryWrapper(bo);
        return baseMapper.selectVoList(lqw);
    }

    /**
     * Build a query wrapper from the BO; each equality condition is applied only when the
     * corresponding field is set (non-null / non-blank).
     */
    private LambdaQueryWrapper<WmsBatch> buildQueryWrapper(WmsBatchBo bo) {
        LambdaQueryWrapper<WmsBatch> lqw = Wrappers.lambdaQuery();
        lqw.eq(StringUtils.isNotBlank(bo.getBatchNo()), WmsBatch::getBatchNo, bo.getBatchNo());
        lqw.eq(bo.getProcessId() != null, WmsBatch::getProcessId, bo.getProcessId());
        lqw.eq(bo.getTotalQuantity() != null, WmsBatch::getTotalQuantity, bo.getTotalQuantity());
        lqw.eq(StringUtils.isNotBlank(bo.getMergeSource()), WmsBatch::getMergeSource, bo.getMergeSource());
        lqw.eq(bo.getEstimatedStartTime() != null, WmsBatch::getEstimatedStartTime, bo.getEstimatedStartTime());
        lqw.eq(bo.getEstimatedEndTime() != null, WmsBatch::getEstimatedEndTime, bo.getEstimatedEndTime());
        lqw.eq(StringUtils.isNotBlank(bo.getBatchStatus()), WmsBatch::getBatchStatus, bo.getBatchStatus());
        lqw.eq(bo.getPlanId() != null, WmsBatch::getPlanId, bo.getPlanId());
        return lqw;
    }

    /**
     * Insert a new batch.
     *
     * @param bo data to insert; on success its batchId is back-filled with the generated key
     * @return {@code true} when exactly one row was inserted
     */
    @Override
    public Boolean insertByBo(WmsBatchBo bo) {
        WmsBatch add = BeanUtil.toBean(bo, WmsBatch.class);
        validEntityBeforeSave(add);
        boolean flag = baseMapper.insert(add) > 0;
        if (flag) {
            // Propagate the database-generated primary key back to the caller's BO.
            bo.setBatchId(add.getBatchId());
        }
        return flag;
    }

    /**
     * Update an existing batch by its primary key.
     *
     * @param bo data to update (must carry the batchId)
     * @return {@code true} when exactly one row was updated
     */
    @Override
    public Boolean updateByBo(WmsBatchBo bo) {
        WmsBatch update = BeanUtil.toBean(bo, WmsBatch.class);
        validEntityBeforeSave(update);
        return baseMapper.updateById(update) > 0;
    }

    /**
     * Validate the entity before persisting it.
     */
    private void validEntityBeforeSave(WmsBatch entity) {
        // TODO: add data validation, e.g. uniqueness constraints.
    }

    /**
     * Batch-delete batches by primary keys.
     *
     * @param ids     primary keys to delete
     * @param isValid whether business-level validation should run first; a null value is
     *                treated as {@code false} (the original auto-unboxing NPE'd on null)
     * @return {@code true} when at least one row was deleted
     */
    @Override
    public Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid) {
        if (Boolean.TRUE.equals(isValid)) {
            // TODO: add business validation before deletion.
        }
        return baseMapper.deleteBatchIds(ids) > 0;
    }

    /**
     * Detect whether executing the given task rows would deadlock, i.e. whether the task
     * dependency graph derived from the rows contains a cycle.
     *
     * @param rows task execution rows; each row is a list of task maps ordered by "sequence"
     * @return {@code true} when a cyclic dependency (deadlock) exists
     */
    public boolean checkDeadlock(List<List<Map<String, Object>>> rows) {
        Map<String, Set<String>> taskGraph = buildTaskDependencyGraph(rows);
        Set<String> allTasks = collectTaskIds(rows);
        // Three-color DFS state: 0 = unvisited, 1 = on current DFS path, 2 = finished.
        Map<String, Integer> visited = new HashMap<>();
        for (String task : allTasks) {
            if (visited.getOrDefault(task, 0) == 0 && hasCycleDFS(task, taskGraph, visited)) {
                return true; // cycle found => deadlock
            }
        }
        return false;
    }

    /** Collect the ids of every task appearing in the rows. */
    private Set<String> collectTaskIds(List<List<Map<String, Object>>> rows) {
        Set<String> allTasks = new HashSet<>();
        for (List<Map<String, Object>> row : rows) {
            for (Map<String, Object> task : row) {
                allTasks.add(task.get("taskId").toString());
            }
        }
        return allTasks;
    }

    /** Map each taskId to its processId. */
    private Map<String, String> mapTaskToProcess(List<List<Map<String, Object>>> rows) {
        Map<String, String> taskToProcess = new HashMap<>();
        for (List<Map<String, Object>> row : rows) {
            for (Map<String, Object> task : row) {
                taskToProcess.put(task.get("taskId").toString(), task.get("processId").toString());
            }
        }
        return taskToProcess;
    }

    /**
     * Build the process-level dependency graph: if a task of process A must run before a task
     * of process B, then B depends on A. Same-process dependencies are skipped.
     * <p>
     * NOTE(review): currently unreferenced by any live code path (it was only called from code
     * that is now commented out); kept in case callers are added.
     */
    private Map<String, Set<String>> buildProcessDependencyGraph(List<List<Map<String, Object>>> rows) {
        Map<String, Set<String>> taskGraph = buildTaskDependencyGraph(rows);
        Map<String, String> taskToProcess = mapTaskToProcess(rows);
        Map<String, Set<String>> processGraph = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : taskGraph.entrySet()) {
            String processId = taskToProcess.get(entry.getKey());
            for (String depTaskId : entry.getValue()) {
                String depProcessId = taskToProcess.get(depTaskId);
                // Only cross-process dependencies become process-level edges.
                if (!processId.equals(depProcessId)) {
                    processGraph.computeIfAbsent(processId, k -> new HashSet<>()).add(depProcessId);
                }
            }
        }
        return processGraph;
    }

    /**
     * Generate a batch allocation plan that cannot deadlock.
     * <p>
     * When the rows contain no cyclic dependency, all tasks with the same processId are merged
     * into a single batch group. When a cycle exists, a topological-sort based plan is produced
     * in which conflicting tasks end up in separate, ordered groups.
     *
     * @param rows task execution rows
     * @return batch groups, each carrying a generated groupId, its processId and task ids
     */
    @Override
    public List<BatchGroupVo> generateNonDeadlockBatches(List<List<Map<String, Object>>> rows) {
        Set<String> allTasks = collectTaskIds(rows);
        Map<String, Set<String>> taskGraph = buildTaskDependencyGraph(rows);
        Map<String, String> taskToProcess = mapTaskToProcess(rows);
        if (checkDeadlock(rows)) {
            // Cycle detected: fall back to a topological grouping and convert it to VOs.
            List<List<String>> batchGroups = generateBatchGroups(taskGraph, allTasks, taskToProcess);
            List<BatchGroupVo> result = new ArrayList<>();
            int groupId = 1;
            for (List<String> group : batchGroups) {
                if (!group.isEmpty()) {
                    BatchGroupVo batchGroup = new BatchGroupVo();
                    batchGroup.setGroupId("Group-" + groupId++);
                    // All tasks in a group share one process, so the first task's process is the group's.
                    batchGroup.setProcessId(taskToProcess.get(group.get(0)));
                    batchGroup.setTaskIds(group);
                    result.add(batchGroup);
                }
            }
            return result;
        }
        // No deadlock: simply merge tasks by processId.
        return mergeTasksByProcessId(allTasks, taskToProcess);
    }

    /**
     * Merge all tasks that share a processId into a single batch group.
     */
    private List<BatchGroupVo> mergeTasksByProcessId(Set<String> allTasks, Map<String, String> taskToProcess) {
        // Group task ids by their process id.
        Map<String, List<String>> processTasks = new HashMap<>();
        for (String taskId : allTasks) {
            processTasks.computeIfAbsent(taskToProcess.get(taskId), k -> new ArrayList<>()).add(taskId);
        }
        // One BatchGroupVo per process.
        List<BatchGroupVo> result = new ArrayList<>();
        int groupId = 1;
        for (Map.Entry<String, List<String>> entry : processTasks.entrySet()) {
            BatchGroupVo batchGroup = new BatchGroupVo();
            batchGroup.setGroupId("Group-" + groupId++);
            batchGroup.setProcessId(entry.getKey());
            batchGroup.setTaskIds(entry.getValue());
            result.add(batchGroup);
        }
        return result;
    }

    /**
     * Build the task dependency graph. Within each row, tasks are ordered by their numeric
     * "sequence" value and each task depends on its predecessor. Graph shape:
     * key = task, value = set of tasks that must run before it.
     * <p>
     * Fix: sorts a copy of each row — the original sorted the caller's lists in place.
     */
    private Map<String, Set<String>> buildTaskDependencyGraph(List<List<Map<String, Object>>> rows) {
        Map<String, Set<String>> graph = new HashMap<>();
        for (List<Map<String, Object>> row : rows) {
            List<Map<String, Object>> ordered = new ArrayList<>(row);
            ordered.sort(Comparator.comparingInt(task -> Integer.parseInt(task.get("sequence").toString())));
            for (int i = 0; i < ordered.size() - 1; i++) {
                String currentTask = ordered.get(i).get("taskId").toString();
                String nextTask = ordered.get(i + 1).get("taskId").toString();
                // nextTask depends on currentTask.
                graph.computeIfAbsent(nextTask, k -> new HashSet<>()).add(currentTask);
            }
        }
        return graph;
    }

    /**
     * DFS cycle detection. Visited states: 0 = unvisited, 1 = on the current DFS path (gray),
     * 2 = fully explored (black). Meeting a gray neighbor means a back edge, i.e. a cycle.
     */
    private boolean hasCycleDFS(String node, Map<String, Set<String>> graph, Map<String, Integer> visited) {
        visited.put(node, 1);
        for (String neighbor : graph.getOrDefault(node, Collections.emptySet())) {
            int state = visited.getOrDefault(neighbor, 0);
            if (state == 1) {
                return true;
            }
            if (state == 0 && hasCycleDFS(neighbor, graph, visited)) {
                return true;
            }
        }
        visited.put(node, 2);
        return false;
    }

    /**
     * Produce batch groups via level-by-level topological sorting (Kahn's algorithm).
     * Tasks released in the same level that share a processId form one group; when a processId
     * reappears in a later level it gets a new, separate group so ordering stays valid.
     * <p>
     * NOTE(review): with a cyclic graph the queue dries up before every task is placed, so
     * tasks on a cycle are silently left out of the plan — this matches the original behavior;
     * confirm whether callers expect that.
     */
    private List<List<String>> generateBatchGroups(Map<String, Set<String>> taskGraph,
                                                   Set<String> allTasks,
                                                   Map<String, String> taskToProcess) {
        // In-degree of each task = number of prerequisites it still waits on.
        Map<String, Integer> inDegree = new HashMap<>();
        for (String task : allTasks) {
            inDegree.put(task, 0);
        }
        // Reverse adjacency (prerequisite -> dependents), built once so each in-degree update
        // below is O(1) per edge instead of a full scan of the graph for every processed task.
        Map<String, List<String>> dependents = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : taskGraph.entrySet()) {
            inDegree.put(entry.getKey(), entry.getValue().size());
            for (String prerequisite : entry.getValue()) {
                dependents.computeIfAbsent(prerequisite, k -> new ArrayList<>()).add(entry.getKey());
            }
        }
        // Seed the queue with every task that has no prerequisites.
        Queue<String> queue = new LinkedList<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                queue.add(entry.getKey());
            }
        }
        List<List<String>> batchGroups = new ArrayList<>();
        // Process ids that already own a group; a repeat occurrence must open a new group.
        Set<String> processedProcessIds = new HashSet<>();
        while (!queue.isEmpty()) {
            // Drain one level: the tasks that may run in parallel right now.
            List<String> currentLevelTasks = new ArrayList<>();
            int size = queue.size();
            for (int i = 0; i < size; i++) {
                currentLevelTasks.add(queue.poll());
            }
            // Group the level's tasks by process and release their dependents.
            Map<String, List<String>> processBatches = new HashMap<>();
            for (String task : currentLevelTasks) {
                processBatches.computeIfAbsent(taskToProcess.get(task), k -> new ArrayList<>()).add(task);
                for (String dependent : dependents.getOrDefault(task, Collections.emptyList())) {
                    int remaining = inDegree.get(dependent) - 1;
                    inDegree.put(dependent, remaining);
                    if (remaining == 0) {
                        queue.add(dependent);
                    }
                }
            }
            // Emit each process's tasks, merging into an existing group when still allowed.
            for (Map.Entry<String, List<String>> entry : processBatches.entrySet()) {
                String processId = entry.getKey();
                List<String> tasks = entry.getValue();
                if (tasks.isEmpty()) {
                    continue;
                }
                if (processedProcessIds.contains(processId)) {
                    // This process already owns a frozen group: start a new one.
                    batchGroups.add(tasks);
                } else {
                    boolean merged = false;
                    for (List<String> existingBatch : batchGroups) {
                        // Merge when an existing batch carries the same process id.
                        if (!existingBatch.isEmpty()
                                && processId.equals(taskToProcess.get(existingBatch.get(0)))) {
                            existingBatch.addAll(tasks);
                            merged = true;
                            break;
                        }
                    }
                    if (!merged) {
                        batchGroups.add(tasks);
                    }
                    processedProcessIds.add(processId);
                }
            }
        }
        return batchGroups;
    }
}