1. 如何终止一个Java线程
Java线程的终止操作最初是直接暴露给用户的,java.lang.Thread类提供了stop()方法,允许用户暴力的终止一个线程并退出临界区(释放所有锁,并在当前调用栈抛出ThreadDeath Exception)。同样的,Thread.suspend()和Thread.resume()方法允许用户灵活的暂停和恢复线程。
然而这些看似简便的API在JDK1.2就被deprecate掉了,原因是stop()方法本质上是不安全的,它会强制释放掉线程持有的锁,这样临界区的数据中间状态就会遗留出来,从而造成不可预知的后果。
当然Java线程不可能没有办法终止,在Java程序中,唯一的也是最好的办法就是让线程从run()方法返回。更具体来说,有以下几种情况:
对于runnable的线程,利用一个变量做标记位,定期检查
1
2
3
4
5
6
7
8
9
10
11private volatile boolean flag = true;
public void stop() {
flag = false;
}
public void run() {
while (flag) {
//do something...
}
}对于非runnable的线程,应该采取中断的方式退出阻塞,并处理捕获的中断异常
对于大部分阻塞线程的方法,使用Thread.interrupt(),可以立刻退出等待,抛出InterruptedException
这些方法包括Object.wait(), Thread.join(),Thread.sleep(),以及各种AQS衍生类:Lock.lockInterruptibly()等任何显示声明throws InterruptedException的方法。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17private volatile Thread thread;
public void stop() {
thread.interrupt();
}
public void run() {
thread = Thread.currentThread();
while (flag) {
try {
Thread.sleep(interval);
} catch (InterruptedException e){
//current thread was interrupted
return;
}
}
}被阻塞的nio Channel也会响应interrupt(),抛出ClosedByInterruptException,相应nio通道需要实现java.nio.channels.InterruptibleChannel接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25//This code does not work when run from within Eclipse
private volatile Thread thread;
public void stop() {
thread.interrupt();
}
public void run() {
thread = Thread.currentThread();
BufferedReader in = new BufferedReader(new InputStreamReader(Channels.newInputStream(
new FileInputStream(FileDescriptor.in).getChannel())));
while (flag) {
try {
String line = null;
while ((line = in.readLine()) != null) {
System.out.println("Read line:'"+line+"'");
}
} catch (ClosedByInterruptException e) {
//current channel was interrupted
return;
} catch (IOException e) {
e.printStackTrace();
}
}
}注:如果使用的是传统IO(非Channel,如ServerSocket.accept),所在线程被interrupt时不会抛出ClosedByInterruptException。但可以使用流的close方法实现退出阻塞。
还有一些阻塞方法不会响应interrupt,如等待进入synchronized段、Lock.lock()。他们不能被动的退出阻塞状态。
2.Thread.interrupt()源码分析
Thread.interrupt()方法设计的目的,是提示一个线程应该终止,但不强制该线程终止。程序员可以来决定如何响应这个终止提示。直接上源码:
1 | //Class java.lang.Thread |
2.1 校验权限
如果不是当前线程自我中断,会先做一次权限检查。如果被中断的线程属于系统线程组(即JVM线程),checkAccess()方法会使用系统的System.getSecurityManager()来判断权限。由于Java默认没有开启安全策略,此方法其实会跳过安全检查。
2.2 触发中断回调接口
如果线程的中断触发器blocker不为null,则触发中断触发回调接口Interruptible。那么这个触发器blocker是什么时候被设置的呢?
上文提到,如果一个nio通道实现了InterruptibleChannel接口,就可以响应interrupt()中断,其原理就在InterruptibleChannel接口的抽象实现类AbstractInterruptibleChannel的方法begin()中:
1 | //Class java.nio.channels.spi.AbstractInterruptibleChannel |
1 | //Class java.nio.channels.Channels.ReadableByteChannelImpl |
以上述代码为例,nio通道ReadableByteChannel每次执行阻塞方法read()前,都会执行begin(),把Interruptible回调接口注册到当前线程上,以实现能够响应其他线程的中断。当线程收到中断时,Thread.interrupt()触发回调接口,在回调接口Interruptible中关闭io通道并返回,最后在finally块中执行end(),end()方法会检查中断标记,抛出ClosedByInterruptException。
2.3 interrupt0()
无论是否设置了中断触发回调blocker,都会执行这个关键的native方法interrupt0():
1 | private native void interrupt0(); |
以openJDK7的Hotspot虚拟机为例,先找到native方法映射
1 | //jdk\src\share\native\java\lang\Thread.c |
可以找到interrupt0对应JVM_Interrupt这个函数,继续找到实现代码
1 | //hotspot\src\share\vm\prims\jvm.cpp |
可以看到这是一个JNI方法,JVM_ENTRY是JNI调用的宏。关键函数Thread::interrupt(thr),继续跟踪:
1 | //hotspot\src\share\vm\runtime\thread.cpp |
关键函数os::interrupt,os此时分为linux、solaris和windows,以linux为例继续跟踪:
1 | //hotspot\src\os\linux\vm\os_linux.cpp |
每一个Java线程都与一个osthread一一对应,如果相应的os线程没有被中断,则会设置osthread的interrupt标志位为true(对应一个volatile int),并唤醒线程的SleepEvent。随后唤醒线程的parker和ParkEvent。
简而言之,interrupt操作会对三种事件进行unpark唤醒,分别是thread->_SleepEvent、thread->parker()和thread->_ParkEvent,这些变量的具体声明如下:
1 | //hotspot\src\share\vm\runtime\thread.cpp |
2.4 唤醒ParkEvent
Thread类中包含了两种作用不同的ParkEvent,**_ParkEvent变量用于synchronized同步块和Object.wait(),_SleepEvent变量用于Thread.sleep()**,ParkEvent类的声明如下:
1 | class ParkEvent : public os::PlatformEvent { |
ParkEvent包含了一把mutex互斥锁和一个cond条件变量,并在构造函数中进行了初始化,线程的阻塞和唤醒(park和unpark)就是通过他们实现的:
- PlatformEvent::park() 方法会调用库函数pthread_cond_wait(_cond, _mutex)实现线程等待
- synchronized块的进入和Object.wait()的线程等待都是通过PlatformEvent::park()方法实现
- 注:Thread.join()是使用的Object.wait()实现的
- PlatformEvent::park(jlong millis) 方法会调用库函数pthread_cond_timedwait(_cond, _mutex, _abstime)实现计时条件等待
- Thread.sleep(millis)就是通过PlatformEvent::park(jlong millis)实现
- PlatformEvent::unpark() 方法会调用库函数pthread_cond_signal (_cond)唤醒上述等待的条件变量
- Thread.interrupt()就会触发其子类SleepEvent和ParkEvent的unpark()方法
- synchronized块的退出也会触发unpark()。其所在对象ObjectMonitor维护了ParkEvent数组作为唤醒队列,synchronized同步块退出时,会触发ParkEvent::unpark()方法来唤醒等待进入同步块的线程,或等待在Object.wait()的线程。
上述Thread类的两个ParkEvent成员变量:_ParkEvent和_SleepEvent,都会在Thread.interrupt()时触发unpark()动作。
对于_ParkEvent来说,它可以代表一个synchronized等待进入同步块的时事件,也可以代表一个Object.wait()等待条件变量的事件。不同的是,如果是synchronized等待事件,被唤醒后会尝试获取锁,如果失败则会通过循环继续park()等待,因此synchronized等待实际上是不会被interrupt()中断的;如果是Object.wait()事件,则会通过标记为判断出是否是被notify()唤醒的,如果不是则抛出InterruptedException实现中断。
对于_SleepEvent相对简单一些,它只代表线程sleep动作,可能是java.lang.Thread.sleep(),也可能是jvm内部调用的线程os::sleep()。如果是java.lang.Thread.sleep(),则会通过线程的is_interrupted标记位来判断抛出InterruptedException。
2.5 唤醒Parker
除了唤醒SleepEvent和ParkEvent,Thread.interrupt()还会调用thread->parker()->unpark()来唤醒Thread的parker变量。Parker类与上面的ParkEvent类很相似,都持有一把mutex互斥锁和一个cond条件变量。具体代码见:
1 | class Parker : public os::PlatformParker { |
与ParkEvent一样,Parker使用着自己的锁和park()/unpark()方法。
- Parker::park(bool isAbsolute, jlong time) 方法会调用库函数pthread_cond_timedwait(_cond, _mutex, _abstime)实现计时条件等待,如果time=0则会直接使用pthread_cond_wait(_cond, _mutex)实现线程等待
- Parker::unpark() 方法会调用库函数pthread_cond_signal (_cond)唤醒上述等待的条件变量
如源码注释,Thread的_parker变量更具有通用性。凡是在Java代码里通过unsafe.park()/unpark()的调用都会对应到Thread的_parker变量去执行。而unsafe.park()/unpark()由java.util.concurrent.locks.LockSupport类调用,它支持了java.util.concurrent的各种锁、条件变量等线程同步操作。例如:ReentrantLock, CountDownLatch, ReentrantReadWriteLock, Semaphore, ThreadPoolExecutor, ConditionObject, ArrayBlockingQueue等。
线程被Thread.interrupt()中断时,并不意味着上述类的等待方法都会返回并抛出InterruptedException。尽管上述类最终等待在的unsafe.unpark()方法都会被唤醒,其是否继续执行park()等待仍取决于具体实现。例如,Lock.lock()方法不会响应中断,Lock.lockInterruptibly()方法则会响应中断并抛出异常,二者实现的区别就在于park()等待被唤醒时是否继续执行park()来等待锁。
3. 如何处理InterruptedException
- 方式1 如果自己很清楚当前线程被中断后的处理方式,则按自己的方式处理
- 通常是做好善后工作,主动退出线程
- 方式2 直接在方法声明中throws InterruptedException,丢给上层处理
- 这种方式也很常见,将中断的处置权交给具体的业务来处理
- 方式3 重新设置中断标记位,Thread.currentThread().interrupt(),交给后续方法处理
- 原因是底层抛出InterruptedException时会清除中断标记位,捕获到异常后如果不想处理,可以重新设置中断标记位:
1
2
3
4
5
6try {
...
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- 原因是底层抛出InterruptedException时会清除中断标记位,捕获到异常后如果不想处理,可以重新设置中断标记位:
- 注意 请不要吞掉InterruptedException,可能会导致上层的调用方出现不可预料的结果
4. 总结
终止一个Java线程最好的方式,就是让run()方法主动退出。因为强制的让一个线程被动的退出是很不安全的,内部的数据不一致会对程序造成不可预知的后果。
为了能够通知一个线程需要被终止,Java提供了Thread.interrupt()方法,该方法会设置线程中断的标记位,并唤醒可中断的阻塞方法,包括Thread.sleep(),Object.wait(),nio通道的IO等待,以及LockSupport.park()。识别一个方法是否会被中断,只需要看其声明中是否会throws InterruptedException或ClosedByInterruptException。
每个Java线程都会对应一个osthread,它持有了三种条件变量,分别用于Thread.sleep(),Object.wait()和unsafe.park()。Thread.interrupt()会依次唤醒三个条件变量,以达到中断的目的。线程的同步与唤醒最终都使用了pthread_cond_wait和pthread_cond_signal这些pthread库函数。
5. 参考资料
Why Are Thread.stop, Thread.suspend, Thread.resume and Runtime.runFinalizersOnExit Deprecated?
Interrupts (The Java™ Tutorials >Essential Classes > Concurrency)