Flink 最佳实践之从零开始部署高可用Flink Standalone 集群

Flink 部署篇

背景介绍

Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。随着 ApacheFlink1.20 发布了许多令人兴奋的新特性,包括 FlinkSQL 中的许多开发,这些开发正在快速发展。本文以及后面的介绍部分,从实用的角度详细介绍了如何使用 FlinkSQL 快速构建流应用程序。首先介绍 Flink 的部署,其次将介绍如何将 MySQL, FlinkSQL 以及 TiDB 集成以实现实时数据分析的功能。文章中出现的所有练习都在 FlinkSQL CLI 中执行,整个过程使用标准 SQL 语法。

Flink 部署有多种方式,本文使用 standalone cluster 方式来部署一套环境主要进行功能的测试和尝鲜。搭建需要三台主机,每个节点之间需要有 ssh 互信,角色分配为一个 master 两个 worker 节点。

角色分配

node1 master
node2 worker
node3 worker

node{ID} 为具体的 IP 信息

互信配置

[root@ee-4190 ~]# ssh-keygen  -t rsa
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:rFx1AoGSb0cVYig9xZJX4/fo7ZfMWOWRixOPprFy7Hc root@ee-4190
The key's randomart image is:
+---[RSA 2048]----+
|     o **+=.     |
|    + B.++ .     |
|     + =  + o   .|
|      o... + + o.|
|     . .S   . *.+|
|     . o   o * +.|
|      o   . * B .|
|         . = + E.|
|          +.. o. |
+----[SHA256]-----+
[root@ee-4190 ~]# cd ./.ssh/
[root@ee-4190 .ssh]# ls
authorized_keys  id_rsa  id_rsa.pub
[root@ee-4190 .ssh]# pwd
/root/.ssh
[root@ee-4190 .ssh]#
[root@ee-4190 .ssh]# cat /root/.ssh/id_rsa.pub | ssh root@node2  'cat >> /root/.ssh/authorized_keys'
The authenticity of host 'node2 (node2)' can't be established.
ECDSA key fingerprint is SHA256:MscWpWMS4N+bVrrgrwJCj/cW5CNx5LLWhxpqgdI5ieA.
ECDSA key fingerprint is MD5:8a:eb:7c:b2:75:85:9d:46:c0:00:de:a9:24:13:99:70.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'node2' (ECDSA) to the list of known hosts.
root@node2's password:
[root@ee-4190 .ssh]# ssh node2
Last login: Tue Jan 12 16:40:33 2021 from 192.168.190.102
[root@ee-5127 ~]# exit
登出

每个节点之间分别做免密互信即可

分发

下载 Flink 安装包并分发到其他两个节点

wget https://mirror.bit.edu.cn/apache/flink/flink-1.12.1/flink-1.12.1-bin-scala_2.12.tgz
tar vxzf flink-1.12.1-bin-scala_2.12.tgz
scp -r flink-1.12.1 root@xxx:/home/

修改配置

在之前分配的 master 节点修改配置文件

  • 修改 /home/flink-1.12.1/conf/flink-conf.yaml 文件
…
jobmanager.rpc.address: node1
jobmanager.memory.process.size: 4600m
taskmanager.memory.process.size: 4728m
taskmanager.numberOfTaskSlots: 10
…
  • 编辑 masters 文件,补充 master 信息(ip:port)
[root@ee-5143 conf]# cat masters
node1:8081
  • 编辑 workers 文件,补充 worker 信息
[root@ee-5143 conf]# cat workers
node2
node3 

配置分发

将 master 节点上修改的 …/conf/ 下面的配置同步到其他两个节点,可以将 conf 下面的所有文件拷贝到其他两个节点对应目录下进行覆盖即可

配置 Jre

下载 jre1.8.0_271 包并进行配置

  • 解压并分发到其他两个节点

  • 配置 java home(以下方式二选一)

1、 只配置 Flink 的 JAVA_HOME

echo "env.java.home: /home/flink-1.12.1/jre1.8.0_271" >> /home/flink-1.12.1/conf/flink-conf.yaml

2、 配置系统的 JAVA_HOME

  • 官网下载 JDK ,下载 JDK 之后解压到指定目录

  • 在 /ect/profile 下面添加

export JAVA_HOME=/home/jdk-15.0.2
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib/
export PATH=$PATH:$JAVA_HOME/bin

并执行

source  /etc/profile
  • 检查
[root@node215 home]# java -version
java version "15.0.2" 2021-01-19
Java(TM) SE Runtime Environment (build 15.0.2+7-27)
Java HotSpot(TM) 64-Bit Server VM (build 15.0.2+7-27, mixed mode, sharing)

启动

  • 启动集群

在 ${deploy}/flink…/bin 目录下有 start-cluster.sh 脚本来启动集群,当输出如下信息时,启动成功

[root@ee-5143 bin]# pwd
/home/flink-1.12.1/bin
[root@ee-5143 bin]# ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host ee-5143.
Starting taskexecutor daemon on host ee-5127.
Starting taskexecutor daemon on host ee-4190.
[root@ee-5143 bin]#

在启动过程中出现启动失败等异常时,可以在 log 目录下分析日志排查原因。

  • 验证集群启动情况

使用 jps 检查进程

[root@n5138 ~]# jps
1869 Jps
12623 TaskManagerRunner

[root@m4136 ~]# jps
4595 Jps
32455 TaskManagerRunner

[root@n5162 ~]# jps
14597 TaskManagerRunner
22207 Jps

验证

  • 数据准备

在一个节点生成以下数据

for i in `seq 100000`; do echo "$i" >> /home/flink-1.12.1/test/wc.txt ; done

在数据文件生成之后,同时需要拷贝到其他两个节点…

  • 运行

在 master 节点提交任务

[root@ee-5143 bin]#  ./flink run -m node1:8081 /home/flink-1.12.1/examples/batch/WordCount.jar --input  /home/flink-1.12.1/test/wc.txt --output  /home/flink-1.12.1/test/wcout.txt
  • 结果
[root@ee-5143 bin]#  ./flink run -m node1:8081 /home/flink-1.12.1/examples/batch/WordCount.jar --input  /home/flink-1.12.1/test/wc.txt --output  /home/flink-1.12.1/test/wcout.txt
Job has been submitted with JobID 54d6c45d7faf00acccf066a6aee39663
Program execution finished
Job with JobID 54d6c45d7faf00acccf066a6aee39663 has finished.
Job Runtime: 1401 ms
 
[root@ee-5127 test]# ls
wcout.txt  wc.txt

正常执行完成之后的输出如上图,会生成一个 wcount 文件,需要注意的是该文件是不在当前节点生成,可以去其他节点查找。

  • WEB UI

通过 master 的 IP 和 PORT 即可看到执行的详细信息

在该 UI 上可以看到当前正在执行的 job 信息,同时会显示已经执行完成的 job 信息,当然如果执行异常了,也可以通过页面的 LOG 信息来判断分析。

3赞