001/*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkNotNull;
020
021import com.google.common.annotations.Beta;
022
023import java.util.concurrent.CancellationException;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.Future;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.TimeoutException;
028import java.util.concurrent.locks.AbstractQueuedSynchronizer;
029
030import javax.annotation.Nullable;
031
032/**
033 * <p>An abstract implementation of the {@link Future} interface.  This class
034 * is an abstraction of {@link java.util.concurrent.FutureTask} to support use
035 * for tasks other than {@link Runnable}s.  It uses an
036 * {@link AbstractQueuedSynchronizer} to deal with concurrency issues and
037 * guarantee thread safety.  It could be used as a base class to
038 * {@code FutureTask}, or any other implementor of the {@code Future} interface.
039 *
040 * <p>This class implements all methods in {@code Future}.  Subclasses should
041 * provide a way to set the result of the computation through the protected
042 * methods {@link #set(Object)}, {@link #setException(Throwable)}, or
043 * {@link #cancel()}.  If subclasses want to implement cancellation they can
044 * override the {@link #cancel(boolean)} method with a real implementation, the
045 * default implementation doesn't support cancellation.
046 *
047 * <p>The state changing methods all return a boolean indicating success or
048 * failure in changing the future's state.  Valid states are running,
049 * completed, failed, or cancelled.  Because this class does not implement
050 * cancellation it is left to the subclass to distinguish between created
051 * and running tasks.
052 *
053 * @author Sven Mawson
054 * @since 1
055 */
056@Beta
057public abstract class AbstractFuture<V> implements Future<V> {
058
059  /** Synchronization control for AbstractFutures. */
060  private final Sync<V> sync = new Sync<V>();
061
062  /*
063   * Blocks until either the task completes or the timeout expires.  Uses the
064   * sync blocking-with-timeout support provided by AQS.
065   */
066  @Override
067  public V get(long timeout, TimeUnit unit) throws InterruptedException,
068      TimeoutException, ExecutionException {
069    return sync.get(unit.toNanos(timeout));
070  }
071
072  /*
073   * Blocks until the task completes or we get interrupted. Uses the
074   * interruptible blocking support provided by AQS.
075   */
076  @Override
077  public V get() throws InterruptedException, ExecutionException {
078    return sync.get();
079  }
080
081  /*
082   * Checks if the sync is not in the running state.
083   */
084  @Override
085  public boolean isDone() {
086    return sync.isDone();
087  }
088
089  /*
090   * Checks if the sync is in the cancelled state.
091   */
092  @Override
093  public boolean isCancelled() {
094    return sync.isCancelled();
095  }
096
097  /*
098   * Default implementation of cancel that never cancels the future.
099   * Subclasses should override this to implement cancellation if desired.
100   */
101  @Override
102  public boolean cancel(boolean mayInterruptIfRunning) {
103    return false;
104  }
105
106  /**
107   * Subclasses should invoke this method to set the result of the computation
108   * to {@code value}.  This will set the state of the future to
109   * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
110   * state was successfully changed.
111   *
112   * @param value the value that was the result of the task.
113   * @return true if the state was successfully changed.
114   */
115  protected boolean set(@Nullable V value) {
116    boolean result = sync.set(value);
117    if (result) {
118      done();
119    }
120    return result;
121  }
122
123  /**
124   * Subclasses should invoke this method to set the result of the computation
125   * to an error, {@code throwable}.  This will set the state of the future to
126   * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
127   * state was successfully changed.
128   *
129   * @param throwable the exception that the task failed with.
130   * @return true if the state was successfully changed.
131   * @throws Error if the throwable was an {@link Error}.
132   */
133  protected boolean setException(Throwable throwable) {
134    boolean result = sync.setException(checkNotNull(throwable));
135    if (result) {
136      done();
137    }
138
139    // If it's an Error, we want to make sure it reaches the top of the
140    // call stack, so we rethrow it.
141    if (throwable instanceof Error) {
142      throw (Error) throwable;
143    }
144    return result;
145  }
146
147  /**
148   * Subclasses should invoke this method to mark the future as cancelled.
149   * This will set the state of the future to {@link
150   * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was
151   * successfully changed.
152   *
153   * @return true if the state was successfully changed.
154   */
155  protected final boolean cancel() {
156    boolean result = sync.cancel();
157    if (result) {
158      done();
159    }
160    return result;
161  }
162
163  /*
164   * Called by the success, failed, or cancelled methods to indicate that the
165   * value is now available and the latch can be released.  Subclasses can
166   * use this method to deal with any actions that should be undertaken when
167   * the task has completed.
168   */
169  protected void done() {
170    // Default implementation does nothing.
171  }
172
173  /**
174   * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
175   * private subclass to hold the synchronizer.  This synchronizer is used to
176   * implement the blocking and waiting calls as well as to handle state changes
177   * in a thread-safe manner.  The current state of the future is held in the
178   * Sync state, and the lock is released whenever the state changes to either
179   * {@link #COMPLETED} or {@link #CANCELLED}.
180   *
181   * <p>To avoid races between threads doing release and acquire, we transition
182   * to the final state in two steps.  One thread will successfully CAS from
183   * RUNNING to COMPLETING, that thread will then set the result of the
184   * computation, and only then transition to COMPLETED or CANCELLED.
185   *
186   * <p>We don't use the integer argument passed between acquire methods so we
187   * pass around a -1 everywhere.
188   */
189  static final class Sync<V> extends AbstractQueuedSynchronizer {
190
191    private static final long serialVersionUID = 0L;
192
193    /* Valid states. */
194    static final int RUNNING = 0;
195    static final int COMPLETING = 1;
196    static final int COMPLETED = 2;
197    static final int CANCELLED = 4;
198
199    private V value;
200    private Throwable exception;
201
202    /*
203     * Acquisition succeeds if the future is done, otherwise it fails.
204     */
205    @Override
206    protected int tryAcquireShared(int ignored) {
207      if (isDone()) {
208        return 1;
209      }
210      return -1;
211    }
212
213    /*
214     * We always allow a release to go through, this means the state has been
215     * successfully changed and the result is available.
216     */
217    @Override
218    protected boolean tryReleaseShared(int finalState) {
219      setState(finalState);
220      return true;
221    }
222
223    /**
224     * Blocks until the task is complete or the timeout expires.  Throws a
225     * {@link TimeoutException} if the timer expires, otherwise behaves like
226     * {@link #get()}.
227     */
228    V get(long nanos) throws TimeoutException, CancellationException,
229        ExecutionException, InterruptedException {
230
231      // Attempt to acquire the shared lock with a timeout.
232      if (!tryAcquireSharedNanos(-1, nanos)) {
233        throw new TimeoutException("Timeout waiting for task.");
234      }
235
236      return getValue();
237    }
238
239    /**
240     * Blocks until {@link #complete(Object, Throwable, int)} has been
241     * successfully called.  Throws a {@link CancellationException} if the task
242     * was cancelled, or a {@link ExecutionException} if the task completed with
243     * an error.
244     */
245    V get() throws CancellationException, ExecutionException,
246        InterruptedException {
247
248      // Acquire the shared lock allowing interruption.
249      acquireSharedInterruptibly(-1);
250      return getValue();
251    }
252
253    /**
254     * Implementation of the actual value retrieval.  Will return the value
255     * on success, an exception on failure, a cancellation on cancellation, or
256     * an illegal state if the synchronizer is in an invalid state.
257     */
258    private V getValue() throws CancellationException, ExecutionException {
259      int state = getState();
260      switch (state) {
261        case COMPLETED:
262          if (exception != null) {
263            throw new ExecutionException(exception);
264          } else {
265            return value;
266          }
267
268        case CANCELLED:
269          throw new CancellationException("Task was cancelled.");
270
271        default:
272          throw new IllegalStateException(
273              "Error, synchronizer in invalid state: " + state);
274      }
275    }
276
277    /**
278     * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
279     */
280    boolean isDone() {
281      return (getState() & (COMPLETED | CANCELLED)) != 0;
282    }
283
284    /**
285     * Checks if the state is {@link #CANCELLED}.
286     */
287    boolean isCancelled() {
288      return getState() == CANCELLED;
289    }
290
291    /**
292     * Transition to the COMPLETED state and set the value.
293     */
294    boolean set(@Nullable V v) {
295      return complete(v, null, COMPLETED);
296    }
297
298    /**
299     * Transition to the COMPLETED state and set the exception.
300     */
301    boolean setException(Throwable t) {
302      return complete(null, t, COMPLETED);
303    }
304
305    /**
306     * Transition to the CANCELLED state.
307     */
308    boolean cancel() {
309      return complete(null, null, CANCELLED);
310    }
311
312    /**
313     * Implementation of completing a task.  Either {@code v} or {@code t} will
314     * be set but not both.  The {@code finalState} is the state to change to
315     * from {@link #RUNNING}.  If the state is not in the RUNNING state we
316     * return {@code false}.
317     *
318     * @param v the value to set as the result of the computation.
319     * @param t the exception to set as the result of the computation.
320     * @param finalState the state to transition to.
321     */
322    private boolean complete(@Nullable V v, Throwable t, int finalState) {
323      if (compareAndSetState(RUNNING, COMPLETING)) {
324        this.value = v;
325        this.exception = t;
326        releaseShared(finalState);
327        return true;
328      }
329
330      // The state was not RUNNING, so there are no valid transitions.
331      return false;
332    }
333  }
334}