当前位置:首页 > 开发 > 开源软件 > 正文

在Spark上使用CLI读取Cassandra数据

发表于: 2013-10-12   作者:cjcrobin   来源:转载   浏览次数:
摘要: 最近在研究将Spark架设到Cassandra之上。发现这方面的信息比较少,在学习的过程中也遇到了不少问题,因此在此记录下,也和大家分享。此例为最经典的WordCount示例。 首先我先说下我所使用的各种环境和版本。由于Spark和Cassandra更新较快,如果之后版本有异可能运行不能成功需要一些微调。 暂时使用的是Windows 7, 之后会转到Linux平台,但是这个影响不大。使用的是S

最近在研究将Spark架设到Cassandra之上。发现这方面的信息比较少,在学习的过程中也遇到了不少问题,因此在此记录下,也和大家分享。此例为最经典的WordCount示例。

首先我先说下我所使用的各种环境和版本。由于Spark和Cassandra更新较快,如果之后版本有异可能运行不能成功需要一些微调。

暂时使用的是Windows 7, 之后会转到Linux平台,但是这个影响不大。使用的是Scala2.9.3,Spark 0.8, Cassandra 1.2.10,sbt 0.13.0,Java 7。

 

首先需要我们自己生成下Spark的jar包。这个需要我们运行sbt命令来得到。转到Spark所在的目录然后运行

sbt\sbt assembly(如果是Linux的话,应该是sbt/sbt assembly)。运行结束后,可以在spark\assembly\target\scala-2.9.3下面发现一个名字类似于spark-assembly-0.8.0-*.jar的包。我们需要将这个包加入到编译路径中。

 

其次我们需要在Cassandra中插入一些数据。首先需要在命令行中运行cassandra/bin下面的cassandra命令来开启服务。然后运行cassandra-cli,这样我们就能够输入我们需要的数据。本文结尾可以找到此例使用的数据示例。

 

然后我们就可以开始了。

val sc = new SparkContext("local[3]", "casDemo")

 新建一个Spark上下文对象,第一个参数是将要连接到的Cluster地址,这里我们仅仅使用localhost来运行,所以可以简单设置为local[*],*为1,2,3之类的数字。第二个只是一个显示参数,可以随意。

val job = new Job()

 新建一个Hadoop job。因为Spark是没有提供直接的API访问Cassandra,但是Spark是建于Hadoop之上,Cassandra提供了访问Hadoop的接口所以我们需要先创建一个Hadoop的job来连接它两。

job.setInputFormatClass(classOf[ColumnFamilyInputFormat])

 设置input的类,这个没有什么其他可选项,这是Cassandra默认的jar包中提供的接口。

 

ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost")
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160")
ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words")
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")

通过Cassandra提供的静态类ConfigHelper来设置相对应的一些参数。“casDemo"是这个例子使用的keyspace,words是column family。9160是Cassandra默认使用的端口。最后一个是设置多台机器运行时使用的Partitioner hash算法。有三个值可以选择,分别是org.apache.cassandra.dht.Murmur3Partitioner,org.apache.cassandra.dht.RandomPartitioner和ByteOrderedPartitioner。第三个已经不推荐使用了。在这里我们使用第一个。

 

此外需要说明的是,还有其他参数可以设置比如slice predicate之类的,这里略过了,仅仅介绍了最简单的设置。

 

然后我们就能够去创建Spark独有的RDD对象了,并使用它来完成我们要的工作。

val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
            classOf[ColumnFamilyInputFormat],
            classOf[ByteBuffer],//key
            classOf[SortedMap[ByteBuffer, IColumn]]) //value

 可以看到这里创建了一个RDD对象,第一个参数就是将之间我们配置好的参数,第二个就是之前提到的Cassandra提供的接口,第三和第四个参数其他没有其他的可选项,这两个参数是被ColumnFamilyInputFormat所限制的。这个其实大家看SparkAPI就能了解到,这里不多说。

 

val paraRdd = casRdd flatMap {
    case (key, value) => {
        value.filter(v => {
            ByteBufferUtil.string(v._1).compareTo("paragraph") == 0
        }).map(v => ByteBufferUtil.string(v._2.value()))
    }
}

这里就是运行mapper方法了,就和hadoop中的mapper一个概念。需要说明的是,这里的key就是一个ByteBuffer对象,value就是一个SortedMap[ByteBuffer, IColumn],有没有觉得很熟悉,是的,这两个就是之前创建RDD设置的最后两个参数。这里我做的就是过滤掉colomn name不等于paragraph的,然后将colomn是paragraph的值由Icolumn变成ByteBuffer类型。

 

val counts = paraRdd.flatMap(p => p.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

最后就是reduce方法了。 先用空格将段落打散成单词,然后将单个单词word转化成元组(word, 1)。最后统计每个word出现次数。

 

这样我们就完成了简单的WordCount方法。

我们可以通过以下代码打印出结果来观看。

counts.collect() foreach {
    case (word, count) => println(word + ":" + count)
}

 

这里介绍了通过Spark读取Cassandra数据并进行处理。下一节,会介绍写入数据。

在Spark上使用CLI读取Cassandra数据

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
本文以Cassandra 0.8中的hadoop_word_count为例: https://svn.apache.org/repos/asf/cassandra/bra
1、Apache spark是一个为速度和通用目标设计的集群计算平台。 从速度的角度看,Spark从流行的MapRed
1.安装Node.js,到https://nodejs.org/下载安装 2.安装Chocolatey,https://chocolatey.org/,先看
Cassandra 的数据存储结构 Cassandra 中的数据主要分为三种: CommitLog:主要记录下客户端提交过来
参考地址 http://wiki.apache.org/cassandra/FAQ#working_with_timeuuid_in_java 下载一个包 http:/
Svn地址:https://svn.apache.org/repos/asf/cassandra/branches/cassandra-0.8/ 过程: File-New-O
转载 http://blog.csdn.net/jiedushi/article/details/7325292 Hive是基于Hadoop的一个数据仓库工具
Cassandra 的数据存储结构 Cassandra 的数据模型是基于列族(Column Family)的四维或五维模型。它借
本文是Cassandra数据模型设计第一篇(全两篇),该系列文章包含了eBay使用Cassandra数据模型设计的
使用Thrift来让PHP操作Cassandra无疑是一个首选方案,但是配置和操作比较麻烦。 我们可以使用一个ph
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号