可以做旅行行程的网站,wordpress与phpcms哪个好,如何优化公司网站,市场调研报告怎么做1. Future
1.1 Future类
Future 类是异步思想的典型运用#xff0c;主要用在一些需要执行耗时任务的场景#xff0c;避免程序一直原地等待耗时任务执行完成#xff0c;执行效率太低。具体来说是这样的#xff1a;当我们执行某一耗时的任务时#xff0c;可以将这个耗时任…1. Future
1.1 Future类
Future 类是异步思想的典型运用主要用在一些需要执行耗时任务的场景避免程序一直原地等待耗时任务执行完成执行效率太低。具体来说是这样的当我们执行某一耗时的任务时可以将这个耗时任务交给一个子线程去异步执行同时我们可以干点其他事情不用傻傻等待耗时任务执行完成。等我们的事情干完后我们再通过 Future 类获取到耗时任务的执行结果。这样一来程序的执行效率就明显提高了。
这其实就是多线程中经典的 Future 模式你可以将其看作是一种设计模式核心思想是异步调用主要用在多线程领域并非 Java 语言独有。
在 Java 中Future 类只是一个泛型接口位于 java.util.concurrent 包下其中定义了 5 个方法主要包括下面这 4 个功能
取消任务判断任务是否被取消;判断任务是否已经执行完成;获取任务执行结果。
// V 代表了Future执行的任务返回值的类型
public interface FutureV {// 取消任务执行// 成功取消返回 true否则返回 falseboolean cancel(boolean mayInterruptIfRunning);// 判断任务是否被取消boolean isCancelled();// 判断任务是否已经执行完成boolean isDone();// 获取任务执行结果V get() throws InterruptedException, ExecutionException;// 指定时间内没有返回计算结果就抛出 TimeOutException 异常V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutExceptio}简单理解就是我有一个任务提交给了 Future 来处理。任务执行期间我自己可以去做任何想做的事情。并且在这期间我还可以取消任务以及获取任务的执行状态。一段时间之后我就可以 Future 那里直接取出任务执行结果。
1.2 Callable和Future有什么关系
我们可以通过 FutureTask 来理解 Callable 和 Future 之间的关系。
FutureTask 提供了 Future 接口的基本实现常用来封装 Callable 和 Runnable具有取消任务、查看任务是否执行完成以及获取任务执行结果的方法。ExecutorService.submit() 方法返回的其实就是 Future 的实现类 FutureTask 。
T FutureT submit(CallableT task);
Future? submit(Runnable task);FutureTask 不光实现了 Future接口还实现了Runnable 接口因此可以作为任务直接被线程执行。 FutureTask 有两个构造函数可传入 Callable 或者 Runnable 对象。实际上传入 Runnable 对象也会在方法内部转换为Callable 对象。
public FutureTask(CallableV callable) {if (callable null)throw new NullPointerException();this.callable callable;this.state NEW;
}
public FutureTask(Runnable runnable, V result) {// 通过适配器RunnableAdapter来将Runnable对象runnable转换成Callable对象this.callable Executors.callable(runnable, result);this.state NEW;
}FutureTask相当于对Callable 进行了封装管理着任务执行的情况存储了 Callable 的 call 方法的任务执行结果。
1.3 CompletableFuture类
Future 在实际使用过程中存在一些局限性比如不支持异步任务的编排组合、获取计算结果的 get() 方法为阻塞调用。
Java 8 才被引入CompletableFuture 类可以解决Future 的这些缺陷。CompletableFuture 除了提供了更为好用和强大的 Future 特性之外还提供了函数式编程、异步任务编排组合可以将多个异步任务串联起来组成一个完整的链式调用等能力。
下面我们来简单看看 CompletableFuture 类的定义。
public class CompletableFutureT implements FutureT, CompletionStageT {
}在之前的图中可以看到CompletableFuture 同时实现了 Future 和 CompletionStage 接口。
CompletionStage 接口描述了一个异步计算的阶段。很多计算可以分成多个阶段或步骤此时可以通过它将所有步骤组合起来形成异步计算的流水线。
CompletionStage 接口中的方法比较多CompletableFuture 的函数式能力就是这个接口赋予的。从这个接口的方法参数你就可以发现其大量使用了 Java8 引入的函数式编程。
2. AQS
2.1 AQS介绍
2.1.1 什么是AQS
AQS 的全称为 AbstractQueuedSynchronizer 翻译过来的意思就是抽象队列同步器。这个类在 java.util.concurrent.locks 包下面。
AQS 就是一个抽象类主要用来构建锁和同步器。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
}AQS 为构建锁和同步器提供了一些通用功能的实现因此使用 AQS 能简单且高效地构造出应用广泛的大量的同步器比如我们提到的 ReentrantLockSemaphore其他的诸如 ReentrantReadWriteLockSynchronousQueue等等皆是基于 AQS 的。
2.1.2 AQS的原理
AQS 核心思想是如果被请求的共享资源空闲则将当前请求资源的线程设置为有效的工作线程并且将共享资源设置为锁定状态。如果被请求的共享资源被占用那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制这个机制 AQS 是用 CLH 队列锁 实现的即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten) 队列是一个虚拟的双向队列虚拟的双向队列即不存在队列实例仅存在结点之间的关联关系。
AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点Node来实现锁的分配。在 CLH 同步队列中一个节点表示一个线程它保存着线程的引用thread、 当前节点在队列中的状态waitStatus、前驱节点prev、后继节点next。 AQS 使用 int 成员变量 state 表示同步状态通过内置的 线程等待队列 来完成获取资源线程的排队工作。
// 共享变量使用volatile修饰保证线程可见性
private volatile int state;另外状态信息 state 可以通过 protected 类型的getState()、setState()和compareAndSetState() 进行操作。并且这几个方法都是 final 修饰的在子类中无法被重写。
//返回同步状态的当前值
protected final int getState() {return state;
}// 设置同步状态的值
protected final void setState(int newState) {state newState;
}
//原子地CAS操作将同步状态值设置为给定值update如果当前同步状态的值等于expect期望值
protected final boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}以 ReentrantLock 为例state 初始值为 0表示未锁定状态。A 线程 lock() 时会调用 tryAcquire() 独占该锁并将 state1 。此后其他线程再 tryAcquire() 时就会失败直到 A 线程 unlock() 到 state0即释放锁为止其它线程才有机会获取该锁。当然释放锁之前A 线程自己是可以重复获取此锁的state 会累加这就是可重入的概念。但要注意获取多少次就要释放多少次这样才能保证 state 是能回到零态的。
再以 CountDownLatch 以例任务分为 N 个子线程去执行state 也初始化为 N注意 N 要与线程个数一致。这 N 个子线程是并行执行的每个子线程执行完后countDown() 一次state 会 CAS(Compare and Swap) 减 1。等到所有子线程都执行完后(即 state0 )会 unpark() 主调用线程然后主调用线程就会从 await() 函数返回继续后余动作。
2.2 Semaphore介绍
2.2.1 什么是Semaphore
synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源而Semaphore(信号量)可以用来控制同时访问特定资源的线程数量。
Semaphore 的使用简单我们这里假设有 N(N5) 个线程来获取 Semaphore 中的共享资源下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源其他线程都会阻塞只有获取到共享资源的线程才能执行。等到有线程释放了共享资源其他阻塞的线程才能获取到。
// 初始共享资源数量
final Semaphore semaphore new Semaphore(5);
// 获取1个许可
semaphore.acquire();
// 释放1个许可
semaphore.release();当初始的资源个数为 1 的时候Semaphore 退化为排他锁。
Semaphore 有两种模式
公平模式 调用 acquire() 方法的顺序就是获取许可证的顺序遵循 FIFO非公平模式 抢占式的。
Semaphore 通常用于那些资源有明确访问数量限制的场景比如限流仅限于单机模式实际项目中推荐使用 Redis Lua 来做限流。
2.2.2 Semaphore 的原理
Semaphore 是共享锁的一种实现它默认构造 AQS 的 state 值为 permits你可以将 permits 的值理解为许可证的数量只有拿到许可证的线程才能执行。
调用semaphore.acquire() 线程尝试获取许可证如果 state 0 的话则表示可以获取成功。如果获取成功的话使用 CAS 操作去修改 state 的值 statestate-1。如果 state0 的话则表示许可证数量不足。此时会创建一个 Node 节点加入阻塞队列挂起当前线程。
/*** 获取1个许可证*/
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
/*** 共享模式下获取许可证获取成功则返回失败则加入阻塞队列挂起线程*/
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 尝试获取许可证arg为获取许可证个数当可用许可证数减当前获取的许可证数结果小于0,则创建一个节点加入阻塞队列挂起当前线程。if (tryAcquireShared(arg) 0)doAcquireSharedInterruptibly(arg);
}调用semaphore.release(); 线程尝试释放许可证并使用 CAS 操作去修改 state 的值 statestate1。释放许可证成功之后同时会唤醒同步队列中的一个线程。被唤醒的线程会重新尝试去修改 state 的值 statestate-1 如果 state0 则获取令牌成功否则重新进入阻塞队列挂起线程。
// 释放一个许可证
public void release() {sync.releaseShared(1);
}// 释放共享锁同时会唤醒同步队列中的一个线程。
public final boolean releaseShared(int arg) {//释放共享锁if (tryReleaseShared(arg)) {//唤醒同步队列中的一个线程doReleaseShared();return true;}return false;
}2.3 CountDownLatch介绍
2.3.1 什么是CountDownLatch
CountDownLatch 允许 count 个线程阻塞在一个地方直至所有线程的任务都执行完毕。
CountDownLatch 是一次性的计数器的值只能在构造方法中初始化一次之后没有任何机制再次对其设置值当 CountDownLatch 使用完毕后它不能再次被使用。
2.3.2 CountDownLatch的原理
CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown() 方法时,其实使用了tryReleaseShared方法以 CAS 的操作来减少 state,直至 state 为 0 。当调用 await() 方法的时候如果 state 不为 0那就证明任务还没有执行完毕await() 方法就会一直阻塞也就是说 await() 方法之后的语句不会被执行。直到count 个线程调用了countDown()使 state 值被减为 0或者调用await()的线程被中断该线程才会从阻塞中被唤醒await() 方法之后的语句得到执行。
2.3.3 CountDownLatch的应用场景
CountDownLatch 的作用就是 允许 count 个线程阻塞在一个地方直至所有线程的任务都执行完毕。 使用多线程读取多个文件处理的场景 读取处理 6 个文件这 6 个任务都是没有执行顺序依赖的任务但是我们需要返回给用户的时候将这几个文件的处理的结果进行统计整理。
处理思路 为此我们定义了一个线程池和 count 为 6 的CountDownLatch对象 。使用线程池处理读取任务每一个线程处理完之后就将 count-1调用CountDownLatch对象的 await()方法直到所有文件读取完之后才会接着执行后面的逻辑。
伪代码如下
public class CountDownLatchExample1 {// 处理文件的数量private static final int threadCount 6;public static void main(String[] args) throws InterruptedException {// 创建一个具有固定线程数量的线程池对象推荐使用构造方法创建ExecutorService threadPool Executors.newFixedThreadPool(10);final CountDownLatch countDownLatch new CountDownLatch(threadCount);for (int i 0; i threadCount; i) {final int threadnum i;threadPool.execute(() - {try {//处理文件的业务操作//......} catch (InterruptedException e) {e.printStackTrace();} finally {//表示一个文件已经被完成countDownLatch.countDown();}});}countDownLatch.await();threadPool.shutdown();System.out.println(finish);}
}2.4 CyclicBarrier介绍
2.4.1 什么是CyclicBarrier
CyclicBarrier 和 CountDownLatch 非常类似它也可以实现线程间的技术等待但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。 CountDownLatch 的实现是基于 AQS 的 CycliBarrier 是基于 ReentrantLock(ReentrantLock 也属于 AQS 同步器)和 Condition 的。 CyclicBarrier 的字面意思是可循环使用Cyclic的屏障Barrier。它要做的事情是让一组线程到达一个屏障也可以叫同步点时被阻塞直到最后一个线程到达屏障时屏障才会开门所有被屏障拦截的线程才会继续干活。
2.4.2 CyclicBarrier的原理
CyclicBarrier 内部通过一个 count 变量作为计数器count 的初始值为 parties 属性的初始化值每当一个线程到了栅栏这里了那么就将计数器减 1。如果 count 值为 0 了表示这是这一代最后一个线程到达栅栏就尝试执行我们构造方法中输入的任务。
//每次拦截的线程数
private final int parties;
//计数器
private int count;结合源码来简单看看 1、CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties)其参数表示屏障拦截的线程数量每个线程调用 await() 方法告诉 CyclicBarrier 我已经到达了屏障然后当前线程被阻塞。
public CyclicBarrier(int parties) {this(parties, null);
}public CyclicBarrier(int parties, Runnable barrierAction) {if (parties 0) throw new IllegalArgumentException();this.parties parties;this.count parties;this.barrierCommand barrierAction;
}其中parties 就代表了有拦截的线程的数量当拦截的线程数量达到这个值的时候就打开栅栏让所有线程通过。
2、当调用 CyclicBarrier 对象调用 await() 方法时实际上调用的是 dowait(false, 0L)方法。 await() 方法就像树立起一个栅栏的行为一样将线程挡住了当拦住的线程数量达到 parties 的值时栅栏才会打开线程才得以通过执行。
public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}
}dowait(false, 0L)方法源码分析如下
// 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。private int count;/*** Main barrier code, covering the various policies.*/private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock this.lock;// 锁住lock.lock();try {final Generation g generation;if (g.broken)throw new BrokenBarrierException();// 如果线程中断了抛出异常if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// cout减1int index --count;// 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了也就是达到了可以执行await 方法之后的条件if (index 0) { // trippedboolean ranAction false;try {final Runnable command barrierCommand;if (command ! null)command.run();ranAction true;// 将 count 重置为 parties 属性的初始化值// 唤醒之前等待的线程// 下一波执行开始nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed outfor (;;) {try {if (!timed)trip.await();else if (nanos 0L)nanos trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g generation ! g.broken) {breakBarrier();throw ie;} else {// Were about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// belong to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g ! generation)return index;if (timed nanos 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}