免费空间域名可以做淘宝客网站推广吗,杭州网站优化公司哪家好,紫竹桥网站建设,手机网站建设推广系列文章目录
异步I/O操作函数aio_xxx函数 https://blog.csdn.net/surfaceyan/article/details/134710393 文章目录 系列文章目录前言一、5种IO模型二、IO多路复用APIselectpollepoll 三、两种高效的事件处理模式Reactor模式Proactor模式模拟 Proactor 模式基于事件驱动的非阻…系列文章目录
异步I/O操作函数aio_xxx函数 https://blog.csdn.net/surfaceyan/article/details/134710393 文章目录 系列文章目录前言一、5种IO模型二、IO多路复用APIselectpollepoll 三、两种高效的事件处理模式Reactor模式Proactor模式模拟 Proactor 模式基于事件驱动的非阻塞同步IO辅助函数 四、多种线程池的实现方式基本的modern C references 前言 一、5种IO模型
阻塞IO非阻塞IOIO复用信号驱动 Linux用套接字进行信号驱动IO安装一个信号处理函数进程继续运行并不阻塞当IO事件就绪进程收到SIGIO信号然后处理IO事件异步 https://blog.csdn.net/surfaceyan/article/details/134710393
二、IO多路复用API
include
#include stdio.h
#include stdlib.h
#include sys/types.h
#include sys/select.h
#include poll.h
#include sys/epoll.hselect
selelct 能够监控的最大文件描述符数量必须小于FD_SETSIZEpoll和epoll没有文件描述符数量限制 select返回后所有的参数都看成未定义的需要重填 int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);- nfds: 监听集合中最大的文件描述符1数组索引1- readfds: 监听待读取的文件描述符集合- writefds: 监听待写入的文件描述符集合- exceptfds 监听“exceptional conditions”see POLLPRI in poll(2)
当函数返回后上面的集合都会被清零除了集合中满足条件的
- timeout: 超时时间 为NULL, select阻塞时间为0则函数立即返回函数会在以下情况时返回- 一个文件描述符处于就绪状态- 调用被信号句柄中断- 时间到期select调用后可将timeout看成未定义的timeout剩余时间有些系统可能不会这样做
select调用后返回 r w e集合总共被置位的个数0代表到期
-1代表错误可能原因: badfd, signal int, nfds 0 RLIMIT_NOFILE, timeout invalid, 内存不足导致无法分配内部表格int pselect(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, const struct timespec *timeout,const sigset_t *sigmask);
pselect允许捕获信号
select可能更新timeout为剩余时间pselect不会改变这个参数
int main()
{fd_set rfds;struct timeval tv;int retval;FD_ZERO(rfds);int fd 0;FD_SET(fd, rfds);tv.tv_sec 5;tv.tv_usec 0;retval select(fd1, rfds, NULL, NULL, tv);if (retval 0)perror(select());else if (retval) {printf(data is %d.\n, FD_ISSET(fd, rfds);char buf[BUFSIZ] {0};int n read(fd, buf, BUFSIZ);printf(n %d: %s\n, n, buf);} else {printf(timeout\n);}
}poll
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
- fds: 待监控的fd集合struct pollfd {int fd; // 如果 0 则events会被忽略revents返回0short events; // requested events 输入参数short revents; // returned events 输出参数};
- nfds: fds数组大小
- timeout: 阻塞的毫秒数-1代表阻塞0代表不阻塞
epoll会一直阻塞直到- 一个文件描述符准备就绪- 调用被信号中断- 时间到期POLLIN 就绪读
POLLPRI 异常条件
POLLOUT: 可写的
POLLRDHUP socket对端关闭了连接
POLLERR必有
POLLHUP: 必有对端关闭链接后再read返回0(EOF)(仅当所有残留的数据都被读取时)
POLLNVAL必有无效请求(fd没有open)成功返回非负值指明pollfds中有几个revents为非零0代表时间到期
-1 on errorEFAULT fds指针错误EINTR 被信号中断EINVAL The nfds value exceeds the RLIMIT_NOFILE value.EINVAL (ppoll()) The timeout value expressed in *ip is invalid (negative).ENOMEM 不能为内核数据结构分配内存int ppoll(struct pollfd *fds, nfds_t nfds,const struct timespec *tmo_p, const sigset_t *sigmask);
允许应用程序安全地等待直到文件描述符准备就绪或捕获到信号。
#define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \} while (0)int main(int argc, char *argv[])
{int nfds, num_open_fds;struct pollfd *pfds;if (argc 2) {fprintf(stderr, Usage: %s file...\n, argv[0]);exit(EXIT_FAILURE);}num_open_fds nfds argc - 1;pfds (pollfd*)calloc(nfds, sizeof(struct pollfd));if (pfds NULL)errExit(malloc);/* Open each file on command line, and add it pfds array */for (int j 0; j nfds; j) {pfds[j].fd open(argv[j 1], O_RDONLY);if (pfds[j].fd -1)errExit(open);printf(Opened \%s\ on fd %d\n, argv[j 1], pfds[j].fd);pfds[j].events POLLIN;}/* Keep calling poll() as long as at least one file descriptor isopen */while (num_open_fds 0) {int ready;printf(About to poll()\n);ready poll(pfds, nfds, -1);if (ready -1)errExit(poll);printf(Ready: %d\n, ready);/* Deal with array returned by poll() */for (int j 0; j nfds; j) {char buf[10];if (pfds[j].revents ! 0) {printf( fd%d; events: %s%s%s\n, pfds[j].fd,(pfds[j].revents POLLIN) ? POLLIN : ,(pfds[j].revents POLLHUP) ? POLLHUP : ,(pfds[j].revents POLLERR) ? POLLERR : );if (pfds[j].revents POLLIN) {ssize_t s read(pfds[j].fd, buf, sizeof(buf));if (s -1)errExit(read);printf( read %zd bytes: %.*s\n,s, (int) s, buf);} else { /* POLLERR | POLLHUP */printf( closing fd %d\n, pfds[j].fd);if (close(pfds[j].fd) -1)errExit(close);num_open_fds--;}}}}printf(All file descriptors closed; bye\n);exit(EXIT_SUCCESS);
}epoll
epoll_create(2), epoll_create1(2), epoll_ctl(2), epoll_wait(2)
int epoll_create1(int flags);
创建一个epoll实例
- flags 为0 等价于epoll_createEPOLL_CLOEXECtypedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64;} epoll_data_t;struct epoll_event {uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */};
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events,int maxevents, int timeout,const sigset_t *sigmask);
;
返回就绪的问题件描述符个数
0代表时间到期
-1代表 EINTR被信号中断int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);- op: - EPOLL_CTL_ADD: - EPOLL_CTL_MOD- EPOLL_CTL_DELevents为带监听的事件
- EPOLLIN :
- EPOLLOUT :
- EPOLLRDHUP :对端关闭连接√
- EPOLLPRI :异常条件√
- EPOLLERR :发生错误(默认必监听)√
- EPOLLHUP : 类似对端关闭链接(默认必监听)√
- EPOLLET :
- EPOLLONESHOT : 只会触发一次
- EPOLLWAKEUP :
- EPOLLEXCLUSIVE: (默认必监听)√#define MAX_EVENTS 10
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;
epollfd epoll_create1(EPOLL_CLOEXEC);
ev.events EPOLLIN;
ev.data.fd listen_sock;
epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, ev);
for (;;)
{nfds epoll_wait(epollfd, events, MAX_EVENTS, -1);if (nfds -1) {error;}for (int i 0; i nfds; n) {if (events[n].data.fd listen_sock) {con_sock accept(listen_sock, NULL, NULL);setnonblocking(conn_sock);ev.data.fd conn_sock;ev.events EPOLLIN | EPOLLET;epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock, ev);} else {do_use_fd(events[i].data.fd);}}
}三、两种高效的事件处理模式
服务器程序通常需要处理三类事件I/O 事件、信号及定时事件。有两种高效的事件处理模式Reactor 和 Proactor同步 I/O 模型通常用于实现 Reactor 模式异步 I/O 模型通常用于实现 Proactor 模式。
Reactor模式
要求主线程I/O处理单元只负责监听文件描述符上是否有事件发生有的话就立即将该事件通知工作 线程逻辑单元将 socket 可读可写事件放入请求队列交给工作线程处理。除此之外主线程不做 任何其他实质性的工作。读写数据接受新的连接以及处理客户请求均在工作线程中完成。
使用同步 I/O以 epoll_wait 为例实现的 Reactor 模式的工作流程是
主线程往 epoll 内核事件表中注册 socket 上的读就绪事件。主线程调用 epoll_wait 等待 socket 上有数据可读。当 socket 上有数据可读时 epoll_wait 通知主线程。主线程则将 socket 可读事件放入请求队列。.睡眠在请求队列上的某个工作线程被唤醒它从 socket 读取数据并处理客户请求然后往 epoll 内核事件表中注册该 socket 上的写就绪事件。.当主线程调用 epoll_wait 等待 socket 可写。当 socket 可写时epoll_wait 通知主线程。主线程将 socket 可写事件放入请求队列。睡眠在请求队列上的某个工作线程被唤醒它往 socket 上写入服务器处理客户请求的结果。 Proactor模式
Proactor 模式将所有 I/O 操作都交给主线程和内核来处理进行读、写工作线程仅仅负责业务逻 辑。使用异步 I/O 模型以 aio_read 和 aio_write 为例实现的 Proactor 模式的工作流程是
主线程调用 aio_read 函数向内核注册 socket 上的读完成事件并告诉内核用户读缓冲区的位置以及读操作完成时如何通知应用程序这里以信号为例。主线程继续处理其他逻辑。当 socket 上的数据被读入用户缓冲区后内核将向应用程序发送一个信号以通知应用程序数据已经可用。应用程序预先定义好的信号处理函数选择一个工作线程来处理客户请求。工作线程处理完客户请求后调用 aio_write 函数向内核注册 socket 上的写完成事件并告诉内核用户写缓冲区的位置以及写操作完成时如何通知应用程序。主线程继续处理其他逻辑。当用户缓冲区的数据被写入 socket 之后内核将向应用程序发送一个信号以通知应用程序数据已经发送完毕。应用程序预先定义好的信号处理函数选择一个工作线程来做善后处理比如决定是否关闭 socket。 模拟 Proactor 模式
使用同步 I/O 方式模拟出 Proactor 模式。原理是主线程执行数据读写操作读写完成之后主线程向工作线程通知这一”完成事件“。那么从工作线程的角度来看它们就直接获得了数据读写的结果接下来要做的只是对读写的结果进行逻辑处理。 使用同步 I/O 模型以 epoll_wait为例模拟出的 Proactor 模式的工作流程如下
. 主线程往 epoll 内核事件表中注册 socket 上的读就绪事件。主线程调用 epoll_wait 等待 socket 上有数据可读。当 socket 上有数据可读时epoll_wait 通知主线程。主线程从 socket 循环读取数据直到没有更多数据可读然后将读取到的数据封装成一个请求对象并插入请求队列。睡眠在请求队列上的某个工作线程被唤醒它获得请求对象并处理客户请求然后往 epoll 内核事件表中注册 socket 上的写就绪事件。主线程调用 epoll_wait 等待 socket 可写。. 主线程调用 epoll_wait 等待 socket 可写。当 socket 可写时epoll_wait 通知主线程。主线程往 socket 上写入服务器处理客户请求的结果。 基于事件驱动的非阻塞同步IO
int main(int argc, char* argv[])
{Client* clients new Client[MAXIMUM_FD];int listen_fd socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);struct sockaddr_in address;address.sin_addr.s_addr INADDR_ANY;address.sin_family AF_INET;address.sin_port htons( std::atoi(argv[1]) );int reuse 1;int ret setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, reuse, sizeof(reuse));assert(ret 0);ret bind(listen_fd, (struct sockaddr*)address, sizeof(address));assert(ret 0);ret listen(listen_fd, 5);assert(ret 0);epoll_event events[MAX_EVENTS];int epoll_fd epoll_create1(EPOLL_CLOEXEC);assert(epoll_fd 0);addfd2epoll(epoll_fd, listen_fd, false);Client::listen_fd listen_fd;Client::epoll_fd epoll_fd;while (1) {int number epoll_pwait(epoll_fd, events, MAX_EVENTS, -1, nullptr);if (number 0) {if (errno EINTR) {fprintf(stderr, interrupted by a sig\n);break;} else {perror(epoll_wait);break;}} else if (number 0){continue; // timeout}for (int i 0; i number; i) {int sockfd events[i].data.fd;if (sockfd listen_fd) {int client_fd accept4(listen_fd, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC);if (client_fd 0) continue;if (client_fd MAXIMUM_FD) {fprintf(stderr, clients out of limits\n);close(client_fd);continue;}clients[client_fd].init(client_fd, NULL);} else if (events[i].events (EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)) {clients[sockfd].close_connection(); // error} else if (events[i].events EPOLLIN) {Client* client clients sockfd;if ( client-do_read() pool-push_back(client)) {continue;}client-close_connection();} else if (events[i].events EPOLLOUT) {Client* client clients sockfd;if (client-do_write() false) {client-close_connection();}}}}close(epoll_fd);close(listen_fd);delete[] clients;delete pool;return 0;
}bool Client::do_read()
{if (recv_idx_ recv_buf_len_) return false;while (true) {int n_read recv(client_fd_, recv_buf_recv_idx_, grecv_buf_len-recv_idx_, 0);if (n_read 0) {if (errno EAGAIN || errno EWOULDBLOCK) break; // read donereturn false;} else if (n_read 0) { // closed by peerreturn false;}recv_idx_ n_read;}return true;
}bool Client::do_write()
{if (send_num_ send_idx_) {init();modfd2epoll(epoll_fd, client_fd_, EPOLLIN);return true;}while (1) {int n_send send(client_fd_, send_buf_send_num_, send_idx_-send_num_, 0);if (n_send 0) {if (errno EAGAIN || errno EWOULDBLOCK) {modfd2epoll(epoll_fd, client_fd_, EPOLLOUT);return true;}return false;} else if (n_send 0) { // 对端已经关闭然后继续写会收到 SIGPIPEfprintf(stderr, client: %d\n n_send 0\n, client_fd_);return false;}send_num_ n_send;if (send_num_ send_idx_) {init();modfd2epoll(epoll_fd, client_fd_, EPOLLIN);return true;}}
}辅助函数
int setnonblocking(int fd)
{int old_opt fcntl(fd, F_GETFL);int new_opt old_opt | O_NONBLOCK;fcntl(fd, F_SETFL, new_opt);return old_opt;
}
int setblocking(int fd)
{int old_opt fcntl(fd, F_GETFL);int new_opt old_opt (~O_NONBLOCK);fcntl(fd, F_SETFL, new_opt);return old_opt;
}
int setcloexec(int fd)
{int old_opt fcntl(fd, F_GETFD);int new_opt old_opt | FD_CLOEXEC;fcntl(fd, F_SETFD, new_opt);return old_opt;
}
void addfd2epoll(int epoll, int fd, bool one_shot)
{epoll_event event;event.data.fd fd;event.events EPOLLIN | EPOLLRDHUP | EPOLLPRI;if (one_shot){event.events | EPOLLONESHOT;}epoll_ctl(epoll, EPOLL_CTL_ADD, fd, event);
}
void rmfromepoll(int epoll, int fd)
{epoll_ctl(epoll, EPOLL_CTL_DEL, fd, nullptr);
}
void modfd2epoll(int epoll, int fd, int ev)
{epoll_event event;event.data.fd fd;event.events ev | EPOLLET | EPOLLONESHOT | EPOLLRDHUP | EPOLLPRI;epoll_ctl(epoll, EPOLL_CTL_MOD, fd, event);
}四、多种线程池的实现方式
基本的
#include pthread.h
#include semaphore.h
#include list
#include exception
class ThpoolException : public std::exception
{
public:const char* what() const _GLIBCXX_TXN_SAFE_DYN _GLIBCXX_NOTHROW override{ return thread pool init failed\n; }
};
class LockGuard // RAII
{
private:pthread_mutex_t mtx_;
public:LockGuard(pthread_mutex_t mtx) : mtx_(mtx){ pthread_mutex_lock(mtx_); }~LockGuard(){ pthread_mutex_unlock(mtx_); }
};templatetypename T
class thread_pool
{
public:static bool running;
private:int thread_num;std::listT* queue;int max_q_len;pthread_mutex_t q_mtx;sem_t q_sem;
public:thread_pool(int thread_num_, int max_len) : thread_num(thread_num_), max_q_len(max_len){int ret 0;ret pthread_mutex_init(q_mtx, nullptr);if (ret ! 0) throw ThpoolException();ret sem_init(q_sem, 0, 0);if (ret ! 0) throw ThpoolException();for (int i0; i thread_num; i) {pthread_t thid;ret pthread_create(thid, nullptr, thread_pool::on_process, this);if (ret ! 0) throw ThpoolException();ret pthread_detach(thid);if (ret ! 0) throw ThpoolException();}}~thread_pool(){pthread_mutex_destroy(q_mtx);sem_destroy(q_sem);running false;}bool push_back(T* client){bool success;{ LockGuard lg(q_mtx);if (queue.size() max_q_len) {queue.push_back(client);success true;sem_post(q_sem);}else success false;}return success;}static void* on_process(void* arg){ ((thread_pool*)arg)-do_process(); return nullptr; }void do_process(){while (running){T* client nullptr;sem_wait(q_sem);{LockGuard lg(q_mtx);if (queue.empty() false){client queue.front();queue.pop_front();}}if (client) client-do_process();}}
};
templatetypename T
bool thread_poolT::running true;
modern C
class TaskQueue {
public:TaskQueue() default;virtual ~TaskQueue() default;virtual bool enqueue(std::functionvoid() fn) 0;virtual void shutdown() 0;virtual void on_idle() {}
};
class ThreadPool final : public TaskQueue {
public:explicit ThreadPool(size_t n, size_t mqr 0): shutdown_(false), max_queued_requests_(mqr) {while (n) {threads_.emplace_back(worker(*this));n--;}}ThreadPool(const ThreadPool ) delete;~ThreadPool() override default;bool enqueue(std::functionvoid() fn) override {{std::unique_lockstd::mutex lock(mutex_);if (max_queued_requests_ 0 jobs_.size() max_queued_requests_) {return false;}jobs_.push_back(std::move(fn));}cond_.notify_one();return true;}void shutdown() override {// Stop all worker threads...{std::unique_lockstd::mutex lock(mutex_);shutdown_ true;}cond_.notify_all();// Join...for (auto t : threads_) {t.join();}}private:struct worker {explicit worker(ThreadPool pool) : pool_(pool) {}void operator()() {for (;;) {std::functionvoid() fn;{std::unique_lockstd::mutex lock(pool_.mutex_);pool_.cond_.wait(lock, [] { return !pool_.jobs_.empty() || pool_.shutdown_; });if (pool_.shutdown_ pool_.jobs_.empty()) { break; }fn pool_.jobs_.front();pool_.jobs_.pop_front();}assert(true static_castbool(fn));fn();}}ThreadPool pool_;};friend struct worker;std::vectorstd::thread threads_;std::liststd::functionvoid() jobs_;bool shutdown_;size_t max_queued_requests_ 0;std::condition_variable cond_;std::mutex mutex_;
};references IO多路复用 https://www.cnblogs.com/flashsun/p/14591563.html socket 网络编程——端口复用技术 https://blog.csdn.net/JMW1407/article/details/107321853