
Hadoop: custom InputFormat and OutputFormat

Posted: 2013-02-17   Author: blackproof   Source: repost

 

Hadoop's InputFormat and OutputFormat

 

The best example is Vertica: although it is a UDF implemented for Pig, it is essentially a Hadoop InputFormat and OutputFormat, and can be used the same way in Hive. Download link: http://blackproof.iteye.com/blog/1791995

 

Below is a simple example, taken from a project, of the InputFormat and OutputFormat used when implementing a Hadoop join.

The Hadoop join itself is described at http://blackproof.iteye.com/blog/1757530

   Custom InputFormat (the generic type parameters are the mapper's input key/value types)

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class MyInputFormat extends FileInputFormat<MultiKey, Employee> {

	public MyInputFormat() {}

	@Override
	public RecordReader<MultiKey, Employee> createRecordReader(
			InputSplit split, TaskAttemptContext context) throws IOException,
			InterruptedException {
		return new MyRecordReader();
	}

	public static class MyRecordReader extends RecordReader<MultiKey, Employee> {

		public LineReader in;
		public MultiKey key;
		public Employee value;
		public Text line;

		@Override
		public void initialize(InputSplit split, TaskAttemptContext context)
				throws IOException, InterruptedException {
			// Open the file backing this split and wrap it in a line reader.
			FileSplit fileSplit = (FileSplit) split;
			Configuration job = context.getConfiguration();
			Path file = fileSplit.getPath();
			FileSystem fs = file.getFileSystem(job);

			FSDataInputStream filein = fs.open(file);
			in = new LineReader(filein, job);

			key = new MultiKey();
			value = new Employee();
			line = new Text();
		}

		@Override
		public boolean nextKeyValue() throws IOException, InterruptedException {
			// Each input line looks like: flag,name,departId,departNo
			int linesize = in.readLine(line);
			if (linesize == 0)
				return false;
			String[] pieces = line.toString().split(",");
			int i = Integer.valueOf(pieces[0]);
			switch (i) {
			case 1:
				// Flag 1 marks an employee record.
				value.setEmpName(pieces[1]);
				value.setFlag(1);
				break;
			default:
				// Any other flag marks a department record.
				value.setDepartName(pieces[1]);
				value.setFlag(2);
				break;
			}
			value.setDepartId(pieces[2]);
			value.setDepartNo(pieces[3]);

			// The composite key carries the join fields.
			key.setDepartId(value.getDepartId());
			key.setDepartNo(value.getDepartNo());
			return true;
		}

		@Override
		public MultiKey getCurrentKey() throws IOException, InterruptedException {
			return key;
		}

		@Override
		public Employee getCurrentValue() throws IOException, InterruptedException {
			return value;
		}

		@Override
		public float getProgress() throws IOException, InterruptedException {
			// Progress reporting is not implemented in this simple example.
			return 0;
		}

		@Override
		public void close() throws IOException {
			if (in != null) {
				in.close();
			}
		}

	}

}
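The MultiKey and Employee classes referenced above are not shown in the post (in a real job they would implement Hadoop's WritableComparable and Writable, respectively). As a hypothetical plain-Java sketch, here are the fields the record reader relies on, together with the same flag-based line parsing used in nextKeyValue:

```java
// Plain-Java sketch (assumption: the real MultiKey/Employee implement
// Hadoop's WritableComparable/Writable; only the fields used above are shown).
public class ParseDemo {

    static class Employee {
        String empName, departName, departId, departNo;
        int flag; // 1 = employee record, 2 = department record
    }

    /** Mirrors the parsing in MyRecordReader.nextKeyValue(). */
    static Employee parse(String line) {
        String[] pieces = line.split(",");
        Employee value = new Employee();
        if (Integer.valueOf(pieces[0]) == 1) {
            value.empName = pieces[1];
            value.flag = 1;
        } else {
            value.departName = pieces[1];
            value.flag = 2;
        }
        value.departId = pieces[2];
        value.departNo = pieces[3];
        return value;
    }

    public static void main(String[] args) {
        Employee e = parse("1,Alice,D100,7");
        System.out.println(e.flag + " " + e.empName + " " + e.departId);
        Employee d = parse("2,Sales,D100,7");
        System.out.println(d.flag + " " + d.departName);
    }
}
```

Records with the same departId/departNo thus end up under the same composite key on the reduce side, which is what makes the join work.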

 

 

   Custom OutputFormat (the generic type parameters are the reducer's output key/value types)

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyOutputFormat extends FileOutputFormat<Text, Employee> {

	@Override
	public RecordWriter<Text, Employee> getRecordWriter(
			TaskAttemptContext job) throws IOException, InterruptedException {
		Configuration conf = job.getConfiguration();
		// Create this task's output file under the job output directory.
		Path file = getDefaultWorkFile(job, "");
		FileSystem fs = file.getFileSystem(conf);
		FSDataOutputStream fileOut = fs.create(file, false);
		return new MyRecordWriter(fileOut);
	}

	public static class MyRecordWriter extends RecordWriter<Text, Employee> {

		public static final String NEW_LINE = System.getProperty("line.separator");

		protected DataOutputStream out;
		private final byte[] keyValueSeparator;

		public MyRecordWriter(DataOutputStream out) {
			this(out, ":"); // default key/value separator
		}

		public MyRecordWriter(DataOutputStream out, String keyValueSeparator) {
			this.out = out;
			this.keyValueSeparator = keyValueSeparator.getBytes();
		}

		@Override
		public void write(Text key, Employee value) throws IOException,
				InterruptedException {
			// Emit "key:value" per line; the separator is written only
			// when a key is present.
			if (key != null) {
				out.write(key.toString().getBytes());
				out.write(keyValueSeparator);
			}
			out.write(value.toString().getBytes());
			out.write(NEW_LINE.getBytes());
		}

		@Override
		public void close(TaskAttemptContext context) throws IOException,
				InterruptedException {
			out.close();
		}

	}

}
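For completeness, the driver that wires these two formats into a job would look roughly like the sketch below. This is not from the original post: JoinDriver, MyMapper, and MyReducer are assumed placeholder names for the join logic, and Job.getInstance is the newer MapReduce API (the 0.20-era code above would use new Job(conf) instead):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JoinDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "employee-department join");
        job.setJarByClass(JoinDriver.class);

        // Plug in the custom formats defined above.
        job.setInputFormatClass(MyInputFormat.class);
        job.setOutputFormatClass(MyOutputFormat.class);

        // MyMapper/MyReducer are placeholders for the actual join logic.
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // Map output matches MyInputFormat; job output matches MyOutputFormat.
        job.setMapOutputKeyClass(MultiKey.class);
        job.setMapOutputValueClass(Employee.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Employee.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

The key point is the pair of setInputFormatClass/setOutputFormatClass calls; everything else is standard job wiring.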

 
