邵阳网站开发,wordpress怎么念,在网站里面如何做支付工具,棋牌软件开发工作室目录 FutureTask详解1、FutureTask简介2、FutureTask内部结构继承结构类属性构造方法内部类WaitNode 3、Runnable、Callable、Future、RunnableFuture接口①、Runnable接口②、Callable接口③、Future接口④、RunnableFuture接口总结对比 4、FutureTask的使用示例普通Thread使用… 目录 FutureTask详解1、FutureTask简介2、FutureTask内部结构继承结构类属性构造方法内部类WaitNode 3、Runnable、Callable、Future、RunnableFuture接口①、Runnable接口②、Callable接口③、Future接口④、RunnableFuture接口总结对比 4、FutureTask的使用示例普通Thread使用线程池使用 5、FutureTask的核心方法run()方法get()方法cancel()方法 6、补充知识点Treiber 栈Treiber 栈的核心思想Treiber 栈的特点Treiber 栈的Java简易实现Treiber 栈在FutureTask中的应用 7、补充FutureTask子类ScheduledFutureTask总结 FutureTask详解
1、FutureTask简介
FutureTask主要用于异步任务的执行和结果获取。其最重要的特性就是可以被提交到线程池中执行同时也可以用来获取执行结果或检查任务的状态。
2、FutureTask内部结构
继承结构
public class FutureTaskV implements RunnableFutureV {}FutureTask 实现了 RunnableFuture 接口而 RunnableFuture 又继承 Runnable 和 Future 。 所以FutureTask 可以被提交到线程池中执行同时也可以用来获取执行结果或检查任务的状态。
类属性
// FutureTask 的状态字段用于标记任务的不同状态
private volatile int state; // volatile 确保线程间对该字段的可见性
private static final int NEW 0; // 任务刚创建尚未开始执行
private static final int COMPLETING 1; // 任务正在完成即运行中或正在设置结果
private static final int NORMAL 2; // 任务正常完成
private static final int EXCEPTIONAL 3; // 任务完成时抛出了异常
private static final int CANCELLED 4; // 任务已被取消
private static final int INTERRUPTING 5; // 任务中断中
private static final int INTERRUPTED 6; // 任务已中断/** 包含要执行的 callable 对象在运行后会被置为 null */
private CallableV callable;/** 任务的结果或者在执行时抛出的异常使用 Object 类型来存储结果或异常 */
private Object outcome; // non-volatile, 由 state 字段的读写保护/** 正在运行任务的线程在 run() 方法中使用 CAS 操作 */
private volatile Thread runner;/** 用于存储等待任务完成的线程的 Treiber 栈 */
// 可以理解成是一个 无锁且线程安全的栈
private volatile WaitNode waiters;构造方法
①、FutureTask(CallableV callable)
public FutureTask(CallableV callable) {if (callable null) // 检查 callable 是否为空throw new NullPointerException(); // 如果为空抛出空指针异常this.callable callable; // 保存 callable 对象this.state NEW; // 初始化状态为 NEW表示任务刚创建
}
②、FutureTask(Runnable runnable, V result)
public FutureTask(Runnable runnable, V result) {// 使用 Executors.callable() 方法将 Runnable 转换为 Callable并指定结果this.callable Executors.callable(runnable, result);this.state NEW; // 初始化状态为 NEW表示任务刚创建
}Executors.callable方法
public static T CallableT callable(Runnable task, T result) {if (task null)throw new NullPointerException();return new RunnableAdapterT(task, result);}// 利用RunnableAdapter 适配器类 把Runnable 转换成 Callable
static final class RunnableAdapterT implements CallableT {final Runnable task;final T result;RunnableAdapter(Runnable task, T result) {this.task task;this.result result;}public T call() {task.run();return result;}}内部类WaitNode
WaitNode 是 FutureTask 类中的一个静态内部类用于支持等待任务完成的线程管理。 WaitNode 主要作为一个节点在 waiters 栈中构建链表以便能够高效地管理等待线程。
static final class WaitNode {// 用于存储等待的线程volatile Thread thread;// 用于指向下一个等待节点形成链表结构volatile WaitNode next;// 构造函数初始化当前节点的线程为当前线程WaitNode() { thread Thread.currentThread(); }
}
3、Runnable、Callable、Future、RunnableFuture接口
①、Runnable接口
用于实现线程要执行的任务。可以将 Runnable 对象传递给 Thread 构造函数或线程池的 execute() 方法。
FunctionalInterface
public interface Runnable {void run();
}Runnable 是一个函数式接口定义了一个 run() 方法。 run() 方法没有返回值也没有抛出检查型异常checked exceptions。 主要用于线程执行的任务无需处理任务结果。
②、Callable接口
用于执行有返回值的任务。通常与 Future 配合使用可以在任务完成后获取结果。
FunctionalInterface
public interface CallableV {V call() throws Exception;
}Callable 是一个泛型接口定义了一个 call() 方法。 call() 方法可以返回一个结果并且可以抛出检查型异常。 泛型参数 V 表示任务执行后的结果类型。
③、Future接口
用于管理异步计算任务检查任务状态获取计算结果或处理异常。
public interface FutureV {// 尝试取消任务boolean cancel(boolean mayInterruptIfRunning);// 检查任务是否被取消boolean isCancelled();// 检查任务是否完成boolean isDone();// 获取任务的结果若任务尚未完成则阻塞调用get方法的线程V get() throws InterruptedException, ExecutionException;// 带超时的获取结果方法V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}④、RunnableFuture接口
RunnableFuture 主要用于 FutureTask 类的实现。它允许任务以 Runnable 方式提交到线程池中执行并通过 Future 接口管理结果和状态。
public interface RunnableFutureV extends Runnable, FutureV {
}
RunnableFuture 接口继承了 Runnable 和 Future 接口。 结合了这两个接口的功能即可以像 Runnable 一样执行任务也可以像 Future 一样获取任务的结果或取消任务。
总结对比
接口功能描述主要方法/特点适用场景Runnable定义要执行的任务无返回值run()用于执行不需要结果的任务如线程的任务Callable定义要执行的任务有返回值并可能抛出异常call()用于执行有返回值的任务Future管理异步计算的结果检查状态和处理异常get(), cancel(), isDone(), isCancelled()用于管理异步任务的结果和状态RunnableFuture结合 Runnable 和 Future 的功能可以执行任务并管理结果继承 Runnable 和 Future用于需要同时支持 Runnable 和 Future 功能的场景如 FutureTask
4、FutureTask的使用示例
普通Thread使用
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;public class TestA {public static void main(String[] args) {// 创建一个 Callable 对象CallableInteger callable () - {Thread.sleep(2000); // 模拟长时间运行的任务return 123;};// 用 Callable 对象创建 FutureTaskFutureTaskInteger futureTask new FutureTask(callable);// 用普通 Thread 执行 FutureTaskThread thread new Thread(futureTask);thread.start();// 执行其他任务System.out.println(主线程继续执行其他任务...);try {// 获取 FutureTask 的结果Integer result futureTask.get(); // 这个方法会阻塞直到结果可用System.out.println(获取 FutureTask 的结果: result);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}
}
执行结果
主线程继续执行其他任务...
获取 FutureTask 的结果: 123线程池使用
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;public class TestA {private static final ThreadPoolExecutor threadPoolExecutor new ThreadPoolExecutor(4, // 核心线程数 8, // 最大线程数0, // 非核心线程的存活时间TimeUnit.SECONDS, // 存活时间单位new LinkedBlockingDeque(), // 存放任务的 阻塞队列new MyThreadFactory(DogEatBones), // 创建线程的 工厂 new ThreadPoolExecutor.AbortPolicy() // 线程池的拒绝策略);public static void main(String[] args) throws Exception {String dog1 秀逗;// 使用线程池执行FutureString future1 threadPoolExecutor.submit(() - eat(dog1));String result future1.get();System.out.println(FutureTask执行结果 result);// 关闭线程池threadPoolExecutor.shutdown();}public static String eat(String name) {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return name 说骨头真好吃;}}class MyThreadFactory implements ThreadFactory {private final String name;private final AtomicInteger threadNum new AtomicInteger();// 设置线程名称public MyThreadFactory(String name) {this.name name;}Overridepublic Thread newThread(Runnable r) {Thread thread new Thread(r);// 设置线程名称和编号thread.setName(name [# threadNum.incrementAndGet() ]);return thread;}
}
执行结果:
FutureTask执行结果秀逗说骨头真好吃5、FutureTask的核心方法
下面源码基于JDK8
run()方法
public void run() {// 检查任务状态是否为 NEW且当前线程是否能成功设置为 runnerif (state ! NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {CallableV c callable;// 如果 callable 不为 null 且状态仍为 NEW则执行 callableif (c ! null state NEW) {V result;boolean ran;try {result c.call(); // 调用 callable并存储结果ran true;} catch (Throwable ex) {result null;ran false;setException(ex); // 如果发生异常设置异常}if (ran)set(result); // 如果 callable 成功执行设置结果}} finally {// 确保 runner 在 run() 完成后设为 null以防止多次调用runner null;// 在将 runner 设为 null 后重新读取状态以处理可能的中断int s state;if (s INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}protected void set(V v) {// 使用 CAS 原子性地将状态从 NEW 更新为 COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome v; // 存储结果UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 设置为最终状态finishCompletion(); // 完成任务}
}
private void finishCompletion() {// 遍历所有等待线程for (WaitNode q; (q waiters) ! null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t q.thread;if (t ! null) {q.thread null;LockSupport.unpark(t); // 唤醒等待的线程}WaitNode next q.next;if (next null)break;q.next null; // 解除链表节点连接帮助垃圾回收q next;}break;}}done(); // 任务完成后执行的操作callable null; // 释放 callable 对象以减少内存占用
}
总结 run() 方法 该方法用于执行任务。 如果任务的状态是 NEWrunner 为空则将当前线程设置为 runner 并执行 callable。 如果 callable.call() 执行成功则将结果设置到 FutureTask 中 如果执行失败则捕获异常并设置异常。 无论任务成功还是失败最后都要将 runner 设为 null并根据状态处理可能的中断。
set(V v) 方法 该方法用于设置任务的结果。 如果任务的状态是 NEW则将状态更新为 COMPLETING 并存储结果。然后将状态设置为 NORMAL 表示任务完成并调用 finishCompletion() 方法完成任务的后续处理。
finishCompletion() 方法 该方法用于处理任务完成后的操作。首先唤醒所有等待的线程然后执行 done() 方法以进行额外的完成操作。最后将 callable 设为 null 以减少内存占用。
get()方法
public V get() throws InterruptedException, ExecutionException {int s state;// 如果状态小于等于 COMPLETING等待任务完成if (s COMPLETING)s awaitDone(false, 0L);return report(s); // 根据状态报告结果
}private int awaitDone(boolean timed, long nanos) throws InterruptedException {final long deadline timed ? System.nanoTime() nanos : 0L;WaitNode q null;boolean queued false;for (;;) {// 如果当前线程被中断移除等待者并抛出 InterruptedExceptionif (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s state;// 如果任务已经完成返回状态if (s COMPLETING) {if (q ! null)q.thread null;return s;}// 如果任务正在完成中当前线程主动让出 CPUelse if (s COMPLETING)Thread.yield();else if (q null)q new WaitNode(); // 创建新的等待节点else if (!queued)queued UNSAFE.compareAndSwapObject(this, waitersOffset,q.next waiters, q); // 将当前等待节点加入Treiber栈else if (timed) {nanos deadline - System.nanoTime();if (nanos 0L) {removeWaiter(q); // 超时移除等待节点并返回状态return state;}LockSupport.parkNanos(this, nanos); // 线程等待指定纳秒时间}elseLockSupport.park(this); // 无超时线程等待}
}
private void removeWaiter(WaitNode node) {if (node ! null) {node.thread null; // 清除等待节点的线程引用retry:for (;;) { // 当存在竞争条件时重新尝试移除等待节点for (WaitNode pred null, q waiters, s; q ! null; q s) {s q.next; // 获取当前节点的下一个节点if (q.thread ! null)pred q; // 记录当前节点作为前驱节点else if (pred ! null) {pred.next s; // 如果前驱节点存在且当前节点没有线程则将前驱节点的 next 指向当前节点的下一个节点if (pred.thread null) // 检查前驱节点是否也没有线程可能存在竞争条件continue retry; // 如果存在竞争条件重新尝试}else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s)) // 如果没有前驱节点则通过 CAS 将等待队列的头节点更新为下一个节点continue retry; // 如果 CAS 失败重新尝试}break; // 成功移除等待节点后跳出循环}}
}
static final class WaitNode {volatile Thread thread; // 线程引用volatile WaitNode next; // 下一个等待节点WaitNode() { thread Thread.currentThread(); } // 构造函数中将当前线程设置为该节点的线程
}private V report(int s) throws ExecutionException {Object x outcome;if (s NORMAL)return (V)x; // 如果状态为 NORMAL返回结果if (s CANCELLED)throw new CancellationException(); // 如果状态为 CANCELLED抛出取消异常throw new ExecutionException((Throwable)x); // 否则抛出执行异常
}
总结 get() 方法 用于获取任务的结果。如果任务的状态小于等于 COMPLETING则调用 awaitDone() 方法等待任务完成。最后调用 report() 方法根据任务的最终状态报告结果或抛出异常。
awaitDone() 方法 用于等待任务完成。通过检查任务状态并在必要时将当前线程挂起来等待。如果设置了超时会在超时之前进行等待如果没有超时则无限等待。处理竞争条件时确保等待节点正确地从等待队列中移除。
removeWaiter() 方法 用于从等待队列中移除指定的等待节点。通过遍历等待队列调整节点的连接并处理竞争条件确保等待节点被正确移除。
WaitNode 类 用于表示一个等待的节点。包含对线程的引用和指向下一个等待节点的引用。
report() 方法 根据任务的最终状态返回结果或抛出异常。如果任务状态为 NORMAL返回结果如果状态为 CANCELLED抛出 CancellationException否则抛出 ExecutionException。
cancel()方法
public boolean cancel(boolean mayInterruptIfRunning) {// 如果当前状态是 NEW且使用 CAS 原子性地将状态设置为 INTERRUPTING 或 CANCELLED成功则返回 trueif (!(state NEW UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try { // 如果中断当前线程的调用抛出异常将在 finally 中处理if (mayInterruptIfRunning) {try {Thread t runner;if (t ! null)t.interrupt(); // 如果任务正在运行尝试中断线程} finally { // 设置最终状态UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {finishCompletion(); // 完成任务的清理工作}return true; // 成功取消任务
}
总结 使用 UNSAFE.compareAndSwapInt 原子性地将任务状态从 NEW 更新为 INTERRUPTING 或 CANCELLED。如果状态更新成功返回 true否则返回 false表示取消失败例如任务已经开始执行或已完成。 如果 mayInterruptIfRunning 为 true尝试中断正在运行的线程。如果 runner 不为空则中断该线程。 在 finally 块中将任务状态设置为 INTERRUPTED表示任务被中断。 调用 finishCompletion() 方法完成任务的清理工作移除等待线程释放资源等。 如果成功取消任务返回 true否则返回 false。
6、补充知识点Treiber 栈
Treiber 栈是一种并发栈的实现方式旨在解决多线程环境下的栈操作问题。它由计算机科学家 Robert Treiber 提出主要用于实现无锁数据结构。
Treiber 栈的核心思想
无锁数据结构 Treiber 栈使用原子操作而不是传统的锁机制来保证线程安全允许多个线程并发访问而不会阻塞。 基于链表 Treiber 栈通常使用链表实现每个节点包含数据和指向下一个节点的引用。 原子操作 使用 compare-and-swap (CAS) 操作来更新栈的头部指针确保操作的原子性。
Treiber 栈的特点
并发友好 Treiber 栈允许多个线程同时执行 push 和 pop 操作而不需要传统的锁机制因此提高了并发性能。 简洁性 其实现相对简单主要依赖于 CAS 操作来保证线程安全。 非阻塞 由于使用了原子操作Treiber 栈是非阻塞的避免了锁带来的上下文切换和线程阻塞。
Treiber 栈的Java简易实现
import java.util.concurrent.atomic.AtomicReference;public class TestAT {public static void main(String[] args) {SimpleTreiberStackString stack new SimpleTreiberStack();// 测试 push 和 popThread t1 new Thread(() - {stack.push(秀逗);}, t1);Thread t2 new Thread(() - {stack.push(四眼);}, t2);Thread t3 new Thread(() - {stack.push(大黄);}, t3);t1.start();t2.start();t3.start();try {t1.join();t2.join();t3.join();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(stack.pop());System.out.println(stack.pop());System.out.println(stack.pop());}
}class SimpleTreiberStackT {private final AtomicReferenceNodeT top new AtomicReference(null);private static class NodeT {T value;NodeT next;Node(T value) {this.value value;this.next null;}}public void push(T value) {NodeT newNode new Node(value);NodeT currentTop;do {currentTop top.get();newNode.next currentTop; // 将新节点的 next 指向当前栈顶} while (!top.compareAndSet(currentTop, newNode));}public T pop() {NodeT oldTop, newTop;do {oldTop top.get();if (oldTop null) return null; // 栈为空newTop oldTop.next;} while (!top.compareAndSet(oldTop, newTop));return oldTop.value;}}Treiber 栈在FutureTask中的应用
FutureTask在其内部定义了内部类WaitNode
static final class WaitNode {// 用于存储等待的线程volatile Thread thread;// 用于指向下一个等待节点形成链表结构volatile WaitNode next;// 构造函数初始化当前节点的线程为当前线程WaitNode() { thread Thread.currentThread(); }
}并定义了全局变量 private volatile WaitNode waiters; waiters指向一个 Treiber 栈该栈保存着所有等待任务执行结果的线程。 当调用FutureTask的get方法时如果任务没有完成则调用线程会被阻塞本质上就是将要阻塞的线程包装成WaitNode结点保存到waiters指向的 Treiber 栈中。
在上面第5节 get()方法的源码分析中
下面这段代码就相当于Treiber栈的push操作
else if (q null)q new WaitNode(); // 创建新的等待节点else if (!queued)queued UNSAFE.compareAndSwapObject(this, waitersOffset,q.next waiters, q); // 将当前等待节点加入Treiber栈下面这段代码就相当于Treiber栈的pop操作
// 该方法在上面 在上面第5节 get()方法的源码分析中已经详细说明 这里不再赘述
removeWaiter(WaitNode node) {...}最后推荐看下 Java线程池原理剖析和应用指南中对于Future 模式的讲解。
7、补充FutureTask子类ScheduledFutureTask
ScheduledFutureTask 是 ScheduledThreadPoolExecutor 中的一个内部类它继承自 FutureTask 并实现了 RunnableScheduledFuture 接口。这个类的主要作用是将任务包装成一个可以定时调度的任务 。
private class ScheduledFutureTaskVextends FutureTaskV implements RunnableScheduledFutureV{}总结 继承 FutureTask: ScheduledFutureTask 继承自 FutureTask因此它具有 FutureTask 的所有功能如任务的提交、执行、获取结果等。
实现 RunnableScheduledFuture: RunnableScheduledFuture 是 Runnable、ScheduledFuture 和 Delayed 的一个组合接口。它结合了这些接口的功能使得 ScheduledFutureTask 不仅可以作为一个 Runnable 任务执行还能支持调度和延迟操作。
ScheduledFutureTask 在 ScheduledThreadPoolExecutor 中的应用 在 ScheduledThreadPoolExecutor 中调度相关的方法如 schedule 和 scheduleAtFixedRate会将任务包装成 ScheduledFutureTask 对象。
ScheduledFutureTask的构造方法
// 构造一个一次性任务指定任务的执行时间以纳秒为单位。
ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result); // 调用父类 FutureTask 的构造函数传入 Runnable 和结果this.time ns; // 设置任务的触发时间纳秒this.period 0; // 单次任务所以周期为0this.sequenceNumber sequencer.getAndIncrement(); // 设置任务的序列号用于排序
}/*** 创建一个周期性任务指定任务的执行时间和周期时间以纳秒为单位。*/
ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result); // 调用父类 FutureTask 的构造函数传入 Runnable 和结果this.time ns; // 设置任务的初始触发时间纳秒this.period period; // 设置任务的周期时间纳秒用于周期性任务this.sequenceNumber sequencer.getAndIncrement(); // 设置任务的序列号用于排序
}/*** 创建一个一次性任务指定任务的触发时间以纳秒为单位任务是 Callable 类型。*/
ScheduledFutureTask(CallableV callable, long ns) {super(callable); // 调用父类 FutureTask 的构造函数传入 Callablethis.time ns; // 设置任务的触发时间纳秒this.period 0; // 单次任务所以周期为0this.sequenceNumber sequencer.getAndIncrement(); // 设置任务的序列号用于排序
}ScheduledFutureTask关键特性 延迟和调度: 通过实现 Delayed 接口ScheduledFutureTask 可以管理任务的延迟和调度逻辑。这使得任务可以在指定的时间后执行或周期性执行。
与 ScheduledThreadPoolExecutor 的集成: 在 ScheduledThreadPoolExecutor 中任务被封装成 ScheduledFutureTask 对象并根据调度策略插入到队列中。调度器会按照指定的时间或周期触发任务执行。
总结
FutureTask: 主要用于执行异步任务提供任务的提交、执行、结果获取和取消功能。 ScheduledFutureTask: 在 FutureTask 的基础上扩展支持定时和周期性任务调度适用于需要在特定时间或周期内执行的任务。