目录

0. 相关文章链接

1. 创建表

1.1. 启动spark-sql

1.2. 建表参数

1.3. 创建非分区表

1.4. 创建分区表

1.5. 在已有的hudi表上创建新表

1.6. 通过CTAS (Create Table As Select)建表

2. 插入数据

2.1. 向非分区表插入数据

2.2. 向分区表动态分区插入数据

2.3. 向分区表静态分区插入数据

2.4. 使用bulk_insert插入数据

3. 查询数据

3.1. 查询

3.2. 时间旅行查询

4. 更新数据

4.1. update

4.2. MergeInto

5. 删除数据

6. 覆盖数据

7. 修改表结构(Alter Table)

8. 修改分区

9. 存储过程(Procedures)


0. 相关文章链接

 Hudi文章汇总 

1. 创建表

1.1. 启动spark-sql

# 启动spark-sql之前需要先启动Hive的Metastore
nohup hive --service metastore & 

#针对Spark 3.2
spark-sql \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

# 如果没有配置hive环境变量,手动拷贝hive-site.xml到spark的conf下

1.2. 建表参数

参数名

默认值

说明

primaryKey

uuid

表的主键名,多个字段用逗号分隔。

同 hoodie.datasource.write.recordkey.field

preCombineField

表的预合并字段。

同 hoodie.datasource.write.precombine.field

type

cow

创建的表类型:

type = 'cow'

type = 'mor'

同hoodie.datasource.write.table.type

1.3. 创建非分区表

  • 创建一个cow表,默认primaryKey 'uuid',不提供preCombineField
create table hudi_cow_nonpcf_tbl (
  uuid int,
  name string,
  price double
) using hudi;
  • 创建一个mor非分区表
create table hudi_mor_tbl (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
);

1.4. 创建分区表

创建一个cow分区外部表,指定primaryKey和preCombineField

create table hudi_cow_pt_tbl (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';

1.5. 在已有的hudi表上创建新表

不需要指定模式和非分区列(如果存在)之外的任何属性,Hudi可以自动识别模式和配置。

  • 非分区表
create table hudi_existing_tbl0 
using hudi
location 'file:///tmp/hudi/dataframe_hudi_nonpt_table';
  • 分区表
create table hudi_existing_tbl1 
using hudi
partitioned by (dt, hh)
location 'file:///tmp/hudi/dataframe_hudi_pt_table';

1.6. 通过CTAS (Create Table As Select)建表

为了提高向hudi表加载数据的性能,CTAS使用批量插入作为写操作。

  • 通过CTAS创建cow非分区表,不指定preCombineField 
create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
as
select 
    1 as id
    , 'a1' as name
    , 10 as price
;
  • 通过CTAS创建cow分区表,指定preCombineField
create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
as
select 
    1 as id
    , 'a1' as name
    , 10 as price
    , 1000 as ts
    , '2021-12-01' as dt
;
  • 通过CTAS从其他表加载数据
# 创建内部表
create table parquet_mngd 
using parquet 
location 'file:///tmp/parquet_dataset/*.parquet';

# 通过CTAS加载数据
create table hudi_ctas_cow_pt_tbl2 
using hudi 
location 'file:/tmp/hudi/hudi_tbl/' 
options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
)
partitioned by (datestr) 
as 
select * from parquet_mngd
;

2. 插入数据

默认情况下,如果提供了preCombineKey,则insert into的写操作类型为upsert,否则使用insert。

2.1. 向非分区表插入数据

insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;

2.2. 向分区表动态分区插入数据

insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;

2.3. 向分区表静态分区插入数据

insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;

2.4. 使用bulk_insert插入数据

hudi支持使用bulk_insert作为写操作的类型,只需要设置两个配置:

hoodie.sql.bulk.insert.enable 和 hoodie.sql.insert.mode

-- 向指定preCombineKey的表插入数据,则写操作为upsert
insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
select id, name, price, ts from hudi_mor_tbl;
1   a1_1    20.0    1001

-- 向指定preCombineKey的表插入数据,指定写操作为bulk_insert 
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;

insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;
1   a1_1    20.0    1001
1   a1_2    20.0    1002

3. 查询数据

3.1. 查询

select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0

3.2. 时间旅行查询

Hudi从0.9.0开始就支持时间旅行查询。Spark SQL方式要求Spark版本 3.2及以上。

-- 关闭前面开启的bulk_insert
set hoodie.sql.bulk.insert.enable=false;

create table hudi_cow_pt_tbl1 (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl1';


-- 插入一条id为1的数据
insert into hudi_cow_pt_tbl1 select 1, 'a0', 1000, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;

-- 修改id为1的数据
insert into hudi_cow_pt_tbl1 select 1, 'a1', 1001, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;

-- 基于第一次提交时间进行时间旅行
select * from hudi_cow_pt_tbl1 timestamp as of '20220307091628793' where id = 1;

-- 其他时间格式的时间旅行写法
select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-07 09:16:28.100' where id = 1;

select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-08' where id = 1;

4. 更新数据

4.1. update

更新操作需要指定preCombineField。

  • 语法
UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]
  • 执行更新
update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;

update hudi_cow_pt_tbl1 set name = 'a1_1', ts = 1001 where id = 1;

-- update using non-PK field
update hudi_cow_pt_tbl1 set ts = 1111 where name = 'a1_1';

4.2. MergeInto

  • 语法
MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ]  THEN <not_matched_action> ]

<merge_condition> =A equal bool condition 
<matched_action>  =
  DELETE  |
  UPDATE SET *  |
  UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action>  =
  INSERT *  |
  INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
  • 执行案例
-- 1、准备source表:非分区的hudi表,插入数据
create table merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);

merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *
;


-- 2、准备source表:分区的parquet表,插入数据
create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2021-12-09', '10'), (2, "new_a2", 'delete', '2021-12-09', '11'), (3, "new_a3", 'insert', '2021-12-09', '12');

merge into hudi_cow_pt_tbl1 as target
using (
  select id, name, '2000' as ts, flag, dt, hh from merge_source2
) source
on target.id = source.id
when matched and flag != 'delete' then
 update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
 insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;

5. 删除数据

  • 语法:
DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]
  • 案例:
delete from hudi_cow_nonpcf_tbl where uuid = 1;

delete from hudi_mor_tbl where id % 2 = 0;

-- 使用非主键字段删除
delete from hudi_cow_pt_tbl1 where name = 'a1_1';

6. 覆盖数据

  • 使用INSERT_OVERWRITE类型的写操作覆盖分区表
  • 使用INSERT_OVERWRITE_TABLE类型的写操作插入覆盖非分区表或分区表(动态分区)

1)insert overwrite 非分区表 

insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;

2)通过动态分区insert overwrite table到分区表

insert overwrite table hudi_cow_pt_tbl1 select 10, 'a10', 1100, '2021-12-09', '11';

3)通过静态分区insert overwrite 分区表

insert overwrite hudi_cow_pt_tbl1 partition(dt = '2021-12-09', hh='12') select 13, 'a13', 1100;

7. 修改表结构(Alter Table)

  • 语法:
-- Alter table name
ALTER TABLE oldTableName RENAME TO newTableName

-- Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)

-- Alter table column type
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType

-- Alter table properties
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')
  • 案例:
--rename to:
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;

--add column:
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);

--change column:
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid int;

--set properties;
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');

8. 修改分区

  • 语法:
-- Drop Partition
ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] )

-- Show Partitions
SHOW PARTITIONS tableIdentifier
  • 案例:
--show partition:
show partitions hudi_cow_pt_tbl1;

--drop partition:
alter table hudi_cow_pt_tbl1 drop partition (dt='2021-12-09', hh='10');
  • 注意:show partition结果是基于文件系统表路径的。删除整个分区数据或直接删除某个分区目录并不精确。

9. 存储过程(Procedures)

  • 语法:
--Call procedure by positional arguments
CALL system.procedure_name(arg_1, arg_2, ... arg_n)

--Call procedure by named arguments
CALL system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1, ... arg_name_n => arg_n)
--show commit's info
call show_commits(table => 'hudi_cow_pt_tbl1', limit => 10);

注:其他Hudi相关文章链接由此进 ->  Hudi文章汇总 


Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐