使用 Bulk Load 快速向 HBase 中导入数据
Apache HBase 是目前大数据系统中应用最为广泛的分布式数据库之一。我们经常面临向 HBase 中导入大量数据的情景,通常会选择使用标准的客户端 API 对 HBase 进行直接的操作,或者在MapReduce作业中使用 TableOutputFormat 作为输出。实际上,借助 HBase 的 Bulk Load 特性可以更加便捷、快速地向HBase数据库中导入数据。
MapReduce 在写入 HBase 时常采用 TableOutputFormat 方式,直接写入 HBase,但该方式在大量数据写入时效率比较低下(频繁进行 flush、split、compat等I/O操作),并对 HBase 节点稳定性造成影响( RegionServer 无响应)。
HBase的数据实际上是以特定格式存储在 HDFS 上的,因而 Bulk Load 就是先将数据按照HBase的内部数据格式生成持久化的 HFile 文件,然后复制到合适的位置并通知 RegionServer ,即完成巨量数据的入库。在生成 HFile 时无需占用 Region 资源,降低了 HBase 节点的写入压力,在大量数据写入时能极大地提高写入效率。
Bulk Load 简介
使用 Bulk Load 特性将数据导入 HBase 通常需要分为三个阶段:
通常需要导入的外部数据都是存储在其它的关系型数据库或一些文本文件中,我们需要将数据提取出来并放置于 HDFS 中。借助 Sqoop 这一工具可以解决大多数关系型数据库向 HDFS 迁移数据的问题。
通过 MapReduce 任务生成 HFile
在进行数据导入时,需要对数据进行预处理,如过滤无效数据、数据格式转换等。通常按照不同的导入要求,需要编写不同的 Mapper;Reducer 由 HBase 负责处理。为了按照 HBase 内部存储格式生成数据,一个重要的类是 HFileOutputFormat2
(HBase 1.0.0以前版本使用 HFileOutputFormat
)。为了更有效地导入数据, 每一个输出的 HFile 要恰好适应一个 Region。为了确保这一点, 需要使用 TotalOrderPartitioner
类将 map 的输出切分为 key 互不相交的部分。HFileOutputFormat2
类中的 configureIncrementalLoad()
方法会依据当前表中的 Region 边界自动设置 TotalOrderPartitioner
一旦数据准备好,就可以使用 completebulkload
工具将生成的 HFile 导入HBase 集群中。completebulkload
是一个命令行工具,对生成的 HFile 文件迭代进行处理,对每一个 HFile, 确定所属的 region, 然后联系对应的 RegionServer, 将数据移动至相应的存储路径。
如果在准备数据过程中,或者在使用 completebulkload
导入数据过程中, region 的边界发生了改变(split), completebulkload
除了使用 completebulkload
工具外,也可以在程序中完成, LoadIncrementalHFiles
Bulk Load实例
这里给出一个简单的例子,旨在说明如何使用 MapReduce 和 Bulk Load 将数据导入到HBase中。这里不介绍如何将数据迁移至 HDFS 中,重点关注 HFile 的生成及载入。
创建 MapReduce 作业
运行 MapReduce 作业
定制 Mapper 类,负责对数据进行预处理,如过滤,转换等。
一些辅助方法。这里给出如何在程序中直接使用 Bulk Load,而无需通过命令行工具。
MapReduce 作业生成的文件存放在 HDFS 上时,其权限归运行 MapReduce 作业的用户所有。在使用 Bulk Load 导入数据时, 需要将权限赋给 hbase 用户。简单粗暴的方法就是将文件夹的权限改为“777”, 下面的方法实现了该功能。
方法中,MapReduce 作业的很多配置都自动完成了。从源码中可以看出,该方法中主要完成了以下几点:- 设置作业输出的 key、value 类为
- 设置作业的 OutputFormat 类为
- 根据作业 Map 的输出设置合适 Reduce 类。Map 输出 key 必须为 ImmutableBytesWritable,Value 类型为 分别为 KeyValue、 Put、和 Text,对应的Reducer 分别为
。 - 根据当前 region 数量确定 Reduce 的数量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator) throws IOException { configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat2.class); } static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls) throws IOException, UnsupportedEncodingException { Configuration conf = job.getConfiguration(); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(KeyValue.class); job.setOutputFormatClass(cls); // Based on the configured map output class, set the correct reducer to properly // sort the incoming values. // TODO it would be nice to pick one or the other of these formats. if (KeyValue.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(KeyValueSortReducer.class); } else if (Put.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(PutSortReducer.class); } else if (Text.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(TextSortReducer.class); } else { LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); } conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), KeyValueSerialization.class.getName()); if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { // record this table name for creating writer by favored nodes LOG.info("bulkload locality sensitive enabled"); conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString()); } // Use table's region boundaries for TOP split points. LOG.info("Looking up current regions for table " + regionLocator.getName()); List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator); LOG.info("Configuring " + startKeys.size() + " reduce partitions " + "to match current region count"); job.setNumReduceTasks(startKeys.size()); configurePartitioner(job, startKeys); // Set compression algorithms based on column families configureCompression(conf, tableDescriptor); configureBloomType(tableDescriptor, conf); configureBlockSize(tableDescriptor, conf); configureDataBlockEncoding(tableDescriptor, conf); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.initCredentials(job); LOG.info("Incremental table " + regionLocator.getName() + " output configured."); }
- 设置作业输出的 key、value 类为
Reduce 没有 setNumReduceTasks 是因为,该设置是根据该表当前 region 数量自动配置的。在建表时应当做好 region 的预切分,
方法会根据 region 的数量来决定 reduce 的数量以及每个 reduce 覆盖的 rowkey 范围。否则当单个 reduce 过大时,任务处理不均衡。completebulkload
工具使用方法:hadoop jar hbase-server-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /path/to/output table
