MapReduce直接连接MySQL获取数据

mysql> select * from linuxidc_tbls;
+---------------------+----------------+
| TBL_NAME            | TBL_TYPE      |
+---------------------+----------------+
| linuxidc_test_table      | EXTERNAL_TABLE |
| linuxidc_t              | MANAGED_TABLE  |
| linuxidc_t1              | MANAGED_TABLE  |
| tt                  | MANAGED_TABLE  |
| tab_partition      | MANAGED_TABLE  |
| linuxidc_hbase_table_1  | MANAGED_TABLE  |
| linuxidc_hbase_user_info | MANAGED_TABLE  |
| t                  | EXTERNAL_TABLE |
| linuxidc_jobid          | MANAGED_TABLE  |
+---------------------+----------------+
9 rows in set (0.01 sec)

mysql> select * from linuxidc_tbls where TBL_NAME like 'linuxidc%' order by TBL_NAME;
+---------------------+----------------+
| TBL_NAME            | TBL_TYPE      |
+---------------------+----------------+
| linuxidc_hbase_table_1  | MANAGED_TABLE  |
| linuxidc_hbase_user_info | MANAGED_TABLE  |
| linuxidc_jobid          | MANAGED_TABLE  |
| linuxidc_t              | MANAGED_TABLE  |
| linuxidc_t1              | MANAGED_TABLE  |
| linuxidc_test_table      | EXTERNAL_TABLE |
+---------------------+----------------+
6 rows in set (0.00 sec)

MapReduce程序代码,ConnMysql.java:

package com.linuxidc.study;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ConnMysql {
       
        private static Configuration conf = new Configuration();
       
        static {
                conf.addResource(new Path("F:/linuxidc-hadoop/hdfs-site.xml"));
                conf.addResource(new Path("F:/linuxidc-hadoop/mapred-site.xml"));
                conf.addResource(new Path("F:/linuxidc-hadoop/core-site.xml"));
                conf.set("mapred.job.tracker", "10.133.103.21:50021");
        }
       
        public static class TblsRecord implements Writable, DBWritable {
                String tbl_name;
                String tbl_type;

public TblsRecord() {

}

@Override
                public void write(PreparedStatement statement) throws SQLException {
                        // TODO Auto-generated method stub
                        statement.setString(1, this.tbl_name);
                        statement.setString(2, this.tbl_type);
                }

@Override
                public void readFields(ResultSet resultSet) throws SQLException {
                        // TODO Auto-generated method stub
                        this.tbl_name = resultSet.getString(1);
                        this.tbl_type = resultSet.getString(2);
                }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/3bc210fae01d089552e99af224444d86.html