查看原文
其他

面试官:你在项目中用过 多线程 吗?

脚本之家 2022-09-23

The following article is from Java后端技术全栈 Author 田哥

 关注脚本之家”,与百万开发者在一起

出处:Java后端技术全栈(ID:jjs-2018)
如若转载请联系原公众号

最近,从去年到现在,我给小伙伴们做模拟面试已有100多场。有时候我也在想,现在真的很卷吗?大部分人第一次模拟面试结束,给我的感觉不像大家说的那么卷。

奇怪的现象

我在做模拟面试的过程中,无意中发现一个现象,就是如果问八股文,问知识点,在校生或刚刚毕业不久的,回答的斗殴挺好的,往往是工作三五年的在这一块非常欠缺。

我也私下问过很多人,为什么这种现象,主要原因差不多就是:

  • 每天太忙,没时间学习
  • 年纪大了,记不住
  • 就是不要想学

工作三五年的也不是就真的没有优点,他们的优点就是有大量的项目经验。换着问项目业务和设计之类的,他们明显占优势,但,问他们稍微往深的问,就会懵逼。比如:你们项目中使用到了Redis,用来干嘛,他们能立马回答上来。

如果继续追问:如何保证Redis和数据库中的数据一致性?然后就会稀里糊涂的回答。还有就是问他们Redis的持久化方式使用的是哪种?“这个没注意,不是我安装的”,继续问:那你觉得哪种方式更好,答案各种各样的都有。

总结起来就亮点:

  • 学生或新人,八股文占优势(也有一部分啥都不知道,啥也没去背的)。
  • 三五年有项目经验,但大部分都停留在用上面,稍微问题问题就容易暴露自己的家点(也有一小分部知道的比较多)

在模拟面试的时候,我问过很多人是否在项目中用过并发编程的相关技术,用了什么?

基本上都回答:用过线程池

好吧,接下来,那我们就以一个线程池的面试题来对比以上两类人的回答。

聊聊线程池

线程池核心参数

学生或新人:基本上都是一口气就能吧这些参数回答上来,另外有部分优秀的会对这些参数做一个解释。

三五年的:部分人能全部回答出来,一部分人能说出核心线程数、最大线程数,其他参数就吱吱呜呜的回答,还有一部分就是完全一脸懵逼。

我们来看看到底有哪些参数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
{
  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
  • keepAliveTime:空闲时间
  • unit:空闲时间单位
  • workQueue:阻塞队列
  • threadFactory:线程工厂
  • handler:拒绝策略

问核心参数时,至少要回答corePoolSizemaximumPoolSizeworkQueuehandler。还是建议全部回答吧,反正也没几个参数。

线程池原理

学生或新人:按照八股文来回答一番,甚至有的在回答核心参数的时候,顺带着就会把线程池的原理给说了(刚刚遇到能说的,就顺带着多说点)。

三五年的:有部分人也会按照八股文上的来回答,有部分人是吱吱呜呜的,不知道在说啥,还有一分部人就是瞎说咯。

关于线程池原理,我这里借用网上一张图:

如果看图记不住,我也有办法,我们可以使用生活案例来理解。

  • 公司A:线程池
  • 公司A自己的员工:核心线程数
  • 公司A接到的订单:我们的业务线程
  • 公司A的仓库:阻塞队列
  • 公司B派的人:最大线程数

开始表演:

公司A接到订单,先给自己员工处理,如果自己员工处理不来了,就丢到公司A仓库里,如果仓库堆满了,这时候就去找公司B,公司B就派人(最大线程数)到公司A,订单持续爆棚,公司B派来的员工和公司A的员工都搞不来了,那就只能把后面来的订单拒绝掉(拒绝策略)。如果公司B的员工在公司A里吧任务做完了,闲着没事了,公司A也不会立马就让人家回公司B,毕竟人员来回还是有成本的,所以,可以适当的给点时间(keepAliveTime),是在没有什么任务了,那你们还是回公司B吧。

好了,按照这个故事去编就行了,也可以模仿着编其他故事,至少让面试官觉得你不是在背八股文。

核心线程数量设置

学生或新人:就算知道也是被八股文的,但很遗憾,问过十多个人,回答上来的应该占30%左右。

三五年的:问过几十个人,回答上来的寥寥无几,甚是遗憾。在项目中敢用线程池,却不知道如何设置核心线程数,这不是瞎搞吗?有的人能回答出CPU密集型和IO密集型,但问他哪些类型是CPU密集型、哪些是IO密集型?分表举两个例子,此时很多人都会慌的。

下面,我们来说说CPU密集型和IO密集型:

CPU密集型:这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

比如:像加解密,压缩、计算等一系列需要大量耗费 CPU 资源的任务,大部分场景下都是纯 CPU 计算。

IO密集型:这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占 用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 :

核心线程数=CPU核心数量*2

比如:像 MySQL 数据库、文件的读写、网络通信等任务,这类任务不会特别消耗 CPU 资源,但是 IO 操作比较耗时,会占用比较多时间。

另外,线程的平均工作时间所占比例越高,就需要越少的线程;线程的平均等待时间所占比例越高,就需要越多的线程;

以上只是理论值,实际项目中建议在本地或者测试环境进行多次调优,找到相对理想的值大小。

线程是如何复用的

关于这个问题,目前我见过的,只有两个人能说个大概。

我们都知道,继承Thread类或者实现Runnable接口,然后调用其start()方法就可以启动线程了,但是如果调用run()方法就和调用普通方法一样。

我们来看看,线程池中,我们提交的线程实例(任务),在线程池中到底是怎么被执行的。

线程池中有个Worker的角色,我们调用execute(Runnable tak)时候,会创建一个Worker:

我们先来看看这个Worker是怎么定义的:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    
    private static final long serialVersionUID = 6138294804551838833L;
    
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    // Worker 只有这一个构造方法,传入 firstTask
    Worker(Runnable firstTask) {
        setState(-1); 
        this.firstTask = firstTask;
        // 调用 ThreadFactory 来创建一个新的线程,这里创建的线程到时候用来执行任务
        // 我们发现创建线程的时候传入的值是this,我们知道创建线程可以通过继承Runnable的方法,
        // Worker继承了Runnable,并且下面重写了run()方法
        this.thread = getThreadFactory().newThread(this);
    }

    // 由上面创建线程时传入的this,上面的thread启动后,会执行这里的run()方法,并且此时runWorker传入的也是this
    public void run() {
        runWorker(this);
    }
}

Worker继承了Runnable,并且下面重写了run()方法,这时候我们调用new Worker().start()方法后,就会调用Worker类中的run()方法。

此时的Worker和我们平时的Thread类就类似了

好了,我们再回到前面的说的execute()方法中来。

public void execute(Runnable command) 
    int c = ctl.get();

    // 如果当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务,
    // 创建一个新的线程,并把当前任务 command 作为这个线程的第一个任务(firstTask)
    if (workerCountOf(c) < corePoolSize) {
        // 添加任务成功,那么就结束了。提交任务嘛,线程池已经接受了这个任务,这个方法也就可以返回了
        // 至于执行的结果,到时候会包装到 FutureTask 中。
        // 这里的true代表当前线程数小于corePoolSize,表示以corePoolSize为线程数界限
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了
    // 如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(nullfalse);
    }
    // 如果 workQueue 队列满了,那么进入到这个分支
    // 这里的false代表当前线程数大于corePoolSize,表示以 maximumPoolSize 为界创建新的 worker
    // 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

这段代码里我们看到了很多地方都在调用接着addWorker()方法,我们来看看这个方法(方法内容有点多):

private boolean addWorker(Runnable firstTask, boolean core) {
    //相当于goto,虽然不建议滥用,看看大神们是如何使用吧
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 如果线程池已关闭,并满足以下条件之一,那么不创建新的 worker:
        // 1. 线程池状态大于 SHUTDOWN,其实也就是 STOP, TIDYING, 或 TERMINATED
        // 2. firstTask != null
        // 3. workQueue.isEmpty()
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            //这里就是通过core参数对当前线程数的判断
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    /*
     * 到这里,我们认为在当前这个时刻,可以开始创建线程来执行任务了,
     */


    // worker 是否已经启动
    boolean workerStarted = false;
    // 是否已将这个 worker 添加到 workers 这个 HashSet 中
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        // 把 firstTask 传给 worker 的构造方法
        w = new Worker(firstTask);
        // 取 worker 中的线程对象,之前说了,Worker的构造方法会调用 ThreadFactory 来创建一个新的线程
        final Thread t = w.thread;
        if (t != null) {
            // 这个是整个类的全局锁,因为关闭一个线程池需要这个锁,至少我持有锁的期间,线程池不会被关闭
            mainLock.lock();
            try {

                int c = ctl.get();
                int rs = runStateOf(c);

                // 小于 SHUTTDOWN 那就是 RUNNING
                // 如果等于 SHUTDOWN,前面说了,不接受新的任务,但是会继续执行等待队列中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // worker 里面的 thread 可不能是已经启动的
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // 加到 workers 这个 HashSet 中
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize 用于记录 workers 中的个数的最大值
                    // 因为 workers 是不断增加减少的,通过这个值可以知道线程池的大小曾经达到的最大值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 添加成功的话,启动这个线程
            if (workerAdded) {
                // 启动线程,最重要的就是这里,下面我们会讲解如何执行任务
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果线程没有启动,需要做一些清理工作,如前面 workCount 加了 1,将其减掉
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回线程是否启动成功
    return workerStarted;
}

请注意:t.start();这里的t其实就是我们创建的Worker对象,就回到我们前面说的,调用start()方法后,会执行到他的run()方法中来,我们继续看Worker中的run()方法。

//Worker的run方法
public void run() {
      runWorker(this);
}

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //取出需要执行的任务,
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果task不是null,或者去队列中取任务,注意这里会阻塞,后面会分析getTask方法
            while (task != null || (task = getTask()) != null) {
               //这个lock在这里是为了如果线程被中断,那么会抛出InterruptedException,而退出循环,结束线程
                w.lock();
                //判断线程是否需要中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                   //任务开始执行前的hook方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        
                        task.run();//这里就是直接调用run 方法
                        
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } finally {
                       ////任务开始执行后的hook方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;//清空task
                    w.completedTasks++;//完成数添加
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
           //Worker退出
           processWorkerExit(w, completedAbruptly);
    }
}

这里注意这行代码:

while (task != null || (task = getTask()) != null)

注释中已经说清楚了:如果task不是null,或者去队列中取任务,注意这里会阻塞。

另外,一个注意点:

task.run();

这个其实就是调用我们我们传入到execute(Runnable task)中的参数,也就是说,我们创建的Runnable对象,根本就不会去调用其start()方法,而是直接调用其run()方法。

我们在看看getTask()方法到底是做什么?

// 此方法有三种可能:
// 1. 阻塞直到获取到任务返回。我们知道,默认 corePoolSize 之内的线程是不会被回收的,
//      它们会一直等待任务
// 2. 超时退出。keepAliveTime 起作用的时候,也就是如果这么多时间内都没有任务,那么应该执行关闭
// 3. 如果发生了以下条件,此方法必须返回 null:
//    - 池中有大于 maximumPoolSize 个 workers 存在(通过调用 setMaximumPoolSize 进行设置)
//    - 线程池处于 SHUTDOWN,而且 workQueue 是空的,前面说了,这种不再接受新的任务
//    - 线程池处于 STOP,不仅不接受新的线程,连 workQueue 中的线程也不再执行
private Runnable getTask() {
    boolean timedOut = false

    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 两种可能
        // 1. rs == SHUTDOWN && workQueue.isEmpty()
        // 2. rs >= STOP
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // CAS 操作,减少工作线程数
            decrementWorkerCount();
            return null;
        }

        boolean timed;      // Are workers subject to culling?
        for (;;) {
            int wc = workerCountOf(c);
            // 允许核心线程数内的线程回收,或当前线程数超过了核心线程数,那么有可能发生超时关闭
            timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  // Re-read ctl
            // compareAndDecrementWorkerCount(c) 失败,线程池中的线程数发生了改变
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
        // wc <= maximumPoolSize 同时没有超时
        try {
            // 到 workQueue 中获取任务
            // 如果timed=wc > corePoolSize=false,我们知道核心线程数之内的线程永远不会销毁,则执行workQueue.take();我前面文章中讲过,take()方法是阻塞方法,如果队里中有任务则取到任务,如果没有任务,则一直阻塞在这里知道有任务被唤醒。
            //如果timed=wc > corePoolSize=true,这里将执行超时策略,poll(keepAliveTime, TimeUnit.NANOSECONDS)会阻塞keepAliveTime这么长时间,没超时就返回任务,超时则返回null.
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果此 worker 发生了中断,采取的方案是重试
            // 解释下为什么会发生中断,这个读者要去看 setMaximumPoolSize 方法,
            // 如果开发者将 maximumPoolSize 调小了,导致其小于当前的 workers 数量,
            // 那么意味着超出的部分线程要被关闭。重新进入 for 循环,自然会有部分线程会返回 null
            timedOut = false;
        }
    }
}

到这里,大家应该都知道了,线程池中到底是如何复用线程的吧。

我来总结一下:

线程池中,维护了一个Worker的内部类,其中Worker也实现了Runnable接口,重写了run()方法,在调用这个run()时候,会采用类似于死循环的while方式重复使用这个worker去获取任务并执行我们传入execute(Runnable task)方法的参数task(task存放在阻塞队列里)。

<END>

【轻薄款上架】 🕚

  推荐阅读:

终于!我找到程序员爱穿卫衣的原因了

破玩意 | 多线程 +1 的最快操作

10张图告诉你多线程那些破事

图解|打工人看腾讯这道多线程面试题

每日打卡赢积分兑换书籍入口

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存