[Spark101] Using Scala Promise/Future in Spark












  /**
   * A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
   *
   * It is also only available after [[init]] is invoked.
   */
  def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
    // A monitor for the thread to wait on.
    val result = Promise[ManagedBuffer]()
    fetchBlocks(host, port, execId, Array(blockId),
      new BlockFetchingListener {
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          // Fail the promise so the waiting thread sees the exception.
          result.failure(exception)
        }
        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          // Copy the fetched bytes into a fresh buffer before completing the promise.
          val ret = ByteBuffer.allocate(data.size.toInt)
          ret.put(data.nioByteBuffer())
          ret.flip()
          result.success(new NioManagedBuffer(ret))
        }
      })

    Await.result(result.future, Duration.Inf)
  }
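
The same callback-to-blocking bridge can be sketched without any Spark dependency. In the sketch below, `FetchListener`, `fetchAsync` and `fetchSync` are hypothetical stand-ins for `BlockFetchingListener`, `fetchBlocks` and `fetchBlockSync`; they are not Spark APIs, and the finite timeout is an assumption (the Spark excerpt waits with `Duration.Inf`).

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object FetchSyncSketch {
  // Hypothetical callback interface, standing in for Spark's BlockFetchingListener.
  trait FetchListener {
    def onSuccess(data: Array[Byte]): Unit
    def onFailure(e: Throwable): Unit
  }

  // Hypothetical asynchronous fetch: works on another thread and reports via the listener.
  def fetchAsync(blockId: String, listener: FetchListener): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit =
        if (blockId.nonEmpty) listener.onSuccess(blockId.getBytes("UTF-8"))
        else listener.onFailure(new IllegalArgumentException("empty block id"))
    })
    worker.start()
  }

  // Blocking wrapper: the listener completes the promise, the caller awaits its future.
  def fetchSync(blockId: String): Array[Byte] = {
    val result = Promise[Array[Byte]]()
    fetchAsync(blockId, new FetchListener {
      def onSuccess(data: Array[Byte]): Unit = result.success(data)
      def onFailure(e: Throwable): Unit = result.failure(e)
    })
    // Assumed finite timeout for the sketch; a failed promise is rethrown here.
    Await.result(result.future, 5.seconds)
  }
}
```

The promise plays the role of the "monitor" mentioned in the comment: the callback thread writes to it exactly once, and the calling thread blocks on the matching future.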



    def result[T](awaitable: Awaitable[T], atMost: Duration): T =
      blocking(awaitable.result(atMost)(AwaitPermission))



/**
 * An object that may eventually be completed with a result value of type `T` which may be
 * awaited using blocking methods.
 *
 * The [[Await]] object provides methods that allow accessing the result of an `Awaitable`
 * by blocking the current thread until the `Awaitable` has been completed or a timeout has
 * occurred.
 */
trait Awaitable[+T] {
  def ready(atMost: Duration)(implicit permit: CanAwait): this.type
  def result(atMost: Duration)(implicit permit: CanAwait): T
}
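
As a rough illustration of the blocking behaviour described in that comment, the sketch below awaits a future that never completes and a future that has already failed. The object and method names are made up for the example; only `Await.result`, `Await.ready` and `Promise` come from the standard library.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

object AwaitSketch {
  // Awaiting a future that is never completed ends with a TimeoutException.
  def timesOut(): Boolean = {
    val never = Promise[Int]().future
    try { Await.result(never, 50.millis); false }
    catch { case _: TimeoutException => true }
  }

  // Await.ready only waits for completion; Await.result also rethrows a failure.
  def readyVsResult(): (Boolean, Boolean) = {
    val failed = Promise[Int]().failure(new RuntimeException("boom")).future
    val readyOk = Try(Await.ready(failed, 1.second)).isSuccess
    val resultOk = Try(Await.result(failed, 1.second)).isSuccess
    (readyOk, resultOk)
  }
}
```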




/** The trait that represents futures.
 *  Asynchronous computations that yield futures are created with the `future` call:
 *  {{{
 *  val s = "Hello"
 *  val f: Future[String] = future {
 *    s + " future!"
 *  }
 *  f onSuccess {
 *    case msg => println(msg)
 *  }
 *  }}}
 *  @author  Philipp Haller, Heather Miller, Aleksandar Prokopec, Viktor Klang
 *  @define multipleCallbacks
 *  Multiple callbacks may be registered; there is no guarantee that they will be
 *  executed in a particular order.
 *  @define caughtThrowables
 *  The future may contain a throwable object and this means that the future failed.
 *  Futures obtained through combinators have the same exception as the future they were obtained from.
 *  The following throwable objects are not contained in the future:
 *  - `Error` - errors are not contained within futures
 *  - `InterruptedException` - not contained within futures
 *  - all `scala.util.control.ControlThrowable` except `NonLocalReturnControl` - not contained within futures
 *  Instead, the future is completed with an ExecutionException with one of the exceptions above
 *  as the cause.
 *  If a future is failed with a `scala.runtime.NonLocalReturnControl`,
 *  it is completed with a value from that throwable instead.
 *  @define nonDeterministic
 *  Note: using this method yields nondeterministic dataflow programs.
 *  @define forComprehensionExamples
 *  Example:
 *  {{{
 *  val f = future { 5 }
 *  val g = future { 3 }
 *  val h = for {
 *    x: Int <- f // returns Future(5)
 *    y: Int <- g // returns Future(3)
 *  } yield x + y
 *  }}}
 *  is translated to:
 *  {{{
 *  f flatMap { (x: Int) => g map { (y: Int) => x + y } }
 *  }}}
 * @define callbackInContext
 * The provided callback always runs in the provided implicit
 * `ExecutionContext`, though there is no guarantee that the
 * `execute()` method on the `ExecutionContext` will be called once
 * per callback or that `execute()` will be called in the current
 * thread. That is, the implementation may run multiple callbacks
 * in a batch within a single `execute()` and it may run
 * `execute()` either immediately or asynchronously.
 */
trait Future[+T] extends Awaitable[T] {
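
The `future { ... }` and `onSuccess` calls in the quoted Scaladoc belong to older Scala releases; newer versions use `Future { ... }` (and `foreach` or `onComplete`) instead. A minimal sketch of the for-comprehension example with the newer spelling, assuming the global `ExecutionContext` and a made-up wrapper object, might look like this:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForComprehensionSketch {
  // Returns the sums computed by the for-comprehension and by its desugared form.
  def run(): (Int, Int) = {
    val f = Future { 5 }
    val g = Future { 3 }

    // For-comprehension form, as in the Scaladoc example.
    val h = for {
      x <- f
      y <- g
    } yield x + y

    // Equivalent form written directly with flatMap and map.
    val h2 = f.flatMap(x => g.map(y => x + y))

    (Await.result(h, 5.seconds), Await.result(h2, 5.seconds))
  }
}
```

Because `f` and `g` are created before the comprehension, both computations are already running when they are combined.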




Promise and Future are complementary concepts. A Future is a value that will become available, well, sometime in the future, and you can do things with it once it arrives. It is therefore the read (or out) endpoint of a computation: something you retrieve a value from.

A Promise is, by analogy, the writing side of the computation. You create a promise, which is the place where you'll put the result of the computation, and from that promise you get a future that will be used to read the result that was put into the promise. When you complete a Promise, either by failure or success, you trigger all the behavior that was attached to the associated Future.
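
A minimal sketch of the two sides, assuming the global `ExecutionContext` (the object and method names are only for illustration):

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PromiseFutureSketch {
  // Success path: returns (was the future completed before the write?, mapped result).
  def successDemo(): (Boolean, String) = {
    val p = Promise[Int]()                 // write side
    val f = p.future                       // read side
    val described = f.map(n => s"got $n")  // behavior attached before any value exists
    val completedBefore = f.isCompleted
    p.success(42)                          // completing the promise triggers the map above
    (completedBefore, Await.result(described, 5.seconds))
  }

  // Failure path: completing with an exception triggers the recovery attached to the future.
  def failureDemo(): Int = {
    val p = Promise[Int]()
    val recovered = p.future.recover { case _: IllegalStateException => -1 }
    p.failure(new IllegalStateException("boom"))
    Await.result(recovered, 5.seconds)
  }
}
```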

Regarding your first question: how can it be that for a promise p we have p.future == p? You can imagine this as a single-item buffer: a container that is initially empty and in which you can afterwards store one value, which becomes its content forever. Depending on your point of view, this is both a Promise and a Future. It is a promise for someone who intends to write the value into the buffer. It is a future for someone who waits for that value to be put in the buffer.
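
The single-item buffer behavior can be checked with a small sketch. The names are illustrative, and whether `p.future` is literally the same object as `p` is a detail of the default implementation rather than something the API promises, so `sameObject` is shown only as an observation.

```scala
import scala.concurrent.Promise
import scala.util.Try

object SingleAssignmentSketch {
  // Returns (first write accepted?, second write accepted?, value seen by the future).
  def demo(): (Boolean, Boolean, Option[Try[String]]) = {
    val p = Promise[String]()
    val first = p.trySuccess("first")    // the buffer is empty, so the write is accepted
    val second = p.trySuccess("second")  // the buffer is already full, so this is rejected
    (first, second, p.future.value)
  }

  // Implementation detail: with the default promise, the future is the promise itself.
  def sameObject(): Boolean = {
    val p = Promise[String]()
    p.future eq p
  }
}
```

Calling `p.success` a second time, instead of `trySuccess`, throws an `IllegalStateException`.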


Regarding real-world use: most of the time you won't deal with promises directly. If you use a library that performs asynchronous computation, you'll just work with the futures returned by the library's methods. The promises are, in this case, created by the library; you're just working with the reading end of what those methods do.

But if you need to implement your own asynchronous API, you'll have to start working with them. Suppose you need to implement an async HTTP client on top of, let's say, Netty. Then your code will look somewhat like this:


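    def makeHTTPCall(request: Request): Future[Response] = {
        val p = Promise[Response]()
        registerOnCompleteCallback(buffer => {
            // Runs when the transport has finished reading the reply.
            val response = makeResponse(buffer)
            p success response
        })
        // Hand the read side back to the caller right away.
        p.future
    }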
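
A self-contained variant of this idea is sketched below. `Request`, `Response`, `registerOnCompleteCallback` and `makeResponse` are hypothetical stand-ins (a thread replaces the real transport, and nothing here is Netty's API). The sketch also assumes that `makeResponse` may throw, so it completes the promise with a `Try` instead of calling `success` directly.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.Try

object HttpCallSketch {
  // Hypothetical request and response types.
  final case class Request(path: String)
  final case class Response(body: String)

  // Hypothetical transport: invokes the callback from another thread with the reply bytes.
  def registerOnCompleteCallback(request: Request)(callback: Array[Byte] => Unit): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit = callback(s"echo ${request.path}".getBytes("UTF-8"))
    })
    worker.start()
  }

  // Hypothetical decoder that may throw on bad input.
  def makeResponse(buffer: Array[Byte]): Response = {
    require(buffer.nonEmpty, "empty response buffer")
    Response(new String(buffer, "UTF-8"))
  }

  def makeHTTPCall(request: Request): Future[Response] = {
    val p = Promise[Response]()
    registerOnCompleteCallback(request) { buffer =>
      // Try captures an exception from makeResponse, so the future fails
      // instead of staying incomplete forever.
      p.complete(Try(makeResponse(buffer)))
    }
    p.future
  }
}
```

The caller only ever sees the `Future[Response]`; the promise stays private to the method, which matches the "library creates the promise" split described above.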