刷手机网站关键词,html网站怎么做视频,做杂志的网站,聊城做网站的公司流程文章目录 一、C实现线程池1. 头文件2. 测试部分 二、C11实现线程池1. 头文件2. 测试部分 一、C实现线程池
1. 头文件
#define _CRT_SECURE_NO_WARNINGS
#pragma once
#includeiostream
#includestring.h
#includestring
#includepthread.h
#… 文章目录 一、C实现线程池1. 头文件2. 测试部分 二、C11实现线程池1. 头文件2. 测试部分 一、C实现线程池
1. 头文件
#define _CRT_SECURE_NO_WARNINGS
#pragma once
#includeiostream
#includestring.h
#includestring
#includepthread.h
#includestdlib.h
#includequeue
#includeunistd.h
using namespace std;using callback void(*)(void*);
//任务的结构体
templatetypename T
struct Task
{Task(){function nullptr;args nullptr;}Task(callback fun, void* args){function fun;this - args (T*)args;}callback function;T* args;
};//任务队列
templatetypename T
class TaskQueue
{
public:TaskQueue(){pthread_mutex_init(mutex,NULL);}~TaskQueue(){pthread_mutex_destroy(mutex);}//添加任务void AddTask(TaskT task){pthread_mutex_lock(mutex);queue.push(task);pthread_mutex_unlock(mutex);}void AddTask(callback fun, void* args){pthread_mutex_lock(mutex);TaskT task(fun,args); queue.push(task);pthread_mutex_unlock(mutex);}//取出一个任务TaskT TakeTask(){TaskT task;pthread_mutex_lock(mutex);if (queue.size() 0){task queue.front();queue.pop();}pthread_mutex_unlock(mutex);return task;}//获取当前队列中的任务个数inline int GetTaskNum(){return queue.size();}
private:pthread_mutex_t mutex; //互斥锁std::queueTaskT queue;
};//线程池
templatetypename T
class ThreadPool
{
public:ThreadPool(int min , int max){//实例化任务队列taskqueue new TaskQueueT;//初始化线程池min_num min;max_num max;busy_num 0;live_num min;//根据线程最大上限,给线程数组分配内存threadID new pthread_t[max];if (threadID nullptr){cout new threadID fail endl;}//初始化线程IDmemset(threadID, 0, sizeof(pthread_t) * max);//初始化互斥锁和条件变量if (pthread_mutex_init(mutex_pool, NULL) ! 0 ||pthread_cond_init(notempty, NULL) ! 0){cout mutex or cond init fail endl;}//创建线程for (size_t i 0; i min; i){pthread_create(threadID[i], NULL, Work, this);cout create thread ID : to_string(threadID[i]) endl;}pthread_create(managerID, NULL, Manage, this);}~ThreadPool(){shutdown true;//销毁管理者进程pthread_join(managerID, NULL);//唤醒消费者for (int i 0; i live_num; i){pthread_cond_signal(notempty);}if (taskqueue){delete taskqueue;}if (threadID){delete[] threadID;}pthread_mutex_destroy(mutex_pool);pthread_cond_destroy(notempty);}//添加任务void Add_Task(TaskT task){if (shutdown)return;//添加任务,不需加锁,队列中有taskqueue-AddTask(task);//唤醒消费者pthread_cond_signal(notempty);}//获取忙线程个数int Get_Busy_Num(){int busynum 0;pthread_mutex_lock(mutex_pool);busynum busy_num;pthread_mutex_unlock(mutex_pool);return busynum;}//获取存活线程个数int Get_Live_Num(){int livenum 0;pthread_mutex_lock(mutex_pool); livenum live_num; pthread_mutex_unlock(mutex_pool); return livenum; }private://工作的线程任务函数static void* Work(void* args){ThreadPool* pool static_castThreadPool*(args);while (true){//访问任务队列加锁pthread_mutex_lock(pool-mutex_pool);//判断任务队列是否为空,空了就堵塞while (pool-taskqueue-GetTaskNum() 0 !pool-shutdown){cout thread : to_string(pthread_self()) waiting... endl;pthread_cond_wait(pool-notempty, pool-mutex_pool);//解除后 判断是否要销毁进程if (pool-exit_num 0){pool-exit_num--;if (pool-live_num pool-min_num){pool-live_num--;pthread_mutex_unlock(pool-mutex_pool);pool-Thread_Exit();}}}//判断线程池是否要关闭了if (pool-shutdown){pthread_mutex_unlock(pool-mutex_pool);pool-Thread_Exit();}//从任务队列取出任务TaskT task pool-taskqueue-TakeTask();pool-busy_num;pthread_mutex_unlock(pool-mutex_pool);cout thread : to_string(pthread_self()) start working... endl;task.function(task.args);delete task.args;task.args nullptr;//任务结束cout thread : to_string(pthread_self()) end working... endl;pthread_mutex_lock(pool-mutex_pool);pool-busy_num--;pthread_mutex_unlock(pool-mutex_pool);}return nullptr;}//管理者线程任务函数static void* Manage(void* args){ThreadPool* pool static_castThreadPool*(args);while (!pool-shutdown){//5秒检测一次sleep(5);pthread_mutex_lock(pool-mutex_pool);int livenum pool-live_num;int busynum pool-busy_num;int queuesize pool-taskqueue-GetTaskNum();pthread_mutex_unlock(pool-mutex_pool);const int NUMBER 2;//创建if (queuesize livenum livenum pool-max_num){pthread_mutex_lock(pool-mutex_pool);int num 0;for (int i 0; i pool-max_num num NUMBER pool-live_num pool-max_num ; i){if (pool-threadID[i] 0){pthread_create(pool-threadID[i], NULL, Work, pool);num;pool-live_num;}}pthread_mutex_unlock(pool-mutex_pool);}//销毁if (busynum * 2 livenum livenum pool-min_num){pthread_mutex_lock(pool-mutex_pool);pool-exit_num NUMBER;pthread_mutex_unlock(pool-mutex_pool);for (int i 0; i NUMBER; i){pthread_cond_signal(pool-notempty);}}}return nullptr;}void Thread_Exit(){pthread_t tid pthread_self();for (int i 0; i max_num; i){if (threadID[i] tid){cout thread : to_string(pthread_self()) exiting endl;threadID[i] 0;break;}}pthread_exit(NULL);}
private:pthread_mutex_t mutex_pool;pthread_cond_t notempty;pthread_t* threadID;pthread_t managerID;TaskQueueT* taskqueue;int min_num;int max_num;int busy_num;int live_num;int exit_num;bool shutdown false;};2. 测试部分
#includeThreadPool.hvoid Task_Test(void* args)
{int num *(int*)args;coutthread : pthread_self() is working number num endl;sleep(1);return;
}int main()
{//创建线程池ThreadPoolint pool(3, 10);for (int i 0; i 100; i){int* num new int(i100);pool.Add_Task(Taskint(Task_Test,num));}sleep(40);return 0;
} 以上只是基于C修改出对应于C的代码 并且以上代码存在一个问题 输出的结果有时会因为线程原因出现混乱 可以通过加锁来解决,但锁的数量超过1就容易导致死锁问题,所以暂且搁置 二、C11实现线程池
并非原创,摘于此处
1. 头文件
#pragma once
#includequeue
#includethread
#includecondition_variable
#includeatomic
#includestdexcept
#includefuture
#includevector
#includefunctionalnamespace std
{#define THREADPOOL_MAX_NUM 16class threadpool{unsigned short _initsize;using Task functionvoid();vectorthread _pool;queueTask _tasks;mutex _lock;mutex _lockGrow;condition_variable _task_cv;atomicbool _run{true};atomicint _spa_trd_num{0};public:inline threadpool(unsigned short size 4){_initsize size;Add_Thread(size);}inline ~threadpool(){_run false;_task_cv.notify_all();for (thread thread : _pool){if (thread.joinable())thread.join();}}templatetypename F,typename... Argsauto commit(F f, Args ...args) - futuredecltype(f(args...)) {if (!_run)throw runtime_error{commit auto stop};using RetType decltype(f(args...));auto task make_sharedpackaged_taskRetType()(bind(forwardF(f), forwardArgs(args)...));futureRetType future task-get_future();{lock_guardmutex lock{_lock};_tasks.emplace([task]() {(*task)(); });}if (_spa_trd_num 1 _pool.size() THREADPOOL_MAX_NUM)Add_Thread(1);_task_cv.notify_one();return future;}templatetypename Fvoid commit2(F f){if (!_run)return;{lock_guardmutex lock{_lock};_tasks.emplace(forwardF(f));}if (_spa_trd_num 1 _pool.size() THREADPOOL_MAX_NUM)Add_Thread(1);_task_cv.notify_one();}int idlCount() { return _spa_trd_num; }int thrCount() { return _pool.size(); }private:void Add_Thread(unsigned short size){if (!_run)throw runtime_error{Add_Thread stop};unique_lockmutex lockgrow{_lockGrow};for (; _pool.size() THREADPOOL_MAX_NUM size 0; --size){_pool.emplace_back([this]{while (true){Task task;{unique_lockmutex lock{_lock};_task_cv.wait(lock, [this] {return !_run || !_tasks.empty(); });if (!_run _tasks.empty())return;_spa_trd_num--;task move(_tasks.front());_tasks.pop();}task();if (_spa_trd_num 0 _pool.size() _initsize)return;{unique_lockmutex lock{_lock};_spa_trd_num;}}});{unique_lockmutex lock{_lock};_spa_trd_num;}} }};
}要使用pthread依赖库 2. 测试部分
#includeThreadPool.hpp
#includeiostreamvoid fun1(int slp)
{printf(fun1 %ld\n, std::this_thread::get_id());if (slp 0){printf(fun1 sleep %ld %ld\n, slp, std::this_thread::get_id());std::this_thread::sleep_for(std::chrono::milliseconds(slp));}
}struct gfun
{int operator()(int n){printf(gfun %ld\n, n, std::this_thread::get_id());return 42;}
};class A
{
public:static int Afun(int n 0) //函数必须是 static 的才能直接使用线程池{std::cout n Afun std::this_thread::get_id() std::endl;return n;}static std::string Bfun(int n, std::string str, char c) {std::cout n Bfun str.c_str() (int)c std::this_thread::get_id() std::endl;return str;}
};int main()
try {std::threadpool executor{ 50 };std::futurevoid ff executor.commit(fun1, 0);std::futureint fg executor.commit(gfun{}, 0);//std::futureint gg executor.commit(A::Afun, 9999); //IDE提示错误,但可以编译运行std::futurestd::string gh executor.commit(A::Bfun, 9998, mult args, 123);std::futurestd::string fh executor.commit([]()-std::string { std::cout hello, fh ! std::this_thread::get_id() std::endl; return hello,fh ret !\n; });std::this_thread::sleep_for(std::chrono::seconds(1));std::cout fg.get() fh.get().c_str() std::this_thread::get_id() std::endl;std::cout fun1,55 std::this_thread::get_id() std::endl;executor.commit(fun1, 55).get(); //调用.get()获取返回值会等待线程执行完std::threadpool pool(4);std::vector std::futureint results;for (int i 0; i 8; i){results.emplace_back(pool.commit([i] {std::cout hello i std::endl;std::this_thread::sleep_for(std::chrono::seconds(3));std::cout world i std::endl;return i * i;}));}std::this_thread::sleep_for(std::chrono::seconds(15));for (auto result : results)std::cout result.get() ;std::cout std::endl;return 0;
}
catch (std::exception e)
{std::cout some error std::this_thread::get_id() e.what() std::endl;
}测试结果