保护性挂起
解决死锁问题
第三方支付已经彻底融入了人们的生产和生活中了,它一定会出现下面这样的场景。

从上面的图中可以看到,当两个及两个以上账户之间同时转账时,就会出现余额
被锁住而互相等待的情况,这和多线程中的死锁
是一个道理。
死锁
的出现,一定是因为满足了下面的四个条件之一或全部。
互斥
:某个资源R只能被一个线程占用。占有且等待
:线程A已经获取了资源R,并且想继续获取资源S,但它又不愿意释放R。不可抢占
:其他的线程不能强行去抢占线程A已经获得的资源R,只能等待。循环等待
:线程A等待线程B释放资源S,而线程B也等着线程A释放资源R。
互斥
是条件,而其他三个都是原因。
对于
占有且等待
:可以一次性申请线程A需要的所有资源,也就是把这些资源和申请资源的操作封装成原子操作,要么都执行,要么都不执行,破坏占有且等待
的成因,解决死锁问题。对于
不可抢占
:可以让线程A在申请其他资源而不得时,在指定时间内主动放弃对已有资源的占有,破坏不可抢占
的成因,解决死锁问题。对于
循环等待
:可以通过给资源编号来按序申请,让所有的线程都以相同的顺序申请资源,这样就可以破坏循环等待
的成因,解决死锁问题。
Guarded Suspension
在多线程开发中,常常为了提高应用程序的并发性,会将一个任务分解为多个子任务交给多个线程并行执行,例如,juc
中的ForkJoinPool
线程池。
当多个线程之间相互协作时,会存在某个线程等待其他的线程完成后继续下一步操作的情况,此时可以通过Guarded Suspension(保护性挂起)
模式解决上述问题。
在Guarded Suspension
模式下,如果线程A拿不到资源R就阻塞自己,进入线程生命周期中的WAITING
(而不是TIMED_WAITING
)状态。
而当线程A所有的条件都满足后,就会通知它进入RUNNABLE
状态(这些在《Java深度探索:开发基础》的《第3章 多线程》
中均有讲解)。
一个比较好的用于理解Guarded Suspension
模式的例子就是医院的挂号 —— 就诊 —— 缴费 —— 诊断
流程。

患者到了医院后,先挂号,得到结果后等待叫号。
当叫到自己的号时,患者就可以找门诊医生就诊了。
门诊医生检查完成后,一般会先开诊疗缴费单(例如,
CT
拍片),让患者去交费,同时叫下一位患者。当患者交完费后,拿缴费单去
CT
室拍片,然后将结果按排队顺序放到医生办公桌上。当医生拿到患者的
CT
结果时,呼叫患者进入诊疗室沟通。
这就是典型的等待-通知
机制,所有排队系统
都具备这个机制,例如,就餐叫号、银行排队叫号、政务大厅叫号等。
这样既保证了效率,也兼顾了公平——这其实就是ArrayBlockingQueue
的工作方式。
下面是实现Guarded Suspension
模式的代码。
/**
* Guarded Suspension模式
*
*/
public class GuardedSuspension {
private final LinkedBlockingQueue<Integer> sourceList;
public GuardedSuspension() {
this.sourceList = new LinkedBlockingQueue<>();
}
/**
* 获取数据
*
*/
public synchronized Integer get() {
while (sourceList.isEmpty()) {
try {
// 如果队列为null,等待
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return sourceList.peek();
}
/**
* 放入数据
*
*/
public synchronized void put(Integer e) {
sourceList.add(e);
// 通知所有线程继续执行
notifyAll();
}
public static void main(String[] args) throws InterruptedException {
GuardedSuspension guarded = new GuardedSuspension();
// 1个线程的线程池,等于是一个医生一次只能看一个病人
ExecutorService executorService = Executors.newFixedThreadPool(1);
// 往外拿,但什么数据都拿不到,因为这时里面还没有数据,所以就会执行 wait() 方法
// 这里等于是医生叫号
executorService.execute(() -> guarded.get());
// 休眠2秒
Thread.sleep(2000);
// 往里放20个元素,等于是挂号处放号
executorService.execute(() -> guarded.put(20));
// 关闭线程池
executorService.shutdown();
}
}
安防系统
安防系统产品一般包括硬件终端
和系统后台
两部分,硬件终端
包括监控摄像头、燃气探测器、温湿度感应器、烟雾报警器、人体探测器等。
这些硬件终端
一旦检测到超过阈值的异常,就会立即向系统后台
上报异常信息,然后再由系统后台
通知对应的人员来处理。
所以,下面就来通过代码模拟这个过程。
代码分为
探测器
和服务器
。探测器
通过心跳
机制来确认和服务器
的连接是否正常。当
探测器
和服务器
失去连接后,需要立即重新建立连接。在连接正常时,
探测器
监测到异常时,要给服务器
发送报警信息。
首先需要明确报警信息都有哪些内容。
/**
* 报警信息
*
*/
public class AlarmInfo {
/**
* 楼号-单元-门牌
*
*/
private String address;
/**
* 报警类型
*
*/
private Integer type;
public AlarmInfo(String address, Integer type) {
this.address = address;
this.type = type;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public Integer getType() {
return type;
}
public void setType(Integer type) {
this.type = type;
}
@Override
public String toString() {
return "{" +
"address='" + address + '\'' +
", type=" + type +
'}';
}
}
然后通过Guarded Suspension
模式来实现安防系统的报警,它需要能够实现:当还没有连接的时候会一直等待直到连接成功,连接成功后则立即发送异常信息。

/**
* 探测器需要完成下面的几项工作
* 1.初始化报警服务
* 2.建立连接
* 3.维持心跳
* 4.发送报警信息
*
*/
public class Detector {
/**
* 是否连接上了服务器
*
*/
private volatile boolean connected = false;
/**
* 保护条件
*
*/
Predicate agentConnected = new Predicate() {
@Override
public boolean evaluate() {
// 连接是否建立完成
return connected;
}
};
/**
* blocker对象
*/
private Blocker blocker = new ConditionVarBlocker(false);
// 心跳线程池
private ScheduledThreadPoolExecutor heartbeatExecutor = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread();
// 当主线程退出的时候跟着退出
thread.setDaemon(true);
return thread;
}
});
/**
* 上报异常信息
*
*/
private void sendAlarm(AlarmInfo alarmInfo) throws Exception {
// 构建guardedAction
GuardedAction<Void> guardedAction = new GuardedAction<Void>(agentConnected) {
@Override
public Void call() throws Exception {
// 执行目标函数
doSendAlarm(alarmInfo);
return null;
}
};
// 通过blocker执行目标
blocker.callWithGuard(guardedAction);
}
/**
* 发送异常信息
*
*/
private void doSendAlarm(AlarmInfo alarmInfo) {
// 建立socket连接发送数据给报警信息
System.out.println("开始发送异常信息:" + alarmInfo);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 模拟上报50ms
System.out.println("发送结束");
}
/**
* 初始化报警服务
* 1.连接报警服务器的线程去进行连接
* 2.定时调度线程每隔5s检查一次连接
*
*/
public void init() {
// 与服务器连接
Thread connectingThread = new Thread(new ConnectingTask());
connectingThread.start();
}
/**
* 与报警服务器建立连接
*
*/
class ConnectingTask implements Runnable {
@Override
public void run() {
try {
// 简单模拟2秒后通过Socket和服务器建立连接
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 连接建立完成
System.out.println("已连接报警服务");
onConnected();
// 每5s执行一次
heartbeatExecutor.scheduleAtFixedRate(new HeartbeatTask(), 0, 5, TimeUnit.SECONDS);
}
}
/**
* 心跳检查线程
*
*/
class HeartbeatTask implements Runnable {
@Override
public void run() {
if (!connected) {
System.out.println("断开连接");
}
}
}
/**
* 建立连接
*
*/
private void onConnected() {
// 通过blocker唤醒
try {
blocker.signalAfter(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
// 唤醒前的状态动作
// 修改连接报警服务器的状态
connected = true;
// 条件满足则执行唤醒
return Boolean.TRUE;
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
Detector detector = new Detector();
detector.init();
AlarmInfo alarmInfo = new AlarmInfo("9527", 1);
detector.sendAlarm(alarmInfo);
}
}
Predicate
、GuardedAction
、Blocker
、ConditionVarBlocker
这四个类的代码如下。
当然,使用juc
中的工具类,例如,BlockingQueue
或者Semaphore
同样可以实现上面的功能,这里只是多提供了一种可能性。
/**
* 条件接口
*
*/
public interface Predicate {
/**
* 判断条件是否满足,满足返回true否则false
*
*/
boolean evaluate();
}
/**
* 抽象目标动作,内部包含目标动作所需的保护条件
*
*/
public abstract class GuardedAction<V> implements Callable<V> {
/**
* 执行条件
*
*/
protected final Predicate predicate;
public GuardedAction(Predicate predicate) {
this.predicate = predicate;
}
}
/**
* 阻塞器接口,负责对GuardAction进行阻塞和唤醒
*
*/
public interface Blocker {
/**
* 在条件成立时执行动作,否则阻塞当前线程,直到条件成立
*
*/
<V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception;
/**
* 先执行stateOperation,如果返回true则确定唤醒该Blocker上阻塞的一个线程
*
*/
void signalAfter(Callable<Boolean> stateOperation) throws Exception;
/**
* 直接唤醒blocker阻塞的某个线程
*
*/
void signal() throws Exception;
/**
* 根据stateOperation的值决定是否唤醒所有被阻塞的线程
*
*/
void broadcastAfter(Callable<Boolean> stateOperation) throws Exception;
}
/**
* 基于condition条件变量实现条件阻塞器
*
*/
public class ConditionVarBlocker implements Blocker {
/**
* lock锁
*
*/
private final Lock lock;
/**
* 条件变量
*
*/
private final Condition condition;
/**
* 是否允许获取当前blocker的锁
*
*/
private final boolean allowAccess2Lock;
public ConditionVarBlocker(boolean allowAccess2Lock) {
this(new ReentrantLock(), allowAccess2Lock);
}
public ConditionVarBlocker(Lock lock, boolean allowAccess2Lock) {
this.lock = lock;
this.condition = lock.newCondition();
this.allowAccess2Lock = allowAccess2Lock;
}
@Override
public <V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception {
lock.lockInterruptibly();
try {
// 判断条件是否满足,满足则执行目标动作,不满足则进入到条件等待队列中
final Predicate predicate = guardedAction.predicate;
while (!predicate.evaluate()) {
System.out.println("正在连接到报警服务器,线程阻塞......");
// 这是最核心的一条语句:当还未连接(即条件不满足)时,线程进入到条件等待队列中
condition.await();
// 当线程从条件等待队列唤醒后,获取锁成功,然后再次尝试去判断条件是否满足
}
// 条件满足,执行目标内容
System.out.println("执行调用");
return guardedAction.call();
} finally {
lock.unlock();
}
}
@Override
public void signalAfter(Callable<Boolean> stateOperation) throws Exception {
lock.lockInterruptibly();
try {
if (stateOperation.call()) {
// 条件满足唤醒
System.out.println("唤醒线程调用");
condition.signal();
}
} finally {
lock.unlock();
}
}
@Override
public void signal() throws Exception {
lock.lockInterruptibly();
try {
condition.signal();
} finally {
lock.unlock();
}
}
@Override
public void broadcastAfter(Callable<Boolean> stateOperation) throws Exception {
lock.lockInterruptibly();
try {
if (stateOperation.call()) {
// 条件满足唤醒所有等待线程
condition.signalAll();
}
} finally {
lock.unlock();
}
}
}
感谢支持
更多内容,请移步《超级个体》。