
[Spark 76] Saving Spark Computation Results to MySQL

Posted: 2015-02-21   Author: bit1129   Source: reposted
package spark.examples.db

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.{SparkConf, SparkContext}

object SparkMySQLIntegration {

  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkMySQLIntegration").setMaster("local")
    val sc = new SparkContext(conf)
    val data = sc.parallelize(List(("Tom", 31), ("Jack", 22), ("Mary", 25)))

    def func(iter: Iterator[(String, Int)]): Unit = {
      // Class.forName("com.mysql.jdbc.Driver") is not needed: JDBC 4 drivers such as
      // MySQL Connector/J 5.1+ register themselves via java.util.ServiceLoader
      var conn: Connection = null
      var pstmt: PreparedStatement = null
      try {
        val url = "jdbc:mysql://localhost:3306/person"
        val user = "root"
        val password = ""
        // Open the connection inside the function passed to foreachPartition,
        // so that it is opened on the worker rather than on the driver
        conn = DriverManager.getConnection(url, user, password)
        // Prepare the statement once per partition and reuse it for every row
        pstmt = conn.prepareStatement("insert into TBL_PERSON(name, age) values (?, ?)")
        while (iter.hasNext) {
          val item = iter.next()
          println(item._1 + "," + item._2)
          pstmt.setString(1, item._1)
          pstmt.setInt(2, item._2)
          // No-argument executeUpdate(): executeUpdate(sql) would ignore the bound parameters
          pstmt.executeUpdate()
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (pstmt != null) {
          pstmt.close()
        }
        if (conn != null) {
          conn.close()
        }
      }
    }

    data.foreachPartition(func)
    sc.stop()
  }

}

I ran into two pitfalls with this code:

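1. Following the usual Java JDBC habit, I first registered the MySQL JDBC driver with Class.forName("com.mysql.jdbc.Driver "), and it failed with ClassNotFoundException even though com.mysql.jdbc.Driver was on the classpath. The culprit is not Scala but the trailing space inside the string: Class.forName looks up the name exactly as written. The call is unnecessary anyway, in Scala and Java alike: JDBC 4 drivers (MySQL Connector/J 5.1 and later) register themselves through java.util.ServiceLoader when they are on the classpath.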

That is why the call is commented out in the code.
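The ClassNotFoundException from point 1 can be reproduced without MySQL at all. The sketch below uses a hypothetical ForNameDemo object and java.lang.String as a stand-in for the driver class; it shows that Class.forName treats a trailing space as part of the class name:

```scala
// Hypothetical demo object; java.lang.String stands in for com.mysql.jdbc.Driver
object ForNameDemo {
  // True if Class.forName resolves the name exactly as written (no trimming is applied)
  def canLoad(className: String): Boolean =
    try {
      Class.forName(className)
      true
    } catch {
      case _: ClassNotFoundException => false
    }

  def main(args: Array[String]): Unit = {
    println(canLoad("java.lang.String"))  // true
    println(canLoad("java.lang.String ")) // false: the trailing space is part of the name
  }
}
```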

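2. When running the code locally, it kept reporting a syntax error near (?, ?) in the SQL statement. I reread the statement again and again without finding anything wrong; eventually it turned out that I had written pstmt.executeUpdate(sql); instead of pstmt.executeUpdate();. This is not a compile error, which is why IntelliJ IDEA did not flag it: PreparedStatement inherits executeUpdate(String) from java.sql.Statement, so the call type-checks. At runtime the SQL text was sent as-is, with the literal ? placeholders instead of the values bound by setString/setInt, and MySQL rejected it as a syntax error. The JDBC documentation notes that executeUpdate(String) must not be called on a PreparedStatement.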
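Why the compiler accepts pstmt.executeUpdate(sql): the single-argument overload is visible on PreparedStatement through its parent interface java.sql.Statement. A small JDK-only reflective check makes this visible (the object name ExecuteUpdateOverloads is hypothetical, chosen for illustration):

```scala
import java.sql.PreparedStatement

// Hypothetical helper: checks which executeUpdate overloads PreparedStatement exposes
object ExecuteUpdateOverloads {
  // Class.getMethod also searches superinterfaces, so inherited overloads are found too
  private def visible(paramTypes: Class[_]*): Boolean =
    try {
      classOf[PreparedStatement].getMethod("executeUpdate", paramTypes: _*)
      true
    } catch {
      case _: NoSuchMethodException => false
    }

  // executeUpdate(String): inherited from Statement, compiles but is wrong for prepared statements
  def hasStringOverload: Boolean = visible(classOf[String])

  // executeUpdate(): the overload meant for PreparedStatement with bound parameters
  def hasNoArgOverload: Boolean = visible()

  def main(args: Array[String]): Unit = {
    println(s"executeUpdate(String) visible on PreparedStatement: $hasStringOverload")
    println(s"executeUpdate() visible on PreparedStatement: $hasNoArgOverload")
  }
}
```

Both overloads are visible, so only runtime behavior (or a code review) reveals the mistake.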


Best practices for writing Spark RDDs to MySQL and other storage systems

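Writing a Spark RDD to a data store, whether a relational database such as MySQL or a NoSQL store such as MongoDB or HBase, puts considerable load on the storage system: each partition of an RDD may hold a very large amount of data, and the storage servers accept only a limited number of connections, which therefore have to be used sparingly. Here are some best practices: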

  • You can write your own custom writer and call an action such as foreach or foreachPartition on your RDD to write each element to a database of your choice, but there are many ways to write something that looks like it would work yet does not work well in a distributed environment. Here are some things to watch out for:
  • A common naive mistake is to open a connection on the Spark driver program and then try to use that connection on the Spark workers; connection objects generally cannot be serialized and shipped to executors. The connection should be opened on the Spark worker, for example by calling foreachPartition and opening the connection inside that function.
  • Use partitioning to control the parallelism for writing to your data storage. Your data storage may not support too many concurrent connections.
  • Use batching for writing out multiple objects at a time if batching is optimal for your data storage.
  • Make sure your write mechanism is resilient to failures.
  • Writing out a very large dataset can take a long time, which increases the chance that something goes wrong, such as a network failure.
  • Consider utilizing a static pool of database connections on your Spark workers.
  • If you are writing to a sharded data storage, partition your RDD to match your sharding strategy. That way each of your Spark workers only connects to one database shard, rather than each Spark worker connecting to every database shard.
  • Be cautious when writing out so much data, and make sure you understand the distributed nature of Spark!
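The "resilient to failures" point can be sketched as a small retry helper with exponential backoff. Retry.withRetry and its parameters are hypothetical names chosen for illustration, not a Spark or JDBC API, and the sketch uses the JDK only. Keep in mind that Spark already re-runs failed tasks (see the spark.task.maxFailures setting), and that re-running a partially written partition inserts duplicate rows unless the write is idempotent, for example through a unique key combined with MySQL's INSERT ... ON DUPLICATE KEY UPDATE.

```scala
// Hypothetical helper, not part of Spark or JDBC
object Retry {
  // Runs op up to maxAttempts times in total, sleeping initialDelayMs before the
  // first retry and doubling the delay after each failure.
  // Rethrows the last exception once every attempt has failed.
  def withRetry[T](maxAttempts: Int, initialDelayMs: Long)(op: => T): T = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    var attempt = 1
    var delay = initialDelayMs
    while (true) {
      try {
        return op
      } catch {
        // Only swallow the exception while attempts remain; otherwise it propagates
        case _: Exception if attempt < maxAttempts =>
          Thread.sleep(delay)
          delay *= 2
          attempt += 1
      }
    }
    throw new IllegalStateException("unreachable")
  }
}
```

Inside foreachPartition one would wrap the write for a partition (or for each batch) in Retry.withRetry, so that a transient network error is retried locally before the whole task fails.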


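**The list above mentions batching. Batching should be a very effective way to save connection resources: multiple updates or inserts are grouped into one batch and sent to the storage engine over a single connection. MySQL's and MongoDB's batch operations are worth a closer look.**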
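A minimal sketch of JDBC batching for one partition, assuming the TBL_PERSON table and connection settings from the example at the top. The names BatchInsert, writeInBatches and insertPartition, and the default batch size of 500, are illustrative choices, not part of Spark. Each chunk of rows is queued with addBatch(), sent with a single executeBatch() call and committed together. With MySQL Connector/J it is also worth checking the rewriteBatchedStatements=true URL property, which lets the driver rewrite a batch of inserts into multi-row INSERT statements.

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

// Hypothetical helper object; only java.sql from the JDK is used
object BatchInsert {
  // Groups rows into chunks of at most batchSize and hands each chunk to flush.
  // Returns the number of chunks flushed.
  def writeInBatches[T](rows: Iterator[T], batchSize: Int)(flush: Seq[T] => Unit): Int = {
    require(batchSize > 0, "batchSize must be positive")
    var count = 0
    rows.grouped(batchSize).foreach { chunk =>
      flush(chunk)
      count += 1
    }
    count
  }

  // One connection and one PreparedStatement per partition;
  // one executeBatch() round trip and one commit per chunk.
  def insertPartition(rows: Iterator[(String, Int)], url: String, user: String,
                      password: String, batchSize: Int = 500): Unit = {
    val conn: Connection = DriverManager.getConnection(url, user, password)
    try {
      conn.setAutoCommit(false)
      val pstmt: PreparedStatement =
        conn.prepareStatement("insert into TBL_PERSON(name, age) values (?, ?)")
      try {
        writeInBatches(rows, batchSize) { chunk =>
          chunk.foreach { case (name, age) =>
            pstmt.setString(1, name)
            pstmt.setInt(2, age)
            pstmt.addBatch()
          }
          pstmt.executeBatch()
          conn.commit()
        }
      } finally pstmt.close()
    } catch {
      case e: Exception =>
        // Undo the uncommitted chunk, then let Spark see the failure
        conn.rollback()
        throw e
    } finally conn.close()
  }
}
```

On the Spark side it would be called as data.foreachPartition(rows => BatchInsert.insertPartition(rows, "jdbc:mysql://localhost:3306/person", "root", "")). The code compiles against the JDK's java.sql package alone; the MySQL driver only has to be on the executors' classpath at runtime.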
