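Flink Testing
Background
Flink can process upstream data as a stream, applying pre-aggregation, joins, cleansing and similar operations, and then write the results downstream. In addition to a deployed Flink environment, the tests below need a MySQL environment and a TiDB environment. By data flow, the tests fall into three parts: MySQL → Flink, Flink → TiDB, and MySQL → Flink → TiDB, with Flink synchronizing the test data between MySQL and TiDB.
Test Cases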
MySQL → Flink
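Verify that data from MySQL reaches Flink correctly. Steps: create table t1 in upstream MySQL and write data to it, then query the corresponding source table in Flink to check that the data shows up.
Download the jar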
cd /home/flink-1.12.1/lib && wget https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.1.0/flink-sql-connector-mysql-cdc-1.1.0.jar
Copy this jar to the other two nodes as well.
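The mysql-cdc jar above, and the JDBC connector jar used later, both come from Maven Central, whose download URLs follow the standard Maven repository layout: the group ID with dots turned into slashes, then the artifact ID, the version, and the file name artifactId-version.jar. The Python sketch below rebuilds those URLs from coordinates; maven_jar_url is a hypothetical helper for illustration, not part of any Flink tooling.

```python
# Hypothetical helper: builds a Maven Central jar URL from Maven coordinates,
# following the standard Maven 2 repository layout:
#   <repo>/<groupId with '.' -> '/'>/<artifactId>/<version>/<artifactId>-<version>.jar
MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


def maven_jar_url(group_id: str, artifact_id: str, version: str,
                  repo: str = MAVEN_CENTRAL) -> str:
    group_path = group_id.replace(".", "/")
    return f"{repo}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.jar"


if __name__ == "__main__":
    # The two connector jars used in these tests
    print(maven_jar_url("com.alibaba.ververica", "flink-sql-connector-mysql-cdc", "1.1.0"))
    print(maven_jar_url("org.apache.flink", "flink-connector-jdbc_2.12", "1.12.0"))
```

The resulting URL can be passed straight to wget in the lib directory of each node.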
Restart the cluster
[root@ee-5143 bin]# ./stop-cluster.sh && ./start-cluster.sh
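Create the table
Create the table in upstream MySQL, with binlog enabled on MySQL (the mysql-cdc connector reads the row-based binlog, so binlog_format should be ROW).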
CREATE TABLE `t1` (
`id` int(11) DEFAULT NULL,
`name` varchar(20) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
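Before creating the CDC source it can help to confirm the binlog settings. The Python sketch below assumes the variables have already been fetched (for example with SHOW VARIABLES LIKE 'log_bin' and SHOW VARIABLES LIKE 'binlog%') into a dict. binlog_problems is a hypothetical helper, and the required values (log_bin=ON, binlog_format=ROW, binlog_row_image=FULL) are the ones the Debezium MySQL connector underlying mysql-cdc expects, stated here as an assumption about this setup.

```python
from typing import Dict, List

# Assumed binlog requirements of the Debezium-based mysql-cdc source.
REQUIRED_BINLOG_SETTINGS = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def binlog_problems(variables: Dict[str, str]) -> List[str]:
    """Hypothetical helper: compare SHOW VARIABLES results with the required
    binlog settings and return a list of problems (empty list means OK)."""
    problems = []
    for name, expected in REQUIRED_BINLOG_SETTINGS.items():
        actual = variables.get(name)
        if actual is None:
            problems.append(f"{name} is not reported")
        elif actual.upper() != expected:
            problems.append(f"{name}={actual}, expected {expected}")
    return problems


if __name__ == "__main__":
    # Example values, made up for illustration
    print(binlog_problems({"log_bin": "ON", "binlog_format": "MIXED", "binlog_row_image": "FULL"}))
```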
Start and test
Start the Flink SQL CLI
[root@n5162 bin]# ./sql-client.sh embedded
No default environment specified.
Searching for '/home/flink-1.12.1/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/home/flink-1.12.1/conf/sql-client-defaults.yaml
No session environment specified.
Command history file path: /root/.flink-sql-history
Flink SQL> create table source (id int, name string) with (
> 'connector' = 'mysql-cdc',
> 'hostname' = '172.16.4.222',
> 'username' = 'dm',
> 'password' = '123456',
> 'database-name' = 'flink_t',
> 'table-name' = 't1'
> );
[INFO] Table has been created.
Insert data on the MySQL side
MySQL [flink_t]> insert into t1(id,name) values(1,'mysql');
Query OK, 1 row affected (0.00 sec)
MySQL [flink_t]> insert into t1(id,name) values(2,'mysql');
Query OK, 1 row affected (0.00 sec)
MySQL [flink_t]> insert into t1(id,name) values(3,'mysql');
Query OK, 1 row affected (0.01 sec)
MySQL [flink_t]> insert into t1(id,name) values(4,'mysql');
Query OK, 1 row affected (0.00 sec)
MySQL [flink_t]> insert into t1(id,name) values(5,'mysql');
Query OK, 1 row affected (0.00 sec)
Query the source table in Flink
Flink SQL> select * from source;
[INFO] Result retrieval cancelled.
Querying source in Flink shows the data written to MySQL.
Flink → TiDB
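Verify that data written from Flink reaches TiDB correctly. Steps: create a target table in Flink and write data into it, then query the corresponding table t in TiDB to check that the data shows up.
Download the jar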
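cd /home/flink-1.12.1/lib && wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.12.0/flink-connector-jdbc_2.12-1.12.0.jar
The JDBC connector also needs a MySQL JDBC driver jar (mysql-connector-java) in the same lib directory, since TiDB is reached through a jdbc:mysql URL.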
Restart the cluster
[root@ee-5143 bin]# ./stop-cluster.sh && ./start-cluster.sh
Create the table
Create a table in downstream TiDB and insert test data
MySQL [upstream_f]> create database downstream_f;
Query OK, 0 rows affected (0.11 sec)
MySQL [upstream_f]> use downstream_f
Database changed
MySQL [downstream_f]> create table t(id int,name varchar(20));
Query OK, 0 rows affected (0.11 sec)
MySQL [downstream_f]> insert into t (id,name) values(1,'aa');
Query OK, 1 row affected (0.02 sec)
MySQL [downstream_f]> insert into t (id,name) values(2,'bb');
Query OK, 1 row affected (0.00 sec)
MySQL [downstream_f]> insert into t (id,name) values(3,'down');
Query OK, 1 row affected (0.00 sec)
Start and test
Start the Flink SQL CLI
[root@n5162 bin]# ./sql-client.sh embedded
No default environment specified.
Searching for '/home/flink-1.12.1/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/home/flink-1.12.1/conf/sql-client-defaults.yaml
No session environment specified.
Command history file path: /root/.flink-sql-history
Flink SQL> CREATE TABLE target (
> id int,
> name string
> ) with (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://172.16.4.136:6000/downstream_f',
> 'table-name' = 't',
> 'username' = 'root',
> 'password' = ''
> );
[INFO] Table has been created.
Insert data from Flink
Flink SQL> insert into target (id,name) values(1,'TiDB');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 7a7302c992dd9292aed614c9ef4899d5
Flink SQL> insert into target (id,name) values(2,'TiDB');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 099350466e1e29125b85378749d91a89
Flink SQL> insert into target (id,name) values(3,'TiDB');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 9430d992087cc66c98555e63c7d0d029
Flink SQL> insert into target (id,name) values(4,'TiDB');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: ef2125a6ce7e1be0846cab384ee4ea0c
Flink SQL> insert into target (id,name) values(5,'TiDB');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 04c694a449e5a70153b1b79de707adff
Query result in TiDB
MySQL [downstream_f]> select * from t;
+------+------+
| id | name |
+------+------+
| 1 | TiDB |
| 2 | TiDB |
| 3 | TiDB |
| 4 | TiDB |
| 5 | TiDB |
+------+------+
5 rows in set (0.01 sec)
MySQL → Flink → TiDB
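Full sync
Verify that data already present in MySQL is synced correctly to downstream TiDB. With existing rows in the upstream MySQL table, create the source and target tables in Flink, use insert into ... select to sync the existing MySQL data to TiDB, and then query TiDB to confirm that it holds all of the MySQL data.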
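Conceptually, one insert into ... select job covers both full and incremental sync: the mysql-cdc source first reads the rows already in t1 and then follows the binlog. The Python sketch below models that as a single changelog applied to a table keyed by id. The function names and the simplified '+I'/'-U'/'+U'/'-D' records are assumptions for illustration, not the connector's API.

```python
from typing import Dict, Iterable, Iterator, List, Tuple

Row = Tuple[int, str]          # (id, name)
Change = Tuple[str, Row]       # ("+I" | "-U" | "+U" | "-D", row)


def cdc_changelog(snapshot: Iterable[Row], binlog: Iterable[Change]) -> Iterator[Change]:
    """Model of a CDC source: existing rows first (full sync), then binlog changes."""
    for row in snapshot:
        yield ("+I", row)
    yield from binlog


def apply_keyed(changes: Iterable[Change]) -> List[Row]:
    """Apply a changelog to a table keyed by id: insert/update-after upsert the
    row, delete removes it, update-before is ignored."""
    table: Dict[int, str] = {}
    for kind, (row_id, name) in changes:
        if kind in ("+I", "+U"):
            table[row_id] = name
        elif kind == "-D":
            table.pop(row_id, None)
    return sorted(table.items())


if __name__ == "__main__":
    existing = [(i, "mysql") for i in range(1, 6)]       # rows already in t1
    later = [("+I", (6, "mysql")), ("+I", (7, "mysql")),
             ("-U", (7, "mysql")), ("+U", (7, "TiDB")), ("-D", (7, "TiDB"))]
    print(apply_keyed(cdc_changelog(existing, [])))      # full sync only
    print(apply_keyed(cdc_changelog(existing, later)))   # full + incremental
```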
Flink SQL> create table source (id int primary key, name string) with (
> 'connector' = 'mysql-cdc',
> 'hostname' = '172.16.4.222',
> 'username' = 'dm',
> 'password' = '123456',
> 'database-name' = 'flink_t',
> 'table-name' = 't1'
> );
[INFO] Table has been created.
Flink SQL> CREATE TABLE target (
> id int primary key,
> name string
> ) with (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://172.16.4.136:6000/downstream_f',
> 'table-name' = 't',
> 'username' = 'root',
> 'password' = '',
> 'sink.buffer-flush.max-rows' = '1', 'sink.buffer-flush.interval' = '0'
> );
[INFO] Table has been created.
Flink SQL> select * from source;
[INFO] Result retrieval cancelled.
Flink SQL> select * from target;
[INFO] Result retrieval cancelled.
Flink SQL> insert into target select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 683e4cfd7ae07762f212cdec869885d2
Flink SQL> insert into target select * from source;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.
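The error above comes from the JDBC sink: when the query's changelog contains update or delete records, as a mysql-cdc source's does, the sink table must declare a primary key. With a key, the sink accepts inserts, update-after rows and deletes; update-before rows are not needed for upserts. The Python sketch below models that check; check_sink and the string kinds are hypothetical, not Flink's code.

```python
from typing import Optional, Sequence, Set

INSERT_ONLY = {"+I"}


def check_sink(changelog_kinds: Set[str], primary_key: Optional[Sequence[str]]) -> Set[str]:
    """Hypothetical model of the JDBC sink check: without a primary key only an
    insert-only changelog is allowed; with one, update-before rows are dropped
    and the remaining kinds are written as upserts/deletes by key."""
    if changelog_kinds != INSERT_ONLY and not primary_key:
        raise ValueError("please declare primary key for sink table "
                         "when query contains update/delete record.")
    return changelog_kinds - {"-U"}


if __name__ == "__main__":
    cdc_kinds = {"+I", "-U", "+U", "-D"}    # kinds a mysql-cdc source can emit
    print(sorted(check_sink(cdc_kinds, ["id"])))
    try:
        check_sink(cdc_kinds, None)
    except ValueError as err:
        print("rejected:", err)
```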
On the TiDB side
MySQL [downstream_f]> select * from t;
+------+-------+
| id | name |
+------+-------+
| 1 | mysql |
| 2 | mysql |
| 3 | mysql |
| 4 | mysql |
| 5 | mysql |
+------+-------+
5 rows in set (0.00 sec)
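The target table sets 'sink.buffer-flush.max-rows' = '1' and 'sink.buffer-flush.interval' = '0', so each record is written to TiDB as soon as it arrives, which makes the sync easy to observe. The Python class below is a simplified model of those two options, assuming count-based flushing when max-rows rows are buffered and periodic flushing only when the interval is greater than 0; BufferedSink is hypothetical, not the connector's implementation.

```python
from typing import List


class BufferedSink:
    """Hypothetical model of the JDBC sink buffer-flush options.
    Time is passed in explicitly so the behavior is deterministic."""

    def __init__(self, max_rows: int, interval_s: float):
        self.max_rows = max_rows            # 'sink.buffer-flush.max-rows' (0 disables)
        self.interval_s = interval_s        # 'sink.buffer-flush.interval' (0 disables)
        self.buffer: List[str] = []
        self.flushed: List[List[str]] = []  # each entry is one batch written downstream
        self.last_flush = 0.0

    def write(self, row: str, now: float) -> None:
        self.buffer.append(row)
        if self.max_rows > 0 and len(self.buffer) >= self.max_rows:
            self.flush(now)

    def tick(self, now: float) -> None:
        """Called periodically; flushes if the interval has elapsed."""
        if self.interval_s > 0 and self.buffer and now - self.last_flush >= self.interval_s:
            self.flush(now)

    def flush(self, now: float) -> None:
        if self.buffer:
            self.flushed.append(self.buffer)
            self.buffer = []
        self.last_flush = now


if __name__ == "__main__":
    sink = BufferedSink(max_rows=1, interval_s=0)   # the settings used in this test
    for i, row in enumerate(["(1,'mysql')", "(2,'mysql')"]):
        sink.write(row, now=float(i))
    print(sink.flushed)   # each row is its own batch
```

With larger values both options trade latency for fewer, bigger writes.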
Background jobs
Check how the job is running via the Web UI.
- View jobs
[root@n5162 bin]# ./flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
19.01.2021 11:44:10 : 683e4cfd7ae07762f212cdec869885d2 : default: insert into target select * from source (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
- Stop the job
- flink cancel
[root@n5162 bin]# ./flink cancel 683e4cfd7ae07762f212cdec869885d2
Cancelling job 683e4cfd7ae07762f212cdec869885d2.
Cancelled job 683e4cfd7ae07762f212cdec869885d2.
- web UI
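When jobs are managed from a script instead of the Web UI, the output of ./flink list can be parsed to find the job IDs to pass to ./flink cancel. The Python sketch below assumes the line format shown above (timestamp : job ID : job name (STATE)); parse_flink_list is a hypothetical helper, and the format may differ between Flink versions.

```python
import re
from typing import List, Tuple

# Assumed line format, as printed by ./flink list in this test:
#   19.01.2021 11:44:10 : <32-hex job id> : <job name> (RUNNING)
JOB_LINE = re.compile(r"^\S+ \S+ : ([0-9a-f]{32}) : (.*) \((\w+)\)\s*$")


def parse_flink_list(output: str) -> List[Tuple[str, str, str]]:
    """Hypothetical helper: return (job_id, job_name, state) for each job line."""
    jobs = []
    for line in output.splitlines():
        m = JOB_LINE.match(line.strip())
        if m:
            jobs.append((m.group(1), m.group(2), m.group(3)))
    return jobs


if __name__ == "__main__":
    sample = ("19.01.2021 11:44:10 : 683e4cfd7ae07762f212cdec869885d2 : "
              "default: insert into target select * from source (RUNNING)")
    for job_id, name, state in parse_flink_list(sample):
        print(job_id, state, name)
```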
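Incremental sync
Keep inserting, updating, and deleting rows in upstream MySQL while querying the table in downstream TiDB to verify incremental sync.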
INSERT
Insert data in upstream MySQL
MySQL [flink_t]> insert into t1(id,name) values(6,'mysql');
Query OK, 1 row affected (0.00 sec)
MySQL [flink_t]> insert into t1(id,name) values(7,'mysql');
Query OK, 1 row affected (0.00 sec)
Query the corresponding table in downstream TiDB
MySQL [downstream_f]> select * from t;
+------+-------+
| id | name |
+------+-------+
| 1 | mysql |
| 2 | mysql |
| 3 | mysql |
| 4 | mysql |
| 5 | mysql |
| 6 | mysql |
| 7 | mysql |
+------+-------+
7 rows in set (0.00 sec)
UPDATE
Update data in upstream MySQL
MySQL [flink_t]> update t1 set name='TiDB' where id =7;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1 Changed: 1 Warnings: 0
Query the data in downstream TiDB
MySQL [downstream_f]> select * from t;
+------+-------+
| id | name |
+------+-------+
| 1 | mysql |
| 2 | mysql |
| 3 | mysql |
| 4 | mysql |
| 5 | mysql |
| 6 | mysql |
| 7 | mysql |
| 7 | TiDB |
+------+-------+
8 rows in set (0.01 sec)
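The UPDATE left two rows with id 7 instead of one. A likely explanation, assuming the downstream TiDB table t still has no primary or unique key (it was created earlier as create table t(id int,name varchar(20))): with a primary key declared on the Flink side, the JDBC sink writes update-after rows with INSERT ... ON DUPLICATE KEY UPDATE, which only updates when the downstream table has a matching key and otherwise inserts a new row. The Python function below models just that statement; it is an illustration, not the connector's code.

```python
from typing import List, Tuple

Row = Tuple[int, str]   # (id, name)


def insert_on_duplicate_key_update(table: List[Row], row: Row, id_is_unique: bool) -> None:
    """Model of INSERT ... ON DUPLICATE KEY UPDATE on a downstream table.
    Only a primary/unique key on id makes the statement update an existing row."""
    if id_is_unique:
        for i, (row_id, _) in enumerate(table):
            if row_id == row[0]:
                table[i] = row      # key conflict: update the existing row
                return
    table.append(row)               # no key to conflict with: insert a new row


if __name__ == "__main__":
    t = [(i, "mysql") for i in range(1, 8)]          # ids 1..7 after the INSERT step
    insert_on_duplicate_key_update(t, (7, "TiDB"), id_is_unique=False)
    print(t)    # ends with (7, 'mysql'), (7, 'TiDB'), like the TiDB result above
```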
DELETE
Delete data in upstream MySQL
MySQL [flink_t]> delete from t1 where id=7;
Query OK, 1 row affected (0.01 sec)
Check in downstream TiDB whether the row still exists
MySQL [downstream_f]> select * from t;
+------+-------+
| id | name |
+------+-------+
| 1 | mysql |
| 2 | mysql |
| 3 | mysql |
| 4 | mysql |
| 5 | mysql |
| 6 | mysql |
+------+-------+
6 rows in set (0.00 sec)
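The DELETE then removed both id 7 rows. Assuming again that the downstream t has no key, this is expected: the JDBC sink writes a delete record as DELETE FROM t WHERE id = ?, keyed by the primary key declared in Flink, and that condition matches every row with that id. Declaring the primary key on the downstream table as well, as the join test below does, avoids the duplicate in the first place. A minimal Python model, with delete_by_key as a hypothetical name:

```python
from typing import List, Tuple

Row = Tuple[int, str]   # (id, name)


def delete_by_key(table: List[Row], key: int) -> List[Row]:
    """Model of DELETE FROM t WHERE id = ?: every row with that id is removed,
    whether or not the downstream table enforces id as a key."""
    return [row for row in table if row[0] != key]


if __name__ == "__main__":
    t = [(i, "mysql") for i in range(1, 8)] + [(7, "TiDB")]   # state after UPDATE
    print(delete_by_key(t, 7))   # ids 1..6 remain, as in the TiDB result above
```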
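Table join
Create two tables in upstream MySQL, join them in Flink while syncing, and check the resulting data in downstream TiDB.
Create the two tables in upstream MySQL and write data to them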
MySQL [flink_t]> select * from t;
+----+------+
| id | name |
+----+------+
| 1 | a |
| 2 | b |
| 3 | c |
| 4 | d |
+----+------+
4 rows in set (0.00 sec)
MySQL [flink_t]> select * from t1;
+----+-------+------+
| id | name | age |
+----+-------+------+
| 1 | mysql | 20 |
| 2 | tidb | 10 |
| 3 | redis | 10 |
| 4 | redis | 8 |
+----+-------+------+
4 rows in set (0.00 sec)
Create the table downstream (with id as the primary key) and make sure it is empty
MySQL [downstream_f]> show create table t\G
*************************** 1. row ***************************
Table: t
Create Table: CREATE TABLE `t` (
`id` int(11) NOT NULL,
`name` varchar(20) DEFAULT NULL,
`age` int(11) DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
1 row in set (0.01 sec)
MySQL [downstream_f]> truncate table t;
Query OK, 0 rows affected (2.03 sec)
MySQL [downstream_f]> select * from t;
Empty set (0.00 sec)
In the Flink SQL client, join the two tables and write the result to downstream TiDB
Flink SQL> create table source_t (id int primary key, name string) with (
> 'connector' = 'mysql-cdc',
> 'hostname' = '172.16.4.222',
> 'username' = 'dm',
> 'password' = '123456',
> 'database-name' = 'flink_t',
> 'table-name' = 't'
> );
[INFO] Table has been created.
Flink SQL> create table source (id int primary key, name string, age int) with (
> 'connector' = 'mysql-cdc',
> 'hostname' = '172.16.4.222',
> 'username' = 'dm',
> 'password' = '123456',
> 'database-name' = 'flink_t',
> 'table-name' = 't1'
> );
[INFO] Table has been created.
Flink SQL> CREATE TABLE target (
> id int primary key,
> name string,
> age int
> ) with (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://172.16.4.136:6000/downstream_f',
> 'table-name' = 't',
> 'username' = 'root',
> 'password' = '',
> 'sink.buffer-flush.max-rows' = '1', 'sink.buffer-flush.interval' = '0'
> );
>
[INFO] Table has been created.
Flink SQL> insert into target select source.id,source.name,source.age from source_t,source where source_t.id=source.id;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 4462252bff0b370eecd6cd641ba0a950
Query the data in TiDB
MySQL [downstream_f]> select * from t;
+----+-------+------+
| id | name | age |
+----+-------+------+
| 1 | mysql | 20 |
| 2 | tidb | 10 |
| 3 | redis | 10 |
| 4 | redis | 8 |
+----+-------+------+
4 rows in set (0.00 sec)
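The result contains the t1 rows whose id also appears in t. A regular (unbounded) join in Flink keeps the rows of both inputs in state, so a row arriving on either side is matched against everything the other side has produced so far. The Python class below is a simplified model of that behavior for inserts only, keyed by id; StreamingInnerJoin is hypothetical and not Flink's join operator.

```python
from typing import Dict, List, Tuple


class StreamingInnerJoin:
    """Hypothetical model of a regular streaming inner join on id (inserts only)."""

    def __init__(self) -> None:
        self.left: Dict[int, str] = {}                  # state for source_t (t): id -> name
        self.right: Dict[int, Tuple[str, int]] = {}     # state for source (t1): id -> (name, age)
        self.output: List[Tuple[int, str, int]] = []    # rows sent to target: (id, name, age)

    def on_left(self, row_id: int, name: str) -> None:
        self.left[row_id] = name
        if row_id in self.right:                        # match against right-side state
            r_name, age = self.right[row_id]
            self.output.append((row_id, r_name, age))

    def on_right(self, row_id: int, name: str, age: int) -> None:
        self.right[row_id] = (name, age)
        if row_id in self.left:                         # match against left-side state
            self.output.append((row_id, name, age))


if __name__ == "__main__":
    join = StreamingInnerJoin()
    for row_id, name in [(1, "a"), (2, "b"), (3, "c"), (4, "d")]:
        join.on_left(row_id, name)
    for row_id, name, age in [(1, "mysql", 20), (2, "tidb", 10), (3, "redis", 10), (4, "redis", 8)]:
        join.on_right(row_id, name, age)
    print(join.output)
```

Because both sides stay in state, the state grows with the input; the model ignores updates, deletes and state expiry.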
Web UI
View the job's execution in the Web UI.