[Feature][hbase] New features in SQL mode
Search before asking
- [X] I had searched in the issues and found no similar feature requirement.
Description
New features in SQL mode:
1. Writing: support configuring rowkeyExpress.
2. Writing: support configuring the HBase cell timestamp; options: current time (default), a named time column (e.g. columnFamily:fieldName), or a specific literal time.
3. Reading: support configuring the number of columns the client reads from the server per RPC; unlimited by default.
4. Reading: support configuring the rowKey start point, the rowKey end point, and whether the rowkey is BytesBinary.
5. Fix the missing Hbase2DynamicTableFactory in SQL mode.
6. Support the multiVersionFixedColumn mode (vertical-table reads). For a vertical-table read, the declared source table must have exactly four fields with the fixed types shown in the declaration below (see also the end-to-end sketch at the end of this description):
CREATE TABLE source_hbase
(
`rowkey` VARCHAR,
`family_qualifier` VARCHAR,
`timestamp` BIGINT,
`value` VARCHAR
)
7. Support configuring the Hadoop user name, resolving HBase read/write permission issues.
8. Support configuring the write mode for null field values: SKIP (skip; the field is not written) or EMPTY (write an empty byte array instead).
9. Fix the error raised when the downstream query does not reference the rowkey field:
Caused by: java.lang.UnsupportedOperationException: No implementation provided for SupportsProjectionPushDown. Please implement SupportsProjectionPushDown#applyProjection(int[][], DataType)
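For feature 6, here is a minimal end-to-end sketch of a vertical-table read. This is a sketch only: it assumes the four-field schema above, and the table name, ZooKeeper quorum, and stream-x sink are placeholders reusing options from the use case below.
CREATE TABLE source_hbase_vertical
(
    `rowkey`           VARCHAR,
    `family_qualifier` VARCHAR,
    `timestamp`        BIGINT,
    `value`            VARCHAR
) WITH (
    'connector' = 'hbase2-x'
    ,'zookeeper.quorum' = 'xxx:2181'
    ,'zookeeper.znode.parent' = '/hbase'
    ,'table-name' = 'test_hbase'
    -- vertical-table read mode
    ,'mode' = 'multiVersionFixedColumn'
    -- -1 reads all versions
    ,'max-version' = '-1'
);
CREATE TABLE sink_stream_vertical
(
    `rowkey`           VARCHAR,
    `family_qualifier` VARCHAR,
    `timestamp`        BIGINT,
    `value`            VARCHAR
) WITH (
    'connector' = 'stream-x'
);
insert into sink_stream_vertical select * from source_hbase_vertical;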
Use case
---------------------------------------------------
-- HBase shell commands:
-- type `hbase shell` to enter the HBase shell
-- list                                        : list all tables
-- describe 'tableName'                        : show the table schema
-- scan 'tableName'                            : show all rows in the table (full scan)
-- count 'tableName'                           : count the rows
-- alter 'tableName',{NAME=>'cf',VERSIONS=>3}  : alter the table so column family 'cf' keeps 3 VERSIONS per cell
-- scan 'tableName',{VERSIONS=>5}              : scan returning up to 5 versions per cell
---------------------------------------------------
CREATE TABLE source_stream
(
rowkey VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP(3)
) WITH (
'connector' = 'stream-x'
,'number-of-rows' = '1'
);
CREATE TABLE source_hbase
(
rowkey VARCHAR,
cf ROW(item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3)),
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase2-x'
,'zookeeper.quorum' = 'xxx:2181'
,'zookeeper.znode.parent' = '/hbase'
-- string literal to substitute for null values, default: "null"
,'null-string-literal' = 'null'
-- table name; namespace-qualified names are supported in the form namespace:table (colon-separated)
,'table-name' = 'test_hbase'
-- rowKey scan start point, default: none
,'start-row-key' = 'a'
-- rowKey scan end point, default: none
,'end-row-key' = 'c'
-- whether the rowkey is BytesBinary, default: false
,'is-binary-row-key' = 'false'
-- maximum rows per client RPC fetch, default: 1000
,'scan-cache-size' = '1000'
-- number of columns the client reads from the server per RPC, default: -1 (unlimited)
-- ,'scan-batch-size' = '2'
-- pass a Hadoop user name to resolve read/write permission issues
,'properties.hadoop.user.name' = 'hdp-test'
-- HBase read mode; supports normal and multiVersionFixedColumn. Default: normal
,'mode' = 'multiVersionFixedColumn'
-- number of versions the HBase reader reads in multi-version mode; must be -1 or a number greater than 1; -1 reads all versions.
,'max-version' = '5'
-- HBase Kerberos configuration
, 'properties.hbase.security.authorization' = 'Kerberos'
, 'properties.hbase.security.authentication' = 'Kerberos'
, 'properties.hbase.security.auth.enable' = 'true'
-- Kerberos using remote files via SFTP
,'properties.remoteDir' = '/data/kerberos'
,'properties.java.security.krb5.conf' = 'krb5.conf'
,'properties.principalFile' = 'hbase.keytab'
,'properties.sftpConf' = '{"username":"xxx", "password":"xx", "host":"xxx", "port":"22"}'
,'properties.principal' = 'xx/[email protected]'
-- the following parameter fixes: Can't get Master Kerberos principal for use as renewer
,'properties.yarn.resourcemanager.principal' = 'xx/[email protected]'
-- the following parameter fixes: org.apache.hadoop.hdfs.DFSClient - Failed to connect to /xxxx for block, add to deadNodes and continue. java.io.IOException: An existing connection was forcibly closed by the remote host.
-- and: INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Failed to read expected SASL data transfer protection handshake from client at /xxxxx. Perhaps the client is running an older version of Hadoop which does not support SASL data transfer protection
,'properties.dfs.data.transfer.protection' = 'authentication'
);
CREATE TABLE sink_hbase
(
rowkey VARCHAR,
cf ROW(item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3))
,PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase2-x'
-- ZooKeeper address
,'zookeeper.quorum' = 'xxxx:2181'
-- HBase znode path in ZooKeeper
,'zookeeper.znode.parent' = '/hbase'
-- maximum buffered size in memory per write request; improves HBase write performance but may increase latency. Set to '0' to disable. Default: 2mb
-- ,'sink.buffer-flush.max-size' = '1000'
-- maximum number of rows buffered per write request; improves HBase write performance but may increase latency. Set to '0' to disable. Default: 1000
,'sink.buffer-flush.max-rows' = '1000'
-- flush interval for buffered writes, in milliseconds. Default: 10000
,'sink.buffer-flush.interval' = '2000'
-- table name
,'table-name' = 'test_hbase'
-- expression used to build the rowkey, given as a string of the form $(cf:col);
-- multiple fields can be combined, e.g. $(cf:col1)_$(cf:col2), and the md5 function is supported: md5($(cf:col))
,'rowkey-express' = 'md5($(cf:item_id)_$(cf:category_id))'
-- specifies the timestamp written to HBase; options: current time, a named time column, or a specific literal time (choose one).
-- named time column in the form columnFamily:columnName
,'version-column-name' = 'cf:behavior'
-- specific literal time
-- ,'version-column-value' = '2024-02-23 10:10:10'
-- write mode for null field values: SKIP (skip; the field is not written) or EMPTY (write an empty byte array instead). Default: SKIP
,'null-mode' = 'SKIP'
);
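As an illustration of the rowkey-express semantics above (an assumption based on the $(cf:col) syntax described in this issue, not on connector code): for item_id = 'a-item_id' and category_id = 'a-category_id', md5($(cf:item_id)_$(cf:category_id)) should equal the result of this standalone expression.
-- MD5 and || are built-in Flink SQL functions
select MD5('a-item_id' || '_' || 'a-category_id');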
CREATE TABLE sink_stream
(
rowkey VARCHAR,
-- cf ROW(item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3))
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP(3)
) WITH (
'connector' = 'stream-x'
);
-- test writing data to HBase
-- insert into sink_hbase
-- SELECT rowkey, ROW(item_id, category_id, behavior, ts ) as cf
-- from source_stream;
-- specify a concrete rowkey value
-- insert into sink_hbase
-- SELECT rowkey, ROW(item_id, category_id, behavior, ts ) as cf
-- from (select
-- CAST('a' as string) as rowkey,
-- CAST('a-item_id' as string) as item_id,
-- CAST('a-category_id' as string) as category_id,
-- CAST('a-behavior' as string) as behavior,
-- CAST('2024-02-21 17:10:10' as timestamp(3)) as ts);
-- insert into sink_hbase
-- SELECT rowkey, ROW(item_id, category_id, behavior, ts ) as cf
-- from (select
-- CAST('a' as string) as rowkey,
-- CAST('a-item_id-7' as string) as item_id,
-- -- CAST('1708654215000' as string) as category_id,
-- CAST('a-category_id-7' as string) as category_id,
-- CAST('2024-02-23 10:10:15' as string) as behavior,
-- CAST('2024-02-23 10:10:13' as timestamp(3)) as ts);
-- test reading data from HBase
-- insert into sink_stream select * from source_hbase;
insert into sink_stream select rowkey, cf.item_id, cf.category_id, cf.behavior, cf.ts from source_hbase;
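With fix 9, a projection that never references the rowkey field should also work instead of failing with the UnsupportedOperationException quoted in the description, e.g. (sketch; the matching sink is omitted):
-- select cf.item_id, cf.behavior from source_hbase;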
Related issues
No response
Are you willing to submit a PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct