当前位置:首页 > 开发 > 开源软件 > 正文

【Spark八十五】Spark Streaming分析结果落地到MySQL

发表于: 2015-04-02   作者:bit1129   来源:转载   浏览:
摘要: 几点总结: 1. DStream.foreachRDD是一个Output Operation,类似于RDD的action,会触发Job的提交。DStream.foreachRDD是数据落地很常用的方法 2. 获取MySQL Connection的操作应该放在foreachRDD的参数(是一个RDD[T]=>Unit的函数类型),这样,当foreachRDD方法在每个Worker上执行时,

几点总结:

1. DStream.foreachRDD是一个Output Operation,类似于RDD的action,会触发Job的提交。DStream.foreachRDD是数据落地很常用的方法

2. 获取MySQL Connection的操作应该放在foreachRDD的参数(是一个RDD[T]=>Unit的函数类型),这样,当foreachRDD方法在每个Worker上执行时,连接是在Worker上创建。如果Connection的获取放到dstream.foreachRDD之前,那么

Connection的获取动作将发生在Driver端,然后通过序列化的方式发送到各个Worker(Connection的序列化通常是无法正确序列化的)

3. Connection的获取在foreachRDD的参数中获取,同时还要在遍历RDD之前获取(调用RDD的foreach方法前获取),如果遍历中获取,那么RDD中的每个record都要打开关闭连接,这对于数据库连接资源将是极大的考验

4. 业务逻辑处理定义在func中,它是在foreachRDD的方法参数体中定义的,如果把func的定义放到外面,即Driver中,貌似也是可以的,Spark会对计算方法通过Broadcast进行广播到各个计算节点。

 

 

package spark.examples.streaming

import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._


//No need to call Class.forName("com.mysql.jdbc.Driver") to register Driver?

object SparkStreamingForPartition {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetCatWordCount")
    conf.setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    //This dstream object represents the stream of data that will be received from the data
    //server. Each record in this DStream is a line of text
    //The DStream is a collection of RDD, which makes the method foreachRDD reasonable
    val dstream = ssc.socketTextStream("192.168.26.140", 9999)
    dstream.foreachRDD(rdd => {
      //embedded function
      def func(records: Iterator[String]) {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://192.168.26.140:3306/person";
          val user = "root";
          val password = ""
          conn = DriverManager.getConnection(url, user, password)
          records.flatMap(_.split(" ")).foreach(word => {
            val sql = "insert into TBL_WORDS(word) values (?)";
            stmt = conn.prepareStatement(sql);
            stmt.setString(1, word)
            stmt.executeUpdate();
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }

      val repartitionedRDD = rdd.repartition(3)
      repartitionedRDD.foreachPartition(func)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

 

【Spark八十五】Spark Streaming分析结果落地到MySQL

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
1.全局类图 蓝色背景运行在Worker节点,绿色运行在Driver节点 2. Worker Receiver 接收消息时序(以
Spark Streaming与Storm的对比分析 一、Spark Streaming与Storm的对比 二、Spark Streaming与Storm
Spark Streaming uses a “micro-batch” architecture, where the streaming computation is treat
作者:周志湖 主要内容 Spark Streaming与Kafka版的WordCount示例(一) Spark Streaming与Kafka版
本节主要内容 本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programm
本节主要内容 本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programm
作者:周志湖 微信号:zhouzhihubeyond 本节主要内容 Window Operation 入门案例 1. Window Operati
作者:周志湖 微信号:zhouzhihubeyond 主要内容 本节内容基于官方文档:http://spark.apache.org/d
目标 1.了解需求 网上有很多这种入门的demo,主要是一个实时计算手机点击率,在redis中存入手机的un
了解Spark Streaming之前,建议先了解Spark,入门博文Spark初探 定义 Spark Streaming is an extens
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号