Netty 源码中大量使用了异步编程,从代码实现角度看就是大量使用了线程池和 Future。
熟悉 Java 5 的同学一定对 Future 不陌生。简单来说就是其代表了一个异步任务,任务将在未来某个时刻完成,而 Future 这个接口就是用来提供例如获取接口、查看任务状态等功能。
Netty 扩展了 Java 5 引入的 Future 机制。从下面的类图我们可以看到相关类的关系:
Netty 的 Future 接口
需要注意的是,上面类图中有两个 Future,最上面的是 java.util.concurrent.Future,而其下面的则是 io.netty.util.concurrent.Future。
JDK 的 Future 对象,该接口的方法如下:
// 取消异步操作 boolean cancel(boolean mayInterruptIfRunning); // 异步操作是否取消 boolean isCancelled(); // 异步操作是否完成,正常终止、异常、取消都是完成 boolean isDone(); // 阻塞直到取得异步操作结果 V get() throws InterruptedException, ExecutionException; // 同上,但最长阻塞时间为timeout V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
接口中只有 isDone() 方法判断一个异步操作是否完成,但是对于完成的定义过于模糊,JDK 文档指出正常终止、抛出异常、用户取消都会使 isDone() 方法返回真。在我们的使用中,我们极有可能是对这三种情况分别处理,而 JDK 这样的设计不能满足我们的需求。
Netty 扩展了 JDK 的 Future 接口,扩展的方法如下:
// 异步操作完成且正常终止 boolean isSuccess(); // 异步操作是否可以取消 boolean isCancellable(); // 异步操作失败的原因 Throwable cause(); // 添加一个监听者,异步操作完成时回调,类比javascript的回调函数 Future<V> addListener( GenericFutureListener<? extends Future<? super V>> listener); Future<V> removeListener( GenericFutureListener<? extends Future<? super V>> listener); // 阻塞直到异步操作完成 Future<V> await() throws InterruptedException; // 同上,但异步操作失败时抛出异常 Future<V> sync() throws InterruptedException; // 非阻塞地返回异步结果,如果尚未完成返回null V getNow();
可知,Future 对象有两种状态尚未完成和已完成,其中已完成又有三种状态:成功、失败、用户取消。
Future 接口中的方法都是 getter 方法而没有 setter 方法,也就是说这样实现的 Future 子类的状态是不可变的,如果我们想要变化,Netty 提供的解决方法是:使用可写的 Future 即 Promise。
Netty 的 Promise
Promise 是特殊的可写 Future。Promise 在继承 Future 的基础之上进行了扩展,用来设置 IO 操作的结果。
当 Netty 进行 IO 操作的时候,会创建一个 Promise 对象,当操作完成或者失败的时候就会对 Promise 进行结果设置。
Promise<V> setSuccess(V result); boolean trySuccess(V result); Promise<V> setFailure(Throwable cause); boolean tryFailure(Throwable cause); boolean setUncancellable();
Netty 提供了一个 Promise 的默认实现 DefaultPromise。主要是 setSuccess 方法和 await 方法的实现
// 标记异步操作结果为成功,如果已被设置(不管成功还是失败)则抛出异常IllegalStateException Promise<V> setSuccess(V result); // 同上,只是结果已被设置时返回False boolean trySuccess(V result); Promise<V> setFailure(Throwable cause); boolean tryFailure(Throwable cause); // 设置结果为不可取消,结果已被取消返回False boolean setUncancellable();
Promise 接口继承自 Future 接口,它提供的 setter 方法与常见的 setter 方法大为不同。Promise 从 Uncompleted 到 Completed 的状态转变有且只能有一次,也就是说 setSuccess 和 setFailure 方法最多只会成功一个,此外,在 setSuccess 和 setFailure 方法中会通知注册到其上的监听者。
一个简单实现
class Person extends Thread { BlockingQueue<Runnable> taskQueue; //任务队列 public Person(String name) { super(name); taskQueue = new LinkedBlockingQueue<>(); } @Override public void run() { while(true) { //无限循环, 不断从任务队列取任务 try { Runnable task = taskQueue.take(); task.run(); } catch (InterruptedException e) { e.printStackTrace(); } } } public void submit(Runnable task) { //将任务提交到任务队列中去 taskQueue.offer(task); } }
做数学题的例子
void main() { final Person wang = new Person("wang"); final Person li = new Person("li"); li.start(); //启动小王 wang.start(); //启动小李 wang.submit(new Runnable() { //提交一个简单的题 @Override public void run() { System.out.println( Thread.currentThread().getName() + "1. 这是一道简单的题"); } }); wang.submit(new Runnable() { //提交一个复杂的题 @Override public void run() { li.submit(new Runnable() { //将复杂的题交给li来做 @Override public void run() { System.out.println( Thread.currentThread().getName() + " 2. 这是一道复杂的题"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } wang.submit(new Runnable() { //做完之后将结果作为Task返回给wang @Override public void run() { System.out.println( Thread.currentThread().getName() + "复杂题执行结果"); } }); } }); } }); wang.submit(new Runnable() { //提交一个简单的题 @Override public void run() { System.out.println( `Thread.currentThread().getName() + " 3. 这是一道简单的题"); } }); }
执行结果是
wang 1. 这是一道简单的题 wang 3. 这是一道简单的题 li 2. 这是一道复杂的题 wang 复杂题执行完毕
Netty 中的实现
final DefaultEventExecutor wang = new DefaultEventExecutor(); final DefaultEventExecutor li = new DefaultEventExecutor(); wang.execute(new Runnable() { @Override public void run() { System.out.println( Thread.currentThread().getName() + " 1. 这是一道简单的题"); } }); wang.execute(new Runnable() { @Override public void run() { final Promise<Integer> promise = wang.newPromise(); promise.addListener(new GenericFutureListener<Future<? super Integer>>() { @Override public void operationComplete(Future<? super Integer> future) throws Exception { System.out.println(Thread.currentThread().getName() + "复杂题执行结果"); } }); li.execute(new Runnable() { @Override public void run() { System.out.println( Thread.currentThread().getName() + " 2. 这是一道复杂的题"); promise.setSuccess(10); } }); } }); wang.execute(new Runnable() { @Override public void run() { System.out.println( Thread.currentThread().getName() + " 3. 这是一道简单的题"); } });
执行结果是
defaultEventExecutor-1-1 1. 这是一道简单的题 defaultEventExecutor-1-1 3. 这是一道简单的题 defaultEventExecutor-3-1 2. 这是一道复杂的题 defaultEventExecutor-1-1 复杂题执行结果
看起来和简单实现中的代码差不多, DefaultEventExecutor可以简单的看做拥有一个队列的线程。与简单实现不同的是, 小李执行完任务后通知小王的方式。
在 Netty 中 Promise 代码一个可写的异步任务结果,以上代码的含义是:
生成一个 promise,为该 promise 注册一个 listener,当任务执行完后回调该 listener。
在另一个线程中执行一个异步任务,执行完后,将 promise 设置为成功,回调 listener,该 listener 在异步任务提交者线程中执行。
一种误用
为了实现以上问题,还可以像下面这样写。
wang.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 1. 这是一道简单的题"); } }); wang.submit(new Runnable() { @Override public void run() { Future<String> result = li.submit(new Callable<String>() { @Override public String call() throws Exception { for(int i = 0; i <= 10000000; i++){ for(int j = 0; j <= 1000000; j++) { ; } } System.out.println( Thread.currentThread().getName() + " 2. 这是一道复杂的题"); return null; } }); result.addListener(new GenericFutureListener<Future<? super String>>() { @Override public void operationComplete( Future<? super String> future) throws Exception { System.out.println( Thread.currentThread().getName() + "3. 复杂题执行结果"); } }); } }); wang.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 3. 这是一道简单的题"); } });
执行结果是:
defaultEventExecutor-1-1 1. 这是一道简单的题 defaultEventExecutor-3-1 2. 这是一道复杂的题 defaultEventExecutor-1-1 3. 这是一道简单的题 defaultEventExecutor-3-1 3. 复杂题执行结果
这样写似乎更简单,但运行一下会发现,listener 的执行却是由小李来处理,按理说,小王交给小李一个任务,小李做完之后将结果返回给小王,应该是小王处理才对啊,可为什么是小李来处理呢?
查看源码可知,DefaultEventExecutor.submit 方法将一个 Callable 包装成一个 DefaultPromise,并且将执行者作为 DefaultPromise 的 exectutor,为什么要这样做呢?
Netty 的异步回调机制需要提交者必须有一个 TaskQueue 才行,而这里 wang 并不一定含有一个 TaskQueue,为了防止因为提交者没有 TaskQueue 而出错,所以只能赋值为执行者,而使用 newPromise 就没有问题,因为 newPromise 是 DefaultEventExecutor 的接口,而 DefaultEventExecutor 肯定有一个 TaskQueue。
Netty 源码中对异步回调的使用
在 Netty 中,ChannelHandlerContext 的 write(msg, promise) 和 bind(address, promise) 等操作都是一个异步操作,为了使该操作不阻塞当前 executor 的执行,一般这样使用:
Promise result = ctx.newPromise(ctx.executor()); ctx.write(msg, result); result.addListener(listener);
Reference:
http://www.jianshu.com/p/a06da3256f0c
http://lingnanlu.github.io/2016/08/16/netty-asyc-callback
转载请并标注: “本文转载自 linkedkeeper.com (文/张松然)” ©著作权归作者所有