Flink CDC模式写入Hudi

1、前沿

        之前对数据湖的相关知识和怎么搭建都做了详细的讲解,感兴趣的可以去了解下

数据湖基本概念--什么是数据湖,数据湖又能干什么?为什么是Hudi_一个数据小开发的博客-CSDN博客

从0到1搭建数据湖Hudi环境_一个数据小开发的博客-CSDN博客

接下来,就是Flink on Hudi的实战了,这一篇带来的CDC模式的入湖。

2、实战

2.1、启动本地环境

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./sql-client.sh embedded -j ../lib/hudi-flink-bundle_2.11-0.10.1.jar shell

2.2、新建MySQL表

-- 新建mysql表
drop table if exists flink_hudi.flink_hudi_mysql_table;
create table if not exists flink_hudi.flink_hudi_mysql_table (id int,name varchar(100));

2.3、新建Flink MySQL CDC表


CREATE TABLE mysql_cdc_name_table (
  id int,
  name string
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'ip',
  'port' = '3306',
  'username' = 'root',
  'password' = 'xxxx',
  'database-name' = 'flink_hudi',
  'table-name' = 'flink_hudi_mysql_table',
  'scan.incremental.snapshot.enabled' = 'false'
);

2.4、新建Flink Hudi表

CREATE TABLE hudi_cow_cdc(
  id int,
  name string,
  PRIMARY KEY(id) NOT ENFORCED
)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://localhost:9000/user/root/hudi/hudi_cow_cdc',
  'table.type' = 'COPY_ON_WRITE',
  'write.insert.drop.duplicates' = 'true'
); 

2.5、实时读取mysql数据写入hudi

insert into hudi_cow_cdc select * from mysql_cdc_name_table;

2.6、插入数据

insert into flink_hudi.flink_hudi_mysql_table values(1,'XiaoM');

2.7、跑Spark查看数据

此处可以查看上一篇博客中的Spark读取Hudi的数据的demo

Flink SQL Kafka写入Hudi详解_一个数据小开发的博客-CSDN博客

2.8、delete操作数据

delete from flink_hudi.flink_hudi_mysql_table where id = 1;

可以看到根据上述的操作后,查看数据观察得知,Flink CDC入湖完整

你可能感兴趣的