珠宝怎么做网站,wordpress 评论验证,广州制作网站公司哪家好,邯郸封控最新消息一、线程池原理
如果并发的线程数量很多#xff0c;并且每个线程都是执行一个时间很短的任务就结束了#xff0c;这样频繁创建线程就会大大降低系统的效率#xff0c;因为频繁创建线程和销毁线程需要时间。
线程池是一种多线程处理形式#xff0c;处理过程中将任务添加到…一、线程池原理
如果并发的线程数量很多并且每个线程都是执行一个时间很短的任务就结束了这样频繁创建线程就会大大降低系统的效率因为频繁创建线程和销毁线程需要时间。
线程池是一种多线程处理形式处理过程中将任务添加到任务队列然后在创建线程后自动启动这些任务。
线程池线程都是后台线程。每个线程都使用默认的堆栈大小以默认的优先级运行并处于多线程单元中。如果某个线程在托管代码中空闲如正在等待某个事件, 则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙但队列中包含挂起的工作则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队但他们要等到其他线程完成后才启动。
二、线程池组成 任务队列存储需要处理的任务由工作的线程来处理这些任务
通过线程池提供的 API 函数将一个待处理的任务添加到任务队列或者从任务队列中删除已处理的任务会被从任务队列中删除线程池的使用者也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程
工作的线程任务队列任务的消费者worker N个
线程池中维护了一定数量的工作线程他们的作用是是不停的读任务队列从里边取出任务并处理工作的线程相当于是任务队列的消费者角色如果任务队列为空工作的线程将会被阻塞 (使用条件变量 / 信号量阻塞)如果阻塞之后有了新的任务由生产者将阻塞解除工作线程开始工作
管理者线程不处理任务队列中的任务1个
它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测当任务过多的时候可以适当的创建一些新的工作线程当任务过少的时候可以适当的销毁一些工作的线程
三、线程池C实现
3.1 cmake
cmake_minimum_required(VERSION 2.8.12)
project(Thread_Pool CXX C)
message(STATUS CMake version: ${CMAKE_VERSION})
message(STATUS CMake system name: ${CMAKE_SYSTEM_NAME})
message(STATUS CMake system processor: ${CMAKE_SYSTEM_PROCESSOR})
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin)include_directories(${CMAKE_SOURCE_DIR}/include)
file(GLOB SRC_FILES${PROJECT_SOURCE_DIR}/src/*.c${PROJECT_SOURCE_DIR}/src/*.cpp
)find_package(Threads REQUIRED)
add_executable(${CMAKE_PROJECT_NAME} ${SRC_FILES})target_link_libraries(${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT})target_link_libraries(${CMAKE_PROJECT_NAME} Threads::Threads)
target_link_libraries(${CMAKE_PROJECT_NAME} pthread)3.2 threadPool.h
1. 创建任务结构体task类型
任务的函数指针任务函数的参数
2. 创建线程池结构体ThreadPoll类型
任务队列任务结构体Task的指针Task *taskQ本质是一个环形队列任务队列的的容量任务队列头存放的是任务队列数组的下标任务队列尾同上master线程的IDworker线程的ID指针数组的头最小线程数最大线程数、繁忙线程数、存活线程数、销毁线程数锁线程池的锁、繁忙线程数锁条件变量队列是不是满、队列是不是空销毁线程池标志
3. 相关API函数的声明
线程池创建初始化销毁线程池线程池添加任务获取工作线程数、存活线程数单个线程退出
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__
#includestdio.h
#includestdio.h
#includepthread.h//任务结构体
struct Task{void(*function)(void* arg); //任务的函数指针void* arg; // 任务函数的参数void指针类型
};
typedef struct Task Task;
//线程池结构体
struct ThreadPool
{ Task* taskQ; /* 任务队列 */int queueCapacity; //容量int queueSize; //当前任务个数int queueFront; //队头 - 取数据int queueRear; //队尾 - 放数据pthread_t managerID; //管理者线程ID(只有1个)pthread_t *threadIDs; //工作的线程ID(有许多个)int minNum; //最小线程数量int maxNum; //最大线程数量int busyNum; //繁忙的线程数量当前正在执行工作函数的线程int liveNum; //存活的线程数量当前存在的工作线程数liveNum busyNum 没有任务阻塞休眠的线程int exitNum; //要销毁的线程数准备要删除的线程数量pthread_mutex_t mutexPool; //锁整个线程池 pthread_mutex_t std::mutex(C11)pthread_mutex_t mutexBusy; //锁busyNum变量 pthread_mutex_t std::mutex(C11)pthread_cond_t notFull; //任务队列是不是满了 pthread_cond_t std::condition_varibale类(C11)pthread_cond_t notEmpty; //任务队列是不是空了int shutdown; //是不是要销毁线程池销毁为1不销毁为0 };
typedef struct ThreadPool ThreadPool;//创建线程池并且初始化ThreadPool* threadPoolCreate(int min,int max,int queueSize);//销毁线程池int threadPoolDestroy(ThreadPool* pool);//给线程池添加任务void threadPoolAdd(ThreadPool* pool,void(*func)(void*),void* arg);//获取线程池中工作的线程的个数int threadPoolBusyNum(ThreadPool* pool);//获取线程池中活着的线程的个数int threadPoolAliveNum(ThreadPool* pool);//注意工作的线程一定是活着的但是活着的线程不一定在工作//工作的线程消费者线程的任务函数void* worker(void* arg);//管理者线程任务函数void* manager(void* arg);// (主要用于创建和销毁线程池的)//单个线程退出void threadExit(ThreadPool* pool);#endif3.3 threadPool.c
1. threadPoolCreate创建线程池
输入工作线程最小数、最大数 任务队列大小 返回值线程池指针1 申请线程池内存申请max*woker线程的内存2 申请pool、busy锁申请enmpty、full条件变量3 申请Task * queueSize任务队列内存4 创建一个master线程、min个woker线程5 如果申请过程中失败释放申请内存
2. threadPoolDestroy销毁线程池
输入线程池指针返回值01 销毁线程池标志shutdown置12 回收master线程3 empty条件变量唤醒worker线程 woker线程没有任务的时候empty条件变量将woker阻塞worker线程因为empty条件变量阻塞有两种唤醒情况 1、添加新任务唤醒empty阻塞线程布置新任务2、线程池销毁唤醒empty阻塞线程然后判断销毁标志shutdown 1woker线程自杀使用threadExit 4 释放申请的内存5 释放锁和条件变量
3. threadPoolAdd添加任务
输入线程池指针任务函数指针任务函数参数指针返回值空1 如果线程池任务队列数等于最大容量使用full条件变量阻塞master线程2 任务队列添加任务添加任务函数和函数参数3 队尾后移对队列容量取余实现环形队列队列大小4 empty条件变量唤醒woker线程之前因为任务队列空而empty条件休眠的woker线程
4. threadPoolBusyNum获取忙线程个数
输入线程池指针返回值忙线程数
5. threadPoolAliveNum获取活线程个数
输入线程池指针返回值活线程数
6. worker工作线程函数
输入线程池指针返回值空1 判断任务队列是否为空为空则empty条件变量阻塞线程2 判断销毁线程数是否大于0如果大于该worker线程自杀threadExit函数3 判断销毁线程池标志shutdown是否为1如果是该worker线程自杀threadExit函数4 创建新任务task从任务队列头中取任务然后放入新创建的task。5 移动队列头queueSize–busyNum6 唤醒full条件变量阻塞的生产者线程之前add函数中任务队列满了full条件变量阻塞生产者线程现在释放7 执行真正的任务task.function(task.arg);8 任务结束busyNum–
7. manager管理线程函数
输入线程池指针返回值空1 检测线程池销毁标志shutdown没有销毁一直检测每隔一段时间管理一下2 取出任务队列尺寸活线程数忙线程数3 如果 活线程数 最大线程数 活线程数 队列尺寸说明忙不过来添加新woker线程4 如果 忙线程*2 活线程数 最小线程数 活线程数 说明woker过多设置销毁线程数然后使用empty条件变量唤醒woker线程唤醒的worker线程自杀
8. threadExit线程退出函数
输入线程池指针返回值空1 获取当前pid2 在任务队列里对比pid3 找到自己的pid改为0给以后备用4 线程自杀pthread_exit(NULL);
#include../include/thread_pool.h
#includestring.h//用memset函数
#includeunistd.h//用sleep函数
#includepthread.h//使用pthread_self() 打印当前线程的id的函数
#includestdlib.h//使用malloc操作
const int ADDORDESTROYNUMBER 2;//每次添加/销毁的线程的number个数//创建线程池并且初始化
//形参表分别为最小线程个数最大线程个数以及队列大小ThreadPool* threadPoolCreate(int min,int max,int queueSize){ThreadPool* pool (ThreadPool*)malloc(sizeof(ThreadPool));do{if(pool NULL){//分配内存失败 return 空printf(malloc threadpool fail ...\n);break;// return NULL;}//succeed to create a thread pool!pool-threadIDs (pthread_t*)malloc(sizeof(pthread_t) * max);//工作线程中分别Max个空间的heap区数组空间if(pool-threadIDs NULL){printf(malloc threadIDs fail ...\n);//分配内存失败 return 空break;// return NULL;}//初始化工作的线程IDs们memset(pool-threadIDs, 0, sizeof(pthread_t) * max);//memset为cstring中的函数用来赋值//把线程id数组中的元素全都赋值为0pool-minNum min;pool-maxNum max;pool-busyNum 0;pool-liveNum min;//初始化时以线程的个数最小值来创建活着的线程的个数pool-exitNum 0;//一开始初始化肯定没有销毁线程的这个数量要根据程序运行中的状态来decideif(pthread_mutex_init(pool-mutexPool,NULL) ! 0||pthread_mutex_init(pool-mutexBusy,NULL) ! 0||pthread_cond_init(pool-notEmpty,NULL) ! 0||pthread_cond_init(pool-notFull,NULL) ! 0){//此时创建失败printf(mutex or condition inti fail ...\n);break;// return 0;}//此时创建锁mutex和条件变量cond成功//然后创建任务队列pool-taskQ (Task*)malloc(sizeof(Task) * queueSize);//开辟一块arr内存存放任务队列arr所存储的任务最大值为容量那么大pool-queueCapacity queueSize;pool-queueSize 0;//当前任务数为0个pool-queueFront 0;//因为没有任务所有头部执行0indexpool-queueRear 0;//因为没有任务所有尾部执行0indexpool-shutdown 0;//初始化时肯定不能销毁线程池所有标记为0自己规定的//创建线程pthread_create(pool-managerID,NULL,manager,pool);//创建管理者这一个线程for(int i0;imin;i){//一开始就创建min个线程这是线程池中的最小线程数pthread_create(pool-threadIDs[i],NULL,worker,pool);}//如果能成功执行到这里那么就表示成功执行了线程池//此时直接返回线程池即可return pool;}while(0);//只会执行一次只要有开辟不成功的case马上break出while循环//下面再进行资源释放的工作if(pool pool-threadIDs) free(pool-threadIDs);//线程池存在的case下开辟了线程IDs的空间时就释放它if(pool pool-taskQ) free(pool-taskQ);//线程池存在的case下开辟了taskQ的空间时就释放它if(pool) free(pool);//释放线程池return NULL;}//销毁线程池当线程池被销毁时线程池中的所有成员都必须被销毁int threadPoolDestroy(ThreadPool* pool){if(pool NULL) return -1;// 表示此时线程池已空不需要销毁了// 线程池不空没被销毁时pool-shutdown 1;//关闭线程池// 阻塞回收管理者线程pthread_join(pool-managerID,NULL);// 唤醒阻塞的活着的消费者线程// 唤醒后他们会自动退出为什么退出呢因为我们写了让他们退出的条件判断代码for(int i 0;i pool-liveNum; i){pthread_cond_signal(pool-notEmpty);}// 释放申请的堆区内存if(pool-taskQ){free(pool-taskQ);pool-taskQ NULL;}if(pool-threadIDs){free(pool-threadIDs);pool-threadIDs NULL;}//再释放互斥量锁还有条件类锁pthread_mutex_destroy(pool-mutexBusy);pthread_mutex_destroy(pool-mutexPool);pthread_cond_destroy(pool-notEmpty);pthread_cond_destroy(pool-notFull);free(pool);pool NULL;return 0;//return 0 就表示的是成功Destory了线程池并返回了}//给线程池的任务队列中添加任务void threadPoolAdd(ThreadPool* pool,void(*func)(void*),void* arg){//由于你给该线程池的任务队列中添加任务时很有可能此时你正在对任务进行读or写的操作//那么因此这里就必须要用线程池的锁mutex_pool来锁住(防止这份共享代码因为OS的调度切换搞乱了)pthread_mutex_lock(pool-mutexPool);while(pool-queueSize pool-queueCapacity !pool-shutdown){// 此时线程池的任务队列数 其最大容量了 并且 该线程池还没有被销毁 时// 阻塞生产者线程pthread_cond_wait(pool-notFull,pool-mutexPool);}// 此时被堵塞的生产者线程 被唤醒了注意此时该线程还是拿到了mutex互斥锁的状态的// 先判断线程池是否已经被销毁了// 线程池被销毁if(pool-shutdown){pthread_mutex_unlock(pool-mutexPool);// 解锁return;// 并 退出程序}// 线程池没被销毁时 // 给线程池中的任务队列 添加任务pool-taskQ[pool-queueRear].function func;pool-taskQ[pool-queueRear].arg arg;// 让队尾index (循环)后移pool-queueRear (pool-queueRear 1) % pool-queueCapacity;pool-queueSize;// 队列任务1//生产者生产出新的任务唤醒消费者消费任务pthread_cond_signal(pool-notEmpty);//pool-notEmpty用来worker中//通知pool-notEmpty这个condition_variable条件变量的对象 返回true 让他唤醒了去工作干活了pthread_mutex_unlock(pool-mutexPool);}// 获取线程池中工作(忙)的线程的个数int threadPoolBusyNum(ThreadPool* pool){// 注意 读取pool中的成员变量时 也为了防止别的线程给这些变量写or读数据// 应该加上锁pthread_mutex_lock(pool-mutexBusy);int busyNum pool-busyNum;pthread_mutex_unlock(pool-mutexBusy);return busyNum;}// 获取线程池中活着的线程的个数int threadPoolAliveNum(ThreadPool* pool){// 注意 读取pool中的成员变量时 也为了防止别的线程给这些变量写or读数据// 应该加上锁且这里因为没有给aliveNum 定义对应的互斥量 so 直接用线程池的锁即可pthread_mutex_lock(pool-mutexPool);int aliveNum pool-liveNum ;pthread_mutex_unlock(pool-mutexPool);return aliveNum ;}// 工作的线程消费者线程的任务函数void* worker(void* arg){//首先把传入进来的参数类型转换为线程池类型ThreadPool* pool (ThreadPool*)arg;//接下来就算不断地读取任务队列中的任务每个任务就是一个函数while(1){pthread_mutex_lock(pool-mutexPool);//加锁// 当前任务队列是否为空while(pool-queueSize 0 !pool-shutdown){// 在当前队列中任务为空 并且 线程池没有被销毁时 就阻塞在这个工作线程中pthread_cond_wait(pool-notEmpty,pool-mutexPool);//只有当pool-notEmpty的返回值为true时就继续往下执行// 当该哦工作的线程被唤醒时判断一下该线程是否需要被销毁if(pool-exitNum 0){pool-exitNum--;//每次判断好要让该线程自杀后必须让exitNum--才行// 当然当条件变量wait被唤醒后肯定是拿到了互斥锁才继续往下面执行的// 此时就必须要把pool-mutexPool这把锁头给unlock解锁一下再退出该程序否则你没解锁就退出的话该程序就直接死锁了if(pool-liveNum pool-minNum){pool-liveNum--;pthread_mutex_unlock(pool-mutexPool);//解锁// pthread_exit(NULL);//退出该线程threadExit(pool);}}}// 判断线程池是否被关闭了if(pool-shutdown){// 若线程池被销毁了可以直接退出线程pthread_mutex_unlock(pool-mutexPool);//解锁// pthread_exit(NULL);//退出线程threadExit(pool);}// 若线程池没有被销毁了可以直接做任务了消费// 从任务队列中取出一个任务(真正 do things)struct Task task;task.function pool-taskQ[pool-queueFront].function;task.arg pool-taskQ[pool-queueFront].arg;// 移动头节点 使之do循环移动 构成一个循环队列 , 取余数的目的的为了形成环形队列到最后一个进行取余变成0其他不变pool-queueFront (pool-queueFront1) % pool-queueCapacity;pool-queueSize--;//取出1个任务了所有--//解锁,消费者消费成功唤醒阻塞的生产者生产。pthread_cond_signal(pool-notFull);pthread_mutex_unlock(pool-mutexPool);// do 真正的工作在function中printf(thread %ld start working...\n,pthread_self());pthread_mutex_lock(pool-mutexBusy);pool-busyNum;//工作时pthread_mutex_unlock(pool-mutexBusy);// do things 真正工作的函数进行执行task.function(task.arg);// (*task.function)(task.arg);// 同时因为此时这个线程在忙所有busyNum且用对应的锁来锁住防止这个线程的小任务还没真正开始干活呢// OS就因为调度切换到别的线程去了free(task.arg);task.arg NULL;printf(thread %ld end working...\n,pthread_self());pthread_mutex_lock(pool-mutexBusy);pool-busyNum--;//工作完成后--pthread_mutex_unlock(pool-mutexBusy);pthread_mutex_unlock(pool-mutexPool);//解锁}return NULL;}//管理者线程任务函数(主要用于创建和销毁线程池的)void* manager(void* arg){ThreadPool* pool (ThreadPool*)arg;while(!pool-shutdown){// 只要线程池没有销毁 就继续管理它// 每隔3s钟 管理一下sleep(3);//linux 下要包含头文件unistd.h才能使用// 取出线程池钟的任务数量以及当前线程的数量// (由于你读取的过程钟有可能别的线程正在写数据)为了防止这种case你就必须要上锁才行pthread_mutex_lock(pool-mutexPool);//加锁int queueSize pool-queueSize;int liveNum pool-liveNum;pthread_mutex_unlock(pool-mutexPool);//解锁// 取出忙的线程的数量pthread_mutex_lock(pool-mutexBusy);//加锁int busyNum pool-busyNum;pthread_mutex_unlock(pool-mutexBusy);//解锁// 添加线程(此时可以根据你项目的需要来制定相应的添加线程个数的规则)// 比如当存活的线程数 max 当前存活的线程数 当前任务队列钟的任务数量 时 就添加线程此时表明当前的线程数已经忙不过来了if(liveNum pool-maxNum liveNum queueSize){pthread_mutex_lock(pool-mutexPool);//加锁int cnt 0; for(int i0;i pool-maxNum cnt ADDORDESTROYNUMBER pool-liveNum pool-maxNum;i){if(pool-threadIDs[i] 0)//为0就 表明此时该线程ID可用之前没被使用线程我可以用了{// 创建线程元素并放进去数组中pthread_create(pool-threadIDs[i],NULL,worker,pool);cnt;pool-liveNum;}}pthread_mutex_unlock(pool-mutexPool);//解锁}// 销毁线程(此时可以根据你项目的需要来制定相应的销毁线程个数的规则)// 比如当 忙的线程数*2 存活的线程数 最小线程数 存活的线程数 时 就销毁线程if(busyNum * 2 liveNum pool-minNum liveNum){pthread_mutex_lock(pool-mutexPool);pool-exitNum ADDORDESTROYNUMBER;//要销毁的线程数字定义为ADDORDESTROYNUMBER(2)pthread_mutex_unlock(pool-mutexPool);// 让工作的线程自杀for(int i0;iADDORDESTROYNUMBER;i){//唤醒线程,唤醒的是存活且不忙的线程pthread_cond_signal(pool-notEmpty);// condition_variable中的.notify_one()}}}return NULL;}//单个线程退出void threadExit(ThreadPool* pool){//先获取当前线程的线程idpthread_t threadId pthread_self();// std::this_thread::get_id();for(int i0;ipool-maxNum;i){if(pool-threadIDs[i] threadId){pool-threadIDs[i] 0;//重新置为0表示该线程ID可在后续被使用了printf(threadExit() called%ld existing...\n,threadId);break;}}pthread_exit(NULL);}四、线程池CPP实现
4.1 任务队列类的声明
// 定义任务结构体
using callback void(*)(void*);
struct Task
{Task(){function nullptr;arg nullptr;}Task(callback f, void* arg){function f;this-arg arg;}callback function;void* arg;
};// 任务队列
class TaskQueue
{
public:TaskQueue();~TaskQueue();// 添加任务void addTask(Task task);void addTask(callback func, void* arg);// 取出一个任务Task takeTask();// 获取当前队列中任务个数inline int taskNumber(){return m_queue.size();}private:pthread_mutex_t m_mutex; // 互斥锁std::queueTask m_queue; // 任务队列
};
4.2 任务队列类的定义
TaskQueue::TaskQueue()
{pthread_mutex_init(m_mutex, NULL);
}TaskQueue::~TaskQueue()
{pthread_mutex_destroy(m_mutex);
}void TaskQueue::addTask(Task task)
{pthread_mutex_lock(m_mutex);m_queue.push(task);pthread_mutex_unlock(m_mutex);
}void TaskQueue::addTask(callback func, void* arg)
{pthread_mutex_lock(m_mutex);Task task;task.function func;task.arg arg;m_queue.push(task);pthread_mutex_unlock(m_mutex);
}Task TaskQueue::takeTask()
{Task t;pthread_mutex_lock(m_mutex);if (m_queue.size() 0){t m_queue.front();m_queue.pop();}pthread_mutex_unlock(m_mutex);return t;
}4.3 线程池类的声明
class ThreadPool
{
public:ThreadPool(int min, int max);~ThreadPool();// 添加任务void addTask(Task task);// 获取忙线程的个数int getBusyNumber();// 获取活着的线程个数int getAliveNumber();private:// 工作的线程的任务函数static void* worker(void* arg);// 管理者线程的任务函数static void* manager(void* arg);void threadExit();private:pthread_mutex_t m_lock;pthread_cond_t m_notEmpty;pthread_t* m_threadIDs;pthread_t m_managerID;TaskQueue* m_taskQ;int m_minNum;int m_maxNum;int m_busyNum;int m_aliveNum;int m_exitNum;bool m_shutdown false;
};
4.4 线程池类的定义
ThreadPool::ThreadPool(int minNum, int maxNum)
{// 实例化任务队列m_taskQ new TaskQueue;do {// 初始化线程池m_minNum minNum;m_maxNum maxNum;m_busyNum 0;m_aliveNum minNum;// 根据线程的最大上限给线程数组分配内存m_threadIDs new pthread_t[maxNum];if (m_threadIDs nullptr){cout malloc thread_t[] 失败.... endl;;break;}// 初始化memset(m_threadIDs, 0, sizeof(pthread_t) * maxNum);// 初始化互斥锁,条件变量if (pthread_mutex_init(m_lock, NULL) ! 0 ||pthread_cond_init(m_notEmpty, NULL) ! 0){cout init mutex or condition fail... endl;break;}/// 创建线程 //// 根据最小线程个数, 创建线程for (int i 0; i minNum; i){pthread_create(m_threadIDs[i], NULL, worker, this);cout 创建子线程, ID: to_string(m_threadIDs[i]) endl;}// 创建管理者线程, 1个pthread_create(m_managerID, NULL, manager, this);} while (0);
}ThreadPool::~ThreadPool()
{m_shutdown 1;// 销毁管理者线程pthread_join(m_managerID, NULL);// 唤醒所有消费者线程for (int i 0; i m_aliveNum; i){pthread_cond_signal(m_notEmpty);}if (m_taskQ) delete m_taskQ;if (m_threadIDs) delete[]m_threadIDs;pthread_mutex_destroy(m_lock);pthread_cond_destroy(m_notEmpty);
}void ThreadPool::addTask(Task task)
{if (m_shutdown){return;}// 添加任务不需要加锁任务队列中有锁m_taskQ-addTask(task);// 唤醒工作的线程pthread_cond_signal(m_notEmpty);
}int ThreadPool::getAliveNumber()
{int threadNum 0;pthread_mutex_lock(m_lock);threadNum m_aliveNum;pthread_mutex_unlock(m_lock);return threadNum;
}int ThreadPool::getBusyNumber()
{int busyNum 0;pthread_mutex_lock(m_lock);busyNum m_busyNum;pthread_mutex_unlock(m_lock);return busyNum;
}// 工作线程任务函数
void* ThreadPool::worker(void* arg)
{ThreadPool* pool static_castThreadPool*(arg);// 一直不停的工作while (true){// 访问任务队列(共享资源)加锁pthread_mutex_lock(pool-m_lock);// 判断任务队列是否为空, 如果为空工作线程阻塞while (pool-m_taskQ-taskNumber() 0 !pool-m_shutdown){cout thread to_string(pthread_self()) waiting... endl;// 阻塞线程pthread_cond_wait(pool-m_notEmpty, pool-m_lock);// 解除阻塞之后, 判断是否要销毁线程if (pool-m_exitNum 0){pool-m_exitNum--;if (pool-m_aliveNum pool-m_minNum){pool-m_aliveNum--;pthread_mutex_unlock(pool-m_lock);pool-threadExit();}}}// 判断线程池是否被关闭了if (pool-m_shutdown){pthread_mutex_unlock(pool-m_lock);pool-threadExit();}// 从任务队列中取出一个任务Task task pool-m_taskQ-takeTask();// 工作的线程1pool-m_busyNum;// 线程池解锁pthread_mutex_unlock(pool-m_lock);// 执行任务cout thread to_string(pthread_self()) start working... endl;task.function(task.arg);delete task.arg;task.arg nullptr;// 任务处理结束cout thread to_string(pthread_self()) end working...;pthread_mutex_lock(pool-m_lock);pool-m_busyNum--;pthread_mutex_unlock(pool-m_lock);}return nullptr;
}// 管理者线程任务函数
void* ThreadPool::manager(void* arg)
{ThreadPool* pool static_castThreadPool*(arg);// 如果线程池没有关闭, 就一直检测while (!pool-m_shutdown){// 每隔5s检测一次sleep(5);// 取出线程池中的任务数和线程数量// 取出工作的线程池数量pthread_mutex_lock(pool-m_lock);int queueSize pool-m_taskQ-taskNumber();int liveNum pool-m_aliveNum;int busyNum pool-m_busyNum;pthread_mutex_unlock(pool-m_lock);// 创建线程const int NUMBER 2;// 当前任务个数存活的线程数 存活的线程数最大线程个数if (queueSize liveNum liveNum pool-m_maxNum){// 线程池加锁pthread_mutex_lock(pool-m_lock);int num 0;for (int i 0; i pool-m_maxNum num NUMBER pool-m_aliveNum pool-m_maxNum; i){if (pool-m_threadIDs[i] 0){pthread_create(pool-m_threadIDs[i], NULL, worker, pool);num;pool-m_aliveNum;}}pthread_mutex_unlock(pool-m_lock);}// 销毁多余的线程// 忙线程*2 存活的线程数目 存活的线程数 最小线程数量if (busyNum * 2 liveNum liveNum pool-m_minNum){pthread_mutex_lock(pool-m_lock);pool-m_exitNum NUMBER;pthread_mutex_unlock(pool-m_lock);for (int i 0; i NUMBER; i){pthread_cond_signal(pool-m_notEmpty);}}}return nullptr;
}// 线程退出
void ThreadPool::threadExit()
{pthread_t tid pthread_self();for (int i 0; i m_maxNum; i){if (m_threadIDs[i] tid){cout threadExit() function: thread to_string(pthread_self()) exiting... endl;m_threadIDs[i] 0;break;}}pthread_exit(NULL);
}五、线程池实现难点
4.1 empty条件变量
判断任务队列是否满了
阻塞休眠 当worker函数消费者消耗任务队列为空empty条件变量让该woker线程阻塞休眠 释放唤醒 当add函数添加新任务到队列中empty条件变量唤醒阻塞woker线程执行新任务当threadPoolDestroy函数销毁线程池shutdown标志为1empty条件变量唤醒阻塞woker线程。woker线程醒来后发现shutdown标志为1进行自杀当manager函数判断当前woker线程过多设置销毁线程数exitNumempty条件变量唤醒阻塞woker线程。woker线程醒来后发现s销毁线程数exitNum 大于0进行自杀
4.2 full条件变量
判断任务队列是否空了
阻塞休眠 当add函数发现任务队列满了无法添加新任务时full条件变量让该生产者线程调用add函数的线程阻塞休眠 释放唤醒 当worker函数从任务队列中取出新任务任务队列有空间的时候full条件变量唤醒阻塞生产者线程创建新任务