响应式编程入门之 Project Reactor

知乎 · · 1033 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

本文目标

  • 理解响应式编程

前言

之前的 《聊聊 IO 多路复用》 中,我们理解了非阻塞 IO 的意义。但是 Spring MVC 并不能完美的应用非阻塞编程,于是 Spring 团队开发了 WebFlux,而 WebFlux 的基础正是本文要讲到的 Project Reactor(下文简称为 Reactor)

本文以 Reactor 为例带大家入门响应式编程

版本

 <dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-core</artifactId>
  <version>3.4.6</version>
 </dependency>

什么是 Reactor

Reactor 是 JVM 的非阻塞响应式编程基础,支持背压。 它直接与 Java 8 函数式 API 集成,特别是 CompletableFuture、Stream 和 Duration。 它提供了可组合的异步序列 API — Flux(用于 [N] 个元素)和 Mono(用于 [0|1] 个元素),并实现了 Reactive Streams 规范。 在 Reactor 的基础上还演化出了适合微服务架构的 Reactor Netty 。为 HTTP(包括 Websockets)、TCP 和 UDP 提供支持背压和响应式的网络引擎。

上面是对于官方文档的翻译。下面来说说我自己对 Reactor 和响应式编程的理解。

回想一下之前的非阻塞 IO 编程,例如我们现在要用非阻塞的方式调用一个远程服务,当远程接口数据可用时去做一些业务处理。这时候代码怎么写呢?我们需要提供一个回调函数,然后在响应就绪的时候,去调用我们的回调函数。

从逻辑上来看,这完全没有问题。但是如果我们的回调很复杂,代码看起来会是什么样呢?

// 以下案例来自 Reactor 官网
userService.getFavorites(userId, new Callback<List<String>>() { 
  public void onSuccess(List<String> list) { 
    if (list.isEmpty()) { 
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { 
          UiUtils.submitOnUiThread(() -> { 
            list.stream()
                .limit(5)
                .forEach(uiList::show); 
            });
        }

        public void onError(Throwable error) { 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() 
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, 
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});

这个代码说实话已经有点回调地狱那味儿了,让一段不是很复杂的逻辑变得很难读了。但是如果用 Reactor 写呢?

// 以下案例来自 Reactor 官网
userService.getFavorites(userId) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions()) 
           .take(5) 
           .publishOn(UiUtils.uiThreadScheduler()) 
           .subscribe(uiList::show, UiUtils::errorPopup);

可以看到,代码变得非常的简洁。唯一带来的困扰就是,我们不知道这些函数到底是啥意思

响应式编程虽然有非常多的特性,但是它并不是什么神奇的技术,他也是建立在传统命令式编程的基础上。只不过它所提供的 API 以及规范更适合在非阻塞 IO 中使用。虽然在非阻塞 IO 框架中几乎只使用响应式编程(Vertx,WebFlux),只是因为这样做更合适,并不是说没了响应式编程,就玩不了非阻塞 IO 了。

响应式编程内幕

Reactor 实现了 org.reactivestreams 提供的 Java 响应式编程规范,我们只要了解 reactivestreams 中代码是如何运转的,再看 Reactor 相关的代码就容易多了。

下图展示了 reactivestreams 中的核心接口

  • Publisher:发布者
  • Subscriber:订阅者
  • Subscription:这个单词中文翻译为名词的订阅,在代码中它是发布者和订阅者之间的媒介
  • Processor:该接口继承了发布者和订阅者,可以理解为发布者和订阅者的中间操作(但是 Reactor 的中间操作并没有实现 Processor,在最新版本的 Reactor 中,Processor 的相关实现接口已经被弃用)

在了解了响应式编程的核心接口之后,我们来看下响应式编程是如何运作的

在 Reactor 中大部分实现都是按照上图的逻辑来执行的

  1. 首先是Subscriber(订阅者)主动订阅 Publisher(发布者),通过调用 Publisher 的 subscribe 方法
  2. Publisher 在向下游发送数据之前,会先调用 Subscriber 的 onSubscribe 方法,传递的参数为 Subscription(订阅媒介)
  3. Subscriber 通过 Subscription#request 来请求数据,或者 Subscription#cancel 来取消数据发布(这就是响应式编程中的背压,订阅者可以控制数据发布)
  4. Subscription 在接收到订阅者的调用后,通过 Subscriber#onNext 向下游订阅者传递数据。
  5. 在数据发布完成后,调用 Subscriber#onComplete 结束本次流,如果数据发布或者处理遇到错误会调用 Subscriber#onError
调用 Subscriber#onNext,onComplete,onError 这三个方法,可能是在 Publisher 中做的,也可能是在 Subscription 中做的,根据不同的场景有不同的实现方式,并没有什么严格的要求。可以认为 Publisher 和 Subscription 共同配合完成了数据发布

其实 Reactor 中 API 实现原理也都是这个套路,我这边也自己写了个例子便于让读者加深对响应式编程的理解

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * @author tianwen.yin
 */
public class SimpleReactiveStream {

    /**
     * 实现一个简单的响应式编程发布者
     * 逻辑:当订阅者发起订阅时,像下游发送一个 HelloWorld,发布逻辑由 SimpleSubscription 完成
     */
    static class SimplePublisher implements Publisher {
        @Override
        public void subscribe(Subscriber s) {
            // 2. Publisher 发布数据之前,调用 Subscriber 的 onSubscribe
            s.onSubscribe(new SimpleSubscription(data(), s));
        }

        private String data() {
            return "Hello World";
        }
    }

    static class SimpleSubscriber implements Subscriber {
        @Override
        public void onSubscribe(Subscription s) {
            // 3. Subscriber 通过 Subscription#request 来请求数据
            // 或者 Subscription#cancel 来取消数据发布
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Object o) {
            System.out.println(o);
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("error");
        }

        @Override
        public void onComplete() {
            System.out.println("complete");
        }
    }

    static class SimpleSubscription implements Subscription {
        String data;
        Subscriber actual;
        boolean isCanceled;

        public SimpleSubscription(String data, Subscriber actual) {
            this.data = data;
            this.actual = actual;
        }

        @Override
        public void request(long n) {
            if (!isCanceled) {
                try {
                    // 4. Subscription 在接收到订阅者的调用后
                    // 通过 Subscriber#onNext 向下游订阅者传递数据
                    actual.onNext(data);
                    // 5. 在数据发布完成后,调用 Subscriber#onComplete 结束本次流
                    actual.onComplete();
                } catch (Exception e) {
                    // 如果数据发布或者处理遇到错误会调用 Subscriber#onError
                    actual.onError(e);
                }
            }
        }

        @Override
        public void cancel() {
            isCanceled = true;
        }
    }

    public static void main(String[] args) {
        // 1. Subscriber ”订阅“ Publisher
        new SimplePublisher().subscribe(new SimpleSubscriber());
    }

}

响应式编程思想

响应式编程,就像装配一条流水线。Publisher 规定了数据如何生产,中间会有 Operators(操作符)对流水线的数据进行解析,校验,转换等等操作,最终处理好的数据流转到 Subscriber。

这条流水线还有一个特点。大部分情况下当 Publisher 的 subscribe 方法被调用之前,什么都不会发生。在被订阅之前我们只是在定义流水线该如何工作,直到真正有人需要的时候,流水线才会启动。

Reactor 中的 Operator

Operators 怎么理解呢?对于上游来说,Operators 像一个订阅者,而对于它的下游来说,它像一个发布者(我们上文说过了 Reactor 中的中间操作并没有实现 Processor 接口)

    Mono.just("hello")
            .map(a -> a + "world")
            .subscribe(System.out::println);

举个简单的例子,在上面的代码中,map 就是一个 Operator,它的实现思路是什么?来看下面的代码

// 注意,这是我基于 Reactor API 实现的伪代码!
    public static class MonoMap implements Publisher {
        // 我们自定义的转换逻辑
        private Function mapper;
        // source 代表当前操作符的上游发布者
        private Publisher source;

        public MonoMap(Publisher source, Function mapper) {
            this.source = source;
            this.mapper = mapper;
        }

        @Override
        public void subscribe(Subscriber actual) {
            source.subscribe(new MonoMapSubscriber(mapper, actual));
        }
    }

    public static class MonoMapSubscriber implements Subscriber {
        // 我们自定义的转换逻辑
        private Function mapper;
        // 真正的下游
        private Subscriber actual;

        public MonoMapSubscriber(Function mapper, Subscriber actual) {
            this.mapper = mapper;
            this.actual = actual;
        }

        @Override
        public void onSubscribe(Subscription s) {
            actual.onSubscribe(s);
        }

        @Override
        public void onNext(Object o) {
            // 当上游数据发送过来时,先进行转换再发送给下游
            Object result = mapper.apply(o);
            actual.onNext(result);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }
    }

上述代码是我自己实现的一个伪代码,用于让大家理解操作符的实现思路,实际 Reactor 代码也是这个思路,只不过实现的更加巧妙和严谨

我们首先来分析一下 Mono.just("hello").map(a -> a + "world") 这句话

  1. 当执行到 Mono.just 时,会新建一个 MonoJust 对象作为当前的 Publisher。该发布者的逻辑是,当订阅时,向下游发送数据 "hello"
  2. 当执行到 map 方法时,会新建一个 MonoMap 对象替作为当前的 Publisher,MonoJust 成为了 MonoMap 中的一个属性 source(实际的上游)
  • 当 MonoMap 被订阅时,会先将它的下游 actual 做一层包装,也就是我们上面的 MonoMapSubscriber。然后去调用 source 的 subscribe 方法。上游发布数据时,MonoMapSubscriber 先对数据进行转换(我们上面的拼接字符串操作),然后再发送给 actual(它的下游)
  • 当 MonoMap 被再次转换时,MonoMap 就变成了下游操作符的 source...

最后通过一张图来总结一下

Reactor 该如何学习

本文并没有介绍太多 Reactor 的细节,因为这些东西实在是太多了。我想聊聊我自己是如何学习 Reactor 的

如果你已经通过本文理解了响应式编程的核心接口是如何工作的了,那恭喜你已经迈向了成功的第一步了。接下来就是阅读官方文档,不断的练习和阅读 Reactor 的源码。源码追踪的方向已经很明确了,当我们想了解一个发布者的实现原理是什么,我就要去关注这个发布者的 subscribe 方法和 Subscription 都做了什么。想了解消费者的逻辑,就看它的 onNext,onComplete,onError。

最后

如果觉得我的文章对你有帮助,动动小手点下关注,你的支持是对我最大的帮助

本文来自:知乎

感谢作者:知乎

查看原文:响应式编程入门之 Project Reactor

1033 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传