7.3.2 Iceberg¶
自 2.1.6 版本开始, Apache Doris 支持对 Iceberg 的 DDL 和 DML 操作。用户可以直接通过 Apache Doris 在 Iceberg 中创建库表,并将数据写入到 Iceberg 表中。通过该功能,用户可以通过 Apache Doris 对 Iceberg 进行完整的数据查询和写入操作,进一步帮助用户简化湖仓一体架构。
本文介绍在 Apache Doris 中支持的 Iceberg 操作,语法和使用须知。
Tip
这是一个实验功能。
Tip
使用前,请先设置:
set global enable_nereids_planner = true;
set global enable_fallback_to_original_planner = false;
1 元数据创建与删除¶
1.1 Catalog¶
-
创建
SQL 1 2 3 4 5 6 7 8
CREATE CATALOG [IF NOT EXISTS] iceberg PROPERTIES ( "type" = "iceberg", "iceberg.catalog.type" = "hms", "hive.metastore.uris" = "thrift://172.21.16.47:7004", "warehouse" = "hdfs://172.21.16.47:4007/user/hive/warehouse/", "hadoop.username" = "hadoop", "fs.defaultFS" = "hdfs://172.21.16.47:4007" );上面主要演示了如何在
Apache Doris中创建HMS Iceberg Catalog。Apache Doris目前支持多种类型的Iceberg Catalog。更多配置,请参阅Iceberg Catalog注意:
-
如果需要通过
Apache Doris的Hive Catalog创建Iceberg表或写入数据,需要在Catalog属性中显式增加fs.defaultFS属性以及warehouse属性。如果创建Catalog仅用于查询,则这两个参数可以省略。 -
Hive Catalog可以查询Iceberg表,但是不能在Hive Catalog中创建Iceberg表。
-
-
删除
SQL 1DROP CATALOG [IF EXISTS] iceberg;删除
Catalog并不会删除Iceberg中的任何库表信息。仅仅是在Apache Doris中移除了对这个Iceberg Catalog的映射。
1.2 Database¶
-
创建
可以通过
SWITCH语句切换到对应的Catalog下,执行CREATE DATABASE语句:SQL 1 2
SWITCH iceberg; CREATE DATABASE [IF NOT EXISTS] iceberg_db;也可以使用全限定名创建,或指定
location,如:SQL 1CREATE DATABASE [IF NOT EXISTS] iceberg.iceberg_db;之后可以通过
SHOW CREATE DATABASE命令可以查看Database的相关信息:SQL 1 2 3 4 5 6
mysql> SHOW CREATE DATABASE iceberg_db; +------------+------------------------------+ | Database | Create Database | +------------+------------------------------+ | iceberg_db | CREATE DATABASE `iceberg_db` | +------------+------------------------------+ -
删除
SQL 1DROP DATABASE [IF EXISTS] iceberg.iceberg_db;Warning
对于
Iceberg Database,必须先删除这个Database下的所有表后,才能删除Database,否则会报错。这个操作会同步删除Iceberg中对应的Database。
1.3 Table¶
-
创建
Apache Doris支持在Iceberg中创建分区或非分区表。SQL 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
-- Create unpartitioned iceberg table CREATE TABLE unpartitioned_table ( `col1` BOOLEAN COMMENT 'col1', `col2` INT COMMENT 'col2', `col3` BIGINT COMMENT 'col3', `col4` FLOAT COMMENT 'col4', `col5` DOUBLE COMMENT 'col5', `col6` DECIMAL(9,4) COMMENT 'col6', `col7` STRING COMMENT 'col7', `col8` DATE COMMENT 'col8', `col9` DATETIME COMMENT 'col9' ) ENGINE=iceberg PROPERTIES ( 'write-format'='parquet' ); -- Create partitioned iceberg table -- The partition columns must be in table's column definition list CREATE TABLE partition_table ( `ts` DATETIME COMMENT 'ts', `col1` BOOLEAN COMMENT 'col1', `col2` INT COMMENT 'col2', `col3` BIGINT COMMENT 'col3', `col4` FLOAT COMMENT 'col4', `col5` DOUBLE COMMENT 'col5', `col6` DECIMAL(9,4) COMMENT 'col6', `col7` STRING COMMENT 'col7', `col8` DATE COMMENT 'col8', `col9` DATETIME COMMENT 'col9', `pt1` STRING COMMENT 'pt1', `pt2` STRING COMMENT 'pt2' ) ENGINE=iceberg PARTITION BY LIST (DAY(ts), pt1, pt2) () PROPERTIES ( 'write-format'='orc', 'compression-codec'='zlib' );创建后,可以通过
SHOW CREATE TABLE命令查看Iceberg的建表语句。关于分区表的分区函数,可以参阅后面的【分区】小节。 -
列类型
在
Apache Doris中创建Iceberg表所使用的列类型,和Iceberg中的列类型对应关系如下Apache Doris Iceberg BOOLEAN BOOLEAN INT INT BIGINT BIGINT FLOAT FLOAT DOUBLE DOUBLE DECIMAL DECIMAL STRING STRING DATE DATE DATETIME TIMESTAMP ARRAY ARRAY MAP MAP STRUCT STRUCT -
注意:目前只支持这些数据类型,其它数据类型会报错。
-
列类型暂时只能为默认的
Nullable,不支持NOT NULL。 -
插入数据后,如果类型不能够兼容,例如
'abc'插入到数值类型,则会转为null值后插入。
-
-
删除
可以通过
DROP TABLE语句删除一个Iceberg表。当前删除表后,会同时删除数据,包括分区数据。 -
分区
Iceberg中的分区类型对应Apache Doris中的List分区。因此,在Apache Doris中创建Iceberg分区表,需使用List分区的建表语句,但无需显式的枚举各个分区。在写入数据时,Apache Doris会根据数据的值,自动创建对应的Iceberg分区。-
支持创建单列或多列分区表。
-
支持分区转换函数来支持
Iceberg隐式分区以及分区演进的功能。具体Iceberg分区转换函数可以查看Iceberg partition transforms-
year(ts)或者years(ts) -
month(ts)或者months(ts) -
day(ts)或者days(ts)或者date(ts) -
hour(ts)或者hours(ts)或者date_hour(ts) -
bucket(N, col) -
truncate(L, col)
-
-
-
文件格式
-
Parquet(默认) -
ORC
-
-
压缩格式
-
Parquet:snappy,zstd(默认),plain。(plain就是不采用压缩) -
ORC:snappy,zlib(默认),zstd,plain。(plain就是不采用压缩)
-
-
存储介质
-
HDFS -
对象存储
-
2 数据操作¶
可以通过 INSERT 语句将数据写入到 Iceberg 表中。
支持写入到由 Apache Doris 创建的 Iceberg 表,或者 Iceberg 中已存在的且格式支持的表。
对于分区表,会根据数据,自动写入到对应分区,或者创建新的分区。
目前不支持指定分区写入。
2.1 INSERT¶
INSERT 操作会数据以追加的方式写入到目标表中。
| SQL | |
|---|---|
1 2 3 4 5 | |
2.2 INSERT OVERWRITE¶
INSERT OVERWRITE 会使用新的数据完全覆盖原有表中的数据。
| SQL | |
|---|---|
1 2 | |
2.3 CTAS(CREATE TABLE AS SELECT)¶
可以通过 CTAS 语句创建 Iceberg 表并写入数据:
| SQL | |
|---|---|
1 | |
CTAS 支持指定文件格式、分区方式等信息,如:
| SQL | |
|---|---|
1 2 3 4 5 6 7 8 9 10 11 | |
3 异常数据和数据转换¶
TODO
3.1 HDFS 文件操作¶
在 HDFS 上的 Iceberg 表数据会写入到最终目录,提交 Iceberg 元数据进行管理。
写入的数据文件名称格式为: <query-id>_<uuid>-<index>.<compress-type>.<file-type>
3.2 对象存储文件操作¶
TODO
4 相关参数¶
4.1 FE¶
TODO
4.2 BE¶
| 参数名称 | 默认值 | 描述 |
|---|---|---|
| iceberg_sink_max_file_size | 最大的数据文件大小。当写入数据量超过该大小后会关闭当前文件,滚动产生一个新文件继续写入。 | 1GB |
| table_sink_partition_write_max_partition_nums_per_writer | BE 节点上每个 Instance 最大写入的分区数目。 | 128 |
| table_sink_non_partition_write_scaling_data_processed_threshold | 非分区表开始 scaling-write 的数据量阈值。每增加table_sink_non_partition_write_scaling_data_processed_threshold 数据就会发送给一个新的 writer(instance) 进行写入。scaling-write 机制主要是为了根据数据量来使用不同数目的 writer(instance) 来进行写入,会随着数据量的增加而增大写入的 writer(instance) 数目,从而提高并发写入的吞吐。当数据量比较少的时候也会节省资源,并且尽可能地减少产生的文件数目。 | 25MB |
| table_sink_partition_write_min_data_processed_rebalance_threshold | 分区表开始触发重平衡的最少数据量阈值。如果 当前累积的数据量 - 自从上次触发重平衡或者最开始累积的数据量 >= table_sink_partition_write_min_data_processed_rebalance_threshold,就开始触发重平衡机制。如果发现最终生成的文件大小差异过大,可以调小改阈值来增加均衡度。当然过小的阈值会导致重平衡的成本增加,可能会影响性能。 | 25MB |
| table_sink_partition_write_min_partition_data_processed_rebalance_threshold | 分区表开始进行重平衡时的最少的分区数据量阈值。如果 当前分区的数据量 >= 阈值 * 当前分区已经分配的 task 数目,就开始对该分区进行重平衡。如果发现最终生成的文件大小差异过大,可以调小改阈值来增加均衡度。当然过小的阈值会导致重平衡的成本增加,可能会影响性能。 | 15MB |