网站建设shwzzz,湖南张家界建设厅网站,网站建设需要域名服务器,北京做网站的大公司上篇文章12分钟从Executor自顶向下彻底搞懂线程池中我们聊到线程池#xff0c;而线程池中包含阻塞队列
这篇文章我们主要聊聊并发包下的阻塞队列
阻塞队列
什么是队列#xff1f;
队列的实现可以是数组、也可以是链表#xff0c;可以实现先进先出的顺序队列#xff0c;…上篇文章12分钟从Executor自顶向下彻底搞懂线程池中我们聊到线程池而线程池中包含阻塞队列
这篇文章我们主要聊聊并发包下的阻塞队列
阻塞队列
什么是队列
队列的实现可以是数组、也可以是链表可以实现先进先出的顺序队列也可以实现先进后出的栈队列
那什么是阻塞队列
在经典的生产者/消费者模型中生产者们将生产的元素放入队列而消费者们从队列获取元素消费
当队列已满我们会手动阻塞生产者直到消费者消费再来手动唤醒生产者
当队列为空我们会手动阻塞消费者直到生产者生产再来手动唤醒消费者
在这个过程中由于使用的是普通队列阻塞与唤醒我们需要手动操作保证同步机制
阻塞队列在队列的基础上提供等待/通知功能用于线程间的通信避免线程竞争死锁
生产者可以看成往线程池添加任务的用户线程而消费者则是线程池中的工作线程
当阻塞队列为空时阻塞工作线程获取任务当阻塞队列已满时阻塞用户线程向队列中添加任务创建非核心线程、拒绝策略
API
阻塞队列提供一下四种添加、删除元素的API我们常用阻塞等待/超时阻塞等待的API
方法名抛出异常返回true/false阻塞等待超时阻塞等待添加add(Object)offer(Object)put(Object)offer(Object,long,TimeUnit)删除remove()poll()take()poll(long,TimeUnit)
抛出异常:队满add 抛出异常IllegalStateExceptio 队空remove 抛出异常NoSuchElementException返回值: 队满offer返回false队空poll返回null阻塞等待: 队满时put会阻塞线程 或 队空时take会阻塞线程超时阻塞等待: 在阻塞等待、返回true/false的基础上增加超时等待等待一定时间就退出等待
阻塞队列的公平与不公平
什么是阻塞队列的公平与不公平
当阻塞队列已满时如果是公平的那么阻塞的线程根据先后顺序从阻塞队列中获取元素不公平则反之
实际上阻塞队列的公平与不公平要看实现阻塞队列的锁是否公平
阻塞队列一般默认使用不公平锁
ArrayBlockingQueue
从名称看就可以知道它是数组实现的我们先来看看它有哪些重要字段 public class ArrayBlockingQueueE extends AbstractQueueEimplements BlockingQueueE, java.io.Serializable {//存储元素的数组final Object[] items;//记录元素出队的下标int takeIndex;//记录元素入队的下标int putIndex;//队列中元素数量int count//使用的锁final ReentrantLock lock;//出队的等待队列作用于消费者private final Condition notEmpty;//入队的等待队列作用于生产者private final Condition notFull;}
看完关键字段我们可以知道ArrayBlockingQueue由数组实现、使用并发包下的可重入锁、同时用两个等待队列作用生产者和消费者
为什么出队、入队要使用两个下标记录
实际上它是一个环形数组在初始化后就不改变大小后续查看源码自然能明白它是环形数组
在构造器中、初始化数组容量同时使用非公平锁 public ArrayBlockingQueue(int capacity) {this(capacity, false);}public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity 0)throw new IllegalArgumentException();this.items new Object[capacity];//锁是否为公平锁lock new ReentrantLock(fair);notEmpty lock.newCondition();notFull lock.newCondition();}
ArrayBlockingQueue的公平性是由ReentrantLock来实现的
我们来看看入队方法入队方法都大同小异我们本文都查看支持超时、响应中断的方法 public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {//检查空指针checkNotNull(e);//获取超时纳秒long nanos unit.toNanos(timeout);final ReentrantLock lock this.lock;//加锁lock.lockInterruptibly();try {//如果队列已满while (count items.length) {//超时则返回入队失败否则生产者等待对应时间if (nanos 0)return false;nanos notFull.awaitNanos(nanos);}//入队enqueue(e);return true;} finally {//解锁lock.unlock();}}
直接使用可重入锁保证同步如果队列已满在此期间判断是否超时超时就返回未超时等待未满则执行入队方法 private void enqueue(E x) {//队列数组final Object[] items this.items;//往入队下标添加值items[putIndex] x;//自增入队下标 如果已满则定位到0 成环if (putIndex items.length)putIndex 0;//统计数量增加count;//唤醒消费者notEmpty.signal();}
在入队中主要是添加元素、修改下次添加的下标、统计队列中的元素和唤醒消费者到这以及可以说明它的实现是环形数组
ArrayBlockingQueue由环形数组实现的阻塞队列固定容量不支持动态扩容使用非公平的ReertrantLock保证入队、出队操作的原子性使用两个等待队列存储等待的生产者、消费者适用于在并发量不大的场景
LinkedBlockingQueue
LinkedBlockingQueue从名称上来看就是使用链表实现的我们来看看它的关键字段 public class LinkedBlockingQueueE extends AbstractQueueEimplements BlockingQueueE, java.io.Serializable {//节点static class NodeE {//存储元素E item;//下一个节点NodeE next;//...}//容量上限private final int capacity;//队列元素数量private final AtomicInteger count new AtomicInteger();//头节点transient NodeE head;//尾节点private transient NodeE last;//出队的锁private final ReentrantLock takeLock new ReentrantLock();//出队的等待队列private final Condition notEmpty takeLock.newCondition();//入队的锁private final ReentrantLock putLock new ReentrantLock();//入队的等待队列private final Condition notFull putLock.newCondition();}
从字段中我们可以知道它使用单向链表的节点、且用首尾节点记录队列的头尾并且它使用两把锁、两个等待队列作用于队头、尾与ArrayBlockingQueue相比能够增加并发性能
有个奇怪的地方都使用锁了为什么记录元素数量count却使用原子类呢
这是由于两把锁作用于入队与出队的操作入队与出队也可能并发执行同时修改count因此要使用原子类保证修改数量的原子性
在初始化时需要设置容量大小否则会设置成无界的阻塞队列容量是int的最大值
当消费速度小于生产速度时阻塞队列中会堆积任务进而导致容易发生OOM public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}public LinkedBlockingQueue(int capacity) {if (capacity 0) throw new IllegalArgumentException();this.capacity capacity;last head new NodeE(null);}
来看看入队操作 public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e null) throw new NullPointerException();long nanos unit.toNanos(timeout);int c -1;final ReentrantLock putLock this.putLock;final AtomicInteger count this.count;//加锁putLock.lockInterruptibly();try {//队列已满超时返回不超时等待while (count.get() capacity) {if (nanos 0)return false;nanos notFull.awaitNanos(nanos);}//入队enqueue(new NodeE(e));// 先获取再自增 c中存储的是旧值c count.getAndIncrement();//如果数量没满 唤醒生产者if (c 1 capacity)notFull.signal();} finally {//解锁putLock.unlock();}//如果旧值为0 说明该入队操作前是空队列唤醒消费者来消费if (c 0)signalNotEmpty();return true;}
入队操作类似只不过在此期间如果数量没满唤醒生产者生产队列为空唤醒消费者来消费从而增加并发性能
入队只是改变指向关系 //添加节点到末尾private void enqueue(NodeE node) {last last.next node;}
唤醒消费者前要先获取锁 private void signalNotEmpty() {final ReentrantLock takeLock this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}
出队操作也类似 public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x null;int c -1;long nanos unit.toNanos(timeout);final AtomicInteger count this.count;final ReentrantLock takeLock this.takeLock;takeLock.lockInterruptibly();try {// 队列为空 超时返回空否则等待while (count.get() 0) {if (nanos 0)return null;nanos notEmpty.awaitNanos(nanos);}//出队x dequeue();c count.getAndDecrement();//队列中除了当前线程获取的任务外还有任务就去唤醒消费者消费if (c 1)notEmpty.signal();} finally {takeLock.unlock();}//原来队列已满就去唤醒生产者 生产if (c capacity)signalNotFull();return x;}
LinkedBlockingQueue与ArrayBlockingQueue的出队、入队实现类似
只不过LinkedBlockingQueue入队、出队获取/释放的锁不同并且在此过程中不同情况回去唤醒其他的生产者、消费者从而进一步提升并发性能
LinkedBlockingQueue 由单向链表实现的阻塞队列记录首尾节点默认是无界、非公平的阻塞队列初始化时要设置容量否则可能OOM使用两把锁、两个等待队列分别操作入队、出队的生产者、消费者在入队、出队操作期间不同情况还会去唤醒生产者、消费者从而进一步提升并发性能适用于并发量大的场景
LinkedBlockingDeque
LinkedBlockingDeque实现与LinkedBlockQueue类似在LinkedBlockQueue的基础上支持从队头、队尾进行添加、删除的操作
它是一个双向链表带有一系列First、Last的方法比如offerLast、pollFirst
由于LinkedBlockingDeque双向常用其来实现工作窃取算法从而减少线程的竞争
什么是工作窃取算法
比如多线程处理多个阻塞队列的任务一一对应每个线程从队头获取任务处理当A线程处理完它负责的阻塞队列所有任务时它再从队尾窃取其他阻塞队列的任务这样就不会发生竞争除非队列中只剩一个任务才会发生竞争
ForkJoin框架就使用其来充当阻塞队列我们后文再聊这个框架
PriorityBlockingQueue
PriorityBlockingQueue是优先级排序的无界阻塞队列阻塞队列按照优先级进行排序
使用堆排序具体排序算法由Comparable或Comparator实现比较规则
默认泛型中的对象需要实现Comparable比较规则 根据compareTo方法规则排序构造器中指定比较器Comparator 根据比较器规则排序 Testpublic void testPriorityBlockingQeque() {//默认使用Integer实现Comparable的升序PriorityBlockingQueueInteger queue new PriorityBlockingQueue(6);queue.offer(99);queue.offer(1099);queue.offer(299);queue.offer(992);queue.offer(99288);queue.offer(995);//99 299 992 995 1099 99288while (!queue.isEmpty()){System.out.print( queue.poll());}System.out.println();//指定Comparator 降序queue new PriorityBlockingQueue(6, (o1, o2) - o2-o1);queue.offer(99);queue.offer(1099);queue.offer(299);queue.offer(992);queue.offer(99288);queue.offer(995);//99288 1099 995 992 299 99while (!queue.isEmpty()){System.out.print( queue.poll());}}
适用于需要根据优先级排序处理的场景
DelayQueue
Delay是一个延时获取元素的无界阻塞队列 延时最长排在队尾
Delay队列元素实现Delayed接口通过getDelay获取延时时间 public class DelayQueueE extends Delayed extends AbstractQueueEimplements BlockingQueueE {}public interface Delayed extends ComparableDelayed {long getDelay(TimeUnit unit);} DelayQueue应用场景 缓存系统的设计DelayQueue存放缓存有效期当可以获取到元素时说明缓存过期定时任务调度 将定时任务的时间设置为延时时间一旦可以获取到任务就开始执行
以定时线程池ScheduledThreadPoolExecutor的定时任务ScheduledFutureTask为例它实现Delayed获取延迟执行的时间 创建对象时,初始化数据 ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);//time记录当前对象延迟到什么时候可以使用,单位是纳秒this.time ns;this.period period;//sequenceNumber记录元素在队列中先后顺序 sequencer原子自增//AtomicLong sequencer new AtomicLong();this.sequenceNumber sequencer.getAndIncrement();} 实现Delayed接口的getDelay方法 public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);} Delay接口继承了Comparable接口目的是要实现compareTo方法来继续排序 public int compareTo(Delayed other) {if (other this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask? x (ScheduledFutureTask?)other;long diff time - x.time;if (diff 0)return -1;else if (diff 0)return 1;else if (sequenceNumber x.sequenceNumber)return -1;elsereturn 1;}long diff getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff 0) ? -1 : (diff 0) ? 1 : 0;}
SynchronousQueue
SynchronousQueue是一个默认下支持非公平不存储元素的阻塞队列
每个put操作要等待一个take操作,否则不能继续添加元素会阻塞 使用公平锁 Testpublic void testSynchronousQueue() throws InterruptedException {final SynchronousQueueInteger queue new SynchronousQueue(true);new Thread(() - {try {queue.put(1);queue.put(2);} catch (InterruptedException e) {e.printStackTrace();}}, put12线程).start();new Thread(() - {try {queue.put(3);queue.put(4);} catch (InterruptedException e) {e.printStackTrace();}}, put34线程).start();TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() 拿出 queue.take());TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() 拿出 queue.take());TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() 拿出 queue.take());TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() 拿出 queue.take());}//结果 因为使用公平锁 1在2前3在4前//main拿出1//main拿出3//main拿出2//main拿出4
SynchronousQueue队列本身不存储元素负责把生产者的数据传递给消费者适合传递性的场景
在该场景下吞吐量会比ArrayBlockingQueueLinkedBlockingQueue高
LinkedTransferQueue
LinkedTransferQueue是一个链表组成的无界阻塞队列拥有transfer()和tryTransfer()方法
transfer()
如果有消费者在等待接收元素transfer(e)会把元素e传输给消费者
如果没有消费者在等待接收元素transfer(e)会将元素e存放在队尾直到有消费者获取了才返回 Testpublic void testTransfer() throws InterruptedException {LinkedTransferQueue queue new LinkedTransferQueue();new Thread(()-{try {//阻塞直到被获取queue.transfer(1);//生产者放入的1被取走了System.out.println(Thread.currentThread().getName()放入的1被取走了);} catch (InterruptedException e) {e.printStackTrace();}},生产者).start();TimeUnit.SECONDS.sleep(3);//main取出队列中的元素System.out.println(Thread.currentThread().getName()取出队列中的元素);queue.poll();}
tryTransfer()无论消费者是否消费都直接返回 Testpublic void testTryTransfer() throws InterruptedException {LinkedTransferQueueInteger queue new LinkedTransferQueue();//falseSystem.out.println(queue.tryTransfer(1));//nullSystem.out.println(queue.poll());new Thread(()-{try {//消费者取出2System.out.println(Thread.currentThread().getName()取出queue.poll(2, TimeUnit.SECONDS));} catch (InterruptedException e) {e.printStackTrace();}},消费者).start();TimeUnit.SECONDS.sleep(1);//trueSystem.out.println(queue.tryTransfer(2));}
tryTransfer(long,TimeUnit) 在超时时间内消费者消费元素返回true反之返回false
总结
ArrayBlockingQueue由环形数组实现固定容量无法扩容使用非公平的可重入锁锁、两个等待队列操作入队、出队操作适合并发小的场景
LinkedBlockingQueue由单向链表实现默认无界使用两个可重入锁、两个等待队列进行入队、出队操作并在此期间可能唤醒生产者或消费者线程以此提高并发性能
LinkedBlockingDeque由双向链表实现在LinkedBlockingQueue的基础上能够在队头、队尾都进行添加、删除操作适用工作窃取算法1
PriorityBlockingQueue由堆排序实现的优先级队列具体排序算法由Comparable、Comparator来实现适用于需要根据优先级排序处理任务的场景
DelayQueue 是一个延时队列队列中存储的元素需要实现Delayed接口来获取延时时间适用于缓存失效、定时任务的场景
SynchronousQueue不存储元素只将生产者生产的元素传递给消费者 适用于传递性的场景比如不同线程间传递数据
LinkedTransgerQueue是传输形的阻塞队列适用于单个元素传递的场景
在使用无界的阻塞队列时需要设置容量避免存储任务太多导致OOM
最后不要白嫖一键三连求求拉~
本篇文章被收入专栏 由点到线由线到面深入浅出构建Java并发编程知识体系感兴趣的同学可以持续关注喔
本篇文章笔记以及案例被收入 gitee-StudyJava、 github-StudyJava 感兴趣的同学可以stat下持续关注喔~
案例地址
Gitee-JavaConcurrentProgramming/src/main/java/E_BlockQueue
Github-JavaConcurrentProgramming/src/main/java/E_BlockQueue
有什么问题可以在评论区交流如果觉得菜菜写的不错可以点赞、关注、收藏支持一下~
关注菜菜分享更多干货公众号菜菜的后端私房菜 本文由博客一文多发平台 OpenWrite 发布