
Hudi Query Types / Views: A Summary

董可伦 伦少的博客 2023-04-18

Preface

The earlier article Hive增量查询Hudi表 (incrementally querying Hudi tables with Hive) mentioned that Hudi tables have a read-optimized view and a real-time view. I had not fully understood them at the time, so this post studies and summarizes them separately. The Chinese Hudi documentation (https://hudi.apache.org/cn/docs/0.9.0/concepts/) calls them "views", while the English documentation (https://hudi.apache.org/cn/docs/concepts/#views) calls them "query types", which is the term used below.

Query types

Hudi supports the following three query types (views); a minimal Spark read sketch follows the list.

  • Snapshot Queries (snapshot query / real-time view): Queries see the latest snapshot of the table as of a given commit or compaction action. For merge-on-read (MOR) tables, the base files (e.g. parquet) and delta files (e.g. avro) of the latest file slice are merged on the fly, exposing near-real-time data (a few minutes of latency). For copy-on-write (COW) tables, this provides a drop-in replacement for existing parquet tables while adding upsert/delete and other write-side features.

  • Incremental Queries (incremental query / incremental view, the query type covered in the previous article): Queries only see data written to the table since a given commit/compaction. This effectively provides change streams to enable incremental data pipelines.

  • Read Optimized Queries (read-optimized query / read-optimized view): Queries see the latest snapshot of the table as of a given commit/compaction action, but expose only the base/columnar files in the latest file slices, guaranteeing the same columnar query performance as a non-Hudi columnar table.
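For a quick sense of how these three query types are selected when reading through the Spark datasource, here is a minimal sketch (not from the original article; the option keys are the standard Hudi Spark read options, while the table path and begin instant time are placeholder assumptions):

// Snapshot query (the default): the latest merged view of the table
val snapshotDF = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "snapshot")
  .load("/tmp/hudi_table_path")            // placeholder path

// Read optimized query: only the base (columnar) files of the latest file slices
val readOptimizedDF = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "read_optimized")
  .load("/tmp/hudi_table_path")

// Incremental query: only records written after the given commit time
val incrementalDF = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20220629000000")  // placeholder commit time
  .load("/tmp/hudi_table_path")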

Table types

Table Type        Supported Query Types
Copy On Write     Snapshot Queries + Incremental Queries
Merge On Read     Snapshot Queries + Incremental Queries + Read Optimized Queries

In other words, the read-optimized view exists only for MOR tables, which was also pointed out in the previous article. This time we will look at the source code to see how the two table types differ and how this is implemented.
Note that the Chinese documentation gets this point wrong, so don't be misled (screenshot of the erroneous page omitted here). It is probably based on an old version, and since nobody maintains or contributes to the Chinese docs, the mistake was never fixed. I will try to submit a PR to fix it when I have time.

Update 2022-06-30: PR submitted: https://github.com/apache/hudi/pull/6008

Source code

Let's briefly analyze, at the source level, how the two table types differ when syncing to Hive. Hudi's utility class for syncing Hive metadata is HiveSyncTool. Before reading its source, here is a simple example of using HiveSyncTool to sync metadata. Spark is used for the example because it provides an API to obtain the hadoopConf, which keeps the code short; the same can be done in plain Java.

import java.util

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool, MultiPartKeysValueExtractor}

// assumes an active SparkSession `spark`, and pathStr / hiveDatabaseName / tableName provided by the caller
val basePath = new Path(pathStr)
val fs = basePath.getFileSystem(spark.sessionState.newHadoopConf())
val hiveConf: HiveConf = new HiveConf()
hiveConf.addResource(fs.getConf)
// read the table config of the existing Hudi table to get the record key and partition fields
val tableMetaClient = HoodieTableMetaClient.builder.setConf(fs.getConf).setBasePath(pathStr).build
val recordKeyFields = tableMetaClient.getTableConfig.getRecordKeyFields
var keys = ""
if (recordKeyFields.isPresent) {
  keys = recordKeyFields.get().mkString(",")
}
var partitionPathFields: util.List[String] = null
val partitionFields = tableMetaClient.getTableConfig.getPartitionFields
if (partitionFields.isPresent) {
  import scala.collection.JavaConverters._
  partitionPathFields = partitionFields.get().toList.asJava
}
val hiveSyncConfig = getHiveSyncConfig(pathStr, hiveDatabaseName, tableName, partitionPathFields, keys)
new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable()

def getHiveSyncConfig(basePath: String, dbName: String, tableName: String,
                      partitionPathFields: util.List[String] = null, keys: String = null): HiveSyncConfig = {
  val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig
  hiveSyncConfig.syncMode = HiveSyncMode.HMS.name
  hiveSyncConfig.createManagedTable = true
  hiveSyncConfig.databaseName = dbName
  hiveSyncConfig.tableName = tableName
  hiveSyncConfig.basePath = basePath
  hiveSyncConfig.partitionValueExtractorClass = classOf[MultiPartKeysValueExtractor].getName
  if (partitionPathFields != null && !partitionPathFields.isEmpty) hiveSyncConfig.partitionFields = partitionPathFields
  if (!StringUtils.isNullOrEmpty(keys)) hiveSyncConfig.serdeProperties = "primaryKey = " + keys // Spark SQL needs this property to identify the primary key fields when updating the table
  hiveSyncConfig
}

Here tableMetaClient is used to obtain the table's primary key and partition fields; since the Hudi table files already exist by the time metadata is synced, these can simply be read from the table config. You could of course specify the primary key and partition fields yourself if you know them, but reading them automatically is more convenient.
Essentially, the code builds the sync configuration, constructs the sync utility class HiveSyncTool, and then calls syncHoodieTable to sync the metadata and create the Hive tables.

Now to the source. When a HiveSyncTool is constructed, the table names are chosen by table type. For a COW table, this.snapshotTableName = cfg.tableName, i.e. the real-time (snapshot) view equals the table name, and the read-optimized view is empty. For a MOR table, the real-time view is tableName_rt, and the read-optimized view is tableName_ro by default;
when skipROSuffix=true it equals the table name. Notice the apparent inconsistency: with skipROSuffix=true the MOR table's read-optimized view uses the plain table name, whereas for a COW table the plain table name is the real-time view. This is probably because the MOR read-optimized view and the COW real-time view are both served by HoodieParquetInputFormat, as the source analysis below shows.

  private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
  public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
  public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";

  protected final HiveSyncConfig cfg;
  protected HoodieHiveClient hoodieHiveClient = null;
  protected String snapshotTableName = null;
  protected Option<String> roTableName = null;

  public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
    super(configuration.getAllProperties(), fs);

    try {
      this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
    } catch (RuntimeException e) {
      if (cfg.ignoreExceptions) {
        LOG.error("Got runtime exception when hive syncing, but continuing as ignoreExceptions config is set ", e);
      } else {
        throw new HoodieHiveSyncException("Got runtime exception when hive syncing", e);
      }
    }

    this.cfg = cfg;
    // Set partitionFields to empty, when the NonPartitionedExtractor is used
    if (NonPartitionedExtractor.class.getName().equals(cfg.partitionValueExtractorClass)) {
      LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used");
      cfg.partitionFields = new ArrayList<>();
    }
    if (hoodieHiveClient != null) {
      switch (hoodieHiveClient.getTableType()) {
        case COPY_ON_WRITE:
          // snapshot query / real-time view: equals the table name
          this.snapshotTableName = cfg.tableName;
          // read-optimized query / read-optimized view: empty
          this.roTableName = Option.empty();
          break;
        case MERGE_ON_READ:
          // snapshot query / real-time view: tableName + SUFFIX_SNAPSHOT_TABLE, i.e. tableName_rt
          this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
          // read-optimized query / read-optimized view: skipROSuffix defaults to false, so tableName_ro by default;
          // when skipROSuffix=true it equals the table name
          this.roTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
              Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
          break;
        default:
          LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
          throw new InvalidTableException(hoodieHiveClient.getBasePath());
      }
    }
  }

Next, let's see how the two view implementation classes mentioned in the previous article, HoodieParquetInputFormat and HoodieParquetRealtimeInputFormat, come into play.

  @Override
  public void syncHoodieTable() {
    try {
      if (hoodieHiveClient != null) {
        doSync();
      }
    } catch (RuntimeException re) {
      throw new HoodieException("Got runtime exception when hive syncing " + cfg.tableName, re);
    } finally {
      if (hoodieHiveClient != null) {
        hoodieHiveClient.close();
      }
    }
  }

  protected void doSync() {
    switch (hoodieHiveClient.getTableType()) {
      case COPY_ON_WRITE:
        // a COW table only has snapshotTableName (the real-time view); queries are served by HoodieParquetInputFormat
        syncHoodieTable(snapshotTableName, false, false);
        break;
      case MERGE_ON_READ:
        // sync a RO table for MOR
        // the MOR table's read-optimized view, suffixed _ro (READ_OPTIMIZED); queries are served by HoodieParquetInputFormat
        syncHoodieTable(roTableName.get(), false, true);
        // sync a RT table for MOR
        // the MOR table's real-time view, suffixed _rt (REAL_TIME); queries are served by HoodieParquetRealtimeInputFormat
        syncHoodieTable(snapshotTableName, true, false);
        break;
      default:
        LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
        throw new InvalidTableException(hoodieHiveClient.getBasePath());
    }
  }

As you can see, the two table types differ in that: 1) a COW table syncs metadata for only one table (the real-time view), while a MOR table syncs two (the read-optimized view and the real-time view); 2) besides the table names, the flags passed also differ, and those flags determine which implementation class is used at query time.

Since this article is not mainly about the Hive metadata sync source code, only the key parts are quoted here; I will write a separate summary of the Hive metadata sync source code later.

  protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
                                 boolean readAsOptimized) {
    // ... (abridged: only the call relevant to the input format is shown)
    syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
  }

  private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
                          boolean readAsOptimized, MessageType schema) {
    Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized);
    String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat);
    // ... (abridged)
  }

  public static String getInputFormatClassName(HoodieFileFormat baseFileFormat, boolean realtime) {
    switch (baseFileFormat) {
      case PARQUET:
        if (realtime) {
          return HoodieParquetRealtimeInputFormat.class.getName();
        } else {
          return HoodieParquetInputFormat.class.getName();
        }
      case HFILE:
        if (realtime) {
          return HoodieHFileRealtimeInputFormat.class.getName();
        } else {
          return HoodieHFileInputFormat.class.getName();
        }
      case ORC:
        return OrcInputFormat.class.getName();
      default:
        throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
    }
  }

So for the PARQUET base file format, when useRealtimeInputFormat is true the inputFormat implementation is HoodieParquetRealtimeInputFormat, and when it is false the implementation is HoodieParquetInputFormat. The other flag, readAsOptimized (whether to read as read-optimized), is what Spark SQL uses at read time to decide whether the table is a real-time view or a read-optimized view. The relevant source:

// when syncing metadata and creating the table, the property `hoodie.query.as.ro.table=true/false` is added
sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized));

When Spark reads the Hive table, it uses this property to decide the query type, in class org.apache.hudi.DataSourceOptionsHelper:

  def parametersWithReadDefaults(parameters: Map[String, String]): Map[String, String] = {
    // First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool,
    // or else use query type from QUERY_TYPE.
    val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
      .map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
      .getOrElse(parameters.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue()))

    Map(
      QUERY_TYPE.key -> queryType
    ) ++ translateConfigurations(parameters)
  }

In the generated CREATE TABLE statement, this shows up as:

| WITH SERDEPROPERTIES (                             |
|   'hoodie.query.as.ro.table'='false',              |

And the inputFormat clause:

| STORED AS INPUTFORMAT                              |
|   'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'  |

The complete CREATE TABLE statements are shown in the examples below.

Examples

DF

Here we verify this with the program from the earlier article Apache Hudi 入门学习总结, which writes a Hudi table and syncs it to Hive.
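For reference, here is a minimal sketch of such a write-and-sync job (not the original article's exact program: it assumes a SparkSession `spark` and a DataFrame `df` with columns id, name, value, ts, dt, and uses the standard Hudi Spark datasource / hive-sync option keys; the original program additionally makes the synced table a managed (internal) table, which is omitted here):

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.SaveMode

df.write.format("hudi")
  .option(RECORDKEY_FIELD.key(), "id")                  // primary key field
  .option(PRECOMBINE_FIELD.key(), "ts")                 // pre-combine field
  .option(PARTITIONPATH_FIELD.key(), "dt")              // partition field
  .option(TABLE_TYPE.key(), COW_TABLE_TYPE_OPT_VAL)     // COW here; MOR_TABLE_TYPE_OPT_VAL in the MOR example below
  .option("hoodie.table.name", "test_hudi_table_1")
  // hive sync options: this is what ends up invoking HiveSyncTool, as analyzed above
  .option("hoodie.datasource.hive_sync.enable", "true")
  .option("hoodie.datasource.hive_sync.mode", "hms")
  .option("hoodie.datasource.hive_sync.database", "test")
  .option("hoodie.datasource.hive_sync.table", "test_hudi_table_1")
  .option("hoodie.datasource.hive_sync.partition_fields", "dt")
  .option("hoodie.datasource.hive_sync.partition_extractor_class",
    "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .mode(SaveMode.Append)
  .save("/tmp/test_hudi_table_1")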

COW table

The CREATE TABLE statement for the COW table already appeared in the earlier article, so it is copied here directly:

+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `test_hudi_table_1`(                  |
|   `_hoodie_commit_time` string,                    |
|   `_hoodie_commit_seqno` string,                   |
|   `_hoodie_record_key` string,                     |
|   `_hoodie_partition_path` string,                 |
|   `_hoodie_file_name` string,                      |
|   `id` int,                                        |
|   `name` string,                                   |
|   `value` int,                                     |
|   `ts` int)                                        |
| PARTITIONED BY (                                   |
|   `dt` string)                                     |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'hoodie.query.as.ro.table'='false',              |
|   'path'='/tmp/test_hudi_table_1',                 |
|   'primaryKey'='id')                               |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hudi.hadoop.HoodieParquetInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION                                           |
|   'hdfs://cluster1/tmp/test_hudi_table_1'          |
| TBLPROPERTIES (                                    |
|   'last_commit_time_sync'='20220512101500',        |
|   'spark.sql.sources.provider'='hudi',             |
|   'spark.sql.sources.schema.numPartCols'='1',      |
|   'spark.sql.sources.schema.numParts'='1',         |
|   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
|   'spark.sql.sources.schema.partCol.0'='dt',       |
|   'transient_lastDdlTime'='1652320902')            |
+----------------------------------------------------+

You can see 'hoodie.query.as.ro.table'='false': the COW table's view is the real-time view, and the inputFormat is org.apache.hudi.hadoop.HoodieParquetInputFormat.

MOR table

We add a table-type option, option(TABLE_TYPE.key(), MOR_TABLE_TYPE_OPT_VAL), to the earlier save2HudiSyncHiveWithPrimaryKey method and change the database and table names (a sketch of the resulting write call follows the variable definitions below):

    val databaseName = "test"
    val tableName1 = "test_hudi_table_df_mor"
    val primaryKey = "id"
    val preCombineField = "ts"
    val partitionField = "dt"
    val tablePath1 = "/tmp/test_hudi_table_df_mor"
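save2HudiSyncHiveWithPrimaryKey is the helper from the earlier article, so its exact body is not repeated here; with the variables above, the write it performs amounts to roughly the following (same assumptions as the COW sketch earlier, with only the table type and the names/path changed; the original helper also sets the hive-sync managed-table option):

df.write.format("hudi")
  .option(TABLE_TYPE.key(), MOR_TABLE_TYPE_OPT_VAL)     // the added option: MERGE_ON_READ instead of the default COPY_ON_WRITE
  .option(RECORDKEY_FIELD.key(), primaryKey)
  .option(PRECOMBINE_FIELD.key(), preCombineField)
  .option(PARTITIONPATH_FIELD.key(), partitionField)
  .option("hoodie.table.name", tableName1)
  .option("hoodie.datasource.hive_sync.enable", "true")
  .option("hoodie.datasource.hive_sync.mode", "hms")
  .option("hoodie.datasource.hive_sync.database", databaseName)
  .option("hoodie.datasource.hive_sync.table", tableName1)
  .option("hoodie.datasource.hive_sync.partition_fields", partitionField)
  .option("hoodie.datasource.hive_sync.partition_extractor_class",
    "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .mode(SaveMode.Append)
  .save(tablePath1)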

After the Hive sync succeeds, show tables reveals that two tables were created: test_hudi_table_df_mor_ro and test_hudi_table_df_mor_rt. From the source analysis above we know that _ro is the read-optimized table and _rt the real-time table. Let's look at their CREATE TABLE statements:

+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `test_hudi_table_df_mor_ro`(          |
|   `_hoodie_commit_time` string,                    |
|   `_hoodie_commit_seqno` string,                   |
|   `_hoodie_record_key` string,                     |
|   `_hoodie_partition_path` string,                 |
|   `_hoodie_file_name` string,                      |
|   `id` int,                                        |
|   `name` string,                                   |
|   `value` int,                                     |
|   `ts` int)                                        |
| PARTITIONED BY (                                   |
|   `dt` string)                                     |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'hoodie.query.as.ro.table'='true',               |
|   'path'='/tmp/test_hudi_table_df_mor',            |
|   'primaryKey'='id')                               |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hudi.hadoop.HoodieParquetInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION                                           |
|   'hdfs://cluster1/tmp/test_hudi_table_df_mor'     |
| TBLPROPERTIES (                                    |
|   'last_commit_time_sync'='20220629145934',        |
|   'spark.sql.sources.provider'='hudi',             |
|   'spark.sql.sources.schema.numPartCols'='1',      |
|   'spark.sql.sources.schema.numParts'='1',         |
|   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
|   'spark.sql.sources.schema.partCol.0'='dt',       |
|   'transient_lastDdlTime'='1656486059')            |
+----------------------------------------------------+

+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `test_hudi_table_df_mor_rt`(          |
|   `_hoodie_commit_time` string,                    |
|   `_hoodie_commit_seqno` string,                   |
|   `_hoodie_record_key` string,                     |
|   `_hoodie_partition_path` string,                 |
|   `_hoodie_file_name` string,                      |
|   `id` int,                                        |
|   `name` string,                                   |
|   `value` int,                                     |
|   `ts` int)                                        |
| PARTITIONED BY (                                   |
|   `dt` string)                                     |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'hoodie.query.as.ro.table'='false',              |
|   'path'='/tmp/test_hudi_table_df_mor',            |
|   'primaryKey'='id')                               |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION                                           |
|   'hdfs://cluster1/tmp/test_hudi_table_df_mor'     |
| TBLPROPERTIES (                                    |
|   'last_commit_time_sync'='20220629145934',        |
|   'spark.sql.sources.provider'='hudi',             |
|   'spark.sql.sources.schema.numPartCols'='1',      |
|   'spark.sql.sources.schema.numParts'='1',         |
|   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
|   'spark.sql.sources.schema.partCol.0'='dt',       |
|   'transient_lastDdlTime'='1656486059')            |
+----------------------------------------------------+

We can see two differences between _ro and _rt: hoodie.query.as.ro.table and the INPUTFORMAT. For Hive queries only the INPUTFORMAT matters; hoodie.query.as.ro.table is what Spark uses at read time to decide whether the table should be treated as read-optimized. Because this MOR table has only had a single write so far, there are only parquet files and no .log delta files, so the two tables currently return the same results. The difference between them is demonstrated with Spark SQL below.

Spark SQL

For creating Hudi tables with Spark SQL, see the earlier article Hudi Spark SQL总结 if you are not familiar with it. The reason for revisiting Spark SQL table creation is that I found it differs slightly from writing data with a DataFrame and then syncing the table to Hive.

COW table

create table test_hudi_table_cow (
  id int,
  name string,
  price double,
  ts long,
  dt string
) using hudi
 partitioned by (dt)
 options (
  primaryKey = 'id',
  preCombineField = 'ts',
  type = 'cow'
 );

After creating the table, check the Hive CREATE TABLE statement from Hive:

show create table test_hudi_table_cow;

+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `test_hudi_table_cow`(                |
|   `_hoodie_commit_time` string,                    |
|   `_hoodie_commit_seqno` string,                   |
|   `_hoodie_record_key` string,                     |
|   `_hoodie_partition_path` string,                 |
|   `_hoodie_file_name` string,                      |
|   `id` int,                                        |
|   `name` string,                                   |
|   `price` double,                                  |
|   `ts` bigint)                                     |
| PARTITIONED BY (                                   |
|   `dt` string)                                     |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'path'='hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_cow',  |
|   'preCombineField'='ts',                          |
|   'primaryKey'='id',                               |
|   'type'='cow')                                    |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hudi.hadoop.HoodieParquetInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION                                           |
|   'hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_cow' |
| TBLPROPERTIES (                                    |
|   'last_commit_time_sync'='20220628152846',        |
|   'spark.sql.create.version'='2.4.5',              |
|   'spark.sql.sources.provider'='hudi',             |
|   'spark.sql.sources.schema.numPartCols'='1',      |
|   'spark.sql.sources.schema.numParts'='1',         |
|   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"double","nullable":true,"metadata":{}},{"name":"ts","type":"long","nullable":true,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
|   'spark.sql.sources.schema.partCol.0'='dt',       |
|   'transient_lastDdlTime'='1656401195')            |
+----------------------------------------------------+

Notice that the table created by Spark SQL has no hoodie.query.as.ro.table property. Looking at the source (quoted above), when Spark reads the table:

val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
    .map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
    .getOrElse(parameters.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue()))

QUERY_TYPE defaults to QUERY_TYPE_SNAPSHOT_OPT_VAL, i.e. snapshot query; since a COW table is read as a snapshot anyway, the default is fine. QUERY_TYPE has three values: QUERY_TYPE_SNAPSHOT_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL and QUERY_TYPE_INCREMENTAL_OPT_VAL, corresponding to snapshot (real-time), read-optimized and incremental queries respectively. How to run these queries with Spark is not covered here.

MOR table

create table test_hudi_table_mor (
  id int,
  name string,
  price double,
  ts long,
  dt string
) using hudi
 partitioned by (dt)
 options (
  primaryKey = 'id',
  preCombineField = 'ts',
  type = 'mor'
 );

After creating the MOR table with Spark SQL, show tables reveals only test_hudi_table_mor; there are no corresponding _rt and _ro tables. That is because Spark SQL's CREATE TABLE does not go through the Hive sync utility HiveSyncTool at all: Spark SQL has its own table-creation logic, and HiveSyncTool is only invoked when data is written. This is the difference, mentioned above, between tables created by Spark SQL and tables created by writing with a DataFrame and then syncing. Next, insert one record and look at the result:

insert into test_hudi_table_mor values (1,'hudi',10,100,'2021-05-05');

Two more tables now appear. They were created by the sync utility HiveSyncTool as part of the insert, so they are the same kind of tables as those created by writing data with a DataFrame and syncing, except that these are external tables rather than managed tables. Spark SQL's logic is that if the table path does not equal the database path plus the table name, the table is created as external, which is reasonable. The tables created by our DataFrame program are managed only because the program explicitly set the managed-table option, so dropping either of them deletes the data; with Spark SQL there is instead one extra managed table, test_hudi_table_mor itself, and dropping that table deletes the data.

+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE EXTERNAL TABLE `test_hudi_table_mor_ro`(    |
|   `_hoodie_commit_time` string COMMENT '',         |
|   `_hoodie_commit_seqno` string COMMENT '',        |
|   `_hoodie_record_key` string COMMENT '',          |
|   `_hoodie_partition_path` string COMMENT '',      |
|   `_hoodie_file_name` string COMMENT '',           |
|   `id` int COMMENT '',                             |
|   `name` string COMMENT '',                        |
|   `price` double COMMENT '',                       |
|   `ts` bigint COMMENT '')                          |
| PARTITIONED BY (                                   |
|   `dt` string COMMENT '')                          |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'hoodie.query.as.ro.table'='true',               |
|   'path'='hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor')  |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hudi.hadoop.HoodieParquetInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION                                           |
|   'hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor' |
| TBLPROPERTIES (                                    |
|   'last_commit_time_sync'='20220629153816',        |
|   'spark.sql.sources.provider'='hudi',             |
|   'spark.sql.sources.schema.numPartCols'='1',      |
|   'spark.sql.sources.schema.numParts'='1',         |
|   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"double","nullable":true,"metadata":{}},{"name":"ts","type":"long","nullable":true,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
|   'spark.sql.sources.schema.partCol.0'='dt',       |
|   'transient_lastDdlTime'='1656488248')            |
+----------------------------------------------------+

+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE EXTERNAL TABLE `test_hudi_table_mor_rt`(    |
|   `_hoodie_commit_time` string COMMENT '',         |
|   `_hoodie_commit_seqno` string COMMENT '',        |
|   `_hoodie_record_key` string COMMENT '',          |
|   `_hoodie_partition_path` string COMMENT '',      |
|   `_hoodie_file_name` string COMMENT '',           |
|   `id` int COMMENT '',                             |
|   `name` string COMMENT '',                        |
|   `price` double COMMENT '',                       |
|   `ts` bigint COMMENT '')                          |
| PARTITIONED BY (                                   |
|   `dt` string COMMENT '')                          |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'hoodie.query.as.ro.table'='false',              |
|   'path'='hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor')  |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION                                           |
|   'hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor' |
| TBLPROPERTIES (                                    |
|   'last_commit_time_sync'='20220629153816',        |
|   'spark.sql.sources.provider'='hudi',             |
|   'spark.sql.sources.schema.numPartCols'='1',      |
|   'spark.sql.sources.schema.numParts'='1',         |
|   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"double","nullable":true,"metadata":{}},{"name":"ts","type":"long","nullable":true,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
|   'spark.sql.sources.schema.partCol.0'='dt',       |
|   'transient_lastDdlTime'='1656488248')            |
+----------------------------------------------------+

Next, insert one more record and update one record, so that a log file is generated, and compare the two tables:

insert into test_hudi_table_mor values (2,'hudi',11,110,'2021-05-05');
update test_hudi_table_mor set name='hudi_update' where id =1;

select * from test_hudi_table_mor_ro;
+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+-------+--------+------+-------------+
| _hoodie_commit_time  | _hoodie_commit_seqno  | _hoodie_record_key  | _hoodie_partition_path  |                 _hoodie_file_name                  | id  | name  | price  |  ts  |     dt      |
+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+-------+--------+------+-------------+
| 20220629153718       | 20220629153718_0_1    | id:1                | dt=2021-05-05           | bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-125-7240_20220629153718.parquet | 1   | hudi  | 10.0   | 100  | 2021-05-05  |
| 20220629153803       | 20220629153803_0_2    | id:2                | dt=2021-05-05           | bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-154-8848_20220629153803.parquet | 2   | hudi  | 11.0   | 110  | 2021-05-05  |
+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+-------+--------+------+-------------+

select * from test_hudi_table_mor_rt;
+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------------+--------+------+-------------+
| _hoodie_commit_time  | _hoodie_commit_seqno  | _hoodie_record_key  | _hoodie_partition_path  |                 _hoodie_file_name                  | id  |     name     | price  |  ts  |     dt      |
+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------------+--------+------+-------------+
| 20220629153816       | 20220629153816_0_1    | id:1                | dt=2021-05-05           | bc415cdb-2b21-4d09-a3f6-a779357aa819-0             | 1   | hudi_update  | 10.0   | 100  | 2021-05-05  |
| 20220629153803       | 20220629153803_0_2    | id:2                | dt=2021-05-05           | bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-154-8848_20220629153803.parquet | 2   | hudi         | 11.0   | 110  | 2021-05-05  |
+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------------+--------+------+-------------+

We can see that _ro only returns the newly inserted row but not the updated version of the other row, while _rt returns the latest data for both. Let's also look at the storage files after the insert and update:

hadoop fs -ls hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05
Found 4 items
-rw-rw----+  3 spark hadoop        975 2022-06-29 15:38 hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05/.bc415cdb-2b21-4d09-a3f6-a779357aa819-0_20220629153803.log.1_0-186-10660
-rw-rw----+  3 spark hadoop         93 2022-06-29 15:37 hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05/.hoodie_partition_metadata
-rw-rw----+  3 spark hadoop     435283 2022-06-29 15:37 hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05/bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-125-7240_20220629153718.parquet
-rw-rw----+  3 spark hadoop     434991 2022-06-29 15:38 hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05/bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-154-8848_20220629153803.parquet

As you can see, the insert produced a new parquet file, while the update produced a .log file. That is why the _ro table also returns the newly inserted row: _ro reads only the data in the parquet (base) files, whereas the _rt table merges the base files (e.g. parquet) and delta files (e.g. avro) of the latest file slice on the fly to provide a near-real-time view (a few minutes of latency). The MOR write logic (under what conditions a delta file is written) and the compaction logic (when delta files are merged into parquet) are not covered here; I will summarize them separately later.

