百度站长收录入口,网络信息公司,wordpress积分商城,拖拽式在线网页制作工具原理说明#xff1a;
1. 线程池创建时#xff0c;指定线程池的大小thread_size。当有新的函数任务通过函数addFunction ()添加进来后#xff0c;其中一个线程执行函数。一个线程一次执行一个函数。如果函数数量大与线程池数量#xff0c;则后来的函数等待。
2. 线程池内部…原理说明
1. 线程池创建时指定线程池的大小thread_size。当有新的函数任务通过函数addFunction ()添加进来后其中一个线程执行函数。一个线程一次执行一个函数。如果函数数量大与线程池数量则后来的函数等待。
2. 线程池内部有个容器m_functions 来存储待执行的函数。函数执行后从队列中移除。
3. stopAll()函数会停止线程池。
ThreadPool.h
//ThreadPool.h
#include condition_variable
#include cstddef
#include functional
#include future
#include memory
#include mutex
#include queue
#include thread
#include vectorclass ThreadPool {
public:static ThreadPool* getInstance(size_t thread_size 1); //默认线程池大小void addFunction(std::functionvoid() task); //添加需要执行的函数~ThreadPool();void stopAll(bool immediately); //停止线程池 immediately:true立即停止 immediately:false等待当前线程函数执行完后停止。
private:ThreadPool(size_t thread_size);private:void workerThreadHandler();std::vectorstd::thread m_workers; //线程容器std::queuestd::functionvoid() m_functions; //待执行的函数容器std::mutex m_queue_mutex;std::condition_variable m_condition;bool m_stop; //线程池停止状态std::thread m_wakeTimerThread;std::mutex m_timer_mutex;
};ThreadPool.cpp
#include ThreadPool.h
#include chrono
#include memory
#include thread
#include pthread.h
#include iostream
using namespace std;ThreadPool* ThreadPool::getInstance(size_t thread_size)
{static std::mutex m_lock;static std::shared_ptrThreadPool m_instancenullptr;if (nullptr m_instance){m_instance.reset(new ThreadPool(thread_size));}return m_instance.get();
}
ThreadPool::ThreadPool(size_t thread_size) : m_stop(false){m_workers.reserve(thread_size);for (size_t i0; ithread_size; i){m_workers.emplace_back([this, i](){ //创建线程池中的线程workerThreadHandler();});}//辅助线程每隔一段时间发送一次唤醒防止线程阻塞m_wakeTimerThread std::thread([this](){ for(;!this-m_stop;){std::unique_lockstd::mutex lock(this-m_timer_mutex);std::this_thread::sleep_for(std::chrono::milliseconds(2000));pthread_testcancel();m_condition.notify_all();std::coutwake upstd::endl;}});std::cout__func__std::endl;
}//线程循环函数循环查询函数容器是否为空不为空则读取一个函数并执行。
void ThreadPool::workerThreadHandler()
{for (;!this-m_stop;){std::functionvoid() task;{std::unique_lockstd::mutex lock(this-m_queue_mutex);std::couttasks begin size:this-m_functions.size() stop:m_stopstd::endl;if (!m_stop m_functions.empty()){this-m_condition.wait(lock);}pthread_testcancel(); //作为线程的终止点if (this-m_stop){return;}if (this-m_functions.empty()){continue;}task std::move(this-m_functions.front());this-m_functions.pop();std::coutstd::this_thread::get_id() tasks end size:this-m_functions.size()std::endl;}task();std::cout__func__ end taskstd::endl;std::this_thread::sleep_for(std::chrono::milliseconds(200));}
}void ThreadPool::addFunction(std::functionvoid() task) //添加待执行的函数
{std::unique_lockstd::mutex lock(m_queue_mutex);if (m_stop){return;}m_functions.emplace(std::move(task));m_condition.notify_one();
}ThreadPool::~ThreadPool()
{{std::unique_lockstd::mutex lock(m_queue_mutex);m_stop true;}m_condition.notify_all();for (std::thread worker: m_workers){worker.join();}m_wakeTimerThread.join();std::cout__func__std::endl;
}void ThreadPool::stopAll(bool immediately) //停止线程 immediately:true立即停止
{this-m_stop true;if (immediately){for (std::thread worker: m_workers){pthread_cancel(worker.native_handle());}pthread_cancel(m_wakeTimerThread.native_handle());}
}测试程序main.cpp
#include iostream
#include chrono
#include mutex
#include ThreadPool.husing namespace std;static std::mutex m_mutex;
void ProcessFunc111()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(3));std::cout__func__ endstd::endl;
}void ProcessFunc222()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout__func__ endstd::endl;
}void ProcessFunc333()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout__func__ endstd::endl;
}void ProcessFunc444()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout__func__ endstd::endl;
}void ProcessFunc555()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout__func__ endstd::endl;
}void ProcessFunc666()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(3));std::cout__func__ endstd::endl;
}int main()
{ThreadPool::getInstance(5);ThreadPool::getInstance()-addFunction([](){ProcessFunc111();});ThreadPool::getInstance()-addFunction([](){ProcessFunc222();});ThreadPool::getInstance()-addFunction([](){ProcessFunc333();});ThreadPool::getInstance()-addFunction([](){ProcessFunc444();});ThreadPool::getInstance()-addFunction([](){ProcessFunc555();});ThreadPool::getInstance()-addFunction([](){ProcessFunc666();});getchar();return 0;
}执行结果
tasks begin size:0 stop:0
ThreadPooltasks begin size:
0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
140563906656000 tasks end size:2
ProcessFunc111 begin
140563898263296 tasks end size:4
ProcessFunc222 begin
140563881477888 tasks end size:3
ProcessFunc333 begin
140563743504128 tasks end size:2
ProcessFunc444 begin
140563889870592 tasks end size:1
ProcessFunc555 begin
wake up
ProcessFunc111 end
workerThreadHandler end task
tasks begin size:1 stop:0
140563906656000 tasks end size:0
ProcessFunc666 begin
ProcessFunc222 end
wake up
workerThreadHandler end task
ProcessFunc333ProcessFunc555ProcessFunc444 endendworkerThreadHandler end taskworkerThreadHandler end taskend
workerThreadHandler end task
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
wake up
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
ProcessFunc666 end
workerThreadHandler end task
tasks begin size:0 stop:0
wake up
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
...测试程序2,调用线程池停止程序
#include iostream
#include chrono
#include mutex
#include ThreadPool.husing namespace std;static std::mutex m_mutex;
void ProcessFunc111()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(3));std::cout__func__ endstd::endl;
}void ProcessFunc222()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout__func__ endstd::endl;
}void ProcessFunc333()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout__func__ endstd::endl;
}void ProcessFunc444()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout__func__ endstd::endl;
}void ProcessFunc555()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout__func__ endstd::endl;
}void ProcessFunc666()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(3));std::cout__func__ endstd::endl;
}int main()
{ThreadPool::getInstance(5);ThreadPool::getInstance()-addFunction([](){ProcessFunc111();});ThreadPool::getInstance()-addFunction([](){ProcessFunc222();});ThreadPool::getInstance()-addFunction([](){ProcessFunc333();});ThreadPool::getInstance()-addFunction([](){ProcessFunc444();});ThreadPool::getInstance()-addFunction([](){ProcessFunc555();});ThreadPool::getInstance()-addFunction([](){ProcessFunc666();});std::this_thread::sleep_for(std::chrono::seconds(1));std::coutstop all std::endl;ThreadPool::getInstance()-stopAll(false);getchar();return 0;
}执行结果
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:ThreadPool0 stop:0tasks begin size:0 stop:0
140190941017856 tasks end size:5
ProcessFunc111 begin
140190932625152 tasks end size:4
ProcessFunc222 begin
140190966195968 tasks end size:3
ProcessFunc333 begin
140190949410560 tasks end size:2
ProcessFunc444 begin
140190957803264 tasks end size:1
ProcessFunc555 begin
stop all
wake up
ProcessFunc111 end
workerThreadHandler end task
ProcessFunc444 end
workerThreadHandler end task
ProcessFunc333 end
workerThreadHandler end task
ProcessFunc222 end
workerThreadHandler end task
ProcessFunc555 end
workerThreadHandler end task测试程序3立即停止线程池
#include iostream
#include chrono
#include mutex
#include ThreadPool.husing namespace std;static std::mutex m_mutex;
void ProcessFunc111()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(3));std::cout__func__ endstd::endl;
}void ProcessFunc222()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout__func__ endstd::endl;
}void ProcessFunc333()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout__func__ endstd::endl;
}void ProcessFunc444()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout__func__ endstd::endl;
}void ProcessFunc555()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(4));std::cout__func__ endstd::endl;
}void ProcessFunc666()
{std::cout__func__ beginstd::endl;std::this_thread::sleep_for(std::chrono::seconds(3));std::cout__func__ endstd::endl;
}int main()
{ThreadPool::getInstance(5);ThreadPool::getInstance()-addFunction([](){ProcessFunc111();});ThreadPool::getInstance()-addFunction([](){ProcessFunc222();});ThreadPool::getInstance()-addFunction([](){ProcessFunc333();});ThreadPool::getInstance()-addFunction([](){ProcessFunc444();});ThreadPool::getInstance()-addFunction([](){ProcessFunc555();});ThreadPool::getInstance()-addFunction([](){ProcessFunc666();});std::this_thread::sleep_for(std::chrono::seconds(1));std::coutstop all std::endl;ThreadPool::getInstance()-stopAll(true);getchar();return 0;
}执行结果
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
tasks begin size:0 stop:0
ThreadPool
139831215929088 tasks end size:5
ProcessFunc111 begin
tasks begin size:5 stop:0
139831199143680 tasks end size:4
ProcessFunc222 begin
139831224321792 tasks end size:3
ProcessFunc333 begin
139831207536384 tasks end size:2
ProcessFunc444 begin
139831232714496 tasks end size:1
ProcessFunc555 begin
stop all