HBase高级功能
原创大约 9 分钟
列族设置
- 如果需要定时从HBase中删除过期数据,可以通过设置列族的
TTL
(过期失效时间,单位为秒)实现。下面的操作给表user
的列族info
设置了TTL
,超过这一时间后,数据将会在HFile
大合并时被自动删除。
hbase:001:0> create 'user1', {NAME => 'info', TTL => '3600'}
hbase:001:0> desc 'user1'
Table user1 is ENABLED
user
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', ..., TTL => '600 SECONDS (10 MINUTES)', ...}
1 row(s)
Quota is disabled
Took 0.0381 seconds
- 目前的HBase版本,每个列族默认只有1个
version
,在HFile
大合并时会遗弃过期的版本。
hbase:001:0> create 'user2', {NAME => 'info', VERSIONS => 10}
hbase:001:0> desc 'user2'
Table user2 is ENABLED
user
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', ..., VERSIONS => '10', ...}
1 row(s)
Quota is disabled
Took 0.0233 seconds
- 将
HFile
压缩存放有助于节省硬盘I/O
,压缩也是表定义的一部分。
hbase:001:0> create 'user3', {NAME => 'info', COMPRESSION => 'SNAPPY'}
- 可以通过设置数据块的大小(
BLOCKSIZE
)来改变查询的方式。数据块越小,索引越大,查找性能更好;而更大的数据块则会执行更好的顺序扫描,这需要根据具体的业务需求场景而定。
hbase:001:0> create 'user3', {NAME => 'info', BLOCKSIZE => '65537'}
- 如果某张表很少被访问,那么可以选择关闭列族的缓存,腾出更多缓存空间给其他表使用。
hbase:001:0> create 'user4', {NAME => 'info', BLOCKCACHE => 'false'}
HBase中存储额外的索引会占用额外的空间。
BloomFilter
(布隆过滤器)会随着它们索引对象的数据的增长而增长,但当空间不是问题的时候,BloomFilter
可以最大化系统的性能潜力。BloomFilter
参数的默认值是ROW
,表示行级布隆过滤器
。而ROWCOL
为列级布隆过滤器
。行级布隆过滤器
在数据块里检查特定RowKey
是否不存在,列级布隆过滤器
检查行和列标识符联合体是否不存在。所以,ROWCOL
的开销要高于ROW
。
hbase:001:0> create 'user5', {NAME => 'info', BLOOMFILTER => 'ROWCOL'}
扫描和过滤
HBase全表扫描的作用相当于MySQL中的SELECT *
,它可以通过一些过滤条件执行查询,但一般都是通过API调用的方式来实现。
这是通过API执行Scan
和Filter
的官方文档。
方法 | 解释 |
---|---|
scan.addFamily() | 指定要扫描的列族 |
scan.addColumn() | 指定要扫描的列,如果不指定会返回所有列 |
scan.readAllVersions() | 扫描所有版本数据 |
scan.readVersions(10) | 扫描最新的10个版本数据 |
scan.setTimeRange() | 指定扫描的最大时间戳和最小时间戳范围 |
scan.setTimestamp() | 扫描指定的时间戳 |
scan.setFilter() | 指定过滤器来过滤掉不需要的信息 |
scan.withStartRow() | 指定开始的行,如果不指定则从头开始 |
scan.withStopRow() | 指定结束的行(不包括此行) |
scan.setBatch() | 指定扫描的最大CELL数量,防止数据过大导致内存溢出 |
scan.setCaching() | 指定扫描时每次连接返回的数据条数,默认为1,适当调大可以提升性能 |
package com.itechthink.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.List;
/**
* 执行全表扫描Scan + Filter
* 首先需要创建表,然后再初始化数据
*
*/
public class HBaseScanFilter {
public static void main(String[] args) throws Exception {
// 获取配置
Configuration conf = HBaseConfiguration.create();
// 指定HBase使用的zk的地址,多个用逗号隔开
conf.set("hbase.zookeeper.quorum", "hadoop:2181");
// 指定HBase在hdfs上的根目录
conf.set("hbase.rootdir", "hdfs://hadoop:9000/hbase");
// 创建HBase连接,负责对HBase中的数据的一些增删改查(DML操作)
Connection conn = ConnectionFactory.createConnection(conf);
// 获取Table对象,指定要操作的表名,表需要提前创建好
Table table = conn.getTable(TableName.valueOf("user"));
Scan scan = new Scan();
// 范围查询:指定查询区间,提高查询性能
// 这是一个左闭右开的区间,也就是查询的结果中包含左边的,不包含右边的
scan.withStartRow(Bytes.toBytes("a"));
scan.withStopRow(Bytes.toBytes("f"));
// 添加Filter对数据进行过滤:使用RowFilter进行过滤,获取Rowkey小于等于d的数据
Filter filter = new RowFilter(CompareOperator.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("d")));
scan.setFilter(filter);
// 获取查询结果
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
List<Cell> cells = result.listCells();
// RowKey
byte[] row_key = result.getRow();
for (Cell cell : cells) {
// 列族
byte[] famaily_bytes = CellUtil.cloneFamily(cell);
// 列
byte[] column_bytes = CellUtil.cloneQualifier(cell);
// 值
byte[] value_bytes = CellUtil.cloneValue(cell);
System.out.println("Rowkey:" + new String(row_key) + ",列族:" + new String(famaily_bytes) + ",列:" + new String(column_bytes) + ",值:" + new String(value_bytes));
}
System.out.println("=============================================");
}
// 关闭连接
table.close();
conn.close();
}
}
批量导入
如果想将MySQL中的数据存储到HBase,可以通过put
命令一条条地导入,但这样效率太低。更好的做法是通过API
实现数据的批量导入。
- 先将从MySQL导出的原始数据文件
hbase_import.dat
上传到HDFS。
> hdfs dfs -put /home/work/volumes/hbase/data/hbase_import.dat /hbase/data/hbase_import.dat
- 在HBase中创建需要用到的表
t_batch
,表中存在1个列族c1
。
hbase:001:0> create 't_batch', 'c1'
- 利用
MapReduce
实现批量导入。
package com.itechthink.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
/**
* 利用MapReduce实现批量导入
* 在map阶段,把数据封装成Put操作,直接将数据入库
* 需要提前创建表t_batch1
* create 't_batch1','c1'
*
*/
public class BatchImportByMapReduce {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
// 如果传递的参数不够,程序直接退出
System.exit(0);
}
// hdfs://hadoop:9000/hbase/data/hbase_import.dat
String inPath = args[0];
// t_batch1
String outTableName = args[1];
// 设置属性对应参数
Configuration conf = new Configuration();
conf.set("hbase.table.name", outTableName);
conf.set("hbase.zookeeper.quorum", "hadoop:2181");
// 封装Job
Job job = Job.getInstance(conf, "Batch Import HBase Table:" + outTableName);
job.setJarByClass(BatchImportByMapReduce.class);
// 指定输入路径
FileInputFormat.setInputPaths(job, new Path(inPath));
// 指定map相关的代码
job.setMapperClass(BatchImportMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Put.class);
TableMapReduceUtil.initTableReducerJob(outTableName, null, job);
TableMapReduceUtil.addDependencyJars(job);
// 禁用Reduce
job.setNumReduceTasks(0);
job.waitForCompletion(true);
}
/**
* 封装Mapper
*
*/
public static class BatchImportMapper extends Mapper<LongWritable, Text, NullWritable, Put> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] strs = value.toString().split("\t");
if (strs.length == 4) {
String rowkey = strs[0];
String columnFamily = strs[1];
String name = strs[2];
String val = strs[3];
Put put = new Put(rowkey.getBytes());
put.addColumn(columnFamily.getBytes(), name.getBytes(), val.getBytes());
context.write(NullWritable.get(), put);
}
}
}
}
- 利用
BulkLoad
实现批量导入。
package com.itechthink.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 利用BulkLoad实现批量导入
* 首先使用MapReduce直接生成HBase的底层存储文件HFile,再通过BulkLoad将HFile文件加载到表中
* 需要提前创建表t_batch2
* create 't_batch2','c1'
*
*/
public class BatchImportByBulkLoad {
public static void main(String[] args) throws Exception {
if (args.length != 3) {
// 如果传递的参数不够,程序直接退出
System.exit(0);
}
// hdfs://hadoop:9000/hbase/data/hbase_import.dat
String inPath = args[0];
// hdfs://hadoop:9000/hbase/data/hbase_out
String outPath = args[1];
// t_batch2
String outTableName = args[2];
// 设置属性对应参数
Configuration conf = new Configuration();
conf.set("hbase.table.name", outTableName);
conf.set("hbase.zookeeper.quorum", "hadoop:2181");
// 封装Job
Job job = Job.getInstance(conf, "Batch Import HBase Table:" + outTableName);
job.setJarByClass(BatchImportByBulkLoad.class);
// 指定输入路径
FileInputFormat.setInputPaths(job, new Path(inPath));
// 指定输出路径[如果输出路径存在,就将其删除]
FileSystem fs = FileSystem.get(conf);
Path output = new Path(outPath);
if (fs.exists(output)) {
fs.delete(output, true);
}
FileOutputFormat.setOutputPath(job, output);
// 指定map相关的代码
job.setMapperClass(BulkLoadMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
// 禁用Reduce
job.setNumReduceTasks(0);
// 连接池
Connection connection = ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf(outTableName);
HFileOutputFormat2.configureIncrementalLoad(job, connection.getTable(tableName), connection.getRegionLocator(tableName));
job.waitForCompletion(true);
}
/**
* 封装Mapper
*
*/
public static class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] strs = value.toString().split("\t");
if (strs.length == 4) {
String rowkey = strs[0];
String columnFamily = strs[1];
String name = strs[2];
String val = strs[3];
ImmutableBytesWritable rowkeyWritable = new ImmutableBytesWritable(rowkey.getBytes());
Put put = new Put(rowkey.getBytes());
put.addColumn(columnFamily.getBytes(), name.getBytes(), val.getBytes());
context.write(rowkeyWritable, put);
}
}
}
}
批量导出
除了将数据导入到HBase,在某些场景下,也需要将数据从HBase导出到其他数据库。
和导入一样,导出数据也有两种方式。
- 利用HBase自带的
TableMapReduceUtil
工具类实现数据导出。
package com.itechthink.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 利用TableMapReduceUtil实现将数据批量导出
*
*/
public class BatchExportByTableMapReduceUtil {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
// 如果传递的参数不够,程序直接退出
System.exit(0);
}
// t_batch1
String inTableName = args[0];
// hdfs://hadoop:9000/hbase/export
String outPath = args[1];
// 设置属性对应参数
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "hadoop:2181");
// 组装Job
Job job = Job.getInstance(conf);
job.setJarByClass(BatchExportByTableMapReduceUtil.class);
// 设置map相关的配置
job.setMapperClass(BatchExportMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 禁用Reduce
job.setNumReduceTasks(0);
// 导出为文本文件
TableMapReduceUtil.initTableMapperJob(inTableName, new Scan(), BatchExportMapper.class, Text.class, Text.class, job);
// 设置输出路径
FileOutputFormat.setOutputPath(job, new Path(outPath));
job.waitForCompletion(true);
}
/**
* 继承TableMapper
*
*/
public static class BatchExportMapper extends TableMapper<Text, Text> {
@Override
protected void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
// key在这里就是hbase的RowKey
// result是scan返回的每行结果
byte[] name = null;
byte[] age = null;
try {
name = result.getValue("c1".getBytes(), "name".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
try {
age = result.getValue("c1".getBytes(), "age".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
String v2 = ((name == null || name.length == 0) ? "NULL" : new String(name)) + "\t" + ((age == null || age.length == 0) ? "NULL" : new String(age));
context.write(new Text(key.get()), new Text(v2));
}
}
}
- 利用HBase内部提供
Export
工具类实现数据导出。但这种方式导出的数据格式是固定的,数据<key, value>
中的key
和value
分别代表的是ImmutableBytesWritable
和Result
。
# 导出数据
> hbase org.apache.hadoop.hbase.mapreduce.Export t_batch1 hdfs://hadoop:9000/hbase/export
# 查看导出结果
> hdfs dfs -cat /hbase/export/*
封装操作
通过代码将常见操作封装成API
调用。
package com.itechthink.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.List;
import java.io.IOException;
/**
* HBaseUtil工具类
*
*/
public class HBaseUtil {
private HBaseUtil() {
}
private static final Connection connection = getConnection();
private static Connection getConnection() {
// 获取hbase的链接
Configuration conf = new Configuration();
// 指定hbase使用的zk地址
//注意:需要在执行hbasejava代码的机器上配置zk和hbase集群的主机名和ip的映射关系
conf.set("hbase.zookeeper.quorum", "bigdata01:2181");
// 指定hbase在hdfs上的根目录
conf.set("hbase.rootdir", "hdfs://hadoop:9000/hbase");
// 创建HBase数据库链接
Connection connection = null;
try {
connection = ConnectionFactory.createConnection(conf);
} catch (IOException e) {
System.out.println("获取链接失败:" + e.getMessage());
}
return connection;
}
/**
* 对外提供的方法
*
*/
public static Connection getInstance() {
return connection;
}
/**
* 创建表
*
*/
public static void createTable(String tableName, String... cfs) throws Exception {
Admin admin = connection.getAdmin();
List<ColumnFamilyDescriptor> cfArr = new ArrayList<>();
for (String cf : cfs) {
ColumnFamilyDescriptor cfDesc = ColumnFamilyDescriptorBuilder
.newBuilder(Bytes.toBytes(cf))
.build();
cfArr.add(cfDesc);
}
TableDescriptor tableDesc = TableDescriptorBuilder
.newBuilder(TableName.valueOf(tableName))
.setColumnFamilies(cfArr)
.build();
admin.createTable(tableDesc);
admin.close();
}
/**
* 添加一个列的数据
*
*/
public static void put2HBaseCell(String tableName, String rowKey, String columnFamily, String column, String value) throws Exception {
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
table.put(put);
table.close();
}
/**
* 向hbase中添加一批数据
*
*/
public static void put2HBaseList(String tableName, List<Put> list) throws Exception {
Table table = connection.getTable(TableName.valueOf(tableName));
table.put(list);
table.close();
}
public static void main(String[] args) throws Exception {
HBaseUtil.put2HBaseCell("user", "10000", "info", "name", "lixingyun");
}
}
感谢支持
更多内容,请移步《超级个体》。