网站没备案能访问吗,哈尔滨信息网租房信息,网络设计解决如何将初步规划中的各个子系统从内部用,wordpress+4.2.4中文文章目录 8.4 生产者消费者模型8.4.1 为何要使用生产者消费者模型8.4.2 生产者消费者模型优点 8.5 基于BlockingQueue的生产者消费者模型8.5.1 C queue模拟阻塞队列的生产消费模型 8.6. 为什么pthread_cond_wait 需要互斥量?8.7 条件变量使用规范8.8 条件变量的封装8.9 POSIX信… 文章目录 8.4 生产者消费者模型8.4.1 为何要使用生产者消费者模型8.4.2 生产者消费者模型优点 8.5 基于BlockingQueue的生产者消费者模型8.5.1 C queue模拟阻塞队列的生产消费模型 8.6. 为什么pthread_cond_wait 需要互斥量?8.7 条件变量使用规范8.8 条件变量的封装8.9 POSIX信号量8.9.1 基于环形队列的生产消费模型实现一个信号量的封装实现一个线程安全的环形队列 8.4 生产者消费者模型
8.4.1 为何要使用生产者消费者模型 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯而通过阻塞队列来进行通讯所以生产者生产完数据之后不用等待消费者处理直接扔给阻塞队列消费者不找生产者要数据而是直接从阻塞队列里取阻塞队列就相当于一个缓冲区平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。 8.4.2 生产者消费者模型优点 解耦支持并发支持忙闲不均 8.5 基于BlockingQueue的生产者消费者模型 在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于当队列为空时从队列获取元素的操作将会被阻塞直到队列中被放入了元素当队列满时往队列里存放元素的操作也会被阻塞直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的线程在对阻塞队列进程操作时会被阻塞) 8.5.1 C queue模拟阻塞队列的生产消费模型
单生产者单消费者
代码
#ifndef __BLOCK_QUEUE_HPP__ // 头文件防重包含宏
#define __BLOCK_QUEUE_HPP__#include iostream
#include queue
#include pthread.h // POSIX线程库头文件// 阻塞队列类模板实现单生产者-单消费者模型
template typename T
class BlockQueue
{
private:// 检查队列是否已满bool IsFull(){return _block_queue.size() _cap;}// 检查队列是否为空bool IsEmpty(){return _block_queue.empty();}public:// 构造函数初始化队列容量和同步原语BlockQueue(int cap) : _cap(cap){// 单生产者-单消费者模型不需要记录等待数量pthread_mutex_init(_mutex, nullptr); // 初始化互斥锁pthread_cond_init(_product_cond, nullptr); // 初始化生产者条件变量pthread_cond_init(_consum_cond, nullptr); // 初始化消费者条件变量}// 生产者接口 - 仅供单个生产者线程调用// 参数in: 要入队的元素void Push(const T in){pthread_mutex_lock(_mutex); // 获取互斥锁进入临界区// 如果队列满生产者需要等待// 单生产者场景使用if而不是while因为不会有虚假唤醒if(IsFull()){// pthread_cond_wait会// 1. 释放互斥锁// 2. 阻塞等待条件变量// 3. 被唤醒后重新获取互斥锁pthread_cond_wait(_product_cond, _mutex);}// 将数据放入队列_block_queue.push(in);// 唤醒可能在等待的消费者// 单消费者场景下最多只有一个线程在等待pthread_cond_signal(_consum_cond);pthread_mutex_unlock(_mutex); // 释放互斥锁离开临界区}// 消费者接口 - 仅供单个消费者线程调用// 参数out: 用于存储出队元素的指针void Pop(T* out){pthread_mutex_lock(_mutex); // 获取互斥锁进入临界区// 如果队列空消费者需要等待// 单消费者场景使用if而不是while因为不会有虚假唤醒if(IsEmpty()){pthread_cond_wait(_consum_cond, _mutex);}// 从队列取出数据*out _block_queue.front(); // 获取队首元素_block_queue.pop(); // 移除队首元素// 唤醒可能在等待的生产者// 单生产者场景下最多只有一个线程在等待pthread_cond_signal(_product_cond);pthread_mutex_unlock(_mutex); // 释放互斥锁离开临界区}// 析构函数清理同步原语~BlockQueue(){pthread_mutex_destroy(_mutex); // 销毁互斥锁pthread_cond_destroy(_product_cond); // 销毁生产者条件变量pthread_cond_destroy(_consum_cond); // 销毁消费者条件变量}private:std::queueT _block_queue; // 底层队列容器int _cap; // 队列最大容量pthread_mutex_t _mutex; // 互斥锁保护共享资源pthread_cond_t _product_cond; // 生产者条件变量用于生产者等待pthread_cond_t _consum_cond; // 消费者条件变量用于消费者等待
};#endif多生产者多消费者
代码
#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__#include iostream
#include string
#include queue
#include pthread.h// 阻塞队列类模板用于实现生产者-消费者模型
template typename T
class BlockQueue
{
private:// 检查队列是否已满bool IsFull(){return _block_queue.size() _cap;}// 检查队列是否为空bool IsEmpty(){return _block_queue.empty();}public:// 构造函数初始化队列容量和同步原语BlockQueue(int cap) : _cap(cap){_productor_wait_num 0; // 等待的生产者数量_consumer_wait_num 0; // 等待的消费者数量pthread_mutex_init(_mutex, nullptr); // 初始化互斥锁pthread_cond_init(_product_cond, nullptr); // 初始化生产者条件变量pthread_cond_init(_consum_cond, nullptr); // 初始化消费者条件变量}// 入队方法生产者接口void Enqueue(T in){pthread_mutex_lock(_mutex); // 获取互斥锁进入临界区// 当队列满时生产者需要等待while(IsFull()) // 使用while而不是if防止虚假唤醒{// 生产者等待流程// 1. 增加等待计数// 2. 释放互斥锁并等待条件变量// 3. 被唤醒后减少等待计数_productor_wait_num;pthread_cond_wait(_product_cond, _mutex);_productor_wait_num--;}// 将数据放入队列_block_queue.push(in);// 如果有消费者在等待唤醒其中一个if(_consumer_wait_num 0)pthread_cond_signal(_consum_cond); // 也可以用broadcast唤醒所有pthread_mutex_unlock(_mutex); // 释放互斥锁}// 出队方法消费者接口void Pop(T *out){pthread_mutex_lock(_mutex); // 获取互斥锁进入临界区// 当队列空时消费者需要等待while(IsEmpty()) // 使用while防止虚假唤醒{// 消费者等待流程// 1. 增加等待计数// 2. 释放互斥锁并等待条件变量// 3. 被唤醒后减少等待计数_consumer_wait_num;pthread_cond_wait(_consum_cond, _mutex);_consumer_wait_num--;}// 从队列取出数据*out _block_queue.front();_block_queue.pop();// 如果有生产者在等待唤醒其中一个if(_productor_wait_num 0)pthread_cond_signal(_product_cond);pthread_mutex_unlock(_mutex); // 释放互斥锁}// 析构函数清理同步原语~BlockQueue(){pthread_mutex_destroy(_mutex);pthread_cond_destroy(_product_cond);pthread_cond_destroy(_consum_cond);}private:std::queueT _block_queue; // 底层队列容器int _cap; // 队列容量pthread_mutex_t _mutex; // 互斥锁保护共享资源pthread_cond_t _product_cond; // 生产者条件变量pthread_cond_t _consum_cond; // 消费者条件变量int _productor_wait_num; // 等待的生产者数量int _consumer_wait_num; // 等待的消费者数量
};#endif这里采用模版是想告诉我们队列中不仅仅可以放置内置类型比如int, 对象也可以作为任务来参与生产消费的过程。 8.6. 为什么pthread_cond_wait 需要互斥量? 条件等待是线程间同步的一种手段如果只有一个线程条件不满足一直等下去都不会满足所以必须要有一个线程通过某些操作改变共享变量使原先不满足的条件变得满足并且友好的通知等待在条件变量上的线程。条件不会无缘无故的突然变得满足了必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。 按照上面的说法我们设计出如下的代码先上锁发现条件不满足解锁然后等待在条件变量上不就行了如下代码: // 错误的设计
pthread_mutex_lock(mutex);
while (condition_is_false) {pthread_mutex_unlock(mutex);//解锁之后等待之前条件可能已经满足信号已经发出但是该信号可能被错过pthread_cond_wait(cond);pthread_mutex_lock(mutex);
}
pthread_mutex_unlock(mutex);由于解锁和等待不是原子操作。调用解锁之后 pthread_cond_wait 之前如果已经有其他线程获取到互斥量摒弃条件满足发送了信号那么 pthread_cond_wait 将错过这个信号可能会导致线程永远阻塞在这个 pthread_cond_wait 。所以解锁和等待必须是一个原子操作。int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t * mutex); 进入该函数后会去看条件量等于0不等于就把互斥量变成1直到cond_ wait返回把条件量改成1把互斥量恢复成原样。 8.7 条件变量使用规范
等待条件代码
pthread_mutex_lock(mutex);
while (条件为假)pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(mutex);给条件发送信号代码
pthread_mutex_lock(mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(mutex);8.8 条件变量的封装
#pragma once // 防止头文件重复包含比#ifndef更现代的方式#include iostream
#include string
#include pthread.h
#include Lock.hpp // 包含互斥锁的封装类// 条件变量模块命名空间
namespace CondModule
{// 使用互斥锁模块的命名空间using namespace LockModule;// 条件变量封装类// 目的将pthread_cond_t的C接口封装为C类实现RAII机制class Cond{public:// 构造函数初始化条件变量Cond(){// pthread_cond_init返回0表示成功非0表示失败int n pthread_cond_init(_cond, nullptr); // 默认属性初始化(void)n; // 暂时忽略返回值实际使用时应该添加错误处理和日志}// 等待条件变量// 参数互斥锁的引用必须在调用Wait前已经获得锁void Wait(Mutex mutex){// pthread_cond_wait会自动// 1. 释放互斥锁// 2. 等待条件// 3. 被唤醒后重新获取锁int n pthread_cond_wait(_cond, mutex.GetMutexOriginal());(void)n; // 暂时忽略返回值实际使用时应该添加错误处理}// 唤醒一个等待的线程void Notify(){// 如果有多个线程在等待则随机唤醒其中一个int n pthread_cond_signal(_cond);(void)n; // 暂时忽略返回值}// 唤醒所有等待的线程void NotifyAll(){// 唤醒所有等待该条件变量的线程// 被唤醒的线程仍需要重新获得互斥锁才能继续执行int n pthread_cond_broadcast(_cond);(void)n; // 暂时忽略返回值}// 析构函数销毁条件变量~Cond(){// 销毁条件变量释放相关资源int n pthread_cond_destroy(_cond);(void)n; // 暂时忽略返回值实际使用时应该添加错误处理和日志}private:pthread_cond_t _cond; // 底层条件变量// 禁止复制和赋值// C11前的方式声明为private但不实现Cond(const Cond);Cond operator(const Cond);};
}为了让条件变量更具有通用性建议封装的时候不要在Cond类内部引用对应的封装互斥量要不然后面组合的时候会因为代码耦合的问题难以初始化因为一般而言Mutex和Cond基本是一起创建的。 8.9 POSIX信号量
POSIX信号量和SystemV信号量作用相同都是用于同步操作达到无冲突的访问共享资源目的。但POSIX可以用于线程间同步。 信号量的本质是一把计数器那么这把计数器的本质是什么 是用来描述资源数目的把资源是否就绪放在了临界区之外。申请信号量时其实就间接的已经在做判断了。 初始化信号量
#include semaphore.h
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数pshared:0表示线程间共享非零表示进程间共享value信号量初始值销毁信号量
int sem_destroy(sem_t *sem);等待信号量
功能等待信号量会将信号量的值减1
int sem_wait(sem_t *sem); //P()发布信号量
功能发布信号量表示资源使用完毕可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()上一节生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序POSIX信号量: 8.9.1 基于环形队列的生产消费模型
环形队列采用数组模拟用模运算来模拟环状特性 环形结构起始状态和结束状态都是一样的不好判断为空或者为满所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置作为满的状态。 但是我们现在有信号量这个计数器就很简单的进行多线程间的同步过程。 实现一个信号量的封装
#pragma once // 防止头文件重复包含#include iostream
#include semaphore.h // POSIX信号量头文件// 信号量封装类
// 将POSIX信号量封装为C类实现RAII机制
// P-V操作源自荷兰语Proberen(尝试)/Verhogen(增加)
class Sem
{
public:// 构造函数初始化信号量// 参数n: 信号量初始值表示可用资源数量Sem(int n){// sem_init参数说明// _sem: 信号量对象// 0: 信号量类型0表示线程间共享非0表示进程间共享// n: 信号量初始值sem_init(_sem, 0, n);}// P操作等待操作// 使信号量值减1如果信号量值为0则阻塞// 对应生产者-消费者模型中的取走资源操作void P(){// sem_wait会导致// 1. 信号量值大于0将其减1并继续执行// 2. 信号量值等于0阻塞等待直到信号量大于0sem_wait(_sem);}// V操作释放操作// 使信号量值加1如果有线程阻塞则唤醒一个// 对应生产者-消费者模型中的放入资源操作void V(){// sem_post会// 1. 将信号量值加1// 2. 如果有线程因为sem_wait阻塞则唤醒其中一个sem_post(_sem);}// 析构函数销毁信号量~Sem(){// 释放信号量相关的资源sem_destroy(_sem);}private:sem_t _sem; // POSIX信号量对象// 禁止拷贝构造和赋值操作Sem(const Sem) delete;Sem operator(const Sem) delete;
};核心功能 实现了一个计数器用于控制对共享资源的访问 比如有3个座位的餐厅这个信号量初始值就设为3 主要操作 P() 操作等待 想进入餐厅时调用如果还有座位(信号量0)就能直接进入计数器-1如果没座位(信号量0)就要在门口等待 V() 操作释放 离开餐厅时调用释放一个座位计数器1如果有人在等待会唤醒一个等待的人 实现一个线程安全的环形队列
#pragma once
#include iostream
#include string
#include vector
#include semaphore.h
#include pthread.h// 环形队列实现 - 支持多生产者多消费者
// 设计思路 321:
// 3: 需要处理三种同步关系
// a) 生产者和消费者之间的互斥与同步
// b) 多个生产者之间的互斥
// c) 多个消费者之间的互斥
// 2: 使用两把互斥锁
// - 一把用于生产者间互斥
// - 一把用于消费者间互斥
// 1: 一个循环队列作为数据缓冲区templatetypename T
class RingQueue
{
private:// 辅助函数加锁void Lock(pthread_mutex_t mutex){pthread_mutex_lock(mutex);}// 辅助函数解锁void Unlock(pthread_mutex_t mutex){pthread_mutex_unlock(mutex);}public:// 构造函数初始化环形队列和同步原语// 参数cap: 环形队列容量RingQueue(int cap): _ring_queue(cap), // 初始化vector大小为cap_cap(cap), // 保存容量_room_sem(cap), // 空闲位置信号量初值为容量_data_sem(0), // 数据项信号量初值为0_productor_step(0), // 生产位置从0开始_consumer_step(0) // 消费位置从0开始{// 初始化生产者和消费者互斥锁pthread_mutex_init(_productor_mutex, nullptr);pthread_mutex_init(_consumer_mutex, nullptr);}// 入队方法 - 生产者调用// 参数in: 要入队的数据void Enqueue(const T in){_room_sem.P(); // 等待空闲位置Lock(_productor_mutex); // 生产者间互斥// 此时一定有空间可用由信号量保证_ring_queue[_productor_step] in; // 放入数据_productor_step % _cap; // 循环更新位置Unlock(_productor_mutex); // 解除生产者间互斥_data_sem.V(); // 通知有新数据可用}// 出队方法 - 消费者调用// 参数out: 存储出队数据的指针void Pop(T *out){_data_sem.P(); // 等待有数据可用Lock(_consumer_mutex); // 消费者间互斥*out _ring_queue[_consumer_step]; // 取出数据_consumer_step % _cap; // 循环更新位置Unlock(_consumer_mutex); // 解除消费者间互斥_room_sem.V(); // 通知有新空位可用}// 析构函数清理同步原语~RingQueue(){pthread_mutex_destroy(_productor_mutex);pthread_mutex_destroy(_consumer_mutex);}private:// 1. 环形队列存储结构std::vectorT _ring_queue; // 存储数据的环形缓冲区int _cap; // 队列容量// 2. 生产和消费位置int _productor_step; // 生产者放入位置int _consumer_step; // 消费者取出位置// 3. 信号量 - 用于生产者消费者同步Sem _room_sem; // 空闲位置信号量控制生产者Sem _data_sem; // 数据项信号量控制消费者// 4. 互斥锁 - 用于多生产者/消费者间互斥pthread_mutex_t _productor_mutex; // 生产者间互斥pthread_mutex_t _consumer_mutex; // 消费者间互斥
};用一个餐厅自助取餐区的例子来解释 基本结构 想象一个传送带上有固定数量(cap)的餐盘位置 _ring_queue就是这个传送带 _productor_step是厨师放餐盘的位置 _consumer_step是顾客取餐盘的位置 核心功能 Enqueue(): 厨师生产者放餐 检查是否有空位置(_room_sem)确保只有一个厨师在放餐(_productor_mutex)放入餐品通知有新餐品可取(_data_sem) Pop(): 顾客消费者取餐 检查是否有餐品可取(_data_sem)确保只有一个顾客在取餐(_consumer_mutex)取走餐品通知有新空位可用(_room_sem) 同步机制 使用两个信号量控制生产和消费 使用两个互斥锁确保多个生产者/消费者之间不冲突 环形特性 位置到达末尾后回到开头继续使用像传送带一样循环