Backpressure 在国内被翻译成背压,这个翻译在网上被很多人吐槽,我觉得大家的吐槽是有道理的,背压单纯从字面上确实看不出来有什么意思。所以松哥这里直接用英文 Backpressure 吧。
创新互联从2013年开始,是专业互联网技术服务公司,拥有项目成都网站设计、成都网站制作、外贸网站建设网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元宝山做网站,已为上家服务,为宝山各地企业和个人服务,联系电话:028-86922220
Backpressure 是一种现象:当数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure。
换句话说,上游生产数据,生产完成后通过管道将数据传到下游,下游消费数据,当下游消费速度小于上游数据生产速度时,数据在管道中积压会对上游形成一个压力,这就是 Backpressure,从这个角度来说,Backpressure 翻译成反压、回压似乎更合理一些。
Backpressure 会出现在有 Buffer 上限的系统中,当出现 Buffer 溢出的时候,就会有 Backpressure,对于 Backpressure,它的应对措施只有一个:丢弃新事件。那么什么是 Buffer 溢出呢?例如我的服务器可以同时处理 2000 个用户请求,那么我就把请求上限设置为 2000,这个 2000 就是我的 Buffer,当超出 2000 的时候,就产生了 Backpressure。
JDK9 中推出了 Flow API,用以支持 Reactive Programming,即响应式编程。
在响应式编程中,会有一个数据发布者 Publisher 和数据订阅者 Subscriber,Subscriber 接收 Publisher 发布的数据并进行消费,在 Subscriber 和 Publisher 之间还存在一个 Processor,类似于一个过滤器,可以对数据进行中间处理。
JDK9 中提供了 Flow API 用以支持响应式编程,另外 RxJava 和 Reactor 等框架也提供了相关的实现。
我们来看看 JDK9 中的 Flow 类:
非常简洁,基本上就是按照 Reactive Programming 的设计来的:
Publisher
Publisher 为数据发布者,这是一个函数式接口,里边只有一个方法,通过这个方法将数据发布出去,Publisher 的定义如下:
- @FunctionalInterface
- public static interface Publisher
{ - public void subscribe(Subscriber super T> subscriber);
- }
Subscriber
Subscriber 为数据订阅者,这个里边有四个方法,如下:
- public static interface Subscriber
{ - public void onSubscribe(Subscription subscription);
- public void onNext(T item);
- public void onError(Throwable throwable);
- public void onComplete();
- }
Subscription
Subscription 为发布者和订阅者之间的订阅关系,用来控制消息的消费,这个里边有两个方法:
- public static interface Subscription {
- public void request(long n);
- public void cancel();
- }
Processor
Processor 是一个空接口,不过它同时继承了 Publisher 和 Subscriber,所以它既能发布数据也能订阅数据,因此我们可以通过 Processor 来完成一些数据转换的功能,先接收数据进行处理,处理完成后再将数据发布出去,这个也有点类似于我们 JavaEE 中的过滤器。
- public static interface Processor
extends Subscriber , Publisher { - }
2.1 消息订阅初体验
我们通过如下一段代码体验一下消息的订阅与发布:
- public class FlowDemo {
- public static void main(String[] args) {
- SubmissionPublisher
publisher = new SubmissionPublisher<>(); - Flow.Subscriber
subscriber = new Flow.Subscriber () { - private Flow.Subscription subscription;
- @Override
- public void onSubscribe(Flow.Subscription subscription) {
- this.subscription = subscription;
- //向数据发布者请求一个数据
- this.subscription.request(1);
- }
- @Override
- public void onNext(String item) {
- System.out.println("接收到 publisher 发来的消息了:" + item);
- //接收完成后,可以继续接收或者不接收
- //this.subscription.cancel();
- this.subscription.request(1);
- }
- @Override
- public void onError(Throwable throwable) {
- //出现异常,就会来到这个方法,此时直接取消订阅即可
- this.subscription.cancel();
- }
- @Override
- public void onComplete() {
- //发布者的所有数据都被接收,并且发布者已经关闭
- System.out.println("数据接收完毕");
- }
- };
- //配置发布者和订阅者
- publisher.subscribe(subscriber);
- for (int i = 0; i < 5; i++) {
- //发送数据
- publisher.submit("hello:" + i);
- }
- //关闭发布者
- publisher.close();
- new Scanner(System.in).next();
- }
- }
松哥稍微解释一下上面这段代码:
2.2 模拟 Backpressure
Backpressure 问题在 Flow API 中得到了很好的解决。Subscriber 会将 Publisher 发布的数据缓存在 Subscription 中,其长度默认为256,相关源码如下:
- public final class Flow {
- static final int DEFAULT_BUFFER_SIZE = 256;
- public static int defaultBufferSize() {
- return DEFAULT_BUFFER_SIZE;
- }
- ...
- }
一旦超出这个数据量,publisher 就会降低数据发送速度。
我们对上面的案例进行修改,如下:
- public class FlowDemo {
- public static void main(String[] args) {
- SubmissionPublisher
publisher = new SubmissionPublisher<>(); - Flow.Subscriber
subscriber = new Flow.Subscriber () { - private Flow.Subscription subscription;
- @Override
- public void onSubscribe(Flow.Subscription subscription) {
- this.subscription = subscription;
- //向数据发布者请求一个数据
- this.subscription.request(1);
- }
- @Override
- public void onNext(String item) {
- System.out.println("接收到 publisher 发来的消息了:" + item);
- //接收完成后,可以继续接收或者不接收
- //this.subscription.cancel();
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- this.subscription.request(1);
- }
- @Override
- public void onError(Throwable throwable) {
- //出现异常,就会来到这个方法,此时直接取消订阅即可
- this.subscription.cancel();
- }
- @Override
- public void onComplete() {
- //发布者的所有数据都被接收,并且发布者已经关闭
- System.out.println("数据接收完毕");
- }
- };
- publisher.subscribe(subscriber);
- for (int i = 0; i < 500; i++) {
- System.out.println("i--------->" + i);
- publisher.submit("hello:" + i);
- }
- //关闭发布者
- publisher.close();
- new Scanner(System.in).next();
- }
- }
一共修改了三个地方:
修改完成后,我们再次启动项目,观察控制台输出:
可以看到,生产者先是一股脑生产了 257 条数据(hello0 在一开始就被消费了,所以缓存中实际上是 256 条),消息则是一条一条的来,由于消费的速度比较慢,所以当缓存中的数据超过 256 条之后,接下来都是消费一条,再发送一条。
2.3 数据处理
Flow.Processor 可以像过滤器一样,对数据进行预处理,数据从 publisher 出来之后,先进入 Flow.Processor 中进行预处理,然后再进入 Subscriber。
修改后的代码如下:
- public class FlowDemo {
- public static void main(String[] args) {
- class DataFilter extends SubmissionPublisher
implements Flow.Processor { - private Flow.Subscription subscription;
- @Override
- public void onSubscribe(Flow.Subscription subscription) {
- this.subscription = subscription;
- this.subscription.request(1);
- }
- @Override
- public void onNext(String item) {
- this.submit("【这是一条被处理过的数据】" + item);
- this.subscription.request(1);
- }
- @Override
- public void onError(Throwable throwable) {
- this.subscription.cancel();
- }
- @Override
- public void onComplete() {
- this.close();
- }
- }
- SubmissionPublisher
publisher = new SubmissionPublisher<>(); - DataFilter dataFilter = new DataFilter();
- publisher.subscribe(dataFilter);
- Flow.Subscriber
subscriber = new Flow.Subscriber () { - private Flow.Subscription subscription;
- @Override
- public void onSubscribe(Flow.Subscription subscription) {
- this.subscription = subscription;
- //向数据发布者请求一个数据
- this.subscription.request(1);
- }
- @Override
- public void onNext(String item) {
- System.out.println("接收到 publisher 发来的消息了:" + item);
- //接收完成后,可以继续接收或者不接收
- //this.subscription.cancel();
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- this.subscription.request(1);
- }
- @Override
- public void onError(Throwable throwable) {
- //出现异常,就会来到这个方法,此时直接取消订阅即可
- this.subscription.cancel();
- }
- @Override
- public void onComplete() {
- //发布者的所有数据都被接收,并且发布者已经关闭
- System.out.println("数据接收完毕");
- }
- };
- dataFilter.subscribe(subscriber);
- for (int i = 0; i < 500; i++) {
- System.out.println("发送消息 i--------->" + i);
- publisher.submit("hello:" + i);
- }
- //关闭发布者
- publisher.close();
- new Scanner(System.in).next();
- }
- }
简单起见,我这里创建了一个局部内部类 DataFilter,DataFilter 继承自 SubmissionPublisher 并实现了 Flow.Processor 接口,由于 DataFilter 继承自 SubmissionPublisher,所以它也兼具 SubmissionPublisher 的功能。
在 DataFilter 中完成消息的处理并重新发送出去。接下来定义 publisher,让 dataFilter 作为其订阅者,再定义新的订阅者,作为 dataFilter 的订阅者。
最终运行效果如下:
好啦,这就是今天和大家介绍的 Java9 中的 Reactive Stream,那么至此,我们的 WebFlux 前置知识差不多告一段落了,下篇文章开始,正式开整 WebFlux。
本文转载自微信公众号「江南一点雨」,可以通过以下二维码关注。转载本文请联系江南一点雨公众号。
当前标题:我们学习WebFlux前置知识
本文链接:http://www.shufengxianlan.com/qtweb/news36/270736.html
网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联