网站开发语言学习,网页美工是什么,如何做网站创业,网站跳出率高怎么办文章目录 线程池 Runnable/Callable线程池 FutureCompletableFuture线程池 Async注解Spring 事件创建事件事件发布者事件监听器调用事件 消息队列生产者消费者 在实际开发中有些耗时操作#xff0c;或者对主流程不是那么重要的逻辑#xff0c;可以通过异步的方式去执行 Runnable/Callable线程池 FutureCompletableFuture线程池 Async注解Spring 事件创建事件事件发布者事件监听器调用事件 消息队列生产者消费者 在实际开发中有些耗时操作或者对主流程不是那么重要的逻辑可以通过异步的方式去执行从而提高主逻辑的效率。常见的场景比如下单成功后短信或者小程序内通知用户这个过程其实可以走异步最坏的情况是没通知到用户这个情况是可以接受的只要下单成功了就行。下面介绍几种常见的异步编程的方式 PS忽略下方创建线程池的方式主要看如何实现异步编程 线程池 Runnable/Callable
这种方式在主线程中引入线程池通过线程池进行异步操作。
public class AsyncThread{private static ExecutorService executorService Executors.newSingleThreadExecutor();public static void main(String[] args) {System.out.println(主线程开始);executorService.submit(() - {System.out.println(这是一个异步线程);});System.out.println(主线程结束);}
}-- 控制台打印结果
主线程开始
主线程结束
这是一个异步线程可能有人有疑问为什么不直接new Thread()主要原因是频繁创建线程销毁非常耗费系统资源。线程池是池化技术可以更好的管理池内线程的生命周期。但是这种实现方式不能满足一些特殊场景比如需要异步任务的返回值。
线程池 Future
Future是JUC并发包提供的它的出现解决了异步任务需要返回值的问题。
public class FutureTest {ExecutorService executorService Executors.newFixedThreadPool(1);public static void main(String[] args) {System.out.println(主线程开始);new FutureManager().execute();System.out.println(主线程结束);}SneakyThrowspublic String execute() {FutureString future executorService.submit(new CallableString() {Overridepublic String call() throws Exception {System.out.println(这是一个异步线程开始);Thread.sleep(2000);System.out.println(这是一个异步线程结束);return 这是一个异步线程返回的结果;}});String result 默认返回值;// 放开注释会阻塞主线程
// result future.get();return result;}
}-- 不获取结果 控制台打印结果
主线程开始
主线程结束
这是一个异步线程开始
这是一个异步线程结束-- 阻塞获取结果 控制台打印结果
主线程开始
这是一个异步线程开始
这是一个异步线程结束
主线程结束Future虽然可以获得异步任务的结果但是缺点也很明显主要缺点如下
获取结果需要阻塞主线程异步任务出现异常主线程无法感知多个Future之间相互独立如果多个异步任务的返回值有依赖关系就不能满足需求
CompletableFuture
CompletableFuture也是JUC并发包中的类它可以让多个Future进行编排。
public class CompletableFutureTest {/*** thenAccept子任务和父任务公用同一个线程*/SneakyThrowspublic static void thenRunAsync() {CompletableFutureInteger fristFuture CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() fristFuture ....);try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}return 1;});CompletableFutureVoid secondFuture fristFuture.thenRunAsync(() - {System.out.println(Thread.currentThread() secondFuture ...);});//等待任务1执行完成System.out.println(fristFuture结果- fristFuture.get());//等待任务2执行完成System.out.println(secondFuture结果- secondFuture.get());}public static void main(String[] args) {System.out.println(主线程开始);thenRunAsync();System.out.println(主线程结束);}}-- 控制台打印结果
主线程开始
Thread[ForkJoinPool.commonPool-worker-3,5,main] fristFuture ....
Thread[ForkJoinPool.commonPool-worker-5,5,main] secondFuture ...
fristFuture结果-1
secondFuture结果-null
主线程结束以上示例fristFuture与secondFuture两个任务简历联系后者需要前者执行完在执行。可以实际运行一下看下效果。大概流程是示例代码的11行会阻塞在那里而不会先打印Thread[ForkJoinPool.commonPool-worker-5,5,main] secondFuture … 原因是secondFuture依赖于fristFuturefristFuture执行结束后才会往下执行。
CompletableFuture没有配合线程池使用的原因是CompletableFuture默认使用的是ForkJoinPool.commonPool从打印的结果可以清楚的看出来。ForkJoinPool的好处是可以自己管理线程池当没有太多任务需要执行时它会自己关闭一些线程释放资源。
线程池 Async注解
**Async注解建议配合线程池使用使用时没有指定线程池会使用默认的SimpleAsyncTaskExecutor它并不是真正的线程池每次都是创建新的线程执行任务不会复用线程。最主要的是它没最大线程数的限制并发大的时候容易产生性能问题。**下面是一个示例
首先需要自定义一个线程池加上EnableAsync和Configuration这样可以不用在启动类上加EnableAsync。
EnableAsync
Configuration
public class TaskPoolConfig {Bean(taskExecutor)public Executor taskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());threadPoolTaskExecutor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);threadPoolTaskExecutor.setQueueCapacity(100);threadPoolTaskExecutor.setKeepAliveSeconds(60);threadPoolTaskExecutor.setThreadNamePrefix(taskExecutor-);threadPoolTaskExecutor.setAwaitTerminationSeconds(60);threadPoolTaskExecutor.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());return threadPoolTaskExecutor;}
}在需要异步的方法上加上Async注解并指定线程池即可。
Service
public class AsyncServiceImpl implements AsyncService {OverrideAsync(taskExecutor)public MessageResult sendSms(String callPrefix, String mobile, String actionType, String content) {// 业务逻辑}}Spring 事件
Spring的事件原理是在某个地方抛出一个事件通过Spring的监听机制监听到该事件进而做出业务逻辑的处理。这个过程需要有3个步骤创建事件发布事件监听事件。
创建事件
首先我们创建一个自定义的事件继承自ApplicationEvent
import org.springframework.context.ApplicationEvent;public class MyEvent extends ApplicationEvent {public MyEvent(Object source) {super(source);}
}事件发布者
然后我们创建一个事件发布者它会发布我们刚刚创建的MyEvent事件
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;Component
public class MyEventPublisher {private final ApplicationEventPublisher applicationEventPublisher;public MyEventPublisher(ApplicationEventPublisher applicationEventPublisher) {this.applicationEventPublisher applicationEventPublisher;}public void publishEvent() {MyEvent myEvent new MyEvent(this);applicationEventPublisher.publishEvent(myEvent);}
}事件监听器
接下来我们创建一个事件监听器它会监听并处理我们的MyEvent事件
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;Component
public class MyEventListener {EventListenerpublic void handleMyEvent(MyEvent event) {System.out.println(MyEvent received);}
}调用事件
在主程序中调用事件发布者的publishEvent方法来发布事件
import org.springframework.context.annotation.AnnotationConfigApplicationContext;public class Main {public static void main(String[] args) {AnnotationConfigApplicationContext context new AnnotationConfigApplicationContext();context.register(MyEventPublisher.class);context.register(MyEventListener.class);context.refresh();MyEventPublisher publisher context.getBean(MyEventPublisher.class);publisher.publishEvent();}
}消息队列
消息队列在异步的场景下使用非常的广泛。以下以RabbitMQ为例。
生产者
Component
public class Producer {AutowiredAmqpTemplate amqpTemplate;public void sendCallbackMessage(MessageRequest message) {amqpTemplate.convertAndSend(QueueEnum.QUEUE_NAME.getExchange(), QueueEnum.QUEUE_NAME.getRoutingKey(), JSONObject.toJsonString(message), new MessagePostProcessor() {Overridepublic Message postProcessMessage(Message message) throws Exception {// xxxxxreturn message;}});}
}消费者
Component
RabbitListener(queues message.order, containerFactory listenerContainerFactory)
public class Consumer {RabbitHandlerpublic void handle(String json, Channel channel, Headers MapString, Object map) throws Exception {// 校验逻辑比如业务校验请求头检验try { //执行业务逻辑//消息消息成功手动确认对应消息确认模式acknowledge-mode: manualchannel.basicAck((Long) map.get(Headers.SUCCESS), false);} catch (Exception e) {log.error(消费失败 - {}, e);}}
}