前言
颍上ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为创新互联的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:028-86922220(备注:SSL证书合作)期待与您的合作!
线程池运行时,会不断从任务队列中获取任务,然后执行任务。如果我们想实现延时或者定时执行任务,重要一点就是任务队列会根据任务延时时间的不同进行排序,延时时间越短地就排在队列的前面,先被获取执行。
队列是先进先出的数据结构,就是先进入队列的数据,先被获取。但是有一种特殊的队列叫做优先级队列,它会对插入的数据进行优先级排序,保证优先级越高的数据首先被获取,与数据的插入顺序无关。
实现优先级队列高效常用的一种方式就是使用堆。关于堆的实现可以查看《堆和二叉堆的实现和特性》
ScheduledThreadPoolExecutor线程池
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,所以其内部的数据结构和ThreadPoolExecutor基本一样,并在其基础上增加了按时间调度执行任务的功能,分为延迟执行任务和周期性执行任务。
ScheduledThreadPoolExecutor的构造函数只能传3个参数corePoolSize、ThreadFactory、RejectedExecutionHandler,默认maximumPoolSize为Integer.MAX_VALUE。
工作队列是高度定制化的延迟阻塞队列DelayedWorkQueue,其实现原理和DelayQueue基本一样,核心数据结构是二叉最小堆的优先队列,队列满时会自动扩容,所以offer操作永远不会阻塞,maximumPoolSize也就用不上了,所以线程池中永远会保持至多有corePoolSize个工作线程正在运行。
- public ScheduledThreadPoolExecutor(int corePoolSize,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue(), threadFactory, handler);
- }
DelayedWorkQueue延迟阻塞队列
DelayedWorkQueue 也是一种设计为定时任务的延迟队列,它的实现和DelayQueue一样,不过是将优先级队列和DelayQueue的实现过程迁移到本身方法体中,从而可以在该过程当中灵活的加入定时任务特有的方法调用。
工作原理
DelayedWorkQueue的实现原理中规中矩,内部维护了一个以RunnableScheduledFuture类型数组实现的最小二叉堆,初始容量是16,使用ReentrantLock和Condition实现生产者和消费者模式。
源码分析
定义
DelayedWorkQueue 的类继承关系如下:
其包含的方法定义如下:
成员属性
- // 初始时,数组长度大小。
- private static final int INITIAL_CAPACITY = 16;
- // 使用数组来储存队列中的元素。
- private RunnableScheduledFuture>[] queue = new RunnableScheduledFuture>[INITIAL_CAPACITY];
- // 使用lock来保证多线程并发安全问题。
- private final ReentrantLock lock = new ReentrantLock();
- // 队列中储存元素的大小
- private int size = 0;
- //特指队列头任务所在线程
- private Thread leader = null;
- // 当队列头的任务延时时间到了,或者有新的任务变成队列头时,用来唤醒等待线程
- private final Condition available = lock.newCondition();
DelayedWorkQueue是用数组来储存队列中的元素,核心数据结构是二叉最小堆的优先队列,队列满时会自动扩容。
构造函数
DelayedWorkQueue 是 ScheduledThreadPoolExecutor 的静态类部类,默认只有一个无参构造方法。
- static class DelayedWorkQueue extends AbstractQueue
- implements BlockingQueue
{ - // ...
- }
入队方法
DelayedWorkQueue 提供了 put/add/offer(带时间) 三个插入元素方法。我们发现与普通阻塞队列相比,这三个添加方法都是调用offer方法。那是因为它没有队列已满的条件,也就是说可以不断地向DelayedWorkQueue添加元素,当元素个数超过数组长度时,会进行数组扩容。
- public void put(Runnable e) {
- offer(e);
- }
- public boolean add(Runnable e) {
- return offer(e);
- }
- public boolean offer(Runnable e, long timeout, TimeUnit unit) {
- return offer(e);
- }
offer添加元素
ScheduledThreadPoolExecutor提交任务时调用的是DelayedWorkQueue.add,而add、put等一些对外提供的添加元素的方法都调用了offer。
- public boolean offer(Runnable x) {
- if (x == null)
- throw new NullPointerException();
- RunnableScheduledFuture> e = (RunnableScheduledFuture>)x;
- // 使用lock保证并发操作安全
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- int i = size;
- // 如果要超过数组长度,就要进行数组扩容
- if (i >= queue.length)
- // 数组扩容
- grow();
- // 将队列中元素个数加一
- size = i + 1;
- // 如果是第一个元素,那么就不需要排序,直接赋值就行了
- if (i == 0) {
- queue[0] = e;
- setIndex(e, 0);
- } else {
- // 调用siftUp方法,使插入的元素变得有序。
- siftUp(i, e);
- }
- // 表示新插入的元素是队列头,更换了队列头,
- // 那么就要唤醒正在等待获取任务的线程。
- if (queue[0] == e) {
- leader = null;
- // 唤醒正在等待等待获取任务的线程
- available.signal();
- }
- } finally {
- lock.unlock();
- }
- return true;
- }
其基本流程如下:
offer基本流程图如下:
扩容grow()
可以看到,当队列满时,不会阻塞等待,而是继续扩容。新容量newCapacity在旧容量oldCapacity的基础上扩容50%(oldCapacity >> 1相当于oldCapacity /2)。最后Arrays.copyOf,先根据newCapacity创建一个新的空数组,然后将旧数组的数据复制到新数组中。
- private void grow() {
- int oldCapacity = queue.length;
- // 每次扩容增加原来数组的一半数量。
- // grow 50%
- int newCapacity = oldCapacity + (oldCapacity >> 1);
- if (newCapacity < 0) // overflow
- newCapacity = Integer.MAX_VALUE;
- // 使用Arrays.copyOf来复制一个新数组
- queue = Arrays.copyOf(queue, newCapacity);
- }
向上堆化siftUp
新添加的元素先会加到堆底,然后一步步和上面的父亲节点比较,若小于父亲节点则和父亲节点互换位置,循环比较直至大于父亲节点才结束循环。通过循环,来查找元素key应该插入在堆二叉树那个节点位置,并交互父节点的位置。
向上堆化siftUp的详细过程可以查看《堆和二叉堆的实现和特性》
- private void siftUp(int k, RunnableScheduledFuture> key) {
- // 当k==0时,就到了堆二叉树的根节点了,跳出循环
- while (k > 0) {
- // 父节点位置坐标, 相当于(k - 1) / 2
- int parent = (k - 1) >>> 1;
- // 获取父节点位置元素
- RunnableScheduledFuture> e = queue[parent];
- // 如果key元素大于父节点位置元素,满足条件,那么跳出循环
- // 因为是从小到大排序的。
- if (key.compareTo(e) >= 0)
- break;
- // 否则就将父节点元素存放到k位置
- queue[k] = e;
- // 这个只有当元素是ScheduledFutureTask对象实例才有用,用来快速取消任务。
- setIndex(e, k);
- // 重新赋值k,寻找元素key应该插入到堆二叉树的那个节点
- k = parent;
- }
- // 循环结束,k就是元素key应该插入的节点位置
- queue[k] = key;
- setIndex(key, k);
- }
出队方法
DelayedWorkQueue 提供了以下几个出队方法
take消费元素
Worker工作线程启动后就会循环消费工作队列中的元素,因为ScheduledThreadPoolExecutor的keepAliveTime=0,所以消费任务其只调用了DelayedWorkQueue.take。take基本流程如下:
- public RunnableScheduledFuture> take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- for (;;) {
- RunnableScheduledFuture> first = queue[0];
- // 如果没有任务,就让线程在available条件下等待。
- if (first == null)
- available.await();
- else {
- // 获取任务的剩余延时时间
- long delay = first.getDelay(NANOSECONDS);
- // 如果延时时间到了,就返回这个任务,用来执行。
- if (delay <= 0)
- return finishPoll(first);
- // 将first设置为null,当线程等待时,不持有first的引用
- first = null; // don't retain ref while waiting
- // 如果还是原来那个等待队列头任务的线程,
- // 说明队列头任务的延时时间还没有到,继续等待。
- if (leader != null)
- available.await();
- else {
- // 记录一下当前等待队列头任务的线程
- Thread thisThread = Thread.currentThread();
- leader = thisThread;
- try {
- // 当任务的延时时间到了时,能够自动超时唤醒。
- available.awaitNanos(delay);
- } finally {
- if (leader == thisThread)
- leader = null;
- }
- }
- }
- }
- } finally {
- if (leader == null && queue[0] != null) // 唤醒等待任务的线程
- available.signal();
- ock.unlock();
- }
- }
take基本流程图如下:
take线程阻塞等待
可以看出这个生产者take线程会在两种情况下阻塞等待:
finishPoll出队列
堆顶元素delay<=0,执行时间到,出队列就是一个向下堆化的过程siftDown。
- // 移除队列头元素
- private RunnableScheduledFuture> finishPoll(RunnableScheduledFuture> f) {
- // 将队列中元素个数减一
- int s = --size;
- // 获取队列末尾元素x
- RunnableScheduledFuture> x = queue[s];
- // 原队列末尾元素设置为null
- queue[s] = null;
- if (s != 0)
- // 因为移除了队列头元素,所以进行重新排序。
- siftDown(0, x);
- setIndex(f, -1);
- return f;
- }
堆的删除方法主要分为三步:
向下堆化siftDown
由于堆顶元素出队列后,就破坏了堆的结构,需要组织整理下,将堆尾元素移到堆顶,然后向下堆化:
向下堆化siftDown的详细过程可以查看《堆和二叉堆的实现和特性》
- private void siftDown(int k, RunnableScheduledFuture> key) {
- // 无符号右移,相当于size/2
- int half = size >>> 1;
- // 通过循环,保证父节点的值不能大于子节点。
- while (k < half) {
- // 左子节点, 相当于 (k * 2) + 1
- int child = (k << 1) + 1;
- // 左子节点位置元素
- RunnableScheduledFuture> c = queue[child];
- // 右子节点, 相当于 (k * 2) + 2
- int right = child + 1;
- // 如果左子节点元素值大于右子节点元素值,那么右子节点才是较小值的子节点。
- // 就要将c与child值重新赋值
- if (right < size && c.compareTo(queue[right]) > 0)
- c = queue[child = right];
- // 如果父节点元素值小于较小的子节点元素值,那么就跳出循环
- if (key.compareTo(c) <= 0)
- break;
- // 否则,父节点元素就要和子节点进行交换
- queue[k] = c;
- setIndex(c, k);
- k = child;
- }
- queue[k] = key;
- setIndex(key, k);
- }
leader线程
leader线程的设计,是Leader-Follower模式的变种,旨在于为了不必要的时间等待。当一个take线程变成leader线程时,只需要等待下一次的延迟时间,而不是leader线程的其他take线程则需要等leader线程出队列了才唤醒其他take线程。
poll()
立即获取队列头元素,当队列头任务是null,或者任务延时时间没有到,表示这个任务还不能返回,因此直接返回null。否则调用finishPoll方法,移除队列头元素并返回。
- public RunnableScheduledFuture> poll() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- RunnableScheduledFuture> first = queue[0];
- // 队列头任务是null,或者任务延时时间没有到,都返回null
- if (first == null || first.getDelay(NANOSECONDS) > 0)
- return null;
- else
- // 移除队列头元素
- return finishPoll(first);
- } finally {
- lock.unlock();
- }
- }
poll(long timeout, TimeUnit unit)
超时等待获取队列头元素,与take方法相比较,就要考虑设置的超时时间,如果超时时间到了,还没有获取到有用任务,那么就返回null。其他的与take方法中逻辑一样。
- public RunnableScheduledFuture> poll(long timeout, TimeUnit unit)
- throws InterruptedException {
- long nanos = unit.toNanos(timeout);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- for (;;) {
- RunnableScheduledFuture> first = queue[0];
- // 如果没有任务。
- if (first == null) {
- // 超时时间已到,那么就直接返回null
- if (nanos <= 0)
- return null;
- else
- // 否则就让线程在available条件下等待nanos时间
- nanos = available.awaitNanos(nanos);
- } else {
- // 获取任务的剩余延时时间
- long delay = first.getDelay(NANOSECONDS);
- // 如果延时时间到了,就返回这个任务,用来执行。
- if (delay <= 0)
- return finishPoll(first);
- // 如果超时时间已到,那么就直接返回null
- if (nanos <= 0)
- return null;
- // 将first设置为null,当线程等待时,不持有first的引用
- first = null; // don't retain ref while waiting
- // 如果超时时间小于任务的剩余延时时间,那么就有可能获取不到任务。
- // 在这里让线程等待超时时间nanos
- if (nanos < delay || leader != null)
- nanos = available.awaitNanos(nanos);
- else {
- Thread thisThread = Thread.currentThread();
- leader = thisThread;
- try {
- // 当任务的延时时间到了时,能够自动超时唤醒。
- long timeLeft = available.awaitNanos(delay);
- // 计算剩余的超时时间
- nanos -= delay - timeLeft;
- } finally {
- if (leader == thisThread)
- leader = null;
- }
- }
- }
- }
- } finally {
- if (leader == null && queue[0] != null) // 唤醒等待任务的线程
- available.signal();
- lock.unlock();
- }
- }
remove删除指定元素
删除指定元素一般用于取消任务时,任务还在阻塞队列中,则需要将其删除。当删除的元素不是堆尾元素时,需要做堆化处理。
- public boolean remove(Object x) {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- int i = indexOf(x);
- if (i < 0)
- return false;
- //维护heapIndex
- setIndex(queue[i], -1);
- int s = --size;
- RunnableScheduledFuture> replacement = queue[s];
- queue[s] = null;
- if (s != i) {
- //删除的不是堆尾元素,则需要堆化处理
- //先向下堆化
- siftDown(i, replacement);
- if (queue[i] == replacement)
- //若向下堆化后,i位置的元素还是replacement,说明四无需向下堆化的,
- //则需要向上堆化
- siftUp(i, replacement);
- }
- return true;
- } finally {
- lock.unlock();
- }
- }
总结
使用优先级队列DelayedWorkQueue,保证添加到队列中的任务,会按照任务的延时时间进行排序,延时时间少的任务首先被获取。
本文标题:阻塞队列—DelayedWorkQueue源码分析
本文网址:http://www.shufengxianlan.com/qtweb/news49/399299.html
网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联