一切皆是流
一切皆是流
正如面向对象的编程语言Java号称一切皆是对象
那样,在反应式编程范式中,一切皆是流
。
大部分碳基生物,时时刻刻都要吸入和呼出氧气,也就是气流;除了呼吸,还需要通过吃东西来补充能量,这是食物流;我们平常看到的风景、人物、短视频之类的,都属于视觉流,还包括语音流等。

深入观察上图,就可以知道各种不同的流具有如下属性。
流基本上都是持续且源源不断的,有的流无法主动终止,如噪音污染。
各个流之间基本上都是异步进行的。
流与流之间有明显的边界。
各种流的“速度”是不一样的,有的流速度快,有的流速度慢。
流可以被“订阅”,例如短视频。也可以取消,不再接收流中的数据或内容。
多个流可以被合并,一个或多个流也可以用作另一个流的输入,例如语音流既可以独立流动,也可以作为视频流的一部分。
只要流输入发生变化,流的输出会相应自动变化,这种变化是
传递
的。
而对于计算机程序来说,各种外部事件,例如用户单击页面按钮、在聊天中发送消息、键盘敲击、蓝牙耳机不断接收音乐数据等,就类似于上图的输入流,它们也具有流的属性,而且程序也需要及时对这些流做出响应,或者反应。
Reactive eXtensions
最早的反应式编程的实现是ReactiveX,它是Reactive eXtensions
的缩写,一般简写为Rx
,在2009年它被微软用.Net实现。

Rx
融合了设计模式中的观察者模式、迭代器模式和函数式编程范式,它以一种简单的方式了一个创建异步的,且基于事件驱动的程序,并用其实现创建、订阅、组合、过滤和映射数据流的诸多功能,这种范式被以反应式宣言的形式发表。

这份宣言也同时规定
了所有反应式系统所必须具备的四大特性,它主要使用消息(或事件)驱动
作为手段来构建系统,在形式上达到所谓的弹性
和回弹性
,最后产生即时响应性
的价值。
反应式宣言的系统特质如下图所示。

宣言
对这四个特性的解释也很简单直白。
即时响应性
:为了用户体验,哪怕系统再繁忙,也应该及时地给用户以响应,别让用户干等。回弹性
:就算系统出现异常或错误也要保持即时响应性,而不是一躺了事
。弹性
:系统性能可以做到伸缩自如,也就是说在业务高峰期能及时调集资源保证系统运行平稳,而在业务低谷期也能及时释放资源节约成本。消息(或事件)驱动
:不同组件之间的互相调用通过异步的消息传递来实现。这样既明确了组件的职责边界,又能解耦。
在第四个特性中特别提到了Backpressure
问题,也就是背压
(或回压
)。所谓Backpressure
,其意思就等同于是防洪
——当上游生产速度大于下游消费速度的时候,可能会把系统给冲垮
。
Backpressure
的出现和Buffer
的大小有关。只有当Buffer
设置不合理的时候才会出现Backpressure
。 从反应式宣言来看,Rx
几乎适用于从前端、移动端再到服务端的任何应用场景。在前端页面上,它可以通过消息驱动的方式处理Web UI
事件和API
应答;在移动端,实现基于事件流的链式网络请求;而在服务端中,通过异步调用和简单的并发处理,实现松耦合的系统架构。但如果只是做一个简单且无需升级的单体应用,就没有必要用Rx
了。
到目前为止,已经有十几种编程语言支持Rx
,也出现了诸多流行的Rx框架,例如,RxJava、RxKotlin、Rx.NET、Vert.x、Spring WebFlux和Bacon.js。由于它们标准不一,且相互之间缺乏互操作性,因此给开发者造成了一些不必要的麻烦和选择困难。为了解决这个问题,Reactive Streams规范出现了。
Reactive Streams
Reactive Streams是一种针对运行时环境和网络协议的非阻塞式背压异步流处理规范。 这么说有点绕口,翻译成大白话就是:为了统一不规范的反应式编程范式,Reactive Streams推出了一组标准规范,只要是遵循这些规范的框架和编程语言,不仅能够满足对反应式编程开发的需求,它们之间还能实现互操作。
Reactive Streams声明其主要目标是处理跨边界的流数据的交换,也就是处理不同异步线程或线程池之间的数据交换,同时确保接收方不会出现Buffer
溢出,即Backpressure
问题。之所以要如此声明,是因为在资源集中的情况下,异步非阻塞是最合适的工作方式,也是必须的。而某些实时数据流是无法预估其数据量大小的,如果不仔细控制资源消耗,那么数据流的接收方将很快会被冲垮
。
同时,Reactive Streams只涉及在不同的API
组件之间调用数据流,并在其开发过程中,确保Reactive Streams规范可以涵盖所有流调用的基本方式,但不包括对流操作进行精确的控制,如转换、合并、拆分等。
总之,Reactive Streams规范是一种基于异步流处理的标准,旨在使流处理更加可靠、高效和响应式

2015年首次创建的Reactive StreamsAPI
包含了4个接口。
Publisher:发布者
,它定义了生产元素并将其发送给订阅者的方法。它也可能是一个有无限数量有序元素的提供者,并将这些数据发布给订阅者。Subscriber:订阅者
,它定义了接收元素并进行处理的方法。它也是一个流观察者,从发布者那里接收并处理数据流。Subscription:订阅行为
,定义了订阅者和发布者之间的协议,包括请求数据和取消订阅等操作。Processor:处理器
,定义了同时实现Publisher和Subscriber接口的中间件,它可以对数据进行转换、合并、过滤或映射等操作。
这四个接口之间的关系如下图所示。

Reactive Streams在Java中是以JDK 9
的java.util.concurrent.Flow
接口形式出现的。Flow
接口与Java完全等价。下面是JDK 9
中的Flow
部分源码。
public final class Flow {
private Flow() {}
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Subscription {
public void request(long n);
public void cancel();
}
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
......
}
Flow
虽然遵守了Reactive Streams的标准规范,但它本身并没有具体实现,而是通过RxJava、Reactor、Akka Streams等其他框架实现的。可以通过一个具体的RxJava
例子来说明。
package cn.javabook.chapter06;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class RxJavaDemo {
public static void main(String[] args) {
// 第一种:RxJava 1.x不支持背压
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) {
emitter.onNext("Hello");
emitter.onNext("RxJava");
emitter.onNext("Observable");
emitter.onComplete();
}
}).subscribe(
new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable disposable) {
// 停止产生数据的信号,为遵循Reactive Streams,避免名字冲突
// 将rx.Subscription改名为io.reactivex.disposables.Disposable
// disposable.dispose();
System.out.println("Disposable: " + disposable.toString());
}
@Override
public void onNext(@NonNull String value) {
System.out.println("onNext: " + value);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
// 第二种:RxJava 2.x及以上支持背压
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull FlowableEmitter<String> emitter) {
emitter.onNext("Hello");
emitter.onNext("RxJava");
emitter.onNext("Flowable");
emitter.onComplete();
}
// 背压策略
}, BackpressureStrategy.ERROR)
// 背压操作符
//.onBackpressureBuffer()
// 使被观察者在独立线程执行
//.subscribeOn(Schedulers.newThread())
// 使观察者在独立线程执行
//.observeOn(Schedulers.newThread())
.subscribe(
new Subscriber<String>() {
// 观察者设置接收事件的数量,如果不设置接收不到事件
@Override
public void onSubscribe(@NonNull Subscription subscription) {
// 订阅后也可取消订阅,和disposable.dispose()一样
// subscription.cancel();
subscription.request(2);
System.out.println("Subscription: " + subscription.toString());
}
@Override
public void onNext(String value) {
System.out.println("onNext: " + value);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
}
);
}
}
在上面的代码中,通过两种方式实现了发布-订阅
。
Observable
和Flowable
都作为被观察对象,它们扮演的是都发布者Publisher
的角色。而匿名内部类Observer
和Subscriber
则作为观察者,扮演的是订阅者Subscriber
的角色,通过Publisher.subscribe()
方法实现内容的订阅。
之后,Subscriber
调用onSubscribe()
方法接收订阅数据,通过onNext()
方法处理数据,当所有的处理完成后,调用onComplete()
方法结束。
唯一不同的是,Observable
不支持背压,而Flowable
支持。这一点从代码中就能够看出来。只有明确设置了接收事件的数量,才能收到Publisher
发送的事件,也就是如下列代码段所示。
......
subscription.request(n);
......
如果不设置它,作为Flowable
的观察者Subscriber
是永远也响应不了事件的。

Reactive Streams的特性包括如下五个。
变化传递。
数据流。
声明式。
背压。
异步边界。
首先,变化传递
其实就是事件驱动,当发布者Publisher
生产一个数据后,就把它放入到组件中。之后数据就会在组件中不断传递,直至抵达终点。

上图所示是Excel电子表格。可以看到,只要A1
或A2
单元格中的数值发生变化,那么其后所有的求和结果都会发生变化,这就是基于事件驱动的变动传递,这也是反应式编程之所以称为反应
的原因之一。
其次,变动传递是基于数据流
进行的,而数据流则是按时间顺序排列的即将发生的事件的序列,这个序列上包含事件的创建、处理、异常和结束。

当产用户输入网址,然后再输入用户名密码并单击提交按钮时,一系列的事件就会被触发,并随之触发其他更多的功能,产生更多的数据,这一系列的过程连贯起来就是流。
第三,声明式
的意思是说程序保存的不再是结果,而是逻辑。例如,假设有下列代码段。
m = 1;
n = 2;
k = m + n;
在传统命令式编程中,k
仅保存当次计算的结果,即3
。如果m
和/或n
的值发生变化,那么需要重新执行一遍赋值语句才能改变k
的值。但在反应式编程中,k
保存的是m + n
的计算逻辑,只要当m
和/或n
的值发生变化,那么k
的值也一定会随之发生变化,而不需要重新再次执行赋值语句,这个特性需要变动传递的支持。
第四,背压
的意思之前说过,是为了防止上游冲垮下游,如下图所示。

背压说到底,就是一种保护机制,比如音量太大了耳朵受不了就把音量调小一点。它同时也是一种组件之间协调速度的方法,可以让Subscriber
向Publisher
反馈其消费能力。
最后,异步边界
指的是数据以异步的方式在流之中传递,强制它不能阻塞发布者,尤其是线程之间是被不同的组件或算子所隔离的。

T1
、T2
、T3
和T4
这四个线程以异步的方式同时执行四种不同的组件或算子操作,而相互之间不会有任何影响。
观察者模式
Reactive StreamsAPI
的四个接口定义了发布者和订阅者,而这种发布-订阅
关系还有另外一个名称:观察者模式
。


可以用代码来实现这种剧院和观众的观察者模式。
package cn.javabook.chapter06;
import java.util.ArrayList;
import java.util.List;
/**
* 剧院
*
*/
public class Theatre {
// 订阅接口
@FunctionalInterface
interface Viewer {
public void watch();
}
// 观众(具体订阅者)
static class ConcreteViewer implements Viewer {
@Override
public void watch() {
System.out.println("正在看表演");
}
}
// 演员(发布者)
static class Actor {
// 观众(订阅者)列表
private final List<Viewer> viewers = new ArrayList<>();
// 售票(订阅)
public void sell(Viewer viewer) {
viewers.add(viewer);
}
// 开始表演
public void play() {
for (Viewer viewer : viewers) {
viewer.watch();
}
}
}
public static void main(String[] args) {
ConcreteViewer concreteViewer1 = new ConcreteViewer();
ConcreteViewer concreteViewer2 = new ConcreteViewer();
Actor actor = new Actor();
actor.sell(concreteViewer1);
actor.sell(concreteViewer2);
actor.sell(() -> System.out.println("正在包厢看表演"));
actor.play();
}
}
所以,反应式编程本质上就是Lambda表达式
、流式计算
和观察者模式
的结合。

感谢支持
更多内容,请移步《超级个体》。