贵阳模板建站定制,北京住房和城乡建设官方网站,网站备案之后,滁州市建设工程管理处网站文章目录线程同步条件变量生产者与消费者模型信号量环形队列应用生产者消费者模型线程同步
现实生活中我们经常会遇到同一个资源多个人都想使用的问题#xff0c;例如游乐园过山车排队#xff0c;玩完的游客还想再玩#xff0c;最好的办法就是玩完的游客想再玩就去重新排…
文章目录线程同步条件变量生产者与消费者模型信号量环形队列应用生产者消费者模型线程同步
现实生活中我们经常会遇到同一个资源多个人都想使用的问题例如游乐园过山车排队玩完的游客还想再玩最好的办法就是玩完的游客想再玩就去重新排队
线程同步其实就是一种等待机制多个想要同时访问同一个对象的线程形成一个类似等待队列等待前面的线程使用完毕后下一个线程再使用。
线程同步的概念
在保证数据安全的前提下(加锁保护)让线程能够按照某种特定的顺序访问临界资源从而有效避免饥饿问题叫做同步
为什么要线程同步
互斥是保证线程的安全。但当一个线程访问了临界资源后释放了它的锁同时立刻参与到了锁的竞争中如果它又拿到了锁。那么其他线程就会由于长时间得不到锁访问不了临界资源而造成线程饥饿问题。
同步能让线程能够按照某种特定的顺序访问临界资源从而有效避免饥饿问题使多线程协同高效完成某些事情
同步就是在保证数据安全的前提下让线程按照某种特定的顺序来访问临界资源。
同步与互斥的关系互斥是保证数据的安全同步在互斥的前提下来提高线程之间的效率。
程序没有安全性的问题就没有必要使用同步
条件变量
条件变量是类型为pthread_cond_t的变量是利用线程间共享的全局变量进行同步的一种机制主要有两个动作 线程对某个临界资源进行条件判断为真则执行代码为假则挂起等待节省CPU资源避免空等也可以反着来主要是挂起等待节省资源 其他线程在执行某些动作后使条件成立唤醒等待的线程 它可以用来保证在某个线程没有满足某种条件完成之前其他线程只能挂起等待。
条件变量一般用到4个接口 int pthread_cond_init(pthread_cond_t *cv,const pthread_condattr_t *cattr); 功能初始化条件变量 cv要初始化的条件变量 cattr设置条件变量属性一般置NULL交给OS默认设置即可 返回值成功返回0失败返回错误码 int pthread_cond_destroy(pthread_cond_t *cond) 功能释放申请的条件变量 返回值成功返回0失败返回错误码 注意条件变量所占的空间没有被销毁静态区 int pthread_cond_wait(pthread_cond_t *cv,pthread_mutex_t *mutex); 功能将调用此函数的线程在指定条件变量中挂起等待 cv要在哪个条件变量下挂起等待 mutex线程调用此函数时会自动释放传入的锁 返回值成功返回0失败返回错误码 int pthread_cond_signal(pthread_cond_t *cv); 功能唤醒等待中的线程 cv唤醒在哪个条件变量下等待的线程 返回值成功返回0失败返回错误码 broadcast是一次唤醒指定条件变量下多个线程 下面用一段代码验证条件变量的用法
代码逻辑一共申请6个线程其中一个线程负责发布命令另外5个线程负责工作
#include iostream
#include string
#include pthread.h
#include unistd.hpthread_mutex_t mtx;
pthread_cond_t cond;//发布命令线程
void* master(void* args)
{std::string name (char*)args;while(true){//唤醒在条件变量下等待的一个线程std::cout begin run: std::endl;pthread_cond_signal(cond);sleep(1);}
}
//工作线程[5]
void* threadrun(void* args)
{int num *(int*)args;delete (int*)args;while(true){pthread_cond_wait(cond,mtx);std::cout thread[ num ]running. . . std::endl;}
}
//每个线程再唤醒执行后经过while循环再次挂起等待int main()
{//初始化条件变量pthread_mutex_init(mtx,nullptr);pthread_cond_init(cond,nullptr);pthread_t tid[5];pthread_t boss;pthread_create(boss,nullptr,master,(void*)boss);for(size_t i 0;i 5;i){int* num new int(i);//用堆区变量去传递线程号pthread_create(tidi,nullptr,threadrun,(void*)num);}//最后记得要等待线程以及释放锁和条件变量for(size_t i 0;i 5;i)pthread_join(tid[i],nullptr);pthread_join(boss,nullptr);pthread_mutex_destroy(mtx);pthread_cond_destroy(cond);return 0;
}通过输出结果有如下分析 条件变量内部一定有一个等待队列哪个线程调用wait哪个线程就挂起等待并进入等待队列唤醒顺序就是等待的顺序 线程执行的顺序是不一定的 wait函数一定是释放锁的否则线程调用该函数是抱着锁挂起等待的其他线程就无法访问临界区了 signal函数唤醒的线程也一定会去争锁争到才会继续执行否则可能会出现一个带锁的线程访问临界区和一个刚唤醒的线程继续访问临界区等方面的错误 在mutex已上锁的时候才能调用wait()
条件变量通常和互斥锁一起使用互斥是保证线程的安全条件变量防止互斥造成的饥饿问题 生产者与消费者模型
条件变量使用“通知—唤醒”模型例如网购商家会发快递我们只需要等待快递到了给我们发送提示短信运用在多线程中最经典的就是生产者—消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。 生产者和消费者彼此之间不直接通讯而通过阻塞队列来进行通讯所以生产者生产完数据之后不用等待消费者处理 直接扔给阻塞队列 消费者不找生产者要数据而是直接从阻塞队列里取阻塞队列就相当于一个缓冲区平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。 与普通的队列区别在于当队列为空时从队列获取元素的操作将会被阻塞直到队列中被放入了元素当队列满时往队列里存放元素的操作也会被阻塞直到有元素被从队列中取出 上面的说法放到现实中解释就是消费者—超市—供货商这样的模型消费者购买商品不需要找供货商只需要去超市即可而供货商也不用管消费者只需要给超市提供商品。 消费者购买商品的同时供货商也在生产商品并向超市提供若供货商出现问题超市会有存品可以暂时共给消费者并寻找新的供货商此时超市就解决了消费者和供货商之间的耦合问题 下面用代码验证一下模型
代码逻辑
设计一个类对普通队列、条件变量、互斥量进行封装Push对应生产者利用条件变量对其限制队列为满就挂起等待待消费者消费数据后唤醒Pop对应消费者利用条件变量对其限制队列为空就挂起等待待生产者生产数据后唤醒
/************************BlockQueue.hpp************************/#pragma once
#include iostream
#include queue
#include unistd.h
#include pthread.h
#include time.h
#include cstdlibnamespace dd
{templateclass T
class BlockQueue
{
public://初始化BlockQueue():_capacity(10){pthread_cond_init(_empty,nullptr);pthread_cond_init(_full,nullptr);pthread_mutex_init(_mtx,nullptr);}//释放互斥量、条件变量~BlockQueue(){pthread_cond_destroy(_empty);pthread_cond_destroy(_full);pthread_mutex_destroy(_mtx);}//生产逻辑void Push(const T key){Lockqueue();while(full())ProducterWait();_bq.push(key);UnlockQueue();WakeupConsumer();}//消费逻辑void Pop(T* key){Lockqueue();while(empty())ConsumerWit();*key _bq.front();_bq.pop();UnlockQueue();WakeupProducter();}//队列为空则为真bool empty(){return _bq.empty();} //队列为满则为真这里最大容量设置的是10bool full(){return _bq.size() _capacity-1; //先判断 后push所以要-1}//加锁void Lockqueue(){pthread_mutex_lock(_mtx);}//解锁void UnlockQueue(){pthread_mutex_unlock(_mtx);}//队列满时生产者挂起等待void ProducterWait(){pthread_cond_wait(_full,_mtx);}//生产者唤醒等待void WakeupProducter(){pthread_cond_signal(_full);}//队列为空时消费者挂起等待void ConsumerWit(){pthread_cond_wait(_empty,_mtx);}//消费者唤醒等待void WakeupConsumer(){pthread_cond_signal(_empty);}private:std::queueT _bq;int _capacity; //最大容量本示例设置的是10pthread_mutex_t _mtx;pthread_cond_t _empty;pthread_cond_t _full;
}; }/************************************************************************************************/
#include BlockQueue.hpp
using namespace dd;void* consumer(void* args)
{BlockQueueint* bq (BlockQueueint*)args;while(true){sleep(1);int data 0;bq-Pop(data);std::cout 消费了一个数据 data std::endl;}
}void* producter(void* args)
{BlockQueueint* bq (BlockQueueint*)args;while(true){int data rand()%201;std::cout 生产了一个数据 data std::endl;bq-Push(data);}
}int main()
{srand((long long)time(nullptr));BlockQueueint* bq new BlockQueueint();pthread_t c,p;pthread_create(c,nullptr,consumer,(void*)bq); pthread_create(p,nullptr,producter,(void*)bq); pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}上述示例需要注意 虚假唤醒问题 在wait时必须把它放到while里而不是if里为了防止虚假唤醒 例如上述示例中再增加一个线程1号线程是生产者2、3号线程是消费者当2号线程获取走最后一个数据后3号线程也想获取但发现生产队列为NULL于是就挂起等待随后1号线程生产了一个数据便唤醒3号但是2号又先把数据获取走了3号就属于虚假唤醒 3号线程唤醒后获取竞争锁竞争到了以后继续执行Pop但是队列已经为NULL了所以利用while再次判断可预防这种现象这种现象常见于多核CPU多线程中。
上述示例代码也可以将发送数据改为发送任务
把发送数据改为发送加减乘除的任务再申请一个类负责确定任务完成任务的逻辑功能部分同时也是阻塞队列的数据类型生产者只负责确定要算的数和算法消费者负责完成生产者发布的任务
综上变更的部分有新增的类、最后cpp执行部分
/************************BlockQueue.hpp************************/
#pragma once
#include iostream
#include queue
#include unistd.h
#include pthread.h
#include time.h
#include cstdlibnamespace dd
{templateclass T
class BlockQueue
{
public:BlockQueue():_capacity(10){pthread_cond_init(_empty,nullptr);pthread_cond_init(_full,nullptr);pthread_mutex_init(_mtx,nullptr);}~BlockQueue(){pthread_cond_destroy(_empty);pthread_cond_destroy(_full);pthread_mutex_destroy(_mtx);}void Push(const T key){Lockqueue();while(full())ProducterWait();_bq.push(key);UnlockQueue();WakeupConsumer();}void Pop(T* key){Lockqueue();while(empty())ConsumerWit();*key _bq.front();_bq.pop();UnlockQueue();WakeupProducter();}bool empty(){return _bq.empty();} bool full(){return _bq.size() _capacity-1; //先判断 后push所以要-1}void Lockqueue(){pthread_mutex_lock(_mtx);}void UnlockQueue(){pthread_mutex_unlock(_mtx);}void ProducterWait(){pthread_cond_wait(_full,_mtx);}void WakeupProducter(){pthread_cond_signal(_full);}void ConsumerWit(){pthread_cond_wait(_empty,_mtx);}void WakeupConsumer(){pthread_cond_signal(_empty);}private:std::queueT _bq;int _capacity;pthread_mutex_t _mtx;pthread_cond_t _empty;pthread_cond_t _full;
}; }/************************Task.hpp************************/
#pragma once
#include iostream
#include pthread.h
namespace dd
{class Task
{
public:Task(){}Task(int x,int y,char op):_x(x),_y(y),_op(op){}int Run(){int ret 0;switch(_op){case :ret _x _y;break;case -:ret _x - _y;break;case *:ret _x * _y;break;case /:ret _x / _y;break;case %:ret _x % _y;break;default:break;}//std::cout _x _op _y ret std::endl;std::cout pthread_self() : _x _op _y ret std::endl;}
private:int _x;int _y;char _op;};}/************************************************************************************************/
#include BlockQueue.hpp
#include Task.hpp
using namespace dd;void* consumer(void* args)
{BlockQueueTask* bq (BlockQueueTask*)args;while(true){Task t;bq-Pop(t);t.Run();}
}void* producter(void* args)
{BlockQueueTask* bq (BlockQueueTask*)args;std::string ops -*/%;while(true){int x rand()%201;int y rand()%201;char op ops[rand()%5];Task t(x,y,op);bq-Push(t);sleep(1);}
}int main()
{srand((long long)time(nullptr));BlockQueueTask* bq new BlockQueueTask();pthread_t c1,c2,c3,p;pthread_create(c1,nullptr,consumer,(void*)bq); pthread_create(p,nullptr,producter,(void*)bq); pthread_create(c2,nullptr,consumer,(void*)bq); pthread_create(c3,nullptr,consumer,(void*)bq); pthread_join(c1,nullptr);pthread_join(c2,nullptr);pthread_join(c3,nullptr);pthread_join(p,nullptr);return 0;
} 直接使用互斥量除了生产者、消费者之间要竞争互斥量以外消费者之间也需要竞争互斥量但如果汇聚链表中没有数据消费者之间竞争互斥锁是无意义的。有了条件变量机制以后只有生产者完成生产才会引起消费者之间的竞争提高了程序效率。
信号量
信号量的本质是一把计数器用来描述临界资源中资源数目的大小达到无冲突的访问共享资源目的
例如飞机售票把票看成信号量买票就是申请信号量卖票就是释放信号量
伪代码如下 临界资源分成5个部分count5count就被称作信号量 count–一个执行流占有临界资源一部分的操作叫做P操作 count一个执行流结束使用临界资源的一部分叫做V操作 count 0表示没有资源可以分配此时的线程或进程就会被挂起等待内部数据结构会有类似等待队列的结构 但信号量也属于临界资源所以V、P操作都是原子性的
信号变量的函数接口
#include semaphore.hint sem_init(sem_t *sem, int pshared, unsigned int value);作用初始化信号量 sem要初始化的信号量同互斥量、条件变量一样要创建sem_t类型的 变量 pshared0表示线程间共享大于0表示进程间共享 value信号量初始值信号量个数 返回值成功返回0失败返回-1并设置errno来表示错误 #include semaphore.hint sem_destroy(sem_t *sem);作用销毁定义的信号量sem要销毁的信号量返回值成功返回0失败返回-1并设置errno来表示错误
#include semaphore.hint sem_wait(sem_t *sem);作用等待信号量将信号量的值减1如果信号量为0阻塞等待。V( )操作sem要等待的信号量返回值成功返回0失败返回-1并设置errno来表示错误
#include semaphore.hint sem_post(sem_t *sem);作用表示资源使用完毕将信号量做加1操作。P( )操作sem要发布的信号量返回值成功返回0失败返回-1并设置errno来表示错误
环形队列应用生产者消费者模型
基本原理
生产者和消费者在一开始时是指向同一位置代表队列为空应该让消费者等待生产者工作生产者和消费者当之后所在同一位置代表队列为满应该让消费者工作生产者等待其余时候生产者和消费者 一定不 指向同一位置 注意事项
在不是同一位置时生产者必须在消费者前面在同一位置时为空让生产者先走为满让消费者先走但是消费者不可以套圈消费者最关心队列中的数据因此可以定义一个信号量关心队列已有数据个数生产最关心队列的空位置因此可以定义一个信号量关心队列的空位置不能让它们同时执行但是可以并发执行
代码示例
/**********************************ring_queue.hpp***************************************/
#pragma once#include iostream
#include semaphore.h
#include pthread.h
#include unistd.h
#include vectornamespace dd
{templateclass T
class Ring_queue
{
public://初始化Ring_queue():_cap(10),_c_step(0),_p_step(0){sem_init(_blank_sem,0,10); //位置信号量设置初始值为10sem_init(_data_sem,0,0); //数据信号量设置初始值为0_rq.reserve(10);}//释放信号量~Ring_queue(){sem_destroy(_blank_sem);sem_destroy(_data_sem);}//生产void Push(const T key){//申请数据消费信号量放入数据释放位置信号量sem_wait(_blank_sem);_rq[_p_step] key;sem_post(_data_sem);//更新位置_p_step;_p_step % _cap;}//消费void Pop(T* key){//申请位置信号量取出数据释放数据消费信号量sem_wait(_data_sem);*key _rq[_c_step];sem_post(_blank_sem);//更新位置_c_step;_c_step % _cap;}private:int _cap; //总容量std::vectorT _rq; //队列sem_t _blank_sem; //位置信号量sem_t _data_sem; //数据信号量int _c_step; //消费者位置下标int _p_step; //生产者位置下标}; }/*************************************************************************/
#include ring_queue.hpp
#include time.h
using namespace dd;void* consumer(void* args)
{Ring_queueint* rq (Ring_queueint*)args;while(true){int data rand()%20 1;rq-Push(data);std::cout 生产数据 data std::endl;}}void* producter(void* args)
{Ring_queueint* rq (Ring_queueint*)args;while(true){sleep(1);int data;rq-Pop(data);std::cout 消费数据 data std::endl;}
}int main()
{srand((long long)time(nullptr));pthread_t c,p;Ring_queueint* rq new Ring_queueint();pthread_create(c,nullptr,consumer,(void*)rq);pthread_create(p,nullptr,producter,(void*)rq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}多生产多消费的区别 void Push(const T key){sem_wait(_blank_sem);pthread_mutex_lock(p_mtx_);_rq[_p_step] key; _p_step;_p_step % _cap;pthread_mutex_unlock(p_mtx_);sem_post(_data_sem);}void Pop(T* key){sem_wait(_data_sem);pthread_mutex_lock(c_mtx_);*key _rq[_c_step]; _c_step;_c_step % _cap;pthread_mutex_unlock(c_mtx_);sem_post(_blank_sem);}sem_wait是原子性的但多生产多消费中的队列和下标是临界资源它们不是原子的所以需要加锁 另外先进行信号量申请相对效率高因为无论是生产还是消费本质是它们的信号量不为0而不是先拿到锁 最终互斥锁加在信号量申请之后避免争到锁但没有信号量的情况