崇州市建设局网站,网站跳出率一般多少,淄博网站制作定制,wordpress编辑分段在线用户逻辑修复
在进行测试时，发现当前代码有个问题：如果test1在服务器进行连接，本地的test2给test1发消息，虽然test1能收到服务器上的信息，但是本地服务日志中会报test1不在线，需要对该种情况进行修复。
在线用户逻辑修复
在进行测试时，发现当前代码有个问题：如果test1在服务器进行连接，本地的test2给test1发消息，虽然test1能收到服务器上的信息，但是本地服务日志中会报test1不在线，需要对该种情况进行修复。
修复方案：使用 Redis 存储在线用户。
WebSocketEndpoint
利用 setBit 记录登录用户，以用户名的 hashCode 作为标识（既是 Redis 位图的偏移量，也是本地 Map 的 key）；即便有可能冲突，但是概率较小，可以接受。注意：hashCode 可能为负数，而 Redis SETBIT 的偏移量必须是非负数，使用前需要先转换为非负值。
package com.example.im.endpoint;import com.example.im.app.service.WebSocketMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;/*** author PC*/
Component
ServerEndpoint(/ws)
public class WebSocketEndpoint {private final static Logger logger LoggerFactory.getLogger(WebSocketEndpoint.class);public static final ConcurrentHashMapInteger, WebSocketEndpoint WEB_SOCKET_ENDPOINT_MAP new ConcurrentHashMap();/*** redis中用户SessionMap*/public static final String ONLINE_USER cus:ws:online-user;private Session session;private static WebSocketMessageService webSocketMessageService;private static RedisTemplateString, String redisTemplate;Autowiredpublic void setWebSocketMessageService(WebSocketMessageService webSocketMessageService) {WebSocketEndpoint.webSocketMessageService webSocketMessageService;}Autowiredpublic void setRedisTemplate(RedisTemplateString, String redisTemplate) {WebSocketEndpoint.redisTemplate redisTemplate;}/*** 打开ws连接** param session 会话*/OnOpenpublic void onOpen(Session session) {//连接成功String userName getUserName(session);logger.info(The connection is successful: getUserName(session));this.session session;//是有hash冲突的可能性的不过触发概率很低可以忽略int hashCode userName.hashCode();redisTemplate.opsForValue().setBit(ONLINE_USER, hashCode, true);WEB_SOCKET_ENDPOINT_MAP.put(hashCode, this);}/*** 断开ws连接** param session 会话*/OnClosepublic void onClose(Session session) {String userName getUserName(session);int hashCode userName.hashCode();redisTemplate.opsForValue().setBit(ONLINE_USER, hashCode, false);WEB_SOCKET_ENDPOINT_MAP.remove(hashCode);//断开连接logger.info(Disconnect: userName);}/*** 接收到的消息** param message 消息内容*/OnMessagepublic void onMessage(String message, Session session) {//接收消息String sendUserName getUserName(session);webSocketMessageService.sendMessage(sendUserName, message);}private String getUserName(Session session) {return Optional.ofNullable(session.getRequestParameterMap().get(userName)).orElse(new ArrayList()).stream().findFirst().orElse(anonymous_users);}public Session getSession() {return session;}public void setSession(Session session) {this.session session;}
}DefaultSendExecutor
适配改变类型后的 WEB_SOCKET_ENDPOINT_MAP，并调整代码结构。（注：下方贴出的代码是 WebSocketEndpoint，并非 DefaultSendExecutor 的实现，实际代码请参考文末的 im 项目。）
package com.example.im.endpoint;import com.example.im.app.service.WebSocketMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;/*** author PC*/
Component
ServerEndpoint(/ws)
public class WebSocketEndpoint {private final static Logger logger LoggerFactory.getLogger(WebSocketEndpoint.class);public static final ConcurrentHashMapInteger, WebSocketEndpoint WEB_SOCKET_ENDPOINT_MAP new ConcurrentHashMap();/*** redis中用户SessionMap*/public static final String ONLINE_USER cus:ws:online-user;private Session session;private static WebSocketMessageService webSocketMessageService;private static RedisTemplateString, String redisTemplate;Autowiredpublic void setWebSocketMessageService(WebSocketMessageService webSocketMessageService) {WebSocketEndpoint.webSocketMessageService webSocketMessageService;}Autowiredpublic void setRedisTemplate(RedisTemplateString, String redisTemplate) {WebSocketEndpoint.redisTemplate redisTemplate;}/*** 打开ws连接** param session 会话*/OnOpenpublic void onOpen(Session session) {//连接成功String userName getUserName(session);logger.info(The connection is successful: getUserName(session));this.session session;//是有hash冲突的可能性的不过触发概率很低可以忽略int hashCode userName.hashCode();redisTemplate.opsForValue().setBit(ONLINE_USER, hashCode, true);WEB_SOCKET_ENDPOINT_MAP.put(hashCode, this);}/*** 断开ws连接** param session 会话*/OnClosepublic void onClose(Session session) {String userName getUserName(session);int hashCode userName.hashCode();redisTemplate.opsForValue().setBit(ONLINE_USER, hashCode, false);WEB_SOCKET_ENDPOINT_MAP.remove(hashCode);//断开连接logger.info(Disconnect: userName);}/*** 接收到的消息** param message 消息内容*/OnMessagepublic void onMessage(String message, Session session) {//接收消息String sendUserName getUserName(session);webSocketMessageService.sendMessage(sendUserName, message);}private String getUserName(Session session) {return Optional.ofNullable(session.getRequestParameterMap().get(userName)).orElse(new ArrayList()).stream().findFirst().orElse(anonymous_users);}public Session getSession() {return session;}public void setSession(Session session) {this.session session;}
}重复代码提取
在增加了 stream 和 redis 渠道后，消息监听处的代码有大段重复，可以进行提取处理。
MessageInfoUtil
package com.example.im.infra.executor.send.util;import com.example.im.infra.executor.send.DefaultSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;/*** author PC* MessageInfo工具类*/
Component
public class MessageInfoUtil {private final static Logger logger LoggerFactory.getLogger(MessageInfoUtil.class);private DefaultSendExecutor defaultSendExecutor;Autowiredpublic void setDefaultSendExecutor(DefaultSendExecutor defaultSendExecutor) {this.defaultSendExecutor defaultSendExecutor;}public MessageInfo sendMessageByMessageInfoByte(byte[] messageInfoByte) {String messageJson new String(messageInfoByte, StandardCharsets.UTF_8);MessageInfo messageInfo JsonUtils.toObjectByTypeReference(messageJson, new TypeReferenceMessageInfo() {});switch (messageInfo.getScopeOfSending()) {case USER:defaultSendExecutor.sendToUser(messageInfo.getSendUserName(), messageInfo.getMessage());break;case ALL:defaultSendExecutor.sendToAll(messageInfo.getSendUserName(), messageInfo.getMessage());break;default://一般来说不会出现该情况除非用户覆盖了ScopeOfSending后续可以开个扩展发送范围的口子logger.warn(invalid sending range: messageInfo.getScopeOfSending().getScopeCode());break;}return messageInfo;}
}RedisMessageListener
package com.example.im.infra.executor.send.redis;import com.example.im.infra.executor.send.util.MessageInfoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;/*** author PC* redis监听*/
Component
public class RedisMessageListener implements MessageListener {private final static Logger logger LoggerFactory.getLogger(RedisMessageListener.class);private MessageInfoUtil messageInfoUtil;Autowiredpublic void setMessageInfoUtil(MessageInfoUtil messageInfoUtil) {this.messageInfoUtil messageInfoUtil;}Overridepublic void onMessage(Message message, byte[] pattern) {logger.debug(send redis info);messageInfoUtil.sendMessageByMessageInfoByte(message.getBody());}
}StreamMessageListener
package com.example.im.infra.executor.send.stream;import com.example.im.infra.executor.send.util.MessageInfoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.util.function.Function;/*** author PC* 消息队列监听*/
Component
public class StreamMessageListener {private final static Logger logger LoggerFactory.getLogger(StreamSendExecutor.class);private MessageInfoUtil messageInfoUtil;Autowiredpublic void setMessageInfoUtil(MessageInfoUtil messageInfoUtil) {this.messageInfoUtil messageInfoUtil;}Beanpublic FunctionFluxMessagebyte[], MonoVoid listener() {logger.debug(send stream info);return messageInfoFlux - messageInfoFlux.map(message - messageInfoUtil.sendMessageByMessageInfoByte(message.getPayload())).then();}
}未用到的bean不加载
当未用到某些渠道时，无需进行相关配置。例如使用了 redis 渠道，yml 中就无需配置 kafka 信息。
Redis渠道
调整 config 文件：当渠道非 redis 时，对 redis 渠道相关 bean 不进行加载。
WebSocketConfig
package com.example.im.config;import com.example.im.infra.handle.ImRejectExecutionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;import javax.annotation.Resource;/*** author PC*/
Configuration
EnableWebSocket
public class WebSocketConfig {Resourceprivate WebSocketProperties webSocketProperties;Beanpublic ServerEndpointExporter serverEndpoint() {return new ServerEndpointExporter();}/**** 配置线程池* return 线程池*/Beanpublic TaskExecutor taskExecutor() {WebSocketProperties.ExecutorProperties executorProperties webSocketProperties.getExecutorProperties();ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();// 设置核心线程数executor.setCorePoolSize(executorProperties.getCorePoolSize());// 设置最大线程数executor.setMaxPoolSize(executorProperties.getMaxPoolSize());// 设置队列容量executor.setQueueCapacity(executorProperties.getQueueCapacity());// 设置线程活跃时间秒executor.setKeepAliveSeconds(executorProperties.getKeepAliveSeconds());// 设置默认线程名称executor.setThreadNamePrefix(im-);// 设置拒绝策略executor.setRejectedExecutionHandler(new ImRejectExecutionHandler());// 等待所有任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown(true);return executor;}
}RedisConfiguration
缓存用到了redis,RedisTemplate需要加载
package com.example.im.config;import com.example.im.infra.executor.send.redis.RedisMessageListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** author PC*/
Configuration
public class RedisConfiguration {/*** redisTemplate配置** return RedisTemplate*/Beanpublic RedisTemplateString, String redisTemplate(RedisConnectionFactory connectionFactory) {RedisTemplateString, String redisTemplate new RedisTemplate();StringRedisSerializer stringRedisSerializer new StringRedisSerializer();redisTemplate.setKeySerializer(stringRedisSerializer);redisTemplate.setValueSerializer(stringRedisSerializer);redisTemplate.setStringSerializer(stringRedisSerializer);redisTemplate.setDefaultSerializer(stringRedisSerializer);redisTemplate.setHashKeySerializer(stringRedisSerializer);redisTemplate.setHashValueSerializer(stringRedisSerializer);redisTemplate.setConnectionFactory(connectionFactory);return redisTemplate;}BeanConditionalOnProperty(name cus.ws.communication-type, havingValue redis)RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 订阅一个或多个频道container.addMessageListener(listenerAdapter, new PatternTopic(redis-websocket-*));return container;}BeanConditionalOnProperty(name cus.ws.communication-type, havingValue redis)MessageListenerAdapter listenerAdapter(RedisMessageListener redisMessageListener) {return new MessageListenerAdapter(redisMessageListener);}
}RedisMessageListener
Component
ConditionalOnProperty(name cus.ws.communication-type, havingValue redis)
public class RedisMessageListener implements MessageListener
RedisSendExecutor
Component
ConditionalOnProperty(name cus.ws.communication-type, havingValue redis)
public class RedisSendExecutor extends AbstractBaseSendExecutor
Kafka渠道
StreamMessageListener
Component
ConditionalOnProperty(name cus.ws.communication-type, havingValue stream)
public class StreamMessageListener
StreamSendExecutor
Component
ConditionalOnProperty(name cus.ws.communication-type, havingValue stream)
public class StreamSendExecutor extends AbstractBaseSendExecutor
参考资料
[1].im项目