当前位置: 首页 > news >正文

网站建设与用户体验网站域名使用方法

网站建设与用户体验,网站域名使用方法,外贸零售平台,一锅汤资源分享网站建设大全目录 前言设计思想 前言 之前,我们写了通信协议的具体设计,接下来我们设计服务器类 设计思想 我们先只考虑一个虚拟主机的情况下, 在一个虚拟主机的情况下,我们需要有一个session会话来帮助我们存储信息,并且既然是网络通信,那么socket关键字肯定也必不可少,我们在引入一个线… 目录 前言设计思想 前言 之前,我们写了通信协议的具体设计,接下来我们设计服务器类 设计思想 我们先只考虑一个虚拟主机的情况下, 在一个虚拟主机的情况下,我们需要有一个session会话来帮助我们存储信息,并且既然是网络通信,那么socket关键字肯定也必不可少,我们在引入一个线程池,用来处理多个客户端的请求 private ServerSocket serverSocket null;// 当前考虑一个 BrokerServer 上只有一个 虚拟主机private VirtuaHost virtualHost new VirtuaHost(default);// 使用这个 哈希表 表示当前的所有会话(也就是说有哪些客户端正在和咱们的服务器进行通信)// 此处的 key 是 channelId, value 为对应的 Socket 对象private ConcurrentHashMapString, Socket sessions new ConcurrentHashMapString, Socket();// 引入一个线程池, 来处理多个客户端的请求.private ExecutorService executorService null;// 引入一个 boolean 变量控制服务器是否继续运行private volatile boolean runnable true;代码实现 package com.example.demo.mqServer;import com.example.demo.Common.*; import com.example.demo.mqServer.core.BasicProperties;import javax.websocket.Session; import java.io.*; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;/* 消息队列的本体服务器, 是TCP服务器 */ public class BrokerServer {private ServerSocket serverSocket null;// 当前考虑一个 BrokerServer 上只有一个 虚拟主机private VirtuaHost virtualHost new VirtuaHost(default);// 使用这个 哈希表 表示当前的所有会话(也就是说有哪些客户端正在和咱们的服务器进行通信)// 此处的 key 是 channelId, value 为对应的 Socket 对象private ConcurrentHashMapString, Socket sessions new ConcurrentHashMapString, Socket();// 引入一个线程池, 来处理多个客户端的请求.private ExecutorService executorService null;// 引入一个 boolean 变量控制服务器是否继续运行private volatile boolean runnable true;public BrokerServer (int port) throws IOException {serverSocket new ServerSocket(port);}// 开始服务器public void start() throws IOException {System.out.println([BrokerServer] 启动!);executorService Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket serverSocket.accept();// 把处理连接的逻辑丢给这个线程池.executorService.submit(() - {processConnection(clientSocket);});}} catch (SocketException e) {System.out.println([BrokerServer] 服务器停止运行!);// e.printStackTrace();}}// 停止服务器public void stop() throws IOException {runnablefalse;executorService.shutdownNow();serverSocket.close();}// 处理一个客户端的链接// 在这一个链接中, 可能会涉及到多个请求和响应private void processConnection(Socket clientSocket) {try (InputStream inputStream clientSocket.getInputStream();OutputStream outputStream clientSocket.getOutputStream()){try (DataInputStream dataInputStream new DataInputStream(inputStream);DataOutputStream dataOutputStream new DataOutputStream(outputStream)){while (true){// 1 读取请求并解析Request request readRequest(dataInputStream);// 2 根据请求计算响应Response response process(request, clientSocket);// 3. 把响应写回给客户端writeResponse(dataOutputStream, response);}} catch (EOFException | SocketException e) {// 对于这个代码, DataInputStream 如果读到 EOF , 就会抛出一个 EOFException 异常.// 需要借助这个异常来结束循环System.out.println([BrokerServer] connection 关闭! 客户端的地址: clientSocket.getInetAddress().toString() : clientSocket.getPort());} catch (ClassNotFoundException | MqException e) {e.printStackTrace();}} catch (IOException e) {System.out.println([BrokerServer] connection 出现异常!);e.printStackTrace();} finally {try {// 当连接处理完了, 就需要记得关闭 socketclientSocket.close();// 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉.clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload new byte[request.getLength()];int n dataInputStream.read(payload);if (n ! request.getLength()) {throw new IOException(读取请求格式出错!);}request.setPayload(payload);return request;}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());// 这个刷新缓冲区也是重要的操作!!dataOutputStream.flush();}private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 把 request 中的 payload 做一个初步的解析.BasicArguments basicArguments (BasicArguments) BinaryTool.toObject(request.getPayload());System.out.println([Request] rid basicArguments.getRid() , channelId basicArguments.getChannelId() , type request.getType() , length request.getLength());// 2. 根据 type 的值, 来进一步区分接下来这次请求要干啥.boolean ok true;if (request.getType() 0x1) {// 创建 channelsessions.put(basicArguments.getChannelId(), clientSocket);System.out.println([BrokerServer] 创建 channel 完成! channelId basicArguments.getChannelId());} else if (request.getType() 0x2) {// 销毁 channelsessions.remove(basicArguments.getChannelId());System.out.println([BrokerServer] 销毁 channel 完成! channelId basicArguments.getChannelId());} else if (request.getType() 0x3) {// 创建交换机. 此时 payload 就是 ExchangeDeclareArguments 对象了.ExchangeDeclareArguments arguments (ExchangeDeclareArguments) basicArguments;ok virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() 0x4) {ExchangeDeleteArguments arguments (ExchangeDeleteArguments) basicArguments;ok virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() 0x5) {QueueDeclareArguments arguments (QueueDeclareArguments) basicArguments;ok virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() 0x6) {QueueDeleteArguments arguments (QueueDeleteArguments) basicArguments;ok virtualHost.queueDelete((arguments.getQueueName()));} else if (request.getType() 0x7) {QueueBindArguments arguments (QueueBindArguments) basicArguments;ok virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());} else if (request.getType() 0x8) {QueueUnbindArguments arguments (QueueUnbindArguments) basicArguments;ok virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());} else if (request.getType() 0x9) {BasicPublishArguments arguments (BasicPublishArguments) basicArguments;ok virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());} else if (request.getType() 0xa) {BasicConsumeArguments arguments (BasicConsumeArguments) basicArguments;ok virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {// 这个回调函数要做的工作, 就是把服务器收到的消息可以直接推送回对应的消费者客户端Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {// 先知道当前这个收到的消息, 要发给哪个客户端.// 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的// socket 对象了, 从而可以往里面发送数据了// 1. 根据 channelId 找到 socket 对象Socket clientSockte sessions.get(consumerTag);if (clientSocket null || clientSocket.isClosed()) {throw new MqException([BrokerServer] 订阅消息的客户端已经关闭!);}SubScribeReturns subScribeReturns new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid();subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBody(body);subScribeReturns.setBasicProperties(basicProperties);byte[] payload BinaryTool.toBytes(subScribeReturns);// 2. 构造响应数据Response response new Response();// oxc 服务器给消费者客户端托送的消息数据response.setType(0xc);// response 的 payload 就是一个 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 把数据写回到客户端 , 写入到响应之中DataOutputStream dataOutputStream new DataOutputStream(clientSockte.getOutputStream());writeResponse(dataOutputStream,response);}});} else if (request.getType() 0xb) {// 调用 basicAck 确认消息.BasicAckArguments arguments (BasicAckArguments) basicArguments;ok virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());} else {// 当前的 type 是非法的.throw new MqException([BrokerServer] 未知的 type! type request.getType());}// 3. 构造响应BasicReturns basicReturns new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload BinaryTool.toBytes(basicReturns);Response response new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println([Response] rid basicReturns.getRid() , channelId basicReturns.getChannelId() , type response.getType() , length response.getLength());return response;}private void clearClosedSession(Socket clientSocket) {// 这里主要做的事情就是, 将遍历哈希表, 将不用的session 清楚掉// 不能在集合类中变查询, 边删除ListString toDeleteChannelId new ArrayList();for (Map.EntryString ,Socket entry:sessions.entrySet()) {toDeleteChannelId.add(entry.getKey());}for (String s:toDeleteChannelId) {sessions.remove(s);}}}
http://www.w-s-a.com/news/934348/

相关文章:

  • 网络架构扁平化windows优化大师好不好
  • 安康养老院收费价格表兰州seo整站优化服务商
  • 网站开发技术方案模板无锡网站建设推荐
  • 自助建站系统注册三维家3d设计软件免费
  • 做seo网站标题重要吗郑州众诚建设监理有限公司网站
  • 建设网站南沙区百度关键词推广怎么做
  • 网站建设公司做销售前景好不好石家庄外贸网站制作
  • windows2008做网站网站首页打开速度
  • 做外贸要做什么网站服装设计图
  • 中山市路桥建设有限公司网站网站开发角色分配权限
  • 加强档案网站建设网站搭建好了不用会不会被攻击
  • 维护网站信息网络建设服务
  • 网站建设策划书模板下载用自己电脑配置服务器做网站
  • 360免费建站空间淘宝数据网站开发
  • 做分销的网站本地dede网站怎么上线
  • 中学网站模板北京管理咨询公司
  • 网站开发用哪个软件方便二级网站建设 管理思路
  • 个人怎么创建网站中国建设银行网站口
  • 跟知乎一样的网站做展示网站步骤
  • 邯郸网站建设效果好wordpress app 加载慢
  • 做app的网站有哪些功能广州自适应网站建设
  • 兰州建设网站的网站开源网站建设
  • 深圳网站建设南山指数基金是什么意思
  • 备案中又需要建设网站网站信息组织优化
  • 做网站推广需要什么asp响应式h5网站源码下载
  • 柳州建设网官方网站免费自助建站哪个平台好
  • 论坛网站模板源码下载网站建设与网页设计是什么
  • 跑流量的网站淘宝网站的建设目标是
  • 网站计费系统怎么做九一制作网站
  • 网红营销推广温州seo博客