当前位置:首页 > 开发 > 开源软件 > 正文

【Avro三】Hadoop MapReduce读写Avro文件

发表于: 2015-04-08   作者:bit1129   来源:转载   浏览:
摘要: Avro是Doug Cutting(此人绝对是神一般的存在)牵头开发的。 开发之初就是围绕着完善Hadoop生态系统的数据处理而开展的(使用Avro作为Hadoop MapReduce需要处理数据序列化和反序列化的场景),因此Hadoop MapReduce集成Avro也就是自然而然的事情。 这个例子是一个简单的Hadoop MapReduce读取Avro格式的源文件进行计数统计,然后将计算结果

Avro是Doug Cutting(此人绝对是神一般的存在)牵头开发的。 开发之初就是围绕着完善Hadoop生态系统的数据处理而开展的(使用Avro作为Hadoop MapReduce需要处理数据序列化和反序列化的场景),因此Hadoop MapReduce集成Avro也就是自然而然的事情。

这个例子是一个简单的Hadoop MapReduce读取Avro格式的源文件进行计数统计,然后将计算结果作为Avro格式的数据写到目标文件中,主要目的是体会下Hadoop MapReduce操作Avro的基本流程和Avro提供的API

 

1. Maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>learn</groupId>
    <artifactId>learn.avro</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--avro core-->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.7.7</version>
        </dependency>

        <!--avro rpc support-->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-ipc</artifactId>
            <version>1.7.7</version>
        </dependency>

        <!--avro utilities for Hadoop MapReduce to process avro files -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-mapred</artifactId>
            <version>1.7.7</version>
        </dependency>

        <!--Avro and Hadoop Map Reduce-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>


    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.7.7</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

 

 

2. MapReduce代码:

package examples.avro.mapreduce;

import examples.avro.simple.User;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class MapReduceColorCount extends Configured implements Tool {

    ///Mapper定义:
    ///输入Key类型是AvroKey<User>,输入Value类型是NullWritable
    ///输出Key类型是Text,输出Value类型是IntWritable
    public static class ColorCountMapper extends
            Mapper<AvroKey<User>, NullWritable, Text, IntWritable> {

        @Override
        public void map(AvroKey<User> key, NullWritable value, Context context)
                throws IOException, InterruptedException {

            CharSequence color = key.datum().getFavoriteColor();
            if (color == null) {
                color = "none";
            }
            context.write(new Text(color.toString()), new IntWritable(1));
        }
    }

    ///Reducer定义:
    ///输入Key类型是Text,输入Value类型是IntWritable(跟Key的输出Key/Value类型一致)
    ///输出Key类型是AvroKey<CharSequence>,输出Value类型是AvroValue<Integer>
    public static class ColorCountReducer extends
            Reducer<Text, IntWritable, AvroKey<CharSequence>, AvroValue<Integer>> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {

            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(new AvroKey<CharSequence>(key.toString()), new AvroValue<Integer>(sum));
        }
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MapReduceColorCount <input path> <output path>");
            return -1;
        }

        Job job = new Job(getConf());
        job.setJarByClass(MapReduceColorCount.class);
        job.setJobName("Color Count");

        ///指定输入路径,输入文件是Avro格式
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        ///指定输出路径,输出文件格式是Key/Value组成的Avro文件,见AvroKeyValueOutputFormat
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //AvroKeyInputFormat: A MapReduce InputFormat that can handle Avro container files.
        job.setInputFormatClass(AvroKeyInputFormat.class);
        job.setMapperClass(ColorCountMapper.class);
        AvroJob.setInputKeySchema(job, User.getClassSchema());
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //AvroKeyValueOutputFormat: FileOutputFormat for writing Avro container files of key/value pairs
        job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
        job.setReducerClass(ColorCountReducer.class);
        AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT));

        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new MapReduceColorCount(), args);
        System.exit(res);
    }
}

 

 

3. 主要类注释

3.1 AvroKey

/** The wrapper of keys for jobs configured with {@link AvroJob} . */

 

3.2 AvroValue

/** The wrapper of values for jobs configured with {@link AvroJob} . */

 

3.3 AvroJob

/** Setters to configure jobs for Avro data. */

 

3.4 AvroKeyInputFormat

/**
 * A MapReduce InputFormat that can handle Avro container files.
 *
 * <p>Keys are AvroKey wrapper objects that contain the Avro data.  Since Avro
 * container files store only records (not key/value pairs), the value from
 * this InputFormat is a NullWritable.</p>
 */

 

3.5 AvroKeyValueOutputFormat

/**
 * FileOutputFormat for writing Avro container files of key/value pairs.
 *
 * <p>Since Avro container files can only contain records (not key/value pairs), this
 * output format puts the key and value into an Avro generic record with two fields, named
 * 'key' and 'value'.</p>
 *
 * <p>The keys and values given to this output format may be Avro objects wrapped in
 * <code>AvroKey</code> or <code>AvroValue</code> objects.  The basic Writable types are
 * also supported (e.g., IntWritable, Text); they will be converted to their corresponding
 * Avro types.</p>
 *
 * @param <K> The type of key. If an Avro type, it must be wrapped in an <code>AvroKey</code>.
 * @param <V> The type of value. If an Avro type, it must be wrapped in an <code>AvroValue</code>.
 */

 

3.6

  /**
   * Sets the job input key schema.
   *
   * @param job The job to configure.
   * @param schema The input key schema.
   */
  public static void setInputKeySchema(Job job, Schema schema) {
    job.getConfiguration().set(CONF_INPUT_KEY_SCHEMA, schema.toString());
  }

  /**
   * Sets the job input value schema.
   *
   * @param job The job to configure.
   * @param schema The input value schema.
   */
  public static void setInputValueSchema(Job job, Schema schema) {
    job.getConfiguration().set(CONF_INPUT_VALUE_SCHEMA, schema.toString());
  }

 

3.7

/**
   * Sets the map output key schema.
   *
   * @param job The job to configure.
   * @param schema The map output key schema.
   */
  public static void setMapOutputKeySchema(Job job, Schema schema) {
    job.setMapOutputKeyClass(AvroKey.class);
    job.setGroupingComparatorClass(AvroKeyComparator.class);
    job.setSortComparatorClass(AvroKeyComparator.class);
    AvroSerialization.setKeyWriterSchema(job.getConfiguration(), schema);
    AvroSerialization.setKeyReaderSchema(job.getConfiguration(), schema);
    AvroSerialization.addToConfiguration(job.getConfiguration());
  }

  /**
   * Sets the map output value schema.
   *
   * @param job The job to configure.
   * @param schema The map output value schema.
   */
  public static void setMapOutputValueSchema(Job job, Schema schema) {
    job.setMapOutputValueClass(AvroValue.class);
    AvroSerialization.setValueWriterSchema(job.getConfiguration(), schema);
    AvroSerialization.setValueReaderSchema(job.getConfiguration(), schema);
    AvroSerialization.addToConfiguration(job.getConfiguration());
  }

 

 

 

【Avro三】Hadoop MapReduce读写Avro文件

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
AVRO文件结构分析 guibin.beijing@gmail.com 研究了AVRO的规范,比较形象的图形表达了文件中内容布局
使用 使用使用 使用 HDFS 保存大量小文件的缺点: 1.Hadoop NameNode 在内存中保存所有文件的“元信
转载请写明来源地址:http://blog.csdn.net/lastsweetop/article/details/9900129 所有源码在github
一、引言 1、 简介 Avro是Hadoop中的一个子项目,也是Apache中一个独立的项目,Avro是一个基于二进
一、引言 1、 简介 Avro是Hadoop中的一个子项目,也是Apache中一个独立的项目,Avro是一个基于二进
一、引言 1、 简介 Avro是Hadoop中的一个子项目,也是Apache中一个独立的项目,Avro是一个基于二进
一、引言 1、 简介 Avro是Hadoop中的一个子项目,也是Apache中一个独立的项目,Avro是一个基于二进
一、引言 1、 简介 Avro是Hadoop中的一个子项目,也是Apache中一个独立的项目,Avro是一个基于二进
一、引言 1、 简介 Avro是Hadoop中的一个子项目,也是Apache中一个独立的项目,Avro是一个基于二进
一、引言 1、 简介 Avro是Hadoop中的一个子项目,也是Apache中一个独立的项目,Avro是一个基于二进
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号