一个线程池的运行流程如下:

image-20230407104446460

看下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class TestExecutionWebServer {

private static final int READS = 100;

private static final Executor executor = Executors.newFixedThreadPool(READS);

public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(80);
while (true) {
final Socket socket = serverSocket.accept();
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("do Something");
}
};
// new Thread(task).start();
executor.execute(task);
}
}
}

被注释掉的那一行,就是没有使用线程池的写法。它会为每一个到来的请求都新建一个线程去执行任务,但是这样做有一个弊端,就是会无限制的创建线程,有100个请求就创建100个线程,然后用完再销毁,频繁的线程创建和销毁会给系统带来很大的压力,而且无休止的创建线程也会导致系统耗尽资源而崩溃。

而使用了线程池,它会固定线程的数量,每次来任务时,就从线程池里面取一个闲置的线程去执行任务,而且线程池里面的线程都是创建好了的,省略了创建线程和销毁线程的资源消耗。

Executor的生命周期

由于executor以异步的方式执行,所以任意时刻,之前提交的任务状态都是不可知的,可能运行结束,可能还在执行或者排队。所以关闭 时需要将操作中受影响的任务状态返回给应用程序。

为了解决这个问题,Executor扩展了ExecutorService接口,添加了一些有关生命周期的方法。比如说shutdown,shutdownNow,isShutdown等。下面是一个支持关闭的服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class TestExecutionWebServer {

private static final int READS = 100;

private static final ExecutorService executor = Executors.newFixedThreadPool(READS);

public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(80);
while (!executor.isShutdown()) {
try {
final Socket socket = serverSocket.accept();
Runnable task = () -> handleRequest(socket);
executor.execute(task);
} catch (RejectedExecutionException e) {
if (!executor.isShutdown()) {
e.printStackTrace();
}
}
}
}

public void stop() {
executor.shutdown();
}

static void handleRequest(Socket socket) {
System.out.println("do something with socket");
}
}

线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。

ThreadPoolExecutor的运行状态有5种:

RUNNING:能接受新提交的任务,也能处理阻塞队列中的任务。

SHUTDOWN:关闭状态,不接受新任务,但可以处理阻塞队列中已保存的任务。

STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。

TIDYING:所有任务已经终止,workerCount(有效线程数)为0。

TERMINATED:在terminated()方法执行完成后进入该状态。

线程转变过程如下:

image-20230407104821174

任务执行机制

任务调度

所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。具体执行过程如下图:

image-20230407105329566

注:这里的核心数是指操作系统的核心线程数,由于Java线程的设计是采用1:1的设计方案,也就是说每一个Java线程都绑定了一个操作系统的核心线程。

任务缓冲

线程池本质是对任务和线程的管理,做到这一点的关键是将两者解耦,不让两者关联才可以做后续的分配工作。而这一点是通过采用生产者-消费者模式,通过一个阻塞队列实现的。阻塞队列缓存任务,然后工作线程从中获取任务。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

image-20230407111347618

任务申请

任务的执行有两种可能:

1、任务直接由新创建的线程执行。

2、线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。

第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。执行流程如下:

image-20230407112022917

任务拒绝

任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,以保护线程池。

Worker线程管理

线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。

Worker线程

Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

image-20230407121025966

这里的非核心线程创建区别于核心线程。在线程池中,核心线程一般是初始化时创建的,非核心线程是在任务提交到线程池后,如果当前线程池中的线程数还没有达到最大线程数,就会创建新的线程来执行任务。

生命周期管理的实现

线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。

Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

lock方法一旦获取了独占锁,表示当前线程正在执行任务中,则不应该中断线程。如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。

线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

image-20230407122748731

Worker线程增加

增加线程的执行流程如下所示:

image-20230407123051268

Worker线程回收

线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。

Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。

Worker线程执行任务

Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:

有一个while循环会不断的从阻塞队列中获取任务,如果线程池正在停止,就要保证当前线程是中断状态。然后开始执行任务,直到从阻塞队列获取任务返回为空时,销毁线程。流程图如下所示:

image-20230407123642437

参考

《Java并发编程实战》

美团技术博客