Guava ListenableFuture
简介
ListenableFuture继承自java.util.concurrent.Future,旨在降低异步开发的难度。
JDK中定义的Future表示一个异步操作,ListenableFuture在Future的基础上增加了一个addListener方法,可以在计算完成时触发回调逻辑,通过这样的扩展可以支持更多原生Future不支持的逻辑。ListenableFuture中新增的方法addListener(Runnable, Executor),可以在指定Future上注册一个Runnable对象,这个Runnable会在Future完成的时候通过指定的Executor执行。
此外Guava还提供了一个方法用于在Future上注册回调,使用Futures类:Futures.addCallback(ListenableFuture
ListenableFuture的创建
Guava对齐JDK中的ExecutorService.submit(Runnable)提供了ListeningExecutorService,ListeningExecutorService会在ExecutorService返回Future的地方返回一个ListenableFuture。将一个ExecutorService转换成ListeningExecutorService只需要执行MoreExecutorService.listeningDecorator(ExecutorService)。官方文档中的示例如下:
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
public Explosion call() {
return pushBigRedButton();
}
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
// we want this handler to run immediately after we push the big red button!
public void onSuccess(Explosion explosion) {
walkAwayFrom(explosion);
}
public void onFailure(Throwable thrown) {
battleArchNemesis(); // escaped the explosion!
}
});
如果你此前使用的FutureTask,Guava提供ListenableFutureTask.create(Callable
假如你希望直接设置Future的结果而非通过一系列计算得出结果,可以继承AbstractFuture实现,或者直接使用SettableFuture。
如果你不得不从JDK的Future转换成ListenableFuture,唯一的方式是使用JdkFutureAdapters.listenInPoolThread(Future),但是这种方式会更重一些,因为它会针对每一个添加的Listener创建新的线程来与其绑定,所以如果可能的话,Guava强烈建议通过改造代码直接返回ListenableFuture。
ListenableFuture的使用
推荐使用ListenableFuture的最重要的原因就是通过ListenableFuture,可以进行一系列复杂的链式异步操作。官方文档的示例如下:
ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
AsyncFunction<RowKey, QueryResult> queryFunction =
new AsyncFunction<RowKey, QueryResult>() {
public ListenableFuture<QueryResult> apply(RowKey rowKey) {
return dataService.read(rowKey);
}
};
ListenableFuture<QueryResult> queryFuture =
Futures.transformAsync(rowKeyFuture, queryFunction, queryExecutor);
ListenableFuture可以支持很多Future不支持的操作,每个Listener可以和不同的Executor绑定,一个ListenableFuture可以有多个操作等待。
ListenableFuture自身的语义很好的表现了fan-out操作(当一个操作开始的时候其他的一些操作也会尽快开始执行),它在完成时会触发所有的回调逻辑。要支持fan-in操作,只需要少许额外的工作,具体参考Futures.allAsList(ListenableFuture…)
方法 | 说明 |
---|---|
transformAsync(ListenableFuture, AsyncFunction<A, B>, Executor) | 返回一个ListenableFuture,它的结果是给定的AsyncFunction利用给定的ListenableFuture的结果作为参数计算的结果。这个AsyncFunction会通过给定的Executor执行 |
transform(ListenableFuture, Function<A, B>, Executor) | 返回一个ListenableFuture,它的结果是给定的Function利用给定的ListenableFuture的结果作为参数计算的结果。这个Function会通过给定的Executor执行 |
allAsList(Iterable<ListenableFuture |
返回一个ListenableFuture,它的结果是一个包含入参的所有ListenableFuture的结果的List,如果参数中的任意一个ListenableFuture失败或者被取消,这个ListenableFuture就失败或者被取消。 |
successfulAsList(Iterable<ListenableFuture |
返回一个ListenableFuture,它的结果是一个包含入参的所有ListenableFuture的结果的List,如果参数中的任意一个ListenableFuture失败或者被取消,List中对应的元素为null。 |