001/*
002 * Copyright (C) 2006 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkNotNull;
020import static java.util.concurrent.TimeUnit.NANOSECONDS;
021
022import com.google.common.annotations.Beta;
023import com.google.common.base.Function;
024
025import java.lang.reflect.UndeclaredThrowableException;
026import java.util.List;
027import java.util.concurrent.BlockingQueue;
028import java.util.concurrent.CancellationException;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.Executor;
032import java.util.concurrent.Executors;
033import java.util.concurrent.Future;
034import java.util.concurrent.LinkedBlockingQueue;
035import java.util.concurrent.ThreadFactory;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.TimeoutException;
038import java.util.concurrent.atomic.AtomicBoolean;
039
040import javax.annotation.Nullable;
041
042/**
043 * Static utility methods pertaining to the {@link Future} interface.
044 *
045 * @author Kevin Bourrillion
046 * @author Nishant Thakkar
047 * @author Sven Mawson
048 * @since 1
049 */
050@Beta
051public final class Futures {
052  private Futures() {}
053
054  /**
055   * Returns an uninterruptible view of a {@code Future}. If a thread is
056   * interrupted during an attempt to {@code get()} from the returned future, it
057   * continues to wait on the result until it is available or the timeout
058   * elapses, and only then re-interrupts the thread.
059   */
060  public static <V> UninterruptibleFuture<V> makeUninterruptible(
061      final Future<V> future) {
062    checkNotNull(future);
063    if (future instanceof UninterruptibleFuture<?>) {
064      return (UninterruptibleFuture<V>) future;
065    }
066    return new UninterruptibleFuture<V>() {
067      @Override
068      public boolean cancel(boolean mayInterruptIfRunning) {
069        return future.cancel(mayInterruptIfRunning);
070      }
071      @Override
072      public boolean isCancelled() {
073        return future.isCancelled();
074      }
075      @Override
076      public boolean isDone() {
077        return future.isDone();
078      }
079
080      @Override
081      public V get(long originalTimeout, TimeUnit originalUnit)
082          throws TimeoutException, ExecutionException {
083        boolean interrupted = false;
084        try {
085          long end = System.nanoTime() + originalUnit.toNanos(originalTimeout);
086          while (true) {
087            try {
088              // Future treats negative timeouts just like zero.
089              return future.get(end - System.nanoTime(), NANOSECONDS);
090            } catch (InterruptedException e) {
091              interrupted = true;
092            }
093          }
094        } finally {
095          if (interrupted) {
096            Thread.currentThread().interrupt();
097          }
098        }
099      }
100
101      @Override
102      public V get() throws ExecutionException {
103        boolean interrupted = false;
104        try {
105          while (true) {
106            try {
107              return future.get();
108            } catch (InterruptedException ignored) {
109              interrupted = true;
110            }
111          }
112        } finally {
113          if (interrupted) {
114            Thread.currentThread().interrupt();
115          }
116        }
117      }
118    };
119  }
120
121  /**
122   *
123   * <p>Creates a {@link ListenableFuture} out of a normal {@link Future}. The
124   * returned future will create a thread to wait for the source future to
125   * complete before executing the listeners.
126   *
127   * <p><b>Warning:</b> If the input future does not already implement {@link
128   * ListenableFuture}, the returned future will emulate {@link
129   * ListenableFuture#addListener} by taking a thread from an internal,
130   * unbounded pool at the first call to {@code addListener} and holding it
131   * until the future is {@linkplain Future#isDone() done}.
132   *
133   * <p>Callers who have a future that subclasses
134   * {@link java.util.concurrent.FutureTask} may want to instead subclass
135   * {@link ListenableFutureTask}, which adds the {@link ListenableFuture}
136   * functionality to the standard {@code FutureTask} implementation.
137   */
138  public static <V> ListenableFuture<V> makeListenable(
139      Future<V> future) {
140    if (future instanceof ListenableFuture<?>) {
141      return (ListenableFuture<V>) future;
142    }
143    return new ListenableFutureAdapter<V>(future);
144  }
145
146  static <V> ListenableFuture<V> makeListenable(
147      Future<V> future, Executor executor) {
148    checkNotNull(executor);
149    if (future instanceof ListenableFuture<?>) {
150      return (ListenableFuture<V>) future;
151    }
152    return new ListenableFutureAdapter<V>(future, executor);
153  }
154
155  /**
156   * Creates a {@link CheckedFuture} out of a normal {@link Future} and a
157   * {@link Function} that maps from {@link Exception} instances into the
158   * appropriate checked type.
159   *
160   * <p><b>Warning:</b> If the input future does not implement {@link
161   * ListenableFuture}, the returned future will emulate {@link
162   * ListenableFuture#addListener} by taking a thread from an internal,
163   * unbounded pool at the first call to {@code addListener} and holding it
164   * until the future is {@linkplain Future#isDone() done}.
165   *
166   * <p>The given mapping function will be applied to an
167   * {@link InterruptedException}, a {@link CancellationException}, or an
168   * {@link ExecutionException} with the actual cause of the exception.
169   * See {@link Future#get()} for details on the exceptions thrown.
170   */
171  public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
172      Future<V> future, Function<Exception, X> mapper) {
173    return new MappingCheckedFuture<V, X>(makeListenable(future), mapper);
174  }
175
176  /**
177   * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
178   * and a {@link Function} that maps from {@link Exception} instances into the
179   * appropriate checked type.
180   *
181   * <p>The given mapping function will be applied to an
182   * {@link InterruptedException}, a {@link CancellationException}, or an
183   * {@link ExecutionException} with the actual cause of the exception.
184   * See {@link Future#get()} for details on the exceptions thrown.
185   *
186   * @since 9 (source-compatible since release 1)
187   */
188  public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
189      ListenableFuture<V> future, Function<Exception, X> mapper) {
190    return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
191  }
192
193  /**
194   * Creates a {@code ListenableFuture} which has its value set immediately upon
195   * construction. The getters just return the value. This {@code Future} can't
196   * be canceled or timed out and its {@code isDone()} method always returns
197   * {@code true}.
198   */
199  public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
200    SettableFuture<V> future = SettableFuture.create();
201    future.set(value);
202    return future;
203  }
204
205  /**
206   * Returns a {@code CheckedFuture} which has its value set immediately upon
207   * construction.
208   *
209   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
210   * method always returns {@code true}. Calling {@code get()} or {@code
211   * checkedGet()} will immediately return the provided value.
212   */
213  public static <V, X extends Exception> CheckedFuture<V, X>
214      immediateCheckedFuture(@Nullable V value) {
215    SettableFuture<V> future = SettableFuture.create();
216    future.set(value);
217    return Futures.makeChecked(future, new Function<Exception, X>() {
218      @Override
219      public X apply(Exception e) {
220        throw new AssertionError("impossible");
221      }
222    });
223  }
224
225  /**
226   * Returns a {@code ListenableFuture} which has an exception set immediately
227   * upon construction.
228   *
229   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
230   * method always returns {@code true}. Calling {@code get()} will immediately
231   * throw the provided {@code Throwable} wrapped in an {@code
232   * ExecutionException}.
233   *
234   * @throws Error if the throwable is an {@link Error}.
235   */
236  public static <V> ListenableFuture<V> immediateFailedFuture(
237      Throwable throwable) {
238    checkNotNull(throwable);
239    SettableFuture<V> future = SettableFuture.create();
240    future.setException(throwable);
241    return future;
242  }
243
244  /**
245   * Returns a {@code CheckedFuture} which has an exception set immediately upon
246   * construction.
247   *
248   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
249   * method always returns {@code true}. Calling {@code get()} will immediately
250   * throw the provided {@code Throwable} wrapped in an {@code
251   * ExecutionException}, and calling {@code checkedGet()} will throw the
252   * provided exception itself.
253   *
254   * @throws Error if the throwable is an {@link Error}.
255   */
256  public static <V, X extends Exception> CheckedFuture<V, X>
257      immediateFailedCheckedFuture(final X exception) {
258    checkNotNull(exception);
259    return makeChecked(Futures.<V>immediateFailedFuture(exception),
260        new Function<Exception, X>() {
261          @Override
262          public X apply(Exception e) {
263            return exception;
264          }
265        });
266  }
267
268  /**
269   * Returns a new {@code ListenableFuture} whose result is asynchronously
270   * derived from the result of the given {@code Future}. More precisely, the
271   * returned {@code Future} takes its result from a {@code Future} produced by
272   * applying the given {@code Function} to the result of the original {@code
273   * Future}. Example:
274   *
275   * <pre>   {@code
276   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
277   *   Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
278   *       new Function<RowKey, ListenableFuture<QueryResult>>() {
279   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
280   *           return dataService.read(rowKey);
281   *         }
282   *       };
283   *   ListenableFuture<QueryResult> queryFuture =
284   *       chain(queryFuture, queryFunction);
285   * }</pre>
286   *
287   * <p>Successful cancellation of either the input future or the result of
288   * function application will cause the returned future to be cancelled.
289   * Cancelling the returned future will succeed if it is currently running.
290   * In this case, attempts will be made to cancel the input future and the
291   * result of the function, however there is no guarantee of success.
292   *
293   * <p>TODO: Add a version that accepts a normal {@code Future}
294   *
295   * <p>The typical use for this method would be when a RPC call is dependent on
296   * the results of another RPC.  One would call the first RPC (input), create a
297   * function that calls another RPC based on input's result, and then call
298   * chain on input and that function to get a {@code ListenableFuture} of
299   * the result.
300   *
301   * @param input The future to chain
302   * @param function A function to chain the results of the provided future
303   *     to the results of the returned future.  This will be run in the thread
304   *     that notifies input it is complete.
305   * @return A future that holds result of the chain.
306   */
307  public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
308      Function<? super I, ? extends ListenableFuture<? extends O>> function) {
309    return chain(input, function, MoreExecutors.sameThreadExecutor());
310  }
311
312  /**
313   * Returns a new {@code ListenableFuture} whose result is asynchronously
314   * derived from the result of the given {@code Future}. More precisely, the
315   * returned {@code Future} takes its result from a {@code Future} produced by
316   * applying the given {@code Function} to the result of the original {@code
317   * Future}. Example:
318   *
319   * <pre>   {@code
320   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
321   *   Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
322   *       new Function<RowKey, ListenableFuture<QueryResult>>() {
323   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
324   *           return dataService.read(rowKey);
325   *         }
326   *       };
327   *   ListenableFuture<QueryResult> queryFuture =
328   *       chain(queryFuture, queryFunction, executor);
329   * }</pre>
330   *
331   * <p>Successful cancellation of either the input future or the result of
332   * function application will cause the returned future to be cancelled.
333   * Cancelling the returned future will succeed if it is currently running.
334   * In this case, attempts will be made to cancel the input future and the
335   * result of the function, however there is no guarantee of success.
336   *
337   * <p>This version allows an arbitrary executor to be passed in for running
338   * the chained Function. When using {@link MoreExecutors#sameThreadExecutor},
339   * the thread chained Function executes in will be whichever thread set the
340   * result of the input Future, which may be the network thread in the case of
341   * RPC-based Futures.
342   *
343   * @param input The future to chain
344   * @param function A function to chain the results of the provided future
345   *     to the results of the returned future.
346   * @param exec Executor to run the function in.
347   * @return A future that holds result of the chain.
348   */
349  public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
350      Function<? super I, ? extends ListenableFuture<? extends O>> function,
351      Executor exec) {
352    ChainingListenableFuture<I, O> chain =
353        new ChainingListenableFuture<I, O>(function, input);
354    input.addListener(chain, exec);
355    return chain;
356  }
357
358  /**
359   * Returns a new {@code ListenableFuture} whose result is the product of
360   * applying the given {@code Function} to the result of the given {@code
361   * Future}. Example:
362   *
363   * <pre>   {@code
364   *   ListenableFuture<QueryResult> queryFuture = ...;
365   *   Function<QueryResult, List<Row>> rowsFunction =
366   *       new Function<QueryResult, List<Row>>() {
367   *         public List<Row> apply(QueryResult queryResult) {
368   *           return queryResult.getRows();
369   *         }
370   *       };
371   *   ListenableFuture<List<Row>> rowsFuture =
372   *       transform(queryFuture, rowsFunction);
373   * }</pre>
374   *
375   * <p>Successful cancellation of the input future will cause the returned
376   * future to be cancelled.  Cancelling the returned future will succeed if it
377   * is currently running.  In this case, an attempt will be made to cancel the
378   * input future, however there is no guarantee of success.
379   *
380   * <p>An example use of this method is to convert a serializable object
381   * returned from an RPC into a POJO.
382   *
383   * @param future The future to compose
384   * @param function A Function to compose the results of the provided future
385   *     to the results of the returned future.  This will be run in the thread
386   *     that notifies input it is complete.
387   * @return A future that holds result of the composition.
388   * @since 9 (in version 1 as {@code compose})
389   */
390  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
391      final Function<? super I, ? extends O> function) {
392    return transform(future, function, MoreExecutors.sameThreadExecutor());
393  }
394
395  /**
396   * Returns a new {@code ListenableFuture} whose result is the product of
397   * applying the given {@code Function} to the result of the given {@code
398   * Future}. Example:
399   *
400   * <pre>   {@code
401   *   ListenableFuture<QueryResult> queryFuture = ...;
402   *   Function<QueryResult, List<Row>> rowsFunction =
403   *       new Function<QueryResult, List<Row>>() {
404   *         public List<Row> apply(QueryResult queryResult) {
405   *           return queryResult.getRows();
406   *         }
407   *       };
408   *   ListenableFuture<List<Row>> rowsFuture =
409   *       transform(queryFuture, rowsFunction, executor);
410   * }</pre>
411   *
412   * <p>Successful cancellation of the input future will cause the returned
413   * future to be cancelled.  Cancelling the returned future will succeed if it
414   * is currently running.  In this case, an attempt will be made to cancel the
415   * input future, however there is no guarantee of success.
416   *
417   * <p>An example use of this method is to convert a serializable object
418   * returned from an RPC into a POJO.
419   *
420   * <p>This version allows an arbitrary executor to be passed in for running
421   * the chained Function. When using {@link MoreExecutors#sameThreadExecutor},
422   * the thread chained Function executes in will be whichever thread set the
423   * result of the input Future, which may be the network thread in the case of
424   * RPC-based Futures.
425   *
426   * @param future The future to compose
427   * @param function A Function to compose the results of the provided future
428   *     to the results of the returned future.
429   * @param exec Executor to run the function in.
430   * @return A future that holds result of the composition.
431   * @since 9 (in version 2 as {@code compose})
432   */
433  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
434      final Function<? super I, ? extends O> function, Executor exec) {
435    checkNotNull(function);
436    Function<I, ListenableFuture<O>> wrapperFunction
437        = new Function<I, ListenableFuture<O>>() {
438            @Override public ListenableFuture<O> apply(I input) {
439              O output = function.apply(input);
440              return immediateFuture(output);
441            }
442        };
443    return chain(future, wrapperFunction, exec);
444  }
445
446  /**
447   * Returns a new {@code Future} whose result is the product of applying the
448   * given {@code Function} to the result of the given {@code Future}. Example:
449   *
450   * <pre>   {@code
451   *   Future<QueryResult> queryFuture = ...;
452   *   Function<QueryResult, List<Row>> rowsFunction =
453   *       new Function<QueryResult, List<Row>>() {
454   *         public List<Row> apply(QueryResult queryResult) {
455   *           return queryResult.getRows();
456   *         }
457   *       };
458   *   Future<List<Row>> rowsFuture = transform(queryFuture, rowsFunction);
459   * }</pre>
460   *
461   * <p>Each call to {@code Future<O>.get(*)} results in a call to
462   * {@code Future<I>.get(*)}, but {@code function} is only applied once, so it
463   * is assumed that {@code Future<I>.get(*)} is idempotent.
464   *
465   * <p>When calling {@link Future#get(long, TimeUnit)} on the returned
466   * future, the timeout only applies to the future passed in to this method.
467   * Any additional time taken by applying {@code function} is not considered.
468   * (Exception: If the input future is a {@link ListenableFuture}, timeouts
469   * will be strictly enforced.)
470   *
471   * @param future The future to compose
472   * @param function A Function to compose the results of the provided future
473   *     to the results of the returned future.  This will be run in the thread
474   *     that calls one of the varieties of {@code get()}.
475   * @return A future that computes result of the composition.
476   * @since 9 (in version 1 as {@code compose})
477   */
478  public static <I, O> Future<O> transform(final Future<I> future,
479      final Function<? super I, ? extends O> function) {
480    if (future instanceof ListenableFuture) {
481      return transform((ListenableFuture<I>) future, function);
482    }
483    checkNotNull(future);
484    checkNotNull(function);
485    return new Future<O>() {
486
487      /*
488       * Concurrency detail:
489       *
490       * <p>To preserve the idempotency of calls to this.get(*) calls to the
491       * function are only applied once. A lock is required to prevent multiple
492       * applications of the function. The calls to future.get(*) are performed
493       * outside the lock, as is required to prevent calls to
494       * get(long, TimeUnit) to persist beyond their timeout.
495       *
496       * <p>Calls to future.get(*) on every call to this.get(*) also provide
497       * the cancellation behavior for this.
498       *
499       * <p>(Consider: in thread A, call get(), in thread B call get(long,
500       * TimeUnit). Thread B may have to wait for Thread A to finish, which
501       * would be unacceptable.)
502       *
503       * <p>Note that each call to Future<O>.get(*) results in a call to
504       * Future<I>.get(*), but the function is only applied once, so
505       * Future<I>.get(*) is assumed to be idempotent.
506       */
507
508      private final Object lock = new Object();
509      private boolean set = false;
510      private O value = null;
511      private ExecutionException exception = null;
512
513      @Override
514      public O get() throws InterruptedException, ExecutionException {
515        return apply(future.get());
516      }
517
518      @Override
519      public O get(long timeout, TimeUnit unit) throws InterruptedException,
520          ExecutionException, TimeoutException {
521        return apply(future.get(timeout, unit));
522      }
523
524      private O apply(I raw) throws ExecutionException {
525        synchronized (lock) {
526          if (!set) {
527            try {
528              value = function.apply(raw);
529            } catch (RuntimeException e) {
530              exception = new ExecutionException(e);
531            } catch (Error e) {
532              exception = new ExecutionException(e);
533            }
534            set = true;
535          }
536
537          if (exception != null) {
538            throw exception;
539          }
540          return value;
541        }
542      }
543
544      @Override
545      public boolean cancel(boolean mayInterruptIfRunning) {
546        return future.cancel(mayInterruptIfRunning);
547      }
548
549      @Override
550      public boolean isCancelled() {
551        return future.isCancelled();
552      }
553
554      @Override
555      public boolean isDone() {
556        return future.isDone();
557      }
558    };
559  }
560
561  /**
562   * An implementation of {@code ListenableFuture} that also implements
563   * {@code Runnable} so that it can be used to nest ListenableFutures.
564   * Once the passed-in {@code ListenableFuture} is complete, it calls the
565   * passed-in {@code Function} to generate the result.
566   *
567   * <p>If the function throws any checked exceptions, they should be wrapped
568   * in a {@code UndeclaredThrowableException} so that this class can get
569   * access to the cause.
570   */
571  private static class ChainingListenableFuture<I, O>
572      extends AbstractListenableFuture<O> implements Runnable {
573
574    private Function<? super I, ? extends ListenableFuture<? extends O>>
575        function;
576    private ListenableFuture<? extends I> inputFuture;
577    private volatile ListenableFuture<? extends O> outputFuture;
578    private final BlockingQueue<Boolean> mayInterruptIfRunningChannel =
579        new LinkedBlockingQueue<Boolean>(1);
580    private final CountDownLatch outputCreated = new CountDownLatch(1);
581
582    private ChainingListenableFuture(
583        Function<? super I, ? extends ListenableFuture<? extends O>> function,
584        ListenableFuture<? extends I> inputFuture) {
585      this.function = checkNotNull(function);
586      this.inputFuture = checkNotNull(inputFuture);
587    }
588
589    /**
590     * Delegate the get() to the input and output futures, in case
591     * their implementations defer starting computation until their
592     * own get() is invoked.
593     */
594    @Override
595    public O get() throws InterruptedException, ExecutionException {
596      if (!isDone()) {
597        // Invoking get on the inputFuture will ensure our own run()
598        // method below is invoked as a listener when inputFuture sets
599        // its value.  Therefore when get() returns we should then see
600        // the outputFuture be created.
601        ListenableFuture<? extends I> inputFuture = this.inputFuture;
602        if (inputFuture != null) {
603          inputFuture.get();
604        }
605
606        // If our listener was scheduled to run on an executor we may
607        // need to wait for our listener to finish running before the
608        // outputFuture has been constructed by the function.
609        outputCreated.await();
610
611        // Like above with the inputFuture, we have a listener on
612        // the outputFuture that will set our own value when its
613        // value is set.  Invoking get will ensure the output can
614        // complete and invoke our listener, so that we can later
615        // get the result.
616        ListenableFuture<? extends O> outputFuture = this.outputFuture;
617        if (outputFuture != null) {
618          outputFuture.get();
619        }
620      }
621      return super.get();
622    }
623
624    /**
625     * Delegate the get() to the input and output futures, in case
626     * their implementations defer starting computation until their
627     * own get() is invoked.
628     */
629    @Override
630    public O get(long timeout, TimeUnit unit) throws TimeoutException,
631        ExecutionException, InterruptedException {
632      if (!isDone()) {
633        // Use a single time unit so we can decrease remaining timeout
634        // as we wait for various phases to complete.
635        if (unit != NANOSECONDS) {
636          timeout = NANOSECONDS.convert(timeout, unit);
637          unit = NANOSECONDS;
638        }
639
640        // Invoking get on the inputFuture will ensure our own run()
641        // method below is invoked as a listener when inputFuture sets
642        // its value.  Therefore when get() returns we should then see
643        // the outputFuture be created.
644        ListenableFuture<? extends I> inputFuture = this.inputFuture;
645        if (inputFuture != null) {
646          long start = System.nanoTime();
647          inputFuture.get(timeout, unit);
648          timeout -= Math.max(0, System.nanoTime() - start);
649        }
650
651        // If our listener was scheduled to run on an executor we may
652        // need to wait for our listener to finish running before the
653        // outputFuture has been constructed by the function.
654        long start = System.nanoTime();
655        if (!outputCreated.await(timeout, unit)) {
656          throw new TimeoutException();
657        }
658        timeout -= Math.max(0, System.nanoTime() - start);
659
660        // Like above with the inputFuture, we have a listener on
661        // the outputFuture that will set our own value when its
662        // value is set.  Invoking get will ensure the output can
663        // complete and invoke our listener, so that we can later
664        // get the result.
665        ListenableFuture<? extends O> outputFuture = this.outputFuture;
666        if (outputFuture != null) {
667          outputFuture.get(timeout, unit);
668        }
669      }
670      return super.get(timeout, unit);
671    }
672
673    @Override
674    public boolean cancel(boolean mayInterruptIfRunning) {
675      if (cancel()) {
676        try {
677          // This should never block since only one thread is allowed to cancel
678          // this Future.
679          mayInterruptIfRunningChannel.put(mayInterruptIfRunning);
680        } catch (InterruptedException ignored) {
681          Thread.currentThread().interrupt();
682        }
683        cancel(inputFuture, mayInterruptIfRunning);
684        cancel(outputFuture, mayInterruptIfRunning);
685        return true;
686      }
687      return false;
688    }
689
690    private void cancel(@Nullable Future<?> future,
691        boolean mayInterruptIfRunning) {
692      if (future != null) {
693        future.cancel(mayInterruptIfRunning);
694      }
695    }
696
697    @Override
698    public void run() {
699      try {
700        I sourceResult;
701        try {
702          sourceResult = makeUninterruptible(inputFuture).get();
703        } catch (CancellationException e) {
704          // Cancel this future and return.
705          cancel();
706          return;
707        } catch (ExecutionException e) {
708          // Set the cause of the exception as this future's exception
709          setException(e.getCause());
710          return;
711        }
712
713        final ListenableFuture<? extends O> outputFuture = this.outputFuture =
714            function.apply(sourceResult);
715        if (isCancelled()) {
716          // Handles the case where cancel was called while the function was
717          // being applied.
718          try {
719            // There is a gap in cancel(boolean) between calling cancel() and
720            // storing the value of mayInterruptIfRunning, so this thread needs
721            // to block, waiting for that value.
722            outputFuture.cancel(mayInterruptIfRunningChannel.take());
723          } catch (InterruptedException ignored) {
724            Thread.currentThread().interrupt();
725          }
726          this.outputFuture = null;
727          return;
728        }
729        outputFuture.addListener(new Runnable() {
730            @Override
731            public void run() {
732              try {
733                // Here it would have been nice to have had an
734                // UninterruptibleListenableFuture, but we don't want to start a
735                // combinatorial explosion of interfaces, so we have to make do.
736                set(makeUninterruptible(outputFuture).get());
737              } catch (CancellationException e) {
738                // Cancel this future and return.
739                cancel();
740                return;
741              } catch (ExecutionException e) {
742                // Set the cause of the exception as this future's exception
743                setException(e.getCause());
744              } finally {
745                // Don't pin inputs beyond completion
746                ChainingListenableFuture.this.outputFuture = null;
747              }
748            }
749          }, MoreExecutors.sameThreadExecutor());
750      } catch (UndeclaredThrowableException e) {
751        // Set the cause of the exception as this future's exception
752        setException(e.getCause());
753      } catch (RuntimeException e) {
754        // This exception is irrelevant in this thread, but useful for the
755        // client
756        setException(e);
757      } catch (Error e) {
758        // Propagate errors up ASAP - our superclass will rethrow the error
759        setException(e);
760      } finally {
761        // Don't pin inputs beyond completion
762        function = null;
763        inputFuture = null;
764        // Allow our get routines to examine outputFuture now.
765        outputCreated.countDown();
766      }
767    }
768  }
769
770  /**
771   * A checked future that uses a function to map from exceptions to the
772   * appropriate checked type.
773   */
774  private static class MappingCheckedFuture<V, X extends Exception> extends
775      AbstractCheckedFuture<V, X> {
776
777    final Function<Exception, X> mapper;
778
779    MappingCheckedFuture(ListenableFuture<V> delegate,
780        Function<Exception, X> mapper) {
781      super(delegate);
782
783      this.mapper = checkNotNull(mapper);
784    }
785
786    @Override
787    protected X mapException(Exception e) {
788      return mapper.apply(e);
789    }
790  }
791
792  /**
793   * An adapter to turn a {@link Future} into a {@link ListenableFuture}.  This
794   * will wait on the future to finish, and when it completes, run the
795   * listeners.  This implementation will wait on the source future
796   * indefinitely, so if the source future never completes, the adapter will
797   * never complete either.
798   *
799   * <p>If the delegate future is interrupted or throws an unexpected unchecked
800   * exception, the listeners will not be invoked.
801   */
802  private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
803      implements ListenableFuture<V> {
804
805    private static final ThreadFactory threadFactory =
806        new ThreadFactoryBuilder()
807            .setNameFormat("ListenableFutureAdapter-thread-%d")
808            .build();
809    private static final Executor defaultAdapterExecutor =
810        Executors.newCachedThreadPool(threadFactory);
811
812    private final Executor adapterExecutor;
813
814    // The execution list to hold our listeners.
815    private final ExecutionList executionList = new ExecutionList();
816
817    // This allows us to only start up a thread waiting on the delegate future
818    // when the first listener is added.
819    private final AtomicBoolean hasListeners = new AtomicBoolean(false);
820
821    // The delegate future.
822    private final Future<V> delegate;
823
824    ListenableFutureAdapter(Future<V> delegate) {
825      this(delegate, defaultAdapterExecutor);
826    }
827
828    ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
829      this.delegate = checkNotNull(delegate);
830      this.adapterExecutor = checkNotNull(adapterExecutor);
831    }
832
833    @Override
834    protected Future<V> delegate() {
835      return delegate;
836    }
837
838    @Override
839    public void addListener(Runnable listener, Executor exec) {
840      executionList.add(listener, exec);
841
842      // When a listener is first added, we run a task that will wait for
843      // the delegate to finish, and when it is done will run the listeners.
844      if (hasListeners.compareAndSet(false, true)) {
845        if (delegate.isDone()) {
846          // If the delegate is already done, run the execution list
847          // immediately on the current thread.
848          executionList.run();
849          return;
850        }
851
852        adapterExecutor.execute(new Runnable() {
853          @Override
854          public void run() {
855            try {
856              delegate.get();
857            } catch (Error e) {
858              throw e;
859            } catch (InterruptedException e) {
860              // This thread was interrupted.  This should never happen, so we
861              // throw an IllegalStateException.
862              Thread.currentThread().interrupt();
863              throw new IllegalStateException("Adapter thread interrupted!", e);
864            } catch (Throwable e) {
865              // ExecutionException / CancellationException / RuntimeException
866              // The task is done, run the listeners.
867            }
868            executionList.run();
869          }
870        });
871      }
872    }
873  }
874}