专做hip hop音乐的网站,深圳app开发公司哪家服务好,大网站如何优化,购买保险的网站自己手动写一个线程池的必要条件需要先了解我们使用的线程池的功能。为什么会有线程池#xff1f;这是为了减少线程创建和销毁的开销。复用线程的目的。为了达到这个目的。预计方案是#xff1a;需要一个存放任务的队列#xff0c;主线程相当于生产者#xff0c;在这个队列…自己手动写一个线程池的必要条件需要先了解我们使用的线程池的功能。为什么会有线程池这是为了减少线程创建和销毁的开销。复用线程的目的。为了达到这个目的。预计方案是需要一个存放任务的队列主线程相当于生产者在这个队列里面push任务启动几个核心线程一直消费这个队列然后执行其中的任务。
1、线程池的几个重要参数
1corePoolSize 核心线程数 含义是一直存活的线程数
2maxPoolSize 最大线程数 含义是最大运行的线程个数
3keepaliveTime 存活的时间 超过核心线程数的线程存活时间
4unit 单位
5blockingQueue 阻塞队列 放置任务使用
6threadFactory 线程工厂
7拒绝策略abort策略表示拒绝任务但是会抛异常discard策略表示拒绝任务不会抛异常。CallerRunPolicy策略表示会使用调用者去执行任务。DiscardoldPolicy策略表示抛弃老的任务将新的任务添加进队列
上述几种参数决定了线程池要对外暴露什么样的接口精选1-5必要参数手动写一个线程池。
2、代码逻辑
我这边编写的步骤是
1、先将参数定义好构造方法定义出来除了上述必要的1-5的参数外还需要一个消费者工作集合。为什么要保存成集合呢主要用于在取消的时候遍历其集合然后进行中断。
2、编写submit方法逻辑首先参数肯定是Runable任务判断逻辑
1当运行的工作线程小于核心线程则直接启动一个工作线程然后加入线程集合中。
2当1不成立后那就需要加入阻塞队列等待了如果阻塞队列是数组类型阻塞队列这时有范围限制如果不超过则加入队列。
3当阻塞队列超了则查看当前线程个数是否小于最大线程个数。如果不超过则启动一个工作线程消费其任务并且加入其工作集合中
4如果超过最大线程数实际上走拒绝策略逻辑这里我们就直接报错。
3、上述工作线程内部怎么消费呢因为是阻塞队列自然就是循环队列中取任务然后执行。而这里的取任务分成了poll和take。这是为什么从上面步骤可知核心线程数超过之后也会存在启动工作线程的情况那这些线程有一个保活时间时间一到只要此线程空闲线程则会跳出循环结束执行。而这里阻塞队列的poll可以实现上述逻辑。take则在没有任务时就阻塞。
下面是按照其上述逻辑的代码大家可以参考。
package org.example.Thread1;import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;public class SelfThreadPoolExecutor {//一直存活的核心线程数private int corePoolSize;//运行存在的最大的线程数private int maxPoolSize;private AtomicInteger status new AtomicInteger();private AtomicInteger workCount new AtomicInteger();private final static Integer RUNNING 0;private final static Integer STOP 1;private int keepAliveTime;private BlockingQueueRunnable blockingQueue;private HashSetWorker hashSet new HashSet();private ReentrantLock mainLock new ReentrantLock();public SelfThreadPoolExecutor(int corePoolSize, int maxPoolSize, int keepAliveTime, BlockingQueueRunnable blockingQueue) {this.corePoolSize corePoolSize;this.maxPoolSize maxPoolSize;this.keepAliveTime keepAliveTime;this.blockingQueue blockingQueue;}public void submit(Runnable runnable) {if (status.get() STOP) {throw new RuntimeException(不能添加新的任务了);}if (workCount.get() corePoolSize addWork(runnable, true)) {return;}if (blockingQueue.offer(runnable)) {return;}if (workCount.get() maxPoolSize addWork(runnable, false)) {return;}throw new RuntimeException(拒绝此任务);}public boolean addWork(Runnable runnable, boolean core) {if (status.get() STOP) {return false;}while (true) {if (workCount.get() (core ? corePoolSize : maxPoolSize)) {return false;}boolean inc workCount.compareAndSet(workCount.get(), workCount.get()1);if (!inc) {continue;}break;}mainLock.lock();try {Worker worker new Worker(runnable);worker.thread.start();hashSet.add(worker);} finally {while (true) {boolean inc workCount.compareAndSet(workCount.get(), workCount.get()-1);if (!inc) {continue;}break;}mainLock.unlock();}return true;}public void shutdown() {mainLock.lock();try {while (true) {if (status.get() STOP) {break;}boolean stop status.compareAndSet(status.get(), STOP);if (stop) {break;}}for (Worker worker : hashSet) {if (!worker.thread.isInterrupted()) {worker.thread.interrupt();}}}finally {mainLock.unlock();}}private final class Worker implements Runnable{Runnable firstTask;final Thread thread;public Worker(Runnable firstTask) {this.firstTask firstTask;thread new Thread(this);}Overridepublic void run() {runWork(this);}public void runWork(Worker w) {Thread wt w.thread;Runnable task w.firstTask;w.firstTask null;try {while (task ! null || (task getTask()) ! null) {try {if (wt.isInterrupted()) {break;}if (status.get() STOP) {break;}task.run();} finally {task null;}}} finally {mainLock.lock();try {hashSet.remove(w);} finally {mainLock.unlock();}}}public Runnable getTask() {boolean timedOut false;while (true) {try {if (status.get() STOP) {return null;}if (timedOut) {return null;}Runnable task null;if (workCount.get() corePoolSize) {task blockingQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);} else {task blockingQueue.take();}if (task ! null) {return task;}timedOut true;} catch (InterruptedException e) {timedOut true;}}}}
}