做动态的网站,山东网站建设团队,长沙seo关键词排名优化,在线制作网址免费目录 项目初始与项目演示HTTP服务器基础认识Reactor模式基础认识单Reactor单线程模式认识单Reactor多线程模式认识多Reactor多线程模式认识 目标定位总体大模块划分server模块的管理思想Buffer子模块Socket子模块Channel子模块Connection子模块Acceptor子模块TimerQueue子模块P… 目录 项目初始与项目演示HTTP服务器基础认识Reactor模式基础认识单Reactor单线程模式认识单Reactor多线程模式认识多Reactor多线程模式认识 目标定位总体大模块划分server模块的管理思想Buffer子模块Socket子模块Channel子模块Connection子模块Acceptor子模块TimerQueue子模块Poller子模块EventLoop子模块TcpServer子模块通信连接管理模块关系图监听连接管理模块关系图事件监控管理模块关系图 bind函数的认识与基本使用timerfd的认识与基本使用时间轮定时器的基本思想理解时间轮定时器的代码设计及实现正则表达式基本认识正则表达式提取HTTP请求行通用类型容器Any类设计思想Buffer缓冲区设计思想日志打印宏的编写Socket套接字类设计思想Channel事件管理类设计思想Poller描述符监控类设计思想Poller模块与Channel模块整合与测试EventLoop模块中eventfd的认识EventLoop模块设计思想EventLoop与TimerWheel定时器模块整合EventLoop模块联调中的模块流程关系图Connection模块设计思想Acceptor模块设计思想LoopThread模块设计思想LoopThreadPool模块设计思想TcpServer模块设计思想基于TcpServer实现回显服务器EchoServer回显服务器性能测试EchoServer回显服务器模块关系图HTTP协议模块的子模块划分Util工具类设计思想Util工具类字符串分割函数实现Util工具类文件数据读取函数实现Util工具类文件数据写入函数实现Util工具类UrlEncode函数实现Util工具类UrlDecode函数实现Util工具类Mime与StatuUtil工具类文件类型判断接口实现Util工具类路径有效性判断接口实现 HttpRequest模块设计思想HttpResponse模块设计思想HttpContext模块设计思想HttpServer模块设计思想基于HttpServer搭建HTTP服务器HTTP服务器长连接测试HTTP服务器超时连接测试HTTP服务器错误请求测试HTTP服务器业务处理超时测试HTTP服务器同时多条请求测试HTTP服务器大文件传输测试HTTP服务器性能压力测试说明 项目源代码 https://gitee.com/wufangitee/linux-study/tree/master/Project_mudouServer/httpServer 项目初始与项目演示 上面演示了服务器在一万并发量的压力下运行。但此时为了演示效果这并不是性能的极致。 通过咱们实现的高并发服务器组件可以简洁快速的完成一个高性能的服务器搭建。 并且我们组件也提供的应用层的协议支持通过组件内的应用层协议支持可以快速的完成一个高性能应用服务器的搭建当前为了便于项目的演示项目中提供的是HTTP协议组件的支持。 注意能支持不同的应用层协议只支持了HTTP协议和只能支持HTTP协议是不一样的而我们实现的就是前者。 在这里要明确的是咱们要实现的是一个高并发服务器组件因此当前的项目中并不包含实际的业务内容。 了解了我们要做什么下面就展示一下代码的概况 server.hpp是高性能服务器的实现一共1500行代码左右 在这个服务器之上可以提供不同协议支持目前支持的是HTTP服务器。在http.hpp文件里代码量是900行左右总代码量就是2500行左右。 我们不能把所有的应用层协议都给支持了因为应用层协议太多了所以就实现了一个最常用的HTTP协议来支持。 再明确一下目标 1.高并发服务器 2.在服务器基础上提供应用层协议的支持HTTP HTTP服务器基础认识 HTTPHyper Text Transter Protocol超文本传输协议是一种简单的请求-响应协议客户端根据自己的需要向服务器发送请求服务器针对请求提供服务完毕后通信结束。 HTTP协议是一个运行在TCP协议之上的应用层协议这一点本质上是告诉我们HTTP服务器其实就是一个TCP服务器只不过在应用层基于HTTP协议格式进行数据的组织和解析来明确客户端的请求并完成业务处理。 仅了解这个通信模式还不够还需要有更深的理解理解HTTP协议的格式如何解析HTTP协议的请求以及如何去组织HTTP的响应。 如果看项目的同学对HTTP协议还并不是特别的了解那么建议把HTTP协议格式这一块去回顾一下。HTTP协议在往期的文章中也给大家讲过。 这个例子是最简单的HTTP服务器代码再讲这些简单代码是没有必要的 #include iostream
#include cstring
#include string
#include unistd.h
#include sys/types.h
#include sys/socket.h
#include arpa/inet.hint main()
{int sockfd socket(AF_INET, SOCK_STREAM, 0);if (sockfd 0){perror(socket failed);return -1;}struct sockaddr_in server;server.sin_addr.s_addr INADDR_ANY;server.sin_family AF_INET;server.sin_port htons(8085);int n bind(sockfd, (const sockaddr*)server, sizeof(server));if (n 0){perror(bind failed);return -1;}n listen(sockfd, 5);if (n 0){perror(listen failed);return -1;}while (1){int newsockfd accept(sockfd, nullptr, nullptr);if (newsockfd 0){perror(accept error);continue;}char buffer[4096];n recv(newsockfd, buffer, 4095, 0);if (n 0){perror(recv failed);continue;}std::string body htmlbodyh1Hello World/h1/body/html;std::string rsp HTTP/1.1 200 OK\r\n;rsp Content-Length: std::to_string(body.size()) \r\n;rsp Content-Type: text/html\r\n;rsp \r\n;rsp body;n send(newsockfd, rsp.c_str(), rsp.size(), 0);if (n 0){perror(send error);close(newsockfd);}}
}从这里面也能看出来其实HTTP服务器的搭建就是基于一个TCP服务器的搭建。只不过上层通信的数据是用HTTP协议格式来通信数据。 实现HTTP服务器简单的但是实现一个高性能的HTTP的服务器却并不简单。上面是一个客户端一个浏览器请求我们的服务器那如果有上万个客户端访问那我们的服务器能抗得住那么多的客户端请求吗所以我们就要考虑如何将我们这个服务器实现成高性能的这才是我们需要更加去关注的一个重点 Reactor模式基础认识 Reactor模式也叫做事件驱动处理模式。 一个或多个客户端同时连接到我们的服务器上面然后去请求我们的服务器那我们的服务器如何对我们的客户端进行处理呢处理过程就是哪一个客户端给我发送了数据触发了我的事件那么我就去处理谁你要是没有给我发送数据没触发我的事件我就不取处理你。这就是事件驱动处理模式谁触发了事件服务器就处理谁那么问题来了服务器怎么知道谁触发了事件 所以在Reactor模式里面就使用到了一个非常关键的技术使用I/O多路复用也叫I/O多路转接。统一监听事件收到事件后分发给处理进行或线程这是编写高性能网络服务器的必备技术之一。 我们将Reactor模式分为了三种 单Reactor单线程单I/O多路复用 业务处理单Reactor多线程单I/O多路复用 线程池业务处理多Reactor多线程多I/O多路复用 线程池业务处理 单Reactor单线程模式认识 优点因为是单线程操作操作都是串行化思想较为简单编码流程也较为简单不用考虑进行或线程间的通信以及线程安全问题 缺点因为所有的时间监控以及业务处理都是在一个线程中完成的因此很容易造成性能瓶颈。如有一万个客户端连接上来了有八千个触发了事件我得一个一个处理我在处理这个的时候肯定处理不了下一个下一个就得等着就导致后面的客户端就等待了很长时间就会导致连接超时。 适用场景客户端数量较少且业务处理简单快速的场景。所有的技术都是因地制宜的不能因为缺点比较大就放弃它。 单Reactor多线程模式认识 有一个线程进行Reactor模式事件监控客户端的请求到来之后我们的服务器这边有一个线程专门对所有客户端进行事件监控一有事件触发了服务器就进行I/O处理把你的数据读取上来读取上来之后并不进行业务处理因为我认为业务处理时间比较长这样就会导致我对其它客户端处理的时候不够及时无法充分利用CPU多核资源所以这时候我们就加上了业务功能的线程池。 将读取上来的数据投入到业务处理的线程池里面让线程池里面的线程进行业务处理事件监控的线程池只需要做两件事情事件监控以及客户端的IO操作。业务处理就分离开了对我们CPU多核资源有充分的利用。 优点充分利用了CPU多核资源处理效率可以更高降低了代码的耦合度 缺点在单个Reactor线程中包含了对所有客户端的事件监控以及所有客户端的IO操作不利于高并发场景如我这个Reactor线程还在给某一个客户端进行IO操作但这时候有大量的客户端连接请求到来了我在处理这个客户端IO的时候就来不及处理其他客户端的连接请求 多Reactor多线程模式认识 多Reactor多线程模式基于但Reactor多线程的缺点考虑如果IO的时候有连接到来无法处理因此将连接处理单拎出来。 因此让一个Reactor线程仅仅进行新连接的处理让其他的Reactor线程进行IO处理IO Reactor线程拿到数据分发给业务线程池进行业务处理。 因此多Reactor多线程模式也叫做主从Reactor模型 主Reactor线程进行新连接事件监控 从属Reactor线程进行IO事件监控 业务线程池进行业务处理 优点充分利用CPU多核资源并且可以进行合理分配 但是大家也要理解执行流并不是越多越好因为执行流多了反而会增加CPU切换调度的成本 所以有些主从Reactor模式里不设置业务线程业务也再我们的从属Reactor里面来完成也就是说从属Reactor既进行IO事件监控IO处理也进行业务处理对资源的最大化利用。线程多了看似耦合度降低了模块清晰了但是你要处理的会变得更加复杂在多线程的操作中就要涉及到锁的操作涉及到锁就会降低效率第二线程多了CPU的切换调度频繁了又是进一步的降低效率。 那么还不如在某些情况下把业务处理放到从属Reactor里面读取完数据之后直接进行业务处理处理完毕之后然后进行响应。 目标定位 我们实现的高并发服务器用的模型就是主从Reactor模型我们采用的思想是one thread one loop就是把业务线程池摘掉将业务处理也放入从属Reactor当中只有主Reactor线程和多个从属Reactor线程。 主线程要做的事新连接监控新连接处理。 从属线程要做的事IO事件监控IO处理业务处理 这样做的好处把框架做了中间的简化不让这个框架过于复杂因为在同一台主机上执行流并不是越多越好执行流多了在多对多的情况下我们需要考虑加锁来考虑线程安全问题平白又多出了锁的消耗。 因为我们在进行CPU资源分配的时候只需要考虑需要去创建多少个从属Reactor线程就可以了并且进行业务处理的时候也不需要再去考虑更多的线程安全的操作了对于每一个连接的所有的操作来说都是放到同一个从属Reactor线程里面来完成的。我们也不需要考虑和其它线程的锁争抢的操作。这就是我们要实现的高并发的one thread one loop思想的模式。 总体大模块划分 实现这个项目之前对我们的项目进行整体的模块划分因为对模块划分之后才能对我们项目有更透彻的理解理解了各个模块之间的关系之后再去实现我们服务器的时候才会更加的轻松更加的有条理才能明白我们写的每一句代码到底是什么。 我们要实现的是一个带有协议支持的Reactor模型高性能服务器因此将模块总共划分为了两个大模块 第一个模块是Server模块实现的功能是Reactor高性能的TCP服务器。 第二个模块是协议模块对当前的Reactor模型服务器提供应用层协议支持。 server模块的管理思想 server模块就是对所有的连接和线程进行管理让他们各司其职在合适的时候做合适的事。 而具体的管理也分为四个方面 监听连接管理。要搭建一个服务器我们首先得有监听套接字进行监听获取新连接通信连接管理。获取新连接之后就相当于有了通信连接的套接字我们需要对它有不同事件到来的处理超时连接管理。我搭建了TCP服务器管理了大量的客户端如果有些恶意的客户端连接到我的服务器上面了之后并不通信就是占着我服务器的资源那怎么办所以我们还要对我们的服务器有超时连接的管理。在一个连接超时后能释放掉归还我们的资源。线程的管理创建的线程它内部事件循环的管理 下面我们对server模块进行更加细致的划分 Buffer子模块 Buffer模块就是一个缓冲区模块用于实现通信中用户态的接收缓冲区和发送缓冲区功能 Socket子模块 就是对套接字操作进行封装的模块对套接字进行操作的时候更加的简便这个模块我们在往期的文章中也经常出现。 Channel子模块 每一个描述符在进行事件监控的时候都有可能会监控可读、可写、错误事件如果我们想要对一个文件描述符进行可写监控如果可写监控已经被设置那我们还需要再对可写监控设置一遍吗显然这是没有意义的有人就要说了重新设置就重新设置吧反正在设置也不会有什么问题问题在于我们我们系统调用是有消耗的并且这样杂乱无章的管理很不好所以我们就设计出了Channel模块。还有一点如果这个描述符触发了可读事件接下来按理来说应该去读取数据放到缓冲区里面去如果写原始代码的话很多地方都要写这段代码太麻烦了我们可以给channel模块再设置一些事件回调函数让他们在触发了某个事件的时候直接去调用某个函数就行了这样设置代码会更加清晰更加有条理。 Channel模块是对一个描述符需要进行IO事件管理的模块实现对描述符可读可写错误…事件的管理操作以及Poller模块对描述符进行IO事件监控就绪后根据不同的事件回调不同的处理函数功能。 Connection子模块 Connection模块是对Buffer模块Socket模块Channel模块的一个整体封装实现了第一个通信套接字的整体的管理。 具体处理流程如下 1.实现想Channel提供可读可写错误等不同事件的IO事件回调函数然后将Channel和对应的描述符添加到Poller事件监控中。 2.当描述符在Poller模块中就绪了IO可读事件则调用描述符对应Channel中保存的读事件处理函数进行数据读取全部读取到Connection管理的用户态接收缓冲区中。然后调用由组件使用者传入的新数据到来回调函数进行处理。 3.组件使用者进行数据的业务处理完毕后通过Connection用使用者提供的数据发送接口将数据写入Connection的发送缓冲区中。 4.启动描述符在Poll模块中的IO写事件监控就绪后调用Channel中保存的写事件处理函数将发送缓冲区中的数据通过Socket进行面向系统的实际数据发送。 Acceptor子模块 Acceptor模块是对Socket模块Channel模块的一个整体封装实现了对一个监听套接字的整体管理 监听到新连接了我们接下来的操作应该是获取一个新的文件描述符封装成Connection对象出来并给这个对象设置各种事件回调函数但Acceptor也不知道监听到新连接了该怎么办通信套接字的各种事件该怎么处理所以在功能设计里面需要设置一个回调函数让Server模块来告诉它获取到新连接该怎么处理。 具体处理流程 1.向监听套接字对应的Channel提供可读事件的回调函数函数的功能就是如何获取新的通信连接。 2.为新连接构建一个Connection对象出来。 TimerQueue子模块 TimerQueue模块是实现固定时间定时任务的模块可以理解就是向定时任务管理器中添加一个任务任务将在固定的时间后被执行同时也可以通过刷新定时任务来延迟任务的执行。 为了实现我们服务器的超时连接管理如果一个连接半天不发消息有些时候我们就希望把这个连接给释放掉。而我们就只需将这个固定时间执行的任务设置成销毁连接即可。 这个模块主要是对Connection对象的生命周期管理对非活跃连接进行超时后的释放功能。 Poller子模块 Poller模块是对epoll进行封装的一个模块主要实现epoll的IO事件添加修改移除获取活跃连接功能。 EventLoop子模块 我们要实现的服务器是一个one thread one loop的服务器这里面的loop指的是EventLoop事件循环 这个模块必然是我一个模块对应一个线程在一个线程内部是运行EventLoop的启动函数。 为什么要一个模块对应一个线程 因为我们提供了外界使用者可以对通信连接进行设计操作的函数所以不仅仅是自己线程的EventLoop会产生的事件会对Connection处理。其他的线程也会对Connection进行处理。 而使用这个组件的用户是如何操作的我们这个组件是不知道的但我们知道一定在大多数情况中存在多个线程对Connection进行操作如果Connection不在自己对应的线程去执行则可能存在线程安全问题。 解决方案 一个线程里面监控有很多的连接不能立马执行你这个Connection的任务所以线程中要有一个任务队列。 TcpServer子模块 这是一个最终的整合模块前面的子模块要么是功能性模块对内而这个模块是完全提供给外界使用者对外用来搭建服务器的模块。 通信连接管理模块关系图 我们了解了每个模块的功能、意义和接口下面我们来看一下Server下各个模块的关系是什么样的 Buffer接收到数据之后就要去调用TcpServer设置的事件回调中的新数据接收后的回调即业务处理。任意事件触发了不仅仅要刷新活跃度还要执行组件使用者的任意事件回调。以及连接关闭了挂断了我们不单要执行自己的还要调用用户设置的。 监听连接管理模块关系图 Acceptor模块一旦触发了可读事件获取了新连接这时候初始化的过程就是为新连接创建一个Connection对象并且为Connection对象设置一系列的回调函数TcpServer的回调函数告诉它一旦有事件到来了要该怎么处理之后将这个Connection添加事件监控。一旦触发了事件就可以调用对应的Channel里面的回调函数。 事件监控管理模块关系图 bind函数的认识与基本使用 需要了解的一些前置知识这样才能在我们的项目中用到的时候更加轻松一点。 bind函数函数有几个参数我们就需要传递几个参数这个函数的功能给某一个参数给一个固定值生成一个新的函数对象。 举个例子
#include iostream
#include string
#include functionalint Sum(int a, int b)
{return a b;
}int main()
{auto func std::bind(Sum, 1, 2);std::cout func() std::endl;return 0;
}#include iostream
#include string
#include functionalint Sum(int a, int b)
{return a b;
}int Sum100(int b)
{return 100 b;
}int main()
{auto func std::bind(Sum, 100, std::placeholders::_1);//此时std::bind(Sum, 100, std::placeholders::_1)就等价于Sum100std::cout func(1) std::endl;std::cout func(2) std::endl;std::cout func(3) std::endl;std::cout Sum100(1) std::endl;std::cout Sum100(2) std::endl;std::cout Sum100(3) std::endl;return 0;
} 这有什么作用呢 基于bind的作用当我们在设计一些线程池或者任务池的时候就可以将任务池中的任务设置为函数类型函数的参数由添加任务者直接使用bind进行适配绑定设置而任务池中的任务被处理只需要取出一个个的函数进行执行即可。 #include iostream
#include string
#include vector
#include functionalint Sum(int a, int b)
{return a b;
}int Sum100(int b)
{return 100 b;
}int main()
{using Task std::functionint();std::vectorTask arry;arry.push_back(std::bind(Sum, 1, 2));arry.push_back(std::bind(Sum, 3, 4));arry.push_back(std::bind(Sum, 5, 6));arry.push_back(std::bind(Sum, 7, 8));for (auto f : arry){std::cout f() std::endl;}return 0;
} 这样做有个好处就是这种任务池在设计的时候不用考虑都有哪些任务处理方式了处理函数是如何设计的有多少个什么样的参数这些都不用考虑了降低了代码之间的耦合度。 timerfd的认识与基本使用 在当前的高并发服务器中我们不得不考虑一个问题那就是连接的超时关闭问题。我们需要避免一个连接长时间不通信但是也不关闭空耗资源的情况。这时候我们就需要一个定时任务定时对超时过期的连接进行释放 Linux给我们提供了定时器 功能创建一个定时器。Linux下一切皆文件对定时器操作就是对文件进行操作。 clockid CLOCK_REALTIME以系统时间作为计时基准值如果系统时间发生了改变就会出问题 CLOCK_MONOTONIC以系统启动事件进行递增的一个基准值定时器不会随着系统时间改变而改变从系统开机到现在经过了多长时间作为定值并不受你系统时间的改变而受影响。 flags0 - 阻塞操作 返回值文件描述符 定时器定时的原理每隔一段时间定时器的超时时间系统就会给这个描述符对应的定时器写入一个8字节数据。 假如创建了一个定时器定时器的超时时间是3s每隔3s系统会都会给描述符写入一个1表示从上一次读取数据到现在超时了1次又过了30s你才来读取则你这时候就会读取到一个10表示从上一次读取数据到限制超时了10次。 功能启动定时器 fdtimerfd_create函数的返回值文件描述符 – 创建的定时器的标识符 flags默认设置为0 - 使用相对时间相对于现在的时间往后延长几秒钟为一次超时 struct itimerspec* new设置的超时时间 old用于接收当前定时器原有的超时时间设置主要是用于还原以前设置的定时器如果不关心可以设置NULL it_interval第一次超时之后每次的超时间隔时间 it_value第一次超时的时间 举例
#include iostream
#include sys/timerfd.h
#include cstring
#include unistd.hint main()
{int timerfd timerfd_create(CLOCK_MONOTONIC, 0);if (timerfd 0){perror(create timerfd failed);return -1;}struct itimerspec itime;itime.it_value.tv_sec 1;itime.it_value.tv_nsec 0;itime.it_interval.tv_sec 1;itime.it_interval.tv_nsec 0;timerfd_settime(timerfd, 0, itime, NULL);while (1){uint64_t times;int ret read(timerfd, times, sizeof(times)); //每次写入固定位8字节if (ret 0){perror(read failed);break;}printf(距离上一次超时了%ld次\n, times); //注意是%ld不是%d}close(timerfd);return 0;
}这个接口是需要配合读事件监控来使用的。了解了Linux这个接口后我们就可以这样设计启动这个定时器每隔一秒钟就去检测一下然后将所有连接拿过来遍历一遍看谁已经非活跃超时了超时了则释放掉。 时间轮定时器的基本思想理解 上述设计定时器的解决方案存在一个很大的问题每次超时都要将所有的连接遍历一遍如果有上万个连接效率无疑是较为低下的。 如何提高我们的效率我们就可以用到我们的小根堆了这样只需要每次针对堆顶部分的连接逐个释放直到没有超时的连接为止这样就可以大大提高处理的效率。 上述方法可以实现定时任务但是这里给大家介绍另一种方案时间轮 时间轮的思想来源于钟表如果我们定了一个3点钟的闹钟则当时针走到3的时候就代表时间到了。 同样的道理如果我们定义了一个大小为60个元素的数组并且有一个指针指向数组起始位置这个指针每秒钟向后走动一步走到哪里则代表哪里的任务该被执行了那么如果我们想要定一个3s后的任务则只需要将任务添加到tick3位置则每秒钟走一步三秒钟后tick走到对应位置这时候执行对应位置的任务即可。 这样就会有一个问题了如果是以秒作为计时单位当前的这个数组有7个元素则最大定时时间就只有7s如果数组有60个元素则最大定时时间为60s。如果我想定一天的呢那不是需要开很大的空间如果我想定两天、一个月、一年的呢 解决方案 上面思想仍存在的问题 1。同一时刻的定时任务只能添加一个因为元素只有一块空间需要考虑如何在同一时刻支持支持多个定时任务。解决方案将时间轮的一维数组设计为二维数组。 2。假设当前的定时任务是一个连接的非活跃销毁任务这个任务什么时候添加到时间轮中比较合适 我们并不知道一个连接什么时候是非活跃的只知道连接什么时候活跃当一个连接有事件了就是活跃的。我们可以判断如果一个连接30s都没有事件通信则是一个非活跃连接这时候就该销毁了。但是一个连接在建立的时候添加了一个30s后的销毁任务如果这个连接在30s内事件通信了一次则不是一个非活跃连接所以我们需要在一个连接有IO事件产生的时候能延时定时任务的执行。 如何实现这样一个功能呢 解决方案类的析构函数 智能指针shared_ptr通过这两个技术可以实现定时任务的延时。 1。使用一个类对定时任务进行封装就是一个定时任务对象当对象被销毁的时候就会自动去执行定时任务将定时任务的执行放到析构函数中 2。shared_ptr用于对new的对象进行空间管理当shared_ptr对一个任务对象进行管理的时候内部有一个计数器计数器为0的时候则释放所管理的对象。 基于这个思想我们可以使用shared_ptr来管理定时器任务对象。 时间轮定时器的代码设计及实现
#include unistd.h
#include cstdint
#include functional
#include vector
#include unordered_map
#include memory
#include iostreamusing TaskFunc std::functionvoid();
using ReleaseFunc std::functionvoid();
class TimerTask
{
private:uint64_t _id; //定时器任务对象ID 定时任务必须得找得着一个程序里定时任务可能有很多uint32_t _timeout; //定时任务的超时时间TaskFunc _task_cb; //定时器要执行的任务//用于删除TimerWheel中保存的定时器任务对象信息定时任务释放的时候也要清理TimerWheel中保存的定时器对象信息//为什么将这个_release设置到TimerTask里面呢不在TimerWheel层管理//因为这个TimerWheel不知道是否某个定时任务真的释放了而TimerTask是最清楚的自己真的释放了就会调用析构函数ReleaseFunc _release;bool _canceled; //false - 代表没有被取消true - 代表取消了
public:TimerTask(uint64_t id, uint32_t timeout, const TaskFunc cb):_id(id), _timeout(timeout), _task_cb(cb), _canceled(false){}~TimerTask(){if (_canceled false) //如果定时任务没有被取消_task_cb();_release();}void SetRelease(const ReleaseFunc cb){_release cb;}uint32_t DelayTime(){return _timeout;}void Cancel(){_canceled true;}
};class TimerWheel
{
private:using WeakTask std::weak_ptrTimerTask;using PtrTask std::shared_ptrTimerTask;std::vectorstd::vectorPtrTask _wheel;int _tick; //当前的秒针走到哪里释放哪里就相当于执行哪里的任务int _capacity; //表盘最大数量 -- 也是最大能设置的延时时间//为什么不用普通指针要用weak_ptr因为刷新定时任务的时候需要通过该weak_ptr找到曾经shared_ptr而普通指针则不行std::unordered_mapuint64_t, WeakTask _timers; //对所有的定时任务进行管理
private:void RemoveTimer(uint64_t id){auto pos _timers.find(id);if (pos ! _timers.end()){_timers.erase(pos);}}
public:TimerWheel():_capacity(60), _tick(0), _wheel(_capacity){}//时间轮提供了一个功能释放定时任务的功能//至于释放的任务是什么这个组件也不知道需要上层对内 设置回调函数void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc cb) //添加定时任务{//1.构建定时任务PtrTask pt(new TimerTask(id, delay, cb));pt-SetRelease(std::bind(TimerWheel::RemoveTimer, this, id));//2.将定时任务加入到_wheel中int i (_tick delay) % _capacity;_wheel[i].push_back(pt);//3.加入到时间轮的_timers里面// std::unordered_mapuint64_t, WeakTask::iterator pos _timers.find(id);auto pos _timers.find(id);if (pos _timers.end()){_timers.insert({id, pt});}}void TimerRefresh(uint64_t id) //刷新/延迟定时任务{//通过id找到对应的定时任务auto pos _timers.find(id);if (pos _timers.end()){//如果没找到定时任务则没办法更新return;}//获取到对应定时任务的shared_ptr并构建一个新的智能指针对应的计数加1PtrTask pt pos-second.lock();//将对应的pt加入到_wheel中int delay pt-DelayTime();int i (_tick delay) % _capacity;_wheel[i].push_back(pt);}//这个函数会每秒钟执行一次相当于秒针向后走了一步void RunTimerTask(){_tick (_tick 1) % _capacity;_wheel[_tick].clear(); //情况指定位置的数组就会把数组中保存的所有管理定时器对象的shared_ptr释放掉}void TimerCancel(uint64_t id){auto pos _timers.find(id);if (pos _timers.end()){//没找到定时任务没法刷新没法延时return;}PtrTask pt pos-second.lock();pt-Cancel();}
};//该Test类用于测试能清楚的观察到过程
struct Test
{Test(){std::cout Test() std::endl;}~Test(){std::cout ~Test() std::endl;}
};//要交给时间轮的定时任务
void DelTest(Test* t)
{delete t;
}int main()
{TimerWheel tw;Test* t new Test();tw.TimerAdd(888, 5, std::bind(DelTest, t));for (int i 0; i 8; i){sleep(1);tw.RunTimerTask(); //硬编码模拟时间轮走动std::cout 走了 i 1 秒 std::endl;}while (1){sleep(1);std::cout ---------------------- std::endl;tw.RunTimerTask();}return 0;
}预期结果过5秒钟定时任务就结束 测试TimerRefresh功能更新main函数
int main()
{TimerWheel tw;Test* t new Test();tw.TimerAdd(888, 5, std::bind(DelTest, t));for (int i 0; i 8; i){sleep(1);tw.RunTimerTask(); //硬编码模拟时间轮走动tw.TimerRefresh(888); //每一秒都刷新定时任务std::cout 走了 i 1 秒 std::endl;}while (1){sleep(1);std::cout ---------------------- std::endl;tw.RunTimerTask();}return 0;
}预期结果过13秒钟定时任务才结束 测试TimerCancel功能更新main函数
int main()
{TimerWheel tw;Test* t new Test();tw.TimerAdd(888, 5, std::bind(DelTest, t));for (int i 0; i 8; i){sleep(1);tw.RunTimerTask(); //硬编码模拟时间轮走动tw.TimerRefresh(888); //每一秒都刷新定时任务std::cout 走了 i 1 秒 std::endl;}tw.TimerCancel(888);while (1){sleep(1);std::cout ---------------------- std::endl;tw.RunTimerTask();}return 0;
}预期结果定时任务不会被执行 正则表达式基本认识 正则表达式regular expression描述了一种字符串的匹配模式可以用来检测一个串是否含有某种特定格式的子串、将匹配的子串替换或者从某个串中提取符合某个条件的子串等。 HTTP协议的数据就有特定的组织格式以前我们都是用原始字符串的格式一个一个来解析我们现在只需要编辑出一种符合HTTP协议规则的正则表达式通过正则表达式来进行匹配以及提取出HTTP请求的各项数据可以让我们更加简单灵活但是效率没有提高 bool std::regex_match (const std::string src, std::smatch matches, std::regex e) src原始字符串 matches正则表达式可以从原始字符串中匹配并提取符合某种规则的数据提取的数据就放在matches中是一个类似于数组的容器。 e正则表达式的匹配规则 返回值用于确定匹配是否成功 举例
#include iostream
#include string
#include regexint main()
{std::string str /numbers/1234; //现在我们想提取出里面的数字字符串std::regex e(/numbers/(\\d)); //括号表示提取数字/* \d在正则表达式里表示数字表示匹配前面子表达式一次或多次转的时候第一步字符串转义。第二步正则表达式转义\d在字符串里表示对d字符进行转义 \\在字符串里表示\所以在字符串里\\d才表示正则表达式的\d*/std::smatch matches;bool ret std::regex_match(str, matches, e);if (ret false){return -1; //字符串整体匹配失败就返回false}for (auto s : matches){//因为我们在匹配的时候首先是一个整体的规则匹配看整体的是否匹配成功首先匹配到的肯定是原始字符串//所以首先存储了原始字符串然后再去存储我们想要提取出来的字符串。所以首先打印的是原始字符串std::cout s std::endl;}return 0;
}编译器版本过低可能产生各种各样的错误如果莫名奇妙的出错了就是需要更新编译器了。 运行结果 正则表达式不用刻意去记忆用的时候去网上搜索即可用到哪些查哪些学哪些忘记了就再查。 正则表达式提取HTTP请求行
#include iostream
#include string
#include regexint main()
{std::string str GET /helloworld/login?userxiaomingpasswd123123 HTTP/1.1;std::smatch matches;std::regex e((GET|HEAD|PUT|POST|DELETE) (/.*) HTTP/(1.[01]));/*(POST|GET)|表示匹配任一字符串并提取出来使用|要加().点表示匹配除\n和\r之外的任何单个字符*表示匹配前面子表达式任意次可0次[01]表示匹配0或1的任一字符*/bool ret regex_match(str, matches, e);if (ret false){std::cerr 匹配失败 std::endl;return -1;}for (auto match : matches){std::cout match std::endl;}return 0;
}运行结果 继续对请求路径和查询字符串进行正则匹配 因为请求行中可能没有查询字符串所以我们写正则表达式的时候需要将这一点考虑进去 #include iostream
#include string
#include regexint main()
{std::string str GET /helloworld/login?userxiaomingpasswd123123 HTTP/1.1;std::smatch matches;// std::regex e((GET|HEAD|PUT|POST|DELETE) (/.*) HTTP/(1.[01]));/*(POST|GET)|表示匹配任一字符串并提取出来使用|要加().点表示匹配除\n和\r之外的任何单个字符*表示匹配前面子表达式任意次可0次[01]表示匹配0或1的任一字符*/std::regex e((GET|HEAD|PUT|POST|DELETE) (/[^?]*(?://?(.*))?) HTTP/(1.[01]));/*[^?]表示匹配非问号的单个字符(?:……)表示匹配某个字符串但是不提取字符串内有()则表示提取?表示匹配前面的子表达式1次或0次因为?在正则表达式里面有特殊含义所以需要转义为/?但/?在字符串里有特殊含义所以需要将/先转义//会被转义为/所以最后写为//?表示匹配一个?字符*/bool ret regex_match(str, matches, e);if (ret false){std::cerr 匹配失败 std::endl;return -1;}for (auto match : matches){std::cout match size: match.str().size() std::endl;}return 0;
}运行结果 当没有查询字符串运行结果 #include iostream
#include string
#include regexint main()
{// std::string str GET /helloworld/login?userxiaomingpasswd123123 HTTP/1.1;std::string str GET /helloworld/login HTTP/1.1;std::smatch matches;// std::regex e((GET|HEAD|PUT|POST|DELETE) (/.*) HTTP/(1.[01]));/*(POST|GET)|表示匹配任一字符串并提取出来使用|要加().点表示匹配除\n和\r之外的任何单个字符*表示匹配前面子表达式任意次可0次[01]表示匹配0或1的任一字符*/std::regex e((GET|HEAD|PUT|POST|DELETE) (/[^?]*(?://?(.*))?) HTTP/(1.[01]));/*[^?]表示匹配非问号的单个字符(?:……)表示匹配某个字符串但是不提取字符串内有()则表示提取?表示匹配前面的子表达式1次或0次因为?在正则表达式里面有特殊含义所以需要转义为/?但/?在字符串里有特殊含义所以需要将/先转义//会被转义为/所以最后写为//?表示匹配一个?字符*/bool ret regex_match(str, matches, e);if (ret false){std::cerr 匹配失败 std::endl;return -1;}for (auto match : matches){std::cout match size: match.str().size() std::endl;}return 0;
}运行结果 通用类型容器Any类设计思想 每一个Connection对连接进行管理最终都不可避免需要涉及到应用层协议的处理因此在Connection中需要设置协议处理的上下文来控制处理节奏。但是应用层协议千千万为了降低耦合度这个协议接收解析上下文就不能有明显的协议倾向它可以使任意协议的上下文信息因此就需要有一个通用的类型来保存各种不同的数据结构。 我们组件可以提供各种不同的协议支持以便于用户去搭建一些应用服务器。问题就在于我们对协议进行支持的时候我们支持的协议是什么我们本身也不清楚。所以我们的并发服务器就需要考虑一个点当我们的socket有数据到来了接收它的数据发现缓冲区的数据不足一条完整的请求或者比一条完整的请求多TCP面向字节流的特点。或者发送请求的数据比内核缓冲区还大如果我们希望把整个数据取出来再解析那你永远也获取不到一个完整的请求。 所以就会出现一种情况处理到一半就没有数据了我们就需要等到下一半数据到来接着处理所以解决方案给服务器里面每一个连接设置一个协议请求处理的上下文这个上下文就是专门来控制请求数据接收以及解析节奏的对于我们的协议支持来说它的功能就是用来记录当前的请求处理到什么的阶段了下次有数据到来了我们应该从缓冲区里取到的数据应该哪里开始接着继续处理。 怎么去保存接收与解析的上下文呢既然我们要支持HTTP协议那我们是不是得定义一个HTTP协议这样的结构这样就能够对接收到的数据进行解析并填充进去并记录解析的状态如果我们的服务器只支持HTTP服务器这么做是没有问题的但是我们的服务器不单单要支持我们的HTTP协议它还要支持各种不同的协议所以在连接里面所设计的上下文就不能是一个固定结构的上下文这时候我们就得有一个容器能够去接收各种不同的数据。 在C语言中通用类型可以使用void*来管理在C中boost库和C17给我们提供了一个通用类型any来灵活使用我们用C17特性中的any而这个any通用类型类实现起来并不复杂我们自己来设计一个any类。 是一个容器容器中可以保存各种不同类型的数据 #include iostream
#include stringtemplateclass T
class Any
{
private:T _content;
};int main()
{Any a;a 100;a std::string(string);return 0;
}这样设计是不行的并不能满足我们的事例。 解决方法Any类保存的是父类对象根据传的参数new出一个子类赋值给父类子类是可变的模版类。 Any类中保存的是holder类的指针当Any容器需要保存一个数据的时候只需要通过placeholder子类实例化一个特定类型的子类对象出来让子类对象保存数据。 #include iostream
#include string
#include typeinfo
#include utilityclass Any
{
public:class holder{public:virtual ~holder(){}virtual const std::type_info type() 0; //获取当前子类的数据类型 -- 返回类型是const type_infovirtual holder* clone() 0; //针对当前的对象自身克隆出一个新的子类对象};templateclass Tclass placeholder : public holder{public:placeholder(const T val T()):_val(val){}virtual const std::type_info type(){return typeid(T);}virtual holder* clone(){return new placeholder(_val);}T _val;};Any():_content(NULL){}template class TAny(const T content):_content(new placeholderT(content)){}~Any(){delete _content;}Any swap(Any other){std::swap(_content, other._content);return *this;}templateclass TAny operator(const T val){Any(val).swap(*this); //这样写的好处Any(val)为临时对象交换完生命周期就到了就会调用自己的析构函数return *this;}Any operator(const Any other){Any(other).swap(*this);return *this;}templateclass TT* get() //返回子类对象保存数据的指针{if (typeid(T) ! _content-type()) //如果你要的类型和我保存的类型不匹配return NULL;return (((placeholderT*)_content)-_val);}
public:holder* _content;
};class Test
{
public:Test(){std::cout Test() std::endl;}Test(const Test t){std::cout Test() std::endl;}~Test(){std::cout ~Test() std::endl;}
};int main()
{Any a;{Test t;a t;}a 10;int *pa a.getint();std::cout *pa std::endl;a std::string(nihao);std::string *ps a.getstd::string();std::cout *ps std::endl;return 0;
}运行结果 有人就想既然有现成的为什么不用现成的当然你也可以用我们把any的思想介绍给大家并且也不难实现。 查C17文档可对照文档的示例进行理解 #include iostream
#include string
#include typeinfo
#include utility
#include anyint main()
{//官方any的使用方法主要还是要查文档std::any a;a 10;int* pi std::any_castint(a);std::cout *pi std::endl;a std::string(hello);std::string* ps std::any_caststd::string(a);std::cout *ps std::endl;return 0;
}运行结果 Buffer缓冲区设计思想 我们要开始写我们项目的代码了所以我们创建一个新的源代码目录source。 我们写源码考虑的是我们将高性能服务器开发完了我们给别人提供的时候怎么提供呢我们可以给别人提供编译完成的库然后把头文件给它。也可以是全部写到头文件里面去别人用的时候直接包含头文件就可以了这样就可以简单一点。而我们的方式就是这第二种方式这也是一种比较典型的用法单头文件的方式。 Buffer模块 提供的功能存储数据取出数据 实现思想 1。实现缓冲区得有一块内存空间采用vector char的数据结构。为什么使用vector char而不使用string呢string更多的表示是字符串的操作字符串的操作有一个缺陷就是遇到了’\0’就会停止因为网络传输什么样的数据都有可能传输就有可能存在这种数据像图片、视频这种二进制文件。 2。要素a.默认的空间大小b.当前的读取数据位置c.当前的写入数据位置 3。操作 1)写入数据当前写入位置指向哪里就从哪里开始写入如果后沿空间剩余空间不够了考虑前沿后沿空闲的空间是否足够a.足够将数据移动到起始位置。b.不够扩容从当前写位置开始扩容足够大小。数据一旦写入成功当前写位置就要向后偏移。 我们不可能说每次要存储数据的时候就要把Buffer中的数据全部挪到起始位置上去那这样效率就太低了。只有在后沿空间不够前沿空间后沿空间足够的时候才将所有的数据挪到前面去。前沿空间后沿空间不足够的时候不挪动直接扩容再存储。这样就可以保证挪动数据不那么频繁。 2)读取数据当前的读取位置指向哪里就从哪里开始读取前提是有数据可读。可读数据大小当前写入位置减去当前读取位置。 #include iostream
#include vector
#include cstdint
#include cassert
#include string
#include cstring
#include memory#define BUFFER_DEFAULT_SIZE 1024
class Buffer
{
private:std::vectorchar _buffer;uint64_t _read_idx; //相对读偏移量uint64_t _write_idx; //相对写偏移量
public:Buffer():_read_idx(0), _write_idx(0), _buffer(1024){}//获取起始地址char* Begin() const{return (char*)_buffer[0];}//获取当前写入起始地址char* WritePosition() const{return Begin() _write_idx;}//获取当前读取起始地址char* ReadPosition() const{return Begin() _read_idx;}//获取缓冲区末尾空闲空间大小 -- 写偏移之后的空闲空间uint64_t TailIdleSize() const{return _buffer.size() - _write_idx;}//获取缓冲区起始空闲空间大小 -- 读偏移之前的空闲空间uint64_t HeadIdleSize() const{return _read_idx;}//获取可读数据大小uint64_t ReadAbleSize() const{return _write_idx - _read_idx;}//将读偏移向后移动必须小于可读数据大小void MoveReadOffset(uint64_t len){assert(len ReadAbleSize());_read_idx len;}//将写偏移向后移动向后移动的大小必须小于前沿和后沿的空闲空间的大小void MoveWriteOffset(uint64_t len){assert(len HeadIdleSize() TailIdleSize());_write_idx len;}//确保可写空间足够(整体空闲空间够了就移动数据否则就扩容)void EnsureWriteSpace(uint64_t len){if (TailIdleSize() len) //如果末尾空闲空间大小足够直接返回return;//末尾空间不够则判断加上起始空闲位置的空闲空间大小是否足够if (len TailIdleSize() HeadIdleSize()){uint64_t rsz ReadAbleSize(); //把当前数据大小先保存起来//copy第一个参数要拷贝的起始地址。第二个参数要拷贝的末尾地址。第三个参数拷贝到某个地址上面std::copy(ReadPosition(), ReadPosition() rsz, Begin());_read_idx 0; //将读偏移归0_write_idx rsz; //将写偏移置为可读数据大小}else //总体空间不够扩容{_buffer.resize(_write_idx len);}}//写入数据void Write(const void* data, uint64_t len){//1。保证有足够空间 2。拷贝数据进去if (len 0) //防御性编程不要嫌弃多次判断return;EnsureWriteSpace(len);std::copy((char*)data, (char*)data len, WritePosition());}void WriteAndPush(const void* data, uint64_t len){Write(data, len);MoveWriteOffset(len);}void WriteString(const std::string data){Write((const void*)data.c_str(), data.size());}void WriteStringAndPush(const std::string data){WriteString(data);MoveWriteOffset(data.size());}void WriteBuffer(Buffer data){Write((const void*)data.ReadPosition(), data.ReadAbleSize());} void WriteBufferAndPush(Buffer data){WriteBuffer(data);MoveWriteOffset(data.ReadAbleSize());}//读取数据void Read(void* buf, uint64_t len){//要求读取的数据大小必须小于可读数据的大小assert(len ReadAbleSize());std::copy(ReadPosition(), ReadPosition() len, (char*)buf);}void ReadAndPop(void* buf, uint64_t len){Read(buf, len);MoveReadOffset(len);}std::string ReadAsString(uint64_t len){assert(len ReadAbleSize());std::string str;str.resize(len);Read(str[0], len);return str;}std::string ReadAsStringAndPop(uint64_t len){std::string str ReadAsString(len);MoveReadOffset(len);return str;}char* FindCRLF() //查找回车字符的地址{//查找某一个字节/字符char* res (char*)memchr(ReadPosition(), \n, ReadAbleSize());return res;}//获取一行数据std::string GetLine(){const char* pos FindCRLF();if (pos NULL)return ;return ReadAsString(pos - ReadPosition() 1); //包括将\n也读取进去 -- 方便后面对HTTP协议的操作}std::string GetLineAndPop(){std::string str GetLine();MoveReadOffset(str.size());return str;}//清空缓冲区void Clear(){//只需要将偏移量归0即可_read_idx 0;_write_idx 0;}
};测试1
#include ../source/server.hppint main()
{Buffer buf;std::string str hello!!;buf.WriteStringAndPush(str);std::string tmp;tmp buf.ReadAsStringAndPop(buf.ReadAbleSize());std::cout tmp std::endl;std::cout buf.ReadAbleSize() std::endl;return 0;
}运行结果 测试2
#include ../source/server.hppint main()
{Buffer buf;std::string str hello!!;buf.WriteStringAndPush(str);Buffer buf1;buf1.WriteBufferAndPush(buf);std::string tmp;tmp buf1.ReadAsStringAndPop(buf1.ReadAbleSize());std::cout tmp std::endl;std::cout buf.ReadAbleSize() std::endl;std::cout buf1.ReadAbleSize() std::endl;return 0;
}运行结果 测试3
#include ../source/server.hppint main()
{Buffer buf;for (int i 0; i 300; i){std::string str hello!! std::to_string(i) \n;buf.WriteStringAndPush(str);}while (buf.ReadAbleSize() 0){std::string line buf.GetLineAndPop();std::cout line;}return 0;
}运行结果 日志打印宏的编写 程序一旦出问题了最简单的排查流程就是在代码里面做打印观察当然gdb调试也可以打印有一个专业叫法叫做日志程序运行的过程中程序出错了日志打印的信息能快速的定位是哪一行出的问题。如果是cout打印的话当我们调试完不需要这些打印了我们还需要逐个的把他们找到并删除掉很麻烦。我可以通过日志设置一个打印等级。 打印等级分为普通信息打印调试信息打印错误信息打印。打印等级为普通信息则所有的信息都打印打印等级设置为调试等级就代表打印调试信息和错误信息。调试完后我们不需要删除代码只需要把等级设置为错误信息等级普通信息和调试信息都不再打印只打印错误信息 我们这个日志不实现的那么复杂只做一个宏在我们项目中够用即可。 #include stdio.h
#include iostream
#include string#define LOG(msg) fprintf(stdout, [%s:%d] %s\n, __FILE__, __LINE__, msg);int main()
{for (int i 0; i 8; i){LOG(hello world);}return 0;
}这样写只能打印字符串数据只有一个参数只有一个这样也不合适。我希望你能想printf一样的打印 #include stdio.h
#include iostream
#include string#define LOG(format, ...) fprintf(stdout, [%s:%d] format \n, __FILE__, __LINE__, __VA_ARGS__);int main()
{for (int i 0; i 8; i){LOG(hello world, %d, i 1);}return 0;
}运行结果 如果这样传就会报错因为这种方式代表后面没有不定参只有一个参数。所以C语言提供了一种方案 #include stdio.h
#include iostream
#include string#define LOG(format, ...) fprintf(stdout, [%s:%d] format \n, __FILE__, __LINE__, ##__VA_ARGS__);int main()
{for (int i 0; i 8; i){LOG(hello world);}return 0;
}运行结果 除此之外我们还要做的更完善一点带上一个时间有些时候还希望看到这个日志是什么时候打印的。我们后面项目需要做一些超时的测试都会用到这个时间。 将struct tm以固定的格式写到字符串s里面 #include stdio.h
#include iostream
#include string
#include ctime
#include unistd.h//技巧加上do while循环能应用于代码的各种情况
//宏里面不能有换行所以加上\转义换行代表后面没有换行
#define LOG(format, ...) \do \{ \time_t t time(NULL); \struct tm* lltime localtime(t); \char time[32]; \strftime(time, 31, %H:%M:%S, lltime); \fprintf(stdout, [%s %s:%d] format \n, time, __FILE__, __LINE__, ##__VA_ARGS__); \} while (0)int main()
{for (int i 0; i 8; i){sleep(1);LOG(hello world);}return 0;
}设置日志等级 #include stdio.h
#include iostream
#include string
#include ctime
#include unistd.h//技巧加上do while循环能应用于代码的各种情况
//宏里面不能有换行所以加上\转义换行代表后面没有换行
#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL DBG
#define LOG(level, format, ...) \do \{ \if (LOG_LEVEL level) \break; \time_t t time(NULL); \struct tm* lltime localtime(t); \char time[32]; \strftime(time, 31, %H:%M:%S, lltime); \fprintf(stdout, [%p %s %s:%d] format \n,(void*)pthread_self(), time, __FILE__, __LINE__, ##__VA_ARGS__); \} while (0)
#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__)// 使用不定参的...只能声明定义的时候用使用的时候用__VA_ARGS__
#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__)
int main()
{for (int i 0; i 2; i){sleep(1);INF_LOG(hello world1);DBG_LOG(hello world2);ERR_LOG(hello world3);}return 0;
}运行结果 Socket套接字类设计思想 为了对socket的基本接口更加方便的使用对其进行一个封装封装成一个类出来通过这个类的接口能更简便的完成套接字的各种操作 #define MAX_LISTEN 1024
class Socket
{
private:int _sockfd;
public:Socket():_sockfd(-1){}Socket(int fd):_sockfd(fd){}~Socket(){Close();}int Fd(){return _sockfd;}//创建套接字bool Create(){int ret socket(AF_INET, SOCK_STREAM, 0);if (ret 0){ERR_LOG(CREATE SOCKET FAILED:%s, strerror(errno));return false;}_sockfd ret;return true;}//绑定地址信息bool Bind(const std::string ip, uint16_t port){struct sockaddr_in addr;addr.sin_addr.s_addr inet_addr(ip.c_str());addr.sin_family AF_INET;addr.sin_port htons(port);int ret bind(_sockfd, (const sockaddr*)addr, sizeof(addr));if (ret 0){ERR_LOG(BIND SOCKET FAILED:%s, strerror(errno));return false;}return true;}//开始监听bool Listen(int backlog MAX_LISTEN){int ret listen(_sockfd, backlog);if (ret 0){ERR_LOG(LISTEN SOCKET FAILED:%s, strerror(errno));return false;}return true;}//向服务器发起连接bool Connect(const std::string ip, uint16_t port){struct sockaddr_in addr;addr.sin_addr.s_addr inet_addr(ip.c_str());addr.sin_port htons(port);addr.sin_family AF_INET;int ret connect(_sockfd, (const sockaddr*)addr, sizeof(addr));if (ret 0){ERR_LOG(CONNECT SERVER FAILED:%s, strerror(errno));return false;}return true;}//获取新连接int Accept(){int newfd accept(_sockfd, NULL, NULL);if (newfd 0){ERR_LOG(ACCEPT SOCKET FAILED:%s, strerror(errno));return -1;}return newfd;}//接收数据ssize_t Recv(void* Buffer, size_t len, int flag 0){int n recv(_sockfd, Buffer, len, flag);if (n 0){//等于0的时候表示连接断开DBG_LOG(CONNECTION CLOSED);return -1;}if (n 0)//小于0的时候表示读出错了{if (errno EAGAIN || errno EINTR){//EAGAIN 表示当前socket的接收缓冲区中没有数据在非阻塞情况下才会出现这种错误//EINTR 表示当前socket的阻塞等待被信号打断了return 0; //表示这次没有接收到数据}ERR_LOG(SOCKET RECV FAILED);return -1;}return n;}//非阻塞读取ssize_t NonBlockRecv(void* buffer, size_t len){return Recv(buffer, len, MSG_DONTWAIT);}//发送数据 -- 外部可以根据实际发送的数据长度来决定下一步的处理ssize_t Send(const void* buf, size_t len, int flag 0){int n send(_sockfd, buf, len, flag);if (n 0){if (errno EAGAIN || errno EINTR){return 0;}ERR_LOG(SOCKET SEND FAILED);return -1;}return n;}//非阻塞发送数据ssize_t NonBlockSend(void* buf, size_t len){return send(_sockfd, buf, len, MSG_DONTWAIT);}//关闭套接字void Close(){if (_sockfd ! -1)close(_sockfd);_sockfd -1;}//创建一个服务器连接bool CreateServer(uint16_t port, const std::string ip 0.0.0.0, bool block_flag false){//创建套接字if (Create() false)return false;if (block_flag true)NonBlock();//设置端口复用ReuseAddress();//绑定地址if (Bind(ip, port) false)return false;//开始监听if (Listen() false)return false;return true;}//创建一个客户端连接bool CreateClient(uint16_t port, const std::string ip){//创建套接字if (Create() false)return false;//连接服务器if (Connect(ip, port) false)return false;return true;}/*一个连接绑定了地址和端口之后一旦主动关闭连接的一方最终会进入time_wait状态这时候套接字并不会立即被释放因此IP地址和端口依然被占用导致我们无法立即去使用它在服务器使用的时候崩溃了退出了会无法立即重启所以我们要开启地址重用。*///设置套接字选项 -- 开启端口复用void ReuseAddress(){int opt 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT | SO_REUSEADDR, opt, sizeof(opt));}/*我们在使用套接字去接收数据的时候一次性可能取不完数据就需要循环去接收缓冲区里的数据什么时候取完呢就是取到没有数据为止但是套接字默认是阻塞的没有数据的时候再去取就会被阻塞住程序就无法继续往下走了所以我们需要将套接字设置为非阻塞*///设置套接字阻塞属性 -- 设置为非阻塞void NonBlock(){int flag fcntl(_sockfd, F_GETFL, 0);fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);}
};测试
#include ../source/server.hppint main()
{Socket lst_sock;lst_sock.CreateServer(8085);while (1){int newfd lst_sock.Accept();if (newfd 0){continue;}DBG_LOG(ACCEPT NEW SOCKET SUCCESS);Socket cli_sock(newfd);char buf[4096] {0};int n cli_sock.Recv(buf, 4095);if (n 0){//说明读取出错cli_sock.Close();continue;}cli_sock.Send(buf, n);DBG_LOG(CLOSE CLIENT);cli_sock.Close();}lst_sock.Close();return 0;
}#include ../source/server.hppint main()
{Socket cli_socket;cli_socket.CreateClient(8085, 127.0.0.1);std::string str hello world!!!;cli_socket.Send(str.c_str(), str.size());char buf[4096] {0};cli_socket.Recv(buf, 4095);DBG_LOG(%s, buf);return 0;
}运行结果 Channel事件管理类设计思想 Channel模块是对一个文件描述符进行监控事件管理的模块。 意义1。用户对描述符的监控操作变得更加简单2。使用的时候流程更加清晰 之前我们画了一个流程图可以看出Channel模块只是Connection的一个子模块只是连接管理里面的事件管理模块事件触发了之后要对连接进行什么样的操作呢会调用Connection给他设置过的回调函数。这就是他们之间的关系。 EPOLL的事件前面的事件都在之前的文章中有讲解。 什么是EPOLLHUPEPOLLHUP一般与EPOLLERR同时触发一般在一个连接的文件描述符完全被关闭但你仍然去操作了这个文件描述符的时候会触发。主要处理全关闭状态。 什么是EPOLLRDHUP你在读取的时候对端连接断开触发的epoll事件会包含EPOLLIN | EPOLLRDHUP。主要是处理半关闭状态。 什么是EPOLLPRrecv的第三个参数可设置为优先带外数据。在之前文章中也有讲解。 事件说明EPOLLIN可读EPOLLOUT可写EPOLLEDHUP连接断开EPOLLPRI优先数据EPOLLERR错误EPOLLHUP挂断
class Channel
{
private:uint32_t _events; //当前需要监控的事件uint32_t _revents; //当前连接触发的事件using EventCallback std::functionvoid();/*只有我们的连接才知道一旦事件触发了该去怎么处理所以需要设置回调函数。当启动读事件监控就需要将channel挂到EventLoop上面进行事件监控当可读事件触发就会调用channel里设置的回调函数。*/EventCallback _read_callback; //可读事件被触发的回调函数EventCallback _write_callback; //可写事件被触发的回调函数EventCallback _error_callback; //错误事件被触发的回调函数EventCallback _close_callback; //连接断开事件被触发的回调函数EventCallback _event_callback; //任意事件被触发的回调函数int _fd;
public:Channel(int fd):_fd(fd), _events(0), _revents(0){}int Fd(){return _fd;}void SetREvents(uint32_t events){_revents events;}void SetReadCallback(const EventCallback cb){_read_callback cb;}void SetWriteCallback(const EventCallback cb){_write_callback cb;}void SetErrorCallback(const EventCallback cb){_error_callback cb;}void SetCloseCallback(const EventCallback cb){_close_callback cb;}void SetEventCallback(const EventCallback cb){_event_callback cb;}//当前是否监控了可读bool ReadAble(){return (_events EPOLLIN);}//当前是否监控了可写bool WriteAble() {return (_events EPOLLOUT);}//启动读事件监控void EnableRead(){_events | EPOLLIN;//后边会添加到EventLoop的事件监控中暂时不写因为EventLoop模块还没有写}//启动可写事件监控void EnableWrite(){_events | EPOLLOUT;//后边会添加到EventLoop的事件监控中暂时不写因为EventLoop模块还没有写}//关闭读事件监控void DisableRead(){_events ~EPOLLIN;//后边会修改在EventLoop的事件监控中暂时不写因为EventLoop模块还没有写}//关闭写事件监控void DisableWrite(){_events ~EPOLLOUT;//后边会修改在EventLoop的事件监控中暂时不写因为EventLoop模块还没有写}//关闭所有事件监控void DisableAll(){_events 0;}/*关闭事件监控只是不去关心这个事件了但还是在EventLoop中。移除事件监控才是真正的将它从EventLoop中移除*///移除监控void Remove(){//后边会调用EventLoop接口来移除监控暂时不写因为EventLoop模块还没有写}/*EventLoop不用 你触发了什么事件我就去调用对应的回调函数EventLoop不用管。EventLoop只管你触发了事件我就调用你的HandleEvent你自己来决定什么样的事件该如何处理这是最能体现Channel模块作用的功能之一*/void HandleEvent(){if ((_revents EPOLLIN) || (_revents EPOLLRDHUP) || (_revents EPOLLPRI)){if (_read_callback)_read_callback();}if (_revents EPOLLOUT){if (_write_callback)_write_callback();}if (_revents EPOLLERR){if (_error_callback)_error_callback();}if (_revents EPOLLHUP){if (_close_callback)_close_callback();}if (_event_callback)_event_callback();}
};Poller描述符监控类设计思想 Channel最终是要被添加到EventLoop的事件监控中的如何添加呢EventLoop会通过我们的poller所提供的接口添加到Poller被其所管理所以Poller不仅仅管理的是描述符还有对应的channel。 Poller模块描述符IO事件监控的模块。 功能 1。添加/修改描述符的事件监控不存在则添加存在则修改 2。移除描述符的事件监控 逻辑流程 1。Poller对描述符进行监控通过Channel才能知道描述符需要监控什么事件 2。当描述符就绪了Poller通过描述符在hash表中找到对应的Channel得到了Channel才能知道什么事件如何处理并返回就绪描述符对应的Channel。外界才能通过这个Channel才能知道你就绪了什么事件如何去处理它 #define MAX_EPOLLEVENTS 1024
class Poller
{
private:int _epfd;struct epoll_event _evs[MAX_EPOLLEVENTS];std::unordered_mapint, Channel* _channels; //这个和epoll里面的所有文件描述符是强绑定的
private://对epoll的直接操作void Update(Channel* channel, int op){int fd channel-Fd();struct epoll_event ev;ev.data.fd fd;ev.events channel-Events();int ret epoll_ctl(_epfd, op, fd, ev);if (ret 0){ERR_LOG(EPOLLCTL FAILED);}return;}//判断一个Channel是否已经添加了事件监控bool HasChannel(Channel* channel){auto pos _channels.find(channel-Fd());if (pos _channels.end()){return false;}return true;}
public:Poller(){_epfd epoll_create(MAX_EPOLLEVENTS);if (_epfd 0){ERR_LOG(EPOLL CREATE TAILED!);abort(); //退出程序}}//添加或修改监控事件void UpdateEvent(Channel* channel){bool ret HasChannel(channel);if (ret false){_channels.insert(std::make_pair(channel-Fd(), channel));return Update(channel, EPOLL_CTL_ADD);}return Update(channel, EPOLL_CTL_MOD);}//移除监控void RemoveEvent(Channel* channel){bool ret HasChannel(channel);if (ret false){return;}_channels.erase(channel-Fd());return Update(channel, EPOLL_CTL_DEL);}//开始监控返回活跃连接void Poll(std::vectorChannel** active){int n epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);if (n 0){if (errno EINTR) //如果被信号打断了{return;}ERR_LOG(EPOLL WAIT FAILED:%s, strerror(errno));abort();}for (int i 0; i n; i){auto it _channels.find(_evs[i].data.fd);assert(it ! _channels.end());it-second-SetREvents(_evs[i].events);active-push_back(it-second);}}
};Poller模块与Channel模块整合与测试 虽然Channel模块是和EventLoop模块进行整合但是我们可以先用Poller模块进行一个基本测试。实际操作的时候不是Poller是依赖EventLoopPoller只是EventLoop的子模块而已 更新Poller模块和Channel模块
class Poller;class Channel
{
private:Poller* _poller;uint32_t _events; //当前需要监控的事件uint32_t _revents; //当前连接触发的事件using EventCallback std::functionvoid();/*只有我们的连接才知道一旦事件触发了该去怎么处理所以需要设置回调函数。当启动读事件监控就需要将channel挂到EventLoop上面进行事件监控当可读事件触发就会调用channel里设置的回调函数。*/EventCallback _read_callback; //可读事件被触发的回调函数EventCallback _write_callback; //可写事件被触发的回调函数EventCallback _error_callback; //错误事件被触发的回调函数EventCallback _close_callback; //连接断开事件被触发的回调函数EventCallback _event_callback; //任意事件被触发的回调函数int _fd;
public:Channel(Poller* poller, int fd):_poller(poller), _fd(fd), _events(0), _revents(0){}int Fd(){return _fd;}uint32_t Events(){return _events;}void SetREvents(uint32_t events){_revents events;}void SetReadCallback(const EventCallback cb){_read_callback cb;}void SetWriteCallback(const EventCallback cb){_write_callback cb;}void SetErrorCallback(const EventCallback cb){_error_callback cb;}void SetCloseCallback(const EventCallback cb){_close_callback cb;}void SetEventCallback(const EventCallback cb){_event_callback cb;}//当前是否监控了可读bool ReadAble(){return (_events EPOLLIN);}//当前是否监控了可写bool WriteAble() {return (_events EPOLLOUT);}//启动读事件监控void EnableRead(){_events | EPOLLIN;Update();//因为EventLoop还没有实现为了测试Channel和Poller先暂时直接用Poller里的接口进行操作}//启动可写事件监控void EnableWrite(){_events | EPOLLOUT;Update();//因为EventLoop还没有实现为了测试Channel和Poller先暂时直接用Poller里的接口进行操作}//关闭读事件监控void DisableRead(){_events ~EPOLLIN;Update();//因为EventLoop还没有实现为了测试Channel和Poller先暂时直接用Poller里的接口进行操作}//关闭写事件监控void DisableWrite(){_events ~EPOLLOUT;Update();//因为EventLoop还没有实现为了测试Channel和Poller先暂时直接用Poller里的接口进行操作}//关闭所有事件监控void DisableAll(){_events 0;}/*关闭事件监控只是不去关心这个事件了但还是在EventLoop中。移除事件监控才是真正的将它从EventLoop中移除*///移除监控void Remove();//因为里面用到了Poller的成员所以需要在Poller代码的下面去实现该函数void Update();/*EventLoop不用 你触发了什么事件我就去调用对应的回调函数EventLoop不用管。EventLoop只管你触发了事件我就调用你的HandleEvent你自己来决定什么样的事件该如何处理这是最能体现Channel模块作用的功能之一*/void HandleEvent(){if ((_revents EPOLLIN) || (_revents EPOLLRDHUP) || (_revents EPOLLPRI)){if (_read_callback)_read_callback();}if (_revents EPOLLOUT){if (_write_callback)_write_callback();}if (_revents EPOLLERR){if (_error_callback)_error_callback();}if (_revents EPOLLHUP){if (_close_callback)_close_callback();}if (_event_callback)_event_callback();}};#define MAX_EPOLLEVENTS 1024
class Poller
{
private:int _epfd;struct epoll_event _evs[MAX_EPOLLEVENTS];std::unordered_mapint, Channel* _channels; //这个和epoll里面的所有文件描述符是强绑定的
private://对epoll的直接操作void Update(Channel* channel, int op){int fd channel-Fd();struct epoll_event ev;ev.data.fd fd;ev.events channel-Events();int ret epoll_ctl(_epfd, op, fd, ev);if (ret 0){ERR_LOG(EPOLLCTL FAILED);}return;}//判断一个Channel是否已经添加了事件监控bool HasChannel(Channel* channel){auto pos _channels.find(channel-Fd());if (pos _channels.end()){return false;}return true;}
public:Poller(){_epfd epoll_create(MAX_EPOLLEVENTS);if (_epfd 0){ERR_LOG(EPOLL CREATE TAILED!);abort(); //退出程序}}//添加或修改监控事件void UpdateEvent(Channel* channel){bool ret HasChannel(channel);if (ret false){_channels.insert(std::make_pair(channel-Fd(), channel));return Update(channel, EPOLL_CTL_ADD);}return Update(channel, EPOLL_CTL_MOD);}//移除监控void RemoveEvent(Channel* channel){bool ret HasChannel(channel);if (ret false){return;}_channels.erase(channel-Fd());return Update(channel, EPOLL_CTL_DEL);}//开始监控返回活跃连接void Poll(std::vectorChannel** active){int n epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);if (n 0){if (errno EINTR) //如果被信号打断了{return;}ERR_LOG(EPOLL WAIT FAILED:%s, strerror(errno));abort();}for (int i 0; i n; i){auto it _channels.find(_evs[i].data.fd);assert(it ! _channels.end());it-second-SetREvents(_evs[i].events);active-push_back(it-second);}}
};void Channel::Remove()//因为EventLoop还没有实现为了测试Channel和Poller先暂时直接用Poller里的接口进行操作
{return _poller-RemoveEvent(this);
}
void Channel::Update()//因为EventLoop还没有实现为了测试Channel和Poller先暂时直接用Poller里的接口进行操作
{return _poller-UpdateEvent(this);
}测试
#include ../source/server.hppvoid HandleClose(Channel* channel)
{std::cout close: channel-Fd() std::endl;channel-Remove(); //移除监控delete channel; //这里直接进行delete是不合理的因为接下来其他地方可能还要使用这个channel
}
void HandleRead(Channel* channel)
{int fd channel-Fd();char buf[1024] {0};int ret recv(fd, buf, 1023, 0);if (ret 0) //如果对端关闭或者读取出错{return HandleClose(channel); //关闭释放}std::cout buf std::endl;channel-EnableWrite(); //启动可写事件
}
void HandleWrite(Channel* channel)
{int fd channel-Fd();const char* data 天气真不错;int ret send(fd, data, strlen(data), 0);if (ret 0){return HandleClose(channel); //关闭释放}channel-DisableWrite(); //关闭写监控
}
void HandleError(Channel* channel)
{return HandleClose(channel); //关闭释放
}
void HandleEvent(Channel* channel)
{std::cout 有了一个事件 std::endl;
}void Acceptor(Poller* poller, Channel* lst_channel)
{int fd lst_channel-Fd();int newfd accept(fd, NULL, NULL);if (newfd 0) return;Channel* channel new Channel(poller, newfd);//对新连接的channel进行设置设置的是事件到来了该如何处理channel-SetReadCallback(std::bind(HandleRead, channel)); //为通信套接字设置可读事件的回调函数channel-SetWriteCallback(std::bind(HandleWrite, channel)); //可写事件的回调函数channel-SetCloseCallback(std::bind(HandleClose, channel)); //关闭事件的回调函数channel-SetErrorCallback(std::bind(HandleError, channel)); //错误事件的回调函数channel-SetEventCallback(std::bind(HandleEvent, channel)); //任意事件的回调函数channel-EnableRead(); //启动可读事件
}int main()
{Poller poller;Socket lst_sock;lst_sock.CreateServer(8085);//为监听套接字创建一个Channel进行事件的管理以及事件的处理Channel channel(poller, lst_sock.Fd());channel.SetReadCallback(std::bind(Acceptor, poller, channel));channel.EnableRead(); //启动可读事件监控while (1){std::vectorChannel* actives;poller.Poll(actives);for (auto a : actives){a-HandleEvent();}}lst_sock.Close();return 0;
}#include ../source/server.hppint main()
{Socket cli_socket;cli_socket.CreateClient(8085, 127.0.0.1);while (1){std::string str hello world!!!;cli_socket.Send(str.c_str(), str.size());char buf[4096] {0};cli_socket.Recv(buf, 4095);DBG_LOG(%s, buf);sleep(1);}return 0;
}运行结果 问题一为什么打印了两次“有了一个事件”因为读事件触发了一次写事件又触发了一次。 问题二为什么客户端关闭连接之后服务器直接崩溃了 因为服务器在读取的时候客户端退出了会触发EPOLLIN事件和EPOLLRDHUP和EPOLLERR事件。 在处理读事件的时候如果对端关闭这里会调用一次HandleClose 处理错误事件的时候又调用了一次HandleClose 所以相当于是double close在某些平台这可能导致程序崩溃但是在这里还有一个真正崩溃的原因close后调用又调用了_event_callback函数对已经close的文件描述符进行了操作。 close直接关闭了文件描述符这是不合理的因为可能文件描述符还有其他事件要处理。 后面会解决这个问题解决方法用智能指针对其管理释放连接的时候是对其智能指针进行释放如果其他地方还会使用则让它使用用完了自然会释放也不会造成崩溃。 画一幅图来理解这个思想 最后poller监控的哪个事件触发了就会调用channel对应的回调函数 EventLoop模块中eventfd的认识 功能创建一个文件描述符用于事件通知。 参数 initval计数初值 flagsEFD_CLOEXEC - 禁止进程复制。EFD_NONBLOCK - 启动非阻塞属性 返回值返回一个文件描述符用于操作。 eventfd也是通过read/write/close进行操作的。read/write进行IO的时候数据只能是一个8字节数据。 我们不是也可以用信号来达到事件通知的效果吗信号是针对进程进行事件通知的而具体的信号被进程中的哪一个线程收到并处理是不一定的。 eventfd本质就是在内核里面管理一个计数器创建eventfd就是在内核里创建一个计数器结果。每当向eventfd中写入一个数值就表示事件一次通知。用read进行数据读取读取到的数据就是通知的次数。 用处在EventLoop模块中实现线程间的事件通知功能。 示例
#include iostream
#include sys/eventfd.h
#include stdint.h
#include cstring
#include unistd.hint main()
{int efd eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd 0){perror(eventfd failed);return -1;}uint64_t val 1;write(efd, val, sizeof(val)); write(efd, val, sizeof(val));write(efd, val, sizeof(val));uint64_t res1 0;read(efd, res1, sizeof(res1));std::cout res1 std::endl;uint64_t res2 0;read(efd, res2, sizeof(res2));std::cout res2 std::endl;close(efd);return 0;
}运行结果 每加一次就代表我给你了一次事件通知如果efd有事件了你就去读你就知道我给你通知了几次事件。 EventLoop模块设计思想 EventLoop进行事件监控以及事件处理的模块。 关键点这个模块与线程是一一对应的。 监控了一个连接这个连接就绪了要进行事件处理。但是如果这个连接的描述符在多个线程中被操作处理就会存在线程安全问题。因此我们需要将一个连接的事件监控、连接事件处理以及其他操作都放在同一个线程中进行。为什么不给连接操作都加一把锁如果有一万个连接就需要创建一万把锁这种方法是很不划算的。 如何保证一个连接的所有操作都在EventLoop对应的线程中 解决方案给EventLoop模块中添加一个任务队列对连接的所有操作都进行封装将对连接的操作并不直接执行而是当做任务添加到任务队列中。 EventLoop处理流程 1。在线程中对描述符进行事件监控 2。有描述符就绪则对描述符直接进行事件处理此时保证了处理回调函数中的操作都在自己对应的线程中 3。所有的就绪事件处理完了这时候再去将任务队列中的所有任务一一执行。 这样能够保证对于连接的所有操作都是在一个线程中进行的不涉及线程安全问题但多线程可能对任务队列同时进行操作所以任务队列需要加上一把锁。 注意因为一个线程有可能阻塞在IO事件的监控一直没有事件就绪导致执行流流程阻塞这时候任务队列中的任务得不到执行因此得有一个事件通知的eventfd能够唤醒事件监控的阻塞。 更新Channel模块实现EventLoop模块
class EventLoop;
class Channel
{
private:EventLoop* _loop;uint32_t _events; //当前需要监控的事件uint32_t _revents; //当前连接触发的事件using EventCallback std::functionvoid();/*只有我们的连接才知道一旦事件触发了该去怎么处理所以需要设置回调函数。当启动读事件监控就需要将channel挂到EventLoop上面进行事件监控当可读事件触发就会调用channel里设置的回调函数。*/EventCallback _read_callback; //可读事件被触发的回调函数EventCallback _write_callback; //可写事件被触发的回调函数EventCallback _error_callback; //错误事件被触发的回调函数EventCallback _close_callback; //连接断开事件被触发的回调函数EventCallback _event_callback; //任意事件被触发的回调函数int _fd;
public:Channel(EventLoop* loop, int fd):_loop(loop), _fd(fd), _events(0), _revents(0){}int Fd(){return _fd;}uint32_t Events(){return _events;}void SetREvents(uint32_t events){_revents events;}void SetReadCallback(const EventCallback cb){_read_callback cb;}void SetWriteCallback(const EventCallback cb){_write_callback cb;}void SetErrorCallback(const EventCallback cb){_error_callback cb;}void SetCloseCallback(const EventCallback cb){_close_callback cb;}void SetEventCallback(const EventCallback cb){_event_callback cb;}//当前是否监控了可读bool ReadAble(){return (_events EPOLLIN);}//当前是否监控了可写bool WriteAble() {return (_events EPOLLOUT);}//启动读事件监控void EnableRead(){_events | EPOLLIN;Update();}//启动可写事件监控void EnableWrite(){_events | EPOLLOUT;Update();}//关闭读事件监控void DisableRead(){_events ~EPOLLIN;Update();}//关闭写事件监控void DisableWrite(){_events ~EPOLLOUT;Update();}//关闭所有事件监控void DisableAll(){_events 0;}/*关闭事件监控只是不去关心这个事件了但还是在EventLoop中。移除事件监控才是真正的将它从EventLoop中移除*///移除监控void Remove();void Update();/*EventLoop不用 你触发了什么事件我就去调用对应的回调函数EventLoop不用管。EventLoop只管你触发了事件我就调用你的HandleEvent你自己来决定什么样的事件该如何处理这是最能体现Channel模块作用的功能之一*/void HandleEvent(){if ((_revents EPOLLIN) || (_revents EPOLLRDHUP) || (_revents EPOLLPRI)){if (_event_callback)_event_callback();if (_read_callback)_read_callback();}//有可能会释放连接的操作事件一次只处理一个if (_revents EPOLLOUT){if (_event_callback)_event_callback();if (_write_callback)_write_callback();}else if (_revents EPOLLERR){if (_event_callback)_event_callback();if (_error_callback)_error_callback();}else if (_revents EPOLLHUP){if (_event_callback)_event_callback();if (_close_callback)_close_callback();}}
};class EventLoop
{
private:std::thread::id _thread_id; //线程IDint _event_fd; //唤醒阻塞的IO事件监控std::unique_ptrChannel _event_channel; //为了能更好的管理_event_fd为其创建一个channelPoller _poller; //进行所有的描述符的事件监控using Functor std::functionvoid();std::vectorFunctor _tasks; //任务池std::mutex _mutex; //保证多线程对任务池进行操作的线程安全
public://执行任务池中的所有任务void RunAllTask(){std::vectorFunctor functor;{std::unique_lockstd::mutex lock(_mutex);functor.swap(_tasks);}for (auto f : functor){f();}}static int CreateEventFd(){int efd eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd 0){perror(eventfd failed);abort();}return efd;}void ReadEvent(){uint64_t res 0;int ret read(_event_fd, res, sizeof(res));if (ret 0){//EINTR -- 被信号打断 EAGAIN -- 暂时无数据可读非阻塞时才会触发if (errno EINTR || errno EAGAIN){return;}ERR_LOG(READ EVENTED FAILED);abort();}}void WeakUpEventFd(){uint64_t val 1;int ret write(_event_fd, val, sizeof(val));if (ret 0){if (errno EINTR){return;}ERR_LOG(WRITE EVENTFD FAILED);abort();}}
public:EventLoop():_thread_id(std::this_thread::get_id()) //获取的是当前实例化该EventLoop线程的id进行线程绑定,_event_fd(CreateEventFd()),_event_channel(new Channel(this, _event_fd)){//给eventfd添加可读事件回调读取eventfd事件通知次数_event_channel-SetReadCallback(std::bind(EventLoop::ReadEvent, this));//启动eventfd的读事件监控_event_channel-EnableRead();}//判断将要执行的任务是否处于当前线程中如果是则执行不是则压入队列void RunInLoop(const Functor cb){if (IsInLoop()){return cb();}return QueueInLoop(cb);}//将操作压入任务池void QueueInLoop(const Functor cb){{std::unique_lockstd::mutex lock(_mutex);_tasks.push_back(cb);}//其他线程把任务投入到你的EventLoop的任务池里面了需要执行任务池里的这个任务//所以需要唤醒这个线程因为这个线程可能在等待事件就绪的阻塞状态WeakUpEventFd();}//用于判断当前线程是否是EventLoop对应的线程bool IsInLoop(){return _thread_id std::this_thread::get_id();}//添加/修改描述符的事件监控void UpdateEvent(Channel* channel){_poller.UpdateEvent(channel);}//移除描述符的监控void RemoveEvent(Channel* channel){_poller.RemoveEvent(channel);}void Start(){while (1){//事件监控std::vectorChannel* actives;_poller.Poll(actives);//就绪事件处理for (auto channel : actives){channel-HandleEvent();}//执行任务池中的任务RunAllTask();}}
};void Channel::Remove()
{return _loop-RemoveEvent(this);
}
void Channel::Update()
{return _loop-UpdateEvent(this);
}测试
#include ../source/server.hppvoid HandleClose(Channel* channel)
{std::cout close: channel-Fd() std::endl;channel-Remove(); //移除监控delete channel;
}void HandleRead(Channel* channel)
{int fd channel-Fd();char buf[1024] {0};int ret recv(fd, buf, 1023, 0);if (ret 0) //如果对端关闭或者读取出错{HandleClose(channel); //关闭释放return;}std::cout buf std::endl;channel-EnableWrite(); //启动可写事件
}void HandleWrite(Channel* channel)
{int fd channel-Fd();const char* data 天气真不错;int ret send(fd, data, strlen(data), 0);if (ret 0){return HandleClose(channel); //关闭释放}channel-DisableWrite(); //关闭写监控
}
void HandleError(Channel* channel)
{return HandleClose(channel); //关闭释放
}
void HandleEvent(Channel* channel)
{std::cout 有了一个事件 std::endl;
}void Acceptor(EventLoop* loop, Channel* lst_channel)
{int fd lst_channel-Fd();int newfd accept(fd, NULL, NULL);if (newfd 0) return;Channel* channel new Channel(loop, newfd);//对新连接的channel进行设置设置的是事件到来了该如何处理channel-SetReadCallback(std::bind(HandleRead, channel)); //为通信套接字设置可读事件的回调函数channel-SetWriteCallback(std::bind(HandleWrite, channel)); //可写事件的回调函数channel-SetCloseCallback(std::bind(HandleClose, channel)); //关闭事件的回调函数channel-SetErrorCallback(std::bind(HandleError, channel)); //错误事件的回调函数channel-SetEventCallback(std::bind(HandleEvent, channel)); //任意事件的回调函数channel-EnableRead(); //启动可读事件
}int main()
{EventLoop loop;Socket lst_sock;lst_sock.CreateServer(8085);//为监听套接字创建一个Channel进行事件的管理以及事件的处理Channel channel(loop, lst_sock.Fd());channel.SetReadCallback(std::bind(Acceptor, loop, channel));channel.EnableRead(); //启动可读事件监控while (1){std::vectorChannel* actives;loop.Start();}lst_sock.Close();return 0;
}#include ../source/server.hppint main()
{Socket cli_socket;cli_socket.CreateClient(8085, 127.0.0.1);while (1){std::string str hello world!!!;cli_socket.Send(str.c_str(), str.size());char buf[4096] {0};cli_socket.Recv(buf, 4095);DBG_LOG(%s, buf);sleep(1);}return 0;
}运行结果 我们将EventLoop模块进行了整合测试。客户端退出导致服务器崩溃之后再来解决。 EventLoop与TimerWheel定时器模块整合 在之前时间轮的测试代码中我们是硬编码每秒钟定时执行一次RunTimerTask函数 我们要怎么做到每秒钟定时自动执行一次呢我们可以通过timerfd每秒钟触发一次可读事件则TimerWheel可以实现每秒钟执行一次RunTimerTask。所以我们需要将我们的timerfd和timewheel整合到一起来实现一个完整的定时器。 注意点定时器对外的接口也要考虑线程安全问题定时器信息的操作有可能在多线程中进行(如主线程想添加给所有的连接添加一个定时任务)如果不想加锁那就把对应的所有操作都放到一个线程中进行。 实现TimerWheel模块和更新EventLoop模块
using TaskFunc std::functionvoid();
using ReleaseFunc std::functionvoid();
class TimerTask
{
private:uint64_t _id; //定时器任务对象ID 定时任务必须得找得着一个程序里定时任务可能有很多uint32_t _timeout; //定时任务的超时时间TaskFunc _task_cb; //定时器要执行的任务//用于删除TimerWheel中保存的定时器任务对象信息定时任务释放的时候也要清理TimerWheel中保存的定时器对象信息//为什么将这个_release设置到TimerTask里面呢不在TimerWheel层管理//因为这个TimerWheel不知道是否某个定时任务真的释放了而TimerTask是最清楚的自己真的释放了就会调用析构函数ReleaseFunc _release;bool _canceled; //false - 代表没有被取消true - 代表取消了
public:TimerTask(uint64_t id, uint32_t timeout, const TaskFunc cb):_id(id), _timeout(timeout), _task_cb(cb), _canceled(false){}~TimerTask(){if (_canceled false) //如果定时任务没有被取消_task_cb();_release();}void SetRelease(const ReleaseFunc cb){_release cb;}uint32_t DelayTime(){return _timeout;}void Cancel(){_canceled true;}
};class TimerWheel
{
private:using WeakTask std::weak_ptrTimerTask;using PtrTask std::shared_ptrTimerTask;int _tick; //当前的秒针走到哪里释放哪里就相当于执行哪里的任务int _capacity; //表盘最大数量 -- 也是最大能设置的延时时间std::vectorstd::vectorPtrTask _wheel;std::unordered_mapuint64_t, WeakTask _timers; //对所有的定时任务进行管理EventLoop* _loop; //timerwheel所关联的EventLoopint _timerfd; //定时器描述符std::unique_ptrChannel _timer_channel;
public:void RemoveTimer(uint64_t id){auto pos _timers.find(id);if (pos ! _timers.end()){_timers.erase(pos);}}static int CreateTimerFd(){int timerfd timerfd_create(CLOCK_MONOTONIC, 0);if (timerfd 0){ERR_LOG(TIMERFD CREATE FAILED);abort();}struct itimerspec itime;itime.it_value.tv_sec 1;itime.it_value.tv_nsec 0;itime.it_interval.tv_sec 1;itime.it_interval.tv_nsec 0;timerfd_settime(timerfd, 0, itime, NULL);return timerfd;}void ReadTimefd(){uint64_t times;int ret read(_timerfd, times, 8);if (ret 0){ERR_LOG(READ TIMEFD FAILED);abort();}}void RunTimerTask(){_tick (_tick 1) % _capacity;_wheel[_tick].clear(); //情况指定位置的数组就会把数组中保存的所有管理定时器对象的shared_ptr释放掉}//这个函数每秒钟会被执行一次相当于秒针向后走了一步void OnTime(){ReadTimefd();RunTimerTask();}void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc cb){//1.构建定时任务PtrTask pt(new TimerTask(id, delay, cb));pt-SetRelease(std::bind(TimerWheel::RemoveTimer, this, id));//2.将定时任务加入到_wheel中int i (_tick delay) % _capacity;_wheel[i].push_back(pt);//3.加入到时间轮的_timers里面// std::unordered_mapuint64_t, WeakTask::iterator pos _timers.find(id);auto pos _timers.find(id);if (pos _timers.end()){_timers.insert({id, pt});}}void TimerRefreshInLoop(uint64_t id) //刷新/延迟定时任务{//通过id找到对应的定时任务auto pos _timers.find(id);if (pos _timers.end()){//如果没找到定时任务则没办法更新return;}//获取到对应定时任务的shared_ptr并构建一个新的智能指针对应的计数加1PtrTask pt pos-second.lock();//将对应的pt加入到_wheel中int delay pt-DelayTime();int i (_tick delay) % _capacity;_wheel[i].push_back(pt);}void TimerCancelInLoop(uint64_t id){auto pos _timers.find(id);if (pos _timers.end()){//没找到定时任务没法刷新没法延时return;}PtrTask pt pos-second.lock();if (pt) //如果自己已经销毁则可能为空pt-Cancel();}
public:TimerWheel(EventLoop* loop):_capacity(60), _tick(0), _wheel(_capacity), _loop(loop), _timerfd(CreateTimerFd()), _timer_channel(new Channel(loop, _timerfd)){_timer_channel-SetReadCallback(std::bind(TimerWheel::OnTime, this));_timer_channel-EnableRead();}//定时器中有个_timers成员定时器信息的操作有可能在多线程中进行(如主线程想添加给所有的连接添加一个定时任务)因此需要考虑线程安全问题//如果不想加锁那就把对应定期的所有操作都放到一个线程中进行void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc cb); //因为用到了EventLoop的_loop成员所以要在EventLoop代码后面去实现void TimerRefresh(uint64_t id);void TimerCancel(uint64_t id);//这个接口存在线程安全问题--这个接口不能被外界使用者调用只能在模块内在对应的EventLoop线程内执行bool HasTimer(uint64_t id){auto it _timers.find(id);if (it _timers.end()){return false;}return true;}
};class EventLoop
{
private:std::thread::id _thread_id; //线程IDint _event_fd; //唤醒阻塞的IO事件监控std::unique_ptrChannel _event_channel; //为了能更好的管理_event_fd为其创建一个channelPoller _poller; //进行所有的描述符的事件监控using Functor std::functionvoid();std::vectorFunctor _tasks; //任务池std::mutex _mutex; //保证多线程对任务池进行操作的线程安全TimerWheel _timer_wheel;
public://执行任务池中的所有任务void RunAllTask(){std::vectorFunctor functor;{std::unique_lockstd::mutex lock(_mutex);functor.swap(_tasks);}for (auto f : functor){f();}}static int CreateEventFd(){int efd eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd 0){perror(eventfd failed);abort();}return efd;}void ReadEvent(){uint64_t res 0;int ret read(_event_fd, res, sizeof(res));if (ret 0){//EINTR -- 被信号打断 EAGAIN -- 暂时无数据可读非阻塞时才会触发if (errno EINTR || errno EAGAIN){return;}ERR_LOG(READ EVENTED FAILED);abort();}}void WeakUpEventFd(){uint64_t val 1;int ret write(_event_fd, val, sizeof(val));if (ret 0){if (errno EINTR){return;}ERR_LOG(WRITE EVENTFD FAILED);abort();}}
public:EventLoop():_thread_id(std::this_thread::get_id()) //获取的是当前实例化该EventLoop线程的id进行线程绑定,_event_fd(CreateEventFd()),_event_channel(new Channel(this, _event_fd)),_timer_wheel(this){//给eventfd添加可读事件回调读取eventfd事件通知次数_event_channel-SetReadCallback(std::bind(EventLoop::ReadEvent, this));//启动eventfd的读事件监控_event_channel-EnableRead();}//判断将要执行的任务是否处于当前线程中如果是则执行不是则压入队列void RunInLoop(const Functor cb){if (IsInLoop()){return cb();}return QueueInLoop(cb);}//将操作压入任务池void QueueInLoop(const Functor cb){{std::unique_lockstd::mutex lock(_mutex);_tasks.push_back(cb);}//其他线程把任务投入到你的EventLoop的任务池里面了需要执行任务池里的这个任务//所以需要唤醒这个线程因为这个线程可能在等待事件就绪的阻塞状态WeakUpEventFd();}//用于判断当前线程是否是EventLoop对应的线程bool IsInLoop(){return _thread_id std::this_thread::get_id();}//添加/修改描述符的事件监控void UpdateEvent(Channel* channel){_poller.UpdateEvent(channel);}//移除描述符的监控void RemoveEvent(Channel* channel){_poller.RemoveEvent(channel);}void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc cb){return _timer_wheel.TimerAdd(id, delay, cb);}void TimerRefresh(uint64_t id){return _timer_wheel.TimerRefresh(id);}void TiemrCancel(uint64_t id){return _timer_wheel.TimerCancel(id);}bool HasTimer(uint64_t id){return _timer_wheel.HasTimer(id);}void Start(){while (1){//事件监控std::vectorChannel* actives;_poller.Poll(actives);//就绪事件处理for (auto channel : actives){channel-HandleEvent();}//执行任务池中的任务RunAllTask();}}
};void TimerWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc cb)
{_loop-RunInLoop(std::bind(TimerWheel::TimerAddInLoop, this, id, delay, cb));
}
void TimerWheel::TimerRefresh(uint64_t id)
{_loop-RunInLoop(std::bind(TimerWheel::TimerRefreshInLoop, this, id));
}
void TimerWheel::TimerCancel(uint64_t id)
{_loop-RunInLoop(std::bind(TimerWheel::TimerCancelInLoop, this, id));
}测试
#include ../source/server.hppvoid HandleClose(Channel* channel)
{DBG_LOG(close: %d, channel-Fd());channel-Remove(); //移除监控delete channel;
}void HandleRead(Channel* channel)
{int fd channel-Fd();char buf[1024] {0};int ret recv(fd, buf, 1023, 0);if (ret 0) //如果对端关闭或者读取出错{HandleClose(channel); //关闭释放return;}DBG_LOG(%s, buf);channel-EnableWrite(); //启动可写事件
}void HandleWrite(Channel* channel)
{int fd channel-Fd();const char* data 天气真不错;int ret send(fd, data, strlen(data), 0);if (ret 0){return HandleClose(channel); //关闭释放}channel-DisableWrite(); //关闭写监控
}
void HandleError(Channel* channel)
{return HandleClose(channel); //关闭释放
}
void HandleEvent(EventLoop* loop, Channel* channel, uint64_t timerid)
{loop-TimerRefresh(timerid);
}void Acceptor(EventLoop* loop, Channel* lst_channel)
{int fd lst_channel-Fd();int newfd accept(fd, NULL, NULL);if (newfd 0) return;int timerid rand() % 10000;Channel* channel new Channel(loop, newfd);//对新连接的channel进行设置设置的是事件到来了该如何处理channel-SetReadCallback(std::bind(HandleRead, channel)); //为通信套接字设置可读事件的回调函数channel-SetWriteCallback(std::bind(HandleWrite, channel)); //可写事件的回调函数channel-SetCloseCallback(std::bind(HandleClose, channel)); //关闭事件的回调函数channel-SetErrorCallback(std::bind(HandleError, channel)); //错误事件的回调函数channel-SetEventCallback(std::bind(HandleEvent, loop, channel, timerid)); //任意事件的回调函数//非活跃连接的超时释放操作10s后关闭连接//注意定时销毁任务必须在启动读事件之前因为有可能启动了事件监控后立即就有了事件就去处理事件了//但是这时候还没有定时任务之后就会去执行TimerRefresh虽然我们做了防御性编程但这总归来说是不好的。loop-TimerAdd(timerid, 10, std::bind(HandleClose, channel));channel-EnableRead(); //启动可读事件
}int main()
{srand(time(NULL));EventLoop loop;Socket lst_sock;lst_sock.CreateServer(8085);//为监听套接字创建一个Channel进行事件的管理以及事件的处理Channel channel(loop, lst_sock.Fd());channel.SetReadCallback(std::bind(Acceptor, loop, channel));channel.EnableRead(); //启动可读事件监控while (1){loop.Start();}lst_sock.Close();return 0;
}#include ../source/server.hppint main()
{Socket cli_socket;cli_socket.CreateClient(8085, 127.0.0.1);for (int i 0; i 5; i){std::string str hello world!!!;cli_socket.Send(str.c_str(), str.size());char buf[4096] {0};cli_socket.Recv(buf, 4095);DBG_LOG(%s, buf);sleep(1);}while (1){}return 0;
}运行结果 EventLoop模块联调中的模块流程关系图 Connection模块设计思想 目的对连接进行全方位的管理对通信连接的所有操作都是通过这个模块提供的功能完成。 管理 1。套接字的管理能够进行套接字的操作 2。连接的可读可写错误挂断任意事件的管理 3。缓冲区的管理便于socket数据的接收和发送 4。协议上下文的管理记录请求数据的处理过程用户对缓冲区进行处理拿出请求的各项信息如果不是完整的数据则只会处理一部分再等到下一次进行处理下一次数据到来了则可以继续处理若别人来处理它也知道处理到哪里了也能对其进行处理 5。回调函数的管理 一个连接接收到数据之后该如何处理由用户决定因此有业务处理的回调函数。 一个连接建立成功后该如何处理由用户决定因此有连接建立成功的回调函数。 一个连接关闭前该如何处理有用户决定因此有关闭连接时的回调函数。 任意事件的产生是否还需要某些处理业务由用户决定因此有任意事件的回调函数。 对外提供的功能 1。发送数据 – 给用户提供的发送数据接口并不是真正的发送接口而只是把数据放到发送缓冲区然后启动写事件监控 2。关闭连接 – 给用户提供的关闭连接接口应该在实际释放连接之前看看输入输出缓冲区是否有数据待处理。 3。启动非活跃连接的超时销毁功能。 4。取消非活跃连接的超时销毁功能。 5。协议切换一个连接接收数据后如何处理取决于上下文以及业务函数 之前我们一直有一个问题没解决客户端主动退出导致服务器崩溃。崩溃原因是对连接进行操作的时候但是连接已经被释放导致内存访问错误最终程序崩溃。 解决方案使用智能指针shared_ptr对Connection对象进行管理这样就能保证任意一个地方对Connection对象进行操作的时候保存了一份shared_ptr因此就算其他地方进行释放操作也只是对shared_ptr的计数器-1而不会导致Connection的实际释放。所以对外的接口都是采用shared_ptr对连接进行操作。 实现Connection模块
class Any
{
public:class holder{public:virtual ~holder(){}virtual const std::type_info type() 0; //获取当前子类的数据类型 -- 返回类型是const type_infovirtual holder* clone() 0; //针对当前的对象自身克隆出一个新的子类对象};templateclass Tclass placeholder : public holder{public:placeholder(const T val T()):_val(val){}virtual const std::type_info type(){return typeid(T);}virtual holder* clone(){return new placeholder(_val);}T _val; //主要是这里如果不通过virtual则无法释放这里的对象};Any():_content(NULL){}template class TAny(const T content):_content(new placeholderT(content)){}Any(const Any other):_content(other._content ? other._content-clone() : NULL){}~Any(){delete _content;}Any swap(Any other){std::swap(_content, other._content);return *this;}templateclass TAny operator(const T val){Any(val).swap(*this); //这样写的好处Any(val)为临时对象交换完生命周期就到了就会调用自己的析构函数return *this;}Any operator(const Any other){Any(other).swap(*this);return *this;}templateclass TT* get() //返回子类对象保存数据的指针{if (typeid(T) ! _content-type()) //如果你要的类型和我保存的类型不匹配return NULL;return (((placeholderT*)_content)-_val);}
public:holder* _content;
};typedef enum
{DISCONNECTED, //连接关闭状态CONNECTING, //连接建立成功 - 待处理状态CONNECTED, //连接建立完成各种设置已完成可以通信的状态DISCONNECTING //待关闭状态
}ConnStatu;class Connection : public std::enable_shared_from_thisConnection
{
private:uint64_t _conn_id; //连接的唯一ID也是定时任务唯一ID便于连接的管理和查找int _sockfd; //连接关联的文件描述符bool _enable_inactive_release; //连接是否启动非活跃销毁的判断标志默认为falseEventLoop* _loop; //连接所关联的一个EventLoopConnStatu _statu; //连接状态Socket _socket; //套接字操作管理Channel _channel; //连接的事件管理Buffer _in_buffer; //输入缓冲区 -- 存放从socket中读取到的数据Buffer _out_buffer; //输出缓冲区 -- 存放要发送给对端的数据Any _context; //请求的接收处理上下文//以下这四个回调函数吗是让Server模块来设置的(服务器模块的处理回调是组件使用者设置的)using ConnectedCallback std::functionvoid(const PtrConnection);using MessageCallback std::functionvoid(const PtrConnection, Buffer*);using ClosedCallback std::functionvoid(const PtrConnection);using AnyEventCallback std::functionvoid(const PtrConnection);ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;//组件内的连接关闭回调 -- 组件内设置的因为服务器组件内会把所有的连接管理起来。//一旦某个连接要关闭就应该从管理的地方移除掉自己的信息。ClosedCallback _server_closed_callback;
private://五个channel的事件回调函数//描述符可读事件触发后调用的函数接收socket数据放到接收缓冲区中然后调用_message_callbackvoid HandleRead(){char buf[65536];ssize_t ret _socket.NonBlockRecv(buf, 65535);if (ret 0){//读出错了可能是客户端关闭不能直接关闭连接因为可能有数据没发送或者有数据还没处理return ShutdownInLoop();}else if (ret 0){//表示没有读取到数据并不是连接断开因为我们调用的是自己封装的NonBlockRecvreturn;}_in_buffer.WriteAndPush(buf, ret);//2.调用message_callback进行业务处理if (_in_buffer.ReadAbleSize() 0){//shared_from_this -- 从当前对象自身获取自身的shared_ptr管理对象_message_callback(shared_from_this(), _in_buffer);}}//描述符可写事件触发后调用的函数将发送缓冲区中的数据进行发送void HandleWrite(){ssize_t ret _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());if (ret 0){if (_in_buffer.ReadAbleSize() 0){_message_callback(shared_from_this(), _in_buffer);}return ReleaseInLoop(); //实际的关闭释放操作}_out_buffer.MoveReadOffset(ret); //千万不要忘了将读偏移向后移动if (_out_buffer.ReadAbleSize() 0){_channel.DisableWrite(); //如果数据发送完了就关闭写事件监控//如果当前是连接待关闭状态并且数据发送完毕则可以将连接直接释放if (_statu DISCONNECTING){return ReleaseInLoop();}}//发送数据可能发不完不关闭写事件监控return;}//描述符触发挂断事件void HandleClose(){if (_in_buffer.ReadAbleSize() 0){_message_callback(shared_from_this(), _in_buffer);}return ReleaseInLoop();}//描述符触发出错事件void HandleError(){return HandleClose();}//描述符触发任意事件void HandleEvent(){//刷新连接活跃度if (_enable_inactive_release true){_loop-TimerRefresh(_conn_id);}//调用组件使用者的任意事件回调if (_event_callback)_event_callback(shared_from_this());}//连接获取之后所处的状态下要进行各种设置void EstablishedInLoop(){//修改连接状态assert(_statu CONNECTING);_statu CONNECTED;_channel.EnableRead();if (_connected_callback)_connected_callback(shared_from_this());}//这个接口才是实际的释放接口void ReleaseInLoop(){//修改连接状态将其置为DISCONNECTED_statu DISCONNECTED;//移除连接的事件监控_channel.Remove();//关闭描述符_socket.Close();//如果当前定时器队列中还有定时任务则取消任务if (_loop-HasTimer(_conn_id))CancelInactiveRelease();//调用关闭回调函数避免先移除服务器管理的连接信息导致Connection被释放因此先调用户的回调函数if (_closed_callback)_closed_callback(shared_from_this());if (_server_closed_callback)_server_closed_callback(shared_from_this());}//这个接口并不是实际的发送接口而只是把数据放到了发送缓冲区启动了可写事件监控//为什么要这么做因为可写条件可能不就绪即内核缓冲区的数据满了写不进去了void SendInLoop(Buffer buf){if (_statu DISCONNECTED) //如果状态已经关闭则直接return已经关闭则代表发送缓冲区数据为0return;_out_buffer.WriteBufferAndPush(buf);if (_channel.WriteAble() false)_channel.EnableWrite();}//这个关闭操作并非实际的连接释放操作需要判断还有没有数据待处理待发送void ShutdownInLoop(){_statu DISCONNECTING; //设置为半关闭状态if (_in_buffer.ReadAbleSize() 0)_message_callback(shared_from_this(), _in_buffer);if (_out_buffer.ReadAbleSize() 0)_channel.EnableWrite();//因为可能发送缓冲区将数据发送不完所以写关心就不用关闭了也不用真正释放了if (_out_buffer.ReadAbleSize() 0){_channel.DisableWrite();ReleaseInLoop();}}//启动非活跃连接超时释放规则void EnableInactiveReleaseInLoop(int sec){//将判断标志 _enable_inactive_erlease置为true_enable_inactive_release true;//如果当前定时销毁任务已经存在那就刷新延迟一下即可if (_loop-HasTimer(_conn_id))return _loop-TimerRefresh(_conn_id);//如果不存在定时销毁任务则新增_loop-TimerAdd(_conn_id, sec, std::bind(Connection::ReleaseInLoop, this));}//取消非活跃连接超时释放规则void CancelInactiveReleaseInLoop(){_enable_inactive_release false;if (_loop-HasTimer(_conn_id))_loop-TimerCancel(_conn_id);}//切换/升级协议void UpgradeInLoop(const Any context, const ConnectedCallback conn, const MessageCallback msg, const ClosedCallback closed, const AnyEventCallback event){_context context; //改变上下文_connected_callback conn;_message_callback msg;_closed_callback closed;_event_callback event;}
public:Connection(EventLoop* loop, uint64_t conn_id, int sockfd):_conn_id(conn_id),_sockfd(sockfd),_enable_inactive_release(false),_loop(loop),_statu(CONNECTING),_socket(sockfd),_channel(loop, _sockfd){_channel.SetReadCallback(std::bind(Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(Connection::HandleWrite, this));_channel.SetCloseCallback(std::bind(Connection::HandleClose, this));_channel.SetErrorCallback(std::bind(Connection::HandleError, this));_channel.SetEventCallback(std::bind(Connection::HandleEvent, this));}~Connection(){DBG_LOG(RELEASE CONNEDCTION: %p, this);}//获取管理的文件描述符int Fd(){return _sockfd;}//获取连接IDint Id(){return _conn_id;}//是否处于CONNECTED状态bool Connected(){return (_statu CONNECTED);}//设置上下文 -- 连接建立完成时进行调用void SetContext(const Any context){_context context;}//获取上下文返回的是指针Any* GetContext(){return _context;}void SetConnectedCallback(const ConnectedCallback cb){_connected_callback cb;}void SetMessageCallback(const MessageCallback cb){_message_callback cb;}void SetClosedCallback(const ClosedCallback cb){_closed_callback cb;}void SetSvrClosedCallback(const ClosedCallback cb){_server_closed_callback cb;}void SetAnyEventCallback(const AnyEventCallback cb){_event_callback cb;}//连接建立就绪后进行channel回调设置启动读监控调用_connected_callbackvoid Establised(){_loop-RunInLoop(std::bind(Connection::EstablishedInLoop, this));}//发送数据将数据放到发送缓冲区启动写事件监控void Send(const char* data, size_t len){Buffer buf; //为什么要重新创建一个临时变量因为data可能是一个可能被释放的空间我们将其压入任务队列等待被执行的过程中空间可能被释放了buf.WriteAndPush(data, len);_loop-RunInLoop(std::bind(Connection::SendInLoop, this, std::move(buf)));}//提供给组件使用者的关闭接口 -- 并不实际关闭需要判断有没有数据待处理void Shutdown(){_loop-RunInLoop(std::bind(Connection::ShutdownInLoop, this));}//启动非活跃销毁并定义多长时间无通信就是非活跃添加定时任务void EnableInactiveRelease(int sec){_loop-RunInLoop(std::bind(Connection::EnableInactiveReleaseInLoop, this, sec));}//取消非活跃销毁void CancelInactiveRelease(){_loop-RunInLoop(std::bind(Connection::CancelInactiveReleaseInLoop, this));}//切换协议 -- 重置上下文以及阶段性处理函数void Upgrade(const Any context, const ConnectedCallback conn, const MessageCallback msg, const ClosedCallback closed, const AnyEventCallback event){_loop-RunInLoop(std::bind(Connection::UpgradeInLoop, this, context, conn, msg, closed, event));}
};测试
#include ../source/server.hpp//管理所有的连接
std::unordered_mapuint64_t, PtrConnection _conns;
uint64_t conn_id 0;void ConnectionDestroy(const PtrConnection conn) //这些函数未来是TcpServer模块提供的
{_conns.erase(conn-Id());
}
void OnConnected(const PtrConnection conn)
{DBG_LOG(NEW CONNECTION: %p, conn.get());
}
void OnMessage(const PtrConnection conn, Buffer* buf)
{buf-MoveReadOffset(buf-ReadAbleSize());std::string str Hello World;conn-Send(str.c_str(), str.size());
}
void Acceptor(EventLoop* loop, Channel* lst_channel)
{int fd lst_channel-Fd();int newfd accept(fd, NULL, NULL);if (newfd 0) return;conn_id;PtrConnection conn(new Connection(loop, conn_id, newfd));conn-SetMessageCallback(OnMessage);conn-SetSvrClosedCallback(ConnectionDestroy);conn-SetConnectedCallback(OnConnected);conn-EnableInactiveRelease(10); //启动非活跃超时销毁conn-Establised(); //就绪初始化_conns.insert(std::make_pair(conn_id, conn));
}int main()
{EventLoop loop;Socket lst_sock;lst_sock.CreateServer(8085);//为监听套接字创建一个Channel进行事件的管理以及事件的处理Channel channel(loop, lst_sock.Fd());channel.SetReadCallback(std::bind(Acceptor, loop, channel));channel.EnableRead(); //启动可读事件监控while (1){loop.Start();}lst_sock.Close();return 0;
}#include ../source/server.hppint main()
{Socket cli_socket;cli_socket.CreateClient(8085, 127.0.0.1);for (int i 0; i 5; i){std::string str hello world!!!;cli_socket.Send(str.c_str(), str.size());char buf[4096] {0};cli_socket.Recv(buf, 4095);DBG_LOG(%s, buf);sleep(1);}while (1){}return 0;
}运行结果 可以看到客户端主动退出并不会使服务器崩溃了。 Acceptor模块设计思想 功能对监听套接字进行管理 1。创建一个监听套接字 2。启动读事件监控 3。事件触发后获取新连接 4。调用新连接获取成功后的回调函数回调函数为新连接创建Connection进行管理。该回调函数是由Server模块提供 实现Acceptor模块
class Acceptor
{
private:Socket _socket; //用于创建监听套接字EventLoop* _loop; //用于对监听套接字进行事件监控Channel _channel; //对于对监听套接字进行事件管理using AcceptCallback std::functionvoid(int);AcceptCallback _accept_callback; //由Server模块提供的回调函数
private://监听套接字的读事件回调处理函数 -- 获取新连接调用_accept_callback函数进行新连接处理void HandleRead(){int newfd _socket.Accept();if (newfd 0)return;if (_accept_callback)_accept_callback(newfd);}int CreateServer(uint16_t port){bool ret _socket.CreateServer(port);if (ret 0)abort();return _socket.Fd();}
public:Acceptor(EventLoop* loop, int port):_loop(loop),_socket(CreateServer(port)),_channel(loop, _socket.Fd()){_channel.SetReadCallback(std::bind(Acceptor::HandleRead, this));}void SetAcceptCallback(const AcceptCallback cb){_accept_callback cb;}void Listen(){_channel.EnableRead();//启动监听的可读事件监控 -- 会自动将自己挂到poller中}
};测试
#include ../source/server.hpp//管理所有的连接
std::unordered_mapuint64_t, PtrConnection _conns;
uint64_t conn_id 0;
EventLoop loop;void ConnectionDestroy(const PtrConnection conn) //这些函数未来是TcpServer模块提供的
{_conns.erase(conn-Id());
}
void OnConnected(const PtrConnection conn)
{DBG_LOG(NEW CONNECTION: %p, conn.get());
}
void OnMessage(const PtrConnection conn, Buffer* buf)
{buf-MoveReadOffset(buf-ReadAbleSize());std::string str Hello World;conn-Send(str.c_str(), str.size());
}
void NewConnection(int fd)
{conn_id;PtrConnection conn(new Connection(loop, conn_id, fd));conn-SetMessageCallback(OnMessage);conn-SetSvrClosedCallback(ConnectionDestroy);conn-SetConnectedCallback(OnConnected);conn-EnableInactiveRelease(10); //启动非活跃超时销毁conn-Establised(); //就绪初始化_conns.insert(std::make_pair(conn_id, conn));
}int main()
{Acceptor acceptor(loop, 8085);acceptor.SetAcceptCallback(NewConnection);acceptor.Listen();while (1){loop.Start();}return 0;
}
#include ../source/server.hppint main()
{Socket cli_socket;cli_socket.CreateClient(8085, 127.0.0.1);for (int i 0; i 5; i){std::string str hello world!!!;cli_socket.Send(str.c_str(), str.size());char buf[4096] {0};cli_socket.Recv(buf, 4095);DBG_LOG(%s, buf);sleep(1);}while (1){}return 0;
}运行结果 LoopThread模块设计思想 前面我们的EventLoop并没有和线程结合起来接下来我们就要将线程和EventLoop整合起来。 关键点 1。EventLoop模块与线程是一一对应的。 2。EventLoop模块实例化的对象在构造的时候就会初始化_thread_id而后边需要判断当前是否运行在EventLoop模块对应的线程中就是将此时的线程ID与EventLoop模块中的thread_id进行一个比较相同就代表在同一个线程不同就代表当前运行线程并不是在EventLoop模块对应的线程中 如何将EventLoop分配给各个线程 1。先创建多个EventLoop对象然后创建多个线程将各个线程的ID分配给EventLoop进行设置。这种解决方案很难控制不推荐。 2。先创建线程在线程内部实例化EventLoop对象EventLoop对象就可以直接分配当前的线程ID。 实现LoopThread模块
class LoopThread
{
private:std::mutex _mutex; //互斥锁std::condition_variable _cond; //条件变量EventLoop* _loop;std::thread _thread; //EventLoop对应的线程
private://实例化EventLoop对象并且开始运行EventLoop模块的功能void ThreadEntry(){EventLoop loop; //因为下面Start会一直循环运行所以EventLoop的生命周期不会结束{std::unique_lockstd::mutex lock(_mutex);_loop loop;_cond.notify_all();}loop.Start();}
public:LoopThread():_loop(NULL),_thread(std::thread(LoopThread::ThreadEntry))//创建线程设定线程入口函数{}//返回当前线程关联的EventLoop对象指针EventLoop* GetLoop(){EventLoop* loop NULL;{std::unique_lockstd::mutex lock(_mutex); //加锁//第二个参数时一个bool的函数如果为false就一直阻塞住被唤醒才能继续往下走_cond.wait(lock, [](){return _loop ! nullptr;});loop _loop;}return loop;}
};测试
#include ../source/server.hpp//管理所有的连接
std::unordered_mapuint64_t, PtrConnection _conns;
uint64_t conn_id 0;
EventLoop base_loop;
std::vectorLoopThread threads(2);
int next_loop 0;void ConnectionDestroy(const PtrConnection conn) //这些函数未来是TcpServer模块提供的
{_conns.erase(conn-Id());
}
void OnConnected(const PtrConnection conn)
{DBG_LOG(NEW CONNECTION: %p, conn.get());
}
void OnMessage(const PtrConnection conn, Buffer* buf)
{buf-MoveReadOffset(buf-ReadAbleSize());std::string str Hello World;conn-Send(str.c_str(), str.size());
}
void NewConnection(int fd)
{conn_id;next_loop (next_loop 1) % 2;PtrConnection conn(new Connection(threads[next_loop].GetLoop(), conn_id, fd));conn-SetMessageCallback(OnMessage);conn-SetSvrClosedCallback(ConnectionDestroy);conn-SetConnectedCallback(OnConnected);conn-EnableInactiveRelease(10); //启动非活跃超时销毁conn-Establised(); //就绪初始化_conns.insert(std::make_pair(conn_id, conn));
}int main()
{Acceptor acceptor(base_loop, 8085);acceptor.SetAcceptCallback(NewConnection);acceptor.Listen();while (1){base_loop.Start();}return 0;
}#include ../source/server.hppint main()
{Socket cli_socket;cli_socket.CreateClient(8085, 127.0.0.1);for (int i 0; i 5; i){std::string str hello world!!!;cli_socket.Send(str.c_str(), str.size());char buf[4096] {0};cli_socket.Recv(buf, 4095);DBG_LOG(%s, buf);sleep(1);}while (1){}return 0;
}运行结果 这里就完成了主从Reactor的模型主线程只负责对监听套接字进行处理保证了连接的效率不会因为业务的处理而导致无法处理新连接从属线程对通信连接进行监控和处理。 LoopThreadPool模块设计思想 该模块是针对LoopThread设计一个线程池可以对所有的LoopThread进行管理及分配。 功能 1。线程数量可配置0个或多个。在服务器中主从Reactor模型是主线程只负责新连接获取从属线程负责新连接的事件监控及处理。线程池中有可能从属线程会数量为0也就是单Reactor服务器一个线程负责获取新连接也负责新连接的处理。 2。对所有的线程进行管理管理0个或多个LoopThread对象。 3。提供线程分配的功能。当主线程获取了一个新连接需要将新连接挂到从属线程上进行事件监控及处理假设有0个从属线程则直接分配给主线程的EventLoop进行处理假设有多个从属线程则采用RR轮转思想进行线程的分配将对应线程的EventLoop获取到设置给对应的Connection 实现LoopThread模块
class LoopThreadPool
{
private:int _thread_count; //从属线程的数量int _next_idx;EventLoop* _baseLoop; //主EventLoop运行在主线程从属线程数量为0则所有操作都在baseloop中进行std::vectorLoopThread* _threads; //保存所有的LoopThread对象std::vectorEventLoop* _loops; //从属线程数量大于0则从_loops中进行线程EventLoop分配
public:LoopThreadPool(EventLoop* baseLoop):_thread_count(0),_next_idx(0),_baseLoop(baseLoop){}//设置线程数量void SetThreadCount(int count){_thread_count count;}//创建所有的从属线程void Create(){if (_thread_count 0){_threads.resize(_thread_count);_loops.resize(_thread_count);for (int i 0; i _thread_count; i){_threads[i] new LoopThread;_loops[i] _threads[i]-GetLoop();}}}//为了实现RR轮转返回下一个从属线程的EventLoopEventLoop* NextLoop(){if (_thread_count 0){return _baseLoop;}_next_idx (_next_idx 1) % _thread_count;return _loops[_next_idx];}
};测试
#include ../source/server.hpp//管理所有的连接
std::unordered_mapuint64_t, PtrConnection _conns;
uint64_t conn_id 0;
EventLoop base_loop;
LoopThreadPool* loopPool;void ConnectionDestroy(const PtrConnection conn) //这些函数未来是TcpServer模块提供的
{_conns.erase(conn-Id());
}
void OnConnected(const PtrConnection conn)
{DBG_LOG(NEW CONNECTION: %p, conn.get());
}
void OnMessage(const PtrConnection conn, Buffer* buf)
{buf-MoveReadOffset(buf-ReadAbleSize());std::string str Hello World;conn-Send(str.c_str(), str.size());
}
void NewConnection(int fd)
{conn_id;PtrConnection conn(new Connection(loopPool-NextLoop(), conn_id, fd));conn-SetMessageCallback(OnMessage);conn-SetSvrClosedCallback(ConnectionDestroy);conn-SetConnectedCallback(OnConnected);conn-EnableInactiveRelease(10); //启动非活跃超时销毁conn-Establised(); //就绪初始化_conns.insert(std::make_pair(conn_id, conn));DBG_LOG(获取到了一个新的连接);
}int main()
{loopPool new LoopThreadPool(base_loop);// loopPool-SetThreadCount(2);loopPool-Create();Acceptor acceptor(base_loop, 8085);acceptor.SetAcceptCallback(NewConnection);acceptor.Listen();while (1){base_loop.Start();}return 0;
}#include ../source/server.hppint main()
{Socket cli_socket;cli_socket.CreateClient(8085, 127.0.0.1);for (int i 0; i 5; i){std::string str hello world!!!;cli_socket.Send(str.c_str(), str.size());char buf[4096] {0};cli_socket.Recv(buf, 4095);DBG_LOG(%s, buf);sleep(1);}while (1){}return 0;
}创建两个从属线程运行结果 只有一个主线程运行结果 TcpServer模块设计思想 这个模块是对所有模块的整合通过TcpServer模块实例化的对象可以非常简单的完成一个服务器的搭建。 管理 1。Acceptor对象创建一个监听套接字 2。EventLoop对象对监听套接字的事件监控 3。std::unordered_mapuint64_t, PtrConnection _conns实现对所有新建连接的管理 4。LoopThreadPool对象创建Loop线程池对通信连接进行事件监控及处理 功能 1。设置从属线程池数量 2。启动服务器 3。设置各种回调函数连接建立完成消息关闭任意用户设置给TcpServerTcpServer设置给获取的新连接 4。是否启动非活跃连接超时销毁功能 5。添加定时任务功能 流程 1。在TcpServer中实例化一个Acceptor对象以及一个EventLoop对象baseloop 2。将Acceptor挂到baseloop上进行事件监控 3。一旦Acceptor对象就绪了可读事件则执行读事件回调函数获取新通信连接 4。对新连接创建一个Connection进行管理。 5。对连接对应的Connection设置功能回调新建连接完成消息关闭任意事件 6。启动Connection的非活跃连接的超时销毁规则 7。将新连接对应的Connection挂到LoopThreadPool中的从属线程对应的EventLoop中进行事件监控 8。一旦Connection对应的连接就绪了可读事件则这时候执行读事件回调函数读取数据读取完毕后调用TcpServer设置的消息回调。 实现TcpServer模块
class TcpServer
{
private:uint16_t _port;uint64_t _next_id; //这是Connection和定时任务公用的idint _timeout; //非活跃连接的超时时间bool _enable_inactive_release; //是否启动了非活跃连接超时销毁的判断标志EventLoop _baseloop; //主线程EventLoop对象负责 监听事件的处理Acceptor _acceptor; //监听套接字的管理对象LoopThreadPool _pool; //这是从属EventLoop线程池std::unordered_mapuint64_t, PtrConnection _conns; //保存管理所有连接对应的shared_ptr对象 -- 这里面的东西被删除就意味着这个连接在某个不久的将来会被释放//用户设置的回调函数 -- 未来要设置给Connectionusing ConnectedCallback std::functionvoid(const PtrConnection);using MessageCallback std::functionvoid(const PtrConnection, Buffer*);using ClosedCallback std::functionvoid(const PtrConnection);using AnyEventCallback std::functionvoid(const PtrConnection);using Functor std::functionvoid();ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;
private://为新连接构造一个Connectionvoid NewConnection(int fd){_next_id;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn-SetMessageCallback(_message_callback);conn-SetClosedCallback(_closed_callback);conn-SetSvrClosedCallback(std::bind(TcpServer::RemoveConnection, this, std::placeholders::_1));conn-SetConnectedCallback(_connected_callback);conn-SetAnyEventCallback(_event_callback);if (_enable_inactive_release true)conn-EnableInactiveRelease(_timeout); //启动非活跃超时销毁conn-Establised(); //就绪初始化_conns.insert(std::make_pair(_next_id, conn));DBG_LOG(获取到了一个新的连接);}void RemoveConnectionInLoop(const PtrConnection conn){int id conn-Id();auto it _conns.find(id);if (it ! _conns.end()){_conns.erase(it);}}//从管理Connection的_conns中移除连接信息 -- 因为对STL容器进行操作所以需要考虑线程安全问题void RemoveConnection(const PtrConnection conn){_baseloop.RunInLoop(std::bind(TcpServer::RemoveConnectionInLoop, this, conn));}void RunAfterInLoop(const Functor task, int delay){_next_id;_baseloop.TimerAdd(_next_id, delay, task);}
public:TcpServer(int port):_port(port), _next_id(0), _enable_inactive_release(false), _acceptor(_baseloop, port),_pool(_baseloop){_acceptor.SetAcceptCallback(std::bind(TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen();}//设置从属线程数量void SetThreadCount(int count){return _pool.SetThreadCount(count);}void SetConnectedCallback(const ConnectedCallback cb){_connected_callback cb;}void SetMessageCallback(const MessageCallback cb){_message_callback cb;}void SetClosedCallback(const ClosedCallback cb){_closed_callback cb;}void SetAnyEventCallback(const AnyEventCallback cb){_event_callback cb;}void EnalbeInactiveRelease(int timeout){_timeout timeout;_enable_inactive_release true;}//用于添加一个定时任务 -- 提供给我们用户的void RunAfter(const Functor task, int delay){_baseloop.RunInLoop(std::bind(TcpServer::RunAfterInLoop, this, task, delay));}void Start(){_pool.Create();_baseloop.Start();}
};
//可以把网络通信看成一个管道通信这个类的功能是防止客户端退出即读端关闭服务器写端还在写服务器极小几率可能会收到管道信号导致服务器退出。
class NetWork
{
public:NetWork(){DBG_LOG(SIGPIPE INIT);signal(SIGPIPE, SIG_IGN);}
};
static NetWork nw; //主要是调用该类的构造函数测试
#include ../source/server.hppvoid OnConnected(const PtrConnection conn)
{DBG_LOG(NEW CONNECTION: %p, conn.get());
}
void OnClosed(const PtrConnection conn)
{DBG_LOG(CLOSE CONNECTION:%p, conn.get());
}
void OnMessage(const PtrConnection conn, Buffer* buf)
{DBG_LOG(%s, buf-ReadPosition());buf-MoveReadOffset(buf-ReadAbleSize());std::string str Hello World;conn-Send(str.c_str(), str.size());
}int main()
{TcpServer server(8085);server.SetThreadCount(2);server.EnalbeInactiveRelease(10);server.SetConnectedCallback(OnConnected);server.SetMessageCallback(OnMessage);server.Start();return 0;
}#include ../source/server.hppint main()
{Socket cli_socket;cli_socket.CreateClient(8085, 127.0.0.1);for (int i 0; i 5; i){std::string str hello world!!!;cli_socket.Send(str.c_str(), str.size());char buf[4096] {0};cli_socket.Recv(buf, 4095);DBG_LOG(%s, buf);sleep(1);}while (1){}return 0;
}运行结果 基于TcpServer实现回显服务器 回显服务器就相当于是用户使用的了 目前为止项目的所有的文件结构 实现Echo.hpp服务器
#include ../server.hppclass EchoServer
{
private:TcpServer _server;
private:void OnConnected(const PtrConnection conn){DBG_LOG(NEW CONNECTGION:%p, conn.get());}void OnClosed(const PtrConnection conn){DBG_LOG(CLOSE CONNECTION:%p, conn.get());}//我们将echo服务器设置为短连接即处理一次业务逻辑就关闭连接因为长连接太占用服务器资源了void OnMessage(const PtrConnection conn, Buffer* buf){conn-Send(buf-ReadPosition(), buf-ReadAbleSize());buf-MoveReadOffset(buf-ReadAbleSize());conn-Shutdown();}
public:EchoServer(int port):_server(port){_server.SetThreadCount(2);_server.EnalbeInactiveRelease(10);_server.SetClosedCallback(std::bind(EchoServer::OnClosed, this, std::placeholders::_1));_server.SetConnectedCallback(std::bind(EchoServer::OnConnected, this, std::placeholders::_1));_server.SetMessageCallback(std::bind(EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));}void Start(){_server.Start();}
};可以看到如果我们想基于TCP搭建一个服务器这个TCP服务器提供了很多功能用户想用哪些方法就可以用也可以不用。很简单的就可以搭建完成。 测试 main.cc
#include echo.hppint main()
{EchoServer server(8085);server.Start();return 0;
}tcp_cli.cpp
#include ../source/server.hppint main()
{Socket cli_socket;cli_socket.CreateClient(8085, 127.0.0.1);for (int i 0; i 5; i){std::string str hello world!!!;cli_socket.Send(str.c_str(), str.size());char buf[4096] {0};cli_socket.Recv(buf, 4095);DBG_LOG(%s, buf);sleep(1);}while (1){}return 0;
}运行结果 EchoServer回显服务器性能测试 用Webbench来测试Webbench的原理就是使用fork模拟多个客户端同时访问我们的服务器 下载好后可以直接拖拽进来并解压。 进入目录make编译 ./webbench -c 10000 -t 20 http://127,0.0.1:8085/以1万并发量访问20s不断的访问服务器 运行结果图片有限制大小剪辑了 speed每分钟能处理的请求数量365082每秒钟可以传递的字节数369458。 EchoServer回显服务器模块关系图 HTTP协议模块的子模块划分 下面开始我们项目第二个大模块的设计协议模块 协议支持模块当前我们的服务器只实现了对HTTP服务器的支持毕竟应用层的协议千千万我们用HTTP作为最为典型的协议作为一个基本的实现。通过这些应用层协议来搭建指定应用的服务器会变得更加简单。 我们的HTTP协议模块又划分为了以下几个主要模块 Util工具模块主要是实现我们的工具接口比如像我们的文件读写URL的编码和解码以及关于HTTP的状态码与描述的映射关系。 Request和Response模块HTTP协议分为HTTP请求和HTTP响应我们进行业务处理的时候就要根据里面要素信息才能明确客户端是想要什么样的请求。这两个模块是专门来存储HTTP请求和响应里面的关键信息。 HttpContext模块HTTP请求接收的上下文模块 我们之前也讲过在我们的高并发服务器里一个客户端对应的新建连接如果它现在收到了一个数据收到了数据就需要进行处理比如我们接收到了一个请求万一这不是一个完整的请求怎么办因此我们需要通过一个上下文将我们已经接收到的数据进行解析并存起来并且记录下它现在处于什么样的解析阶段然后等到下次有新数据到来了再按照当前阶段再进行下一步的处理。 我们接下来要做的事情就是将这些模块整合起来来实现一个HttpServer模块而这个HttpServer模块它所提供的功能就是基于TCP的服务器来实现的接收到一个数据之后对数据进行解析解析之后得到一个HttpRequest并根据HttpRequest进行业务处理业务处理了之后填充一个HttpResponse响应信息然后将这里面的信息进行一个组织序列化发送给Http客户端这就是该服务器的大致工作流程。 Util工具类设计思想
Util工具类字符串分割函数实现
#include ../server.hppclass Util
{
public://字符串分割函数将src字符串按照sep字符进行分割得到的各个子串放到arry中最终返回子串的数量static size_t Split(const std::string src, const std::string sep, std::vectorstd::string* arry){size_t offset 0;while (offset src.size()){size_t pos src.find(sep, offset);if (pos src.npos){arry-push_back(src.substr(offset));return arry-size();}if (pos ! offset)//如果不是一个空串就加入结果arry-push_back(src.substr(offset, pos - offset));offset pos sep.size();}}//读取文件内容static bool ReadFile();//向文件写入数据static bool WriteFile();//URL编码static std::stringUrlEncode();//URL解码static std::string UrlDecode();//响应状态码的描述信息获取static std::string StatuDesc();//根据文件后缀名获取文件mimestatic std::string ExtMime();//判断一个文件是否是一个目录static bool IsDirectory();//判断一个文件是否是一个普通文件static bool IsRegular();//http请求的资源路径有效性判断static bool ValidPath();
};测试
#include http.hppint main()
{std::string str ,,abc,bcd,efg;std::vectorstd::string arry;Util::Split(str, ,, arry);for (auto s : arry){std::cout [ s ]\n; }return 0;
}运行结果 Util工具类文件数据读取函数实现 如何获取一个文件的大小 1。stat函数别人提供好的 2。把文件偏移移动到文件末尾此时末尾偏移就是文件的大小。用seekg对文件偏移量进行操作。 //读取文件内容将读取的内容放到一个String中如果放到Buffer中会用到临时空间比如读1G的文件就会占用2G的空间
static bool ReadFile(const std::string filename, std::string* buf)
{std::ifstream ifs(filename, std::ios::binary);if (ifs.is_open()){printf(open %s file failed!!, filename.c_str());return false;}size_t fsize 0;ifs.seekg(0, ifs.end); //将文件偏移移动到文件的末尾fsize ifs.tellg(); //此时偏移的位置就是文件的大小ifs.seekg(0, ifs.beg); //将文件偏移移动到文件起始位置buf-resize(fsize);ifs.read((*buf)[0], fsize);if (ifs.good() false) //判断上次操作是否出问题{printf(read %s file failed!!, filename.c_str());ifs.close();return false;}ifs.close();return true;
}测试
#include http.hppint main()
{std::string str;Util::ReadFile(./http.hpp, str);std::cout str std::endl;return 0;
}运行结果 Util工具类文件数据写入函数实现
//向文件写入数据
static bool WriteFile(const std::string filename, const std::string buf)
{std::ofstream ofs(filename, std::ios::binary | std::ios::trunc); //trunc: 截断不要文件的原有内容即覆盖写if (!ofs.is_open()){printf(open %s file failed!!, filename.c_str());return false;}ofs.write(buf.c_str(), buf.size());if (ofs.good() false){ERR_LOG(write %s file failed!, filename.c_str());ofs.close();return false;}ofs.close();return true;
}测试
#include http.hppint main()
{std::string str;Util::ReadFile(./http.hpp, str);Util::WriteFile(./ttttttttt.c, str);return 0;
}运行结果 可以看到两个文件的md5值相同说明文件内容相同。 Util工具类UrlEncode函数实现 URL编码避免URL中资源路径与查询字符串中的特殊字符串与HTTP请求中特殊字符产生歧义。 RFC3986文档规定编码格式将一个特殊字符的ASCII值转换为前缀% 占2为的十六进制字符。比如 C转换为C%2B%2B的ASCII值为43十六进制为2B。.-_~以及字母和数字属于绝对不编码字符。 在W3C标准中规定查询字符串中的空格需要编码为解码则是转空格。 //URL编码
static std::string UrlEncode(const std::string url, bool convert_space_to_plus)
{std::string res;for (auto c : url){if (c . || c - || c _ || c ~ || isalnum(c)){res c;continue;}else if (c convert_space_to_plus true){res ;continue;}//剩下的字符都是需要编码成为%HH格式char tmp[4];snprintf(tmp, 4, %%%02X, c);//前两个%表示一个%后面的是%02X表示用十六进制表示res tmp;}return res;
}测试
#include http.hppint main()
{std::string str /login?userbitepasswdC;std::string res Util::UrlEncode(str, false);std::cout res std::endl;return 0;
}运行结果 Util工具类UrlDecode函数实现 遇到了%则将紧随其后的2个字符转为数字第一个数字左移4为然后加上第二个数字如十六进制是2B(2 4) B 43 static char HEXTOI(char c)
{if (c 0 c 9)return c - 0;else if (c a c z)return c - a 10;else if (c A c Z)return c - A 10;return -1;}//URL解码static std::string UrlDecode(const std::string url, bool convert_plus_to_space){std::string res;for (int i 0; i url.size(); i){if (url[i] % i 2 url.size()){char v1 HEXTOI(url[i 1]);char v2 HEXTOI(url[i 2]);char v (v1 * 16) v2;res v;i 2;continue;}if (convert_plus_to_space true url[i] ){res ;continue;}res url[i];}return res;}测试
#include http.hppint main()
{std::string str C ;std::string res Util::UrlEncode(str, true);std::string tmp Util::UrlDecode(res, true);std::cout [ res ]\n;std::cout [ tmp ]\n;return 0;
}运行结果 Util工具类Mime与Statu
//这些信息在文档里查看
std::unordered_mapint, std::string _statu_msg {{100, Continue},{101, Switching Protocol},{102, Processing},{103, Early Hints},{200, OK},{201, Created},{202, Accepted},{203, Non-Authoritative Information},{204, No Content},{205, Reset Content},{206, Partial Content},{207, Multi-Status},{208, Already Reported},{226, IM Used},{300, Multiple Choice},{301, Moved Permanently},{302, Found},{303, See Other},{304, Not Modified},{305, Use Proxy},{306, unused},{307, Temporary Redirect},{308, Permanent Redirect},{400, Bad Request},{401, Unauthorized},{402, Payment Required},{403, Forbidden},{404, Not Found},{405, Method Not Allowed},{406, Not Acceptable},{407, Proxy Authentication Required},{408, Request Timeout},{409, Conflict},{410, Gone},{411, Length Required},{412, Precondition Failed},{413, Payload Too Large},{414, URI Too Long},{415, Unsupported Media Type},{416, Range Not Satisfiable},{417, Expectation Failed},{418, Im a teapot},{421, Misdirected Request},{422, Unprocessable Entity},{423, Locked},{424, Failed Dependency},{425, Too Early},{426, Upgrade Required},{428, Precondition Required},{429, Too Many Requests},{431, Request Header Fields Too Large},{451, Unavailable For Legal Reasons},{501, Not Implemented},{502, Bad Gateway},{503, Service Unavailable},{504, Gateway Timeout},{505, HTTP Version Not Supported},{506, Variant Also Negotiates},{507, Insufficient Storage},{508, Loop Detected},{510, Not Extended},{511, Network Authentication Required}
};
//响应状态码的描述信息获取
static std::string StatuDesc(int statu)
{auto it _statu_msg.find(statu);if (it ! _statu_msg.end()){return it-second;}return Unkown;
}测试
#include http.hppint main()
{std::cout Util::StatuDesc(200) std::endl;std::cout Util::StatuDesc(302) std::endl;return 0;
}运行结果 std::unordered_mapstd::string, std::string _mime_msg {{.aac, audio/aac},{.abw, application/x-abiword},{.arc, application/x-freearc},{.avi, video/x-msvideo},{.azw, application/vnd.amazon.ebook},{.bin, application/octet-stream},{.bmp, image/bmp},{.bz, application/x-bzip},{.bz2, application/x-bzip2},{.csh, application/x-csh},{.css, text/css},{.csv, text/csv},{.doc, application/msword},{.docx, application/vnd.openxmlformats-officedocument.wordprocessingml.document},{.eot, application/vnd.ms-fontobject},{.epub, application/epubzip},{.gif, image/gif},{.htm, text/html},{.html, text/html},{.ico, image/vnd.microsoft.icon},{.ics, text/calendar},{.jar, application/java-archive},{.jpeg, image/jpeg},{.jpg, image/jpeg},{.js, text/javascript},{.json, application/json},{.jsonld, application/ldjson},{.mid, audio/midi},{.midi, audio/x-midi},{.mjs, text/javascript},{.mp3, audio/mpeg},{.mpeg, video/mpeg},{.mpkg, application/vnd.apple.installerxml},{.odp, application/vnd.oasis.opendocument.presentation},{.ods, application/vnd.oasis.opendocument.spreadsheet},{.odt, application/vnd.oasis.opendocument.text},{.oga, audio/ogg},{.ogv, video/ogg},{.ogx, application/ogg},{.otf, font/otf},{.png, image/png},{.pdf, application/pdf},{.ppt, application/vnd.ms-powerpoint},{.pptx, application/vnd.openxmlformats-officedocument.presentationml.presentation},{.rar, application/x-rar-compressed},{.rtf, application/rtf},{.sh, application/x-sh},{.svg, image/svgxml},{.swf, application/x-shockwave-flash},{.tar, application/x-tar},{.tif, image/tiff},{.tiff, image/tiff},{.ttf, font/ttf},{.txt, text/plain},{.vsd, application/vnd.visio},{.wav, audio/wav},{.weba, audio/webm},{.webm, video/webm},{.webp, image/webp},{.woff, font/woff},{.woff2, font/woff2},{.xhtml, application/xhtmlxml},{.xls, application/vnd.ms-excel},{.xlsx, application/vnd.openxmlformats-officedocument.spreadsheetml.sheet},{.xml, application/xml},{.xul, application/vnd.mozilla.xulxml},{.zip, application/zip},{.3gp, video/3gpp},{.3g2, video/3gpp2},{.7z, application/x-7z-compressed}
};
//根据文件后缀名获取文件mime
static std::string ExMime(const std::string filename)
{size_t pos filename.find_last_of(.);if (pos std::string::npos){return application/occtet-stream;}//根据扩展名获取mimestd::string ext filename.substr(pos);auto it _mime_msg.find(ext);if (it _mime_msg.end()){return application/occtet-stream;}return it-second;
}测试
#include http.hppint main()
{std::cout Util::ExtMime(a.txt) std::endl;std::cout Util::ExtMime(a.png) std::endl;return 0;
}运行结果 Util工具类文件类型判断接口实现 如何判断文件类型 用stat函数 //判断一个文件是否是一个目录
static bool IsDirectory(const std::string filename)
{struct stat st;int ret stat(filename.c_str(), st); //获取到文件属性if (ret 0){return false;}return S_ISDIR(st.st_mode);
}
//判断一个文件是否是一个普通文件
static bool IsRegular(const std::string filename)
{struct stat st;int ret stat(filename.c_str(), st); //获取到文件属性if (ret 0){return false;}return S_ISREG(st.st_mode);
}测试
#include http.hppint main()
{std::cout Util::IsRegular(main.cc) std::endl;std::cout Util::IsRegular(../Http) std::endl;std::cout Util::IsDirectory(main.cc) std::endl;std::cout Util::IsDirectory(../Http) std::endl;return 0;
}运行结果 Util工具类路径有效性判断接口实现 思想按照/进行路径分割根据有多少子目录计算目录深度有多少层深度不能小于0 //http请求的资源路径有效性判断
static bool ValidPath(const std::string path)
{std::vectorstd::string subdir;Util::Split(path, /, subdir);int level 0;for (auto dir : subdir){if (dir ..){level--;if (level 0)//任意一层走出相对根目录就认为有问题{return false;}}level;}return true;
}测试
#include http.hppint main()
{std::cout Util::ValidPath(/abc/eee/../ff) std::endl;std::cout Util::ValidPath(/../Http) std::endl;return 0;
}运行结果: HttpRequest模块设计思想 功能存储HTTP请求信息要素提供简单的功能性接口 要素请求方法资源路径查询字符串头部字段正文协议版本std::smatch保存首行使用regex正则进行解析后所提取的数据比如提取资源路径中的数字 功能性接口 1。将成员变量设置为公有成员便于直接访问 2。提供查询字符串以及头部字段的单个查询和获取插入功能 3。获取正文长度 4。判断长连接和短连接 class HttpRequest
{
public:std::string _method; //请求方法std::string _path; //资源路径std::string _version; //协议版本std::string _body; //请求正文std::smatch _matches; //资源路径的正则提取数据std::unordered_mapstd::string, std::string _headers; //头部字段std::unordered_mapstd::string, std::string _params; //查询字符串
public:HttpRequest():_version(HTTP/1.1){}//重置 -- 每一次上下文里面的内容处理完了我们就要重置一下因为不重置就代表这个请求还在处理这次的信息就会对下次的请求信息造成影响void ReSet(){_method.clear();_path.clear();_version HTTP/1.1;_body.clear();std::smatch match; //因为smatch没有clear函数通过swap也可以达到clear的效果_matches.swap(match);_headers.clear();_params.clear();}//插入头部字段void SetHeader(const std::string key, const std::string val){_headers.insert({key, val});}//判断是否存在指定头部字段bool HasHeader(const std::string key){auto it _headers.find(key);if (it _headers.end())return false;return true;}//获取指定头部字段std::string GetHeader(const std::string key) const {auto it _headers.find(key);if (it _headers.end()){return ;}return it-second;}//插入查询字符串void SetParam(std::string key, std::string val){_params.insert({key, val});}//判断是否有某个指定的查询字符串bool HasParam(std::string key){auto it _params.find(key);if (it _params.end())return false;return true;}//获取指定的查询字符串std::string GetParam(std::string key){auto it _params.find(key);if (it _params.end()){return ;}return it-second;}//获取正文长度size_t ContentLength(){bool ret HasHeader(Content-Length);if (ret false)return 0;return std::stoi(GetHeader(Content-Length));}//判断是否是短连接bool Close() const{if (GetHeader(Connection) keep-alive)return false;return true;}
};HttpResponse模块设计思想 功能存储HTTP响应信息要素提供简单的功能性接口 要素响应状态码头部字段响应正文重定向信息是否进行了重定向的标志重定向的路径 功能性接口 1。头部字段的新增查询获取 2。正文的设置 3。重定向的设置 4。长短连接的判断 class HttpResponse
{
public:int _statu; //状态码bool _redirect_flag; //是否重定向std::string _body; //响应正文std::string _redirect_url; //重定向路径std::unordered_mapstd::string, std::string _headers; //响应头部字段
public:HttpResponse(int statu 200):_redirect_flag(false), _statu(statu){}void ReSet(){_statu 200;_redirect_flag false;_body.clear();_redirect_url.clear();_headers.clear();}//插入头部字段void SetHeader(const std::string key, const std::string val){_headers.insert({key, val});}//判断是否存在指定头部字段bool HasHeader(const std::string key){auto it _headers.find(key);if (it _headers.end())return false;return true;}//获取指定头部字段std::string GetHeader(const std::string key){auto it _headers.find(key);if (it _headers.end()){return ;}return it-second;}void SetContent(const std::string body, const std::string type text/html){_body body;SetHeader(Content-Type, type);}void SetRedirect(const std::string url, int statu 302){_statu statu;_redirect_flag true;_redirect_url url;}//判断是否是短连接bool Close(){if (GetHeader(Connection) keep-alive)return false;return true;}
};HttpContext模块设计思想
typedef enum
{RECV_HTTP_ERROR, //出错RECV_HTTP_LINE, //请求行RECV_HTTP_HEAD, //请求头部RECV_HTTP_BODY, //请求正文RECV_HTTP_OVER //结束阶段
}HttpRecvStatu;#define MAX_LINE 8192
class HttpContext
{
public:int _resp_statu; //响应状态码HttpRecvStatu _recv_statu; //当前接收机解析的阶段状态HttpRequest _request; //已经解析得到的请求信息
private:bool ParseHttpLine(const std::string line){std::smatch matches;std::regex e((GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?, std::regex::icase);//icase忽略大小写bool ret std::regex_match(line, matches, e);if (ret false){_recv_statu RECV_HTTP_ERROR;_resp_statu 400; //BAD REQUESTreturn false;}// 0: GET /biejiuyeke/login?userxiaomingpasswd123123 HTTP/1.1 size:60// 1: GET size:3// 2: /biejiuyeke/login size:17// 3: userxiaomingpasswd123123 size:27// 4: HTTP/1.1 size:8//第0个是url本身第一个是请求方法第二个是资源路径第三个是查询字符串第四个是协议版本//请求方法的获取_request._method matches[1];std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper);//字母都转为大写//查询路径可能也是经过url编码的资源路径的获取需要进行URL解码操作但是不需要转空格_request._path Util::UrlDecode(matches[2], false);//协议版本获取_request._version matches[4];//查询字符串的获取与处理std::vectorstd::string query_string_arry;std::string query_string matches[3];//查询字符串的格式keyvalkeyvalkeyval...以符号进行分割得到各个子串Util::Split(query_string, , query_string_arry);//针对各个子串以符号进行分割得到key和val得到之后也需要进行URL解码for (auto str : query_string_arry){size_t pos str.find(); //查询字符串都是一种keyvalue的字符串if (pos std::string::npos){_recv_statu RECV_HTTP_ERROR;_resp_statu 400; //BAD REQUESTreturn false;}std::string key Util::UrlDecode(str.substr(0, pos), true);std::string val Util::UrlDecode(str.substr(pos 1), true);_request.SetParam(key, val);}return true;}//接收请求行bool RecvHttpLine(Buffer* buf){if (_recv_statu ! RECV_HTTP_LINE)return false;//1。获取一行数据带有末尾的换行std::string line buf-GetLineAndPop(); //这就是为什么之前设计的时候读取带有末尾的换行//2。需要考虑的一些要素缓冲区中的数据不足一行或者获取一行的数据超大if (line.size() 0){//说明缓冲区中数据不足一行if (buf-ReadAbleSize() MAX_LINE) //判断缓冲区中的可读数据长度如果很长了都不足一行说明有问题{_recv_statu RECV_HTTP_ERROR;_resp_statu 414; //URI TOO LONGreturn false;}//缓冲区中的数据不足一行长度也合理就等下一次的新数据到来再处理return true;}if (line.size() MAX_LINE){_recv_statu RECV_HTTP_ERROR;_resp_statu 414; //URI TOO LONGreturn false;}//走到这里说明得到了完整的请求行就可以进行解析了bool ret ParseHttpLine(line);if (ret false)return false;//首行处理完毕进入头部获取阶段_recv_statu RECV_HTTP_HEAD;return true;}bool ParseHttpHead(std::string line){if (line.back() \n)line.pop_back();if (line.back() \r)line.pop_back();size_t pos line.find(: );if (pos line.npos){_recv_statu RECV_HTTP_ERROR;_resp_statu 400;return false;}std::string key line.substr(0, pos);std::string val line.substr(pos 2);_request.SetHeader(key, val);return true;}//接收请求头部字段bool RecvHttpHead(Buffer* buf){if (_recv_statu ! RECV_HTTP_HEAD)return false;while (1){//一行一行取出数据知道遇到空行为止头部格式 key: val\r\nkey: val\r\n...//1。获取一行数据std::string line buf-GetLineAndPop();//2。需要考虑的一些要素缓冲区中的数据不足一行获取的一行数据超大if (line.size() 0){//缓冲区中的数据不足一行则需要判断缓冲区的可读数据长度如果很长都不足一行这是有问题的if (buf-ReadAbleSize() MAX_LINE){_recv_statu RECV_HTTP_ERROR;_resp_statu 414;return false;}//缓冲区中数据不足一行符合长度预期就等待新数据的到来return true;}if (line.size() MAX_LINE){_recv_statu RECV_HTTP_ERROR;_resp_statu 414; //URI TOO LONGreturn false;}if (line \n || line \r\n) //说明读到了空行break;bool ret ParseHttpHead(line);if (ret false)return false;}//头部处理完毕进入正文获取阶段_recv_statu RECV_HTTP_BODY;return true;}//接收请求正文bool RecvHttpBody(Buffer* buf){if (_recv_statu ! RECV_HTTP_BODY)return false;//1。获取正文长度size_t content_length _request.ContentLength();if (content_length 0){//没有正文则请求接收解析完毕_recv_statu RECV_HTTP_OVER;return true;}//2。当前已经接收了多少正文其实就是往_request._body中放了多少数据size_t real_len content_length - _request._body.size(); //实际还要接收的正文长度//3。接收正文放到body中但是也要考虑当前缓冲区中的数据是否是全部的正文//3.1 缓冲区中数据包含了当前请求的所有正文则取出所需数据。缓冲区中也有可能包含下一个协议的内容if (buf-ReadAbleSize() real_len){_request._body.append(buf-ReadPosition(), real_len);buf-MoveReadOffset(real_len);_recv_statu RECV_HTTP_OVER;return true;}//3.2 缓冲区中数据无法满足当前正文的需要数据不足取出数据然后等待新数据到来_request._body.append(buf-ReadPosition(), buf-ReadAbleSize());buf-MoveReadOffset(buf-ReadAbleSize());return true;}
public:HttpContext():_resp_statu(200), _recv_statu(RECV_HTTP_LINE){}void ReSet(){_resp_statu 200;_recv_statu RECV_HTTP_LINE;_request.ReSet();}//获取相应状态码int RespStatu(){return _resp_statu;}//获取接收状态HttpRecvStatu RecvStatu(){return _recv_statu;}//获取已经得到的请求信息HttpRequest Request(){return _request;}//接受并解析HTTP请求void RecvHttpRequest(Buffer* buf){//不同的状态做不同的事情但是这里不要break因为处理完请求后应该立即处理头部而不是退出等新数据switch(_recv_statu){case RECV_HTTP_LINE:RecvHttpLine(buf);case RECV_HTTP_HEAD:RecvHttpHead(buf);case RECV_HTTP_BODY:RecvHttpBody(buf);}}
};HttpServer模块设计思想 设计一张请求路由表 表中记录了针对哪个请求应该使用哪个函数来进行业务处理的映射关系 当服务器收到了一个请求就在请求路由表中查找有没有对应请求的处理函数如果有则执行对应的处理函数即可 这样做的好处用户只需要实现业务处理函数然后然后将请求与处理函数的映射关系添加到服务器中而服务器只需要接收数据解析数据查找路由表映射关系执行业务处理函数。说白了什么请求怎么处理有用户设定服务器收到了请求只需要执行函数即可。 要素 1。GET请求的路由映射表 2。POST请求的路由映射表 3。PUT请求的路由映射表 4。DELETE请求的路由映射表 5。高性能TCP服务器 6。静态资源相对根目录 服务器处理流程 1。从socket接收数据放到缓冲区 2。调用OnMessage回调函数进行业务处理 3。对请求进行解析得到一个HttpRequest结构包含了所有的请求要素 4。进行请求的路由查找 – 找到对应请求的处理方法处理方法又分为两种 a。静态资源请求 – 一些实体文件资源的请求htmlimage…。将静态资源文件读取出来填充到HttpResponse结构中 b。功能性请求 – 在请求路由映射表中查找处理函数找到了则执行函数并进行HttpResponse结果的数据填充。 5。对静态资源请求/功能性请求进行处理完毕后得到了一个HttpResponse对象组织Http格式响应进行发送。 接口 添加请求 - 处理函数映射信息GET/POST/PUT/DELETE 设置静态资源根目录 设置是否启动超时连接关闭 设置线程池中线程数量 启动服务器 OnConnected – 用于给TcpServer设置协议上下文 OnMessage – 用于进行缓冲区数据解析处理 请求静态资源请求/功能性请求的路由查找 组织响应进行回复 #define DEFAULT_TIMEOUT 10class HttpServer
{
private:using Handler std::functionvoid(const HttpRequest, HttpResponse*);using Handlers std::vectorstd::pairstd::regex, Handler;//有四个功能性请求的路由表Handlers _get_route;Handlers _post_route;Handlers _put_route;Handlers _delete_route;std::string _basedir; //静态资源根目录TcpServer _server;
private:void ErrorHandler(const HttpRequest req, HttpResponse* rsp){//1。组织一个错误展示页面std::string body;body html;body head;body meta http-equivContent-Type contenttext/html;charsetutf-8;body /head;body body;body h1;body std::to_string(rsp-_statu);body ;body Util::StatuDesc(rsp-_statu);body /h1;body /body;body /html;//2.将页面数据当做响应正文放入rsp中rsp-SetContent(body, text/html);}//将HttpResponse中的要素按照http协议格式进行组织发送void WriteResponse(const PtrConnection conn, const HttpRequest req, HttpResponse rsp){//1.先完善头部字段 -- 这几个是几乎必要的头部字段if(req.Close() true)rsp.SetHeader(Connection, close);else rsp.SetHeader(Connection, keep-alive);if (rsp._body.empty() false rsp.HasHeader(Content-Length) false)//如果没有设置长度则要设置长度rsp.SetHeader(Content-Length, std::to_string(rsp._body.size()));if (rsp._body.empty() false rsp.HasHeader(Content-Type) false)//如果没有设置Content-Type则要设置rsp.SetHeader(Content-Type, application/octet-stream);if (rsp._redirect_flag true)rsp.SetHeader(Location, rsp._redirect_url);//2。将rsp中的要素按照http协议格式进行组织std::stringstream rsp_str; //技巧快速的拼接字符串rsp_str req._version std::to_string(rsp._statu) Util::StatuDesc(rsp._statu) \r\n;for (auto head : rsp._headers){rsp_str head.first : head.second \r\n;}rsp_str \r\n;rsp_str rsp._body;//3。发送数据conn-Send(rsp_str.str().c_str(), rsp_str.str().size());}//是否是一个静态资源请求bool IsFileHandler(const HttpRequest req){//1.必须设置了静态资源根目录if (_basedir.empty())return false;//2.请求方法必须是GET/HEAD方法 -- 只有这两个才是获取实体资源的请求 POST/PUT/DELETE通常更多是功能性的请求if (req._method ! GET req._method ! HEAD)return false;//3.请求的资源路径必须是一个合法的路径if (Util::ValidPath(req._path) false)return false;//4.请求的资源必须存在且是一个普通文件std::string req_path _basedir req._path;if (req._path.back() /)req_path index.html;if (Util::IsRegular(req_path) false)return false;return true;}//静态资源的请求处理void FileHandler(const HttpRequest req, HttpResponse* rsp){std::string req_path _basedir req._path;if (req._path.back() /){req_path index.html;}bool ret Util::ReadFile(req_path, (rsp-_body));if (ret false)return;std::string mime Util::ExMime(req_path); //通过扩展名来获取它的mimersp-SetHeader(Content-Type, mime);return;}//功能性请求的分类处理void Dispatcher(HttpRequest req, HttpResponse* rsp, Handlers handlers){//在对应请求方法的路由表中查找对应是否含有对应资源请求的处理函数有则调用没有则返回404//思想路由表存储的是键值对正则表达式, 处理函数//使用正则表达式对请求的资源路径进行正则匹配匹配成功就使用对应函数进行处理//为什么用正则表达式而不用字符串因为查询路径可能有很多种情况我们不能全写进路由表for (auto handler : handlers)//将每一个正则表达式拿出来进行与查询路径进行匹配{const std::regex re handler.first;const Handler functor handler.second;bool ret std::regex_match(req._path, req._matches, re);if (ret false)continue;return functor(req, rsp); //传入请求信息和空的rsp执行处理函数}rsp-_statu 404;}void Route(HttpRequest req, HttpResponse* rsp){//对请求进行分析是一个静态资源请求还是一个功能性请求// 静态资源请求则进行静态资源的处理// 功能性请求则需要通过几个请求路由表来确定是否有处理函数// 既不是静态资源请求也没有设置对应的功能请求处理函数就返回405if (IsFileHandler(req) true){//是一个静态资源请求则进行静态资源请求的处理return FileHandler(req, rsp);}//走到这里则说明是功能性请求if (req._method GET || req._method HEAD)return Dispatcher(req, rsp, _get_route);else if (req._method POST)return Dispatcher(req, rsp, _post_route);else if (req._method PUT)return Dispatcher(req, rsp, _put_route);else if (req._method DELETE)return Dispatcher(req, rsp, _delete_route);rsp-_statu 405; //Method Not Allowed}//设置上下文void OnConnected(const PtrConnection conn){conn-SetContext(HttpContext());DBG_LOG(NEW CONNECTION %p, conn.get());}//缓冲区数据解析 处理void OnMessage(const PtrConnection conn, Buffer* buffer){while (buffer-ReadAbleSize() 0){//1。获取上下文HttpContext* context conn-GetContext()-getHttpContext();//2。通过上下文对缓冲区数据进行解析得到HttpRequest对象//a。如果缓冲区的数据解析出错就直接回复出错响应//b。如果解析正常且请求已经获取完毕才开始进行处理context-RecvHttpRequest(buffer);HttpRequest req context-Request();HttpResponse rsp(context-RespStatu());if (context-RespStatu() 400) //代表数据解析出错{//进行错误响应关闭连接ErrorHandler(req, rsp); //填充一个错误显示页面数据到rsp中WriteResponse(conn, req, rsp); //组织响应发送给客户端context-ReSet();buffer-MoveReadOffset(buffer-ReadAbleSize()); //出错了就把缓冲区数据清空conn-Shutdown(); //关闭连接return;}if (context-_recv_statu ! RECV_HTTP_OVER)//代表这不是一个完整的请求return;//3。路由请求 业务处理Route(req, rsp);//4。对HttpResponse进行组织发送WriteResponse(conn, req, rsp);//5。重置上下文context-ReSet();//6。根据长短连接判断是否关闭连接或者继续处理 -- 短连接接收处理一次服务器就把连接关闭 长连接一直与你通信if (rsp.Close() true)conn-Shutdown();}}
public:HttpServer(int port, int timeout DEFAULT_TIMEOUT):_server(port){_server.EnableInactiveRelease(timeout);_server.SetConnectedCallback(std::bind(HttpServer::OnConnected, this, std::placeholders::_1));_server.SetMessageCallback(std::bind(HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));}void SetBaseDir(const std::string path){assert(Util::IsDirectory(path) true);_basedir path;}//设置/添加请求与处理函数的映射关系void Get(const std::string pattern, const Handler handler){_get_route.push_back(std::make_pair(std::regex(pattern), handler));}void Post(const std::string pattern, const Handler handler){_post_route.push_back(std::make_pair(std::regex(pattern), handler));}void Put(const std::string pattern, const Handler handler){_put_route.push_back(std::make_pair(std::regex(pattern), handler));}void Delete(const std::string pattern, const Handler handler){_delete_route.push_back(std::make_pair(std::regex(pattern), handler));}void SetThreadCount(int count){_server.SetThreadCount(count);}void Listen(){_server.Start();}
};基于HttpServer搭建HTTP服务器 目前为止HTTP服务器搭建的文章目录如下 index.html文件
!DOCTYPE html
html langen
headmeta charsetUTF-8titleDocument/title
/head
bodyform action /login methodpostinput typetext nameusernamebrinput typepassword namepassword placeholder请输入密码brinput typesubmit value 提交 namesubmit
/body
/htmlmain.cc文件
#include http.hpp#define WWWROOT ./wwwroot///业务进行回显
std::string RequestStr(const HttpRequest req)
{std::stringstream ss;ss req._method req._path req._version \r\n;for (auto it : req._params){ss it.first : it.second \r\n;}for (auto it : req._headers){ss it.first : it.second \r\n;}ss \r\n;ss req._body;return ss.str();
}
void Hello(const HttpRequest req, HttpResponse* rsp)
{rsp-SetContent(RequestStr(req), text/plain);
}
void Login(const HttpRequest req, HttpResponse* rsp)
{rsp-SetContent(RequestStr(req), text/plain);
}
void PutFile(const HttpRequest req, HttpResponse* rsp)
{std::string pathname WWWROOT req._path;Util::WriteFile(pathname, req._body);
}
void DelFile(const HttpRequest req, HttpResponse* rsp)
{rsp-SetContent(RequestStr(req), text/plain);
}int main()
{HttpServer server(8085);server.SetThreadCount(5);server.SetBaseDir(WWWROOT);server.Get(/hello, Hello);server.Post(/login, Login);server.Put(/1234.txt, PutFile);server.Delete(/1234.txt, DelFile);server.Listen();return 0;
}测试结果 GET的功能性请求 静态资源登录界面。点击登录POST的功能性请求。 用postman软件来测试PUT方法 测试DELETE方法我们DELETE的功能也设置的是回显 HTTP服务器长连接测试 接下来进行边界性的测试创建一个客户端持续的给服务器发送消息直到超过了超时时间看客户端是否还能正常通信。也就是看客户端能否持续和服务器通信。 #include ../source/server.hppint main()
{Socket cli_sock;cli_sock.CreateClient(8085, 127.0.0.1);std::string req GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n;while (1){assert(cli_sock.Send(req.c_str(), req.size()) ! -1);char buf[1024] {0};assert(cli_sock.Recv(buf, 1023));DBG_LOG([%s], buf);sleep(3);}cli_sock.Close();return 0;
}测试结果 可以看到超过了10s客户端仍然可以一直给服务器发消息 HTTP服务器超时连接测试 连接上服务器之后发送一次数据给服务器之后不发送看服务器是否会关闭连接 #include ../source/server.hppint main()
{Socket cli_sock;cli_sock.CreateClient(8085, 127.0.0.1);std::string req GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n;while (1){assert(cli_sock.Send(req.c_str(), req.size()) ! -1);char buf[1024] {0};assert(cli_sock.Recv(buf, 1023));DBG_LOG([%s], buf);sleep(15);}cli_sock.Close();return 0;
}测试结果 HTTP服务器错误请求测试 给服务器发送一个数据告诉服务器要发送1024字节的数据但是实际发送的数据不足1024查看服务器处理结果。 服务器正常预期的处理结果 1。数据只发送一次服务器得不到完整的请求就不会进行业务处理客户端也就得不到响应最终超时关闭连接 2。连着给服务器发送多次小的请求服务器会将后面的请求当做前边请求的正文进行处理而后边处理的时候有可能就会因为处理错误而关闭连接 #include ../source/server.hppint main()
{Socket cli_sock;cli_sock.CreateClient(8085, 127.0.0.1);std::string req GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 100\r\n\r\nbitejiuyeke;while (1){assert(cli_sock.Send(req.c_str(), req.size()) ! -1);assert(cli_sock.Send(req.c_str(), req.size()) ! -1);assert(cli_sock.Send(req.c_str(), req.size()) ! -1);assert(cli_sock.Send(req.c_str(), req.size()) ! -1);assert(cli_sock.Send(req.c_str(), req.size()) ! -1);char buf[1024] {0};assert(cli_sock.Recv(buf, 1023));DBG_LOG([%s], buf);sleep(3);}cli_sock.Close();return 0;
}测试结果 可以看到服务器仍然正常在运行不会因为客户端请求的问题而使服务器崩溃 HTTP服务器业务处理超时测试 当服务器达到了一个性能瓶颈在一次业务处理中花费了太长的时间超过了服务器设置的非活跃超时时间。在一次业务处理中耗费太长时间导致其他的连接也被连累超时。 这样就会产生一种错误情况处理完前一个描述符后花费了很长时间再处理下一个连接的时候这个连接可能已经超时释放了会导致程序崩溃。因此在本次事件处理中为了防止这种情况并不能直接对连接进行释放而应该将释放操作压入到任务池中等到事件处理完了执行任务池的任务的时候再去释放。 更新我们Connection模块中的释放接口
class Connection : public std::enable_shared_from_thisConnection
{
private:uint64_t _conn_id; //连接的唯一ID也是定时任务唯一ID便于连接的管理和查找int _sockfd; //连接关联的文件描述符bool _enable_inactive_release; //连接是否启动非活跃销毁的判断标志默认为falseEventLoop* _loop; //连接所关联的一个EventLoopConnStatu _statu; //连接状态Socket _socket; //套接字操作管理Channel _channel; //连接的事件管理Buffer _in_buffer; //输入缓冲区 -- 存放从socket中读取到的数据Buffer _out_buffer; //输出缓冲区 -- 存放要发送给对端的数据Any _context; //请求的接收处理上下文//以下这四个回调函数吗是让Server模块来设置的(服务器模块的处理回调是组件使用者设置的)using ConnectedCallback std::functionvoid(const PtrConnection);using MessageCallback std::functionvoid(const PtrConnection, Buffer*);using ClosedCallback std::functionvoid(const PtrConnection);using AnyEventCallback std::functionvoid(const PtrConnection);ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;//组件内的连接关闭回调 -- 组件内设置的因为服务器组件内会把所有的连接管理起来。//一旦某个连接要关闭就应该从管理的地方移除掉自己的信息。ClosedCallback _server_closed_callback;
private://五个channel的事件回调函数//描述符可读事件触发后调用的函数接收socket数据放到接收缓冲区中然后调用_message_callbackvoid HandleRead(){char buf[65536];ssize_t ret _socket.NonBlockRecv(buf, 65535);if (ret 0){//读出错了可能是客户端关闭不能直接关闭连接因为可能有数据没发送或者有数据还没处理return ShutdownInLoop();}else if (ret 0){//表示没有读取到数据并不是连接断开因为我们调用的是自己封装的NonBlockRecvreturn;}_in_buffer.WriteAndPush(buf, ret);//2.调用message_callback进行业务处理if (_in_buffer.ReadAbleSize() 0){//shared_from_this -- 从当前对象自身获取自身的shared_ptr管理对象_message_callback(shared_from_this(), _in_buffer);}}//描述符可写事件触发后调用的函数将发送缓冲区中的数据进行发送void HandleWrite(){ssize_t ret _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());if (ret 0){if (_in_buffer.ReadAbleSize() 0){_message_callback(shared_from_this(), _in_buffer);}return Release(); //实际的关闭释放操作}_out_buffer.MoveReadOffset(ret); //千万不要忘了将读偏移向后移动if (_out_buffer.ReadAbleSize() 0){_channel.DisableWrite(); //如果数据发送完了就关闭写事件监控//如果当前是连接待关闭状态并且数据发送完毕则可以将连接直接释放if (_statu DISCONNECTING){return Release();}}//发送数据可能发不完不关闭写事件监控return;}//描述符触发挂断事件void HandleClose(){if (_in_buffer.ReadAbleSize() 0){_message_callback(shared_from_this(), _in_buffer);}return Release();}//描述符触发出错事件void HandleError(){return HandleClose();}//描述符触发任意事件void HandleEvent(){//刷新连接活跃度if (_enable_inactive_release true){_loop-TimerRefresh(_conn_id);}//调用组件使用者的任意事件回调if (_event_callback)_event_callback(shared_from_this());}//连接获取之后所处的状态下要进行各种设置void EstablishedInLoop(){//修改连接状态assert(_statu CONNECTING);_statu CONNECTED;_channel.EnableRead();if (_connected_callback)_connected_callback(shared_from_this());}//这个接口才是实际的释放接口void ReleaseInLoop(){//修改连接状态将其置为DISCONNECTED_statu DISCONNECTED;//移除连接的事件监控_channel.Remove();//关闭描述符_socket.Close();//如果当前定时器队列中还有定时任务则取消任务if (_loop-HasTimer(_conn_id))CancelInactiveRelease();//调用关闭回调函数避免先移除服务器管理的连接信息导致Connection被释放因此先调用户的回调函数if (_closed_callback)_closed_callback(shared_from_this());if (_server_closed_callback)_server_closed_callback(shared_from_this());}//这个接口并不是实际的发送接口而只是把数据放到了发送缓冲区启动了可写事件监控//为什么要这么做因为可写条件可能不就绪即内核缓冲区的数据满了写不进去了void SendInLoop(Buffer buf){if (_statu DISCONNECTED) //如果状态已经关闭则直接return已经关闭则代表发送缓冲区数据为0return;_out_buffer.WriteBufferAndPush(buf);if (_channel.WriteAble() false)_channel.EnableWrite();}//这个关闭操作并非实际的连接释放操作需要判断还有没有数据待处理待发送void ShutdownInLoop(){_statu DISCONNECTING; //设置为半关闭状态if (_in_buffer.ReadAbleSize() 0)_message_callback(shared_from_this(), _in_buffer);if (_out_buffer.ReadAbleSize() 0)_channel.EnableWrite();//因为可能发送缓冲区将数据发送不完所以写关心就不用关闭了也不用真正释放了if (_out_buffer.ReadAbleSize() 0){_channel.DisableWrite();Release();}}//启动非活跃连接超时释放规则void EnableInactiveReleaseInLoop(int sec){//将判断标志 _enable_inactive_erlease置为true_enable_inactive_release true;//如果当前定时销毁任务已经存在那就刷新延迟一下即可if (_loop-HasTimer(_conn_id))return _loop-TimerRefresh(_conn_id);//如果不存在定时销毁任务则新增_loop-TimerAdd(_conn_id, sec, std::bind(Connection::Release, this));}//取消非活跃连接超时释放规则void CancelInactiveReleaseInLoop(){_enable_inactive_release false;if (_loop-HasTimer(_conn_id))_loop-TimerCancel(_conn_id);}//切换/升级协议void UpgradeInLoop(const Any context, const ConnectedCallback conn, const MessageCallback msg, const ClosedCallback closed, const AnyEventCallback event){_context context; //改变上下文_connected_callback conn;_message_callback msg;_closed_callback closed;_event_callback event;}
public:Connection(EventLoop* loop, uint64_t conn_id, int sockfd):_conn_id(conn_id),_sockfd(sockfd),_enable_inactive_release(false),_loop(loop),_statu(CONNECTING),_socket(sockfd),_channel(loop, _sockfd){_channel.SetReadCallback(std::bind(Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(Connection::HandleWrite, this));_channel.SetCloseCallback(std::bind(Connection::HandleClose, this));_channel.SetErrorCallback(std::bind(Connection::HandleError, this));_channel.SetEventCallback(std::bind(Connection::HandleEvent, this));}~Connection(){DBG_LOG(RELEASE CONNEDCTION: %p, this);}//获取管理的文件描述符int Fd(){return _sockfd;}//获取连接IDint Id(){return _conn_id;}//是否处于CONNECTED状态bool Connected(){return (_statu CONNECTED);}//设置上下文 -- 连接建立完成时进行调用void SetContext(const Any context){_context context;}//获取上下文返回的是指针Any* GetContext(){return _context;}void SetConnectedCallback(const ConnectedCallback cb){_connected_callback cb;}void SetMessageCallback(const MessageCallback cb){_message_callback cb;}void SetClosedCallback(const ClosedCallback cb){_closed_callback cb;}void SetSvrClosedCallback(const ClosedCallback cb){_server_closed_callback cb;}void SetAnyEventCallback(const AnyEventCallback cb){_event_callback cb;}//连接建立就绪后进行channel回调设置启动读监控调用_connected_callbackvoid Establised(){_loop-RunInLoop(std::bind(Connection::EstablishedInLoop, this));}//发送数据将数据放到发送缓冲区启动写事件监控void Send(const char* data, size_t len){Buffer buf; //为什么要重新创建一个临时变量因为data可能是一个可能被释放的空间我们将其压入任务队列等待被执行的过程中空间可能被释放了buf.WriteAndPush(data, len);_loop-RunInLoop(std::bind(Connection::SendInLoop, this, std::move(buf)));}//提供给组件使用者的关闭接口 -- 并不实际关闭需要判断有没有数据待处理void Shutdown(){_loop-RunInLoop(std::bind(Connection::ShutdownInLoop, this));}void Release(){_loop-QueueInLoop(std::bind(Connection::ReleaseInLoop, this));}//启动非活跃销毁并定义多长时间无通信就是非活跃添加定时任务void EnableInactiveRelease(int sec){_loop-RunInLoop(std::bind(Connection::EnableInactiveReleaseInLoop, this, sec));}//取消非活跃销毁void CancelInactiveRelease(){_loop-RunInLoop(std::bind(Connection::CancelInactiveReleaseInLoop, this));}//切换协议 -- 重置上下文以及阶段性处理函数void Upgrade(const Any context, const ConnectedCallback conn, const MessageCallback msg, const ClosedCallback closed, const AnyEventCallback event){_loop-RunInLoop(std::bind(Connection::UpgradeInLoop, this, context, conn, msg, closed, event));}
};有可能因为其他描述符的事件处理花费事件比较长然后在处理定时器描述符事件的时候已经超时了很多次了而我们的服务器就也应该处理多少次 更新TImerWheel模块中的ReadTimefd函数和OnTime函数
int ReadTimefd()
{uint64_t times;int ret read(_timerfd, times, 8);if (ret 0){ERR_LOG(READ TIMEFD FAILED);abort();}return times;
}
void OnTime()
{int times ReadTimefd();for (int i 0; i times; i)RunTimerTask();
}将业务处理时间延长至15s
void Hello(const HttpRequest req, HttpResponse* rsp)
{rsp-SetContent(RequestStr(req), text/plain);sleep(15);
}运行结果 HTTP服务器同时多条请求测试 一次性给服务器发送多条数据然后查看服务器的处理结果。预期结果每一条请求都应该得到正常处理 测试
#include ../source/server.hppint main()
{signal(SIGCHLD, SIG_IGN);for (int i 0; i 10; i){pid_t pid fork();if (pid 0){DBG_LOG(FORK ERROR);return -1;}else if (pid 0){Socket cli_sock;cli_sock.CreateClient(8085, 127.0.0.1);std::string req GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n;assert(cli_sock.Send(req.c_str(), req.size()) ! -1);char buf[1024] {0};assert(cli_sock.Recv(buf, 1023));DBG_LOG([%s], buf);while (1) sleep(1);}}return 0;
}测试结果 HTTP服务器大文件传输测试 给服务器上传一个大文件服务器将文件保存下来观察处理结果。预期上传的文件和服务器保存的文件一致 dd if/dev/zero of./hello.txt bs1G count1创建一个内容全为0的1G的文件 测试 #include ../source/server.hpp
#include ../source/Http/http.hppint main()
{Socket cli_sock;cli_sock.CreateClient(8085, 60.205.245.92);std::string req PUT /1234.txt HTTP/1.1\r\nConnection: keep-alive\r\n;std::string body;Util::ReadFile(./hello.txt, body);req Content-Length: std::to_string(body.size()) \r\n\r\n;assert(cli_sock.Send(req.c_str(), req.size()) ! -1);assert(cli_sock.Send(body.c_str(), body.size()) ! -1);char buf[1024] {0};assert(cli_sock.Recv(buf, 1023));DBG_LOG([%s], buf);sleep(3);cli_sock.Close();return 0;
}测试结果 可以看到他们的md5值是一样的说明传输大文件也没问题。 HTTP服务器性能压力测试说明 并发量可以同时多少客户端的请求而不会出现连接失败 QPS每秒钟处理数据包的数量 借助Webbench工具。原理创建大量的进程在进程中创建客户端连接服务器发送请求收到响应后关闭连接开始下一个连接的建立。 抛开环境说性能测试都是无知的 别人会问你是怎么测的。你说用Webbench测的。别人那是怎么测的呢 测试环境 如果你用的是云服务器测试那你就说你云服务器的配置。 比如云服务是2核2G带宽1M的云服务器。客户端环境是xxx环境。使用Webbench以10000并发量向服务器发送请求进行了24小时测试最终得到的结果是xxx 现在我用我的wsl2Windows的子系统Linux不考虑带宽来测试 可以看到普通家用的电脑配置也可以带动上万的并发量。 每分钟可以处理包的数量为437152 每秒钟可以处理的字节数1074256