企业型商务网站制作做法,网页设计素材加代码,长春seo公司排名,学历提升报名网目录
1、前言
2、多线程理解
2.1 线程
2.2 通俗了解进程和线程
2.2.1 进程是资源分配的基本单位
2.2.2 Linux中的线程是一种轻量化进程
2.3 进程和线程详解
2.3.1 创建一个线程 (pthread_create)
2.3.2 线程自己的一部分数据
2.3.3 线程组
2.3.4 关于进程的其他操作…目录
1、前言
2、多线程理解
2.1 线程
2.2 通俗了解进程和线程
2.2.1 进程是资源分配的基本单位
2.2.2 Linux中的线程是一种轻量化进程
2.3 进程和线程详解
2.3.1 创建一个线程 (pthread_create)
2.3.2 线程自己的一部分数据
2.3.3 线程组
2.3.4 关于进程的其他操作
2.4 Linux线程互斥
2.4.1 互斥量mutex
2.4.2 互斥量的接口
2.5 可重入和线程安全
2.5.1 线程安全
2.5.2 重入
2.6 死锁
2.6.1 死锁四个必要条件
2.6.2 避免死锁
2.7 线程同步
2.7.1 条件变量
2.7.2 同步概念与竞态条件
2.7.3 条件变量函数
2.8 POSIX信号量
3、实现简易的线程池生产者消费着模型
生产者消费者模型
线程池:
1使用条件变量和阻塞队列实现线程池
2使用信号量和循环队列实现线程池
4、结语 1、前言 今天呢我们来深度理解一下什么时多线程还有他的相关操作可是为什么叫Linux中的多线程剖析呢因为多线程在Linux和Windows系统底层中的实现并不一样相比之下Windows的实现更加的复杂而Linux中的简单一些有利于理解好了下面我们进入正片。注如果本文中有什么错误请在评论区指出或者私信作者谢谢。
2、多线程理解
2.1 线程 要理解多线程那我们就先得知道什么叫做线程通俗的讲呢就是一个程序的执行路线就是一个线程我们写的程序大多都是只有一条执行路线的从调用main函数一直结束都只有一个路线这就是一个线程。 可是我们之前接触过一个概念叫做进程那么线程和进程有什么区别又有什么联系呢
2.2 通俗了解进程和线程 首先呢线程是运行在进程中的 进程是资源分配的基本单位而进程是CPU调度的基本单位。 接下来我们详细理解
2.2.1 进程是资源分配的基本单位 在我之前的文章中讲过在Linux中呢进程在内存中是由PCB进行管理的而具体实现就是task_struct 结构体这个结构体中保存着对应进程的相关信息比如地址空间文件描述相关详细可以参照我之前的文章 如下如Linux中的进程包括PCB(task_struct)地址空间页表以及页表映射在内存中的数据这些东西组成了一个进程当然下面这张图只是一个单线程进程 2.2.2 Linux中的线程是一种轻量化进程 那么在Linux中是怎么对进程进行实现的呢 如上图所示Linux中在设计线程的时候采用了和进程管理一样的模块描述符一个task_struct就是一个线程但是这些task_struct并没有指向自己独有的地址空间和资源这样的进程结构我们称作轻量化进程LWP也就是Linux中对线程的实现 这种实现让CPU不能分辨自己处理的是进程还是线程但是这就是它设计的精妙之处。 对于轻量化进程来说他们都指向同一个地址空间所以他们可以很轻松的共一些数据和资源但是对于Windows来说他们为进程单独设计了一种结构那样子的话工程量就非常大了具体可以自己了解。
2.3 进程和线程详解 通过上面的通俗了解我们就对进程和线程有了一定的了解和认识那么我们详细看看
2.3.1 创建一个线程 (pthread_create) Linux对外提供了一个用户级别的库不是系统调用接口这个库当中就提供了一些对线程操作的函数
#include pthread.h
int pthread_create(pthread_t *thread, //返回线程IDconst pthread_attr_t *attr, //设置线程的属性attr为NULL表示使用默认属性void *(*start_routine) (void*), //是个函数地址线程启动后要执行的函数void *arg //传给线程启动函数的参数);
//返回值成功返回0错误返回错误码
int pthread_join(pthread_t thread, void **value_ptr); 我们可以使用这个函数来创建一个线程但是等线程执行万指定的函数之后我们需要对线程资源进行释放 要注意的是这里我们使用函数创建的线程是用户态的线程对应底层的维护是使用轻量级进程来维护的
#include iostream
#include pthread.hvoid* fun1(void* arg){std::cout (char*)arg std::endl;return nullptr;
}
void* fun2(void* arg){std::cout (char*)arg std::endl;return nullptr;
}int main() {pthread_t p1,p2;//创建并启动线程pthread_create(p1,nullptr,fun1,(void*)我是线程p1);pthread_create(p2,nullptr,fun2,(void*)我是线程p2);//阻塞等待线程结束并对其进行资源释放pthread_join(p1, nullptr);pthread_join(p2, nullptr);return 0;
} 在运行的时候要注意要指定pthread库的库名不然会链接错误
makefile:
.PHONY:all
all:thread01thread01:thread01.cppg -o $ $^ -stdc11 -lpthread.PHONY:clean
clean:rm -f thread01
运行结果 2.3.2 线程自己的一部分数据 大家肯定注意到了一个点那就是pthread_create中有一个输出型参数他返回的是什么呢我们打印 出来看一下
std::cout p1: p1 std::endl;
std::cout p2: p2 std::endl;
//运行结果
//p1:139891316045568
//p2:139891307652864 这么大长串的东西叫做他的线程ID但是进程ID那么一点点这个东西怎么会那么大大家不要将这两个东西搞混了这里所说的线程ID并不是我们在监视窗口上看到的那个ID在监视窗口上那个是线程的tid这里的进程ID是他在编码层面上的ID我们可以详细理解一下
监视窗口上的进程IDLWP 编码层面上的线程ID到底是个什么鬼这里我们要知道一个东西就是线程是CPU调度的最小粒子就是他是要在单独运行的那我们知道一个程序都是在栈里运行的但是现在很多个线程都公用一个地址空间那栈也是公用的吗并不是那样就太复杂了在弹栈压栈的时候并没有那么多的寄存器可以供我们维护这个东西 所以呢在地址空间中有一块叫做共享区的地方这块地方上面是栈下边是堆他们两个相向而生所以说这块空间非常大所以呢在设计之初就使用这块地方来为本进程创建的线程开辟栈区和一些线程私有的资源区 如上图所示这就是创建的线程的资源分配我们使用 pthread_create 的时候返回给我们的线程ID其实就是对应的线程资源在地址空间中的地址属于NPTL线程库的范畴。线程库的后续操作就是根据该线程ID来操作线程的。线程库NPTL提供了pthread_ self函数可以获得线程自身的ID
pthread_t pthread_self(void); 线程自己的东西除了栈还有线程ID一组寄存器errno信号屏蔽字调度优先级
2.3.3 线程组 没有线程之前一个进程对应内核里的一个进程描述符对应一个进程ID。但是引入线程概念之后情况发生了变化一个用户进程下管辖N个用户态线程每个线程作为一个独立的调度实体在内核态都有自己的进程描述符进程和内核的描述符一下子就变成了1N关系POSIX标准又要求进程内的所有线程调用getpid函数时返回相同的进程ID如何解决上述问题呢 Linux内核引入了线程组的概念。 多线程的进程又被称为线程组线程组内的每一个线程在内核之中都存在一个进程描述符task_struct与之对应。进程描述符结构体中的pid表面上看对应的是进程ID其实不然它对应的是线程ID;进程描述符中的tgid含义是Thread Group ID,该值对应的是用户层面的进程ID 不同于pthread_t类型的线程ID和进程ID一样线程ID是pid_t类型的变量而且是用来唯一标识线程的一个整型变量。
2.3.4 关于进程的其他操作
线程终止
void pthread_exit(void *value_ptr);
//参数
//value_ptr:value_ptr不要指向一个局部变量。
//返回值无返回值跟进程一样线程结束的时候无法返回到它的调用者自身 需要注意,pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是用malloc分配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了。
取消一个执行中的线程
int pthread_cancel(pthread_t thread);
//参数
//thread:线程ID
//返回值成功返回0失败返回错误码
等待线程结束
int pthread_join(pthread_t thread, void **value_ptr);
//参数
//thread:线程ID
//value_ptr:它指向一个指针后者指向线程的返回值
//返回值成功返回0失败返回错误码 调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的总结如下: 1. 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。 2. 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数PTHREAD_CANCELED。 3. 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传pthread_exit的参数。 4. 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。
分离线程 默认情况下新创建的线程是joinable的线程退出后需要对其进行pthread_join操作否则无法释放资源从而造成系统泄漏。 如果不关心线程的返回值join是一种负担这个时候我们可以告诉系统当线程退出时自动释放线程资源。
int pthread_detach(pthread_t thread);
2.4 Linux线程互斥
我们先来了解一些概念性的东西 临界资源多线程执行流共享的资源就叫做临界资源 临界区每个线程内部访问临界自娱的代码就叫做临界区 互斥任何时刻互斥保证有且只有一个执行流进入临界区访问临界资源通常对临界资源起保护作用 原子性后面讨论如何实现不会被任何调度机制打断的操作该操作只有两态要么完成要么未完成
2.4.1 互斥量mutex 大部分情况线程使用的数据都是局部变量变量的地址空间在线程栈空间内这种情况变量归属单个线程其他线程无法获得这种变量。但有时候很多变量都需要在线程间共享这样的变量称为共享变量可以通过数据的共享完成线程之间的交互。
多个线程并发的操作共享变量会带来一些问题。
我们举一个抢票的例子来说
#include iostream
#include pthread.h
#include unistd.h
#include cstdio
#include cerrno
#include cstdlib
#include string
#define THREADNO 5int residue 1000;void *loot(void* arg){while (true) {if (residue 0) {std::cout 线程 (long long)arg 1 : residue-- 余票 residue std::endl;;usleep(1000);}else {break;}usleep(2000);}
}int main() {//创建线程pthread_t tids[THREADNO];for (long long i 0; i THREADNO; i) {if (pthread_create(tids i, NULL, loot, (void*)i) ! 0){perror(pthread_create);exit(1);}}std::cout 进程创建完毕 std::endl;for (int i 0; i THREADNO; i) {pthread_join(tids[i],nullptr);}return 0;
} CPU运算速度是非常快的所以在访问临界资源的时候就可能出现数据错误本来只有一千张票结果发放了一千多张。
为什么可能无法获得错误结果 if 语句判断条件为真以后代码可以并发的切换到其他线程 usleep这个模拟漫长业务的过程在这个漫长的业务过程中可能有很多个线程会进入该代码段 --ticket操作本身就不是一个原子操作
2.4.2 互斥量的接口
初始化互斥量
初始化互斥量有两种方法 方法1静态分配:
pthread_mutex_t mutex PTHREAD_MUTEX_INITIALIZER 方法2动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr
);
//参数
//mutex要初始化的互斥量
//attrNULL
销毁互斥量
销毁互斥量需要注意 使用PTHREAD_ MUTEX_ INITIALIZER初始化的互斥量不需要销毁 不要销毁一个已经加锁的互斥量 已经销毁的互斥量要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex) 互斥量加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
//返回值:成功返回0,失败返回错误号
调用pthread_ lock 时可能会遇到以下情况: 互斥量处于未锁状态该函数会将互斥量锁定同时返回成功 发起函数调用时其他线程已经锁定互斥量或者存在其他线程同时申请互斥量但没有竞争到互斥量那么pthread_ lock调用会陷入阻塞(执行流被挂起)等待互斥量解锁。
修改上面的抢票代码
#include iostream
#include pthread.h
#include unistd.h
#include cstdio
#include cerrno
#include cstdlib
#include string#define THREADNO 2pthread_mutex_t mtx PTHREAD_MUTEX_INITIALIZER;
int residue 100;void *loot(void* arg){while (true) {pthread_mutex_lock(mtx);if (residue 0) {std::cout 线程 (long long)arg 1 : residue 余票 residue - 1 std::endl;residue--;usleep(5000);//sleep(1);}else {pthread_mutex_unlock(mtx);break;}pthread_mutex_unlock(mtx);usleep(2000);}
}int main() {//创建线程pthread_t tids[THREADNO];for (long long i 0; i THREADNO; i) {if (pthread_create(tids i, NULL, loot, (void*)i) ! 0){perror(pthread_create);exit(1);}}std::cout 进程创建完毕 std::endl;for (int i 0; i THREADNO; i) {pthread_join(tids[i],nullptr);}return 0;
} 经过上面的例子大家已经意识到单纯的i或者i都不是原子的有可能会有数据一致性问题为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。
2.5 可重入和线程安全
2.5.1 线程安全 多个线程并发同一段代码时不会出现不同的结果。常见对全局变量或者静态变量进行操作并且没有锁保护的情况下会出现该问题。
2.5.2 重入 同一个函数被不同的执行流调用当前一个流程还没有执行完就有其他的执行流再次进入我们称之为重入。一个函数在重入的情况下运行结果不会出现任何不同或者任何问题则该函数被称为可重入函数否则是不可重入函数。
常见的线程不安全的情况 不保护共享变量的函数 函数状态随着被调用状态发生变化的函数 返回指向静态变量指针的函数 调用线程不安全函数的函数
常见的线程安全的情况 每个线程对全局变量或者静态变量只有读取的权限而没有写入的权限一般来说这些线程是安全的 类或者接口对于线程来说都是原子操作 多个线程之间的切换不会导致该接口的执行结果存在二义性
常见不可重入的情况 调用了malloc/free函数因为malloc函数是用全局链表来管理堆的 调用了标准I/O库函数标准I/O库的很多实现都以不可重入的方式使用全局数据结构 可重入函数体内使用了静态的数据结构
常见可重入的情况 不使用全局变量或静态变量 不使用用malloc或者new开辟出的空间 不调用不可重入函数 不返回静态或全局数据所有数据都有函数的调用者提供 使用本地数据或者通过制作全局数据的本地拷贝来保护全局数据
可重入与线程安全联系 函数是可重入的那就是线程安全的 函数是不可重入的那就不能由多个线程使用有可能引发线程安全问题 如果一个函数中有全局变量那么这个函数既不是线程安全也不是可重入的。
可重入与线程安全区别 可重入函数是线程安全函数的一种 线程安全不一定是可重入的而可重入函数则一定是线程安全的。 如果将对临界资源的访问加上锁则这个函数是线程安全的但如果这个重入函数若锁还未释放则会产生死锁因此是不可重入的。
2.6 死锁 死锁是指在一组进程中的各个进程均占有不会释放的资源但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。
2.6.1 死锁四个必要条件 互斥条件一个资源每次只能被一个执行流使用 请求与保持条件一个执行流因请求资源而阻塞时对已获得的资源保持不放 不剥夺条件:一个执行流已获得的资源在末使用完之前不能强行剥夺 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
2.6.2 避免死锁 破坏死锁的四个必要条件 加锁顺序一致 避免锁未释放的场景 资源一次性分配
2.7 线程同步
2.7.1 条件变量 当一个线程互斥地访问某个变量时它可能发现在其它线程改变状态之前它什么也做不了。例如一个线程访问队列时发现队列为空它只能等待只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
2.7.2 同步概念与竞态条件 同步在保证数据安全的前提下让线程能够按照某种特定的顺序访问临界资源从而有效避免饥饿问题叫做同步 竞态条件因为时序问题而导致程序异常我们称之为竞态条件。在线程场景下这种问题也不难理解
2.7.3 条件变量函数
初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
//参数
//cond要初始化的条件变量
//attrNULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond)
等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex
);
//参数
//cond要在这个条件变量上等待
//mutex互斥量
唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
简单案例
#include iostream
#include pthread.h
#include unistd.h
#include cstdio
#include stringstruct data{char name;pthread_mutex_t *mtx;pthread_cond_t *cond;
};void* print(void *arg){data *d (data*)arg;char name[64];snprintf(name, sizeof(name),线程%c打印--我是线程%c,d-name,d-name);std::string threadname name;while (true) {pthread_cond_wait(d-cond,d-mtx);std::cout threadname std::endl;sleep(1);pthread_cond_signal(d-cond);}
}int main() {pthread_mutex_t mtx;pthread_cond_t cond;pthread_mutex_init(mtx,nullptr);pthread_cond_init(cond,nullptr);data d1,d2;d1.name A;d1.mtx mtx;d1.cond cond;d2.name B;d2.mtx mtx;d2.cond cond;pthread_t p1,p2;pthread_create(p1,nullptr,print,(void*)d1);pthread_create(p2,nullptr,print,(void*)d2);sleep(1);pthread_cond_signal(cond);pthread_join(p1,nullptr);pthread_join(p2,nullptr);while (1);pthread_mutex_destroy(mtx);pthread_cond_destroy(cond);return 0;
}
运行结果 为什么pthread_ cond_ wait 需要互斥量? 条件等待是线程间同步的一种手段如果只有一个线程条件不满足一直等下去都不会满足所以必须要有一个线程通过某些操作改变共享变量使原先不满足的条件变得满足并且友好的通知等待在条件变量上的线程。条件不会无缘无故的突然变得满足了必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。
2.8 POSIX信号量 POSIX信号量和SystemV信号量作用相同都是用于同步操作达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
初始化信号量
#include semaphore.h
int sem_init(sem_t *sem, int pshared, unsigned int value);
//参数
//pshared:0表示线程间共享非零表示进程间共享
//value信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能等待信号量会将信号量的值减1
int sem_wait(sem_t *sem);
发布信号量
//发布信号量表示资源使用完毕可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);
3、实现简易的线程池生产者消费着模型
生产者消费者模型 为何要使用生产者消费者模型 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯而通过阻塞队列来进行通讯所以生产者生产完数据之后不用等待消费者处理直接扔给阻塞队列消费者不找生产者要数据而是直接从阻塞队列里取阻塞队列就相当于一个缓冲区平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型优点 解耦、支持并发、支持忙闲不均
线程池: 一种线程使用模式。线程过多会带来调度开销进而影响缓存局部性和整体性能。而线程池维护着多个线程等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景 1. 需要大量的线程来完成任务且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务使用线程池技术是非常合适的。因为单个任务小而任务数量巨大你可以想象一个热门网站的点击次数。 但对于长时间的任务比如一个Telnet连接请求线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。 2. 对性能要求苛刻的应用比如要求服务器迅速响应客户请求。 3. 接受突发性的大量请求但不至于使服务器因此产生大量线程的应用。突发性大量客户请求在没有线程池情况下将产生大量线程虽然理论上大部分操作系统线程数目最大值不是问题短时间内产生大量线程可能使内存到达极限出现错误. 下来我们分别用条件变量和信号量实现线程池单机版的我们就设计在线程池内部自产自销的方案实现
1使用条件变量和阻塞队列实现线程池
//thread.hpp
#pragma once#include iostream
#include pthread.h
#include string
#include cstdio
//#include functionalclass Thread{//typedef std::functionvoid* (void*) function;typedef void*(*function)(void*);public:Thread(){}Thread(int num):_tid(0){char name[64];snprintf(name, sizeof(name),thread%d,num);_name name;}void create(function fun,void* arg){pthread_create(_tid, nullptr, fun, arg);}void join() {pthread_join(_tid,nullptr);}std::string name(){return _name;}~Thread(){}private:pthread_t _tid;std::string _name;
};
//Task.hpp
#pragma once
#include iostreamtemplateclass T, class D
class Task{public:Task(T fun, D num1, D num2):_fun(fun),_arg0(num1),_arg1(num2){}T fun() {return _fun;}D arg0() {return _arg0;}D arg1() {return _arg1;}private:T _fun;D _arg0;D _arg1;
};
//mypool.hpp
#pragma once#include thread.hpp
#include Task.hpp
#include vector
#include queue
//#include cstdlib
#include ctime
#include unistd.h
#define CAPACITY 10int add(int x, int y)
{return x y;
}template class T, class D
struct poolData
{int _capacity;//线程自身Thread* _self;std::queueTaskT, D * *_task;// 锁pthread_mutex_t *_mtx;// 条件变量pthread_cond_t *_full; // 满了pthread_cond_t *_empty; // 空了
};template class T, class D
class Pool
{
public:Pool(int num) : _capacity(10){_productor new std::vectorThread *(num);_consumer new std::vectorThread *(num);_task new std::queueTaskT, D *;pthread_mutex_init(_mtx, nullptr);pthread_cond_init(_full, nullptr);pthread_cond_init(_empty, nullptr);}std::vectorThread * *getpro(){return _productor;}std::vectorThread * *getcon(){return _consumer;}std::queueTaskT, D * *gettask(){return _task;}void strat(){//poolDataT, D prodata[_productor-size()];//poolDataT, D condata[_productor-size()];std::vectorpoolDataT, D prodata(_productor-size());std::vectorpoolDataT, D condata(_productor-size());for (int i 0; i _productor-size(); i){//data[i]._consumer _consumer;//data[i]._productor _productor;prodata[i]._task _task;prodata[i]._full _full;prodata[i]._empty _empty;prodata[i]._mtx _mtx;prodata[i]._capacity _capacity;(*_productor)[i] new Thread(i);prodata[i]._self (*_productor)[i];(*_productor)[i]-create(productor, prodata[i]);condata[i]._task _task;condata[i]._full _full;condata[i]._empty _empty;condata[i]._mtx _mtx;condata[i]._capacity _capacity;(*_consumer)[i] new Thread(i);condata[i]._self (*_consumer)[i];(*_consumer)[i]-create(consumer, condata[i]);}for (int i 0; i _productor-size(); i){(*_productor)[i]-join();(*_consumer)[i]-join();}for (int i 0; i _productor-size(); i){delete (*_productor)[i];delete (*_consumer)[i];}}// 生产者static void *productor(void *args){poolDataT, D *pd (poolDataT, D *)args;//std::vectorThread * *pro pd-_productor;// 消费者队列// std::vectorThread* *con pd-_consumer;Thread* self pd-_self;// 阻塞队列std::queueTaskT, D * *task pd-_task;// 锁pthread_mutex_t *mtx pd-_mtx;// 条件变量pthread_cond_t *full pd-_full; // 满了pthread_cond_t *empty pd-_empty; // 空了int capacity pd-_capacity;srand((uint64_t)time(nullptr) ^ 0x6666);while (true){int x rand() % 1000 10;int y rand() % 1000 10;pthread_mutex_lock(mtx);// 如果满了就挂起while (task-size() capacity){std::cout 生产者挂起 std::endl;pthread_cond_wait(full, mtx);}std::cout 生产者拿到锁开始生产 std::endl;// 添加任务task-push(new Taskint (*)(int, int), int(add, x, y));std::cout 生产者 self-name() 生产 x y ? std::endl;usleep(rand()%3000);std::cout 生产者生产完毕释放锁唤醒线程 std::endl;pthread_cond_signal(empty);pthread_mutex_unlock(mtx);sleep(1);}}// 消费者static void *consumer(void *args){poolDataT, D *pd (poolDataT, D *)args;// std::vectorThread** pro pd-_productor;// 消费者队列//std::vectorThread * *con pd-_consumer;Thread* self pd-_self;// 阻塞队列std::queueTaskT, D * *task pd-_task;// 锁pthread_mutex_t *mtx pd-_mtx;// 条件变量pthread_cond_t *full pd-_full; // 满了pthread_cond_t *empty pd-_empty; // 空了while (true){pthread_mutex_lock(mtx);while (task-empty()) {std::cout 消费者挂起 std::endl;pthread_cond_wait(empty, mtx);}std::cout 消费者拿到锁开始消费 std::endl;// 执行任务// task-push(Task(add,x,y));TaskT, D *t task-front();task-pop();D num t-fun()(t-arg0(), t-arg1());std::cout 消费者 self-name() 消费 t-arg0() t-arg1() num std::endl;delete t;usleep(rand()%3000);std::cout 消费者消费完毕释放锁唤醒线程 std::endl;pthread_cond_signal(full);pthread_mutex_unlock(mtx);//usleep(rand() % 2000);sleep(2);}}~Pool(){if (!_productor-empty()){for (int i 0; i _productor-size(); i){if ((*_productor)[i] ! nullptr){delete (*_productor)[i];}}}if (!_consumer-empty()){for (int i 0; i _consumer-size(); i){if ((*_consumer)[i] ! nullptr){delete (*_consumer)[i];}}}while (!_task-empty()){TaskT,D *t _task-front();_task-pop();delete t;}delete _productor;delete _consumer;delete _task;pthread_mutex_destroy(_mtx);pthread_cond_destroy(_full);pthread_cond_destroy(_empty);}private:int _capacity;// 生产着队列std::vectorThread * *_productor;// 消费者队列std::vectorThread * *_consumer;// 阻塞队列std::queueTaskT, D * *_task;// 锁pthread_mutex_t _mtx;// 条件变量pthread_cond_t _full; // 满了pthread_cond_t _empty; // 空了
};//test.cpp
#include mypool.hppint main() {//std::cout hello std::endl;Poolint(*)(int,int),int p(1);p.strat();return 0;
}
2使用信号量和循环队列实现线程池
#pragma once//mutex.hpp
#include iostream
#include pthread.hclass mutex{public:mutex(){pthread_mutex_init(_mtx,nullptr);}void lock(){pthread_mutex_lock(_mtx);}void unlock(){pthread_mutex_unlock(_mtx);}~mutex(){pthread_mutex_destroy(_mtx);}private:pthread_mutex_t _mtx;
};
//sem.hpp
#pragma once#include iostream
#include semaphore.hclass sem{public:sem(int value){sem_init(_sem, 0, value);}void p(){sem_wait(_sem);}void v(){sem_post(_sem);}~sem(){sem_destroy(_sem);}private:sem_t _sem;
};
//ringqueue.hpp
#include iostream
#include vector
#include sem.hpp
#include mutex.hpptemplateclass T
class ringqueue {public:ringqueue(int capacity 10):_ring_queue(capacity),_start(0),_tail(0),_space_sem(capacity),_data_sem(0),_mtx(){}void push(const T in){_space_sem.p();_mtx.lock();_ring_queue[_start] in;_start % _ring_queue.size();_data_sem.v();_mtx.unlock();}void pop(T out){_data_sem.p();_mtx.lock();out _ring_queue[_tail];_tail % _ring_queue.size();_space_sem.v();_mtx.unlock();}~ringqueue(){}private:std::vectorT _ring_queue;int _start;int _tail;sem _space_sem;sem _data_sem;mutex _mtx;
};
//mypool.hpp
#pragma once#include thread.hpp
#include Task.hpp
#include ringQueue.hpp
//#include vector
//#include queue
//#include cstdlib
#include ctime
#include unistd.h
//#define CAPACITY 10int add(int x, int y)
{return x y;
}template class T, class D
struct poolData
{Thread* _self;ringqueueTaskT,D** _rq;
};template class T, class D
class Pool
{
public:Pool(int num) :_productor(num),_consumer(num){}void strat(){poolDataT,D prodata[_productor.size()];poolDataT,D condata[_consumer.size()];for (int i 0; i _productor.size(); i) {_productor[i] new Thread(i);prodata[i]._self _productor[i];prodata[i]._rq _rq;_productor[i]-create(productor,prodata[i]);_consumer[i] new Thread(i);condata[i]._self _consumer[i];condata[i]._rq _rq;_consumer[i]-create(consumer,condata[i]);}for (int i 0; i _productor.size(); i) {_productor[i]-join();delete _productor[i];_consumer[i]-join();delete _consumer[i];}}// 生产者static void *productor(void *args){poolDataT,D *pd (poolDataT,D*)args;Thread *self pd-_self;ringqueueTaskT,D* *rq pd-_rq;srand(time(nullptr) ^ 0x6666);while (true) {int x rand() % 1000 10;int y rand() % 1000 10;rq-push(new TaskT,D(add,x,y));std::cout 生产线程 self-name() 生产任务 x y ? std::endl;usleep(rand() %5000 3000);//sleep(1);}}// 消费者static void *consumer(void *args){poolDataT,D *pd (poolDataT,D*)args;Thread *self pd-_self;ringqueueTaskT,D* *rq pd-_rq;srand(time(nullptr) ^ 0x6666);while (true) {TaskT,D *t;rq-pop(t);int z t-fun()(t-arg0(),t-arg1());std::cout 消费者 self-name() 消费任务 t-arg0() t-arg1() z std::endl;delete t;usleep(rand() % 5000 3000);}}~Pool(){}private:ringqueueTaskT,D* _rq;std::vectorThread* _productor;std::vectorThread* _consumer;
};//test.cpp
#include mypool.hppint main() {Poolint(*)(int,int),int p(5);p.strat();return 0;
}
4、结语 好了今天的分享就到这里了如果文章对你有帮助请留下你的评论如果有错误也请评论私信作者