OEE初版,错误问题和交互问题1.29再说
This commit is contained in:
@@ -0,0 +1,30 @@
|
||||
package com.klp.framework.websocket;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.lang.NonNull;
|
||||
import org.springframework.web.socket.config.annotation.EnableWebSocket;
|
||||
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
|
||||
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
|
||||
|
||||
/**
|
||||
* 基于 Spring WebSocket 的 type 分流端点(兼容前端 ws://host/websocket?type=xxx 用法)
|
||||
*
|
||||
* 注意:项目中同时存在 javax.websocket 的 {@link WebSocketServer}(/websocket/message),互不冲突。
|
||||
*/
|
||||
@Configuration
|
||||
@EnableWebSocket
|
||||
@RequiredArgsConstructor
|
||||
public class TypeWebSocketConfig implements WebSocketConfigurer {
|
||||
|
||||
private final TypeWebSocketHandler typeWebSocketHandler;
|
||||
private final TypeWebSocketInterceptor typeWebSocketInterceptor;
|
||||
|
||||
@Override
|
||||
public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
|
||||
registry.addHandler(typeWebSocketHandler, "/websocket")
|
||||
.addInterceptors(typeWebSocketInterceptor)
|
||||
.setAllowedOrigins("*");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,93 @@
|
||||
package com.klp.framework.websocket;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.lang.NonNull;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.socket.CloseStatus;
|
||||
import org.springframework.web.socket.TextMessage;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
import org.springframework.web.socket.handler.TextWebSocketHandler;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* /websocket?type=xxx 连接处理器:按 type 维护会话集合。
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class TypeWebSocketHandler extends TextWebSocketHandler {
|
||||
|
||||
/**
|
||||
* type -> (sessionId -> session)
|
||||
*/
|
||||
private final Map<String, Map<String, WebSocketSession>> clients = new ConcurrentHashMap<>();
|
||||
|
||||
public Map<String, Map<String, WebSocketSession>> getClients() {
|
||||
return clients;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterConnectionEstablished(@NonNull WebSocketSession session) {
|
||||
String sid = session.getId();
|
||||
String type = (String) session.getAttributes().get(TypeWebSocketInterceptor.ATTR_TYPE);
|
||||
if (type == null || type.trim().isEmpty()) {
|
||||
type = "DEFAULT";
|
||||
}
|
||||
clients.computeIfAbsent(type, k -> new ConcurrentHashMap<>()).put(sid, session);
|
||||
log.info("[websocket]建立连接: {}-{}", type, sid);
|
||||
try {
|
||||
session.sendMessage(new TextMessage("{\"event\":\"connected\",\"type\":\"" + type + "\"}"));
|
||||
} catch (IOException ignore) {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleTextMessage(@NonNull WebSocketSession session, @NonNull TextMessage message) throws Exception {
|
||||
String payload = message.getPayload();
|
||||
// 兼容前端心跳
|
||||
if (Objects.equals(payload, "ping")) {
|
||||
session.sendMessage(new TextMessage("pong"));
|
||||
return;
|
||||
}
|
||||
// 默认不做业务处理(目前主要用于服务端推送)
|
||||
log.debug("[websocket]收到客户端消息: {} -> {}", session.getId(), payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {
|
||||
log.warn("[websocket]连接异常: {}", session.getId(), exception);
|
||||
onClose(session);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus status) {
|
||||
onClose(session);
|
||||
}
|
||||
|
||||
private void onClose(WebSocketSession session) {
|
||||
try {
|
||||
String sid = session.getId();
|
||||
String type = (String) session.getAttributes().get(TypeWebSocketInterceptor.ATTR_TYPE);
|
||||
if (type == null || type.trim().isEmpty()) {
|
||||
type = "DEFAULT";
|
||||
}
|
||||
Map<String, WebSocketSession> map = clients.get(type);
|
||||
if (map != null) {
|
||||
map.remove(sid);
|
||||
}
|
||||
try {
|
||||
if (session.isOpen()) {
|
||||
session.close();
|
||||
}
|
||||
} catch (Exception ignore) {
|
||||
}
|
||||
log.info("[websocket]连接关闭: {}-{}", type, sid);
|
||||
} catch (Exception e) {
|
||||
log.warn("[websocket]连接关闭处理异常", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
package com.klp.framework.websocket;
|
||||
|
||||
import org.springframework.http.server.ServerHttpRequest;
|
||||
import org.springframework.http.server.ServerHttpResponse;
|
||||
import org.springframework.lang.NonNull;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.socket.WebSocketHandler;
|
||||
import org.springframework.web.socket.server.HandshakeInterceptor;
|
||||
import org.springframework.web.util.UriComponentsBuilder;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 从 ws://host/websocket?type=xxx 提取 type,写入 session attributes。
|
||||
*/
|
||||
@Component
|
||||
public class TypeWebSocketInterceptor implements HandshakeInterceptor {
|
||||
|
||||
public static final String ATTR_TYPE = "type";
|
||||
|
||||
@Override
|
||||
public boolean beforeHandshake(@NonNull ServerHttpRequest request,
|
||||
@NonNull ServerHttpResponse response,
|
||||
@NonNull WebSocketHandler wsHandler,
|
||||
@NonNull Map<String, Object> attributes) {
|
||||
try {
|
||||
URI uri = request.getURI();
|
||||
String type = UriComponentsBuilder.fromUri(uri).build().getQueryParams().getFirst("type");
|
||||
if (type != null && !type.trim().isEmpty()) {
|
||||
attributes.put(ATTR_TYPE, type.trim());
|
||||
}
|
||||
} catch (Exception ignore) {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterHandshake(@NonNull ServerHttpRequest request,
|
||||
@NonNull ServerHttpResponse response,
|
||||
@NonNull WebSocketHandler wsHandler,
|
||||
@Nullable Exception exception) {
|
||||
// no-op
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
package com.klp.framework.websocket;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.socket.TextMessage;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* type 分流 websocket 推送工具:向订阅了指定 type 的客户端广播消息。
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class TypeWebSocketUtil {
|
||||
|
||||
private static TypeWebSocketHandler handler;
|
||||
|
||||
public TypeWebSocketUtil(TypeWebSocketHandler handler) {
|
||||
TypeWebSocketUtil.handler = handler;
|
||||
}
|
||||
|
||||
public static void sendToType(String type, String text) {
|
||||
if (handler == null || type == null) {
|
||||
return;
|
||||
}
|
||||
Map<String, WebSocketSession> sessions = handler.getClients().get(type);
|
||||
if (sessions == null || sessions.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
TextMessage msg = new TextMessage(text == null ? "" : text);
|
||||
sessions.values().forEach(s -> {
|
||||
try {
|
||||
if (s != null && s.isOpen()) {
|
||||
s.sendMessage(msg);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[websocket]推送消息失败 type={}, sid={}", type, s == null ? null : s.getId(), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user