当前位置: 首页 > news >正文

制作网站的布局网络wordpress 代码开发

制作网站的布局网络,wordpress 代码开发,wordpress 手机 app,淘宝网页设计代码前言 作为一名开发人员我们经常会听到HTTP协议、TCP/IP协议、UDP协议、Socket、Socket长连接、Socket连接池等字眼#xff0c;然而它们之间的关系、区别及原理并不是所有人都能理解清楚#xff0c;这篇文章就从网络协议基础开始到Socket连接池#xff0c;一步一步解释他们之…前言 作为一名开发人员我们经常会听到HTTP协议、TCP/IP协议、UDP协议、Socket、Socket长连接、Socket连接池等字眼然而它们之间的关系、区别及原理并不是所有人都能理解清楚这篇文章就从网络协议基础开始到Socket连接池一步一步解释他们之间的关系。 七层网络模型 ​ 首先从网络通信的分层模型讲起七层模型亦称OSI(Open System Interconnection)模型。自下往上分为物理层、数据链路层、网络层、传输层、会话层、表示层和应用层。所有有关通信的都离不开它下面这张图片介绍了各层所对应的一些协议和硬件 通过上图我知道IP协议对应于网络层TCP、UDP协议对应于传输层而HTTP协议对应于应用层OSI并没有Socket那什么是Socket后面我们将结合代码具体详细介绍。 TCP和UDP连接 ​ 关于传输层TCP、UDP协议可能我们平时遇见的会比较多有人说TCP是安全的UDP是不安全的UDP传输比TCP快那为什么呢我们先从TCP的连接建立的过程开始分析然后解释UDP和TCP的区别。 TCP的三次握手和四次分手 ​ 我们知道TCP建立连接需要经过三次握手而断开连接需要经过四次分手那三次握手和四次分手分别做了什么和如何进行的。 第一次握手建立连接。客户端发送连接请求报文段将SYN位置为1Sequence Number为x然后客户端进入SYN_SEND状态等待服务器的确认第二次握手服务器收到客户端的SYN报文段需要对这个SYN报文段进行确认设置Acknowledgment Number为x1(Sequence Number1)同时自己自己还要发送SYN请求信息将SYN位置为1Sequence Number为y服务器端将上述所有信息放到一个报文段即SYNACK报文段中一并发送给客户端此时服务器进入SYN_RECV状态第三次握手客户端收到服务器的SYNACK报文段。然后将Acknowledgment Number设置为y1向服务器发送ACK报文段这个报文段发送完毕以后客户端和服务器端都进入ESTABLISHED状态完成TCP三次握手。 完成了三次握手客户端和服务器端就可以开始传送数据。以上就是TCP三次握手的总体介绍。通信结束客户端和服务端就断开连接需要经过四次分手确认。 第一次分手主机1可以使客户端也可以是服务器端设置Sequence Number和Acknowledgment Number向主机2发送一个FIN报文段此时主机1进入FIN_WAIT_1状态这表示主机1没有数据要发送给主机2了第二次分手主机2收到了主机1发送的FIN报文段向主机1回一个ACK报文段Acknowledgment Number为Sequence Number加1主机1进入FIN_WAIT_2状态主机2告诉主机1我“同意”你的关闭请求第三次分手主机2向主机1发送FIN报文段请求关闭连接同时主机2进入LAST_ACK状态第四次分手主机1收到主机2发送的FIN报文段向主机2发送ACK报文段然后主机1进入TIME_WAIT状态主机2收到主机1的ACK报文段以后就关闭连接此时主机1等待2MSL后依然没有收到回复则证明Server端已正常关闭那好主机1也可以关闭连接了。 可以看到一次tcp请求的建立及关闭至少进行7次通信这还不包过数据的通信而UDP不需3次握手和4次分手。 TCP和UDP的区别 1、TCP是面向链接的虽然说网络的不安全不稳定特性决定了多少次握手都不能保证连接的可靠性但TCP的三次握手在最低限度上(实际上也很大程度上保证了)保证了连接的可靠性;而UDP不是面向连接的UDP传送数据前并不与对方建立连接对接收到的数据也不发送确认信号发送端不知道数据是否会正确接收当然也不用重发所以说UDP是无连接的、不可靠的一种数据传输协议。   2、也正由于1所说的特点使得UDP的开销更小数据传输速率更高因为不必进行收发数据的确认所以UDP的实时性更好。知道了TCP和UDP的区别就不难理解为何采用TCP传输协议的MSN比采用UDP的QQ传输文件慢了但并不能说QQ的通信是不安全的因为程序员可以手动对UDP的数据收发进行验证比如发送方对每个数据包进行编号然后由接收方进行验证啊什么的即使是这样UDP因为在底层协议的封装上没有采用类似TCP的“三次握手”而实现了TCP所无法达到的传输效率。 问题 关于传输层我们会经常听到一些问题 1TCP服务器最大并发连接数是多少 关于TCP服务器最大并发连接数有一种误解就是“因为端口号上限为65535,所以TCP服务器理论上的可承载的最大并发连接数也是65535”。首先需要理解一条TCP连接的组成部分客户端IP、客户端端口、服务端IP、服务端端口。所以对于TCP服务端进程来说他可以同时连接的客户端数量并不受限于可用端口号理论上一个服务器的一个端口能建立的连接数是全球的IP数*每台机器的端口数。实际并发连接数受限于linux可打开文件数这个数是可以配置的可以非常大所以实际上受限于系统性能。通过#ulimit -n 查看服务的最大文件句柄数通过ulimit -n xxx 修改 xxx是你想要能打开的数量。也可以通过修改系统参数 #vi /etc/security/limits.conf *  soft  nofile  65536 *  hard  nofile  65536 2为什么TIME_WAIT状态还需要等2MSL后才能返回到CLOSED状态 这是因为虽然双方都同意关闭连接了而且握手的4个报文也都协调和发送完毕按理可以直接回到CLOSED状态就好比从SYN_SEND状态到ESTABLISH状态那样但是因为我们必须要假想网络是不可靠的你无法保证你最后发送的ACK报文会一定被对方收到因此对方处于LAST_ACK状态下的Socket可能会因为超时未收到ACK报文而重发FIN报文所以这个TIME_WAIT状态的作用就是用来重发可能丢失的ACK报文。 3TIME_WAIT状态还需要等2MSL后才能返回到CLOSED状态会产生什么问题 通信双方建立TCP连接后主动关闭连接的一方就会进入TIME_WAIT状态TIME_WAIT状态维持时间是两个MSL时间长度也就是在1-4分钟Windows操作系统就是4分钟。进入TIME_WAIT状态的一般情况下是客户端一个TIME_WAIT状态的连接就占用了一个本地端口。一台机器上端口号数量的上限是65536个如果在同一台机器上进行压力测试模拟上万的客户请求并且循环与服务端进行短连接通信那么这台机器将产生4000个左右的TIME_WAIT Socket后续的短连接就会产生address already in use : connect的异常如果使用Nginx作为方向代理也需要考虑TIME_WAIT状态发现系统存在大量TIME_WAIT状态的连接通过调整内核参数解决。 vi /etc/sysctl.conf 编辑文件加入以下内容 net.ipv4.tcp_syncookies 1 net.ipv4.tcp_tw_reuse 1 net.ipv4.tcp_tw_recycle 1 net.ipv4.tcp_fin_timeout 30 然后执行 /sbin/sysctl -p 让参数生效。 net.ipv4.tcp_syncookies 1 表示开启SYN Cookies。当出现SYN等待队列溢出时启用cookies来处理可防范少量SYN攻击默认为0表示关闭 net.ipv4.tcp_tw_reuse 1 表示开启重用。允许将TIME-WAIT sockets重新用于新的TCP连接默认为0表示关闭 net.ipv4.tcp_tw_recycle 1 表示开启TCP连接中TIME-WAIT sockets的快速回收默认为0表示关闭。 net.ipv4.tcp_fin_timeout 修改系統默认的TIMEOUT时间 相关视频推荐 10道网络八股文每道都很经典让你在面试中逼格满满 徒手实现网络协议栈请准备好环境一起来写代码 C网络面试题TCP/UDP应用场景分析UDP如何实现可靠性设计 免费学习地址C/CLinux服务器开发/后台架构师 需要C/C Linux服务器架构师学习资料加qun812855908获取资料包括C/CLinuxgolang技术NginxZeroMQMySQLRedisfastdfsMongoDBZK流媒体CDNP2PK8SDockerTCP/IP协程DPDKffmpeg等免费分享 HTTP协议 关于TCP/IP和HTTP协议的关系网络有一段比较容易理解的介绍“我们在传输数据时可以只使用(传输层)TCP/IP协议但是那样的话如果没有应用层便无法识别数据内容。如果想要使传输的数据有意义则必须使用到应用层协议。应用层协议有很多比如HTTP、FTP、TELNET等也可以自己定义应用层协议。 HTTP协议即超文本传送协议(Hypertext Transfer Protocol )是Web联网的基础也是手机联网常用的协议之一WEB使用HTTP协议作应用层协议以封装HTTP文本信息然后使用TCP/IP做传输层协议将它发到网络上。 由于HTTP在每次请求结束后都会主动释放连接因此HTTP连接是一种“短连接”要保持客户端程序的在线状态需要不断地向服务器发起连接请求。通常 的做法是即时不需要获得任何数据客户端也保持每隔一段固定的时间向服务器发送一次“保持连接”的请求服务器在收到该请求后对客户端进行回复表明知道 客户端“在线”。若服务器长时间无法收到客户端的请求则认为客户端“下线”若客户端长时间无法收到服务器的回复则认为网络已经断开。 下面是一个简单的HTTP Post application/json数据内容的请求 POST HTTP/1.1 Host: 127.0.0.1:9017 Content-Type: application/json Cache-Control: no-cache{a:a} 关于Socket套接字 现在我们了解到TCP/IP只是一个协议栈就像操作系统的运行机制一样必须要具体实现同时还要提供对外的操作接口。就像操作系统会提供标准的编程接口比如Win32编程接口一样TCP/IP也必须对外提供编程接口这就是Socket。现在我们知道Socket跟TCP/IP并没有必然的联系。Socket编程接口在设计的时候就希望也能适应其他的网络协议。所以Socket的出现只是可以更方便的使用TCP/IP协议栈而已其对TCP/IP进行了抽象形成了几个最基本的函数接口。比如createlistenacceptconnectread和write等等。 不同语言都有对应的建立Socket服务端和客户端的库下面举例Nodejs如何创建服务端和客户端服务端 const net require(net); const server net.createServer(); server.on(connection, (client) {client.write(Hi!\n); // 服务端向客户端输出信息使用 write() 方法client.write(Bye!\n);//client.end(); // 服务端结束该次会话 }); server.listen(9000); 服务监听9000端口 下面使用命令行发送http请求和telnet $ curl http://127.0.0.1:9000 Bye!$telnet 127.0.0.1 9000 Trying 192.168.1.21... Connected to 192.168.1.21. Escape character is ^]. Hi! Bye! Connection closed by foreign host. 注意到curl只处理了一次报文。客户端 const client new net.Socket(); client.connect(9000, 127.0.0.1, function () { }); client.on(data, (chunk) {console.log(data, chunk.toString())//data Hi!//Bye! }); Socket长连接 所谓长连接指在一个TCP连接上可以连续发送多个数据包在TCP连接保持期间如果没有数据包发送需要双方发检测包以维持此连接(心跳包)一般需要自己做在线维持。 短连接是指通信双方有数据交互时就建立一个TCP连接数据发送完成后则断开此TCP连接。比如Http的只是连接、请求、关闭过程时间较短,服务器若是一段时间内没有收到请求即可关闭连接。其实长连接是相对于通常的短连接而说的也就是长时间保持客户端与服务端的连接状态。 通常的短连接操作步骤是 连接→数据传输→关闭连接 而长连接通常就是 连接→数据传输→保持连接(心跳)→数据传输→保持连接(心跳)→……→关闭连接 什么时候用长连接短连接 长连接多用于操作频繁点对点的通讯而且连接数不能太多情况。每个TCP连接都需要三步握手这需要时间如果每个操作都是先连接再操作的话那么处理 速度会降低很多所以每个操作完后都不断开次处理时直接发送数据包就OK了不用建立TCP连接。例如数据库的连接用长连接 如果用短连接频繁的通信会造成Socket错误而且频繁的Socket创建也是对资源的浪费。 什么是心跳包为什么需要 心跳包就是在客户端和服务端间定时通知对方自己状态的一个自己定义的命令字按照一定的时间间隔发送类似于心跳所以叫做心跳包。网络中的接收和发送数据都是使用Socket进行实现。但是如果此套接字已经断开比如一方断网了那发送数据和接收数据的时候就一定会有问题。可是如何判断这个套接字是否还可以使用呢这个就需要在系统中创建心跳机制。其实TCP中已经为我们实现了一个叫做心跳的机制。如果你设置了心跳那TCP就会在一定的时间比如你设置的是3秒钟内发送你设置的次数的心跳比如说2次并且此信息不会影响你自己定义的协议。也可以自己定义所谓“心跳”就是定时发送一个自定义的结构体心跳包或心跳帧让对方知道自己“在线”,以确保链接的有效性。实现 服务端 const net require(net);let clientList []; const heartbeat HEARTBEAT; // 定义心跳包内容确保和平时发送的数据不会冲突const server net.createServer(); server.on(connection, (client) {console.log(客户端建立连接:, client.remoteAddress : client.remotePort);clientList.push(client);client.on(data, (chunk) {let content chunk.toString();if (content heartbeat) {console.log(收到客户端发过来的一个心跳包);} else {console.log(收到客户端发过来的数据:, content);client.write(服务端的数据: content);}});client.on(end, () {console.log(收到客户端end);clientList.splice(clientList.indexOf(client), 1);});client.on(error, () {clientList.splice(clientList.indexOf(client), 1);}) }); server.listen(9000); setInterval(broadcast, 10000); // 定时发送心跳包 function broadcast() {console.log(broadcast heartbeat, clientList.length);let cleanup []for (let i0;iclientList.length;i1) {if (clientList[i].writable) { // 先检查 sockets 是否可写clientList[i].write(heartbeat);} else {console.log(一个无效的客户端);cleanup.push(clientList[i]); // 如果不可写收集起来销毁。销毁之前要 Socket.destroy() 用 API 的方法销毁。clientList[i].destroy();}}//Remove dead Nodes out of write loop to avoid trashing loop indexfor (let i0; icleanup.length; i1) {console.log(删除无效的客户端:, cleanup[i].name);clientList.splice(clientList.indexOf(cleanup[i]), 1);} } 服务端输出结果 客户端建立连接: ::ffff:127.0.0.1:57125 broadcast heartbeat 1 收到客户端发过来的数据: Thu, 29 Mar 2018 03:45:15 GMT 收到客户端发过来的一个心跳包 收到客户端发过来的数据: Thu, 29 Mar 2018 03:45:20 GMT broadcast heartbeat 1 收到客户端发过来的数据: Thu, 29 Mar 2018 03:45:25 GMT 收到客户端发过来的一个心跳包 客户端建立连接: ::ffff:127.0.0.1:57129 收到客户端发过来的一个心跳包 收到客户端发过来的数据: Thu, 29 Mar 2018 03:46:00 GMT 收到客户端发过来的数据: Thu, 29 Mar 2018 03:46:04 GMT broadcast heartbeat 2 收到客户端发过来的数据: Thu, 29 Mar 2018 03:46:05 GMT 收到客户端发过来的一个心跳包 客户端代码 const net require(net);const heartbeat HEARTBEAT; const client new net.Socket(); client.connect(9000, 127.0.0.1, () {}); client.on(data, (chunk) {let content chunk.toString();if (content heartbeat) {console.log(收到心跳包, content);} else {console.log(收到数据, content);} });// 定时发送数据 setInterval(() {console.log(发送数据, new Date().toUTCString());client.write(new Date().toUTCString()); }, 5000);// 定时发送心跳包 setInterval(function () {client.write(heartbeat); }, 10000); 客户端输出结果 发送数据 Thu, 29 Mar 2018 03:46:04 GMT 收到数据 服务端的数据:Thu, 29 Mar 2018 03:46:04 GMT 收到心跳包 HEARTBEAT 发送数据 Thu, 29 Mar 2018 03:46:09 GMT 收到数据 服务端的数据:Thu, 29 Mar 2018 03:46:09 GMT 发送数据 Thu, 29 Mar 2018 03:46:14 GMT 收到数据 服务端的数据:Thu, 29 Mar 2018 03:46:14 GMT 收到心跳包 HEARTBEAT 发送数据 Thu, 29 Mar 2018 03:46:19 GMT 收到数据 服务端的数据:Thu, 29 Mar 2018 03:46:19 GMT 发送数据 Thu, 29 Mar 2018 03:46:24 GMT 收到数据 服务端的数据:Thu, 29 Mar 2018 03:46:24 GMT 收到心跳包 HEARTBEAT 定义自己的协议 如果想要使传输的数据有意义则必须使用到应用层协议比如Http、Mqtt、Dubbo等。基于TCP协议上自定义自己的应用层的协议需要解决的几个问题 心跳包格式的定义及处理报文头的定义就是你发送数据的时候需要先发送报文头报文里面能解析出你将要发送的数据长度你发送数据包的格式是json的还是其他序列化的方式 下面我们就一起来定义自己的协议并编写服务的和客户端进行调用 定义报文头格式 length:000000000xxxx; xxxx代表数据的长度总长度20,举例子不严谨。 数据表的格式: Json 服务端: const net require(net); const server net.createServer(); let clientList []; const heartBeat HeartBeat; // 定义心跳包内容确保和平时发送的数据不会冲突 const getHeader (num) {return length: (Array(13).join(0) num).slice(-13); } server.on(connection, (client) {client.name client.remoteAddress : client.remotePort// client.write(Hi client.name !\n);console.log(客户端建立连接, client.name);clientList.push(client)let chunks [];let length 0;client.on(data, (chunk) {let content chunk.toString();console.log(content:, content, content.length);if (content heartBeat) {console.log(收到客户端发过来的一个心跳包);} else {if (content.indexOf(length:) 0){length parseInt(content.substring(7,20));console.log(length, length);chunks [chunk.slice(20, chunk.length)];} else {chunks.push(chunk);}let heap Buffer.concat(chunks);console.log(heap.length, heap.length)if (heap.length length) {try {console.log(收到数据, JSON.parse(heap.toString()));let data 服务端的数据数据: heap.toString();;let dataBuff Buffer.from(JSON.stringify(data));let header getHeader(dataBuff.length)client.write(header);client.write(dataBuff);} catch (err) {console.log(数据解析失败);}}}})client.on(end, () {console.log(收到客户端end);clientList.splice(clientList.indexOf(client), 1);});client.on(error, () {clientList.splice(clientList.indexOf(client), 1);}) }); server.listen(9000); setInterval(broadcast, 10000); // 定时检查客户端 并发送心跳包 function broadcast() {console.log(broadcast heartbeat, clientList.length);let cleanup []for(var i0;iclientList.length;i1) {if(clientList[i].writable) { // 先检查 sockets 是否可写// clientList[i].write(heartBeat); // 发送心跳数据} else {console.log(一个无效的客户端)cleanup.push(clientList[i]) // 如果不可写收集起来销毁。销毁之前要 Socket.destroy() 用 API 的方法销毁。clientList[i].destroy();}}// 删除无效的客户端for(i0; icleanup.length; i1) {console.log(删除无效的客户端:, cleanup[i].name);clientList.splice(clientList.indexOf(cleanup[i]), 1)} } 日志打印 客户端建立连接 ::ffff:127.0.0.1:50178content: length:0000000000031 20length 31heap.length 0content: Tue, 03 Apr 2018 06:12:37 GMT 31heap.length 31收到数据 Tue, 03 Apr 2018 06:12:37 GMTbroadcast heartbeat 1content: HeartBeat 9收到客户端发过来的一个心跳包content: length:0000000000031Tue, 03 Apr 2018 06:12:42 GMT 51length 31heap.length 31收到数据 Tue, 03 Apr 2018 06:12:42 GMT 客户端 const net require(net); const client new net.Socket(); const heartBeat HeartBeat; // 定义心跳包内容确保和平时发送的数据不会冲突 const getHeader (num) {return length: (Array(13).join(0) num).slice(-13); } client.connect(9000, 127.0.0.1, function () {}); let chunks []; let length 0; client.on(data, (chunk) {let content chunk.toString();console.log(content:, content, content.length);if (content heartBeat) {console.log(收到服务端发过来的一个心跳包);} else {if (content.indexOf(length:) 0){length parseInt(content.substring(7,20));console.log(length, length);chunks [chunk.slice(20, chunk.length)];} else {chunks.push(chunk);}let heap Buffer.concat(chunks);console.log(heap.length, heap.length)if (heap.length length) {try {console.log(收到数据, JSON.parse(heap.toString()));} catch (err) {console.log(数据解析失败);}}} }); // 定时发送数据 setInterval(function () {let data new Date().toUTCString();let dataBuff Buffer.from(JSON.stringify(data));let header getHeader(dataBuff.length);client.write(header);client.write(dataBuff); }, 5000); // 定时发送心跳包 setInterval(function () {client.write(heartBeat); }, 10000); 日志打印 content: length:0000000000060 20length 60heap.length 0content: 服务端的数据数据:\Tue, 03 Apr 2018 06:12:37 GMT\ 44heap.length 60收到数据 服务端的数据数据:Tue, 03 Apr 2018 06:12:37 GMTcontent: length:0000000000060服务端的数据数据:\Tue, 03 Apr 2018 06:12:42 GMT\ 64length 60heap.length 60收到数据 服务端的数据数据:Tue, 03 Apr 2018 06:12:42 GMT 客户端定时发送自定义协议数据到服务端先发送头数据在发送内容数据另外一个定时器发送心跳数据服务端判断是心跳数据再判断是不是头数据再是内容数据然后解析后再发送数据给客户端。从日志的打印可以看出客户端先后writeheader和data数据服务端可能在一个data事件里面接收到。 这里可以看到一个客户端在同一个时间内处理一个请求可以很好的工作但是想象这么一个场景如果同一时间内让同一个客户端去多次调用服务端请求发送多次头数据和内容数据服务端的data事件收到的数据就很难区别哪些数据是哪次请求的比如两次头数据同时到达服务端服务端就会忽略其中一次而后面的内容数据也不一定就对应于这个头的。所以想复用长连接并能很好的高并发处理服务端请求就需要连接池这种方式了。 Socket连接池 什么是Socket连接池,池的概念可以联想到是一种资源的集合所以Socket连接池就是维护着一定数量Socket长连接的集合。它能自动检测Socket长连接的有效性剔除无效的连接补充连接池的长连接的数量。从代码层次上其实是人为实现这种功能的类一般一个连接池包含下面几个属性 空闲可使用的长连接队列正在运行的通信的长连接队列等待去获取一个空闲长连接的请求的队列无效长连接的剔除功能长连接资源池的数量配置长连接资源的新建功能 场景 一个请求过来首先去资源池要求获取一个长连接资源如果空闲队列里面有长连接就获取到这个长连接Socket,并把这个Socket移到正在运行的长连接队列。如果空闲队列里面没有且正在运行的队列长度小于配置的连接池资源的数量就新建一个长连接到正在运行的队列去如果正在运行的不下于配置的资源池长度则这个请求进入到等待队列去。当一个正在运行的Socket完成了请求就从正在运行的队列移到空闲的队列并触发等待请求队列去获取空闲资源如果有等待的情况。 这里简单介绍Nodejs的Socket连接池generic-pool模块的源码。主要文件目录结构 . |————lib ------------------------- 代码库 | |————DefaultEvictor.js ---------- | |————Deferred.js ---------------- | |————Deque.js ------------------- | |————DequeIterator.js ----------- | |————DoublyLinkedList.js -------- | |————DoublyLinkedListIterator.js- | |————factoryValidator.js -------- | |————Pool.js -------------------- 连接池主要代码 | |————PoolDefaults.js ------------ | |————PooledResource.js ---------- | |————Queue.js ------------------- 队列 | |————ResourceLoan.js ------------ | |————ResourceRequest.js --------- | |————utils.js ------------------- 工具 |————test ------------------------- 测试目录 |————README.md ------------------- 项目描述文件 |————.eslintrc ------------------- eslint静态检查配置文件 |————.eslintignore --------------- eslint静态检查忽略的文件 |————package.json ----------------- npm包依赖配置 下面介绍库的使用 初始化连接池 use strict; const net require(net); const genericPool require(generic-pool);function createPool(conifg) {let options Object.assign({fifo: true, // 是否优先使用老的资源priorityRange: 1, // 优先级testOnBorrow: true, // 是否开启获取验证// acquireTimeoutMillis: 10 * 1000, // 获取的超时时间autostart: true, // 自动初始化和释放调度启用min: 10, // 初始化连接池保持的长连接最小数量max: 0, // 最大连接池保持的长连接数量evictionRunIntervalMillis: 0, // 资源释放检验间隔检查 设置了下面几个参数才起效果numTestsPerEvictionRun: 3, // 每次释放资源数量softIdleTimeoutMillis: -1, // 可用的超过了最小的min 且空闲时间时间 达到释放idleTimeoutMillis: 30000 // 强制释放// maxWaitingClients: 50 // 最大等待}, conifg.options);const factory {create: function () {return new Promise((resolve, reject) {let socket new net.Socket();socket.setKeepAlive(true);socket.connect(conifg.port, conifg.host);// TODO 心跳包的处理逻辑socket.on(connect, () {console.log(socket_pool, conifg.host, conifg.port, connect );resolve(socket);});socket.on(close, (err) { // 先end 事件再close事件console.log(socket_pool, conifg.host, conifg.port, close, err);});socket.on(error, (err) {console.log(socket_pool, conifg.host, conifg.port, error, err);reject(err);});});},//销毁连接destroy: function (socket) {return new Promise((resolve) {socket.destroy(); // 不会触发end 事件 第一次会触发发close事件 如果有message会触发error事件resolve();});},validate: function (socket) { //获取资源池校验资源有效性return new Promise((resolve) {// console.log(socket.destroyed:, socket.destroyed, socket.readable:, socket.readable, socket.writable:, socket.writable);if (socket.destroyed || !socket.readable || !socket.writable) {return resolve(false);} else {return resolve(true);}});}};const pool genericPool.createPool(factory, options);pool.on(factoryCreateError, (err) { // 监听新建长连接出错 让请求直接返回错误const clientResourceRequest pool._waitingClientsQueue.dequeue();if (clientResourceRequest) {clientResourceRequest.reject(err);}});return pool; };let pool createPool({port: 9000,host: 127.0.0.1,options: {min: 0, max: 10} }); 使用连接池 下面连接池的使用使用的协议是我们之前自定义的协议。 let pool createPool({port: 9000,host: 127.0.0.1,options: {min: 0, max: 10} }); const getHeader (num) {return length: (Array(13).join(0) num).slice(-13); } const request async (requestDataBuff) {let client;try {client await pool.acquire();} catch (e) {console.log(acquire socket client failed: , e);throw e;}let timeout 10000;return new Promise((resolve, reject) {let chunks [];let length 0;client.setTimeout(timeout);client.removeAllListeners(error);client.on(error, (err) {client.removeAllListeners(error);client.removeAllListeners(data);client.removeAllListeners(timeout);pool.destroyed(client);reject(err);});client.on(timeout, () {client.removeAllListeners(error);client.removeAllListeners(data);client.removeAllListeners(timeout);// 应该销毁以防下一个req的data事件监听才返回数据pool.destroy(client);// pool.release(client);reject(socket connect timeout set ${timeout});});let header getHeader(requestDataBuff.length);client.write(header);client.write(requestDataBuff);client.on(data, (chunk) {let content chunk.toString();console.log(content, content, content.length);// TODO 过滤心跳包if (content.indexOf(length:) 0){length parseInt(content.substring(7,20));console.log(length, length);chunks [chunk.slice(20, chunk.length)];} else {chunks.push(chunk);}let heap Buffer.concat(chunks);console.log(heap.length, heap.length);if (heap.length length) {pool.release(client);client.removeAllListeners(error);client.removeAllListeners(data);client.removeAllListeners(timeout);try {// console.log(收到数据, JSON.parse(heap.toString()));resolve(JSON.parse(heap.toString()));} catch (err) {reject(err);console.log(数据解析失败);}}});}); } request(Buffer.from(JSON.stringify({a: a}))).then((data) {console.log(收到服务的数据,data)}).catch(err {console.log(err);});request(Buffer.from(JSON.stringify({b: b}))).then((data) {console.log(收到服务的数据,data)}).catch(err {console.log(err);});setTimeout(function () { //查看是否会复用Socket 有没有建立新的连接request(Buffer.from(JSON.stringify({c: c}))).then((data) {console.log(收到服务的数据,data)}).catch(err {console.log(err);});request(Buffer.from(JSON.stringify({d: d}))).then((data) {console.log(收到服务的数据,data)}).catch(err {console.log(err);}); }, 1000) 日志打印 socket_pool 127.0.0.1 9000 connectsocket_pool 127.0.0.1 9000 connectcontent length:0000000000040服务端的数据数据:{\a\:\a\} 44length 40heap.length 40收到服务的数据 服务端的数据数据:{a:a}content length:0000000000040服务端的数据数据:{\b\:\b\} 44length 40heap.length 40收到服务的数据 服务端的数据数据:{b:b}content length:0000000000040 20length 40heap.length 0content 服务端的数据数据:{\c\:\c\} 24heap.length 40收到服务的数据 服务端的数据数据:{c:c}content length:0000000000040服务端的数据数据:{\d\:\d\} 44length 40heap.length 40收到服务的数据 服务端的数据数据:{d:d} 这里看到前面两个请求都建立了新的Socket连接 socket_pool 127.0.0.1 9000 connect定时器结束后重新发起两个请求就没有建立新的Socket连接了直接从连接池里面获取Socket连接资源。 源码分析 发现主要的代码就位于lib文件夹中的Pool.js 构造函数 lib/Pool.js /*** Generate an Object pool with a specified factory and config.** param {typeof DefaultEvictor} Evictor* param {typeof Deque} Deque* param {typeof PriorityQueue} PriorityQueue* param {Object} factory* Factory to be used for generating and destroying the items.* param {Function} factory.create* Should create the item to be acquired,* and call its first callback argument with the generated item as its argument.* param {Function} factory.destroy* Should gently close any resources that the item is using.* Called before the items is destroyed.* param {Function} factory.validate* Test if a resource is still valid .Should return a promise that resolves to a boolean, true if resource is still valid and false* If it should be removed from pool.* param {Object} options*/constructor(Evictor, Deque, PriorityQueue, factory, options) {super();factoryValidator(factory); // 检验我们定义的factory的有效性包含create destroy validatethis._config new PoolOptions(options); // 连接池配置// TODO: fix up this ugly glue-ingthis._Promise this._config.Promise;this._factory factory;this._draining false;this._started false;/*** Holds waiting clients* type {PriorityQueue}*/this._waitingClientsQueue new PriorityQueue(this._config.priorityRange); // 请求的对象管管理队列queue 初始化queue的size 1 { _size: 1, _slots: [ Queue { _list: [Object] } ] }/*** Collection of promises for resource creation calls made by the pool to factory.create* type {Set}*/this._factoryCreateOperations new Set(); // 正在创建的长连接/*** Collection of promises for resource destruction calls made by the pool to factory.destroy* type {Set}*/this._factoryDestroyOperations new Set(); // 正在销毁的长连接/*** A queue/stack of pooledResources awaiting acquisition* TODO: replace with LinkedList backed array* type {Deque}*/this._availableObjects new Deque(); // 空闲的资源长连接/*** Collection of references for any resource that are undergoing validation before being acquired* type {Set}*/this._testOnBorrowResources new Set(); // 正在检验有效性的资源/*** Collection of references for any resource that are undergoing validation before being returned* type {Set}*/this._testOnReturnResources new Set();/*** Collection of promises for any validations currently in process* type {Set}*/this._validationOperations new Set();// 正在校验的中间temp/*** All objects associated with this pool in any state (except destroyed)* type {Set}*/this._allObjects new Set(); // 所有的链接资源 是一个 PooledResource对象/*** Loans keyed by the borrowed resource* type {Map}*/this._resourceLoans new Map(); // 被借用的对象的map release的时候用到/*** Infinitely looping iterator over available object* type {DequeIterator}*/this._evictionIterator this._availableObjects.iterator(); // 一个迭代器this._evictor new Evictor();/*** handle for setTimeout for next eviction run* type {(number|null)}*/this._scheduledEviction null;// create initial resources (if factory.min 0)if (this._config.autostart true) { // 初始化最小的连接数量this.start();}} 可以看到包含之前说的空闲的资源队列正在请求的资源队列正在等待的请求队列等。 下面查看 Pool.acquire 方法 lib/Pool.js /*** Request a new resource. The callback will be called,* when a new resource is available, passing the resource to the callback.* TODO: should we add a seperate acquireWithPriority function** param {Number} [priority0]* Optional. Integer between 0 and (priorityRange - 1). Specifies the priority* of the caller if there are no available resources. Lower numbers mean higher* priority.** returns {Promise}*/acquire(priority) { // 空闲资源队列资源是有优先等级的 if (this._started false this._config.autostart false) {this.start(); // 会在this._allObjects 添加min的连接对象数}if (this._draining) { // 如果是在资源释放阶段就不能再请求资源了return this._Promise.reject(new Error(pool is draining and cannot accept work));}// 如果要设置了等待队列的长度且要等待 如果超过了就返回资源不可获取// TODO: should we defer this check till after this event loop incase the situation changes in the meantimeif (this._config.maxWaitingClients ! undefined this._waitingClientsQueue.length this._config.maxWaitingClients) {return this._Promise.reject(new Error(max waitingClients count exceeded));}const resourceRequest new ResourceRequest(this._config.acquireTimeoutMillis, // 对象里面的超时配置 表示等待时间 会启动一个定时 超时了就触发resourceRequest.promise 的reject触发this._Promise);// console.log(resourceRequest)this._waitingClientsQueue.enqueue(resourceRequest, priority); // 请求进入等待请求队列this._dispense(); // 进行资源分发 最终会触发resourceRequest.promise的resolve(client) return resourceRequest.promise; // 返回的是一个promise对象resolve却是在其他地方触发} /*** Attempt to resolve an outstanding resource request using an available resource from* the pool, or creating new ones** private*/_dispense() {/*** Local variables for ease of reading/writing* these dont (shouldnt) change across the execution of this fn*/const numWaitingClients this._waitingClientsQueue.length; // 正在等待的请求的队列长度 各个优先级的总和console.log(numWaitingClients, numWaitingClients) // 1// If there arent any waiting requests then there is nothing to do// so lets short-circuitif (numWaitingClients 1) {return;}// max: 10, min: 4console.log(_potentiallyAllocableResourceCount, this._potentiallyAllocableResourceCount) // 目前潜在空闲可用的连接数量const resourceShortfall numWaitingClients - this._potentiallyAllocableResourceCount; // 还差几个可用的 小于零表示不需要 大于0表示需要新建长连接的数量console.log(spareResourceCapacity, this.spareResourceCapacity) // 距离max数量的还有几个没有创建const actualNumberOfResourcesToCreate Math.min(this.spareResourceCapacity, // -6resourceShortfall // 这个是 -3); // 如果resourceShortfall0 表示需要新建但是这新建的数量不能超过spareResourceCapacity最多可创建的console.log(actualNumberOfResourcesToCreate, actualNumberOfResourcesToCreate) // 如果actualNumberOfResourcesToCreate 0 表示需要创建连接for (let i 0; actualNumberOfResourcesToCreate i; i) {this._createResource(); // 新增新的长连接}// If we are doing test-on-borrow see how many more resources need to be moved into test// to help satisfy waitingClientsif (this._config.testOnBorrow true) { // 如果开启了使用前校验资源的有效性// how many available resources do we need to shift into testconst desiredNumberOfResourcesToMoveIntoTest numWaitingClients - this._testOnBorrowResources.size;// 1const actualNumberOfResourcesToMoveIntoTest Math.min(this._availableObjects.length, // 3desiredNumberOfResourcesToMoveIntoTest // 1);for (let i 0; actualNumberOfResourcesToMoveIntoTest i; i) { // 需要有效性校验的数量 至少满足最小的waiting clinetthis._testOnBorrow(); // 资源有效校验后再分发}}// if we arent testing-on-borrow then lets try to allocate what we canif (this._config.testOnBorrow false) { // 如果没有开启有效性校验 就开启有效资源的分发const actualNumberOfResourcesToDispatch Math.min(this._availableObjects.length,numWaitingClients);for (let i 0; actualNumberOfResourcesToDispatch i; i) { // 开始分发资源this._dispatchResource();}}}/*** Attempt to move an available resource to a waiting client* return {Boolean} [description]*/_dispatchResource() {if (this._availableObjects.length 1) {return false;}const pooledResource this._availableObjects.shift(); // 从可以资源池里面取出一个this._dispatchPooledResourceToNextWaitingClient(pooledResource); // 分发return false;}/*** Dispatches a pooledResource to the next waiting client (if any) else* puts the PooledResource back on the available list* param {PooledResource} pooledResource [description]* return {Boolean} [description]*/_dispatchPooledResourceToNextWaitingClient(pooledResource) {const clientResourceRequest this._waitingClientsQueue.dequeue(); // 可能是undefined 取出一个等待的queneconsole.log(clientResourceRequest.state, clientResourceRequest.state);if (clientResourceRequest undefined ||clientResourceRequest.state ! Deferred.PENDING) {console.log(没有等待的)// While we were away either all the waiting clients timed out// or were somehow fulfilled. put our pooledResource back.this._addPooledResourceToAvailableObjects(pooledResource); // 在可用的资源里面添加一个// TODO: do need to trigger anything before we leave?return false;}// TODO clientResourceRequest 的state是否需要判断 如果已经是resolve的状态 已经超时回去了 这个是否有问题const loan new ResourceLoan(pooledResource, this._Promise); this._resourceLoans.set(pooledResource.obj, loan); // _resourceLoans 是个map kvalue pooledResource.obj 就是socket本身pooledResource.allocate(); // 标识资源的状态是正在被使用clientResourceRequest.resolve(pooledResource.obj); // acquire方法返回的promise对象的resolve在这里执行的return true;} 上面的代码就按种情况一直走下到最终获取到长连接的资源其他更多代码大家可以自己去深入了解。
http://www.w-s-a.com/news/2078/

相关文章:

  • 河南企业网站优化外包网站怎么做来流量
  • 网站建设的参考文献网站设计网页的优缺点
  • WordPress多站點支付插件内江市网站建设培训
  • 做做网站已更新动漫制作专业需要学什么
  • dfv印花图案设计网站网站建设应该应聘什么岗位
  • 网站后台管理系统模板下载专业网站推广的公司哪家好
  • 克拉玛依市建设局网站网页设计板式重构
  • 网站新闻专题怎么做湖南营销型网站建设 要上磐石网络
  • 阿里云发布网站成都轨迹公布
  • php网站源码架构谷歌站群系统
  • 潮州网站seowordpress 调用置顶文章
  • 做带会员后台的网站用什么软件旅游网站建设资金请示
  • 商品网站怎么做wordpress 表情拉长
  • 商城网站设计费用网络公司怎样推广网站
  • 视频公司的网站设计工图网
  • 免费快速网站十八个免费的舆情网站