石家庄建立网站的公司,网页微信文件传输助手,汽车行业网站建设比较好,温州网络关键词排名目录
一.为何要使用生产者消费者模型 二.生产者消费者模型优点 三.基于BlockingQueue的生产者消费者模型
1.BlockingQueue——阻塞队列
2.实现代码 四.POSIX信号量
五.基于环形队列的生产消费模型 一.为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生…目录
一.为何要使用生产者消费者模型 二.生产者消费者模型优点 三.基于BlockingQueue的生产者消费者模型
1.BlockingQueue——阻塞队列
2.实现代码 四.POSIX信号量
五.基于环形队列的生产消费模型 一.为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯而通过阻塞队列来进行通讯所以生产者生产完数据之后不用等待消费者处理直接扔给阻塞队列消费者不找生产者要数据而是直接从阻塞队列里取阻塞队列就相当于一个缓冲区平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。 二.生产者消费者模型优点
解耦:生产者和消费者不直接解除无需关心对方的情况仅仅自己与缓冲区解除。支持并发并发的体现并不在于多个消费者同时从缓冲区中拿数据也不是多个生产者同时从缓冲区放数据而是消费者在处理拿到的数据的时候生产者可以继续向缓冲区放数据。支持忙闲不均 当生产者生产过快的时候可以让生产者慢下来当消费者消费过快的时候可以让消费者慢下来。 三.基于BlockingQueue的生产者消费者模型 1.BlockingQueue——阻塞队列
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于当队列为空时从队列获取元素的操作将会被阻塞直到队列中被放入了元素当队列满时往队列里存放元素的操作也会被阻塞直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的线程在对阻塞队列进程操作时会被阻塞)。 2.实现代码
#include iostream
#include string
#include queue
#include ctime
#include unistd.h
#include pthread.husing namespace std;template class T
class BlockQueue
{
public:BlockQueue(size_t cap): _cap(cap){// 初始化条件变量pthread_cond_init(_c_cond, nullptr);pthread_cond_init(_p_cond, nullptr);}void push(T date){// 将任务push进去队列多线程加锁每一只能一个线程push任务pthread_mutex_lock(_mutex);while (_q.size() _cap) // 如果队列已经满了,生产者要被阻塞{pthread_cond_wait(_p_cond, _mutex);}_q.push(date);// 当push任务成功的时候需要将唤醒消费者来处理数据pthread_cond_signal(_c_cond);pthread_mutex_unlock(_mutex);}T pop(){// 将任务从队列中拿出来多线程加锁每一只能一个线程拿任务pthread_mutex_lock(_mutex);// 如果队列是空的就将消费者阻塞while (isempty()){pthread_cond_wait(_c_cond, _mutex);}T tmp _q.front();_q.pop();// 成功拿到数据以后唤醒生产者继续生产任务pthread_cond_signal(_p_cond);pthread_mutex_unlock(_mutex);return tmp;}~BlockQueue(){pthread_cond_destroy(_c_cond);pthread_cond_destroy(_p_cond);}private:bool isempty(){return _q.empty();}bool isfull(){return _q.size() _cap;}private:queueT _q; // 队列size_t _cap; // 容量pthread_cond_t _c_cond; // 消费者条件变量pthread_cond_t _p_cond; // 生产者条件变量pthread_mutex_t _mutex PTHREAD_MUTEX_INITIALIZER; // 互斥锁
};cp模型
#include BlockQueue.hppusing namespace std;// 构建任务
struct Task
{Task(int a, int b, char op): _a(a), _b(b), _op(op){}char _op; // 运算符int _a; // 运算数1int _b; // 运算数2int ret; // 结果int _exitcode; // 退出码
};void *push_task(void *args)
{BlockQueueTask *pBQ static_castBlockQueueTask *(args);while (1){char op_arr[4] {, -, *, /};int a rand() % 10;int b rand() % 10;char op op_arr[(a * b) % 4];// 构建任务结构体Task tk(a, b, op);// push任务pBQ-push(tk);printf(%d %c %d ?\n, a, op, b);sleep(1);}return NULL;
}void *get_task(void *args)
{BlockQueueTask *pBQ static_castBlockQueueTask *(args);while (1){// 获取任务并处理Task tk pBQ-pop();switch (tk._op){case :tk.ret tk._a tk._b;break;case -:tk.ret tk._a - tk._b;break;case *:tk.ret tk._a * tk._b;break;case /:if (tk._b 0){exit(-1);}tk.ret tk._a / tk._b;break;default:break;}printf(%d %c %d %d\n, tk._a, tk._op, tk._b, tk.ret);sleep(1);}return NULL;
}int main()
{BlockQueueTask BQ(5);pthread_t tid_c[4];pthread_t tid_p[4];srand(time(nullptr));// pushpthread_create(tid_c[0], NULL, push_task, BQ);pthread_create(tid_c[1], NULL, push_task, BQ);pthread_create(tid_c[2], NULL, push_task, BQ);pthread_create(tid_c[3], NULL, push_task, BQ);// getpthread_create(tid_p[0], NULL, get_task, BQ);pthread_create(tid_p[1], NULL, get_task, BQ);pthread_create(tid_p[2], NULL, get_task, BQ);pthread_create(tid_p[3], NULL, get_task, BQ);pthread_join(tid_c[0], NULL);pthread_join(tid_c[1], NULL);pthread_join(tid_c[2], NULL);pthread_join(tid_c[3], NULL);pthread_join(tid_p[0], NULL);pthread_join(tid_p[1], NULL);pthread_join(tid_p[2], NULL);pthread_join(tid_p[3], NULL);return 0;
}
测试结果 四.POSIX信号量
POSIX信号量和SystemV信号量作用相同都是用于同步操作达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
定义信号量
sem_t sem;
初始化信号量
#include semaphore.h
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数
pshared:0表示线程间共享非零表示进程间共享。value信号量初始值。
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能等待信号量会将信号量的值减1。
int sem_wait(sem_t *sem); //P()
发布信号量
功能发布信号量表示资源使用完毕可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
说明
信号量本身就是一个计数器用来描述可用资源的数目。信号量机制就像是我们看电影买票一样是对资源的预定机制。只有申请到信号量才能对共享资源访问。只要我们申请信号量成功了将来我们一定可以访问临界资源就像看电影我们只要买到了电影票不管我们去不去电影院电影院里一定有我们的位置。
五.基于环形队列的生产消费模型
环形队列采用数组模拟用模运算来模拟环状特性。
环形结构起始状态和结束状态都是一样的不好判断为空或者为满所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置作为满的状态。
代码
RingQueue.hpp
#pragma once
#include iostream
#include cstdio
#include ctime
#include cstdlib
#include pthread.h
#include vector
#include unistd.h
#include semaphore.h
#include mutex.hpp
#include Task.hpp
using namespace std;const size_t size 5;template class T
class RingQueue
{void P(sem_t sem) // 申请信号量{sem_wait(sem);}void V(sem_t sem) // 释放信号量{sem_post(sem);}public:RingQueue(int cap size): _cap(cap), _index_space(0), _index_date(0){// 初始化信号量sem_init(_sem_date, 0, 0); // 数据信号量初始化为0sem_init(_sem_space, 0, cap); // 空间信号量初始化为容量大小// 初始化锁pthread_mutex_init(_mutex, nullptr);_rq.resize(_cap);}void push(const T date){// 申请空间信号量P(_sem_space);{MutexGuard lock(_mutex);_rq[_index_date] date;_index_date % _cap;}// 释放数据信号量V(_sem_date);}T pop(){// 申请数据信号量P(_sem_date);T tmp;{MutexGuard lock(_mutex);tmp _rq[_index_space];_index_space % _cap;}// 释放空间信号量V(_sem_space);return tmp;}~RingQueue(){// 释放信号量和互斥锁sem_destroy(_sem_date);sem_destroy(_sem_space);pthread_mutex_destroy(_mutex);}private:vectorT _rq;size_t _cap; // 容量sem_t _sem_space; // 记录环形队列的空间信号量sem_t _sem_date; // 记录环形队列的数据信号量size_t _index_space; // 生产者的生产位置size_t _index_date; // 消费者的消费位置pthread_mutex_t _mutex; // 容量
};
mutex.hpp:
class Mutex
{
public:Mutex(pthread_mutex_t *mutex): _mutex(mutex){}void lock(){pthread_mutex_lock(_mutex);}void unlock(){pthread_mutex_unlock(_mutex);}~Mutex(){}private:pthread_mutex_t *_mutex;
};class MutexGuard
{
public:MutexGuard(pthread_mutex_t *mutex): _mutex(mutex){_mutex.lock();}~MutexGuard(){_mutex.unlock();}private:Mutex _mutex;
};Task.hpp:
#include iostream
#include cstdio
#include ctime
#include cstdlibstruct Task
{Task(int a 1, int b 1, char op ): _a(a), _b(b), _op(op){}void run(){switch (_op){case :_ret _a _b;break;case -:_ret _a - _b;break;case *:_ret _a * _b;break;case /:if (_b 0){_exitcode -1;exit(1);}_ret _a / _b;break;default:break;}}void showtask(){printf(producer:%d %c %d ?\n, _a, _op, _b);}void showresult(){printf(consumer:%d %c %d %d(exitcode:%d)\n, _a, _op, _b, _ret, _exitcode);}~Task() {}private:int _a;int _b;char _op;int _ret;int _exitcode 0;
};pthread.cc:
#include RingQueue.hppvoid *run_p(void *args)
{char ops[4] {, -, *, /};RingQueueTask *RQ static_castRingQueueTask *(args);while (1){int a rand() % 100;int b rand() % 100;int op ops[(a * b) % 4];Task tk(a, b, op);RQ-push(tk);tk.showtask();sleep(1);}
}
void *run_c(void *args)
{RingQueueTask *RQ static_castRingQueueTask *(args);while (1){Task tk RQ-pop();tk.run();tk.showresult();sleep(1);}
}int main()
{RingQueueTask *RQ new RingQueueTask(5);srand(time(0));pthread_t tid_c[5];pthread_t tid_p[5];for (int i 0; i 5; i){pthread_create(tid_c[i], nullptr, run_c, RQ);pthread_create(tid_p[i], nullptr, run_p, RQ);}for (int i 0; i 5; i){pthread_join(tid_c[i], nullptr);pthread_join(tid_p[i], nullptr);}delete RQ;return 0;
}