当前位置:首页 > 开发 > 开源软件 > 正文

Spark通过CLI写入Cassandra

发表于: 2013-11-16   作者:cjcrobin   来源:转载   浏览次数:
摘要: 上一篇(隔得实在有点远)讲到了通过使用Cassandra原生的CLI接口将数据读入了Spark的RDD中,在这篇中,我们将了解如何将数据通过Spark的RDD写入到Cassandra中。   与读取相同的步骤,我们一开始需要初始化SparkContext,以及使用的Cassandra实例的地址,端口,keyspace,columnfamily和partitioner。如下 &nbs

上一篇(隔得实在有点远)讲到了通过使用Cassandra原生的CLI接口将数据读入了Spark的RDD中,在这篇中,我们将了解如何将数据通过Spark的RDD写入到Cassandra中。

 

与读取相同的步骤,我们一开始需要初始化SparkContext,以及使用的Cassandra实例的地址,端口,keyspace,columnfamily和partitioner。如下

 

 

val sc = new SparkContext("local[3]", "casDemo")
val job = new Job()
job.setOutputFormatClass(classOf[ColumnFamilyOutputFormat])
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost")
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160")
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount")//casDemo是keyspace,和sc中的casDemo没有任何关系
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")

 

这样子,最基本的配置就已经配置好了。接下来就可以通过RDD来进行写入了。在这里我们假设已经存在一个RDD[(String, Int)],变量名为counts。其中String代表的是一个单词,比如“China”,Int代表的是出现的次数,如1,2.....。然后可以进行如下操作

       counts.map{
            case (word, count) => {
                //store the StringType
                val colWord = new org.apache.cassandra.thrift.Column()//cli通过Thrift访问,所以我们也需要libthrift这个jar包
                colWord.setName(ByteBufferUtil.bytes("word"))//column名字为word
                colWord.setValue(ByteBufferUtil.bytes(word ))//此column值为word
                colWord.setTimestamp(System.currentTimeMillis())

                //store the LongType
                val colCount = new org.apache.cassandra.thrift.Column()
                colCount.setName(ByteBufferUtil.bytes("count"))
                colCount.setValue(ByteBufferUtil.bytes(count.toLong))
                colCount.setTimestamp(System.currentTimeMillis())

                //store the BooleanType
                val colmorethan = new org.apache.cassandra.thrift.Column()
                colmorethan.setName(ByteBufferUtil.bytes("larger5"))
                if(count>5) {
                    colmorethan.setValue(TRUE)
                } else {
                    colmorethan.setValue(FALSE)
                }
                colmorethan.setTimestamp(System.currentTimeMillis())
                
                //store the FloatType
                val colPercentage = new org.apache.cassandra.thrift.Column()
                colPercentage.setName(ByteBufferUtil.bytes("percentage"))
                colPercentage.setValue(ByteBufferUtil.bytes(1.22.toFloat))
                colPercentage.setTimestamp(System.currentTimeMillis())

                //store the DoubleType
                val colRate = new org.apache.cassandra.thrift.Column()
                colRate.setName(ByteBufferUtil.bytes("rate"))
                colRate.setValue(ByteBufferUtil.bytes(1.888888))
                colRate.setTimestamp(System.currentTimeMillis())

                val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + System.currentTimeMillis)
                val mutations: java.util.List[Mutation] = new Mutation() :: new Mutation() :: new Mutation() :: new Mutation() :: new Mutation() ::  Nil

                mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn())
                mutations.get(0).column_or_supercolumn.setColumn(colWord)

                mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn())
                mutations.get(1).column_or_supercolumn.setColumn(colCount)

                mutations.get(2).setColumn_or_supercolumn(new ColumnOrSuperColumn())
                mutations.get(2).column_or_supercolumn.setColumn(colmorethan)

                mutations.get(3).setColumn_or_supercolumn(new ColumnOrSuperColumn())
                mutations.get(3).column_or_supercolumn.setColumn(colPercentage)

                mutations.get(4).setColumn_or_supercolumn(new ColumnOrSuperColumn())
                mutations.get(4).column_or_supercolumn.setColumn(colRate)

                (outputkey, mutations)
            }
        }.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]],
            classOf[ColumnFamilyOutputFormat], job.getConfiguration)

 

这里使用了RDD的map函数以及scala的case class,这样word就对应着单词,ercount就对应着出现次数。在Cassandra中column name以及timestamp都是必须的,column name是可选的,但是这里我们全部都填上了,主要是将每个基本类型都是用了下。其中比较tricky的是bool类型,因为RDD会分出很多partition在各个节点上,每个节点也需要和Cassandra交流,所以需要将所有的数据都转化成Byte类型。在这里我们ByteBufferUitl工具包完成了这个操作。但是false和true到底怎么表达呢?这个问题我也是尝试了很多次才搞清楚的。如下

    val TRUE = new Array[Byte](1)
        TRUE(0) = 1.toByte
    val FALSE = new Array[Byte](1)//default to 0

 

0就是false,非0就是ture,这个还是比较无可厚非的。以为需要的是Byte类型所以将1变成了Byte类型也是可以理解的,唯一让我比较困惑的是为什么需要搞出一个长度为1的数组。因为时间过得比较久了,我也不记得我当初是怎么搞出来的了。Okay,回到上一段代码。所有这些都设置好了之后,根据Cassandra提供的ColumnFamilyOutputFormat接口我们需要提供给一个二元组,这个二元组中第一个参数是ByteBuffer类型的,outputkey正好是,第二个参数应该是List[Mutation]类型的,正好我们设置的mutations满足要求,所以就返回(outputkey, mutations)。接下来就是使用hadoop的newapi完成任务了,不过这里需要注意的就是,为了使用这个函数,需要导入SparkContext._中的implicit函数,将使用PairFunctionRDD中的功能。

 

这就是基本的通过CLI接口来写入Cassandra。在下一篇中,会讲解使用Cassandra新的CQL接口进行读写。

Have a nice weekend!

 

 

Spark通过CLI写入Cassandra

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
1、数据库创建 参考接上文cassandra入门 http://www.cnblogs.com/piaolingzxh/p/4197833.html 2、下
脚本: import com.csvreader.CsvWriter; String NewDataPath=bsh.args[0]; NewDataPath=NewDataPat
  SerialPort类用于控制串行端口文件资源。提供同步 I/O 和事件驱动的 I/O、对管脚和中断状态的访
1、到官网下载压缩包。 http://cassandra.apache.org/download/ 我下载的是最新的 apache-cassandra
关于NoSql数据存储方式的优劣这里不再讨论,可以关注 http://www.iteye.com/topic/524977 这里简要介
Cassandra是有FaceBook开发并开源的一个NoSQL分布式存储。目前是Apache基金会下面的一级项目,它的
第1章引言 1.1编写目的 介绍apache cassandra。 1.2非关系型数据库—Cassandra 1.2.1简介 Cassandra
本文转自我的javaEye博客,链接http://kylinsoong.javaeye.com/blog/731208 最近, Cassandra 绝对
最近, Cassandra 绝对是一个比较前端的话题,随着互联网的不断发展, Distributed Database 的广受
在前面几篇博客里,介绍了Spark的伪分布式安装,以及使用Spark Shell进行交互式操作,本篇博客主要
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号