两阶段终止
让线程停下来
在《Java深度探索:开发基础》的《第3章 多线程》
中讲过,多线程的stop()
已经被废弃,如果想要中断线程的执行,那么需要执行interrupt()
方法然后捕获InterruptedException
异常来实现。
但interrupt()
方法的本质其实是将处于WAITING
状态的线程唤醒,然后利用异常中断正在执行的线程,这算是取了个巧,用旁门左道
的方式来终止它。
比较正常的想法都是通过标志位(flag)来停止线程的执行。
例如,假设有一个后台线程会定期将缓存中的数据同步到数据库中,就像这样。

但现在需要暂停同步,因为缓存系统可能要停机维护了。容易想到的办法就是给线程1增加一个flag
变量,如果这个变量的值为true
,那么就正常执行,否则停止执行。
那又是谁来控制或者改变这个flag
变量的值呢?当然是另一个线程,也就是线程2。

因为flag
必须对两个线程都可见,所以需要用volatile
来修饰它。
现在可以控制线程的运行和停止了,但还有一个问题:如果强行停止线程1的运行,有可能它来不及做好善后工作而造成数据丢失。

在上面的图中,不管标记1
或2
的地方哪一个没做好,都有可能造成数据的丢失。
所以,既需要能够控制线程的启停,又需要让它优雅地处理好当前任务,那么可以这样做。
第一阶段:发出终止信号,告知线程即将被终止。
第二阶段:线程收到信号,完成善后工作后停止运行。
上面两步归纳起来就是两阶段终止(Two-phase Termination)
模式。
如果线程被阻塞而无法通过flag
来控制它,也只能通过interrupt()
方法来终止运行了。
/**
* 强制终止被阻塞的线程
*
*/
public class ThreadExecutor {
/**
* 执行线程
*
*/
private Thread mainThread;
/**
* 运行状态
*
*/
private volatile boolean isRunning = false;
/**
* 发生阻塞的线程任务
*
*/
public void execute(Runnable task) {
mainThread = new Thread(() -> {
Thread child = new Thread(task);
// 子线程设置为守护线程
child.setDaemon(true);
child.start();
try {
// 执行子线程,让主线程进入阻塞状态
child.join();
isRunning = true;
} catch (InterruptedException e) {
// 捕获interrupt()异常
//e.printStackTrace();
}
});
mainThread.start();
}
/**
* 强制结束线程
*
*/
public void abortInTime(long time) {
long currentTime = System.currentTimeMillis();
while (!isRunning) {
if ((System.currentTimeMillis() - currentTime) >= time) {
System.out.println("任务超时,需要结束他!");
mainThread.interrupt();
break;
}
}
isRunning = false;
}
public static void main(String[] args) {
ThreadExecutor executor = new ThreadExecutor();
long start = System.currentTimeMillis();
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
//e.printStackTrace();
}
});
executor.abortInTime(1000);
long end = System.currentTimeMillis();
System.out.println(end - start);
}
}
两阶段终止
先看看比较简单的两阶段终止
代码。
/**
* 两阶段终止
*
*/
public class TerminationThread {
// 标志位
private volatile boolean flag;
// 监控线程
private Thread monitorThread;
// 启动监控线程
public void start() {
monitorThread = new Thread(() -> {
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("执行操作");
} catch (InterruptedException e) {
//e.printStackTrace();
}
if (flag) {
System.out.println("善后处理");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("关闭线程");
try {
// 抛出中断异常
throw new InterruptedException();
} catch (InterruptedException e) {
//throw new RuntimeException(e);
// 跳出while循环
break;
}
}
}
});
monitorThread.start();
}
/**
* 停止线程运行
*
*/
public void stop() {
flag = true;
}
public static void main(String[] args) {
TerminationThread thread = new TerminationThread();
thread.start();
try {
TimeUnit.MILLISECONDS.sleep(3500);
} catch (InterruptedException e) {
//throw new RuntimeException(e);
}
// 向线程发出通知
thread.stop();
}
}
比较复杂一点的是当有多个线程共用flag
标志位时,就需要将维护这个标志位的代码单独剥离出来。
/**
* 线程停止标志
*
*/
public class TerminationThreadFlag {
public static TerminationThreadFlag instance;
/**
* 未执行的任务的数量
*
*/
public final AtomicInteger reservations = new AtomicInteger(0);
/**
* 通过volatile来修饰,当数据修改后其他线程也可以读取到
*
*/
private volatile boolean running = false;
/**
* 当多个线程共享一个TerminationThreadFlag实例时,通过队列的方式来记录,减少锁的使用
*
*/
private final Queue<WeakReference<Termination>> coordinatedThreads;
/**
* 单例模式
*
*/
public static TerminationThreadFlag getInstance() {
if (instance != null) {
return instance;
}
instance = new TerminationThreadFlag();
return instance;
}
private TerminationThreadFlag() {
this.coordinatedThreads = new ConcurrentLinkedQueue<>();
}
/**
* 是否终止
*
*/
public boolean isRunning() {
return running;
}
public void shutdown(boolean running) {
this.running = running;
}
/**
* 注册线程到TerminationThreadFlag上
*
*/
public void register(Termination thread) {
coordinatedThreads.add(new WeakReference<>(thread));
}
/**
* 通知TerminationThreadFlag中所有实例,有一个线程停止了,通知其他线程也停止
*
*/
public void notifyThreadTermination(Termination thread) {
WeakReference<Termination> reference;
Termination otherThread;
while ((reference = coordinatedThreads.poll()) != null) {
otherThread = reference.get();
if (otherThread != null && otherThread != thread) {
otherThread.terminate();
}
}
}
}
剥离出了维护flag
标志位的代码后,就可以重新整理实现两阶段终止的抽象类了。
/**
* 中断线程的抽象类
*
*/
public abstract class AbstractTerminationThread extends Thread implements Termination {
/**
* 线程共享停止的标志实例对象
*
*/
public final TerminationThreadFlag terminationThreadFlag;
public AbstractTerminationThread() {
this(TerminationThreadFlag.getInstance());
}
public AbstractTerminationThread(TerminationThreadFlag terminationThreadFlag) {
this.terminationThreadFlag = terminationThreadFlag;
System.out.println("注册线程到标志位队列");
terminationThreadFlag.register(this);
}
/**
* 发出线程终止通知
*
*/
@Override
public void terminate() {
try {
// 设置标志实例对象为true
System.out.println("设置中断标志对象为中断");
this.terminationThreadFlag.shutdown(true);
// TODO 发出中断通知
doTerminate();
} finally {
// 如果没有等待的任务,则强制去停止线程
if (terminationThreadFlag.reservations.get() <= 0) {
super.interrupt();
}
}
}
/**
* 线程善后清理
*
*/
@Override
public void run() {
Exception ex = null;
try {
// 自旋
for (;;) {
// 判断中断实例的标识是否为true,以及是否有未完成的任务
System.out.println("线程执行,此时的标志位状态:" + terminationThreadFlag.isRunning() + ", 剩余的任务数量:" + terminationThreadFlag.reservations.get());
if (terminationThreadFlag.isRunning() && terminationThreadFlag.reservations.get() <= 0) {
// 线程已经终止了
System.out.println("线程退出");
break;
}
// TODO 执行具体的业务逻辑
doRun();
}
} catch (Exception e) {
// 中断线程可能给调用interrupt被中断
ex = e;
if (e instanceof InterruptedException) {
// 中断线程响应退出
System.out.println("中断响应:" + e);
}
} finally {
try {
System.out.println("线程停止,执行善后清理工作");
// TODO 善后清理
doCleanup(ex);
} finally {
// 通知terminationThreadFlag管理的所有线程实例退出
System.out.println("标志实例对象中的线程终止,通知其他线程终止");
terminationThreadFlag.notifyThreadTermination(this);
}
}
}
/**
* 通知线程终止的抽象方法
*
*/
protected abstract void doTerminate();
/**
* 执行线程善后的抽象方法
*
*/
protected abstract void doCleanup(Exception ex);
/**
* 执行业务逻辑的抽象方法
*
*/
protected abstract void doRun() throws InterruptedException;
}
它的三个抽象方法doTerminate()
、doCleanup()
和doRun()
都是可以根据需要加入自己的业务逻辑的。
感谢支持
更多内容,请移步《超级个体》。