深圳市响应式网站建设,最近一周新闻大事摘抄,物流网站建设平台,网站建设课程ppt模板基于java中延时队列的实现该文章#xff0c;我们这次主要是来实现基于DelayQueue实现的延时队列。
使用DelayQueue实现的延时队列的步骤#xff1a;
定义一个继承了Delayed的类#xff0c;定义其中的属性#xff0c;并重写compareTo和getDelay两个方法创建一个Delayqueue…基于java中延时队列的实现该文章我们这次主要是来实现基于DelayQueue实现的延时队列。
使用DelayQueue实现的延时队列的步骤
定义一个继承了Delayed的类定义其中的属性并重写compareTo和getDelay两个方法创建一个Delayqueue用于创建队列创建一个生产者用于将信息添加到队列中创建一个消费者用来从队列中取出信息进行消费
接下来是一个简单的demo 定义一个元素类 import lombok.Data;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;Data
public class DelayTesk implements Delayed {//标签Idprivate String uid;//到期时间private Long timestamp;//延时信息private String data;Overridepublic long getDelay(TimeUnit unit) {long delayTime timestamp - System.currentTimeMillis();//将时间转换成毫秒这边可转可不转影响不大return unit.convert(delayTime, TimeUnit.MILLISECONDS);}Overridepublic int compareTo(Delayed o) {//针对任务的延时时间长短进行排序把延时时间最短的放在前面long differenceTime this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);return (int)differenceTime;}
} 定义一个延时队列 import java.util.concurrent.DelayQueue;public class DelayTaskQueue {/*** 这边使用单例模式进行创建保证全局队列的唯一性* 我这边使用的是双检索双校验模式*/private volatile static DelayQueueDelayTesk delayTaskQueue;private DelayTaskQueue(){}public static DelayQueueDelayTesk getDelayTaskQueue() {if (delayTaskQueue null) {synchronized (DelayTaskQueue.class) {if (delayTaskQueue null) {delayTaskQueue new DelayQueue();}}}return delayTaskQueue;}
} 创建一个延时队列的生产者 import lombok.extern.slf4j.Slf4j;import java.util.concurrent.DelayQueue;//消息生产者
Slf4j
public class DelayTeskQueueProducer {/*** 往延时队列中插入数据* param uid* param time* param data*/public static void setDelayQueue(String uid, Long time, String data) {//创建队列DelayQueueDelayTesk delayTaskQueue DelayTaskQueue.getDelayTaskQueue();//创建任务DelayTesk delayTesk new DelayTesk();delayTesk.setUid(uid);delayTesk.setTimestamp(time);delayTesk.setData(data);log.info(消息入队:{}, uid);boolean res delayTaskQueue.offer(delayTesk);if (res) {log.info(消息入队成功:{}, uid);} else {//如果消息入队失败这边可以写一个失败的回调函数//例如将失败的消息存入数据库写个定时任务对消息进行重写投递……log.info(消息入队失败:{}, uid);}}
}定义一个延时队列的消费者 import cn.hutool.core.util.IdUtil;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.DelayQueue;Slf4j
public class DelayTeskQueueConsumer {public static void main(String[] args) {for (int i 0; i 10 ; i) {DelayTeskQueueProducer.setDelayQueue(IdUtil.fastUUID(), System.currentTimeMillis() i * 1000, hello world i);}int index 0;DelayQueueDelayTesk delayTaskQueue DelayTaskQueue.getDelayTaskQueue();while (index 10) {try {DelayTesk delayTesk delayTaskQueue.take();System.out.println(delayTesk.getData());} catch (InterruptedException e) {log.error(延时队列消费异常{}, e.getMessage());}}}
} 结果 在控控制台中每隔1秒打印一行数据 到这差不多我们的Demo就要结束了不过可能有些同学会问你这个消费者不是是写在mian方法里的每次消费的时候都需要手动去调用这跟我直接用sleep函数实现的延时队列有啥区别呀
别急 这个只是个Demo嘛如果需要使用在项目中可以写一个监听器去实时监听该延时队列 我这边暂时就只讲3种 Timer 通过timer定时定频率去获取DelayTaskQueue中的消息
import com.study.project.delay.DelayTaskQueue;
import com.study.project.delay.DelayTesk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.DelayQueue;/*** 添加Configuration 注解自动注入实例对象并由springboot 启动 定时器执行任务。*/Configuration
Slf4j
public class DelayTeskQueueTimer {Beanpublic void DelayTeskQueueTimer() {log.info(监听开始);final Timer timer new Timer();DelayQueueDelayTesk delayTaskQueue DelayTaskQueue.getDelayTaskQueue();timer.schedule(new TimerTask() {Overridepublic void run() {try {DelayTesk delayTesk delayTaskQueue.take();System.out.println(delayTesk.getData());} catch (Exception e) {log.error(延时队列消费异常{}, e.getMessage());}}//第一次执行是在当前时间的一秒之后之后每隔一秒钟执行一次},1000, 1000);}
} ConmandlineRunner import com.study.project.delay.DelayTaskQueue;
import com.study.project.delay.DelayTesk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.DelayQueue;
/*** Spring Boot应用程序在启动后程序从容器中遍历实现了CommandLineRunner接口的实例并运行它们的run方法*/
Slf4j
Configuration
public class DelayTeskQueueTimerCommandLineRunner implements CommandLineRunner {Overridepublic void run(String... args) {log.info(CommandLineRunner监听开始);DelayQueueDelayTesk delayTaskQueue DelayTaskQueue.getDelayTaskQueue();new Thread(() -{while (true) {try {DelayTesk delayTesk delayTaskQueue.take();System.out.println(delayTesk.getData());} catch (Exception e) {log.error(延时队列消费异常{}, e.getMessage());}}}).start();}
}ApplicationListener 该方法和ConmandlineRunner方法一样 都是在Spring Boot应用程序在启动后对DelayQueue进行监听
import com.study.project.delay.DelayTaskQueue;
import com.study.project.delay.DelayTesk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;import java.util.concurrent.DelayQueue;Slf4j
Configuration
public class DelayTeskQueueApplicationListener implements ApplicationListenerContextRefreshedEvent {Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {log.info(ApplicationListener监听开始);DelayQueueDelayTesk delayTaskQueue DelayTaskQueue.getDelayTaskQueue();new Thread(() - {while (true) {try {DelayTesk delayTesk delayTaskQueue.take();System.out.println(delayTesk.getData());} catch (Exception e) {log.error(延时队列消费异常{}, e.getMessage());}}}).start();}
}当然监听的方法其实还有很多不过同学们在实现队列的时候不要觉得实现了就好了要去思考如何去保证数据的持久化保证数据不会不会丢失