Flink Best Practices: A Test Plan for Data Synchronization Between MySQL and TiDB

Flink Testing

Background

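Flink can process upstream data as a stream (pre-aggregation, joins, cleansing, and so on) and write the results downstream. Besides a deployed Flink environment, the tests below need a MySQL instance and a TiDB cluster. By direction of data flow, the tests fall into three parts: MySQL -> Flink, Flink -> TiDB, and MySQL -> Flink -> TiDB, with Flink synchronizing the test data between MySQL and TiDB.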

Test Cases

MySQL -> Flink

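This test checks whether data in MySQL is synchronized to Flink correctly. Steps: create table t1 in the upstream MySQL and insert data, then query the corresponding source table in Flink to see whether the rows appear.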

Download the jar

cd /home/flink-1.12.1/lib && wget https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.1.0/flink-sql-connector-mysql-cdc-1.1.0.jar

Copy the jar into the same lib directory on the other two nodes as well.

Restart the cluster

[root@ee-5143 bin]# ./stop-cluster.sh && ./start-cluster.sh

Create the table

Create the table in the upstream MySQL, and make sure binlog is enabled on MySQL.

CREATE TABLE `t1` (
  `id` int(11) DEFAULT NULL,
  `name` varchar(20) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
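Before creating the Flink source, it can help to confirm the binlog settings on the upstream MySQL. The sketch below follows the commonly documented setup for the Debezium-based mysql-cdc connector (row-based binlog with full row images, plus replication privileges for the CDC account). The `'dm'@'%'` account host is an assumption; adjust it to how the `dm` user is actually defined on your server.

```sql
-- Run on the upstream MySQL (172.16.4.222).
-- Expected for mysql-cdc: log_bin = ON, binlog_format = ROW, binlog_row_image = FULL.
-- log_bin is not a dynamic variable: if it is OFF, enable it in my.cnf and restart MySQL.
SHOW VARIABLES
WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image', 'server_id');

-- Privileges for the account used in the Flink DDL below.
-- Assumption: the account is defined as 'dm'@'%'.
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'dm'@'%';
```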

Start and test

Start the Flink SQL CLI

[root@n5162 bin]# ./sql-client.sh embedded
No default environment specified.
Searching for '/home/flink-1.12.1/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/home/flink-1.12.1/conf/sql-client-defaults.yaml
No session environment specified.
 
Command history file path: /root/.flink-sql-history
Flink SQL> create table source (id int, name string) with (
>      'connector' = 'mysql-cdc',
>      'hostname' = '172.16.4.222',
>      'username' = 'dm',
>      'password' = '123456',
>      'database-name' = 'flink_t',
>      'table-name' = 't1'
>  );
[INFO] Table has been created.

Insert data on the MySQL side

MySQL [flink_t]> insert into t1(id,name) values(1,'mysql');
Query OK, 1 row affected (0.00 sec)
 
MySQL [flink_t]> insert into t1(id,name) values(2,'mysql');
Query OK, 1 row affected (0.00 sec)
 
MySQL [flink_t]> insert into t1(id,name) values(3,'mysql');
Query OK, 1 row affected (0.01 sec)
 
MySQL [flink_t]> insert into t1(id,name) values(4,'mysql');
Query OK, 1 row affected (0.00 sec)
 
MySQL [flink_t]> insert into t1(id,name) values(5,'mysql');
Query OK, 1 row affected (0.00 sec)

Query the source table in Flink

Flink SQL> select * from source;
[INFO] Result retrieval cancelled.

The query on source displays the rows that have arrived in Flink.
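The SQL Client's default table result mode shows query results in an interactive view, which is why only `[INFO] Result retrieval cancelled.` remains in the log above. To have the rows printed inline in the terminal (easier to paste into a test record), the Flink 1.12 SQL Client offers a tableau result mode. A minimal sketch; note that the property key differs in later Flink releases.

```sql
SET execution.result-mode=tableau;
-- Results of the following query are now printed inline in the terminal.
SELECT * FROM source;
```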

Flink -> TiDB

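This test checks whether data written in Flink is synchronized to TiDB correctly. Steps: create the target table in Flink and write data into it, then query the corresponding table t in TiDB to see whether the rows appear.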

Download the jar

cd /home/flink-1.12.1/lib && wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.12.0/flink-connector-jdbc_2.12-1.12.0.jar

Restart the cluster

[root@ee-5143 bin]# ./stop-cluster.sh && ./start-cluster.sh

Create the table

Create a table in the downstream TiDB and insert test data

MySQL [upstream_f]> create database  downstream_f;
Query OK, 0 rows affected (0.11 sec)
MySQL [upstream_f]> use downstream_f
Database changed
MySQL [downstream_f]> create table t(id int,name varchar(20));
Query OK, 0 rows affected (0.11 sec)
 
MySQL [downstream_f]> insert into t (id,name) values(1,'aa');
Query OK, 1 row affected (0.02 sec)
 
MySQL [downstream_f]> insert into t (id,name) values(2,'bb');
Query OK, 1 row affected (0.00 sec)
 
MySQL [downstream_f]> insert into t (id,name) values(3,'down');
Query OK, 1 row affected (0.00 sec)

Start and test

Start the Flink SQL CLI

[root@n5162 bin]# ./sql-client.sh embedded
No default environment specified.
Searching for '/home/flink-1.12.1/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/home/flink-1.12.1/conf/sql-client-defaults.yaml
No session environment specified.
 
Command history file path: /root/.flink-sql-history
 
 
Flink SQL> CREATE TABLE target (
>    id int,
>    name string
>  ) with (
>      'connector' = 'jdbc',
>      'url' = 'jdbc:mysql://172.16.4.136:6000/downstream_f',
>      'table-name' = 't',
>      'username' = 'root',
>      'password' = ''
>  );
[INFO] Table has been created.

Insert data from Flink

Flink SQL>  insert into target (id,name) values(1,'TiDB');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 7a7302c992dd9292aed614c9ef4899d5
 
 
Flink SQL> insert into target (id,name) values(2,'TiDB');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 099350466e1e29125b85378749d91a89
 
 
Flink SQL> insert into target (id,name) values(3,'TiDB');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 9430d992087cc66c98555e63c7d0d029
 
 
Flink SQL> insert into target (id,name) values(4,'TiDB');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: ef2125a6ce7e1be0846cab384ee4ea0c
 
 
Flink SQL> insert into target (id,name) values(5,'TiDB');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 04c694a449e5a70153b1b79de707adff

Query result in TiDB

MySQL [downstream_f]> select * from t;
+------+------+
| id   | name |
+------+------+
|    1 | TiDB |
|    2 | TiDB |
|    3 | TiDB |
|    4 | TiDB |
|    5 | TiDB |
+------+------+
5 rows in set (0.01 sec)

MySQL -> Flink -> TiDB

Full synchronization

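This test checks whether data already present in MySQL is synchronized to the downstream TiDB. With existing rows in the upstream MySQL table, create the source and target tables in Flink and use INSERT INTO ... SELECT to copy the existing MySQL data to TiDB. Full synchronization is verified by checking that TiDB contains all of the MySQL rows.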

Flink SQL> create table source (id int primary key, name string) with (
>      'connector' = 'mysql-cdc',
>      'hostname' = '172.16.4.222',
>      'username' = 'dm',
>      'password' = '123456',
>      'database-name' = 'flink_t',
>      'table-name' = 't1'
>  );
[INFO] Table has been created.
Flink SQL> CREATE TABLE target (
>    id int primary key,
>    name string
>  ) with (
>      'connector' = 'jdbc',
>      'url' = 'jdbc:mysql://172.16.4.136:6000/downstream_f',
>      'table-name' = 't',
>      'username' = 'root',
>      'password' = '',
>       'sink.buffer-flush.max-rows' = '1', 'sink.buffer-flush.interval' = '0'
>  );
[INFO] Table has been created.
Flink SQL> select * from source;
[INFO] Result retrieval cancelled.
 
Flink SQL> select * from target;
[INFO] Result retrieval cancelled.
 
Flink SQL> insert into target select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 683e4cfd7ae07762f212cdec869885d2
 
Flink SQL> insert into target select * from source;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.
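The error above is raised by the JDBC sink when its input contains updates or deletes (as the mysql-cdc source does) and the sink table has no primary key; it presumably came from a session where `target` was still declared without one, as in the Flink -> TiDB test. With a primary key, the JDBC connector writes in upsert mode. The Flink documentation declares primary keys as `NOT ENFORCED`, since Flink does not own the data, and depending on the version the bare column-level `primary key` may be rejected. A sketch of the same sink in the documented form, reusing the connection settings from the DDL above:

```sql
-- Drop any earlier definition of target in this session.
DROP TABLE IF EXISTS target;

CREATE TABLE target (
  id   INT,
  name STRING,
  -- NOT ENFORCED: Flink does not check uniqueness itself.
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://172.16.4.136:6000/downstream_f',
  'table-name' = 't',
  'username' = 'root',
  'password' = '',
  -- Flush every row immediately; '0' disables the time-based flush.
  'sink.buffer-flush.max-rows' = '1',
  'sink.buffer-flush.interval' = '0'
);
```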

On the TiDB side

MySQL [downstream_f]> select * from t;
+------+-------+
| id   | name  |
+------+-------+
|    1 | mysql |
|    2 | mysql |
|    3 | mysql |
|    4 | mysql |
|    5 | mysql |
+------+-------+
5 rows in set (0.00 sec)
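Reading `select *` output works for a handful of rows. For larger tables, one possible cross-check (a sketch, not part of the original procedure) is to compute the row count and an order-independent checksum on both sides and compare them. It relies only on `COUNT`, `CRC32`, `CONCAT_WS` and `BIT_XOR`, which MySQL and TiDB both provide. Two caveats: `CONCAT_WS` skips NULL arguments, so rows that differ only in which column is NULL can hash the same, and identical duplicate rows cancel out under XOR, which is why the count is compared as well.

```sql
-- Upstream MySQL (database flink_t):
SELECT COUNT(*) AS cnt,
       BIT_XOR(CRC32(CONCAT_WS('|', id, name))) AS checksum
FROM t1;

-- Downstream TiDB (database downstream_f): same expression on table t.
SELECT COUNT(*) AS cnt,
       BIT_XOR(CRC32(CONCAT_WS('|', id, name))) AS checksum
FROM t;
```

Matching cnt and checksum values suggest the two tables hold the same rows; a mismatch means a row-by-row comparison is needed.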

Background job

The running job can be inspected from the command line or in the web UI:

  • List running jobs
[root@n5162 bin]# ./flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
19.01.2021 11:44:10 : 683e4cfd7ae07762f212cdec869885d2 : default: insert into target select * from source (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
  • Stop a job with flink cancel
[root@n5162 bin]# ./flink cancel 683e4cfd7ae07762f212cdec869885d2
Cancelling job 683e4cfd7ae07762f212cdec869885d2.
Cancelled job 683e4cfd7ae07762f212cdec869885d2.
  • web UI

Incremental synchronization

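Keep inserting, updating, and deleting data in the upstream MySQL while querying the table in the downstream TiDB, to verify incremental synchronization.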

INSERT

Insert data in the upstream MySQL

MySQL [flink_t]> insert into t1(id,name) values(6,'mysql');
Query OK, 1 row affected (0.00 sec)
 
MySQL [flink_t]> insert into t1(id,name) values(7,'mysql');
Query OK, 1 row affected (0.00 sec)

Query the corresponding table in the downstream TiDB

MySQL [downstream_f]> select * from t;
+------+-------+
| id   | name  |
+------+-------+
|    1 | mysql |
|    2 | mysql |
|    3 | mysql |
|    4 | mysql |
|    5 | mysql |
|    6 | mysql |
|    7 | mysql |
+------+-------+
7 rows in set (0.00 sec)

UPDATE

Update data in the upstream MySQL

MySQL [flink_t]> update t1 set name='TiDB' where id =7;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0

Query the data in the downstream TiDB

MySQL [downstream_f]> select * from t;
+------+-------+
| id   | name  |
+------+-------+
|    1 | mysql |
|    2 | mysql |
|    3 | mysql |
|    4 | mysql |
|    5 | mysql |
|    6 | mysql |
|    7 | mysql |
|    7 | TiDB  |
+------+-------+
8 rows in set (0.01 sec)
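The result above contains two rows with id = 7. A likely explanation, based on the Flink JDBC connector documentation: with a primary key declared on the Flink sink, the connector writes in upsert mode, which for MySQL-compatible targets uses INSERT ... ON DUPLICATE KEY UPDATE. The downstream t was created earlier as `create table t(id int,name varchar(20))`, with no primary or unique key, so nothing conflicts and the upsert inserts a new row; the DELETE below (`where id=7`) then removes both. The first statement is a hand-written approximation of what the sink sends, not captured from the job. The second part is the fix the join test below also uses: key the downstream table on the same column as the Flink sink.

```sql
-- Approximation of the upsert for the updated row (7, 'TiDB').
-- Without a PRIMARY KEY or UNIQUE index on t, this simply inserts another row.
INSERT INTO t (id, name) VALUES (7, 'TiDB')
  ON DUPLICATE KEY UPDATE id = VALUES(id), name = VALUES(name);

-- Fix: recreate the test table keyed on id (this drops the existing test data).
DROP TABLE IF EXISTS t;
CREATE TABLE t (
  id   INT NOT NULL,
  name VARCHAR(20) DEFAULT NULL,
  PRIMARY KEY (id)
);
```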

DELETE

Delete data in the upstream MySQL

MySQL [flink_t]> delete from t1 where id=7;
Query OK, 1 row affected (0.01 sec)

Check in the downstream TiDB whether the row still exists

MySQL [downstream_f]> select * from t;
+------+-------+
| id   | name  |
+------+-------+
|    1 | mysql |
|    2 | mysql |
|    3 | mysql |
|    4 | mysql |
|    5 | mysql |
|    6 | mysql |
+------+-------+
6 rows in set (0.00 sec)

Table join

Create two tables in the upstream MySQL, join them in Flink and sync the result, then check the data in the downstream TiDB.

Create the two tables in the upstream MySQL and insert data

MySQL [flink_t]> select * from t;
+----+------+
| id | name |
+----+------+
|  1 | a    |
|  2 | b    |
|  3 | c    |
|  4 | d    |
+----+------+
4 rows in set (0.00 sec)
 
MySQL [flink_t]> select * from t1;
+----+-------+------+
| id | name  | age  |
+----+-------+------+
|  1 | mysql |   20 |
|  2 | tidb  |   10 |
|  3 | redis |   10 |
|  4 | redis |    8 |
+----+-------+------+
4 rows in set (0.00 sec)

Create the table downstream (definition shown below), then empty it

MySQL [downstream_f]> show create table t\G
*************************** 1. row ***************************
       Table: t
Create Table: CREATE TABLE `t` (
  `id` int(11) NOT NULL,
  `name` varchar(20) DEFAULT NULL,
  `age` int(11) DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
1 row in set (0.01 sec)
 
MySQL [downstream_f]> truncate table t;
Query OK, 0 rows affected (2.03 sec)
 
MySQL [downstream_f]> select * from t;
Empty set (0.00 sec)

In the Flink SQL client, join the two tables and write the result to the downstream TiDB

Flink SQL> create table source_t (id int primary key, name string) with (
>      'connector' = 'mysql-cdc',
>      'hostname' = '172.16.4.222',
>      'username' = 'dm',
>      'password' = '123456',
>      'database-name' = 'flink_t',
>      'table-name' = 't'
>  );
[INFO] Table has been created.
 
Flink SQL> create table source (id int primary key, name string, age int) with (
>      'connector' = 'mysql-cdc',
>      'hostname' = '172.16.4.222',
>      'username' = 'dm',
>      'password' = '123456',
>      'database-name' = 'flink_t',
>      'table-name' = 't1'
>  );
[INFO] Table has been created.
 
Flink SQL> CREATE TABLE target (
>    id int primary key,
>    name string,
>    age int
>  ) with (
>      'connector' = 'jdbc',
>      'url' = 'jdbc:mysql://172.16.4.136:6000/downstream_f',
>      'table-name' = 't',
>      'username' = 'root',
>      'password' = '',
>       'sink.buffer-flush.max-rows' = '1', 'sink.buffer-flush.interval' = '0'
>  );
>
[INFO] Table has been created.
 
Flink SQL> insert into target select source.id,source.name,source.age from source_t,source where source_t.id=source.id;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 4462252bff0b370eecd6cd641ba0a950
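The comma-separated FROM clause above is an inner equi-join on id. For reference, a sketch of the same statement with explicit JOIN ... ON syntax is below; it should behave identically. Because both inputs are CDC changelogs, this is a regular (non-windowed) join: Flink keeps both inputs in state, and later inserts, updates, and deletes on either upstream table should propagate to t.

```sql
-- Same join as above, written with explicit JOIN ... ON.
INSERT INTO target
SELECT s.id, s.name, s.age
FROM source_t AS st
JOIN source AS s ON st.id = s.id;
```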

Query the data in TiDB

MySQL [downstream_f]> select * from t;
+----+-------+------+
| id | name  | age  |
+----+-------+------+
|  1 | mysql |   20 |
|  2 | tidb  |   10 |
|  3 | redis |   10 |
|  4 | redis |    8 |
+----+-------+------+
4 rows in set (0.00 sec)

WEB UI

Check job execution in the web UI.
