webflux是什么
webflux是spring推出的响应式web框架,它的对标产品为spring-mvc, 与传统的spring-mvc相比较,webflux是完全非阻塞式的。spring官网上的这张图向我们展示了两者之间的主要差别:
-
编程模式:命令式编程(mvc:imperative) VS 声明式编程(webflux:declarative)
在mvc中,我们在方法为单位,组织我们的每一条命令,虚拟机按照我们的编排,顺序地执行这些命令,程序执行 流程易于理解,编程和debug较为简单
在webflux中,我们以操作符(operator)串起来的流来定义我们想让程序如何执行,但这只是一个声明,就好 像我们写的是一个菜谱,没有原材料来的话,它就是一个静态的菜谱,在webflux中,这个材料就是数据,数据沿着一个个操作符流动的时候,我们的程序才开始执行,在webflux中,第一要义就是“在订阅(subscribe)之前,什么都不会发生”,编程的过程,就好像定义一段一段的水管,这些水管会对流经其中的水做一些处理,但只有打开了最下游的水龙头,水才能从源头流动起来,这些水管才能开始他们的工作,这个水龙头的打开就是订阅。
-
并发模型: Thread Per Request Model VS EventLoop
在mvc中,springboot默认使用tomcat作为web容器,每一个用户请求到达tomcat时,tomcat会为这次请求创建一个线程,同时有很多个请求时,tomcat的并发模式就是同时有很多个线程在平行的执行。
在webflux,默认使用的容器时netty,它的并发模型叫做eventloop, 只有少数几个线程来处理请求,如果在一个请求的处理过程中出现了阻塞,那么线程马上去处理别的请求,通过这种方式,可以让少数线程服务大量请求。
使用了webflux,我们基本可以告别java多线程编程了,基本可以丢掉等待通知,丢掉各种栅栏,丢掉synchronize,这些东西,因为底层的并发模型的变化,我们不用再去考虑了。
-
技术栈:响应式技术栈 VS Servlet技术栈
-
响应式编程(Reactive Programming)与 Reactor
响应式编程是一种编程范例,它是基于事件的,非阻塞的,异步的,并且以数据流处理为中心。在java中,响应式编程的主要类库有reactor,RxJava等,webflux是基于reactor的。
-
reactive-streams的规范
org.reactivestreams制定了reactive的规范,其中java的规范可见:github.com/reactive-st… reactor是这个规范的一种实现。在java的世界里,标准就是接口的定义,java reactive-streams的规范,可以用下面的类图来阐释:
应用场景
根据之前的分析,我们对响应式编程和webflux的特点有了一定的了解。webflux通过少量稳定线程,帮助我们更好地利用服务器,提高吞吐量,值得注意的是,我们并不能通过webflux来减小单个请求的响应时间,因为该IO阻塞的地方还是阻塞的,只是在阻塞的时候,工作线程不再等待而是去处理别的请求,所以才能提高吞吐量。
所以webflux很适合IO密集型的应用,比如spring-cloud-gateway就是基于webflux实现的,作为一个网关型应用,gateway是典型的网络IO密集型应用,每一个请求的处理过程中有大量的IO阻塞(等待各个微服务请求返回)。
在工作中,我们的很多服务都需要连接数据库,而传统的JDBC是阻塞式的,spring开发了R2DBC来支持非阻塞式的数据读取(spring.io/projects/sp… ),其对mysql的支持问世时间尚短,所以还没有得到广泛应用和验证,所以如果服务需要连接mysql数据库,建议还是使用传统的springmvc。
在实际应用中,我需要搭建一个服务,这个服务片会根据一张图片(朋友圈截图)上的文字做一些判断,它会对接收的每一个请求进行如下的处理:
对以上流程进行分析发现,这个服务属于IO密集型,跟几个长耗时的网络IO相比,CPU处理花费的时间就是毛毛雨了,且该服务不需要读取数据库,和webflux的应用场景非常契合,所以这个服务就选型了webflux来实现。
webflux中的基本概念
一等公民:Flux和Mono
Flux和Mono其实是reactor中的类,而webflux是基于reactor的。在reactor的官网上,对Flux和Mono的解释如下:
Flux
是一个发出(emit)0-N
个元素组成的异步序列的Publisher<T>
,可以被onComplete
信号或者onError
信号所终止。在响应流规范中存在三种给下游消费者调用的方法 onNext
, onComplete
, 和onError
。
相应的,Mono
是一个发出(emit)0-1
个元素的Publisher<T>
,可以被onComplete
信号或者onError
信号所终止。
看上去是有些抽象,但我们可以把Flux和Mono想象成一个水管,它包裹着我们实际要处理的元素,我们以流式编程(stream)的方式(类似java8的stream)对其中的元素做处理,Flux和Mono最主要的区别就是可以包裹的元素个数不同。
Flux和Mono支持很多个操作符(operator),这些操作符好像是流水线上的一个个工作台,每一个操作符加上我们自己在这个操作符上定义的行为,定义了这个工作台对元素进行的处理。
一个示例代码片段如下:
Flux和Mono支持的操作非常丰富,我们可以通过官方reference文档(projectreactor.io/docs/core/r… )来查询我们在不同的场景下该选取哪些操作符,比如我们怎么产生一个流,如何处理异常,如何过滤,如何进行流的合并等等。
然后我们可以通过reactor的api文档,查看相应操作符的详细说明,api文档以【弹珠图】的方式,为开发者进行了详细的说明,如下:
异步http: Webclient
Webclient是webflux为我们提供的异步的httpclient(对应springmvc中的feignclient),Webclient也像feignclient一样,使用起来非常方便。当我们说异步的时候,是指Webclient的返回是一个Mono。
webclient的使用可参考文档:docs.spring.io/spring-fram…
实际中的例子如下:
请求地址,请求方法的设置一目了然,我们还可以非常方便地设置fallback数据,如上图中红框中的那样(onErrorReturn),也可以很方便地做异常处理(onError), 其他的异常处理方式,比如dofinally, retry,可以参考reference文档:projectreactor.io/docs/core/r…
逆流而上,全流共享:Context
在springmvc中,我们会使用ThreadLocal来保存线程专有的数据,因为tomcat会为每一个请求创建一个线程,所以在这种场景下,线程专有等于请求专有,比如我们可以把登录用户的id存在ThreadLocal里,这样在请求的处理链路上,我们就不用把用户id沿着方法一次又一次地传递,需要用用户id的时候,直接从ThreadLocal里面拿就行了,在请求结束的时候,再把ThreadLocal中保存的数据清理掉,这样线程复用的时候,就不会读到之前的用户的信息。
在webflux中,面对保存【某次请求特有数据】这种需求的时候,我们是不能使用ThreadLocal的,因为一个请求可能是好几个线程来完成的,请求处理过程中的每一次阻塞都有可能导致线程的切换。Reactor提供了Context这个工具来帮我们实现这种需求,这里的Context,指的就是请求的上下文。
还有另外一种场景,我们可能会用到Context, 在传统的命令式编程中,我们会在一个方法中写很多条命令来操作数据,我们可能会获取好几种数据,然后进行一些处理和计算。在这个方法的范围内,我们可以获取当条命令前所有已经获取到的数据。而在webflux中,数据在一个个operator间流转,订阅者只能获取到上游传递下来的数据,假如有一些数据并不是下游的所有operator都要用,我们也需要把这些数据打包一层层传递到下游。
了解了Context的使用场景后,我们看下Context具体是怎样写入和读取的:
红款中是写入的例子,蓝框中是读取的例子,在实际使用过程中,写入和读取在流中的距离可能是比较远的,甚至可能散落在不同的类中,但只要是再一条流上,只要读取在写入的上游,就能读取到。
是的,你没有看错,读取是要在写入的上游,“没有订阅,一切都不会发生”,context写入的数据,只有上游能够读到。在实际中,我们可能要把context写入放置在离订阅最近的地方,这样,所有的上游处理才能读到。
一个实际中的例子如下:
迫不得已,重返阻塞:Schedulers
在webflux中,一般我们是不用管线程的调度的,基本可以告别java的高并发与多线程了,各种栅栏,各种同步工具我们都不用再去理会,因为webflux的并发模型帮我们解决了这些问题。比如下面的一段代码:
前三行代码其实是并发执行的,每一行都返回一个Mono流,从第四行开始,使用了mergeWith来合并这三个流,数据又汇聚为新的流,下游的operator再一个一个处理流过的元素。
如果我们不使用reactor的话,在java并发编程中,我们可以使用CompletableFuture来完成这种需求,但实现方案就会比较复杂,代码量也比较大,远不如上面的这种方式清晰。
请求的处理过程中,如果确实有一些同步的阻塞的处理不可绕过,我们可以使用reactor中的Schedulers,把我们的任务提交到特定的线程池中去。这样做的原因是,reactor是用少量的稳定线程来处理请求,如果我们用这些宝贵的线程去执行同步阻塞任务,那么后面的其他请求都不能被处理了,所以我们要用单独的线程池,单独的工作线程来处理这些同步阻塞的任务。
使用的范例如下:
学习资料list
-
非常好的一个视频入门概述:www.youtube.com/watch?v=Z5q…
-
spring的webflux的reference: docs.spring.io/spring-fram…
-
实际编码过程中,经常用来翻阅,寻找合适的操作符和特性的手册:projectreactor.io/docs/core/r…