发布与订阅
原创大约 3 分钟
发布订阅
发布-订阅
模型,或者说生产者-消费者
模型,本质上就是对观察者
模式的应用。
它可以将提供服务的一方和消费服务的一方完全分离开,实现高度解耦,让它们彼此都可以完全按照自己的节奏来行动,互相不干扰,唯一的要求就是引入中间队列。

而且有了中间队列之后,对于生产者
(也就是发布者
)和消费者
(也就是订阅者
)来说,没有数量上的限制,也没有一一对应的要求。

通过ArrayBlockingQueue
就可以实现最简单的发布-订阅
模式。
/**
* 简单的生产者与消费者
*
*/
public class PubSub {
public static void main(String[] args) {
// 生产者线程池
ExecutorService producerThreads = Executors.newFixedThreadPool(2);
// 消费者线程池
ExecutorService consumerThreads = Executors.newFixedThreadPool(3);
// 任务队列
ArrayBlockingQueue<String> taskQueue = new ArrayBlockingQueue<String>(5);
// 生产者提交任务
producerThreads.submit(() -> {
try {
for (int i = 0; i < 10; i++) {
taskQueue.put(String.valueOf(i));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 消费者处理任务
consumerThreads.submit(() -> {
try {
while (true) {
String result = taskQueue.take();
System.out.println(result);
if (taskQueue.isEmpty()) {
System.out.println("任务队列为空,线程退出");
break;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
优化性能
某家银行推出了一站式贷款功能,只需要用户上传具有电子签名的合同即可。虽然看起来只有上传合同附件这一个操作,实际上将它分解的话,是有三个子操作的,尤其是为合同生成索引的操作最为耗时,经常导致应用卡死。

既然整个操作中最耗时就是生成索引,所以可以考虑将它拆分出来,单独用发布-订阅
模式处理。当用户合同上传完成后就可以直接返回页面,避免用户长时间等待了。

下面是它的代码实现。
/**
* 交易合同处理程序
*
*/
public class ContractAttachmentProcessor {
private final BlockingQueue<ContractFile> fileChannel = new ArrayBlockingQueue<ContractFile>(10);
/**
* 合同类
*
*/
static class ContractFile {
/**
* 合同文件名称
*
*/
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "{ name='" + name + "'}";
}
}
/**
* 构建indexFile文件
*
*/
private void builderIndexFile(ContractFile contractFile) throws InterruptedException {
TimeUnit.SECONDS.sleep(2);
System.out.println("解析合同:" + contractFile + " 索引文件完成");
}
/**
* 消费者
*
*/
private final Thread indexing = new Thread() {
@Override
public void run() {
ContractFile contractFile = null;
try {
while (!fileChannel.isEmpty()) {
contractFile = fileChannel.take();
System.out.println("解析合同:" + contractFile);
builderIndexFile(contractFile);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
/**
* 生产者
*
*/
public void uploadContract(ContractFile contractFile) {
try {
// 保存到数据库中
System.out.println("保存合同 " + contractFile + " 到数据库");
TimeUnit.SECONDS.sleep(1);
// 上传合同附件,放入到通道的队列中
System.out.println("上传合同附件:" + contractFile);
TimeUnit.SECONDS.sleep(1);
fileChannel.put(contractFile);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 开始解析
*
*/
public void parse() {
indexing.start();
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ContractFile firstFile = new ContractFile();
firstFile.setName("lixingyun");
ContractFile secondFile = new ContractFile();
secondFile.setName("wanglin");
ContractAttachmentProcessor attachmentProcessor = new ContractAttachmentProcessor();
attachmentProcessor.uploadContract(firstFile);
attachmentProcessor.uploadContract(secondFile);
System.out.println("上传合同附件耗时:" + (System.currentTimeMillis() - start) / 1000 + " 秒");
System.out.println("文件上传完成,可以进行下一步处理了");
// 后台解析合同
System.out.println("开始解析合同......");
attachmentProcessor.parse();
}
}
感谢支持
更多内容,请移步《超级个体》。