Flink写数据到 hudi中,hive读取

flink hive on hudi 手动创建表

文档 : https://www.yuque.com/docs/share/879349ce-7de4-4284-9126-9c2a3c93a91d?#%20%E3%80%8AHive%20On%20Hudi%E3%80%8B

  1. 在 /data/app/hive/auxlib 目录放入 hudi jar:hudi-hadoop-mr-bundle-0.10.0.jar
  2. 或者修改 hive-site.xml 中的配置项 hive.default.aux.jars.path / hive.aux.jars.path

// 示例: 
<name>hive.default.aux.jars.path</name>
<value>file:///mypath/hudi-hadoop-mr-bundle-0.9.0xxx.jar,file:///mypath/hudi-hive-sync-bundle-0.9.0xx.jar</value>
  1. 创建对应的 方法 与表
 //flink中 创建 MERGE_ON_READ 表,需自己在hive 中创建表映射
        String sinkDDL ="CREATE TABLE hudi_datagen_mor ( " +
                "id INT Primary key, " +
                "userid INT, " +
                "amt BIGINT, " +
                "proctime TIMESTAMP(3) " +
                " ) WITH ( " +
                "'connector' = 'hudi', " +
                "'path' = 'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor', " +
                "'table.type' = 'MERGE_ON_READ', " +
                "'write.bucket_assign.tasks' = '1'," +
                "'write.tasks' = '1', " +
                "'compaction.tasks' = '1' " 
        ")";
        
//hive中 手动创建外部表 MERGE_ON_READ 模式

//方式一:INPUTFORMAT是org.apache.hudi.hadoop.HoodieParquetInputFormat
//这种方式只会查询出来parquet数据文件中的内容,但是刚刚更新或者删除的数据不能查出来

-- Mode 1: Hive external table over the Hudi MERGE_ON_READ path, read through
-- HoodieParquetInputFormat. Only the contents of the parquet data files are
-- returned; rows that were just updated or deleted are not visible.
-- The first five columns are the Hudi metadata columns.
CREATE EXTERNAL TABLE `hudi_datagen_mor` (
    `_hoodie_commit_time` string,
    `_hoodie_commit_seqno` string,
    `_hoodie_record_key` string,
    `_hoodie_partition_path` string,
    `_hoodie_file_name` string,
    -- id / userid are INT in the Flink table definition, so they are declared
    -- int here as well (the original used bigint, unlike hudi_datagen_mor_2).
    `id` int,
    `userid` int,
    `amt` bigint,
    -- NOTE(review): the auto-synced tables in this file expose proctime as
    -- bigint; confirm that string reads correctly for this column.
    `proctime` string
)
ROW FORMAT SERDE
    'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
    'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
    'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor';
   
// 方式二 : INPUTFORMAT是org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat 
// 这种方式是能够实时读出来写入的数据,会将基于Parquet的基础列式文件、和基于行的Avro日志文件合并在一起呈现给用户。

-- Mode 2: Hive external table over the same Hudi MERGE_ON_READ path, read
-- through HoodieParquetRealtimeInputFormat. Newly written data is readable:
-- the parquet base files and the Avro log files are merged when queried.
CREATE EXTERNAL TABLE `hudi_datagen_mor_2` (
    `_hoodie_commit_time` string,
    `_hoodie_commit_seqno` string,
    `_hoodie_record_key` string,
    `_hoodie_partition_path` string,
    `_hoodie_file_name` string,
    `id` int,
    `userid` int,
    `amt` bigint,
    -- NOTE(review): the auto-synced tables in this file expose proctime as
    -- bigint; confirm that string reads correctly for this column.
    `proctime` string
)
ROW FORMAT SERDE
    'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
    'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
    'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor'
-- NOTE(review): these two properties look copied from Hive-generated DDL and
-- are presumably optional; confirm before relying on them.
TBLPROPERTIES (
    'bucketing_version'='2',
    'transient_lastDdlTime'='1642573236'
);
  
  1. 如果是分区表,则需要添加分区来加载数据
   // 添加分区
-- Register the partition directory with the Hive table; "if not exists" makes
-- the statement safe to re-run.
-- NOTE(review): the original listed each statement below twice, verbatim. If
-- the second copy was meant for another table mapped to the same path
-- (e.g. hudi_datagen_mor_2), repeat the statements for that table; confirm.
alter table hudi_datagen_mor add if not exists partition (`partition` = '20210414')
    location 'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor/20210414';

-- Query the data of that partition. The value is quoted so it is compared as
-- the same string literal used when the partition was added, instead of
-- relying on an implicit string-to-number coercion.
select * from hudi_datagen_mor where `partition` = '20210414';

flink hive on hudi 自动创建表

文档 : https://www.yuque.com/docs/share/01c98494-a980-414c-9c45-152023bf3c17?#

  1. 开启hive 服务
# Start the Hive service(s) that match the configured hive_sync.mode.
# Run only the commands listed for the mode in use.
# 1. hms mode: start the metastore service.
nohup hive --service metastore &
# 2. jdbc mode (the default): start the metastore and HiveServer2.
nohup hive --service metastore &
nohup hive --service hiveserver2 &
  1. flink中 自动创建表语句
        // Flink SQL DDL for a MERGE_ON_READ Hudi sink that also syncs the table to Hive,
        // so the Hive tables are created automatically.
        // NOTE(review): 'path' ends in hudi_datagen_mor, while the synced Hive DDL shown later
        // in this file points at .../hudi_datagen_mor3; confirm which path is intended.
        String sinkDDL = "CREATE TABLE hudi_datagen_mor ( " +
                // Flink only accepts primary-key constraints declared NOT ENFORCED.
                "id INT PRIMARY KEY NOT ENFORCED, " +
                "userid INT, " +
                "amt BIGINT, " +
                "proctime TIMESTAMP(3) " +
                " ) WITH ( " +
                "'connector' = 'hudi', " +
                "'path' = 'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor', " +
                "'table.type' = 'MERGE_ON_READ', " +
                "'write.bucket_assign.tasks' = '1'," +
                "'write.tasks' = '1', " +
                "'compaction.tasks' = '1', " +
                // Enable automatic table creation in Hive.
                "'hive_sync.enable' = 'true', " +
                // How to connect to Hive; the default is jdbc.
                "'hive_sync.mode' = 'hms', " +
                "'hive_sync.table' = 'hudi_datagen_mor3', " +
                "'hive_sync.db' = 'ods', " +
                "'hive_sync.metastore.uris' = 'thrift://hadoop001:9083', " +
                // Enable streaming read.
                "'read.streaming.enabled' = 'true' ," +
                "'read.streaming.check-interval' = '4'" +
                ")";
 //jdbc 模式  二选一              
  'hive_sync.metastore.uris' = 'thrift://hadoop001:9083', -- required, metastore port
  'hive_sync.jdbc_url'='jdbc:hive2://hadoop001:10000',    -- required, HiveServer address
  'hive_sync.table'='hudi_datagen_mor3',                          -- required, name of the table created in Hive
  'hive_sync.db'='ods',                         -- required, name of the database created in Hive
  'hive_sync.username'='root',                     -- required, HMS user name
  'hive_sync.password'='123456' -- presumably the password for hive_sync.username; example value only, confirm


// 'table.type' = 'MERGE_ON_READ' 时,hive 中自动创建 2 张表

// rt表--实时读取checkpoint之后的数据 
// ro表-- 读取未合并 之前的数据,即 parquet数据文件中的内容

// 'table.type' = 'COPY_ON_WRITE' 时,hive 中自动创建 hudi_datagen_cow_ro 一张表

-- Hive DDL of the `_rt` table that was created automatically for the
-- MERGE_ON_READ table hudi_datagen_mor3. It is read through
-- HoodieParquetRealtimeInputFormat with 'hoodie.query.as.ro.table'='false'.
-- The TBLPROPERTIES carry the last synced commit time and the Spark schema.
CREATE EXTERNAL TABLE `ods.hudi_datagen_mor3_rt` (
    `_hoodie_commit_time` string COMMENT '',
    `_hoodie_commit_seqno` string COMMENT '',
    `_hoodie_record_key` string COMMENT '',
    `_hoodie_partition_path` string COMMENT '',
    `_hoodie_file_name` string COMMENT '',
    `id` int COMMENT '',
    `userid` int COMMENT '',
    `amt` bigint COMMENT '',
    `proctime` bigint COMMENT ''
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
    'hoodie.query.as.ro.table'='false',
    'path'='hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor3'
)
STORED AS
    INPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor3'
TBLPROPERTIES (
    'last_commit_time_sync'='20220119153209172',
    'spark.sql.sources.provider'='hudi',
    'spark.sql.sources.schema.numParts'='1',
    'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"userid","type":"integer","nullable":true,"metadata":{}},{"name":"amt","type":"long","nullable":true,"metadata":{}},{"name":"proctime","type":"timestamp","nullable":true,"metadata":{}}]}',
    'transient_lastDdlTime'='1642576441'
);


  -- Hive DDL of the `_ro` table that was created automatically for the
  -- MERGE_ON_READ table hudi_datagen_mor3. It is read through
  -- HoodieParquetInputFormat with 'hoodie.query.as.ro.table'='true', i.e. it
  -- returns the contents of the parquet data files.
  -- The statement is now terminated with ';' like the `_rt` statement.
  CREATE EXTERNAL TABLE `ods.hudi_datagen_mor3_ro` (
      `_hoodie_commit_time` string COMMENT '',
      `_hoodie_commit_seqno` string COMMENT '',
      `_hoodie_record_key` string COMMENT '',
      `_hoodie_partition_path` string COMMENT '',
      `_hoodie_file_name` string COMMENT '',
      `id` int COMMENT '',
      `userid` int COMMENT '',
      `amt` bigint COMMENT '',
      `proctime` bigint COMMENT ''
  )
  ROW FORMAT SERDE
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  WITH SERDEPROPERTIES (
      'hoodie.query.as.ro.table'='true',
      'path'='hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor3'
  )
  STORED AS INPUTFORMAT
      'org.apache.hudi.hadoop.HoodieParquetInputFormat'
  OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
  LOCATION
      'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor3'
  TBLPROPERTIES (
      'last_commit_time_sync'='20220119153209172',
      'spark.sql.sources.provider'='hudi',
      'spark.sql.sources.schema.numParts'='1',
      'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"userid","type":"integer","nullable":true,"metadata":{}},{"name":"amt","type":"long","nullable":true,"metadata":{}},{"name":"proctime","type":"timestamp","nullable":true,"metadata":{}}]}',
      'transient_lastDdlTime'='1642576441'
  );
  
  
 // hudi_datagen_cow_ro
 
 -- Hive DDL of the table hudi_datagen_cow_ro, the single table created
 -- automatically for a COPY_ON_WRITE table. It is read through
 -- HoodieParquetInputFormat with 'hoodie.query.as.ro.table'='true'.
 -- NOTE(review): 'path' and LOCATION end in hudi_datagen_mor although this is
 -- the COPY_ON_WRITE example; confirm the path is intended.
 -- The statement is now terminated with ';'.
 CREATE EXTERNAL TABLE `hudi_datagen_cow_ro` (
     `_hoodie_commit_time` string COMMENT '',
     `_hoodie_commit_seqno` string COMMENT '',
     `_hoodie_record_key` string COMMENT '',
     `_hoodie_partition_path` string COMMENT '',
     `_hoodie_file_name` string COMMENT '',
     `id` int COMMENT '',
     `userid` int COMMENT '',
     `amt` bigint COMMENT '',
     `proctime` bigint COMMENT ''
 )
 ROW FORMAT SERDE
     'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
 WITH SERDEPROPERTIES (
     'hoodie.query.as.ro.table'='true',
     'path'='hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor'
 )
 STORED AS INPUTFORMAT
     'org.apache.hudi.hadoop.HoodieParquetInputFormat'
 OUTPUTFORMAT
     'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
 LOCATION
     'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor'
 TBLPROPERTIES (
     'last_commit_time_sync'='20220217144235326',
     'spark.sql.sources.provider'='hudi',
     'spark.sql.sources.schema.numParts'='1',
     'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"userid","type":"integer","nullable":true,"metadata":{}},{"name":"amt","type":"long","nullable":true,"metadata":{}},{"name":"proctime","type":"timestamp","nullable":true,"metadata":{}}]}',
     'transient_lastDdlTime'='1645080237'
 );
  

你可能感兴趣的