AQS源码深入分析之条件队列-你知道Java中的阻塞队列是如何实现的吗?

本文基于JDK-8u261源码分析


1 简介

AQS源码深入分析之条件队列-你知道Java中的阻塞队列是如何实现的吗?_第1张图片

因为CLH队列中的线程,什么线程获取到锁,什么线程进入队列排队,什么线程释放锁,这些都是不受我们控制的。所以条件队列的出现为我们提供了主动式地、只有满足指定的条件后才能线程阻塞和唤醒的方式。对于条件队列首先需要说明一些概念:条件队列是AQS中除了CLH队列之外的另一种队列,每创建一个Condition实际上就是创建了一个条件队列,而每调用一次await方法实际上就是往条件队列中入队,每调用一次signal方法实际上就是往条件队列中出队。不像CLH队列上节点的状态有多个,条件队列上节点的状态只有一个:CONDITION。所以如果条件队列上一个节点不再是CONDITION状态时,就意味着这个节点该出队了。需要注意的是,条件队列只能运行在独占模式下

一般在使用条件队列作为阻塞队列来使用时都会创建两个条件队列:notFullnotEmpty。notFull表示当条件队列已满的时候,put方法会处于等待状态,直到队列没满;notEmpty表示当条件队列为空的时候,take方法会处于等待状态,直到队列有数据了。

而notFull.signal方法和notEmpty.signal方法会将条件队列上的节点移到CLH队列中(每次只转移一个)。也就是说,存在一个节点从条件队列被转移到CLH队列的情况发生。同时也意味着,条件队列上不会发生锁资源竞争,所有的锁竞争都是发生在CLH队列上的

其他一些条件队列和CLH队列之间的差异如下:

  • 条件队列使用nextWaiter指针来指向下一个节点,是一个单向链表结构,不同于CLH队列的双向链表结构;
  • 条件队列使用firstWaiter和lastWaiter来指向头尾指针,不同于CLH队列的head和tail;
  • 条件队列中的第一个节点也不会像CLH队列一样,是一个特殊的空节点;
  • 不同于CLH队列中会用很多的CAS操作来控制并发,条件队列进队列的前提是已经获取到了独占锁资源,所以很多地方不需要考虑并发。

下面就是具体的源码分析了。条件队列以ArrayBlockingQueue来举例:


2 构造器

 1  /**
 2   * ArrayBlockingQueue:
 3   */
 4  public ArrayBlockingQueue(int capacity) {
 5    this(capacity, false);
 6}
 7
 8  public ArrayBlockingQueue(int capacity, boolean fair) {
 9    if (capacity <= 0)
10        throw new IllegalArgumentException();
11    //存放实际数据的数组
12    this.items = new Object[capacity];
13    //独占锁使用ReentrantLock来实现(fair表示的就是公平锁还是非公平锁,默认为非公平锁)
14    lock = new ReentrantLock(fair);
15    //notEmpty条件队列
16    notEmpty = lock.newCondition();
17    //notFull条件队列
18    notFull = lock.newCondition();
19  }

3 put方法

  1  /**
  2   * ArrayBlockingQueue:
  3   */
  4  public void put(E e) throws InterruptedException {
  5    //非空校验
  6    checkNotNull(e);
  7    final ReentrantLock lock = this.lock;
  8    /*
  9    获取独占锁资源,响应中断模式。其实现代码和lock方法还有Semaphore的acquire方法是类似的
 10    因为这里分析的是条件队列,于是就不再分析该方法的细节了
 11     */
 12    lock.lockInterruptibly();
 13    try {
 14        while (count == items.length)
 15            //如果数组中数据已经满了的话,就在notFull中入队一个新节点,并阻塞当前线程
 16            notFull.await();
 17        //添加数组元素并唤醒notEmpty
 18        enqueue(e);
 19    } finally {
 20        //释放锁资源
 21        lock.unlock();
 22    }
 23  }

4 await方法

如果在put的时候发现数组已满,或者在take的时候发现数组是空的,就会调用await方法来将当前节点放入条件队列中:

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   */
 4  public final void await() throws InterruptedException {
 5    //如果当前线程被中断就抛出异常
 6    if (Thread.interrupted())
 7        throw new InterruptedException();
 8    //把当前节点加入到条件队列中
 9    Node node = addConditionWaiter();
10    //释放之前获取到的锁资源,因为后续会阻塞该线程,所以如果不释放的话,其他线程将会等待该线程被唤醒
11    int savedState = fullyRelease(node);
12    int interruptMode = 0;
13    //如果当前节点不在CLH队列中则阻塞住,等待unpark唤醒
14    while (!isOnSyncQueue(node)) {
15        LockSupport.park(this);
16        /*
17        这里被唤醒可能是正常的signal操作也可能是被中断了。但无论是哪种情况,都会将当前节点插入到CLH队列尾,
18        并退出循环(注意,这里被唤醒除了上面两种情况之外,还有一种情况是操作系统级别的虚假唤醒(spurious wakeup),
19        也就是当前线程毫无理由就会被唤醒了,所以上面需要使用while来规避掉这种情况)
20         */
21        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
22            break;
23    }
24    //走到这里说明当前节点已经插入到了CLH队列中(被signal所唤醒或者被中断)。然后在CLH队列中进行获取锁资源的操作
25    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
26        /*
27        <<>>
28
29        之前分析过的如果acquireQueued方法返回true,说明当前线程被中断了
30        返回true意味着在acquireQueued方法中此时会再一次被中断(注意,这意味着有两个代码点判断线程是否被中断:
31        一个是在第15行代码处,另一个是在acquireQueued方法里面),如果之前没有被中断,则interruptMode=0,
32        而在acquireQueued方法里面线程被中断返回了,这个时候将interruptMode重新修正为REINTERRUPT即可
33        至于为什么不修正为THROW_IE是因为在这种情况下,第15行代码处已经通过调用signal方法正常唤醒了,
34        节点已经放进了CLH队列中。而此时的中断是在signal操作之后,在第25行代码处去抢锁资源的时候发生的
35        这个时候中断不中断已经无所谓了,所以就不需要抛出InterruptedException
36         */
37        interruptMode = REINTERRUPT;
38    /*
39    走到这里说明当前节点已经获取到了锁资源(获取不到的话就会被再次阻塞在acquireQueued方法里)
40    如果interruptMode=REINTERRUPT的话,说明之前已经调用过signal方法了,也就是说该节点已经从条件队列中剔除掉了,
41    nextWaiter指针肯定为空,所以在这种情况下是不需要执行unlinkCancelledWaiters方法的
42    而如果interruptMode=THROW_IE的话,说明之前还没有调用过signal方法来从条件队列中剔除该节点。这个时候就需要调用
43    unlinkCancelledWaiters方法来剔除这个节点了(在之前的transferAfterCancelledWait方法中
44    已经把该节点的状态改为了初始状态0),顺便把所有其他不是CONDITION状态的节点也一并剔除掉。注意:如果当前节点是条件队列中的
45    最后一个节点的话,并不会被清理。无妨,等到下次添加节点或调用signal方法的时候就会被清理了
46     */
47    if (node.nextWaiter != null)
48        unlinkCancelledWaiters();
49    //根据不同模式处理中断(正常模式不需要处理)
50    if (interruptMode != 0)
51        reportInterruptAfterWait(interruptMode);
52  }

5 addConditionWaiter方法

在条件队列中添加一个节点的逻辑:

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   */
 4  private Node addConditionWaiter() {
 5    Node t = lastWaiter;
 6    /*
 7    如果最后一个节点不是CONDITION状态,就删除条件队列中所有不是CONDITION状态的节点
 8    至于为什么只需要判断最后一个节点的状态就能知道整个队列中是否有不是CONDITION的节点,后面会说明
 9     */        
10    if (t != null && t.waitStatus != Node.CONDITION) {
11        //删除所有不是CONDITION状态的节点
12        unlinkCancelledWaiters();
13        t = lastWaiter;
14    }
15    //创建一个类型为CONDITION的新节点
16    Node node = new Node(Thread.currentThread(), Node.CONDITION);
17    if (t == null)
18        //t为null意味着此时条件队列中为空,直接将头指针指向这个新节点即可
19        firstWaiter = node;
20    else
21        //t不为null的话就说明此时条件队列中有节点,直接在尾处加入这个新节点
22        t.nextWaiter = node;
23    //尾指针指向这个新节点,添加节点完毕
24    lastWaiter = node;
25    /*
26    注意,这里不用像CLH队列中的enq方法一样,如果插入失败就会自旋直到插入成功为止
27    因为此时还没有释放独占锁
28     */
29    return node;
30  }
31
32  /**
33   * 第12行代码处:
34   * 删除条件队列当中所有不是CONDITION状态的节点
35   */
36  private void unlinkCancelledWaiters() {
37    Node t = firstWaiter;
38    /*
39    在下面的每次循环中,trail指向的是从头到循环的节点为止,最后一个是CONDITION状态的节点
40    这样做是因为要剔除队列中间不是CONDITION的节点,就需要保留上一个是CONDITION节点的指针,
41    然后直接trail.nextWaiter = next就可以断开了
42     */
43    Node trail = null;
44    while (t != null) {
45        Node next = t.nextWaiter;
46        if (t.waitStatus != Node.CONDITION) {
47            t.nextWaiter = null;
48            if (trail == null)
49                firstWaiter = next;
50            else
51                trail.nextWaiter = next;
52            if (next == null)
53                lastWaiter = trail;
54        } else
55            trail = t;
56        t = next;
57    }
58  }

6 fullyRelease方法

释放锁资源,包括可重入锁的所有锁资源:

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   */
 4  final int fullyRelease(Node node) {
 5    boolean failed = true;
 6    try {
 7        int savedState = getState();
 8        /*
 9        释放锁资源。注意这里是释放所有的锁,包括可重入锁有多次加锁的话,会一次性全部释放。因为在上一行
10        代码savedState存的是所有的锁资源,而这里就是释放这些所有的资源,这也就是方法名中“fully”的含义
11         */
12        if (release(savedState)) {
13            failed = false;
14            return savedState;
15        } else {
16            /*
17            释放失败就抛异常,也就是说没有释放干净,可能是在并发的情景下state被修改了的原因,
18            也可能是其他原因。注意如果在这里抛出异常了那么会走第166行代码
19             */
20            throw new IllegalMonitorStateException();
21        }
22    } finally {
23        /*
24        如果释放锁失败,就把节点置为CANCELLED状态。比较精妙的一点是,在之前addConditionWaiter方法中的第10行代码处,
25        判断条件队列中是否有不是CONDITION的节点时,只需要判断最后一个节点的状态是否是CONDITION就行了
26        按常理来说,是需要遍历整个队列才能知道的。但是条件队列每次添加新节点都是插在尾处,而如果释放锁失败,
27        会将这个新添加的、在队列尾巴的新节点置为CANCELLED状态。而之前的CONDITION节点必然都是在队头
28        因为如果此时再有新的节点入队的话,会首先在addConditionWaiter方法中的第12行代码处将所有不是CONDITION的节点都剔除了
29        也就是说无论什么情况下,如果队列中有不是CONDITION的节点,那它一定在队尾,所以只需要判断它就可以了
30         */
31        if (failed)
32            node.waitStatus = Node.CANCELLED;
33    }
34  }

7 isOnSyncQueue方法

判断节点是否在CLH队列中,以此来判断唤醒时signal方法是否完成。当然,在transferAfterCancelledWait方法中也会调用到本方法:

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   * 判断节点是否在CLH队列中
 4   */
 5  final boolean isOnSyncQueue(Node node) {
 6    /*
 7    如果当前节点的状态是CONDITION或者节点没有prev指针(prev指针只在CLH队列中的节点有,
 8    尾插法保证prev指针一定有)的话,就返回false
 9     */
10    if (node.waitStatus == Node.CONDITION || node.prev == null)
11        return false;
12    //如果当前节点有next指针(next指针只在CLH队列中的节点有,条件队列中的节点是nextWaiter)的话,就返回true
13    if (node.next != null)
14        return true;
15    //如果上面无法快速判断的话,就只能从CLH队列中进行遍历,一个一个地去进行判断了
16    return findNodeFromTail(node);
17  }
18
19  /**
20   * 遍历判断当前节点是否在CLH队列其中
21   */
22  private boolean findNodeFromTail(Node node) {
23    Node t = tail;
24    for (; ; ) {
25        if (t == node)
26            return true;
27        if (t == null)
28            return false;
29        t = t.prev;
30    }
31  }

8 checkInterruptWhileWaiting方法

判断唤醒时属于的状态(0 / THROW_IE / REINTERRUPT):

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   * 如果当前线程没有被中断过,则返回0
 4   * 如果当前线程被中断时没有被signal过,则返回THROW_IE
 5   * 如果当前线程被中断时已经signal过了,则返回REINTERRUPT
 6   */
 7  private int checkInterruptWhileWaiting(Node node) {
 8    return Thread.interrupted() ?
 9            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
10            0;
11  }
12
13  /**
14   * 本方法是用来判断当前线程被中断时有没有发生过signal,以此来区分出THROW_IE和REINTERRUPT。判断的依据是:
15   * 如果发生过signal,则当前节点的状态已经不是CONDITION了,并且在CLH队列中也能找到该节点。详见transferForSignal方法
16   * 

17 * THROW_IE:表示在线程中断发生时还没有调用过signal方法,这个时候我们将这个节点放进CLH队列中去抢资源, 18 * 直到抢到锁资源后,再把这个节点从CLH队列和条件队列中都删除掉,最后再抛出InterruptedException 19 *

20 * REINTERRUPT:表示在线程中断发生时已经调用过signal方法了,这个时候发不发生中断实际上已经没有意义了, 21 * 因为该节点此时已经被放进到了CLH队列中。而且在signal方法中已经将这个节点从条件队列中剔除掉了 22 * 此时我们将这个节点放进CLH队列中去抢资源,直到抢到锁资源后(抢到资源的同时就会将这个节点从CLH队列中删除), 23 * 再次中断当前线程即可,并不会抛出InterruptedException 24 */ 25 final boolean transferAfterCancelledWait(Node node) { 26 //判断一下当前的节点状态是否是CONDITION 27 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 28 /* 29 如果CAS成功了就表示当前节点是CONDITION状态,此时就意味着interruptMode为THROW_IE 30 然后会进行CLH队列入队,随后进行抢锁资源的操作 31 */ 32 enq(node); 33 return true; 34 } 35 /* 36 如果CAS失败了的话就意味着当前节点已经不是CONDITION状态了,说明此时已经调用过signal方法了, 37 但是因为之前已经释放锁资源了,signal方法中的transferForSignal方法将节点状态改为CONDITION 38 和将节点入CLH队列的这两个操作不是原子操作,所以可能存在并发的问题。也就是说可能会存在将节点状态改为CONDITION后, 39 但是还没入CLH队列这个时间点。下面的代码考虑的就是这种场景。这个时候只需要不断让渡当前线程资源, 40 等待signal方法将节点添加CLH队列完毕后即可 41 */ 42 while (!isOnSyncQueue(node)) 43 Thread.yield(); 44 return false; 45 }


9 reportInterruptAfterWait方法

中断唤醒最后的处理:

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   */
 4  private void reportInterruptAfterWait(int interruptMode)
 5        throws InterruptedException {
 6    if (interruptMode == THROW_IE)
 7        //如果是THROW_IE最终就会抛出InterruptedException异常
 8        throw new InterruptedException();
 9    else if (interruptMode == REINTERRUPT)
10        //如果是REINTERRUPT就仅仅是“中断”当前线程而已(只是设置中断标志位为true)
11        selfInterrupt();
12  }

10 enqueue方法

ArrayBlockingQueue的入队逻辑:

 1  /**
 2   * ArrayBlockingQueue:
 3   */
 4  private void enqueue(E x) {
 5    final Object[] items = this.items;
 6    //插入数据
 7    items[putIndex] = x;
 8    //putIndex记录的是下次插入的位置。如果putIndex已经是最后一个了,重新复位为0,意味着数据可能会被覆盖
 9    if (++putIndex == items.length)
10        putIndex = 0;
11    //当前数组中的数量+1
12    count++;
13    /*
14    如果notEmpty条件队列不为空的话,唤醒notEmpty条件队列中的第一个节点去CLH队列当中去排队抢资源
15    如果notEmpty里没有节点的话,说明此时数组没空。signal方法将不会有任何作用,因为此时没有阻塞住的take线程
16     */
17    notEmpty.signal();
18  }

11 signal方法

查看是否需要唤醒条件队列中的节点,需要就进行唤醒(将节点从条件队列中转移到CLH队列中):

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   */
 4  public final void signal() {
 5    //如果当前线程不是加锁时候的线程,就抛出异常
 6    if (!isHeldExclusively())
 7        throw new IllegalMonitorStateException();
 8    Node first = firstWaiter;
 9    if (first != null)
10        //如果notEmpty条件队列中有节点的话,就通知去CLH队列中排队抢资源
11        doSignal(first);
12  }
13
14  private void doSignal(Node first) {
15    do {
16        if ((firstWaiter = first.nextWaiter) == null)
17            //等于null意味着循环到此时条件队列已经空了,那么把lastWaiter也置为null
18            lastWaiter = null;
19        //断开notEmpty条件队列中当前节点的nextWaiter指针,也就相当于剔除当前节点,等待GC
20        first.nextWaiter = null;
21    } while (!transferForSignal(first) &&
22            //如果当前节点已经不是CONDITION状态的话(就说明当前节点已经失效了),就选择下一个节点尝试放进CLH队列中
23            (first = firstWaiter) != null);
24  }
25
26  /**
27   * 将notEmpty条件队列中的节点从条件队列移动到CLH队列当中
28   * 第21行代码处:
29   */
30  final boolean transferForSignal(Node node) {
31    /*
32    如果notEmpty条件队列中的节点已经不是CONDITION状态的时候,就直接返回false,
33    跳过该节点,相当于把该节点剔除出条件队列
34     */
35    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
36        return false;
37
38    //走到这里说明该节点的状态已经被修改成了初始状态0。把其加入到CLH队列尾部,并返回前一个节点
39    Node p = enq(node);
40    int ws = p.waitStatus;
41    /*
42    再来复习一下,SIGNAL状态表示当前节点是阻塞状态的话,上一个节点就是SIGNAL。notEmpty条件队列中的
43    节点此时还是处于阻塞状态,所以此时将这个节点移动到CLH队列后就需要将前一个节点的状态改为SIGNAL
44    如果CAS修改失败了的话,就将这个节点所在的线程唤醒去竞争锁资源,结局肯定是没抢到(因为锁资源是
45    当前线程所持有着),所以会在acquireQueued方法中继续被阻塞住的,而且在这其中会再次修正前一个节点
46    的SIGNAL状态(必定是要修改成功的,如果修改不成功,就会一直在acquireQueued方法中循环去CAS修改)
47    当然如果前一个节点是CANCELLED状态的话,也去唤醒这个节点。这样acquireQueued方法中有机会去剔除掉
48    这些CANCELLED节点,相当于做了次清理工作
49    需要提一下的是,该处是唤醒被阻塞住的take线程(之前数组一直是空的,现在添加了一个节点
50    后数组就不为空了,所以需要唤醒之前被阻塞住的一个拿取线程。假设这个被唤醒的线程是线程2,执行唤醒动作
51    的是线程1)。如前面所说,线程2会进入到acquireQueued方法中再次被阻塞住。直到线程1走到put方法中的
52    最后一步unlock解锁的时候会被再次唤醒(也不一定就是这次会被唤醒,也有可能唤醒的是其他的线程(假如说
53    是线程3)。但只要线程3最后执行unlock方法的时候,就会继续去唤醒,相当于把这个唤醒的动作给传递下去了
54    那么线程2最终就会有机会被唤醒(等到它变成CLH队列中的第一个节点的时候))
55     */
56    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
57        LockSupport.unpark(node.thread);
58    return true;
59  }

12 take方法

ArrayBlockingQueue的take方法:

 1  /**
 2   * ArrayBlockingQueue:
 3   */
 4  public E take() throws InterruptedException {
 5    final ReentrantLock lock = this.lock;
 6    //响应中断模式下的加锁
 7    lock.lockInterruptibly();
 8    try {
 9        while (count == 0)
10            //如果数组为空的话,就在notEmpty中入队一个新节点,并阻塞当前线程
11            notEmpty.await();
12        //删除数组元素并唤醒notFull
13        return dequeue();
14    } finally {
15        //解锁
16        lock.unlock();
17    }
18  }
19
20  /**
21   * 第13行代码处:
22   */
23  private E dequeue() {
24    final Object[] items = this.items;
25    //记录旧值并最终返回出去
26    @SuppressWarnings("unchecked")
27    E x = (E) items[takeIndex];
28    //将数组元素清空
29    items[takeIndex] = null;
30    //takeIndex记录的是下次拿取的位置。如果takeIndex已经是最后一个了,重新复位为0
31    if (++takeIndex == items.length)
32        takeIndex = 0;
33    //当前数组中的数量-1
34    count--;
35    //elementDequeued方法在数组中移除数据时会被调用,以保证Itrs迭代器和队列数据的一致性
36    if (itrs != null)
37        itrs.elementDequeued();
38    /*
39    如果notFull条件队列不为空的话,唤醒notFull条件队列中的第一个节点去CLH队列当中去排队抢资源
40    如果notFull里没有节点的话,说明此时数组没满。signal方法将不会有任何作用,因为此时没有阻塞住的put线程
41     */
42    notFull.signal();
43    return x;
44  }

更多内容请关注微信公众号:奇客时间

你可能感兴趣的