Flink-Zeppelin On FlinkSql

Flink-Zeppelin On FlinkSql

Flink系列文章

  • 更多Flink系列文章请点击Flink系列文章

  • 更多大数据文章请点击大数据好文推荐

摘要

最近在调研流平台,发现各大公司流平台的Web界面都是自己一个团队开发,相当完备。苦于人力、时间有限,想找现成的能提交FlinkSql的Web代码,没找到合适的开源的。但是想起了之前看过的Zeppelin,现在已经支持Flink 1.0且支持DataStream、Table & SQL 等,遂赶紧尝试。如果能走通,后续计划在原数据这块儿看看有什么好办法管理起来。

1 Zeppelin

1.1 简介

  • Apache Zeppelin
  • Apache Zeppelin-preview版
  • Github Zeppelin

Zeppelin基于界面化的笔记本,可实现数据驱动,使用SQL、Scala(没看错,能指直接写Scala代码!)等进行交互式数据分析。

可视化的笔记本是指:

  • 数据摄取
  • 数据发现
    数据发现,根据阿里云数据治理 数据保护伞 数据发现章节所说,是指通过规则配置,帮助您有效识别组织内的敏感数据,以project等不同维度,为您提供可视化的数据资产展示。
  • 数据分析
  • 数据可视化、协作

1.2 架构

Flink-Zeppelin On FlinkSql_第1张图片

1.2.1 Zeppelin Server

Zeppelin Server是Zeppelin最主要的服务之一,负责管理Interpreter。可以启动多个Zeppelin Server组成服务状态一致的集群(使用了Raft协议,具体是 Raft算法库Atomix),共享Notebook、元数据,实现Interpreter负载均衡。

  • 集群模式下的ZeppelinServer就是一个Raft节点(Leader/Follower),而Interpreter是Raft客户端
  • 集群模式中,每个ZeppelinServer都运行了一个集群管理服务(使用Raft算法库Atomix来组件服务状态一致的集群),会复制状态机(ClusterStateMachine)来在ZeppelinServer集群上维护一致性的集群元数据(包括集群服务和进程状态)。
  • 集群模式下,每个ZeppelinServer和Interpreter运行了集群管理客户端,使用Netty连接Raft Cluster Server来维护存储在ClusterStateMachine中的数据,进程停止后元数据信息将被清除。
  • 集群模式下每个ZeppelinServer和Interpreter进程中运行了监控模块,周期性发送心跳给Raft Leader。
    • 普通ZeppelinServer发送了本节点的CPU和内存资源平均使用情况,用以在创建Interpreter时进行分配。

    • Raft Leader ZeppelinServer会监控所有类型节点上报的心跳数据,发现超时就认为不可用并剔除。

  • 集群模式下notebook和interpreter改动会自动同步到集群中所有节点
  • 因为使用Raft协议,选举中必须过半投票选一个节点才能选出Leader,所以应该部署奇数个节点,比如2N + 1个,此时可容忍其中的 N 个节点挂掉

可通过Nginx等技术进行前端代理。多个用户访问域名时,Nginx根据分布式策略来将用户分配到不同的可用的Zeppelin Server,如上图User1/2。

1.2.2 Interpreter架构

Flink-Zeppelin On FlinkSql_第2张图片
每个Interpreter进程都是一个JVM进程,通过thrift和Zeppelin Server交互。单节点模式时在ZeppelinServer本地创建,集群模式时先从集群元数据中查找是否已经存在所需Interpreter如果有就直接通过元数据中的该Interpreter进程的Thrift IP和端口来将note和该进程建立绑定关系;如果不存在就通过元数据找出最空闲的ZeppelinServer节点,并通过Thrift来告知远程ZeppelinServer创建Interpreter进程。

用户使用的notebook运行在Interpreter上,底层就能跑各种引擎、语言。

Interpreter 进程启动后,将会在 Zeppelin Cluster MetaData 中提交自身的元数据信息,关闭时清除。如果没有正常退出导致元数据没有先清理,ZeppelinServer会主动周期性检查该元数据对应的Interpreter的心跳时间戳来确定对应的进程是否存活。

Zeppelin Server和Interpreter进程的通信手段是Netty。

  • InterpreterGroup
    可包含多个Interpreter,比如SparkInterpreter Group 包含了 SparkSqlInterpreter、SparkInterpreter、PySparkInterpreter 等Intepreter。

    当用户使用 SparkInterpreter Group 创建一个 notebook 时,ZeppelinServer 会创建一个独立的 JVM 进程,进程中的 SparkSqlInterpreter 、 SparkInterpreter 、PySparkInterpreter可以共用一个 SparkContext,因为他们属于同一个InterpreterGroup。

    InterpreterGroup和Interpreter关系具体还要取决于Interpreter Binding Mode。

1.2.3 Cluster MetaData

集群中元数据信息,KV键值对格式,包括ZeppelinServer和Interpreter进程元数据,通过State Machine 维护服务状态一致性。

ZeppelinServer和Interpreter都会周期性发送心跳来更新Cluster Metadata中自己的信息,而担任Leader的那个ZeppelinServer会定时检查Cluster Metadata中的时间戳信息,如果有超时的就会清理超时的服务和进程。

  • ZeppelinServer元数据
    Flink-Zeppelin On FlinkSql_第3张图片
  • Interpreter元数据
    Flink-Zeppelin On FlinkSql_第4张图片

1.2.3 Notebook

用户工作的平台,包含若干Paragraph。

默认为local即存在本地,集群模式时应该选择所有集群中的Zeppelin节点都能访问的位置,比如HDFS。

集群模式下,会将修改自动同步到所有ZeppelinServer节点。

1.2.4 Paragraph

一个Notebook包含若干Paragraph,可以共享数据。

比如一个FlinkSQL程序,可以在定义3个Paragraph:

  • source table ddl
  • sink table ddl
  • insert into sink select * from source

1.3 Interpreter

1.3.1 概念

1.3.1.1 概述

Zeppelin interpreter是个重要的组件,可将任何语言和数据处理后端以插件化的方式接入Zeppelin,目前支持的技术栈如下:
Flink-Zeppelin On FlinkSql_第5张图片
如果没有你需要的,那还可以自定义一个:

  • how to create a new interpreter
  • Installing Interpreters

通过Interpreters,我们可以很方便的使用各种语言和数据处理后端,比如可以直接用%flink来直接在Zeepelin中写scala代码。

1.3.1.2 Interpreter Binding Mode

用来控制Notebook和用户的隔离模式:

  • Globally-shared
    所有使用该interpreter的notebook/用户共享一个interpreter JVM进程和session,比如用flink on yarn那就是每次提交的任务都是提交到一个Flink集群执行。此时Note之间可互相访问创建的变量。生产环境不推荐使用。
    Flink-Zeppelin On FlinkSql_第6张图片
  • Per Note-scoped
    每个Note都会创建一个新的interpreter实例且拥有自己的Session,但是在同一个interpreter JVM进程中。此时仍可通过ResourcePool来跨NoteBook交换对象。
    Flink-Zeppelin On FlinkSql_第7张图片
  • Per Note-isolated
    每个Note都会创建一个新的interpreter进程,也拥有独享的Session。此时仍可通过ResourcePool来跨NoteBook交换对象。
    Flink-Zeppelin On FlinkSql_第8张图片

1.3.1.3 Interpreter的生命周期管理

Zeppelin 0.8.0以后提供了LifecycleManager接口来控制interpreter生命周期,0.9.0有两个实现:

  • TimeoutLifecycleManager(默认,可通过zeppelin.interpreter.lifecyclemanager.class切换)
    当interpreter保持空闲一段时间后就会关闭interpreter。默认阈值为1小时,可通过zeppelin.interpreter.lifecyclemanager.timeout.threshold设置。
  • NullLifecycleManager
    什么都不做,由用户控制interpreter生命周期

1.3.2 Interpreter架构

点击这里

1.3.3 Interpreter管理

1.3.3.1 创建Interpreter

Flink-Zeppelin On FlinkSql_第9张图片

  • interpreter group
    创建的时候,最重要的一点是选择interpreter group:
    Flink-Zeppelin On FlinkSql_第10张图片
    每个interpreter都属于某一个interpreter group,一个interpreter group将所有包含的interpreter运行在一个jvm里,以他为单位进行启动/停止。

1.3.3.2 修改Interpreter

也是跟创建一样点击进入Interpreter界面,搜索后可以修改创建的interpreter。
Flink-Zeppelin On FlinkSql_第11张图片

1.3.3.3 Interpreter配置项目

见interpreter

1.3.3.4 Interpreter全局配置

Interpreter有很多配置,可直接在Zeppelin上Web界面上设置,有两类属性:

  • 大写字母代表系统环境变量
    比如flink interpreter中设置FLINK_HOMEHADOOP_CONF_DIR,则会将配置作为环境变量传递给flink interpreter进程,由flink使用。
  • 否则表示普通interpreter属性

关于Context Parameters

  • 还可以通过#{contextParameterName}来使用解释器上下文中的参数:
    Flink-Zeppelin On FlinkSql_第12张图片
    如果context参数为null,则将其替换为空字符串。

1.3.3.5 Interpreter细粒度配置(inline configuration)

上面说的都是interpreter下的通用配置,所有使用该interpreter的Notebook都是用该配置。但有些时候我们想每个notebook单独使用某些配置,虽然可创建单独interpreter但很不方便,所以可使用Inline Generic Configuration

即在notebook最开始的paragraph里面写:

%flink_chengc.conf
flink.execution.mode yarn
flink.tm.memory 2048
flink.jm.memory 1024
flink.yarn.appName chengc
flink.yarn.queue default

1.3.4 Interpreter on yarn

可通过interpreter 配置zeppelin.interpreter.launcher yarn来讲interpreter运行在yarn上,然后会和Zeppelin服务端通过Thrift交互。

具体源码在YarnInterpreterLauncherYarnRemoteInterpreterProcess

可配合flink on yarn使用,则Interpreter在yarn上am 中拉起来后,会去拉起一个flink cluster。相关源码在FlinkScalaInterpreterFlinkInterpreter等。

flink配置文件上传和读取流程如下:

  1. 先用当前本机环境变量将flink配置文件目录上传
  2. 再将envs设为ApplicationConstants.Environment.PWD.$(),即yarn上的am container工作目录
  3. am拉起来后,会去hdfs下载flink配置文件解压到工作目录,并按照 sys.env.getOrElse
    去读取配置的工作目录路径下的flink配置文件,这样就能正确读取到我们本机的flink配置文件了

2 安装、配置和部署

2.1 下载

Download Apache Zeppelin

有三种方式:下载完整已编译二进制包、通过网络安装指定interpreter的包或者源码自己编译安装。

图方便就用第一种吧(不过很大,有1.5G)。

2.2 安装

可参考

  • Zeppelin Install

必须是JDK 1.8(171)以上

下载完后直接解压zeppelin-0.9.0-preview1-bin-all.tgz

tar -zxvf zeppelin-0.9.0-preview1-bin-all.tgz

2.3 配置

2.3.1 常用配置

可参考

  • Apache Zeppelin Configuration

可修改两个配置文件,都配置了同key属性时以环境变量文件为准:

  • conf/zeppelin-env.sh
    一些环境变量
  • conf/zeppelin-site.xml
    一些java属性

我改了几个属性:

  • zeppelin.server.addr
    改为ip,默认127.0.0.1,不然其他机器访问不了

  • zeppelin.server.port
    zeepelin启动后的web端口

  • zeppelin.interpreter.lifecyclemanager.class
    改为org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager,默认是NullLifecycleManager,不会管interpreter是否空闲。而TimeoutLifecycleManager会在interpreter保持空闲状态超过zeppelin.interpreter.lifecyclemanager.timeout.threshold毫秒时,干掉interpreter。

  • zeppelin.interpreter.lifecyclemanager.timeout.threshold
    改为 3600000

  • zeppelin.interpreter.lifecyclemanager.timeout.checkinterval
    改为60000,检测interpreter是否超时的间隔时间

  • zeppelin.recovery.storage.class
    指定zeppelin恢复模式(详见https://mp.weixin.qq.com/s/D02M68HO4Te4cReIoRLMwQ)。

    改为org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage。设定后,关掉Zeppelin主进程不会关掉interpreter进程,重启zeppelin会去重连这些interpreter进程。

    这个时候如果还想干掉所有interpreter进程,请使用bin/stop-interpreter.sh

    默认NullRecoveryStorage,意味着关掉Zeppelin就关掉了所有运行中的interpreter 进程。

  • zeppelin.recovery.dir
    在集群模式下,还应该把此项设为hdfs上路径,如/tmp/zeppelin/recovery。注意不要加如hdfs://namespace,否则路径不对!
    将Notebook保存在共享存储中

  • zeppelin.notebook.storage
    设为org.apache.zeppelin.notebook.repo.FileSystemNotebookRepo

  • zeppelin.notebook.dir
    设为hdfs上的目录,如/tmp/zeppelin/notebook这里注意不能设为hdfs://namespace/xxx,否则路径会有问题!

2.3.2 Interpreter配置

2.3.2.1 通用配置

  • FLINK_HOME
    /xxx/flink
  • flink.execution.mode
    默认local即本地模式,还可用yarn
  • HADOOP_CONF_DIR
    HADOOP配置文件所在路径,如/xxx/etc/hadoop

2.3.2.2 Flink Interpreter配置

可参考

  • Flink interpreter for Apache Zeppelin-Configuration

重要配置如下:

  • flink.interpreter.close.shutdown_cluster
    改为false。

    默认true,即在interpreter关闭时会shutdown应用程序。

    注意,在interpreter on yarn+flink on yarn模式下可以将此设置改为true,效果就是只要你需要重启interpreter来改配置就会停止对应的flink app,达成同步。

  • zeppelin.interpreter.close.cancel_job
    改为false

    默认true,即在interpreter关闭时会cancel我们的flink job。

    注意,在interpreter on yarn+flink on yarn模式下可以将此设置改为true,效果就是只要你需要重启interpreter来改配置就会停止对应的flink app,达成同步。

如果flink任务依赖一些包,可以有三种方式加入依赖:

  • flink.execution.jars
    指定flink job所依赖的普通jar包,所有的jar包都被会load到flink interpreter的classpath,还会被发送到Task Manager。

  • flink.udf.jars
    和flink.execution.jars不同的地方在于Zeppelin会自动检测该选项指定的jar包中所包含的UDF class,会把检测到的UDF注册到TableEnvironment中(UDF的名字就是这个class name),以便用户使用。

    注意:

    • 你的UDF Class必须包含一个无参的构造函数。
    • 这种方式如果实在全局interpreter配置,则UDF也是全局的
    • 还可以在你自己的notebook interpreter inline config里面配置,这种方式就对当前notebook生效
  • flink.execution.packages
    类似flink.execution.jars,但不同的是Zeppelin会下载该选项指定的package以及该package的依赖放到flink interpreter的classpath。比如你想使用kafka connector配置如下

    org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0
    

    但我实测在测试环境无问题,生产环境因为安全策略不能自动下载,只能去repo1.maven.org手动下载打好的jar(flink-connector-kafka_2.11-1.10.0.jar、flink-connector-kafka-base_2.11-1.10.0.jar、flink-sql-connector-kafka_2.11-1.10.0.jar、flink-json-1.10.0.jar,如果找不到包还可以放入kafka-clients-2.2.0.jar),并upload到使用的flink/lib下,否则会报错如下:

    WARN [2020-05-28 11:41:34,042] ({SchedulerFactory11} NotebookServer.java[onStatusChange]:1901) - Job paragraph_1590573205576_568821782 is finished, status: ERROR, exception: null, result: %text org.apache.zeppelin.interpreter.InterpreterException: org.apache.zeppelin.interpreter.InterpreterException: java.lang.RuntimeException: [unresolved dependency: org.apache.flink#flink-connector-kafka_2.11;1.10.0: not found, unresolved dependency: org.apache.flink#flink-connector-kafka-base_2.11;1.10.0: not found, unresolved dependency: org.apache.flink#flink-json;1.10.0: not found]
            at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
            at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
            at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:577)
            at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
            at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
            at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.zeppelin.interpreter.InterpreterException: java.lang.RuntimeException: [unresolved dependency: org.apache.flink#flink-connector-kafka_2.11;1.10.0: not found, unresolved dependency: org.apache.flink#flink-connector-kafka-base_2.11;1.10.0: not found, unresolved dependency: org.apache.flink#flink-json;1.10.0: not found]
            at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
            at org.apache.zeppelin.interpreter.Interpreter.getInterpreterInTheSameSessionByClassName(Interpreter.java:355)
            at org.apache.zeppelin.interpreter.Interpreter.getInterpreterInTheSameSessionByClassName(Interpreter.java:366)
            at org.apache.zeppelin.flink.FlinkSqlInterrpeter.open(FlinkSqlInterrpeter.java:109)
            at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.open(FlinkStreamSqlInterpreter.java:49)
            at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
            ... 8 more
    Caused by: java.lang.RuntimeException: [unresolved dependency: org.apache.flink#flink-connector-kafka_2.11;1.10.0: not found, unresolved dependency: org.apache.flink#flink-connector-kafka-base_2.11;1.10.0: not found, unresolved dependency: org.apache.flink#flink-json;1.10.0: not found]
            at org.apache.zeppelin.flink.util.DependencyUtils$.resolveMavenCoordinates(DependencyUtils.scala:353)
            at org.apache.zeppelin.flink.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:60)
            at org.apache.zeppelin.flink.FlinkScalaInterpreter.getUserJars(FlinkScalaInterpreter.scala:740)
            at org.apache.zeppelin.flink.FlinkScalaInterpreter.open(FlinkScalaInterpreter.scala:149)
            at org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:66)
            at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
            ... 13 more
    

2.3.3 Flink On Yarn + Interpreter On Yarn

2.3.3.1 添加jar包

需要添加一些必要的jar包放在FLINK_HOME//lib下:

  • flink-hadoop-compatibility_{scala_version}-{flink.version}.jar
  • flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar

如果要采用Hive来存元数据或访问hive还需要:

  • flink-connector-hive_2.11-1.10.0.jar
  • hive-exec-2.3.3.jar

2.3.3.2 设置interpreter

2.3.3.2.1 flink on yarn
  • flink.execution.mode
    yarn
2.3.3.2.2 interpreter on yarn

最新master分支中,还可通过interpreter 配置zeppelin.interpreter.launcher yarn来讲interpreter运行在yarn上,然后会和Zeppelin服务端通过Thrift交互,并向yarn申请资源来启动flink cluster。详见Flink on Zeppelin  (7). Yarn Interpeter 模式。

此模式其他要求:

  • 安装Hadoop client (hadoop 2和3都支持),要求能在本机直接运行hadoop classpath命令。
    zeppelin需要调用此命令将所有hadoop jar放入zeppelin inclasspath。
  • 环境变量中配置USE_HADOOP=true(也可在zeppelin-env.sh)、HADOOP_CONF_DIR

interpreter local + flink on yarn:
Flink-Zeppelin On FlinkSql_第13张图片
interpreter on yarn + flink on yarn:
Flink-Zeppelin On FlinkSql_第14张图片

相关interpreter配置如下:

  • zeppelin.interpreter.launcher
    yarn
  • zeppelin.interpreter.yarn.resource.memory
    默认1GB,单位为MB。指定interpreter进程内存。
  • zeppelin.interpreter.yarn.resource.cores
    默认1。指定interpreter进程cpu核数。
  • zeppelin.interpreter.yarn.queue
    默认default,指定interpreter进程提交运行的yarn 队列。

2.4 启动

进入安装好的zeppelin目录后,执行

bin/zeppelin-daemon.sh start

随后就可以访问Zeepelin Web界面了:
Flink-Zeppelin On FlinkSql_第15张图片
可以看到,已经有了一些现成的Notebook示例。

2.5 停止

bin/zeppelin-daemon.sh stop

2.6 重启

bin/zeppelin-daemon.sh restart

2.7 集群模式

重要说明:

  • 我使用集群模式运行一段时间以后,发现各个节点包括interpreter都报了很多关于Raft协议错误,而且是突然就崩了,导致zeppelin无法正常使用。由于zeppelin关于recover和interpreter on yarn模式已经合并到主分支,而且测试无误,所以我们放弃集群模式。

2.7.1 Zeppelin全局配置

  • 必须将Notebook保存在共享存储中

    • zeppelin.notebook.storage
      设为org.apache.zeppelin.notebook.repo.FileSystemNotebookRepo
    • zeppelin.notebook.dir
      设为hdfs上的目录,如/tmp/zeppelin/notebook这里注意不能设为hdfs://namespace/xxx,否则路径会有问题!
  • zeppelin.recovery.dir
    在集群模式下,还应该把此项设为hdfs上路径,如/tmp/zeppelin/recovery。注意不要加如hdfs://namespace,否则路径不对!

  • zeppelin.cluster.addr
    配置集群中的所有ZeppelinServerIP及Raft端口,Raft据此进行Leader选举、元数据维护等。多个地址用逗号分隔。

    这里需要注意的是,Raft端口号配置不要和zeppelin.server.port配置相同,否则会造成冲突!

注意事项:

  • 因为使用Raft协议,选举中必须过半投票选一个节点才能选出Leader,所以应该部署奇数个节点,比如2N + 1个,此时可容忍其中的 N 个节点挂掉

2.7.2 Flink Interpreter配置

如果在Zeppelin集群模式下使用FlinkSql on Yarn,需要做如以下配置,否则报错:

  • FLINK_CONF_DIR
    /FLINK_HOME/conf
  • FLINK_PLUGINS_DIR
    /FLINK_HOME/plugins
  • FLINK_LIB_DIR
    /FLINK_HOME/lib

该错误我已经提交了issue给社区,详情可见:ZEPPELIN-4809

2.8 Hive整合

2.8.1 基本配置

主要是可以让Flink使用Hive Catalog存储Flink SQL 元数据(可参考HiveCatalog,注意这种表只能由Flink读写使用,不要用Hive去读写。可以在Hive命令行中使用DESCRIBE FORMATTED命令查看表的元数据,如果是is_generic=true代表是Flink专用表),也可以直接使用Flink读写Hive表数据。

需要将以下包放入$FLINK_HOME/lib:

  • flink-connector-hive_2.11-1.10.0.jar
  • hive-exec-2.3.3.jar

然后设置flink interpreter:

  • HIVE_CONF_DIR
    设为hive-site.xml所在目录
  • zeppelin.flink.enableHive
    设为true,启用hive
  • zeppelin.flink.hive.version
    使用的hive 版本号

随后,使用flinksql注册的表会自动保存到hive default库里。

2.9 checkpoint相关

可以使用flink 配置,实现0代码配置checkpoint。

具体请参考flink-checkpoint配置

2.10 权限

zeppelin可采用LDAP做身份认证+shiro做权限控制。

相关内容可参考:

  • Apache Shiro Configuration
  • Apache Shiro authentication for Apache Zeppelin
  • Apache Zeppelin 基于 kerberos 多租户集成
  • Zeppelin集成Ldap(FreeIPA)
    需要改主机名,会导致hadoop等很多服务不正常
  • zeppelin集成openldap,以及admin用户设置

2.11 数据脱敏-Credentials

比如我们有一些ddl中定义了数据库连接信息,这些信息十分敏感不想暴露给其他人,这个时候我们可以用Credentials。

  1. Credentials配置
    先在interpreter配置injectCredentials true,也可在notebook界面做配置,比如执行时使用%flink(injectCredential=true)
    在这里插入图片描述

  2. Credentials打开
    Flink-Zeppelin On FlinkSql_第16张图片

  3. Credentials定义
    Flink-Zeppelin On FlinkSql_第17张图片
    这里的Entity就相当于是你的Credentials的Key,在访问时使用。

    需要注意的是,每个人创建的Credentials对其他人都不可见,别人也无法使用。

  4. Credentials使用
    再次强调,每个人只能使用自己的Credentials。格式为user.EntityNamepassword.EntityName。(因0.9-preview尚不是稳定版本,可能存在一定变化,还可尝试EntityName.user)。

    本用户使用Credentials效果:
    Flink-Zeppelin On FlinkSql_第18张图片
    用户使用其他人的Credentials效果,打出的是原始字符串而不是Credentials:
    Flink-Zeppelin On FlinkSql_第19张图片

2.12 Pyflink

2.12.1 概述

  1. 必须使用版本号3.5-3.7的python

  2. pip install apache-flink
    如果安装很慢或者超时,可以加参数pip --default-timeout=100000 install -i https://pypi.tuna.tsinghua.edu.cn/simple apache-flink。

    如果提示pip需要升级,可执行pip --default-timeout=100000 install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade pip

  3. flink-python_2.11-1.10.0.jar$FLINK_HOME/opt移动到$FLINK_HOME/lib:

    cp opt/flink-python_2.11-1.10.0.jar lib/
    
  4. 配置好flink_interpreter的zeppelin.pyflink.python为python路径

2.12.2 python UDF

%flink.pyflink
class PythonUpper(ScalarFunction):
    def eval(self, s):
        return s.upper()
        
bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING, DataTypes.STRING()))

3 实用功能

3.1 定时调度

  • Running a Notebook on a Given Schedule Automatically
    前提是需要将zeppelin的配置zeppelin.notebook.cron.enable设为true

3.2 数据可视化

3.2.1 概述

已经支持一些基本图标,任意后端输出都可以以图表方式展现!(不用后端人员再去学什么echarts了)
Flink-Zeppelin On FlinkSql_第20张图片
Flink-Zeppelin On FlinkSql_第21张图片

3.2.2 聚合指标运算

直接可以拖拽方式生成。

Flink-Zeppelin On FlinkSql_第22张图片
还想了解更多或二次开发就参考:

  • basic display systems
  • Angular AP 前端I
  • Angular API 后端

3.2.3 动态表格

Dynamic Forms 是Zeppelin的一个高级功能,允许用户在代码中插入UI控件来允许用户定制化你的代码。Jdbc Interpreter支持这一功能,用户可以定制SQL,下面是一个下拉框的例子。

3.2.4 发布Zeppelin笔记本

可以直接将你的Zeppelin笔记本url分享给其他写作者,则大家都可以看到实时更新。

4 Zeppelin高可用

4.1 ZeppelinServer服务不可用

Flink-Zeppelin On FlinkSql_第23张图片
如上图,如果ZeppelinServer1 突然挂掉不可用,如果正确配置了相关配置zeppelin.recovery.storage.class,则不会影响其上运行的interpreter进程,如果此时这些进程可访问,则其他ZeppelinServer节点可以通过Cluster Metadata读取到这些interpreter进程元数据信息,让用户继续使用这些interpreter进程。

当然Nginx也会发现ZeppelinServer1出现异常,将它视为离线状态。

那么现在本来是用ZeppelinServer1的User1再次启动NoteBook时,Nginx会将请求发送到其他ZeppelinServer节点来使用之前的interpreter进程。

4.2 Zeppelin节点整个不可用

Flink-Zeppelin On FlinkSql_第24张图片
当ZeppelinServer1所在节点整个挂掉时,其他ZeppelinServer会删除无效元数据,并重建interpreter进程。

5 二次开发

5.1 概述

Zeppelin前后端分离架构,可参考:

  • Contributing to Documentation
  • Contributing to Zeppelin-Web
    有很多关于Zeppelin前端开发的详细内容
  • Github-Zeppelin Web Application
    安装yarn_package和启动zeppelin-web指导
  • Contributing to Apache Zeppelin ( Website )

5.2 前端

  1. 首先要安装npm和nodejs,网上找教程即可。
  2. 然后参考Github-Zeppelin Web Application安装yarn(打包用的,不是hadoop那个yarn)
  3. 在本地Zeppelin工程目录下使用常规方式启动Zeppelin-server,默认是8080端扣
  4. 在本地Zeppelin工程/zeppelin-web 目录下执行yarn run dev即可开始调试,会自动连接到8080端口,并启动一个9000端口供访问和调试。现在你在ide里面改动js,便可立即体现在页面上了(当然,需要刷新一次)。
  5. 如果本地zeppelin-server运行很慢,可以将项目打包后放到运行速度较快的其他机器如测试环境机器,然后启动。本机的zeppelin-web只需修改base-url.service.js,假设远程zeppelin-server ip为192.168.1.1,则改动如下:
    this.getWebsocketUrl = function() {
        let wsProtocol = location.protocol === 'https:' ? 'wss:' : 'ws:';
        return wsProtocol + '//' + "192.168.1.1" + ':' + this.getPort() +
          skipTrailingSlash(location.pathname) + '/ws';
      };
    
      this.getBase = function() {
        return location.protocol + '//' + "192.168.1.1" + ':' + this.getPort() + location.pathname;
      };
    

问题:

  • 如果遇到lint:once相关语法检测错误,可以把/zeppelin/zeppelin-web/package.json中的"prebuild": "npm-run-all clean lint:once" 改为prebuild": "npm-run-all clean,并去掉"lint:once": "eslint src"
  • 如果遇到一些包卡住半天下不动,那就手动下下来放在那个目录里。

5.3 后端

好文推荐

Apache Zeppelin公众号
Flink-Zeppelin On FlinkSql_第25张图片

钉钉讨论群
Flink-Zeppelin On FlinkSql_第26张图片

官方

  • Zeppelin-Jira
  • Github-Apache Zeppelin

视频教程

  • Flink on Zeppelin: 极致体验(1) 入门 + Batch
  • Flink on Zeppelin: 极致体验(2) Streaming + 高级用法
  • Flink on Zeppelin 视频教程全集
    来自阿里的章剑锋,Zeppelin PMC

综合

  • 章剑锋Jeff-Zeppelin专栏
  • Flink on Zeppelin (1) - 入门篇
  • Flink on Zeppelin (2) - Batch篇
  • Flink on Zeppelin (3) - Streaming篇
  • Flink on Zeppelin  (4) - 高级特性篇
  • Flink on Zeppelin (5) - 机器学习篇
  • Flink on Zeppelin 极致体验 阿里章剑锋 - 直播回放
  • Flink Sql on Zeppelin教程

架构和原理

  • Zeppelin 分布式架构设计
  • Zeppelin工作机制解析

调研

  • Zeppelin调研与数据开发平台

源码

  • Zeppelin源码分析
  • Apache Zeppelin源码结构分析
  • Understanding Zeppelin Interpreters
    介绍了除Flink以外的一些Interpreters
  • Zeppelin求学之路(3)—Zeppelin基本模块介绍和Paragraph源码深入了解以及Note,NoteBook 简介,
  • Zeppelin源码阅读之更新notebook的paragraph部分

使用

  • Zeppelin: 让大数据插上机器学习的翅膀
    网易杭州研究院数据科学中心机器学习开发组负责人 刘勋
  • 如何在Apache Zeppelin中玩转Flink
  • Hadoop - Zeppelin 使用心得
  • Apache Zeppelin主要界面和基本操作
  • 可视化分析工具Apache Zeppelin:数据分析从未这样简单
    介绍了一些数据源连接配置、可视化插件(地图、热力图等)
  • Apache Zeppelin 基于 kerberos 多租户集成
  • Apache zeppelin binding mode
    关于interpreter隔离的讨论。看起来per-note模式中也会发生,不同用户使用不同note对应同一个interpreter,一个用户重启该interpreter会导致全部interpreter重启,对应任务停止!

二次开发

  • Zeppelin Notebook Now Has a Stop Button
    Notebook界面添加关闭按钮
  • Zeppelin在求学之路----在Zeppelin上开发SendMai功
  • zeppelin的数据集的优化

参考文档

  • Apache Zeppelin
  • Flink on Zeppelin (1) - 入门篇
  • Flink on Zeppelin (2) - Batch篇
  • Flink on Zeppelin (3) - Streaming篇
  • Flink on Zeppelin  (4). 高级特性篇
  • Zeppelin 分布式架构设计
  • 如何在Zeppelin里玩转Hive
  • Zeppelin 0.8.0 New Features
    来自Zeppelin PMC Jeff Zhang
  • 数据治理 数据保护伞 数据发现
  • Zeppelin安全机制之Credentials使用技巧

你可能感兴趣的