当前位置:首页 > 开发 > 开源软件 > 正文

mapreduce--读取mysql数据库数据

发表于: 2013-08-15   作者:Chrro   来源:转载   浏览次数:
摘要: import java.io.File; import java.io.IOException; import java.io.DataInput; import java.io.DataOutput; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.
import java.io.File;
import java.io.IOException;

import java.io.DataInput;

import java.io.DataOutput;



import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;
import java.util.Iterator;



import org.apache.hadoop.examples.EJob;
import org.apache.hadoop.filecache.DistributedCache;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.Writable;

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.mapred.lib.db.DBWritable;

import org.apache.hadoop.mapred.lib.db.DBInputFormat;

import org.apache.hadoop.mapred.lib.db.DBConfiguration;



public class ReadDB {



    public static class Map extends MapReduceBase implements

            Mapper<LongWritable, StudentRecord, LongWritable, Text> {



        // map

        public void map(LongWritable key, StudentRecord value,

        OutputCollector<LongWritable, Text> collector, Reporter reporter)

                throws IOException {

            collector.collect(new LongWritable(value.id),

                    new Text(value.toString()));

        }



    }
    //reducer
       public static class Reduce extends MapReduceBase implements Reducer<LongWritable,Text,LongWritable,Text>{
@Override
public void reduce(LongWritable key, Iterator<Text> value,
OutputCollector<LongWritable,Text> collector, Reporter reporter)
throws IOException {
// TODO Auto-generated method stub
while (value.hasNext()){
collector.collect(key,value.next());
}
}
      
       }



    public static class StudentRecord implements Writable, DBWritable {

        public int id;

        public String name;

        public String sex;

        public int age;



        @Override

        public void readFields(DataInput in) throws IOException {

            this.id = in.readInt();

            this.name = Text.readString(in);

            this.sex = Text.readString(in);

            this.age = in.readInt();

        }



        @Override

        public void write(DataOutput out) throws IOException {

            out.writeInt(this.id);

            Text.writeString(out, this.name);

            Text.writeString(out, this.sex);

            out.writeInt(this.age);

        }



        @Override

        public void readFields(ResultSet result) throws SQLException {

            this.id = result.getInt(1);

            this.name = result.getString(2);

            this.sex = result.getString(3);

            this.age = result.getInt(4);

        }



        @Override

        public void write(PreparedStatement stmt) throws SQLException {

            stmt.setInt(1, this.id);

            stmt.setString(2, this.name);

            stmt.setString(3, this.sex);

            stmt.setInt(4, this.age);

        }



        @Override

        public String toString() {

            return new String("学号" + this.id + "_姓名:" + this.name

                    + "_性别:"+ this.sex + "_年龄:" + this.age);

        }

    }



    @SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {



        JobConf conf = new JobConf(ReadDB.class);
        //设置地址
       conf.set("fs.default.name", "hdfs://192.168.71.128:9000");
        conf.set("mapred.job.tracker", "192.168.71.128:9001");
        conf.set("dfs.permissions","false");
            

        File jarFile = EJob.createTempJar("bin");
   
    EJob.addClasspath("/usr/hadoop/conf");
    
    ClassLoader classLoader = EJob.getClassLoader();
    
    Thread.currentThread().setContextClassLoader(classLoader);

conf.setJar(jarFile.toString());


DistributedCache.addFileToClassPath(new Path(

         "/usr/hadoop/lib/mysql-connector-java-5.1.18-bin.jar"), conf);
Class.forName("com.mysql.jdbc.Driver");

       
        // 设置map和reduce类
       
        conf.setMapperClass(Map.class);

        conf.setReducerClass(Reduce.class);
       
        // 设置数据库
       
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",

            "jdbc:mysql://192.168.71.128:3306/song", "root", "mysql");


       
        // 设置表字段

        String[] fields = { "id", "name", "sex", "age" };

        DBInputFormat.setInput(conf, StudentRecord.class, "student", null,"id", fields);

        // 设置输入类型

        conf.setInputFormat(DBInputFormat.class);

        //conf.setMapOutputKeyClass(Text.class);
        //conf.setMapOutputValueClass(LongWritable.class);
        // 设置输出类型

        conf.setOutputKeyClass(LongWritable.class);

        conf.setOutputValueClass(Text.class);


        // 输出路径

        FileOutputFormat.setOutputPath(conf, new Path("rdb_out"));

        JobClient.runJob(conf);

    }

}
//需要把mysql驱动包添加到工程中

mapreduce--读取mysql数据库数据

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
1、java读取文本文件到mysql数据库【示例】:把手机号码归属地文件:安徽联通.txt读取到数据库schoo
1、java读取文本文件到mysql数据库【示例】:把手机号码归属地文件:安徽联通.txt读取到数据库schoo
项目配置文件Conf/config.php中添加数据库连接信息: // 添加数据库配置信息 'DB_TYPE' => 'mysq
 数据库是信息的集合,构建方式是将数据的类别互相关联。数据库存储在计算机上时,您可以使用其中
今天,天气依旧很热。整个房间像火炉一般炽热。今天来写写这方面的笔记,做个实验。看看如何运作,
Spring MVC中使用MessageSource默认是写在properties文件当中,以支持国际化。 但很多时候我们需要
今天将短信软件在真机上测试,一直装不上,在模拟器上都是好的。一直报NullPointerException空指针
1、需求:批量导入文件夹“手机号码归属地”下的所有文本文件,如图: 2、java源代码: package com
最近有一项目,需要从mysql数据库读取数据,到Flex中呈现在Tree上,原来一直用读取xml,现在不想用xm
因为bmp格式的图片,前面有一个头部数据,长度为78byte,所以读取的时候就需要跳过这一部分。其他格
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号