MapReduce总结【阅读.官方文档】

Inputs and Outputs

  • MapReduce 框架执行 <key,Value> < k e y , V a l u e > 对。输入job设置为 <key,Value> < k e y , V a l u e > ,输出也是 <Key,Value> < K e y , V a l u e > ,可以是不同的类型。
  • Key的类需要实现WritableCombale接口通过框架排序。

样例:WordCount 1.0

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

(WordCount)MapReduce 执行过程

public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
  }
}
  • 输入文件输入后,被分成几份map进行并行处理, Mapper M a p p e r 是实现了,把字符串通过空格进行分割,并以 <1> < 字 符 , 1 > 的形式输出。
  • 举个例子:

    • 第一个map 输出:

      < Hello, 1>
      < World, 1>
      < Bye, 1>
      < World, 1>
    • 第二个map输出:

      < Hello, 1>
      < Hadoop, 1>
      < Goodbye, 1>
      < Hadoop, 1>
    • Map的执行过程中有个 localcombiner l o c a l c o m b i n e r 使之相同的字符串进行聚合。
    • 第一个map的输出:

      < Bye, 1>
      < Hello, 1>
      < World, 2>
    • 第二个map输出:

      < Goodbye, 1>
      < Hadoop, 2>
      < Hello, 1>
public void reduce(Text key, Iterable values,
                   Context context
                   ) throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}
  • Reducer通常被用来计算Values的值
  • job的输出是:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>
  • main方法主要包括 input/outputkey/Valueinput/output i n p u t / o u t p u t 路 径 、 k e y / V a l u e 类 型 , i n p u t / o u t p u t 格 式 等 job.waitForCompletion j o b . w a i t F o r C o m p l e t i o n 为了提交job和监控它的进程。

MapReduce -User Interfaces

Mapper

  • Maps 是将各个独立的任务转移输入文件到中间文件,被转移的中间文件不需要和输入文件的类型是一样。一个输入的 <KeyValue> < K e y V a l u e > 对可能有Map 0个或 多个 <Key,value> < K e y , v a l u e > 对输出。
  • hadoop MapReduce 框架中,通过job的inputFormat生成的每个InputSplit会产生一个map任务。
  • Mapper 的实现经过job的 job.setMapperClass j o b . s e t M a p p e r C l a s s 方法,然后框架为在inputSplit的每个键值对调用 map(WritableComparable,Writable,Context) m a p ( W r i t a b l e C o m p a r a b l e , W r i t a b l e , C o n t e x t ) ,然后,程序可以调用 cleanuo(Context) c l e a n u o ( C o n t e x t ) 方 法
  • 程序可以通过使用 Counter C o u n t e r 报告 其统计信息。
  • Mapper M a p p e r 输出被排序,然后分给每个Reducer,总的划分数量和REducer的任务数量是一致的。可以通过 Partitioner P a r t i t i o n e r 控制Keys 到哪个Reducer。
  • 可以使用 combinerJob.setCombinerClass(Class) c o m b i n e r ( 通 常 J o b . s e t C o m b i n e r C l a s s ( C l a s s ) ) 使中间的输出合并,可以减少Maper到Reducer的数据量。
  • maps的数量是多少?

    • maps数量通常由inputs的总大小也就是输入的文件的总数决定。
    • maps的正确的并行度每个节点大约10-100个maps,尽管它可以创建300maps对于节点的cpu来说。任务设置需要点时间,所以maps执行1min是最好的。

      1024TB=1PB
      1024PB=1EB
      1024EB=1ZB
      1024ZB=1YB
      
      ---------------------------
      
        1TB =  1024GB
        1GB= 1024MB
        1Mb =1024kb
      
      • 10Tb的数据和128mb的空间,你会得到82000maps。除非使用Configuration.set使用设置更高。

Reducer

  • Reducer 被执行通过 Job.setReducerClass J o b . s e t R e d u c e r C l a s s 方 法 ,通过重新来实例化他们,框架中的inputs组中的每个键值对然后调用 reduce(WritableComparable,Iterable<Writable>,Context) r e d u c e ( W r i t a b l e C o m p a r a b l e , I t e r a b l e < W r i t a b l e > , C o n t e x t )
  • Reducer有3 主要阶段:shuffle,sort,reduce.
    • Shuffle
      • Reducer 的input是被排序的mappers的输出。框架在这个阶段获取相关的所有mappers的划分输出,通过http.
    • Sort
      • 框架组成Reducer的inputs通过Keys(不同的mappers可能有相同的output键)在这个阶段。
      • shuffle和排序阶段同时发生,而当map-outptus 备货区时,他们是被合并的。
    • Secondary Sort
      • 在reducer之前,中间的键 在等规则情况下,需要进行不同的组合,然后可以通过 Job.setSortComparatorClass(Class). J o b . s e t S o r t C o m p a r a t o r C l a s s ( C l a s s ) . , Job.setGroupingComparatorClass(Class) J o b . s e t G r o u p i n g C o m p a r a t o r C l a s s ( C l a s s ) 可以被用来控制中间keys的组合。
    • Reduce
      • 在这个阶段中,在组合的inputs 的每个键值对调用 reduce(WritableComparable,Iterable<Writable>,Context) r e d u c e ( W r i t a b l e C o m p a r a b l e , I t e r a b l e < W r i t a b l e > , C o n t e x t ) 方法。
      • reduce的输出任务通常写入 FileSystem F i l e S y s t e m 使用 Context.write(WritableComparable,Writable) C o n t e x t . w r i t e ( W r i t a b l e C o m p a r a b l e , W r i t a b l e )
      • 程序可以使用Counter报告它的数据。
      • Reducer的输出是无序的。
    • Reducers的数量
      • reduces的数量是 0.951.75 0.95 或 1.75 乘以节点数乘以每个节点的最大容量。
      • 0.95,所有reduces可以立即启动,并且开始转移map 的outputs当maps结束时。1.75更快的结点会更快的完成他们的第一回合的reduces并且启动第二回合的reduces,做更多更好的job的负载平衡。
    • Reducer None
      • reduce任务为0 是合法的。
      • 在这种情况下,maps任务的输出直接进入FileSystem,也就是 FileOutputFormat.setOutputPath(Job,Path) F i l e O u t p u t F o r m a t . s e t O u t p u t P a t h ( J o b , P a t h )
        所设置的路径, 写入FileSystem之前,框架没有排序 mapoutputs m a p − o u t p u t s

Job 的配置

  • Job是主要用来描述MapReduce 的job 为了hadoop框架的执行。框架努力执行被Job设置好参数的job。
    • 一些配置参数可能被标记通过管理者作为固定的,因此不可以被改变。
    • 一些job参数是比较直接设置 Job.setNumReduceTasks(int)) J o b . s e t N u m R e d u c e T a s k s ( i n t ) ) ,其他参数也巧妙的相互联系参与框架的其余部分,还有更复杂的 Configuration.set(JobContext.NUMMAPS,int)) C o n f i g u r a t i o n . s e t ( J o b C o n t e x t . N U M M A P S , i n t ) ) .
  • Job通常用来指定 MapperCombinerPartitionerReducerInputFormatOutputFormat M a p p e r 、 C o m b i n e r 、 P a r t i t i o n e r 、 R e d u c e r 、 I n p u t F o r m a t 、 O u t p u t F o r m a t 的 实 现 , FileInputFormat 表明设置输入的路径 (FileInputFormat.setInputPaths(Job,Path)FileInputFormat.addInputPath(Job,Path))and(FileInputFormat.setInputPaths(Job,String)FileInputFormat.addInputPaths(Job,String)) ( F i l e I n p u t F o r m a t . s e t I n p u t P a t h s ( J o b , P a t h … ) 、 F i l e I n p u t F o r m a t . a d d I n p u t P a t h ( J o b , P a t h ) ) a n d ( F i l e I n p u t F o r m a t . s e t I n p u t P a t h s ( J o b , S t r i n g … ) 、 F i l e I n p u t F o r m a t . a d d I n p u t P a t h s ( J o b , S t r i n g ) ) ,输出文件路径通过 (FileOutputFormat.setOutputPath(Path)) ( F i l e O u t p u t F o r m a t . s e t O u t p u t P a t h ( P a t h ) ) 设置。
  • 使用者可以通过程序把 Configuration.set(String,String)/Configuration.get(String) C o n f i g u r a t i o n . s e t ( S t r i n g , S t r i n g ) / C o n f i g u r a t i o n . g e t ( S t r i n g ) 设置任意参数,可视, DistributedCache D i s t r i b u t e d C a c h e 仅被用来读巨大的数据。

Job Submission and Monitoring

  • Job 是用户作业与ResourceManager交互的主要接口。
  • Job 提交进程包括:
    • 检查job指定的input、output
    • 计算job inputSplit的值。
    • 如果有必要,设置DistributeCache的数量信息。
    • 将作业的jar 和 配置复制到MapReduce 系统文件目录。
    • 提交job 到REsourceManager,并且监控它的信息。
  • 使用者,使用job创建程序, 描述作业的各个方面,提交作业并且监控它的进展。

Job Control

  • 使用者可能需要链接MapReduce jobs 来完成复杂的不可以通过单节点MapReduce完成的任务。这是很容易的,因为,job的输出通常被写进文件系统,并且输出,可以转为下一个作业的输入文件。
  • 可是,这也意味着确保上一个作业的完成。在这种情况下,各种控制job的选项是:
    • Job.submit(),提交job到集群,立即返回。
    • Job.waitForCompletion(boolean) :提交作业到集群并且等待它完成。

Job Input

  • MapReduce 框架依赖于作业的InputFormat:
    • 验证作业的输入规范
    • 拆分输入文件到逻辑的InputSplit实例,他们中的每个都分配给单独的Mapper
    • RecordReader 提 供 R e c o r d R e a d e r 实 现 ,用于搜集来自逻辑的INputSplit输入文件以供Mapper处理。
  • InputFormat 实现的默认行为,通常是 FileInputFormat 的子类,以用bytes为单位的总大小的输入文件,拆分input到逻辑的InputSplit实例化。输入文件的总大小被作为input Splits 的上限,分割的下限可以通过 mapreduce.input.fileinputformat.split.minsize. m a p r e d u c e . i n p u t . f i l e i n p u t f o r m a t . s p l i t . m i n s i z e . 被设置.
  • TextInputFormat 是默认的InputFormat。

InputSplit

  • InputSplit 表示由单独Mapper处理的数据。
  • 通常,InputSplit 是面向字节的输入,并且它是RecordReader负责处理和呈现面向记录视图。
  • FileSplit是默认的InputSplit。它将 mapreduce.map.input.file m a p r e d u c e . m a p . i n p u t . f i l e 设置为逻辑分割的输入文件路径。

RecordReader

  • RecordReader 读取来自InputSplit的键值对
  • 通常RecordReader转换通过InputSplit提供的面向字节的视图,并且呈现出面向记录 视图使Mapper处理。

Job Output

  • OutputFormat 描述了MapReduce 作业的输出规则。
  • MapReduce 框架依赖于作业的OutputFormat:
    • 验证作业的输出规则,例如,检查输出路径是否已经存在。
    • 提供 RecordWrite 用来写入作业的输出文件,输出文件被储存在FileSystem。
  • TextOutputFormat 是默认的 OutputFormat。

OutputCommitter

  • OutputCommitter 描述了MapReduce作业输出提交。
  • MapReduce 框架依赖于 job 的OutputCommitter:
    • 初始化期间设置job,例如,在job初始化期间创建临时的输出目录。当job处于PrEP状态和初始化完成后,作业设置通过单独的任务完成,作业设置完成后,job会被移动到RUNNING状态。
    • job完成后,清除job。例如,移除临时的输出的目录。job清洗通过一个以job结束的单独任务完成。cleanup task 完成后,Job宣布 SUCCEDED/FAILED/KILLED
    • 设置任务临时输出,任务设置在人物初始化期间作为同一任务的一部分完成。
    • 检查任务是否需要提交,这是为了在任务不要提交时,避免提交。
    • 任务提交输出,由于任务被执行,如果需要,任务会提交它的输出。
    • 放弃任务提交,如果任务已经 failed/killed f a i l e d / k i l l e d ,输出会被cleaned-up,如果任务不可以cleanup,一个单独的任务会启动相同的尝试id去cleanup。

RecordWriter

  • RecordWrite 把输出的 <Key,Value> < K e y , V a l u e > 写入输出文件。
  • RecordWrite实现写入Job输出到FileSystem。

WordCount 2.0

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class WordCount2 {

  public static class TokenizerMapper
       extends Mapper{

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive;
    private Set patternsToSkip = new HashSet();

    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      if (conf.getBoolean("wordcount.skip.patterns", false)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) {
      System.err.println("Usage: wordcount   [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List otherArgs = new ArrayList();
    for (int i=0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

你可能感兴趣的