显示官网字样的网站怎么做,企业宣传片报价,做网站封面素材图,济南网站建设jnjy81、简介
java.util.concurrent 是 Java 并发编程的核心包#xff0c;提供了丰富的工具和框架来支持多线程编程、并发任务执行、线程安全集合、同步机制等。
2、线程池Thread Pool
线程池是并发编程中最重要的工具之一#xff0c;用于管理和复用线程#xff0c;避免频繁创…1、简介
java.util.concurrent 是 Java 并发编程的核心包提供了丰富的工具和框架来支持多线程编程、并发任务执行、线程安全集合、同步机制等。
2、线程池Thread Pool
线程池是并发编程中最重要的工具之一用于管理和复用线程避免频繁创建和销毁线程的开销。
最顶层接口Executor核心接口ExecutorService
2.1、主要类
工厂类Executors
工厂类Executors提供了创建线程池的静态方法。 newFixedThreadPool(int nThreads)创建固定大小的线程池。 newCachedThreadPool()创建可缓存的线程池线程数根据需要动态调整。 newSingleThreadExecutor()创建单线程的线程池。 newScheduledThreadPool(int corePoolSize)创建支持定时任务的线程池。
package com.mqtt.mqttproject.test;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** Author : Gridsum* Description :*/
public class ThreadPoolExample {public static void main(String[] args) {ExecutorService executor Executors.newFixedThreadPool(2);for (int i 0; i 5; i){executor.submit(()-{System.out.println(任务执行 Thread.currentThread().getName());});}executor.shutdown(); //关闭线程池}
}核心接口ExecutorService
ExecutorService接口提供了提供了线程池的管理方法。 ExecutorService接口常用实现类
2.2、ThreadPoolExecutor public ThreadPoolExecutor(int corePoolSize, //核心线程数int maximumPoolSize, //最大线程数long keepAliveTime, //线程空闲时间TimeUnit unit, //时间单位BlockingQueueRunnable workQueue, //队列ThreadFactory threadFactory, //线程工厂RejectedExecutionHandler handler//拒绝策略) {if (corePoolSize 0 ||maximumPoolSize 0 ||maximumPoolSize corePoolSize ||keepAliveTime 0)throw new IllegalArgumentException();if (workQueue null || threadFactory null || handler null)throw new NullPointerException();this.corePoolSize corePoolSize;this.maximumPoolSize maximumPoolSize;this.workQueue workQueue;this.keepAliveTime unit.toNanos(keepAliveTime);this.threadFactory threadFactory;this.handler handler;}
2.2.1、线程池执行顺序
当线程数小于核心线程数时创建线程。 当线程数大于等于核心线程数且任务队列未满时将任务放入任务队列。 当线程数大于等于核心线程数且任务队列已满若线程数小于最大线程数创建线程。 若线程数等于最大线程数则执行拒绝策略
2.2.2、workQueue
1无界队列 队列大小无限制常用的为无界的LinkedBlockingQueue使用该队列作为阻塞队列时要尤其当心当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。阅读代码发现Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue而博主踩到的就是这个坑当QPS很高发送数据很大大量的任务被添加到这个无界LinkedBlockingQueue 中导致cpu和内存飙升服务器挂掉。 当然这种队列maximumPoolSize 的值也就无效了。当每个任务完全独立于其他任务即任务执行互不影响时适合于使用无界队列例如在 Web 页服务器中。这种排队可用于处理瞬态突发请求当命令以超过队列所能处理的平均数连续到达时此策略允许无界线程具有增长的可能性。 2有界队列 当使用有限的 maximumPoolSizes 时有界队列有助于防止资源耗尽但是可能较难调整和控制。常用的有两类一类是遵循FIFO原则的队列如ArrayBlockingQueue另一类是优先级队列如PriorityBlockingQueue。PriorityBlockingQueue中的优先级由任务的Comparator决定。 使用有界队列时队列大小需和线程池大小互相配合线程池较小有界队列较大时可减少内存消耗降低cpu使用率和上下文切换但是可能会限制系统吞吐量。 3同步移交队列 如果不希望任务在队列中等待而是希望将任务直接移交给工作线程可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。
2.2.3、threadFactory
线程工厂用来创建线程 为了统一在创建线程时设置一些参数如是否守护线程线程一些特性等如优先级。通过这个TreadFactory创建出来的线程能保证有相同的特性。 它是一个接口类而且方法只有一个就是创建一个线程。如果没有另外说明则在同一个ThreadGroup 中一律使用Executors.defaultThreadFactory() 创建线程并且这些线程具有相同的NORM_PRIORITY 优先级和非守护进程状态。 通过提供不同的 ThreadFactory可以改变线程的名称、线程组、优先级、守护进程状态等等。 如果从newThread 返回 null 时ThreadFactory 未能创建线程则执行程序将继续运行但不能执行任何任务。
2.2.4、ThreadPoolExecutor执行逻辑
创建ThreadPoolExecutor实例对象-execute()/submit()-addWorker-runWorker-getTask package com.mqtt.mqttproject.test;import java.util.concurrent.*;/*** Author : Gridsum* Description :*/
public class ThreadPoolExample {public static void main(String[] args) {LinkedBlockingDeque linkedBlockingDeque new LinkedBlockingDeque(10);ThreadFactory threadFactory Executors.defaultThreadFactory();RejectedExecutionHandler rejectedExecutionHandler new ThreadPoolExecutor.AbortPolicy();ExecutorService executor new ThreadPoolExecutor(5,10,1000,TimeUnit.MILLISECONDS,linkedBlockingDeque, threadFactory,rejectedExecutionHandler);//如果不需要获取任务执行结果调用execute方法executor.execute(()-{System.out.println(开始执行任务 Thread.currentThread().getName());});//如果需要返回任务执行结果调用submit方法FutureString future executor.submit(()-{Thread.sleep(3000);System.out.println(开始执行任务 Thread.currentThread().getName());return 执行成功;});//添加回调addCallback(future, result - {System.out.println(回调结果 result);});System.out.println(执行其他任务不需要等待回调结果);executor.shutdown();}public interface Callback{void onComplate(String result);}public static void addCallback(FutureString future, Callback callback){new Thread(()-{try {callback.onComplate(future.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}).start();}}执行结果 ThreadPoolExecutor核心维护了一个HashSetWorker类型的 workers用来存放线程维护了BlockingQueueRunnable workQueue 队列存放需要执行的任务task启动workers中的线程执行workQueue队列中task。
public void execute(Runnable command) {if (command null)throw new NullPointerException();int c ctl.get();//workerCountOf获取线程池的当前线程数小于corePoolSize执行addWorker创建新线程执行command任务if (workerCountOf(c) corePoolSize) {if (addWorker(command, true))return;c ctl.get();}if (isRunning(c) workQueue.offer(command)) {int recheck ctl.get();if (! isRunning(recheck) remove(command))reject(command);else if (workerCountOf(recheck) 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c ctl.get();;) {// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN) (runStateAtLeast(c, STOP)|| firstTask ! null|| workQueue.isEmpty()))return false;for (;;) {if (workerCountOf(c) ((core ? corePoolSize : maximumPoolSize) COUNT_MASK))return false;if (compareAndIncrementWorkerCount(c))break retry;c ctl.get(); // Re-read ctlif (runStateAtLeast(c, SHUTDOWN))continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted false;boolean workerAdded false;Worker w null;try {w new Worker(firstTask);final Thread t w.thread;if (t ! null) {final ReentrantLock mainLock this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c ctl.get();if (isRunning(c) ||(runStateLessThan(c, STOP) firstTask null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s workers.size();if (s largestPoolSize)largestPoolSize s;workerAdded true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
Worker类的runworker方法 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask firstTask;this.thread getThreadFactory().newThread(this); // 创建线程}/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}// ...}runWorker方法是线程池的核心: 线程启动之后通过unlock方法释放锁设置AQS的state为0表示运行可中断 Worker执行firstTask或从workQueue中获取任务: 进行加锁操作保证thread不被其他线程中断(除非线程池被中断) 检查线程池状态倘若线程池处于中断状态当前线程将中断。 执行beforeExecute 执行任务的run方法 执行afterExecute方法 解锁操作
通过getTask方法从阻塞队列中获取等待的任务如果队列中没有任务getTask方法会被阻塞并挂起不会占用cpu资源
final void runWorker(Worker w) {Thread wt Thread.currentThread();Runnable task w.firstTask;w.firstTask null;w.unlock(); // allow interruptsboolean completedAbruptly true;try {while (task ! null || (task getTask()) ! null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() runStateAtLeast(ctl.get(), STOP))) !wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {task.run();afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task null;w.completedTasks;w.unlock();}}completedAbruptly false;} finally {processWorkerExit(w, completedAbruptly);}}
getTask方法
private Runnable getTask() {boolean timedOut false; // Did the last poll() time out?for (;;) {int c ctl.get();// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN) (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc workerCountOf(c);// Are workers subject to culling?boolean timed allowCoreThreadTimeOut || wc corePoolSize;if ((wc maximumPoolSize || (timed timedOut)) (wc 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r ! null)return r;timedOut true;} catch (InterruptedException retry) {timedOut false;}}}
3、Future和Callable
Future 和 Callable 用于表示异步任务的结果。
主要类
CallableV类似于 Runnable但可以返回结果或抛出异常
FutureV表示异步计算的结果提供了检查任务是否完成、获取结果、取消任务等方法。
FutureTaskVFuture接口的实现类可以包装 Callable 或 Runnable
详细可以查看上一篇文章并发编程Future和Callback使用
4、并发集合
java.util.concurrent 提供了线程安全的集合类适用于多线程环境。
4.1、ConcurrentHashMap
ConcurrentHashMap主要是在多线程情况下提升了性能。
详细查看另一篇内容HashMap到ConcurrentHashMap原理
5、同步工具
提供了一些高级同步工具用于控制线程之间的协作。
CountDownLatch允许一个或多个线程等待其他线程完成操作。CyclicBarrier让一组线程互相等待直到所有线程都到达某个屏障点。Semaphore控制同时访问某个资源的线程数量。Phaser更灵活的同步工具支持分阶段的线程同步。
import java.util.concurrent.CountDownLatch;public class CountDownLatchExample {public static void main(String[] args) throws InterruptedException {CountDownLatch latch new CountDownLatch(3);for (int i 0; i 3; i) {new Thread(() - {System.out.println(子线程执行: Thread.currentThread().getName());latch.countDown();}).start();}latch.await(); // 主线程等待所有子线程完成System.out.println(所有子线程执行完毕);}
} 6、原子变量
提供了一组原子操作类用于实现无锁的线程安全编程。
AtomicInteger支持原子操作的整数。AtomicLong支持原子操作的长整数。AtomicReferenceV支持原子操作的引用类型。
import java.util.concurrent.atomic.AtomicInteger;public class AtomicExample {public static void main(String[] args) {AtomicInteger counter new AtomicInteger(0);counter.incrementAndGet(); // 原子递增System.out.println(Counter: counter.get());}
}
7、锁Locks
提供了更灵活的锁机制替代传统的 synchronized 关键字。
ReentrantLock可重入锁支持公平锁和非公平锁。ReadWriteLock读写锁允许多个读线程同时访问但写线程独占。实现类ReentrantReadWriteLock
import java.util.concurrent.locks.ReentrantLock;public class LockExample {public static void main(String[] args) {ReentrantLock lock new ReentrantLock();lock.lock(); // 加锁try {System.out.println(锁保护的代码块);} finally {lock.unlock(); // 释放锁}}
}
8、定时任务
支持定时或周期性任务的执行。
ScheduledExecutorService支持定时任务的线程池接口实现类ScheduledThreadPoolExecutor
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class ScheduledTaskExample {public static void main(String[] args) {ScheduledExecutorService scheduler Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() - {System.out.println(定时任务执行: System.currentTimeMillis());}, 0, 1, TimeUnit.SECONDS); // 初始延迟 0 秒每隔 1 秒执行一次}
}
9、Fork/Join 框架
用于并行执行任务的框架适用于分治算法。
ForkJoinPool用于执行 ForkJoinTask 的线程池。RecursiveTaskV用于有返回值的任务。RecursiveAction用于无返回值的任务。
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;public class ForkJoinExample {public static void main(String[] args) {ForkJoinPool pool new ForkJoinPool();int result pool.invoke(new FibonacciTask(10));System.out.println(Fibonacci(10) result);}static class FibonacciTask extends RecursiveTaskInteger {private final int n;FibonacciTask(int n) {this.n n;}Overrideprotected Integer compute() {if (n 1) return n;FibonacciTask task1 new FibonacciTask(n - 1);task1.fork();FibonacciTask task2 new FibonacciTask(n - 2);return task2.compute() task1.join();}}
}