数据管理网站模板,登录网站模板,长春市建设厅网站,Wordpress税表场景介绍
最近开发一个系统#xff0c;其中一个模块需要展示实时的执行过程#xff0c;过程日志可能比较多。以前的方案都是前端定时轮询#xff0c;比如每秒查一次后端接口#xff0c;将拉取回来的日志重新展示。轮询方案简单容易实现#xff0c;但是比较消耗资源#…场景介绍
最近开发一个系统其中一个模块需要展示实时的执行过程过程日志可能比较多。以前的方案都是前端定时轮询比如每秒查一次后端接口将拉取回来的日志重新展示。轮询方案简单容易实现但是比较消耗资源后端没有数据的时候会造成大量的无用轮询。所以这次我们采用长连接的方案优化这块的逻辑提升用户体验。
WebSocket介绍 参考https://liaoxuefeng.com/books/java/spring/web/websocket/ WebSocket 是一种基于 HTTP 的长连接技术。传统的 HTTP 协议采用请求-响应模型浏览器不发送请求时服务器无法主动推送数据给浏览器。因此当需要定期或不定期向浏览器推送数据例如股票行情或在线聊天时传统的 HTTP 协议只能通过浏览器的 JavaScript 定时轮询来实现。这种方法效率低下且实时性不高。
由于 HTTP 本身基于 TCP 连接WebSocket 在 HTTP 协议的基础上进行了简单的升级。建立 TCP 连接后浏览器在发送请求时附带以下头部信息
GET /chat HTTP/1.1
Host: www.example.com
Upgrade: websocket
Connection: Upgrade这表示客户端希望升级为长连接的 WebSocket。服务器返回升级成功的响应
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade收到成功响应后WebSocket 握手即告完成。这意味着代表 WebSocket 的 TCP 连接将不会被服务器关闭而是保持开放状态服务器和浏览器可以随时互相推送消息。这些消息既可以是文本也可以是二进制数据。通常大多数应用程序会发送基于 JSON 的文本消息。
现代浏览器均已支持 WebSocket 协议服务器端则需要底层框架的支持。Java 的 Servlet 规范从 3.1 开始支持 WebSocket因此必须选择支持 Servlet 3.1 或更高版本的容器才能使用 WebSocket。最新版本的 Tomcat、Jetty 等开源服务器均已支持 WebSocket。 实践演示
Java后端
我们以实际代码来演示如何在Springboot项目中实现对Websocket的支持。
step1: 添加websocket依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-websocket/artifactId/dependencystep2: 增加配置
这个配置的主要作用是自动启动使用了注解ServerEndpoint的类
Configuration
EnableWebSocket
public class WebSocketConfiguration {Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}step3 创建一个ws endpoint
ServerEndpoint(value ChaosConst.CHAOS_WS_API /execute/log/{bizType}/{bizId})
Component
Slf4j
public class LogWsEndpoint implements ConsumerChaosLogEvent {// 对话的标识private String bizKey;// 存储每个会话private static final ConcurrentHashMapString, ListLogWsEndpoint endpointMap new ConcurrentHashMap();// 将会话放入到线程池中异步将数据返回给前端private static ThreadPoolExecutor wsThreadPoolExecutor;// 核心逻辑处理器private ChaosLogEventHandler handler;// 业务写和读logprivate static ChaosLogger chaosLogger;AutowiredQualifier(wsThreadPoolExecutor)public void setWsThreadPoolExecutor(ThreadPoolExecutor wsThreadPoolExecutor) {if (null ! wsThreadPoolExecutor) {LogWsEndpoint.wsThreadPoolExecutor wsThreadPoolExecutor;}}Autowiredpublic void setChaosLogger(ChaosLogger chaosLogger) {if (null ! chaosLogger) {LogWsEndpoint.chaosLogger chaosLogger;}}OnOpenpublic void onOpen(Session session, PathParam(bizType) String bizType, PathParam(bizId) String bizId) {this.bizKey String.format(%s-%s, bizType, bizId);log.info([ws-chaos-log]连接建立中 bizKey : {}, bizKey);this.handler new ChaosLogEventHandler(chaosLogger, session);wsThreadPoolExecutor.submit(() - flushMessage(bizType, bizId, true));endpointMap.compute(bizKey, (key, value) - {ListLogWsEndpoint ends null value ? new ArrayList() : value;ends.add(this);return ends;});log.info([ws-chaos-log]连接建立成功 sessionId:{}, bizKey : {},session.getId(), bizKey);}public void flushMessage(String bizType, String bizId, boolean force) {this.handler.flushMessage(bizType, bizId, force);}OnClosepublic void onClose() {log.info(websocket log server close);if (StringUtils.isBlank(bizKey)) {return;}endpointMap.compute(bizKey, (key, endpoints) - {if (null ! endpoints) {endpoints.remove(this);}return endpoints;});log.info([ws-chaos-log]连接关闭成功关闭该连接信息sessionId : {} bizKey : {}。, handler.getSession().getId(), bizKey);}OnMessagepublic void onMessage(String message, Session session) throws IOException {log.info([ws-chaos-log]服务端收到客户端消息 sessionId : {}, bizKey : {}, message : {}, handler.getSession().getId(), bizKey, message);}OnErrorpublic void onError(Session session, Throwable error) {log.error([ws-chaos-log]WebSocket发生错误sessionId : {}, bizKey : {}, handler.getSession().getId(), bizKey);}Overridepublic void accept(ChaosLogEvent chaosLogEvent) {String contextId String.format(%s-%s, chaosLogEvent.getBizType(), chaosLogEvent.getBizId());log.info(accept chaosLogEvent : {}, JSON.toJSONString(chaosLogEvent));ListLogWsEndpoint logWsEndpoints endpointMap.get(contextId);if (CollectionUtils.isEmpty(logWsEndpoints)) {return;}logWsEndpoints.forEach(endpoint - wsThreadPoolExecutor.submit(() - endpoint.flushMessage(chaosLogEvent.getBizType(), chaosLogEvent.getBizId(), true)));}
}注意上面有个accept()方法这个方法后面也会讲到主要就是用于触发已经建立连接Websocket发送消息。
核心逻辑实现, 这里读取的日志文件是存储在百度云的oisois读取逻辑忽略。
Slf4j
public class ChaosLogEventHandler {private static final long READ_LOG_MOST_LEN 1024 * 1024 * 5L; // 5Mprivate final ChaosLogger chaosLogger;Getterprivate final Session session;private final AtomicLong offset new AtomicLong(-1L);private final AtomicBoolean hasTruncated new AtomicBoolean(false);private final AtomicLong waitEventCnt new AtomicLong(0L);private final Lock lock new ReentrantLock();public ChaosLogEventHandler(ChaosLogger chaosLogger, Session session) {this.chaosLogger chaosLogger;this.session session;}public void flushMessage(String bizType, String bizId, boolean force) {String bizKey bizType - bizId;if (!lock.tryLock()) {waitEventCnt.incrementAndGet();log.info([WS]获取锁失败直接返回 sessionId : {}, bizKey:{}, session.getId(), bizKey);return;}try {if (!force waitEventCnt.getAndSet(0L) 1) {log.info([ws-chaos-log]没有待处理事件直接返回 sessionId : {}, bizKey:{}, session.getId(), bizKey);// 没有待处理的事件return;}log.info([ws-chaos-log]向客户端刷新数据 sessionId : {}, bizKey : {}, offset : {}, session.getId(), bizKey, offset.get());if (offset.get() 0) {long contentLength chaosLogger.getLogContentLength(bizType, bizId);log.info([ws-chaos-log]contentLength {} for bizLogKey {}, contentLength, bizKey);if (contentLength 0) {return;}if (contentLength READ_LOG_MOST_LEN) {offset.set(contentLength - READ_LOG_MOST_LEN);hasTruncated.set(true);log.info([ws-chaos-log]文件过大截取最后10M sessionId : {}, bizKey : {} contentLength{} offset{}, session.getId(), bizKey, contentLength, offset.get());} else {offset.set(0L);}} else if (!force) {long contentLength chaosLogger.getLogContentLength(bizType, bizId);if (contentLength offset.get()) {log.info([ws-chaos-log]文件长度小于offset无需刷新 sessionId : {}, bizKey : {} contentLength{} offset{}, session.getId(), bizKey, contentLength, offset.get());return;}}// 读取日志内容BosObject bosObject chaosLogger.readLogObject(bizType, bizId, offset.get(), Long.MAX_VALUE);try (BufferedReader reader new BufferedReader(new InputStreamReader(bosObject.getObjectContent()))) {String line null;while (null ! (line reader.readLine())) {if (hasTruncated.get()) {hasTruncated.set(false);log.info([ws-chaos-log]hasTruncated changed to false);} else {log.info([ws-chaos-log]send ws msg:{}, line);try {session.getBasicRemote().sendText(line \n);} catch (IllegalStateException e) {log.info([ws-chaos-log]发送消息过程中连接状态异常跳过, e);}}// 1是因为每一行结尾会有一个回车offset.addAndGet(line.getBytes(StandardCharsets.UTF_8).length 1);}} catch (IOException e) {log.error(, e);}} catch (NotFoundException e) {log.info([ws-chaos-log]未找到数据无需向客户端同步bizKey:{}, bizKey, e);} catch (RuntimeException e) {log.error(, e);} finally {lock.unlock();}log.info([ws-chaos-log]向客户端刷新数据,完成 sessionId : {}, bizKey : {}, session.getId(), bizKey);// 在处理过程中可能又有新的事件所以再次尝试刷新数据flushMessage(bizType, bizKey, false);}
}stept5: 广播事件全局监听
前后端建立连接的时候绑定了后端一台机器但是后台一般都是多台服务器如果事件传递到其他服务器那么已经建立的连接如何监听到并返回内呢
这里使用了rocketmq的机制每台机器都会监听到事件的变化从而触发当前机器将变更内容发回到前端。
Component
RocketMQMessageListener(topic EXECUTE_FLOW_LOG, selectorExpression log, consumerGroup flow-log, messageModel MessageModel.BROADCASTING)
Slf4j
public class ChaosLogEventConsumer implements RocketMQListenerString {Autowired(required false)private ListConsumerChaosLogEvent chaosLogEventConsumers Collections.emptyList();Overridepublic void onMessage(String message) {log.info([MQ]receive ChaosLogEvent message:{}, message);ChaosLogEvent event JsonUtils.fromJson(message, ChaosLogEvent.class);for (ConsumerChaosLogEvent consumer : chaosLogEventConsumers) {try {consumer.accept(event);} catch (RuntimeException e) {log.error([MQ] failed consume ChaosLogEvent messageconsumer: consumer.getClass(), e);}}}
}前端代码实现
以react为例仅供参考
export const fetchExecuteLogs (bizType: string, bizId: any, logsRef: any, setLogs: any) {if (!bizType || !bizId) {console.log(fetchLogs: logContextToken or node is null)return}setLogs([])if (logsRef.current[0]) {console.log(close ws)logsRef.current[0].close()}let host wsHost ? wsHost : window.location.hostlet protocol window.location.protocol https: ? wss : wslet client new WebSocket(${protocol}://${host}/ws/ark/chaos/execute/log/${bizType}/${bizId})logsRef.current [client, []]// 报错的回调函数client.onerror (e: any) {console.log(Connection Error)console.log(e)}//链接打开的回调函数client.onopen () {console.log(WebSocket Client Connected)}//链接关闭的回调函数client.onclose () {console.log(echo-protocol Client Closed)}//收到消息的处理函数client.onmessage (e: any) {if (logsRef.current[0] client) {if (typeof e.data string) {let newLogs [...logsRef.current[1], e.data]if (newLogs.length 250) {newLogs newLogs.slice(200)}setLogs(newLogs)logsRef.current [client, newLogs]}} else {client.close()}}const sendPing () {if (logsRef.current[0] client) {const data { message: heartbeat }client.send(JSON.stringify(data))setTimeout(sendPing, 10000)}}setTimeout(sendPing, 10000)
}