本文共 4784 字,大约阅读时间需要 15 分钟。
1.自定义一个类
package mr.hdfstoHbase.HbaseToHdfsMapper;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

/**
 * Hadoop {@link Writable} value type holding one student record (name + age),
 * used as the map-output value of the HBase-to-HDFS job.
 *
 * <p>Serialized form: a UTF string (the name) followed by a 4-byte int (the age).
 */
public class StudentHbaseWritable implements Writable {

    private String name = null;
    private int age = 0;

    /**
     * Serializes this record to {@code out}.
     *
     * <p>An unset name is written as the empty string, because
     * {@code DataOutput.writeUTF(null)} throws a {@link NullPointerException}.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name == null ? "" : name);
        out.writeInt(age);
    }

    /** Restores this record's fields from {@code in} (mirror of {@link #write}). */
    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        age = in.readInt();
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        StudentHbaseWritable that = (StudentHbaseWritable) o;
        return age == that.age && Objects.equals(name, that.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, age);
    }

    /** Format used by TextOutputFormat when writing this value to HDFS. */
    @Override
    public String toString() {
        return name + ":" + age;
    }
}
2.mr程序
package mr.hdfstoHbase.HbaseToHdfsMapper;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;

/**
 * Reads rows from HBase and emits one (rowkey, student) record per row.
 *
 * <p>Input split: one HBase region per split. Each map() call receives one row:
 * the rowkey as an {@link ImmutableBytesWritable} key and the row's cells in a
 * {@link Result} value. {@code TableMapper<KEYOUT, VALUEOUT>} fixes the input
 * types, so only the two output types are declared here.
 */
public class HbaseToHdfsMapper extends TableMapper<Text, StudentHbaseWritable> {

    // Reused across map() calls to avoid per-record allocation (standard MR idiom).
    private final Text mapKey = new Text();
    private final StudentHbaseWritable mapValue = new StudentHbaseWritable();

    /**
     * Converts one HBase row into a (rowkey, StudentHbaseWritable) pair.
     *
     * <p>Only cells from the "info" column family are considered; the row is
     * assumed to carry "name" and "age" qualifiers. NOTE(review): a row missing
     * the "age" qualifier would make Integer.parseInt throw — confirm the table
     * guarantees both columns are present.
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        mapKey.set(Bytes.toString(key.get()));

        // Collect qualifier -> value for the "info" family of this row.
        List<Cell> cells = value.listCells();
        HashMap<String, String> cellMap = new HashMap<>();
        for (Cell cell : cells) {
            String family = Bytes.toString(
                    cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
            if (family.equals("info")) {
                String qualifier = Bytes.toString(
                        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                String cellValue = Bytes.toString(
                        cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                cellMap.put(qualifier, cellValue);
            }
        }

        mapValue.setName(cellMap.get("name"));
        mapValue.setAge(Integer.parseInt(cellMap.get("age")));
        context.write(mapKey, mapValue);
    }
}
package mr.hdfstoHbase.HbaseToHdfsMapper;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver for the HBase-to-HDFS export job: scans the "hadoop:student" table
 * with {@link HbaseToHdfsMapper} and writes the records to an HDFS directory.
 */
public class HbaseToHdfsRunner {

    public static void main(String[] args)
            throws InterruptedException, IOException, ClassNotFoundException {
        // Local Windows Hadoop install (needed for winutils when launching from an IDE).
        System.setProperty("hadoop.home.dir",
                "E:\\software\\bigdate\\hadoop-2.6.0-cdh5.15.0\\hadoop-2.6.0-cdh5.15.0");

        Configuration conf = new Configuration();
        // HDFS entry point.
        conf.set("fs.defaultFS", "hdfs://wang:9000");
        // HBase locates its cluster through ZooKeeper.
        conf.set("zookeeper.znode.parent", "/hbase");
        conf.set("hbase.zookeeper.quorum", "wang");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = Job.getInstance(conf);
        job.setJobName("HbaseToHdfsRunnerJob");
        job.setJarByClass(HbaseToHdfsRunner.class);

        // Configures the HBase input format, the mapper, and the map output
        // key/value classes in one call:
        // initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job)
        TableMapReduceUtil.initTableMapperJob(
                "hadoop:student",          // HBase table name
                new Scan(),                // full-table scan
                HbaseToHdfsMapper.class,
                Text.class,
                StudentHbaseWritable.class,
                job);

        // Map-only job: skip the pointless identity-reduce shuffle/sort phase.
        job.setNumReduceTasks(0);

        // Output directory on HDFS (must not already exist).
        FileOutputFormat.setOutputPath(job,
                new Path("/user/wang/hbase_data/HbaseTOHdfsOut4S"));

        // Propagate job success/failure to the process exit code instead of
        // silently ignoring the result of waitForCompletion.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
转载地址:http://grjxi.baihongyu.com/