目录
  1. 1. 如何终止一个Java线程
  2. 2.Thread.interrupt()源码分析
    1. 2.1 校验权限
    2. 2.2 触发中断回调接口
    3. 2.3 interrupt0()
    4. 2.4 唤醒ParkEvent
    5. 2.5 唤醒Parker
  3. 3. 如何处理InterruptedException
  4. 4. 总结
  5. 5. 参考资料

1. 如何终止一个Java线程

  Java线程的终止操作最初是直接暴露给用户的,java.lang.Thread类提供了stop()方法,允许用户暴力的终止一个线程并退出临界区(释放所有锁,并在当前调用栈抛出ThreadDeath Exception)。同样的,Thread.suspend()和Thread.resume()方法允许用户灵活的暂停和恢复线程。
  然而这些看似简便的API在JDK1.2就被deprecate掉了,原因是stop()方法本质上是不安全的,它会强制释放掉线程持有的锁,这样临界区的数据中间状态就会遗留出来,从而造成不可预知的后果。
  当然Java线程不可能没有办法终止,在Java程序中,唯一的也是最好的办法就是让线程从run()方法返回。更具体来说,有以下几种情况:

2.Thread.interrupt()源码分析

  Thread.interrupt()方法设计的目的,是提示一个线程应该终止,但不强制该线程终止。程序员可以来决定如何响应这个终止提示。直接上源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//Class java.lang.Thread
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();

synchronized (blockerLock) {
Interruptible b = blocker; //中断触发器
if (b != null) {
interrupt0();
b.interrupt(this); //触发回调接口
return;
}
}
interrupt0();
}

2.1 校验权限

  如果不是当前线程自我中断,会先做一次权限检查。如果被中断的线程属于系统线程组(即JVM线程),checkAccess()方法会使用系统的System.getSecurityManager()来判断权限。由于Java默认没有开启安全策略,此方法其实会跳过安全检查。

2.2 触发中断回调接口

  如果线程的中断触发器blocker不为null,则触发中断触发回调接口Interruptible。那么这个触发器blocker是什么时候被设置的呢?
  上文提到,如果一个nio通道实现了InterruptibleChannel接口,就可以响应interrupt()中断,其原理就在InterruptibleChannel接口的抽象实现类AbstractInterruptibleChannel的方法begin()中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//Class java.nio.channels.spi.AbstractInterruptibleChannel
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread target) {
synchronized (closeLock) {
if (!open)
return;
open = false;
interrupted = target;
try {//关闭io通道
AbstractInterruptibleChannel.this.implCloseChannel();
} catch (IOException x) { }
}
}};
}
blockedOn(interruptor);//将interruptor设置为当前线程的blocker
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}

protected final void end(boolean completed) throws AsynchronousCloseException {
blockedOn(null);
Thread interrupted = this.interrupted;
if (interrupted != null && interrupted == Thread.currentThread()) {
interrupted = null;
throw new ClosedByInterruptException();
}
if (!completed && !open)
throw new AsynchronousCloseException();
}
1
2
3
4
5
6
7
8
9
10
11
//Class java.nio.channels.Channels.ReadableByteChannelImpl
public int read(ByteBuffer dst) throws IOException {
......
try {
begin();
bytesRead = in.read(buf, 0, bytesToRead);
finally {
end(bytesRead > 0);
}
......
}

  以上述代码为例,nio通道ReadableByteChannel每次执行阻塞方法read()前,都会执行begin(),把Interruptible回调接口注册到当前线程上,以实现能够响应其他线程的中断。当线程收到中断时,Thread.interrupt()触发回调接口,在回调接口Interruptible中关闭io通道并返回,最后在finally块中执行end(),end()方法会检查中断标记,抛出ClosedByInterruptException。

2.3 interrupt0()

  无论是否设置了中断触发回调blocker,都会执行这个关键的native方法interrupt0():

1
private native void interrupt0();

  以openJDK7的Hotspot虚拟机为例,先找到native方法映射

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//jdk\src\share\native\java\lang\Thread.c
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
};

  可以找到interrupt0对应JVM_Interrupt这个函数,继续找到实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
//hotspot\src\share\vm\prims\jvm.cpp
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");
// Ensure that the C++ Thread and OSThread structures aren't freed before we operate
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
// We need to re-resolve the java_thread, since a GC might have happened during the
// acquire of the lock
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr != NULL) {
Thread::interrupt(thr);
}
JVM_END

  可以看到这是一个JNI方法,JVM_ENTRY是JNI调用的宏。关键函数Thread::interrupt(thr),继续跟踪:

1
2
3
4
5
6
//hotspot\src\share\vm\runtime\thread.cpp
void Thread::interrupt(Thread* thread) {
trace("interrupt", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
os::interrupt(thread);
}

  关键函数os::interrupt,os此时分为linux、solaris和windows,以linux为例继续跟踪:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//hotspot\src\os\linux\vm\os_linux.cpp
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {
osthread->set_interrupted(true);
// More than one thread can get here with the same value of osthread,
// resulting in multiple notifications. We do, however, want the store
// to interrupted() to be visible to other threads before we execute unpark().
OrderAccess::fence();
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}
// For JSR166. Unpark even if interrupt status already was set
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}

  每一个Java线程都与一个osthread一一对应,如果相应的os线程没有被中断,则会设置osthread的interrupt标志位为true(对应一个volatile int),并唤醒线程的SleepEvent。随后唤醒线程的parker和ParkEvent。
  简而言之,interrupt操作会对三种事件进行unpark唤醒,分别是thread->_SleepEvent、thread->parker()和thread->_ParkEvent,这些变量的具体声明如下:

1
2
3
4
5
6
7
8
9
10
11
//hotspot\src\share\vm\runtime\thread.cpp

public:
ParkEvent * _ParkEvent ; // for synchronized()
ParkEvent * _SleepEvent ; // for Thread.sleep

// JSR166 per-thread parker
private:
Parker* _parker;
public:
Parker* parker() { return _parker; }

2.4 唤醒ParkEvent

  Thread类中包含了两种作用不同的ParkEvent,**_ParkEvent变量用于synchronized同步块和Object.wait()_SleepEvent变量用于Thread.sleep()**,ParkEvent类的声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class ParkEvent : public os::PlatformEvent {
... //略,直接看父类
}
//hotspot\src\os\linux\vm\os_linux.cpp
class PlatformEvent : public CHeapObj {
private:
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [1] ;
...

public:
PlatformEvent() {
int status;
status = pthread_cond_init (_cond, NULL);
assert_status(status == 0, status, "cond_init");
status = pthread_mutex_init (_mutex, NULL);
assert_status(status == 0, status, "mutex_init");
_Event = 0 ;
_nParked = 0 ;
_Assoc = NULL ;
}
void park () ;
void unpark () ;
int park (jlong millis) ;
...
} ;

  ParkEvent包含了一把mutex互斥锁和一个cond条件变量,并在构造函数中进行了初始化,线程的阻塞和唤醒(park和unpark)就是通过他们实现的:

  上述Thread类的两个ParkEvent成员变量:_ParkEvent和_SleepEvent,都会在Thread.interrupt()时触发unpark()动作。
  对于_ParkEvent来说,它可以代表一个synchronized等待进入同步块的时事件,也可以代表一个Object.wait()等待条件变量的事件。不同的是,如果是synchronized等待事件,被唤醒后会尝试获取锁,如果失败则会通过循环继续park()等待,因此synchronized等待实际上是不会被interrupt()中断的;如果是Object.wait()事件,则会通过标记为判断出是否是被notify()唤醒的,如果不是则抛出InterruptedException实现中断。
  对于_SleepEvent相对简单一些,它只代表线程sleep动作,可能是java.lang.Thread.sleep(),也可能是jvm内部调用的线程os::sleep()。如果是java.lang.Thread.sleep(),则会通过线程的is_interrupted标记位来判断抛出InterruptedException。

2.5 唤醒Parker

  除了唤醒SleepEvent和ParkEvent,Thread.interrupt()还会调用thread->parker()->unpark()来唤醒Thread的parker变量。Parker类与上面的ParkEvent类很相似,都持有一把mutex互斥锁和一个cond条件变量。具体代码见:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Parker : public os::PlatformParker {
public:
// For simplicity of interface with Java, all forms of park (indefinite,
// relative, and absolute) are multiplexed into one call.
void park(bool isAbsolute, jlong time);
void unpark();
... //略,直接看父类
}

//hotspot\src\os\linux\vm\os_linux.hpp
class PlatformParker : public CHeapObj {
protected:
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [1] ;

public:
PlatformParker() {
int status;
status = pthread_cond_init (_cond, NULL);
assert_status(status == 0, status, "cond_init");
status = pthread_mutex_init (_mutex, NULL);
assert_status(status == 0, status, "mutex_init");
}
}

  与ParkEvent一样,Parker使用着自己的锁和park()/unpark()方法。

  如源码注释,Thread的_parker变量更具有通用性。凡是在Java代码里通过unsafe.park()/unpark()的调用都会对应到Thread的_parker变量去执行。而unsafe.park()/unpark()由java.util.concurrent.locks.LockSupport类调用,它支持了java.util.concurrent的各种锁、条件变量等线程同步操作。例如:ReentrantLock, CountDownLatch, ReentrantReadWriteLock, Semaphore, ThreadPoolExecutor, ConditionObject, ArrayBlockingQueue等。
  线程被Thread.interrupt()中断时,并不意味着上述类的等待方法都会返回并抛出InterruptedException。尽管上述类最终等待在的unsafe.unpark()方法都会被唤醒,其是否继续执行park()等待仍取决于具体实现。例如,Lock.lock()方法不会响应中断,Lock.lockInterruptibly()方法则会响应中断并抛出异常,二者实现的区别就在于park()等待被唤醒时是否继续执行park()来等待锁。

3. 如何处理InterruptedException

4. 总结

  终止一个Java线程最好的方式,就是让run()方法主动退出。因为强制的让一个线程被动的退出是很不安全的,内部的数据不一致会对程序造成不可预知的后果。
  为了能够通知一个线程需要被终止,Java提供了Thread.interrupt()方法,该方法会设置线程中断的标记位,并唤醒可中断的阻塞方法,包括Thread.sleep(),Object.wait(),nio通道的IO等待,以及LockSupport.park()。识别一个方法是否会被中断,只需要看其声明中是否会throws InterruptedException或ClosedByInterruptException。
  每个Java线程都会对应一个osthread,它持有了三种条件变量,分别用于Thread.sleep(),Object.wait()和unsafe.park()。Thread.interrupt()会依次唤醒三个条件变量,以达到中断的目的。线程的同步与唤醒最终都使用了pthread_cond_wait和pthread_cond_signal这些pthread库函数。

5. 参考资料

Why Are Thread.stop, Thread.suspend, Thread.resume and Runtime.runFinalizersOnExit Deprecated?

Interrupts (The Java™ Tutorials >Essential Classes > Concurrency)

runtime overview of HotSpot - OpenJDK

OpenJDK 7 Source Releases