用 Gravity 实现 MongoDB 到 TiDB 的数据复制

数据复制实践

前言

本文将探讨从 MongoDB 到 TiDB 的数据复制的实现方式。

Gravity 支持大数据总线,能够解析 MySQL Binlog、MongoDB Oplog 中的数据变更并发布到 kafka 供下游消费;在同步过程中,支持对数据进行在线变换。

MongoDB 作为 NoSQL 的代表,数据采用 json 的存储结构;MtySQL 5.7 以后也支持 json 类型,MySQL 5.5 和 5.6 则需要将 json 转换成字符串格式存储;所以如果不考虑性能,从MongoDB 复制数据到 MySQL 的方案是可行的。

TiDB 作为分布式 NewSQL 数据库的代表,实现了自动的水平伸缩,强一致性的分布式事务,基于 Raft 的多副本复制特性除了具备 NoSQL 的优点,还兼容 MySQL 协议,并支持大部分 MySQL 语法,包括 json 数据类型,使用场景上要比 NoSQL 更加丰富。

方案测试

下面部署 Gravity 并测试从 MongoDB 到 TiDB 的数据复制方案,主要分以下两个步骤进行:

  1. 配置 Gravity 从 MongoDB 获取解析 Oplog 并发布到 Kafka

  2. 从 Kafka 中解析输出的数据格式,构造 SQL将变更同步到 TiDB

配置 mongodb 到 kafka 的同步

(1)安装 MongoDB 副本集

(2)安装 Kafka

(3)配置 Go 语言环境

(4)配置 Gravity

安装启动

mkdir -p $GOPATH/src/github.com/moiot/ && cd $GOPATH/src/github.com/moiot
git clone https://github.com/moiot/gravity.git
cd gravity && make
nohup bin/gravity -config mongo2kafka.toml &

配置文件

name = "mongo2kafka"
 
#
# Input 插件的定义
#
 
#
# 源端 Mongo 连接配置
# - 必填
#
[input.mongooplog.source]
host = "127.0.0.1"
port = 27017
username = ""
password = ""
 
#
# 源端 Mongo Oplog 的起始点,若不配置,则从当前最新的 Oplog 开始同步
# - 默认为空
# - 可选
#
[input.mongooplog]
# start-position = 123456
 
#
# 源端 Mongo Oplog 并发相关配置
# - 默认分别为 false, 50, 512, "750ms"
# - 可选 (准备废弃)
[input.mongooplog.gtm-config]
use-buffer-duration = false
buffer-size = 50
channel-size = 512
buffer-duration-ms = "750ms"
 
#
# Output 插件的定义
#
 
#
# 目标端 Kafka 连接配置
# - 必填
#
[output.async-kafka.kafka-global-config]
# - 必填
broker-addrs = ["localhost:9092"]
mode = "async"
 
# 目标端 kafka SASL 配置
# - 可选
[output.async-kafka.kafka-global-config.net.sasl]
enable = false
user = ""
password = ""
 
#
# 目标端 Kafka 路由配置
# - 必填
#
[[output.async-kafka.routes]]
match-schema = "test"
match-table = "test_table"
dml-topic = "test.test_table"
 
#
# 目标端编码规则:输出类型和版本号
# - 可选
[output.async-kafka]
# 默认为 json
output-format = "json"
# 默认为 2.0 版本
schema-version = "0.1"
 
#
# scheduler 插件的定义,此处使用默认 scheduler
#
[scheduler.batch-table-scheduler]
nr-worker = 1
batch-size = 1
queue-size = 1024
sliding-window-size = 10240

(5)测试数据同步

  • MongoDB 数据变更
use test;
# 插入数据
db.test_table.insert({name:'pingcap',age:3});
# 更新数据
db.test_table.update({name:'pingcap'},{$set:{age:4}})
# 删除数据
db.test_table.remove({name:'pingcap'})

Kafka Topic 订阅

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test.test_table --from-beginning 

Kafka 消息输出

# 插入数据
{
        "version": "0.1",
        "database": "test",
        "collection": "test_table",
        "unique_source_name": "127.0.0.1",
        "oplog": {
                 "timestamp": 1547033910,
                 "ordinal": 2,
                 "_id": "5c35dd3667be48b14aeb46f6",
                 "operation": "i",
                 "namespace": "test.test_table",
                 "data": {
                         "_id": "5c35dd3667be48b14aeb46f6",
                         "age": 3,
                         "name": "pingcap"
                 },
                 "row": null,
                 "source": 0
        }
}
# 更新数据
{
        "version": "0.1",
        "database": "test",
        "collection": "test_table",
        "unique_source_name": "127.0.0.1",
        "oplog": {
                 "timestamp": 1547034515,
                 "ordinal": 1,
                 "_id": "5c35dd3667be48b14aeb46f6",
                 "operation": "u",
                 "namespace": "test.test_table",
                 "data": {
                         "$set": {
                                 "age": 4
                         }
                 },
                 "row": {
                         "_id": "5c35dd3667be48b14aeb46f6",
                         "age": 4,
                         "name": "pingcap"
                 },
                 "source": 0
        }
}
# 删除数据
{
        "version": "0.1",
        "database": "test",
        "collection": "test_table",
        "unique_source_name": "127.0.0.1",
        "oplog": {
                 "timestamp": 1547035146,
                 "ordinal": 2,
                 "_id": "5c35dd3667be48b14aeb46f6",
                 "operation": "d",
                 "namespace": "test.test_table",
                 "data": null,
                 "row": null,
                 "source": 0
        }
}

配置 kafka 到 TiDB 的同步

思路是根据 Kafka 输出的 json 数据格式,按照一定规则构造 DML 并在 TiDB 中执行 SQL。

(1)规则说明

  • namespace 为 db.table
  • _id 和 timestamp 映射为 mongo_id varchar(24) 和 mongo_ts timestamp
  • operation 中的 i/u/d 分别对应 insert/update/delete
  • data 中的外层的 key 解析为 column,value 解析为 int、varchar 或 json 类型存储
  • 以 mongo_id 和 mongo_ts 为 where 条件完成 update 和 delete 操作

按照上述规则,将 Kafka 消息输出构造为如下 SQL:

# 插入数据
INSERT INTO test.test_table (mongo_id, mongo_ts, age, name)
VALUES ('5c35dd3667be48b14aeb46f6', from_unixtime(1547033910), 3, 'pingcap');
# 更新数据
UPDATE test.test_table
SET age = 4
WHERE mongo_id = '5c35dd3667be48b14aeb46f6' 
AND mongo_ts = from_unix(1547034515);
# 删除数据
DELETE FROM test.test_table
WHERE mongo_id = '5c35dd3667be48b14aeb46f6' 
AND mongo_ts = from_unix(1547035146);

(2)特殊处理

由于 MongoDB 使用 schema free 数据模型,database、table/collection、column/filed 都是隐式创建的,Oplog 中也并没有 DDL 操作;同步到 MySQL/TiDB 中需要先判断对应 database、table、column 是否存在,如果不存在则要先执行相应的 DDL。

例如上面的数据变更执行前,需要先执行

create database test;
create table test.test_table;

如果 insert/update 操作涉及到新的 column/field,需要先执行

alter table add column ...

对于更新操作的 unset,相当于把这个 column/field 删除,需要先执行

alter table drop column ...

此外 MongoDB 也是可以显式执行 create/drop 等 DDL 操作,但是这类操作 Kafka 消息中并没有解析输出。

MongoDB 语法转换到 SQL 规则可以参考 https://github.com/goodybag/mongo-sql

(3)规则实现

这里不考虑特殊处理,仅针对规则说明的部分,用 python 实现了一个简单的 demo 脚本,

功能是将来自 Kafka 订阅 Topic test.test_table 的 json 格式的 MongoDB DML 转换为 SQL 语句并在下游 TiDB 中执行。

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# filename: json2sql.py
from kafka import KafkaConsumer
from kafka.client import KafkaClient
import MySQLdb
import json
import time
 
class KafkaPython:
 
    consumer = server = topic =  None
 
    TOPIC   = 'test.test_table'
    BROKER_LIST = 'localhost:9092'
 
    DB_USER = 'root'
    DB_PASS = ''
    DB_NAME = 'test_db'
    DB_PORT = 4000
    DB_IP = 'localhost'
    DB_OPT = {'i':'insert', 'u':'update', 'd':'delete'}
 
    def __init__(self):
        print("init kafka consumer")
        self.server = self.BROKER_LIST
        self.topic  = self.TOPIC
        print("init mysql client")
        self.db = MySQLdb.connect(self.DB_IP, self.DB_USER, 
                self.DB_PASS, self.DB_NAME, port=self.DB_PORT)
        self.cursor = self.db.cursor()
 
    def __del__(self):
        print("end")
 
    def getConnect(self):
        self.consumer = KafkaConsumer(self.topic, bootstrap_servers = self.server)
 
    def execSQL(self, sql):
        rows = self.cursor.execute(sql)
        # print sql
        if rows > 0:
            self.db.commit() 
        return rows
 
    def beginConsumer(self):  
        for oneLog in self.consumer:  
            mlog = json.loads(oneLog.value).get('oplog', None)
            if mlog is not None:
                self.getDesc(mlog)
 
    def getDesc(self, mlog):
        desc = {}
        for k,v in mlog.items():
            if k == '_id':
                desc['mongo_id'] = v
            if k == 'timestamp':
                desc['mongo_ts'] = v
            if k == 'namespace':
                desc['table'] = v
            if k == 'operation':
                desc['iud'] = v
            if k == 'data':
                desc['values'] = v
        sql = self.ruleRoute(desc)
        if sql is not None:
            ret = self.execSQL(sql)
            dt = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
            print '%s %s %s rows' % (dt, self.DB_OPT[desc['iud']], ret)
 
    def ruleRoute(self, desc):
        if desc['iud'] not in self.DB_OPT.keys():
            return
        if desc['iud'] == 'i':
            return self.stmtIns(desc)
        elif desc['iud'] == 'u':
            return self.stmtUpd(desc)
        else:
            return self.stmtDel(desc)
 
    def stmtIns(self, desc):
        mtbl = desc['table']
        mid = desc['mongo_id']
        mts = desc['mongo_ts']
        data = desc['values']
        data.pop('_id')
        mlist = data.keys()
        vlist = data.values()
        mlist = mtbl + '(%s, %s, %s)' % ('mongo_id', 'mongo_ts', ', '.join(mlist))
        clist = "('%s', from_unixtime(%s), " % (mid, mts)
        for val in vlist:
            if type(val).__name__ == 'unicode':
                clist += "'%s', " % val
            elif type(val).__name__ == 'dict':
                clist += "'%s', " % json.dumps(val)   
            else:
                clist += "%s, " % val
        clist = clist[:-2] + ")"
        sql = "insert into %s values %s" % (mlist, clist)
        # print 'insert stmt: ',sql
        return sql
 
    def stmtUpd(self, desc):
        mtbl = desc['table']
        mid = desc['mongo_id']
        mts = desc['mongo_ts']
        data = desc['values']
        stmt = []
        for k, v in data.items():
            if k != '$set':
                return
            for col, val in v.items():
                if type(val).__name__ == 'unicode':
                    ln = "%s = '%s'" % (col, val)
                elif type(val).__name__ == 'dict':
                    ln = "%s = '%s'" % (col, json.dumps(val))
                else:
                    ln = "%s = %s" % (col, val)
                stmt.append(ln)
        col = ','.join(stmt)
        sql = "update %s set %s where mongo_id = '%s' and " 
                "mongo_ts = from_unixtime(%s)" % (mtbl, col, mid, mts)
        # print 'update stmt: ',sql
        return sql
 
    def stmtDel(self, desc):
        mtbl = desc['table']
        mid = desc['mongo_id']
        mts = desc['mongo_ts']
        sql = "delete from %s where mongo_id = '%s' and " 
                "mongo_ts = from_unixtime(%s)" % (mtbl, mid, mts)
        # print 'delete stmt: ',sql
        return sql
 
    def disConnect(self):
        self.consumer.close()
 
 
if __name__ == '__main__':
 
    kp = KafkaPython()
    kp.getConnect()
    kp.beginConsumer()

(4)同步测试

启动脚本后首先初始化连接到 Kafka 以及下游 TiDB。

# python json2sql.py
init kafka consumer
init mysql client

当接收到 Kafka 的新消息后,会将其解析为 SQL ,打印出来的格式如下。

insert stmt: insert into test.test_table(mongo_id, mongo_ts, info, age, addr, name) values ('5c35dde567be48b14aeb46f8'
     , from_unixtime(1547034085), {"employess": 80, "offce": "beijing"}3, 'dongsheng', 'pingcap01')

接着会在之前建立的 TiDB 连接中执行 SQL,如果返回的 rows 大于 0,提交操作;Kafka 消费的 Topic 对应某些库表的 DML,解析出来的是流式数据,到 TiDB 执行 SQL 也是串行的,不存在乐观锁事务冲突的问题;当上游 MongoDB 写入压力较大时,会存在一定的延迟。

关于全量数据复制 由于 mongodump 备份文件是 bson 格式的二进制数据,与 MySQL 不兼容,无法直接导入,

而 mongoexport 可以将数据导出格式为 csv、json 的文本,因此对于全量数据,通过 load data 命令导入 csv 文本是可行的;

如果数据量比较大,则可以根据各 collection 的大小分批导出多个文件,然后并行执行 load 将数据导入 MySQL/TiDB 中。

需要说明的是,不管是全量还是增量数据复制,都要将 MongoDB 中的 collection 映射为 MySQL/TiDB 中的 table,这部分工作要提前完成;

如果涉及到下游 DDL 变更,也要暂停同步,手动完成表的变更。

因此,对于 MongoDB 中的 collection,如果 schema 设计不够规范,各 document 的 field 的数量、类型不固定,

那么数据同步过程难免频繁中断,进行手工维护,甚至可能变得不可维护,这是对业务库表设计及对所要同步数据的考量;

此外数据同步时的失败处理、中断恢复、一致性校验等问题也需要进一步考虑。

总结

本文主要介绍了从 MongoDB 到 TiDB 的数据复制方案和一些注意要点 ,并结合 Gravity 工具实现从 MongoDB 到 Kafka 的数据同步,并简单展示了如何将 Kafka 的输出数据进行格式转换,构造 DML SQL 完成到 TiDB 的增量同步;由于 MongoDB 采用非关系型数据模型,且不支持标准 SQL 语法,除了简单的增删改操作,MongoDB 的丰富语法在 Oplog 中还会解析出更多格式,需要根据业务需要或借助专门的类库进一步完善。

1赞