之前有帖子介绍过从 mongo 到 tidb,是否还有更新或补充?
或者新的迁移工具,或是注意事项?
之前看到过一个python脚本实现mongodb迁移到mysql的 没有实践过不知道可不可行,可以参考下。
import pymysql
from loguru import logger
class MongoToMysql:
def __init__(self, mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port, mysql_user,
mysql_password, mysql_db,table_name=None,set_max_length=False,batch_size=10000,table_description=''):
self.mongo_host = mongo_host
self.mongo_port = mongo_port
self.mongo_db = mongo_db
self.mongo_collection = mongo_collection
self.mysql_host = mysql_host
self.mysql_port = mysql_port
self.mysql_user = mysql_user
self.mysql_password = mysql_password
self.mysql_db = mysql_db
self.table_name = table_name
self.set_max_length = set_max_length
self.batch_size = batch_size
self.table_description = table_description
self.data_types = self.get_mongo_data_types()
self.create_mysql_table(self.data_types,set_max_length= self.set_max_length,table_description=self.table_description)
self.push_data_to_mysql(self.batch_size)
def get_mongo_data_types(self):
logger.debug('正在获取mongo中字段的类型!')
client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
db = client[self.mongo_db]
collection = db[self.mongo_collection]
data_types = {}
for field in collection.find_one().keys():
data_types[field] = type(collection.find_one()[field]).__name__
return data_types
def check_mysql_table_exists(self):
logger.debug('检查是否存在该表,有则删之!')
conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
password=self.mysql_password, db=self.mysql_db)
cursor = conn.cursor()
sql = f"DROP TABLE IF EXISTS {self.mongo_collection}"
cursor.execute(sql)
conn.commit()
conn.close()
def get_max_length(self, field):
logger.debug(f'正在获取字段 {field} 最大长度......')
client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
db = client[self.mongo_db]
collection = db[self.mongo_collection]
max_length = 0
for item in collection.find({},{field:1,'_id':0}):
value = item.get(field)
if isinstance(value, str) and len(value) > max_length:
max_length = len(value)
return max_length
def create_mysql_table(self, data_types,set_max_length,table_description):
logger.debug('正在mysql中创建表结构!')
self.check_mysql_table_exists()
conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
password=self.mysql_password, db=self.mysql_db)
cursor = conn.cursor()
if self.table_name:
table_name = self.table_name
else:
table_name = self.mongo_collection
fields = []
for field, data_type in data_types.items():
if data_type == 'int':
fields.append(f"{field} INT")
elif data_type == 'float':
fields.append(f"{field} FLOAT")
elif data_type == 'bool':
fields.append(f"{field} BOOLEAN")
else:
if set_max_length:
fields.append(f"{field} TEXT)")
else:
max_length = self.get_max_length(field)
fields.append(f"{field} VARCHAR({max_length + 200})")
fields_str = ','.join(fields)
sql = f"CREATE TABLE {table_name} (id INT PRIMARY KEY AUTO_INCREMENT,{fields_str}) COMMENT='{table_description}'"
cursor.execute(sql)
conn.commit()
conn.close()
def push_data_to_mysql(self, batch_size=10000):
logger.debug('--- 正在准备从mongo中每次推送10000条数据到mysql ----')
client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
db = client[self.mongo_db]
collection = db[self.mongo_collection]
conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
password=self.mysql_password, db=self.mysql_db)
cursor = conn.cursor()
if self.table_name:
table_name = self.table_name
else:
table_name = self.mongo_collection
# table_name = self.mongo_collection
data = []
count = 0
for item in collection.find():
count += 1
row = []
for field, data_type in self.data_types.items():
value = item.get(field)
if value is None:
row.append(None)
elif data_type == 'int':
row.append(str(item.get(field, 0)))
elif data_type == 'float':
row.append(str(item.get(field, 0.0)))
elif data_type == 'bool':
row.append(str(item.get(field, False)))
else:
row.append(str(item.get(field, '')))
data.append(row)
if count % batch_size == 0:
placeholders = ','.join(['%s'] * len(data[0]))
sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})"
cursor.executemany(sql, data)
conn.commit()
data = []
logger.debug(f'--- 已完成推送:{count} 条数据! ----')
if data:
placeholders = ','.join(['%s'] * len(data[0]))
sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})"
cursor.executemany(sql, data)
conn.commit()
logger.debug(f'--- 已完成推送:{count} 条数据! ----')
conn.close()
if __name__ == '__main__':
"""MySQL"""
mongo_host = '127.0.0.1'
mongo_port = 27017
mongo_db = 'db_name'
mongo_collection = 'collection_name'
"""MongoDB"""
mysql_host = '127.0.0.1'
mysql_port = 3306
mysql_user = 'root'
mysql_password = '123456'
mysql_db = 'mysql_db'
table_description = '' # 表描述
mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port,
mysql_user, mysql_password, mysql_db,table_description=table_description)
#
# table_name = None # 默认为None 则MySQL中的表名跟Mongo保持一致
# set_max_length = False # 默认为False 根据mongo中字段最大长度 加200 来设置字段的VARCHART长度 , 否则定义TEXT类型
# batch_size = 10000 # 控制每次插入数据量的大小
# table_description = '' # 表描述
# mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port,
# mysql_user, mysql_password, mysql_db,table_name,set_max_length,batch_size,table_description)
1 个赞
记录下,不过mongo迁移到tidb合适吗,mongo存的可是非关系结构数据,tidb对这类数据支持的一般吧。。。
是的啊,字段类型问题很大的,很少有人这么干的
-
TiDB Lightning:这是一个由 PingCAP 开发的工具,它可以实现高速导入数据,并提供自动模式转换和数据类型映射等功能。
-
DMS(Data Migration Service):这是阿里云提供的一种数据迁移服务,提供了简单易用的界面和配置选项
感觉找第三方厂商搞安全点,这个迁移几张表还行,整个业务都迁移,风险太大
字段类型,能把人搞崩溃,要转换的很多呢
可以具体说说么?有字段映射的最佳方案么?
试过用datax MongoDB 到 mysql
后面交给第三方了,英方,dsg等公司搞了