面试官:给我讲讲线程池(中)

前景回顾

在上一篇中我们通过线程池的继承关系,具体分析了线程池的抽象父类AbstractExecutorService中的submit、invokeAll、invokeAny方法。在本篇中,我们将会把视线放在ThreadPoolExecutor具体实现当中,通过源码分析我们将会明白7个参数是如何在源码中运转的。

使用场景

我们先回顾一下在实际场景下的业务代码,下面模拟了10个线程并行处理任务,然后停止线程池接受,最后等待线程池关闭。

public static void main(String[] args) throws InterruptedException {
        // 开启线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        // 开启10个任务并行处理
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                // 模拟业务代码
                try {
                    Thread.sleep(1000);
                       System.out.println("任务结束");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        // 暂停线程池任务接收
        executor.shutdown();
        // 等待线程池结束
        executor.awaitTermination(1,TimeUnit.MINUTES);
    }

构造函数

总共重载了4个构造函数,设置了默认的参数,这种设计思路大家可以借鉴,下面只展示了其中两个重要的构造函数。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             // 设置默认工厂,工厂中返回线程优先级为普通并且为非守护的线程
             Executors.defaultThreadFactory(), 
             // 默认拒绝策略为拒绝发生时直接抛出异常
             defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
              // 判断参数边界
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
              // 设置安全管理器,不在本篇考虑范围内
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
                 // 设置核心线程数
        this.corePoolSize = corePoolSize;
              // 设置最大线程数
        this.maximumPoolSize = maximumPoolSize;
              // 设置工作队列
        this.workQueue = workQueue;
              // 设置线程空闲时间
        this.keepAliveTime = unit.toNanos(keepAliveTime);
              // 设置线程工厂
        this.threadFactory = threadFactory;
              // 设置拒绝策略
        this.handler = handler;
}

execute方法

public void execute(Runnable command) {
              // 边界判断
        if (command == null)
            throw new NullPointerException();
              /**
                  判断当前工作线程数是否小于核心线程数
                  这里的ctl可以先认为它保存了线程池的工作线程数量和线程池状态
                  为什么一个变量可以表示两种状态后面会解释到
              **/
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
              // 添加工作线程,返回是否成功创建,成功创建则返回
            if (addWorker(command, true))
                return;
              /**
                  如果创建没成功,则重新获取线程池状态,关于线程池具体状态会在下文描述
                  重新获取的原因在于execute是线程安全的方法
                  那么就会存在多线程调用,在此期间线程池状态可能会发生变化,关闭或有新任务添加
                  所以重新获取线程池状态保持最新的状态
              **/
            c = ctl.get();
        }
              /**
                  运行到这,说明当前工作线程大于核心线程数或者创建工作线程不成功(线程池非Running)
                  判断当前线程是否运行并且任务队列是否成功添加任务
              **/
        if (isRunning(c) && workQueue.offer(command)) {
              // 重新检查线程池状态
            int recheck = ctl.get();
              // 如非运行状态且能够删除删除,则拒绝任务
            if (! isRunning(recheck) && remove(command))
                reject(command);
              // 如工作线程数为0,则添加工作线程,此种情况发生在工作线程在空闲时间销毁时
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
              /**
                  运行到此说明,线程池状态非running或添加任务队列不成功
                  则尝试添加工作线程,如果添加不成功,则拒绝任务
              **/
        else if (!addWorker(command, false))
            reject(command);
}

总结:阅读完execute方法后,我们可以总结线程池会先在小于核心线程数的时添加核心工作线程,在任务队列无法添加任务时添加非核心工作线程,在线程池非running状态任务队列满且工作线程满时拒绝任务。

回顾我们上篇提出的问题:当我们创建核心线程数10个,最大线程数20个,任务队列为无界队列的线程池,并同时来了30个任务。

问题一:请问线程池中的线程数为多少?

问题二:那假如我把任务队列改为大小为20的队列,那么现在最多可以接收多少请求?

通过源码的阅读我们现在可以很简单的回答这两个问题。

  • 问题一:通过源码可知前10个任务直接去创建核心工作线程,由于任务队列是无界的因此后20个任务直接加入了任务队列等待核心工作线程消费。
  • 问题二:如把任务队列改为容量为20的队列,那么现可接受最大(最大线程数+队列容量)=40个请求。

在阅读execute方法时,我们把ctl属性、addWorker当做了黑盒,只是通过作者注释和方法命名去判断方法大致做了什么操作,并且我们都知道execute是一个线程安全的方法,它可以由不同的线程去调用,但是在源码中我们也没有发现加锁的部分,小伙伴们肯定非常好奇这些底层方法是如何做到这些的。

CTL

    // 类型为原子整数类,增删改查都是原子操作
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 代表共有32-3=29位
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 代表最大容量为2^29-1 
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    // 将线程池状态储存在整数字节的高位中,代表高3位代表线程池状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 因高3位代表线程池状态,此方法将低29位变为0就可以得到高3位状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 与上相同,将高3位变为0,得到工作线程数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 将rs和wc的进行或运算
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    // 判断c是否小于s
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    // 判断c是否大于等于s
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
    // 因为只有Running小于shutdown通过此方法来判断
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    // 尝试使用CAS的方式给ctl+1
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    // 尝试使用CAS的方式给ctl-1
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

通过上方关于ctl的源码,我们可以看出作者将一个整数变量分为了两个部分,一部分用来表示线程池状态,另一部分来表示当前工作线程数,将高3位来表示线程池当前状态,后29位表示线程池大小。通过这里骚的面试官又可以出题了,问最大线程数最大可以设置为多少,又要杀倒一片。

或许会有小伙伴不懂位运算看不懂该段逻辑,又是左移又是右移的各种位运算,但其实先把方法大致的功能了解了并不影响后面源码的阅读。

由于该篇篇幅有限,推荐想要刨根问底的小伙伴查询一下问运算的资料。

线程池状态

在ctl属性的部分,我们会发现有如下几个枚举状态,那么都代表什么意思呢?

  • RUNNING :允许接收新任务并处理在任务队列中的任务。
  • SHUTDOWN:不接收新任务但处理在任务队列中的任务。
  • STOP:不接收新任务、不处理任务队列中任务、中断在处理中的任务
  • TIDYING:所有任务已结束、工作线程数为0、并会调用terminated()钩子方法
  • TERMINATED:terminated()钩子方法成功执行

在线程池中状态是这样子流转的:

  • RUNNING -> SHUTDOWN:调用线程池的shutdown()方法。
  • (RUNNING/SHUTDOWN) -> STOP:调用线程池的shutdownNow()方法。
  • SHUTDOWN -> TIDYING:当任务队列和工作线程都为空时。
  • STOP -> TIDYING:当工作线程为空时。
  • TIDYING -> TERMINATED:当terminated()钩子方法成功执行。

addWorker

该方法将会创建工作线程,并将创建数量控制在核心线程数或最大线程数,其中的firstTask为工作线程创建成功后执行的第一个任务,第二个参数代表是否为核心工作线程,最终返回线程是否创建成功。

private final HashSet workers = new HashSet();

private final ReentrantLock mainLock = new ReentrantLock();

private boolean addWorker(Runnable firstTask, boolean core) {
        // 给最外层循环设置标志,且该循环为死循环
        retry:
        for (;;) {
            // 获取ctl值
            int c = ctl.get();
            // 获取线程池状态
            int rs = runStateOf(c);
            if (
                // 判断是否为SHUTDOWN、STOP、TIDYING、TERMINATED其中之一
                rs >= SHUTDOWN 
                &&
                /**
                    只有在SHUTDOWN且无任务需要执行且任务队列非空的时候该段逻辑返回true
                    代表需要继续添加工作队列执行任务队列中任务
                **/
                ! (
                     // 状态为SHUTDOWN
                   rs == SHUTDOWN &&
                   // 无第一个任务需要执行
                   firstTask == null &&
                   // 任务队列非空
                   ! workQueue.isEmpty())
                  )
                return false;
            // 死循环
            for (;;) {
                // 获取当前工作线程数
                int wc = workerCountOf(c);
                if (
                    // 如当前数量超过最大容量直接返回
                    wc >= CAPACITY 
                    ||
                    // 如创建为核心工作线程则与最大核心线程大小比较,否则与最大线程数大小比较
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // CAS增加工作线程数,添加超过结束最外层循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // CAS执行没成功,值发生改变,需要重新读取CTL的值
                c = ctl.get();
                // 如线程池状态发生改变,重新执行最外层循环
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 创建工作线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 因该段代码将会对HashSet进行操作,所以使用重入锁加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 重新获取状态
                    int rs = runStateOf(ctl.get());

                    if (
                        // rs为RUNNING状态
                        rs < SHUTDOWN 
                        ||
                        // 这种情况为池中工作线程到达空时时间被销毁但任务队列还有任务时
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 预检查线程是否可以启动
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                           // 将工作线程添加进workers集合中
                        workers.add(w);
                        // 记录工作线程数量最大到达的数量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 工作线程添加表示设置为成功
                        workerAdded = true;
                    }
                } finally {
                    // 可重入锁解锁
                    mainLock.unlock();
                }
                // 如工作线程添加成功,将工作线程启动
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如工作线程启动失败,将工作线程从集合中去除
            if (! workerStarted)
                addWorkerFailed(w);
        }
        // 返回工作线程是否添加成功
        return workerStarted;
    }

通过该段源码,它在适当的时机把我们的task任务传递给了工作线程并创建,并将创建成功的工作线程加入集合中。其中CAS死循环的模式,是我们开发中可以借鉴学习的模式。

worker

通过源码,将task传递到worker中,并调用了start()方法,那么说明worker中肯定是一个线程并且有它自己的run方法,那么我们就很有必要探寻其中是如何进行编码的。

面试官:给我讲讲线程池(中)_第1张图片

上图是Worker类的继承关系图,可以看出Worker继承了AQS、实现了Runnable方法,那么我们就可以大胆的猜测他实现了某种锁的机制、并且可以被线程执行。

worker构造器
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

    Worker(Runnable firstTask) {
            // 这里先看做设置标记
            setState(-1); 
            // 设置第一个将会执行的任务
            this.firstTask = firstTask;
            /**
                通过最开始通过线程池构造器传入的线程池工厂创建线程
                因为worker实现Runnable接口,那么它就可以通过传入新线程中
                可以推断出调用了thread.start()就会执行worker的run()方法
            **/
            this.thread = getThreadFactory().newThread(this);
}

run方法

public void run() {
    runWorker(this);
}

protected void beforeExecute(Thread t, Runnable r) { }

protected void afterExecute(Runnable r, Throwable t) { }

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 设置标志可被中断
        w.unlock();
        boolean completedAbruptly = true;
        try {
            while (
                // firstTask不为空或可以从任务队列中获取到任务
                task != null || (task = getTask()) != null) {
                w.lock();
                // 中断判断,在下篇介绍
               if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 钩子方法,子类实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    // 调用task的run方法,并抓住所有异常,由钩子方法处理
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 钩子方法,子类实现
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 将task置空
                    task = null;
                    // 完成的任务数+1
                    w.completedTasks++;
                    // 标志worker可用
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 执行工作线程退出
            processWorkerExit(w, completedAbruptly);
        }
    }

通过该段代码,可用分析出该段代码通过while循环一直从getTask()中获取任务,那么下面分析getTask方法。

getTask

private Runnable getTask() {
        boolean timedOut = false; 
        
        // 死循环
        for (;;) {
            // 获取当前状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (
                // SHUTDOWN、STOP、TIDYING、TERMINATED
                rs >= SHUTDOWN && 
                // STOP、TIDYING、TERMINATED或任务队列为空
                (rs >= STOP || workQueue.isEmpty())) {
                // 工作线程数量-1
                decrementWorkerCount();
                // return null之后上层runWorker将会退出while循环执行工作线程退出
                return null;
            }
            // 获取当前工作线程数量
            int wc = workerCountOf(c);

            // 判断是否允许超时:当允许核心线程超时为true或当前数量超过核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if (
                // 当前工作线程数量超过最大数量或允许超时并且已经超时
                (wc > maximumPoolSize || (timed && timedOut))
                && 
                // 工作线程大于1或者任务队列为空
                (wc > 1 || workQueue.isEmpty())) {
                // 尝试CAS工作线程数量-1
                if (compareAndDecrementWorkerCount(c))
                    return null;
                // CAS不超过,继续下一次循环
                continue;
            }

            try {
                // 如果允许超时则调用poll方法等待设置定的超时时间,否则调用take方法一直阻塞等待任务获取
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 获取到任务直接返回
                if (r != null)
                    return r;
                // 执行到这说明获取任务超时,设置超时标记位
                timedOut = true;
            } catch (InterruptedException retry) {
                // 线程被中断,设置超时标记为false,重新下一次循环
                timedOut = false;
            }
        }
    }

通过getTask方法我们可以看出,在设置允许核心线程超时或当前线程数大于核心线程数则表示超时开启,由此开关来判断调用阻塞队列中的阻塞方法还是非阻塞方法,一旦超时则返回null那么worker的run方法就会退出循环进入worker销毁过程,由此实现线程池线程数量的动态修改。

总结

本文通过通过execute方法作为切入点,带大家认识了CAS模式、锁模式以及是如何处理线程池状态。

在阅读源码的过程中,很多人喜欢刨根问底,但其实阅读源码就是一个不求甚解的过程,在实际阅读源码过程中调用栈可能会达到5-6层甚至可能更多层,这样子阅读源码其实是非常低效的,在一直往下深挖的过程中你会发现你的时间和精力在不断的被消耗,最后只明白了源码中的一部分的逻辑分支,和我们阅读源码的初衷完全不同。

所以我推荐阅读源码先阅读调用栈的1-2层,再往深了就不要去深究了 ,等到整体逻辑都看明白了可以再回过头来去学习哪些具体细节。

关注公众号:码农小张 查看更多精彩文章

面试官:给我讲讲线程池(中)_第2张图片

你可能感兴趣的