Writing MapReduce Programs in Eclipse

A WordCount-style example.

We have a data source with the following format:

[Image: data-source.png]

Task: find the distinct item categories.

1. Create a new Map/Reduce Project

File - New - Other - Map/Reduce Project

2. SalesItemCategoryMapper.class

//SalesItemCategoryMapper.class
package SalesProduct;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesItemCategoryMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // TextInputFormat delivers one line of text per call; convert it to a String
        String line = value.toString();
        // Fields are tab-separated; the item category is the fourth field
        String[] items = line.split("\t");
        if (items.length > 3) {
            output.collect(new Text(items[3]), one);
        }
    }

}
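The tab-split logic in the mapper can be sanity-checked outside Hadoop. Below is a minimal plain-Java sketch (the class name `TabSplitDemo` is made up for illustration; the sample record and field index 3 follow the data format shown above):

```java
public class TabSplitDemo {
    // Extracts the item-category field (index 3) from a tab-separated sales record
    static String itemCategory(String line) {
        return line.split("\t")[3];
    }

    public static void main(String[] args) {
        // One record in the data-source format: date, time, store, item, price, payment
        String line = "2012-01-01\t09:00\tSan Jose\tMen's Clothing\t214.05\tAmex";
        System.out.println(itemCategory(line)); // prints: Men's Clothing
    }
}
```

Running this prints the category field only, which is exactly what the mapper emits as its output key.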

3. SalesItemCategoryReducer.class

//SalesItemCategoryReducer.class
package SalesProduct;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SalesItemCategoryReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // Each distinct category is emitted once with the value 1.
        // To count records per category instead, sum the values:
        //   int count = 0;
        //   while (values.hasNext()) { count += values.next().get(); }
        output.collect(key, new IntWritable(1));
    }

}
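The counting variant mentioned in the reducer's comment simply sums the `1`s emitted for each key. That accumulation, sketched in plain Java without the Hadoop types (the class name `SumDemo` is made up here):

```java
import java.util.Arrays;
import java.util.Iterator;

public class SumDemo {
    // Mirrors the counting reducer loop: add up the counts delivered for one key
    static int sum(Iterator<Integer> values) {
        int total = 0;
        while (values.hasNext()) {
            total += values.next();
        }
        return total;
    }

    public static void main(String[] args) {
        // Three records in the same category each contributed a 1
        Iterator<Integer> ones = Arrays.asList(1, 1, 1).iterator();
        System.out.println(sum(ones)); // prints: 3
    }
}
```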

4. SalesItemDriver.class

//SalesItemDriver.class
package SalesResult;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;


public class SalesItemDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        JobConf job_conf = new JobConf(SalesItemDriver.class);
        job_conf.setJobName("SaleCategory");
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);
//      job_conf.setOutputValueClass(DoubleWritable.class);

        // get category
        job_conf.setMapperClass(SalesProduct.SalesItemCategoryMapper.class);
        job_conf.setReducerClass(SalesProduct.SalesItemCategoryReducer.class);

        // get category sum
//      job_conf.setMapperClass(SalesProduct.SalesCategorySumMapper.class);
//      job_conf.setReducerClass(SalesProduct.SalesCategorySumReducer.class);

        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

5. Add logging with log4j.properties

log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  
log4j.appender.logfile=org.apache.log4j.FileAppender  
log4j.appender.logfile.File=target/spring.log  
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n  

6. Right-click the project: Run As - Run Configurations…

[Image: java-application-config-1]

Set the two program arguments:

[Image: java-mapreduce-params]

The first argument is the path of the input file.
The second argument is the path where the results are saved. Note that the output folder must not be created in advance; the job creates it automatically and fails if it already exists.

7. Click Run

[Image: demo-result.png]
[Image: demo-result-2.png]
[Image: demo-result-3.png]

8. Note: the Hadoop JARs containing the required classes are in the following folder

[Image: hadoop-jars]

9. Want some practice?
Data set: https://pan.baidu.com/s/1c2J15Qw password: 4xkd

The format goes like this:
date time store_name item price payment_method

2012-01-01 09:00 San Jose Men's Clothing 214.05 Amex
2012-01-01 09:00 Fort Worth Women's Clothing 153.57 Visa
2012-01-01 09:00 San Diego Music 66.08 Cash
......
......
......

Use the MapReduce programming model to find out:

  1. What are the item categories, and what is the total sales value for each category?
  2. What were the total sales for each of the following stores: "Reno", "Toledo", "Chandler"?
  3. How many items were sold in total?
  4. What is the total sales value across all stores?
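For questions 1 and 4, the reducer accumulates the price field (index 4) instead of a count. A plain-Java sketch of that accumulation, assuming the data file is tab-separated as the mapper expects (the class name `SalesSumDemo` is made up here; in the real job, MapReduce does the per-category grouping):

```java
import java.util.HashMap;
import java.util.Map;

public class SalesSumDemo {
    // Sums the price field (index 4) per category (index 3); fields are tab-separated
    static Map<String, Double> perCategoryTotals(String[] lines) {
        Map<String, Double> totals = new HashMap<>();
        for (String line : lines) {
            String[] f = line.split("\t");
            totals.merge(f[3], Double.parseDouble(f[4]), Double::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // The three sample records shown above
        String[] lines = {
            "2012-01-01\t09:00\tSan Jose\tMen's Clothing\t214.05\tAmex",
            "2012-01-01\t09:00\tFort Worth\tWomen's Clothing\t153.57\tVisa",
            "2012-01-01\t09:00\tSan Diego\tMusic\t66.08\tCash"
        };
        Map<String, Double> totals = perCategoryTotals(lines);
        // Grand total across all stores (question 4)
        double grand = totals.values().stream().mapToDouble(Double::doubleValue).sum();
        System.out.println(totals);
        System.out.printf("total = %.2f%n", grand);
    }
}
```

The same pattern answers question 2 if you key on the store field (index 2) and filter to the three store names.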

10. Possibly useful commands
Format the NameNode:

bin/hdfs namenode -format

Start Hadoop:

sbin/start-dfs.sh
sbin/start-yarn.sh

Stop Hadoop:

sbin/stop-dfs.sh
sbin/stop-yarn.sh

Check the cluster report:

bin/hdfs dfsadmin -report

Open firewall ports:

sudo ufw allow from 192.168.9.4
#(allow access from this IP)
sudo ufw allow 22
#(allow port 22)

Create a folder in HDFS:

bin/hadoop fs -mkdir /wordcount
#Use copyFromLocal to copy files to HDFS:
bin/hadoop fs -copyFromLocal /home/ubuntu/word.txt /wordcount/word.txt

Check Hadoop status:

http://<target-IP>:50070/dfshealth.html
