
Combining HBase with MapReduce (Part 1): Inserting Data into an HBase Table

Contents

  • 1. Configuring logging for the MapReduce project
  • 2. Inserting data into HBase with MapReduce
    • 2.1 Dependencies in pom.xml
    • 2.2 The Util helper class
    • 2.3 MyMapper and MyReducer
    • 2.4 Configuring the Job
    • 2.5 Results
  • 3. Inserting data into an HBase table with importtsv and completebulkload
    • 3.1 Dropping the music table
    • 3.2 Creating the table with importtsv
    • 3.3 Loading the data with completebulkload
  • References

1. Configuring logging for the MapReduce project

  Create a new log4j.properties file under the /src/main/resources folder of the Maven project and put the following content in it (the value of log4j.appender.file.File is the name of the log output file):

log4j.rootLogger=INFO, file

log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.Append=false
log4j.appender.file.File=hbase-mapreduce.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

  Finally, add the following line to the class containing the main method, or to the class where the custom Mapper is defined (change MyMapper.class to fit your own code):

import org.apache.log4j.Logger;
private static final Logger logger = Logger.getLogger(MyMapper.class);
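
  Once this logger field is defined, log statements written anywhere in the job end up in the file configured above. A minimal usage sketch (the message text is illustrative), for example inside map() of the Mapper from section 2.3:

// written to hbase-mapreduce.log as configured in log4j.properties
logger.info("map input line: " + value.toString());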

2. Inserting data into HBase with MapReduce

2.1 Dependencies in pom.xml

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.3.6</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>3.3.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>3.3.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-auth</artifactId>
      <version>3.3.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-common</artifactId>
      <version>2.5.10</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>2.5.10</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-mapreduce</artifactId>
      <version>2.5.10</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
  </dependencies>

2.2 The Util helper class

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class Util {
    public static Connection getConnection() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        return ConnectionFactory.createConnection(conf);
    }

    public static void create(Connection conn, String tableName, String[] families) throws IOException {
        if (families.length == 0) {
            System.out.println("please provide at least one column family.");
            return;
        }
        if (families.length > 3) {
            System.out.println("please reduce the number of column families.");
            return;
        }
        
        Admin admin = conn.getAdmin();
        TableName tableName2 = TableName.valueOf(tableName);

        if (admin.tableExists(tableName2)) {
            System.out.println("table exists!");
            admin.close();
            return;
        }

        TableDescriptorBuilder tableDescBuilder = TableDescriptorBuilder.newBuilder(tableName2);
        for (String family : families) {
            ColumnFamilyDescriptor columnFamily = ColumnFamilyDescriptorBuilder.of(family);
            tableDescBuilder.setColumnFamily(columnFamily);
        }
        admin.createTable(tableDescBuilder.build());
        System.out.println("create table success!");
        admin.close();
    }

    public static void delete(Connection conn, String tableName) throws IOException {
        Admin admin = conn.getAdmin();
        TableName tableName2 = TableName.valueOf(tableName);
        if (admin.tableExists(tableName2)) {
            admin.disableTable(tableName2);
            admin.deleteTable(tableName2);
        }
        admin.close();
    }

    public static void scan(Connection conn, String tableName) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        ResultScanner scanner = table.getScanner(scan);
        System.out.println("scan: ");
        for (Result res = scanner.next(); res != null; res = scanner.next()) {
            for (Cell cell : res.listCells()) {
                String row = Bytes.toString(CellUtil.cloneRow(cell));
                String columnFamily = Bytes.toString(CellUtil.cloneFamily(cell));
                String column = Bytes.toString(CellUtil.cloneQualifier(cell));
                String data = Bytes.toString(CellUtil.cloneValue(cell));
                System.out.println(String.format("row: %s, family: %s, column: %s, data: %s", row, columnFamily,
                        column, data));
            }
        }
        scanner.close();
        table.close();
    }
}
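
  Note that HBaseConfiguration.create() in getConnection() reads its connection settings from an hbase-site.xml on the classpath. If no such file is available, the ZooKeeper address can be set programmatically instead. A sketch of getConnection() with explicit settings (the quorum address and port are placeholders, adjust them to your cluster):

    public static Connection getConnection() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // Placeholder values, not taken from the original environment
        conf.set("hbase.zookeeper.quorum", "172.17.0.2");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        return ConnectionFactory.createConnection(conf);
    }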

2.3 MyMapper and MyReducer

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

public class ImportData {
    private static final Logger logger = Logger.getLogger(MyMapper.class);

    public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    public static class MyReducer extends TableReducer<Text, NullWritable, Text> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            String[] columns = {"name", "singer", "gender", "ryghme", "terminal"};
            String[] splitStr = key.toString().split("\\s+");
            Put put = new Put(Bytes.toBytes(splitStr[0]));
            for (int i = 1; i < splitStr.length; i++) {
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(columns[i - 1]), Bytes.toBytes(splitStr[i]));
            }
            context.write(key, put);
        }
    }
}
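
  MyReducer assumes that each input line is a whitespace-separated record whose first field is the row key, followed by up to five values that are written into the info column family as name, singer, gender, ryghme and terminal. A hypothetical line of play_records.txt would therefore look like this (the values are illustrative):

0001 song1 singer1 female pop mobile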

2.4 Configuring the Job

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class App {
    public static void main(String[] args) throws Exception {
        String file = "file:///home/developer/CodeArtsProjects/mapreduce-hbase/play_records.txt";
        Connection conn = Util.getConnection();
        Util.delete(conn, "music");
        Util.create(conn, "music", new String[] { "info" });
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "import-data");
        job.setJarByClass(App.class);
        job.setMapperClass(ImportData.MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(2);
        TableMapReduceUtil.initTableReducerJob("music", ImportData.MyReducer.class, job);
        FileInputFormat.addInputPath(job, new Path(file));
        int res = job.waitForCompletion(true) ? 0 : 1;
        if (res == 0) {
            System.out.println("数据全部输入后,对music表进行扫描:");
            Util.scan(conn, "music");
        }
        conn.close();
        System.exit(res);
    }
}
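
  One way to package and launch the job from the command line (a sketch: the jar name is a placeholder, and hbase classpath is used so the HBase client jars are visible to Hadoop):

export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar mapreduce-hbase.jar App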

2.5 Results

(Screenshot of the run result: scan output of the music table.)

3. Inserting data into an HBase table with importtsv and completebulkload

3.1 Dropping the music table

  Enter the HBase shell and run the following commands:

disable 'music'
drop 'music'

3.2 Creating the table with importtsv

  Command:

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.bulk.output=hdfs://172.17.0.2:9000/user/root/tmp -Dimporttsv.separator=" " -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:singer,info:gender,info:ryghme,info:terminal music file:///root/CodeProject/mapreduce-hbase/play_records.txt

(Screenshots of the importtsv job output.)
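
  After the importtsv job finishes, the HFiles generated under the directory given by -Dimporttsv.bulk.output can be inspected, for example:

hdfs dfs -ls -R /user/root/tmp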

3.3 Loading the data with completebulkload

  Command:

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://172.17.0.2:9000/user/root/tmp music
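
  To verify that the bulk load succeeded, the music table can be scanned from the HBase shell, for example:

scan 'music'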

References

吴章勇, 杨强. 《大数据Hadoop3.X分布式处理实战》 (Big Data Hadoop 3.X Distributed Processing in Practice).
