数据湖技术Hudi0.10master测试流程

Hudi0.10master测试流程

Hudi粗糙介绍

hudi同步hive底层大概是什么原理,都是指向同一份存储没有拷贝数据吗,hive是怎么实现update、delete逻辑。

hudi底层其实还是hdfs,只不过hudi提供对写入数据包括其metadata的管理和数据组织方式,通过hudi本身支持acid语义,这样可以确保数据写入和hive存放hdfs方式一致,再在hive增加对应metadata信息。 可以将hudi看做是table format用来组织数据存放位置和格式之类的

hive读取的时候是对hdfs进行list操作,如果分区文件太多,性能很差; hudi增加index在读取的时候,能快速进行目标文件

它承担完成组织数据及对提供组织数据时提供的格式(parquet,orc,avro,arrow等),并在完成组织数据过程中提供了acid的语义,可以将其理解为table format;

hive也是可以理解为table format;两者肯定有差异; 比如hive是否满足acid,是否提供存储一体化满足离线和实时(近实时),与hive读取相比是的性能如何? 是否支持schema变更是前后数据兼容,自动识别…这些

对象存储是不支持 append 的

本地环境

名称 版本 描述
flink(pre-job) 1.3.2 通过parcel包部署于cdh6.3.2中
cdh 6.3.2 开源版本
hive 2.1.1-cdh6.3.2 包含cdh中(更换jar升级替换)
hadoop 3.0.0-cdh6.3.2 cdh原生版本
presto 2.591 开源版本
trino 360 开源版本
hudi 0.10 master分支编译

编译hudi包

github 拉取hudi代码

git clone  https://github.com/apache/hudi.git

编译hudi

mvn clean install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2 
# 如果是 hive3 需要使用 profile -Pflink-bundle-shade-hive3
# 如果是 hive1 需要使用 profile -Pflink-bundle-shade-hive1

#注意1:hive1.x现在只能实现同步metadata到hive,而无法使用hive查询,如需查询可使用spark查询hive外表的方法查询。
#注意2: 使用-Pflink-bundle-shade-hive x,需要修改profile中hive的版本为集群对应版本(只需修改profile里的hive版本)。修改位置为packaging/hudi-flink-bundle/pom.xml最下面的对应profile段,找到后修改profile中的hive版本为对应版本即可。
packaging/hudi-flink-bundle/pom.xml
<profile>
      <id>flink-bundle-shade-hive2</id>
      <properties>
        <hive.version>2.1.1-cdh6.3.2</hive.version>
        <flink.bundle.hive.scope>compile</flink.bundle.hive.scope>
      </properties>
      <dependencies>
        <dependency>
          <groupId>${hive.groupid}</groupId>
          <artifactId>hive-service-rpc</artifactId>
          <version>${hive.version}</version>
          <scope>${flink.bundle.hive.scope}</scope>
        </dependency>
      </dependencies>
    </profile>

当flink/lib下有flink-sql-connector-hive-xxx.jar时,会出现hive包冲突,解决方法是在install时,另外再指定一个profile:-Pinclude-flink-sql-connector-hive,同时删除掉flink/lib下的flink-sql-connector-hive-xxx.jar

Note: 该问题从 0.10 版本已经解决。

编译完成之后包的位置

  • flink依赖
hudi/packaging/hudi-flink-bundle/target
  • hive依赖
hudi/packaging/hudi-hadoop-mr-bundle/target
  • 导入包

    • flink依赖
    cp hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar FLINK_HOME/lib/
    #也可以通过-j或者-l指定 但是当前仅master可指定,0.90存在bug
    
    • hive依赖
    cp hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar /opt/cloudera/parcels/CDH/lib/hive/lib/
    

Hudi 表类型

COW 表适用于离线批量更新场景,对于更新数据,会先读取旧的 base file,然后合并更新数据,生成新的 base file。

MOR 表适用于实时高频更新场景,更新数据会直接写入 log file 中,读时再进行合并。为了减少读放大的问题,会定期合并 log file 到 base file 中。

开始数据湖操作

进入flink-sql-clent

flink-sql-client -l /root/hudi
使用的flink为cdh on flink(pre-job)。命令为全局命令
-l #指定文件夹,加载文件夹中所有jar
-j #指定jar

#/root/hudi
-rw-r--r-- 1 root root  3670520 926 09:00 flink-sql-connector-kafka_2.11-1.13.2.jar
-rw-r--r-- 1 root root 57509301 926 15:15 hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar

测试数据类型

类型 备注
tinyint 1字节 整数值
smallint 2字节 整数值
int 4字节 整数值
bigint 8字节 整数值
decimal(precision, scale) 精确数值,精度precision,小数点后位数scale
precision取值1~38,缺省默认为9
scale不能大于precision,缺省默认为0
float 4字节 浮点型
double 8字节 浮点型
boolean true/false
char(length) 固定长度字符,length必填(1~255)

kafka消息数据参考

{"tinyint0": 6, "smallint1": 223, "int2": 42999, "bigint3": 429450, "float4": 95.47324181659323, "double5": 340.5755392968011,"decimal6": 111.1111, "boolean7": true,  "char8": "dddddd", "varchar9": "buy0", "string10": "buy1", "timestamp11": "2021-09-13 03:08:50.810"}

创建source表

CREATE TABLE k (
   tinyint0 TINYINT
  ,smallint1 SMALLINT
  ,int2 INT
  ,bigint3 BIGINT
  ,float4 FLOAT
  ,double5 DOUBLE  
  ,decimal6 DECIMAL(38,8)
  ,boolean7 BOOLEAN
  ,char8 STRING
  ,varchar9 STRING
  ,string10 STRING
  ,timestamp11 STRING
) WITH (
      'connector' = 'kafka'                             -- 使用 kafka connector
    , 'topic' = 'hd4'                                   -- kafka topic名称
    , 'scan.startup.mode' = 'earliest-offset'           -- 从起始 offset 开始读取
    , 'properties.bootstrap.servers' = 'cdh4:9092'      -- kafka broker 地址
    , 'properties.group.id' = 'testgroup1' 
    , 'value.format' = 'json'
    , 'value.json.fail-on-missing-field' = 'true'
    , 'value.fields-include' = 'ALL'
);

创建Hudi(cow)sink表

CREATE TABLE hdc(
   tinyint0 TINYINT 
  ,smallint1 SMALLINT
  ,int2 INT
  ,bigint3 BIGINT
  ,float4 FLOAT
  ,double5 DOUBLE  
  ,decimal6 DECIMAL(12,3)
  ,boolean7 BOOLEAN
  ,char8 CHAR(64) PRIMARY KEY NOT ENFORCED
  ,varchar9 VARCHAR(64)
  ,string10 STRING
  ,timestamp11 TIMESTAMP(3)
 )
PARTITIONED BY (tinyint0) 
 WITH (
     'connector' = 'hudi'
   , 'path' = 'hdfs://nameservice1/data/hudi/hdc'
   , 'write.precombine.field' = 'timestamp11'             -- 相同的键值时,取此字段最大值,默认ts字段
   , 'write.tasks' = '4'
   , 'write.rate.limit' = '2000'                          -- 限制每秒多少条
   , 'hive_sync.enable' = 'true'                          -- 启用hive同步
   , 'hive_sync.mode' = 'hms'                             -- 启用hive hms同步,默认jdbc
   , 'hive_sync.metastore.uris' = 'thrift://cdh3:9083'    -- required, metastore的端口
   , 'hive_sync.jdbc_url' = 'jdbc:hive2://cdh3:10000'     -- required, hiveServer地址
   , 'hive_sync.table' = 'hdc'                            -- required, hive 新建的表名
   , 'hive_sync.db' = 'hudi'                              -- required, hive 新建的数据库名
   , 'hive_sync.username' = 'hive'                        -- required, HMS 用户名
   , 'hive_sync.password' = ''                            -- required, HMS 密码
 );

创建Hudi(mor)sink表

CREATE TABLE hdm2(
   tinyint0 TINYINT
  ,smallint1 SMALLINT
  ,int2 INT
  ,bigint3 BIGINT
  ,float4 FLOAT
  ,double5 DOUBLE  
  ,decimal6 DECIMAL(12,3)
  ,boolean7 BOOLEAN
  ,char8 CHAR(64)
  ,varchar9 VARCHAR(64)
  ,string10 STRING
  ,timestamp11 TIMESTAMP(3)
 )
PARTITIONED BY (tinyint0) 
 WITH (
     'connector' = 'hudi'
   , 'path' = 'hdfs://nameservice1/data/hudi/hdm2'
   , 'hoodie.datasource.write.recordkey.field' = 'char8'  -- 主键
   , 'write.precombine.field' = 'timestamp11'             -- 相同的键值时,取此字段最大值,默认ts字段
   , 'write.tasks' = '1'
   , 'read.tasks' = '4'
   , 'compaction.tasks' = '2'
   , 'write.rate.limit' = '2000'                          -- 限制每秒多少条
   , 'table.type' = 'MERGE_ON_READ'                       -- 默认COPY_ON_WRITE
   , 'compaction.async.enabled' = 'true'                  -- 在线压缩
   , 'compaction.trigger.strategy' = 'num_commits'        -- 按次数压缩
   , 'compaction.delta_commits' = '5'                     -- 默认为5
   , 'hive_sync.enable' = 'true'                          -- 启用hive同步
   , 'hive_sync.mode' = 'hms'                             -- 启用hive hms同步,默认jdbc
   , 'hive_sync.metastore.uris' = 'thrift://cdh3:9083'    -- required, metastore的端口
   , 'hive_sync.jdbc_url' = 'jdbc:hive2://cdh3:10000'     -- required, hiveServer地址
   , 'hive_sync.table' = 'hdm2'                            -- required, hive 新建的表名
   , 'hive_sync.db' = 'hudi'                              -- required, hive 新建的数据库名
   , 'hive_sync.username' = 'hive'                        -- required, HMS 用户名
   , 'hive_sync.password' = ''                            -- required, HMS 密码
   , 'hive_sync.skip_ro_suffix' = 'true'                  -- 去除ro后缀
 );

插入source数据

insert into hdm 
select   
      cast(tinyint0 as TINYINT)
    , cast(smallint1 as SMALLINT)
    , cast(int2 as INT)
    , cast(bigint3 as BIGINT)
    , cast(float4 as FLOAT)
    , cast(double5 as DOUBLE)
    , cast(decimal6 as DECIMAL(38,18))
    , cast(boolean7 as BOOLEAN)
    , cast(char8 as CHAR(64))
    , cast(varchar9 as VARCHAR(64))
    , cast(string10 as STRING)
    , cast(timestamp11 as TIMESTAMP(3)) 
 from  k;

插入单条数据测试

INSERT INTO hd VALUES(
      cast(1218 as TINYINT)
    , cast(295 as SMALLINT)
    , cast(-210121792 as INT)
    , cast(-3697946268377828253 as BIGINT)
    , cast(1.123459111111 as FLOAT)
    , cast(1111111.123411 as DOUBLE)
    , cast(1111.1234111 as DECIMAL(12, 3) )
    , cast(123123123123 as BOOLEAN)
    , cast('`[s1tX213ysdasdasdgfq3wqwdqwqd速度速度pGPYl`AggMaHNRJv\[CkIYzcgMlmVvLSjtYmnlBEcwH^kEgDSxGIwGNLDP' as CHAR(64))
    , cast('daQOIE[n_eJsYLBJLttyFHnBXiCoT`RWeCO\G[JZZTdFFnFZFCODoI`X[SbMVAjq' as VARCHAR(64))
    , cast('e1916697-e626-4446-bd18-0142bfb9417b' as STRING)
    , cast('2021-09-13 03:08:50.810' as TIMESTAMP(3))
);

流读hudi

参数 默认 描述
read.streaming.enabled false 流读
read.streaming.check-interval 60 流读检查秒数
read.streaming.start-commit 设置此参数将从提供的时间后开始读取数据

设置查询模式

SET sql-client.execution.result-mode=table;
SET sql-client.execution.result-mode=changelog;
SET sql-client.execution.result-mode=tableau;

presto查询hudi配置

presto 可以直接通过hive-catalog查询hudi

connector.name=hive-hadoop2
hive.metastore.uri=thrift://cdh2:9083
hive.config.resources=/etc/alternatives/hadoop-conf/core-site.xml,/etc/alternatives/hadoop-conf/hdfs-site.xml
hive.parquet.use-column-names=true

当 Presto-server-xxx 版本 < 0.233 时,hudi-presto-bundle.jar需要手动导入到{presto_install_dir}/plugin/hive-hadoop2/.

hudi sync hive presto表数据类型测试

Hudi数据类型 hive数据类型 presto数据类型 备注 极值 插入不符合值结果 备注
tinyint int integer(10) 1字节 整数值 (-128~127) 数值溢出 超过20位Flink-JOb返回异常
smallint int integer(10) 2字节 整数值 (-32768~32767) 数值溢出 超过20位Flink-JOb返回异常
int int integer(10) 4字节 整数值 (-2147483648~
2147483647)
数值溢出 超过20位Flink-JOb返回异常
bigint bigint bigint(19) 8字节 整数值 (±9.22*10的18次方) 数值溢出 超过20位Flink-JOb返回异常
decimal(m, d) decimal(m,d) decimal(m, d) 精确数值,精度m,小数点后位数d
m取值1~38,缺省默认为9
d不能大于m,缺省默认为0
参数m<65 是总个数
d<30且 d
小数位超出按位截取,整数位超出指定为值为NULL 超过20位Flink-JOb返回异常
float float real(24) 4字节 浮点型 8位精度(4字节) 小数保留八位,超出截取,整数位异常 超过20位Flink-JOb返回异常
double double double(53) 8字节 浮点型 16位精度(8字节) 小数位插入正常,整数位异常 超过20位Flink-JOb返回异常
boolean boolean boolean true/false true/false 插入数值为true,插入字符为NULL 插入任何仅返回true和flase
char(length) string varchar 固定长度字符,length必填(1~255) 最多255个字符 可随意插入,与设定长度无关
varchar(max_length) string varchar 可变长度字符,max_length必填(1~65535) 可随意插入,与设定长度无关
string string varchar 字符串 无异常,可随意插入
timestamp bigint(19) bigint(19) 时间戳 hive自动转类型

hudi(mor)离线压缩

  • 单机flink压缩提交

    ./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor /root/hudi/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar --path hdfs:///data/hudi/hd2 --compaction-tasks 4
    
  • Flink on yarn(pre-job)离线压缩

    flink run -t yarn-per-job -Djobmanager.memory.process.size=1024m -Dtaskmanager.memory.process.size=2048m -Dtaskmanager.numberOfTaskSlots=2 -Denv.java.opts="-Dfile.encoding=UTF-8" -c org.apache.hudi.sink.compact.HoodieFlinkCompactor /root/hudi/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar --path  hdfs:///data/hudi/hd7  --compaction-tasks 4
    

多引擎同时操作hudi

  • Spark操作数据

    Flink正常可见,不可操作,且Spark操作数据会刷新Flink插入数据

  • FLink操作数据

    Spark不可见

支持查询矩阵

Copy-On-Write #

查询引擎 快照查询 增量查询
Hive Y Y
Spark SQL Y Y
Spark Datasource Y Y
Flink SQL Y N
PrestoDB Y N
Trino Y N
Impala3.4 或更高版本 Y N

Merge-On-Read #

查询引擎 快照查询 增量查询 读优化查询
Hive Y(有bug 社区在修复) Y Y
Spark SQL Y Y Y
Spark Datasource Y Y Y
Flink SQL Y Y Y
PrestoDB Y N Y
Trino N N Y
Impala N N Y

注:

在线压缩策略没起之前占用内存资源,推荐离线压缩,但离线压缩需手动根据压缩策略才可触发

cow写少读多的场景 mor 相反

MOR表压缩在线压缩按照配置压缩,如压缩失败,会有重试压缩操作,重试压缩操作延迟一小时后重试

你可能感兴趣的