影视网站怎么做内链,企业管理课程有哪些内容,欧 美 做 爱 视频网站,网站域名空间怎么买#x1f387;Linux#xff1a; 博客主页#xff1a;一起去看日落吗分享博主的在Linux中学习到的知识和遇到的问题博主的能力有限#xff0c;出现错误希望大家不吝赐教分享给大家一句我很喜欢的话#xff1a; 看似不起波澜的日复一日#xff0c;一定会在某一天让你看见坚持… Linux 博客主页一起去看日落吗分享博主的在Linux中学习到的知识和遇到的问题博主的能力有限出现错误希望大家不吝赐教分享给大家一句我很喜欢的话 看似不起波澜的日复一日一定会在某一天让你看见坚持的意义祝我们都能在鸡零狗碎里找到闪闪的快乐。 目录1. 生产者消费者模型1.1 生产者消费者模型的概念1.2 生产者消费者模型的特点1.3 生产者消费者模型优点2. 基于BlockingQueue的生产者消费者模型2.1 基于阻塞队列的生产者消费者模型2.2 模拟实现基于阻塞队列的生产消费模型1. 生产者消费者模型
1.1 生产者消费者模型的概念
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯而通过这个容器来通讯所以生产者生产完数据之后不用等待消费者处理直接将生产的数据放到这个容器当中消费者也不用找生产者要数据而是直接从这个容器里取数据这个容器就相当于一个缓冲区平衡了生产者和消费者的处理能力这个容器实际上就是用来给生产者和消费者解耦的。 1.2 生产者消费者模型的特点
生产者消费者模型是多线程同步与互斥的一个经典场景其特点如下
三种关系 生产者和生产者互斥关系、消费者和消费者互斥关系、生产者和消费者互斥关系、同步关系。两种角色 生产者和消费者。通常由进程或线程承担一个交易场所 通常指的是内存中的一段缓冲区。可以自己通过某种方式组织起来
我们用代码编写生产者消费者模型的时候本质就是对这三个特点进行维护。 生产者和生产者、消费者和消费者、生产者和消费者它们之间为什么会存在互斥关系
介于生产者和消费者之间的容器可能会被多个执行流同时访问因此我们需要将该临界资源用互斥锁保护起来。
其中所有的生产者和消费者都会竞争式的申请锁因此生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。
生产者和消费者之间为什么会存在同步关系
如果让生产者一直生产那么当生产者生产的数据将容器塞满后生产者再生产数据就会生产失败。
反之让消费者一直消费那么当容器当中的数据被消费完后消费者再进行消费就会消费失败。
虽然这样不会造成任何数据不一致的问题但是这样会引起另一方的饥饿问题是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性比如让生产者先生产然后再让消费者进行消费。
注意 互斥关系保证的是数据的正确性而同步关系是为了让多线程之间协同起来。 1.3 生产者消费者模型优点
解耦。支持并发。支持忙闲不均。
如果我们在主函数中调用某一函数那么我们必须等该函数体执行完后才继续执行主函数的后续代码因此函数调用本质上是一种紧耦合。
对应到生产者消费者模型中函数传参实际上就是生产者生产的过程而执行函数体实际上就是消费者消费的过程但生产者只负责生产数据消费者只负责消费数据在消费者消费期间生产者可以同时进行生产因此生产者消费者模型本质是一种松耦合。 2. 基于BlockingQueue的生产者消费者模型
2.1 基于阻塞队列的生产者消费者模型
在多线程编程中阻塞队列Blocking Queue是一种常用于实现生产者和消费者模型的数据结构。 其与普通的队列的区别在于
当队列为空时从队列获取元素的操作将会被阻塞直到队列中放入了元素。当队列满时往队列里存放元素的操作会被阻塞直到有元素从队列中取出。
看到以上阻塞队列的描述我们很容易想到的就是管道而阻塞队列最典型的应用场景实际上就是管道的实现。 2.2 模拟实现基于阻塞队列的生产消费模型
其中的BlockQueue就是生产者消费者模型当中的交易场所我们可以用CSTL库当中的queue进行实现。 其中的BlockQueue就是生产者消费者模型当中的交易场所我们可以用CSTL库当中的queue进行实现。
#include iostream
#include pthread.h
#include queue
#include unistd.h#define NUM 5templateclass T
class BlockQueue
{
private:bool IsFull(){return _q.size() _cap;}bool IsEmpty(){return _q.empty();}
public:BlockQueue(int cap NUM): _cap(cap){pthread_mutex_init(_mutex, nullptr);pthread_cond_init(_full, nullptr);pthread_cond_init(_empty, nullptr);}~BlockQueue(){pthread_mutex_destroy(_mutex);pthread_cond_destroy(_full);pthread_cond_destroy(_empty);}//向阻塞队列插入数据生产者调用void Push(const T data){pthread_mutex_lock(_mutex);while (IsFull()){//不能进行生产直到阻塞队列可以容纳新的数据pthread_cond_wait(_full, _mutex);}_q.push(data);pthread_mutex_unlock(_mutex);pthread_cond_signal(_empty); //唤醒在empty条件变量下等待的消费者线程}//从阻塞队列获取数据消费者调用void Pop(T data){pthread_mutex_lock(_mutex);while (IsEmpty()){//不能进行消费直到阻塞队列有新的数据pthread_cond_wait(_empty, _mutex);}data _q.front();_q.pop();pthread_mutex_unlock(_mutex);pthread_cond_signal(_full); //唤醒在full条件变量下等待的生产者线程}
private:std::queueT _q; //阻塞队列int _cap; //阻塞队列最大容器数据个数pthread_mutex_t _mutex;pthread_cond_t _full;pthread_cond_t _empty;
};
说明
由于我们实现的是单生产者、单消费者的生产者消费者模型因此我们不需要维护生产者和生产者之间的关系也不需要维护消费者和消费者之间的关系我们只需要维护生产者和消费者之间的同步与互斥关系即可。将BlockingQueue当中存储的数据模板化方便以后需要时进行复用。这里设置BlockingQueue存储数据的上限为5当阻塞队列中存储了五组数据时生产者就不能进行生产了此时生产者就应该被阻塞。生产者线程要向阻塞队列当中Push数据前提是阻塞队列里面有空间若阻塞队列已经满了那么此时该生产者线程就需要进行等待直到阻塞队列中有空间时再将其唤醒。消费者线程要从阻塞队列当中Pop数据前提是阻塞队列里面有数据若阻塞队列为空那么此时该消费者线程就需要进行等待直到阻塞队列中有新的数据时再将其唤醒。因此在这里我们需要用到两个条件变量一个条件变量用来描述队列为空另一个条件变量用来描述队列已满。当阻塞队列满了的时候要进行生产的生产者线程就应该在full条件变量下进行等待当阻塞队列为空的时候要进行消费的消费者线程就应该在empty条件变量下进行等待。不论是生产者线程还是消费者线程它们都是先申请到锁进入临界区后再判断是否满足生产或消费条件的如果对应条件不满足那么对应线程就会被挂起。但此时该线程是拿着锁的为了避免死锁问题在调用pthread_cond_wait函数时就需要传入当前线程手中的互斥锁此时当该线程被挂起时就会自动释放手中的互斥锁而当该线程被唤醒时又会自动获取到该互斥锁。当生产者生产完一个数据后意味着阻塞队列当中至少有一个数据而此时可能有消费者线程正在empty条件变量下进行等待因此当生产者生产完数据后需要唤醒在empty条件变量下等待的消费者线程。同样的当消费者消费完一个数据后意味着阻塞队列当中至少有一个空间而此时可能有生产者线程正在full条件变量下进行等待因此当消费者消费完数据后需要唤醒在full条件变量下等待的生产者线程。 pthread_cond_wait函数是让当前执行流进行等待的函数是函数就意味着有可能调用失败调用失败后该执行流就会继续往后执行。其次在多消费者的情况下当生产者生产了一个数据后如果使用pthread_cond_broadcast函数唤醒消费者就会一次性唤醒多个消费者但待消费的数据只有一个此时其他消费者就被伪唤醒了。为了避免出现上述情况我们就要让线程被唤醒后再次进行判断确认是否真的满足生产消费条件因此这里必须要用while进行判断。
在主函数中我们就只需要创建一个生产者线程和一个消费者线程让生产者线程不断生产数据让消费者线程不断消费数据。
#include BlockQueue.hppvoid* Producer(void* arg)
{BlockQueueint* bq (BlockQueueint*)arg;//生产者不断进行生产while (true){sleep(1);int data rand() % 100 1;bq-Push(data); //生产数据std::cout Producer: data std::endl;}
}
void* Consumer(void* arg)
{BlockQueueint* bq (BlockQueueint*)arg;//消费者不断进行消费while (true){sleep(1);int data 0;bq-Pop(data); //消费数据std::cout Consumer: data std::endl;}
}
int main()
{srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueueint* bq new BlockQueueint;//创建生产者线程和消费者线程pthread_create(producer, nullptr, Producer, bq);pthread_create(consumer, nullptr, Consumer, bq);//join生产者线程和消费者线程pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bq;return 0;
}
说明
阻塞队列要让生产者线程向队列中Push数据让消费者线程从队列中Pop数据因此这个阻塞队列必须要让这两个线程同时看到所以我们在创建生产者线程和消费者线程时需要将该阻塞队列作为线程执行例程的参数进行传入。代码中生产者生产数据就是将获取到的随机数Push到阻塞队列而消费者消费数据就是从阻塞队列Pop数据为了便于观察我们可以将生产者生产的数据和消费者消费的数据进行打印输出。
由于代码中生产者是每隔一秒生产一个数据而消费者是每隔一秒消费一个数据因此运行代码后我们可以看到生产者和消费者的执行步调是一致的。 我们可以让生产者不停的进行生产而消费者每隔一秒进行消费。
void* Producer(void* arg)
{BlockQueueint* bq (BlockQueueint*)arg;//生产者不断进行生产while (true){int data rand() % 100 1;bq-Push(data); //生产数据std::cout Producer: data std::endl;}
}
void* Consumer(void* arg)
{BlockQueueint* bq (BlockQueueint*)arg;//消费者不断进行消费while (true){sleep(1);int data 0;bq-Pop(data); //消费数据std::cout Consumer: data std::endl;}
}
此时由于生产者生产的很快运行代码后一瞬间生产者就将阻塞队列打满了此时生产者想要再进行生产就只能在full条件变量下进行等待直到消费者消费完一个数据后生产者才会被唤醒进而继续进行生产生产者生产完一个数据后又会进行等待因此后续生产者和消费者的步调又变成一致的了。 当然我们也可以让生产者每隔一秒进行生产而消费者不停的进行消费。
void* Producer(void* arg)
{BlockQueueint* bq (BlockQueueint*)arg;//生产者不断进行生产while (true){sleep(1);int data rand() % 100 1;bq-Push(data); //生产数据std::cout Producer: data std::endl;}
}
void* Consumer(void* arg)
{BlockQueueint* bq (BlockQueueint*)arg;//消费者不断进行消费while (true){int data 0;bq-Pop(data); //消费数据std::cout Consumer: data std::endl;}
}
虽然消费者消费的很快但一开始阻塞队列中是没有数据的因此消费者只能在empty条件变量下进行等待直到生产者生产完一个数据后消费者才会被唤醒进而进行消费消费者消费完这一个数据后又会进行等待因此生产者和消费者的步调就是一致的。 我们也可以当阻塞队列当中存储的数据大于队列容量的一半时再唤醒消费者线程进行消费当阻塞队列当中存储的数据小于队列容器的一半时再唤醒生产者线程进行生产。
//向阻塞队列插入数据生产者调用
void Push(const T data)
{pthread_mutex_lock(_mutex);while (IsFull()){//不能进行生产直到阻塞队列可以容纳新的数据pthread_cond_wait(_full, _mutex);}_q.push(data);if (_q.size() _cap / 2){pthread_cond_signal(_empty); //唤醒在empty条件变量下等待的消费者线程}pthread_mutex_unlock(_mutex);
}
//从阻塞队列获取数据消费者调用
void Pop(T data)
{pthread_mutex_lock(_mutex);while (IsEmpty()){//不能进行消费直到阻塞队列有新的数据pthread_cond_wait(_empty, _mutex);}data _q.front();_q.pop();if (_q.size() _cap / 2){pthread_cond_signal(_full); //唤醒在full条件变量下等待的生产者线程}pthread_mutex_unlock(_mutex);
}
我们仍然让生产者生产的快消费者消费的慢。运行代码后生产者还是一瞬间将阻塞队列打满后进行等待但此时不是消费者消费一个数据就唤醒生产者线程而是当阻塞队列当中的数据小于队列容器的一半时才会唤醒生产者线程进行生产。 基于计算任务的生产者消费者模型
当然实际使用生产者消费者模型时可不是简单的让生产者生产一个数字让消费者进行打印而已我们这样做只是为了测试代码的正确性。
由于我们将BlockingQueue当中存储的数据进行了模板化此时就可以让BlockingQueue当中存储其他类型的数据。
例如我们想要实现一个基于计算任务的生产者消费者模型此时我们只需要定义一个Task类这个类当中需要包含一个Run成员函数该函数代表着我们想让消费者如何处理拿到的数据。
#pragma once
#include iostreamclass Task
{
public:Task(int x 0, int y 0, int op 0): _x(x), _y(y), _op(op){}~Task(){}void Run(){int result 0;switch (_op){case :result _x _y;break;case -:result _x - _y;break;case *:result _x * _y;break;case /:if (_y 0){std::cout Warning: div zero! std::endl;result -1;}else{result _x / _y;}break;case %:if (_y 0){std::cout Warning: mod zero! std::endl;result -1;}else{result _x % _y;}break;default:std::cout error operation! std::endl;break;}std::cout _x _op _y result std::endl;}
private:int _x;int _y;char _op;
};
此时生产者放入阻塞队列的数据就是一个Task对象而消费者从阻塞队列拿到Task对象后就可以用该对象调用Run成员函数进行数据处理。
void* Producer(void* arg)
{BlockQueueTask* bq (BlockQueueTask*)arg;const char* arr -*/%;//生产者不断进行生产while (true){int x rand() % 100;int y rand() % 100;char op arr[rand() % 5];Task t(x, y, op);bq-Push(t); //生产数据std::cout producer task done std::endl;}
}
void* Consumer(void* arg)
{BlockQueueTask* bq (BlockQueueTask*)arg;//消费者不断进行消费while (true){sleep(1);Task t;bq-Pop(t); //消费数据t.Run(); //处理数据}
}
也就是说此后我们想让生产者消费者模型处理某一种任务时就只需要提供对应的Task类然后让该Task类提供一个对应的Run成员函数告诉我们应该如何处理这个任务即可。