原文地址:https://www.javacodegeeks.com/2016/12/implement-thread-pool-java.html

在程序执行的时候,每一个线程都是独立执行的。Java中的每一个线程都继承了java.lang.Thread类或者实现了java.lang.Runnable接口。

多线程意味着在一个任务中有两个或多个线程在执行。在多线程环境中,每个任务都可能同时有很多并发线程正在同步或异步的执行。你可以在这里看到我写的另一篇关于线程和多线程的教程。

1. 什么是线程池

线程池包含了一组可以用来执行任务的线程,池中的每一个线程都可以被反复使用。当所有的线程都在运行的时候,如果此时有一个任务被提交了,那么此任务会进入一个等待队列,一直到线程组中某一个线程的当前任务执行完毕,此时该线程才会执行我们刚刚提交的任务。线程池内部使用 LinkedBlockingQueue 来实现任务的添加和删除。我们使用 wait()notify() 方法让一个线程等待任务信号,从而把任务队列和线程池给结合起来。下面的例子中使用了一个 Runnable 对象的队列作为工作队列(你也可以使用 Thread 对象的队列),这是一个实现线程调度常见的方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package tutorials;

import java.util.concurrent.LinkedBlockingQueue;

public class ThreadPool {
private final int nThreads;
private final PoolWorker[] threads;
private final LinkedBlockingQueue queue;

public ThreadPool(int nThreads) {
this.nThreads = nThreads;
queue = new LinkedBlockingQueue();
threads = new PoolWorker[nThreads];

for (int i = 0; i < nThreads; i++) {
threads[i] = new PoolWorker();
threads[i].start();
}
}

public void execute(Runnable task) {
synchronized (queue) {
queue.add(task); // 向队列中添加任务
queue.notify(); // 唤醒一个线程
}
}

private class PoolWorker extends Thread {
public void run() {
Runnable task;

while (true) {
synchronized (queue) {
while (queue.isEmpty()) {
try {
queue.wait(); // 线程没有任务,进入睡眠
} catch (InterruptedException e) {
System.out.println("An error occurred while queue is waiting: " + e.getMessage());
}
}
// 线程被唤醒之后,会顺利执行到这里
task = queue.poll(); // 获取任务
}

// If we don't catch RuntimeException,
// the pool could leak threads
try {
task.run(); // 执行任务
} catch (RuntimeException e) {
System.out.println("Thread pool is interrupted due to an issue: " + e.getMessage());
}
}
}
}
}

为了控制线程对工作队列的访问,一定要给工作队列加上同步锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package tutorials;

public class Task implements Runnable {

private int num;

public Task(int n) {
num = n;
}

public void run() {
System.out.println("Task " + num + " is running.");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
import tutorials.Task;
import tutorials.ThreadPool;

public class Main {

public static void main(String[] args) {
ThreadPool pool = new ThreadPool(7);

for (int i = 0; i < 5; i++) {
Task task = new Task(i);
pool.execute(task);
}
}

在上面的例子中,我们使用 notify() 替代了 notifyAll() 方法。notify() 比 notifyAll() 有着更多的性能优势,例如一个对于服务器应用很重要的因素 —— notify() 需要更少的上下文切换。不过需要注意的是,在某些场景下使用 notify() 方法可能会存在一些微弱的风险,所以notify()方法只能使用在一些特定的场合下。

下面这张图展示了上面例子中线程池的设计理念:

2. 线程池的高效使用

线程池是一个构建高效的多线程应用的方法,但是它也是存在风险的。用线程池构建的应用和其它的多线程应用一样存在着以下风险:死锁、系统抖动、同步或并发错误,线程泄漏或请求超载。

下面是一些建议:

  • 不要把正在同步等待其它任务的任务入队列,否则将可能导致死锁
  • 如果线程在等待I/O操作这样的资源的时候,指定一个最大超时时间来让任务可以重新入队列执行。这样可以使得一个线程被释放,从而可以去执行其余的任务。
  • 因为过小或过大的线程池大小都可能产生问题,所以应该设置合适的线程池大小。线程池的大小取决于可用的处理器核数和工作队列中任务的性质(译注:I/O密集型任务或CPU密集型任务)。

3. 结论

线程池对于组织服务器应用来说是相当有用的,并且正确的构建线程池可以避免死锁和复杂的 wait()notify() 方法的使用问题。不过,相较于自己从头写一个线程池而言,我推荐使用 util.concurrent 包下的 Executor 类,例如 ThreadPoolExecutor。如果需要创建线程来执行时间极短的任务,那么你就可以考虑使用一个线程池了。

4. 下载源代码

点击这里下载线程池教程的源码。