1. Reactive Programming 响应式编程
1.1. 什么是什么是响应式编程
维基百科的解释如下:
In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s), and that an inferred dependency within the associated execution model exists, which facilitates the automatic propagation of the change involved with data flow.
在计算中,响应式编程是一种专注于数据流和变化传递的异步编程范式。这意味着通过所使用的编程语言可以轻松地表达静态(如数组)或动态(如事件发射器)数据流, 并在关联执行模型中推断依赖关系,这有助于与数据流相关的更改的自动传播。
响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式。
1.2. 响应式系统的特性
响应式系统有如下4个特性:
- Responsive 【响应性】: 响应式系统需要在合理的时间内处理请求
- Resilient 【弹性】: 在面对失败(崩溃、超时、500个错误……)时必须保持响应,因此它必须针对失败进行设计并适当地处理它们。即使在部分组件开始出现故障的情况下也应该能够作出响应,将停机时间将至最低
- Elastic 【可伸缩性】: 必须在各种负载下保持响应。因此它必须伸缩,并且能够用最少的资源处理负载。
- Message driven 【消息驱动】: 响应式系统中的组件使用异步消息传递进行交互。
2. 核心概念
响应式流是一种异步编程范式,ReactiveX是一种常用的跨平台实现。ProjectReactor和Akka是另外两种实现。下面三个重要的概念是响应式流API的构建基础:
- Publisher 发布者: 事件的发送方,可以向它订阅。
- Subscriber 订阅者: 事件订阅方。
- Subscription 订阅: 将发布者和订阅者联系起来,使订阅者可以向发布者发送信号。
3. 必须知道的5件事
3.1. 响应式编程是用异步数据流编程
当使用响应式编程时,数据流将成为应用程序的核心。事件、消息、调用甚至失败都将通过数据流传递。使用响应式编程,您可以观察这些流,并在事件产生时作出反应。
3.2. 理解数据流的类型:冷的,热的?
程序中处理的数据流有2种类型: 热的和冷的。理解这2种数据流的差异是成功使用反应式编程的关键。
- 冷的数据流: 观测是懒惰的。他们什么都不做,直到有人开始观察他们(订阅RX)。冷流用于表示异步操作,例如直到有人对结果感兴趣时才会执行。冷流产生的数据不会在订阅者之间共享,当你订阅时,你会得到所有的项目。
- 热的数据流在订阅之前是活跃的,比如股票行情,或者由传感器或用户发送的数据。数据独立于单个订户。当一个观察者订阅了一个热可观察对象,它将获得订阅后发出的流中的所有值。这些值在所有订阅者之间共享。例如即使没有人订阅温度计,它也会测量并发布当前的温度。当订阅者注册到数据流时,它自动接收到下一个度量值。
为什么理解你的数据流是热的还是冷的如此重要?因为它改变了您的代码如何使用传递的项。如果你没有订阅一个hot observable,你就不会接收到数据,这个数据就会丢失。
3.3. 避免异步的坑
通过围绕数据流构建程序,您正在编写异步代码: 当流发出新的数据时将调用这些代码 。线程、阻塞代码和副作用在这个上下文中是非常重要的。
没有副作用的函数只能通过它们的参数和返回值与程序的其余部分交互。
副作用是非常有用的,在许多情况下是不可避免的。但副作用也有缺陷。
在使用反应式编程时,应该避免不必要的副作用,并在使用它们时有明确的意图。所以拥抱不变性和无副作用的功能。
虽然有些情况是合理的,但滥用副作用会导致暴雷:线程安全
观察流并在有趣的事情发生时得到通知是很好的,但是您一定不要忘记是谁在调用您,或者更准确地说,是在哪个线程上执行您的函数。强烈建议避免在程序中使用过多的线程。依赖于多个线程的异步程序变成一个棘手的同步难题,通常以死锁告终
永远不要阻塞。因为调用你编写的代码的线程不是你创建的,所以你必须确保永远不要阻塞它。如果你这样做了,你可能会避免其他项被触发,它们将被缓冲直到缓冲区满了为止(在这种情况下可以触发回压【back-pressure】)
3.4. 保持简单
能力越大,责任越大。RX提供了很多很酷的功能,但使用时很容易被滥用,导致代码晦涩难懂。将flapmap、retry、debounce和zip链接在一起会让你觉得自己像个忍者,但是,永远不要忘记优秀的代码需要能够被其他人读懂。
RX功能强大,滥用或不解释都会让你的同事变得暴躁,如下面的代码:
manager.getCampaignById(id)
.flatMap(campaign ->
manager.getCartsForCampaign(campaign)
.flatMap(list -> {
Single<List<Product>> products = manager.getProducts(campaign);
Single<List<UserCommand>> carts = manager.getCarts(campaign);
return products.zipWith(carts,
(p, c) -> new CampaignModel(campaign, p, c));
})
.flatMap(model -> template
.rxRender(rc, "templates/fruits/campaign.thl.html")
.map(Buffer::toString))
)
.subscribe(
content -> rc.response().end(content),
err -> {
log.error("Unable to render campaign view", err);
getAllCampaigns(rc);
}
);
3.5. 响应式编程 != 响应式系统
这可能是最令人困惑的部分: 使用响应式编程并不能构建一个响应式系统。响应式系统是一种构建响应式分布式系统的架构风格。响应式系统可以被看作是正确的分布式系统。响应式系统有四个特性:
- Responsive 【响应性】: 响应式系统需要在合理的时间内处理请求
- Resilient 【弹性】: 在面对失败(崩溃、超时、500个错误……)时必须保持响应,因此它必须针对失败进行设计并适当地处理它们。即使在部分组件开始出现故障的情况下也应该能够作出响应,将停机时间将至最低
- Elastic 【可伸缩性】: 必须在各种负载下保持响应。因此它必须伸缩,并且能够用最少的资源处理负载,要求系统能动态的查找和修复系统性能瓶颈。
- Message driven 【消息驱动】: 响应式系统中的组件使用异步消息传递进行交互,依赖于异步消息传递机制
尽管这些特性的基本原理简单,但要建立其中的一个却很棘手。通常每个节点都需要包含一个异步非阻塞开发模型、一个基于任务的并发模型并使用非阻塞I/O。
4. 响应式编程的好处
- 在业务层实现代码逻辑分离,方便后期维护和扩展
- 极大的提高程序响应速度,充分挖掘CPU的能力
- 帮助开发者提高代码的抽象能力和充分理解业务的能力
- Rx丰富的操作符会帮助开发者极大的简化代码逻辑
5. 响应式编程中的设计模式
响应式编程扩展并增强了观察者模式与迭代模式。
5.1. 与观察者模式的区别
响应式模式与观察者模式有许多的相似之处,其中观察者模式的类图及Observable流程如下:
- Observer(观察者)订阅一个Observable,当Observable发送数据时,Observer会通过消费或转换数据来做出响应。在等待Observable发送数据时不需要进行阻塞,,这种模式有助于并发操作。
- 响应式编程扩展并增强了观察者模式,使用Observable、Flowable表示可观测对象,可观测对象支持冷、热两种类型,在订阅者消费不过来时,支持背压。
下图为Observable的流程
5.2. 与Observable与迭代器的区别
响应式编程中的Observable类似于指令迭代器,它解决了相同的问题,但策略不同。Observable通过异步方式推送数据,而迭代器采用同步方式拉取数据。处理错误的方式也不同:Observable使用错误回调函数,而迭代器使用抛出异常。
通过使用订阅方法(onNextAction、onErrorAction、onCompletedAction)将Observer与Observable关联起来。
从设计的角度看,响应式编程的Observable通过使用onError回调方法和onCompleted回调方法添加了在发送完成和发送异常时能够发送通知的功能,使观察者模式得到了增强。
事件 | Iterable | Observable |
---|---|---|
获取数据 | T next() | onNext(T) |
异常 | thrown new Exception | onError(Exception) |
完成后触发 | Return | onCompleted() |
参考
5 Things to Know About Reactive Programming
https://github.com/ReactiveX/RxJava
https://www.tutorialspoint.com/rxjava/rxjava_overview.htm
https://www.infoq.com/news/2016/01/reactive-basics/