Transferring Data from HBase to HDFS
Published: 2019-05-26

This article is about 4,784 characters long; expect roughly 15 minutes of reading time.

1. Define a custom Writable class

package mr.hdfstoHbase.HbaseToHdfsMapper;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

public class StudentHbaseWritable implements Writable {

    private String name = null;
    private int age = 0;

    // Serialize the fields in a fixed order.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(age);
    }

    // Deserialize in exactly the same order as write().
    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        age = in.readInt();
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        StudentHbaseWritable that = (StudentHbaseWritable) o;
        return age == that.age && Objects.equals(name, that.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, age);
    }

    // This is what ends up in the HDFS output file (see the driver below).
    @Override
    public String toString() {
        return name + ":" + age;
    }
}
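A Writable must read back exactly what it wrote, in the same field order. As a quick sanity check, here is a minimal round-trip sketch; the class name and the sample values ("zhangsan", 20) are illustrative, not part of the original post:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class StudentHbaseWritableRoundTrip {
    public static void main(String[] args) throws Exception {
        StudentHbaseWritable in = new StudentHbaseWritable();
        in.setName("zhangsan");
        in.setAge(20);

        // Serialize with write(), just as the MapReduce framework would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance with readFields().
        StudentHbaseWritable out = new StudentHbaseWritable();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(in.equals(out)); // expected: true
    }
}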

2. The MapReduce program

package mr.hdfstoHbase.HbaseToHdfsMapper;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;

/**
 * Reads data from HBase.
 * Splits: each HBase region becomes one input split.
 * Records: the key is an ImmutableBytesWritable (the row key) and the value is a
 * Result holding all cells of that row; map() is called once per row.
 *
 * public abstract class TableMapper<KEYOUT, VALUEOUT>
 *         extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
 * }
 */
// Only the KEYOUT and VALUEOUT types need to be supplied.
public class HbaseToHdfsMapper extends TableMapper<Text, StudentHbaseWritable> {

    private Text mapKey = new Text();
    private StudentHbaseWritable mapValue = new StudentHbaseWritable();

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // The row key becomes the map output key.
        String str = Bytes.toString(key.get());
        mapKey.set(str);

        // Collect the qualifier/value pairs of the "info" column family.
        List<Cell> cells = value.listCells();
        HashMap<String, String> cellMap = new HashMap<>();
        for (Cell tmp : cells) {
            String family = Bytes.toString(tmp.getFamilyArray(), tmp.getFamilyOffset(), tmp.getFamilyLength());
            String qKey = Bytes.toString(tmp.getQualifierArray(), tmp.getQualifierOffset(), tmp.getQualifierLength());
            String values = Bytes.toString(tmp.getValueArray(), tmp.getValueOffset(), tmp.getValueLength());
            if (family.equals("info")) {
                cellMap.put(qKey, values);
            }
        }
        // Assumes every row has both info:name and info:age populated.
        mapValue.setName(cellMap.get("name"));
        mapValue.setAge(Integer.valueOf(cellMap.get("age")));
        context.write(mapKey, mapValue);
    }
}

The driver class wires the HBase table in as the job input and an HDFS path as the output:

package mr.hdfstoHbase.HbaseToHdfsMapper;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class HbaseToHdfsRunner {

    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        System.setProperty("hadoop.home.dir", "E:\\software\\bigdate\\hadoop-2.6.0-cdh5.15.0\\hadoop-2.6.0-cdh5.15.0");
        Configuration conf = new Configuration();
        // HDFS entry point
        conf.set("fs.defaultFS", "hdfs://wang:9000");
        // ZooKeeper settings used by the HBase client
        conf.set("zookeeper.znode.parent", "/hbase");
        conf.set("hbase.zookeeper.quorum", "wang");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = Job.getInstance(conf);
        job.setJobName("HbaseToHdfsRunnerJob");
        job.setJarByClass(HbaseToHdfsRunner.class);

        // Configure the HBase table as the input and initialize the mapper:
        // public static void initTableMapperJob(String table, Scan scan,
        //         Class<? extends TableMapper> mapper,
        //         Class<?> outputKeyClass, Class<?> outputValueClass, Job job)
        TableMapReduceUtil.initTableMapperJob(
                "hadoop:student",     // HBase table name
                new Scan(),           // full-table scan
                HbaseToHdfsMapper.class,
                Text.class,
                StudentHbaseWritable.class,
                job);

        // Output path on HDFS
        Path outputPath = new Path("/user/wang/hbase_data/HbaseTOHdfsOut4S");
        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}

 

Reposted from: http://grjxi.baihongyu.com/
