网站建设发展状况,专业做网站推广,公司网络维护服务方案,做360网站官网还是百度知道上节回顾
在上一节当中#xff0c;已经实现了一个线程池#xff0c;在本节当中#xff0c;我们需要添加拒绝策略。这里使用到了策略模式的设计模式#xff0c;因为拒绝策略是多种的#xff0c;我们需要将这个权利下放给调用者#xff08;由调用者来指定我要采取哪种策略…上节回顾
在上一节当中已经实现了一个线程池在本节当中我们需要添加拒绝策略。这里使用到了策略模式的设计模式因为拒绝策略是多种的我们需要将这个权利下放给调用者由调用者来指定我要采取哪种策略而线程池只需要调用拒绝的接口即可。
步骤
1定义拒绝策略接口 2在线程池中加入拒绝策略参数 3自行调用测试
1.定义接口类
FunctionalInterface
interface RejectPolicyT{//注意传递参数void reject(BlockQueueT queue,Runnable task);
}2.线程池中添加接口以及调用方法
Slf4j
class ThreadPool {//任务队列private BlockQueueRunnable taskQueue;//线程集合 我们需要对线程做一个包装private HashSetWorker workers new HashSet();//核心线程数量private long coreSize;//超时时间private long timeout;//时间单位private TimeUnit timeUnit;//自定义拒绝策略private RejectPolicyRunnable rejectPolicy;public ThreadPool(int queueCapacity,long coreSize,long timeout,TimeUnit timeUnit){this.taskQueue new BlockQueue(queueCapacity);this.coreSize coreSize;this.timeout timeout;this.timeUnit timeUnit;this.rejectPolicy (queue, task) - {throw new RuntimeException();};}public ThreadPool(int queueCapacity,long coreSize,long timeout,TimeUnit timeUnit,RejectPolicyRunnable rejectPolicy){taskQueue new BlockQueue(queueCapacity);this.coreSize coreSize;this.timeout timeout;this.timeUnit timeUnit;this.rejectPolicy rejectPolicy;}//执行任务public void execute(Runnable task){//当任务数量尚未超过coreSizesynchronized (workers){if (workers.size() coreSize){log.info(创建工作线程{},task);Worker worker new Worker(task);workers.add(worker);worker.start();}else{log.info(加入到任务队列{},task);//有可能会阻塞在这里 进而将主线程阻塞掉//taskQueue.put(task);//这里会有很多种策略自定义策略//策略模式操作抽象成接口实现代码是传过来不会写死taskQueue.tryPut(rejectPolicy,task);//rejectPolicy.reject(taskQueue,task);}}}class Worker extends Thread{private Runnable task;public Worker(Runnable task){this.task task;}Overridepublic void run() {while (task ! null || (task taskQueue.poll(timeout,timeUnit)) ! null){try {log.info(正在执行...{},task);//执行任务task.run();}catch (Exception e){System.out.println(e.getMessage());}finally {//不要忘记这一步task null;}}synchronized (workers){log.info(worker被移除{},this);workers.remove(this);}}}
}3.main测试
Slf4j
public class TestPool {//阻塞队列是平衡生产者和消费者之间的中介//任务数量超过任务队列的情况public static void main(String[] args) {ThreadPool threadPool new ThreadPool(10, 2, 1000, TimeUnit.MICROSECONDS, (queue, task) - {//1.死等queue.put(task);//2.超时等待queue.offer(task, 1500, TimeUnit.MICROSECONDS);//3.调用者自己放弃// log.debug(放弃{},task);//4.调用者抛异常//throw new RuntimeException(task执行失败 task);//5.调用者自己执行task.run();});for (int i 0; i 20; i) {int j i;//主线程可能会在这里阻塞threadPool.execute(() - {try {Thread.sleep(30000);} catch (InterruptedException e) {throw new RuntimeException(e);}TestPool.log.debug({}, j);});}}
}