多图详解阻塞队列——SynchronousQueue

一、阻塞队列BlockingQueue概述

在线程池(ThreadPoolExecutor)的构造函数中,有一个代表阻塞队列的入参——BlockingQueue,它是一个接口,只要实现了这个接口的所有实现类,都可以作为阻塞队列而应用在线程池中。如下是线程池ThreadPoolExecutor的构造方法:

多图详解阻塞队列——SynchronousQueue_第1张图片

BlockingQueue作为阻塞队列接口,提供了4种插入/移除元素的方法。根据插入/移除元素失败后的不同处理方式,分为:

抛异常(Throws exception)
特殊值(Special value)
阻塞(Blocks)
超时(Times out)

在我们日常的编码中,可以根据不同的使用场景,选择不同的插入/移除元素的方法。如下截取自BlockingQueue类的源码注释:

多图详解阻塞队列——SynchronousQueue_第2张图片

基于不同的阻塞队列需求场景,衍生出了多种BlockingQueue接口的具体实现类。本篇我们就针对SynchronousQueue这个阻塞队列进行源码解析。如下是阻塞队列的不同具体实现类:

多图详解阻塞队列——SynchronousQueue_第3张图片

二、为什么要解析SynchronousQueue呢?

我们在学习线程池的时候,学习了newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor等多种类型的线程池,他们其实都比较好理解,但是唯独这个newCachedThreadPool的解释,总感觉理解并不清晰,它是怎么Cached的呢?网上有的文章是这样解释的:

多图详解阻塞队列——SynchronousQueue_第4张图片

那我们想要了解newCachedThreadPool,就避免不了要了解一下它使用了什么阻塞队列。因为它设置自己的核心线程数是0。什么意思?就是说,只要有任务在newCachedThreadPool这个线程池里执行,那么它不会立刻被创建的线程执行,而是要直接放到阻塞队列中进行处理。 那么阻塞队列的具体实现是什么,就会影响到这个任务的处理逻辑。而newCachedThreadPool采用的阻塞队列就是SynchronousQueue。如下是newCachedThreadPool的构造方法:

多图详解阻塞队列——SynchronousQueue_第5张图片

那么,我们在去网上查一下SynchronousQueue这个阻塞队列是怎么处理元素的添加/删除的,如下所示:

多图详解阻塞队列——SynchronousQueue_第6张图片

看完网上的解释,别的先不说,对SynchronousQueue的“神秘性”真是激起了我的好奇。“很奇怪的队列”、“都不能叫队列”、“没有存储空间”、“必须结伴而行”……,what are you 弄啥咧?

多图详解阻塞队列——SynchronousQueue_第7张图片

所以,基于以上的好奇,就产生了今天的这篇基于SynchronousQueue的源码解析文章了。好的,废话不多说了,我们来揭开它那神秘的面纱吧!

三、从哪里开始呢?

既然我们是通过线程池引出的SynchronousQueue源码解析的话题,那么就拿线程池作为开始的起点吧!

在线程池的execute()方法的逻辑中,调用了阻塞队列的 offer() 方法和 poll() 方法,所以,我们就以这两个方法为入口,来解析一下SynchronousQueue的代码逻辑。如下是线程池逻辑中使用到阻塞队列的逻辑圈选:

多图详解阻塞队列——SynchronousQueue_第8张图片

SynchronousQueue提供了两个模式的——公平模式(队列)和非公平模式(堆栈),可以通过构造函数入参fair来进行选择,如下所示:

3ef6dd6337172c56a22f5cb844cd49fd.png

TransferQueue和TransferStack有相同的父类Transferer。它是一个抽象类,只提供了一个transfer(...)方法,用于操作插入或消费元素。如下所示:

多图详解阻塞队列——SynchronousQueue_第9张图片

在构造newCacheThreadPool实例时,使用的是无参的构造方法,而SynchronousQueue的无参构造方法采用的是非公平模式,即:TransferStack。如下所示:

多图详解阻塞队列——SynchronousQueue_第10张图片

那么,我们就针对TransferStack类,来解析一下它所实现的 offer() 和 poll() 这两个方法。

四、源码解析

offer()方法中的逻辑很简单,就是调用了TransferStacktransfer(...)方法,源码和注释如下所示:

多图详解阻塞队列——SynchronousQueue_第11张图片

我们再看看poll()方法,也是调用了TransferStacktransfer(...)方法,只是入参不一样,第一个入参传入了null。源码和注释如下所示:

多图详解阻塞队列——SynchronousQueue_第12张图片

在transfer(...)方法中,主要逻辑分为四部分,如下图红框所示,下面我们就针对这四个部分,对transfer(...)方法的逻辑进行解析。

多图详解阻塞队列——SynchronousQueue_第13张图片

4.1> SNode和mode解析

首先,根据e是否为null,来确定后面待插入节点的模式(mode)。因为这个入参e就是我们调用offer(...)方法传的值,所以,只要是调用了offer(...)方法,e都不会为null,那么mode就对应DATA;而如果我们调用poll()方法时,e永远是null,所以mode就对应REQUEST。源码如下所示:

多图详解阻塞队列——SynchronousQueue_第14张图片

关于节点模式mode,一共有3种,如下表格所示:

多图详解阻塞队列——SynchronousQueue_第15张图片

节点模式mode就存储在SNode的数据结构中,SNode也是我们维护堆栈结果的节点,它包含如下内容:

多图详解阻塞队列——SynchronousQueue_第16张图片

节点模式mode和SNode是我们了解剩下逻辑的前提条件,所以,还是需要大家重视起来的。好的,下面我们就开始具体的操作逻辑了。

4.2> case1:如果栈顶没有元素or本次操作的mode与栈顶的节点mode相同

这部分代码主要是针对“如果栈顶没有元素”或者“新来的节点与栈顶的节点mode相同”这两种情况来做逻辑处理的,如下所示:

多图详解阻塞队列——SynchronousQueue_第17张图片

在内部处理逻辑中,采用了两部分内容,即:case1-1case1-2。下面我们会分别针对这两部分进行详细解析。Part2部分整体代码如下图所示:

多图详解阻塞队列——SynchronousQueue_第18张图片

4.2.1> case1-1:如果是超时机制,并且超时了,则直接返回null

在if判断逻辑中if(timed && nanos <= 0) ,只有设置了超时时间(timed等于true)并且已经超时了(nanos小于等于0),才会进入里面的逻辑代码,如下所示:

多图详解阻塞队列——SynchronousQueue_第19张图片

在里面的逻辑代码中if(h != null && h.isCancelled()),如果head节点是cancelled状态,则将head指针指向下一个节点。如下是判断cancelled的逻辑:

多图详解阻塞队列——SynchronousQueue_第20张图片

如果head节点不是cancelled状态,那么我们也不需要对它做什么额外操作了,直接在else语句块中执行return null;就可以了。

4.2.2> case1-2:创建新节点并且将该节点设置为head节点

如果没设置超时机制,或者设置了超时时间但是还没有超时,则会进入如下代码块中:

多图详解阻塞队列——SynchronousQueue_第21张图片

首先调用了awaitFulfill(...)方法,用于根据mode进行节点的匹配,获得能与s匹配的节点m。此处的awaitFulfill(...)方法内部逻辑比较多,下面我们会专门的针对这部分源码进行讲解。

如果发现s匹配的节点是它本身,即:if (m == s),则调用clean(s)方法执行清理操作,并return null进行返回。clean(s)方法也会在下面部分进行源码解析。

如果head指针不为空,并且s是head的后置节点,那么会将head指针更新为s的后置节点。这么做的原因是,

如果mode是REQUEST,则说明是消费者要获取阻塞队列中的值,所以返回m中存储的值,即:m.item;否则返回s中存储的值,即:s.item。

4.2.2.1> awaitFulfill(...)方法源码解析

在awaitFulFill(...)方法中,如果配置了超时机制,那么会计算出一个deadline作为超时时间,并且计算自旋spins次数。

56cb80aa0e6127d65a8e93eeeb8ac5a6.png

通过shouldSpin(s)方法来确定是否执行自旋。有3种情况下是可以自旋的,如下图中方法注释所示:

多图详解阻塞队列——SynchronousQueue_第22张图片

做完上面的准备工作之后,就开启了无限循环逻辑,如下图所示:

多图详解阻塞队列——SynchronousQueue_第23张图片

针对于无限for循环逻辑部分,为了更好的理解,我将其画了一张流程图。从流程图中我们可以看到当前线程要么在自旋spin中,要么就park阻塞或parkNanos超时阻塞。只有当节点s找到了匹配的节点(s.match != null),才会跳出无限循环。对于返回的结果有两种情况:要么返回匹配到了的节点,要么返回的是自己(详情请见s.tryCancel()部分)。流程图如下所示:

多图详解阻塞队列——SynchronousQueue_第24张图片

4.2.2.2> clean(...)方法源码解析

通过clean(...)方法,将head指向“正常”的节点(非cancelled节点),并且去除掉cancelled状态的节点链接。

多图详解阻塞队列——SynchronousQueue_第25张图片

4.3> case2:head节点模式不是待匹配节点(FULFILLING)

本段代码是针对于head节点模式不是FULFILLING来做处理的。它也是分为两大部分,即:if ... else if 结构的逻辑判断。相关源码如下所示:

多图详解阻塞队列——SynchronousQueue_第26张图片

4.3.1> 如果head节点是cancelled状态

本段逻辑比较简单,如果发现head指针指向的节点是取消状态的(cancelled),则更新head指针,指向它的后置节点。代码逻辑如下所示:

eac79a21da9d6b1b57d136715eade3ad.png

4.3.2> 如果head节点状态正常

新建一个"FULFILLING + mode"模式(即:变为等待匹配模式)的节点压入栈顶,然后开启无尽循环模式。在循环中会通过tryMatch进行匹配操作,如果匹配成功,即:m.match等于null,那么将s赋值给m的match属性,更新head指针,并返回节点的中内容。源码注释如下所示:

多图详解阻塞队列——SynchronousQueue_第27张图片

4.4> case3:head节点模式是待匹配节点(FULFILLING)

在case3中,处理的就是head节点为匹配中(FULFILLING)状态的情况。它与case2蛮像的,区别大致两点:其一:没有根据入参创建新的SNode节点,因为本段逻辑就是让匹配中的head节点“尽快”匹配。其二:如果通过tryMatch()方法找到了待匹配的节点,此时它没有像case2那样直接return返回节点的具体内容

那为什么没有直接返回呢?原因就在我们在case1中介绍的awaitFulfill(...)方法中,head节点在自旋或阻塞中,在tryMatch方法执行完毕后,会通过awaitFulfill方法返回匹配的节点,然后执行返回节点上具体值的操作。

多图详解阻塞队列——SynchronousQueue_第28张图片

tryMatch()方法中,除了将match赋值为s,表明当前SNode节点与s节点匹配上了。其次,别忘记了,还有waiter线程在阻塞或自旋呢,所以,通过调用unpark方法,将其解除阻塞。源码注释如下所示:

多图详解阻塞队列——SynchronousQueue_第29张图片

今天的文章内容就这些了,最后一句话:

写作不易,笔者几个小时甚至数天完成的一篇文章,只愿换来您几秒钟的点赞&分享。

更多技术干活,欢迎大家关注公众号“爪哇缪斯”(^o^)/~ 「干货分享,每周更新」

你可能感兴趣的