当前位置:首页 > 开发 > 开源软件 > 正文

【Spark101】Scala Promise/Future在Spark中的应用

发表于: 2015-06-06   作者:bit1129   来源:转载   浏览:
摘要: Promise和Future是Scala用于异步调用并实现结果汇集的并发原语,Scala的Future同JUC里面的Future接口含义相同,Promise理解起来就有些绕。等有时间了再仔细的研究下Promise和Future的语义以及应用场景,具体参见Scala在线文档:http://docs.scala-lang.org/sips/completed/futures-promises.html

Promise和Future是Scala用于异步调用并实现结果汇集的并发原语,Scala的Future同JUC里面的Future接口含义相同,Promise理解起来就有些绕。等有时间了再仔细的研究下Promise和Future的语义以及应用场景,具体参见Scala在线文档:http://docs.scala-lang.org/sips/completed/futures-promises.html

 

如下代码来自于BlockTransferService的fetchBlockSync方法,因为只是拉取一个Block的数据,Spark在此处定义为同步获取,而不是异步获取。异步获取的实现是BlockTransferService的fetchBlocks方法,它可以批量获取多个Blocks,返回结果放于回调函数的ManageBuffer中了。

 

如下代码,首先定义了Promis类型的result变量,Promise将放入ManagedBuffer类型的数据,一旦放入,那么Promise.future将从等待结果的状态中返回。因此,Promise的语义可以理解为Promise会在某个时间点放入一个数据,而Promise.future的语义是等待这个值的放入,放入完成后future从阻塞等待的状态立即返回。

 

Promise数据的放入是通过Promise.success和Promise.failure操作实现的,分别表示放入了异步操作得到正确的结果和异步操作失败而放入失败的结果。

 

 

 

 

  /**
   * A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
   *
   * It is also only available after [[init]] is invoked.
   */
  def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
    // A monitor for the thread to wait on.
    //创建Promise对象result,result.future将等待Promise对象写入数据
    val result = Promise[ManagedBuffer]()
    //通过fetchBlocks发起异步获取Block的请求,请求返回后根据调用结果调用BlockFetchingListener的onBlockFetchFailure或者onBlockFetchSuccess方法,在两个方法中
    ///为Promise变量写入请求返回的数据值,此后,result.future将从等待状态返回
    fetchBlocks(host, port, execId, Array(blockId),
      new BlockFetchingListener {
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          result.failure(exception)
        }
        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          val ret = ByteBuffer.allocate(data.size.toInt)
          ret.put(data.nioByteBuffer())
          ret.flip()
          result.success(new NioManagedBuffer(ret))
        }
      })
    //Await.result是Scala并发框架提供的同步等待原语,它等待的事件是它的第一个参数,此处是result.future
    Await.result(result.future, Duration.Inf)
  }

 

从下面Await的result方法可以看到,result.future是一个Awaitable类型的实例,即Future实现了Awaitable,

    @throws(classOf[Exception])
    def result[T](awaitable: Awaitable[T], atMost: Duration): T =
      blocking(awaitable.result(atMost)(AwaitPermission))
  }

 

Awaitable接口的注释:

/**
 * An object that may eventually be completed with a result value of type `T` which may be
 * awaited using blocking methods.
 * 
 * The [[Await]] object provides methods that allow accessing the result of an `Awaitable`
 * by blocking the current thread until the `Awaitable` has been completed or a timeout has
 * occurred.
 */
trait Awaitable[+T] {

 

Future接口

 

/** The trait that represents futures.
 *
 *  Asynchronous computations that yield futures are created with the `future` call:
 *
 *  {{{
 *  val s = "Hello"
 *  val f: Future[String] = future {
 *    s + " future!"
 *  }
 *  f onSuccess {
 *    case msg => println(msg)
 *  }
 *  }}}
 *
 *  @author  Philipp Haller, Heather Miller, Aleksandar Prokopec, Viktor Klang
 *
 *  @define multipleCallbacks
 *  Multiple callbacks may be registered; there is no guarantee that they will be
 *  executed in a particular order.
 *
 *  @define caughtThrowables
 *  The future may contain a throwable object and this means that the future failed.
 *  Futures obtained through combinators have the same exception as the future they were obtained from.
 *  The following throwable objects are not contained in the future:
 *  - `Error` - errors are not contained within futures
 *  - `InterruptedException` - not contained within futures
 *  - all `scala.util.control.ControlThrowable` except `NonLocalReturnControl` - not contained within futures
 *
 *  Instead, the future is completed with a ExecutionException with one of the exceptions above
 *  as the cause.
 *  If a future is failed with a `scala.runtime.NonLocalReturnControl`,
 *  it is completed with a value from that throwable instead.
 *
 *  @define nonDeterministic
 *  Note: using this method yields nondeterministic dataflow programs.
 *
 *  @define forComprehensionExamples
 *  Example:
 *
 *  {{{
 *  val f = future { 5 }
 *  val g = future { 3 }
 *  val h = for {
 *    x: Int <- f // returns Future(5)
 *    y: Int <- g // returns Future(5)
 *  } yield x + y
 *  }}}
 *
 *  is translated to:
 *
 *  {{{
 *  f flatMap { (x: Int) => g map { (y: Int) => x + y } }
 *  }}}
 *
 * @define callbackInContext
 * The provided callback always runs in the provided implicit
 *`ExecutionContext`, though there is no guarantee that the
 * `execute()` method on the `ExecutionContext` will be called once
 * per callback or that `execute()` will be called in the current
 * thread. That is, the implementation may run multiple callbacks
 * in a batch within a single `execute()` and it may run
 * `execute()` either immediately or asynchronously.
 */
trait Future[+T] extends Awaitable[T] {

 

 

关于Promise的用法,http://stackoverflow.com/questions/13381134/what-are-the-use-cases-of-scala-concurrent-promise解释了基本含义:

The Promise and Future are complementary concepts. The Future is a value which will be retrieved, well, sometime in the future and you can do stuff with it when that event happens. It is, therefore, the read or out endpoint of a computation - it is something that you retrieve a value from.

A Promise is, by analogy(与此类似), the writing side of the computation. You create a promise which is the place where you'll put the result of the computation and from that promise you get a future that will be used to read the result that was put into the promise. When you'll complete a Promise, either by failure or success, you will trigger all the behavior which was attached to the associated Future.

Regarding your first question, how can it be that for a promise p we have p.future == p. You can imagine this like a single-item buffer - a container which is initially empty and you can afterwords store one value which will become its content forever. Now, depending on your point of view this is both a Promise and a Future. It is promise for someone who intends to write the value in the buffer. It is a future for someone who waits for that value to be put in the buffer.

 

Regarding the real-world use: Most of the time you won't deal with promises directly. If you'll use a library which performs asynchronous computation then you'll just work with the futures returned by the library's methods. Promises are, in this case, created by the library - you're just working with the reading end of what those methods do.

But if you need to implement your own asynchronous API you'll have to start working with them. Suppose you need to implement an async HTTP client on top of, lets say, Netty. Then your code will look somewhat like this

 

    def makeHTTPCall(request: Request): Future[Response] = {
        val p = Promise[Response]
        registerOnCompleteCallback(buffer => {
            val response = makeResponse(buffer)
            p success response
        })
        p.future
    }

 

 

【Spark101】Scala Promise/Future在Spark中的应用

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
我们先看一下变量界定的代码 package com.dt.scala.type_parameterization class Pair_NoPerfect[T
本文中,我们将首先讨论如何在本地机器上利用Spark进行简单分析。然后,将在入门级水平探索Spark,
Spark应用概念 Spark应用(Application)是用户提交的应用程序。执行模式又Local、Standalone、YARN
1,首先介绍官网网站 http://scala-lang.org/ 下载windows 的exe直接进行安装就行。 安装后有个本地
为了在IDEA中编写scala,今天安装配置学习了IDEA集成开发环境。IDEA确实很优秀,学会之后,用起来很
工作以及兴趣所致,开始了spark学习之旅,浏览网上大牛们的博客 文章,并且结合官网docs,刚开始云
计划: 阶段1: 精通Spark内核 阶段2: 精通千万级的项目 阶段3: 机器学习 JAVA本身不是伟大的语言
http://www.cnblogs.com/byrhuangqiang/p/4017725.html 为了在IDEA中编写scala,今天安装配置学习了
1,首先介绍官网网站 http://scala-lang.org/ 下载windows 的exe直接进行安装就行。 安装后有个本地
今天做一个项目,展现一个页面需要从服务器请求两次数据,太繁琐了,今天用promise规范了一下,只需
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号