DM 同步 MySQL 到 TiDB 的延迟波动

上游 MySQL 各种操作都有,并非只有插入。

监控计算方式:在上游建一张表(只有一行数据),定时将该行更新为当前时间(毫秒时间戳);然后分别读取上游和下游这张表中的值,计算两者的差值,并将差值写入结果表。

import os, time,datetime
import pymysql
from apscheduler.schedulers.blocking import BlockingScheduler
# Module-level scheduler instance; the periodic monitoring jobs are registered
# on it and it is started inside the ``__main__`` guard at the bottom of the script.
scheduler = BlockingScheduler()

'''
上游MySQL建库表

-- 连接数据库
use abc;

-- monitor_time
CREATE TABLE `monitor_time` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `t` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- 初始化数据,因后面监控程序定时更新该条记录 
insert into monitor_time(t)  select 1;

-- 创建监控结果表monitor_result;
CREATE TABLE `monitor_result` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `lag_ms` int(11) DEFAULT NULL COMMENT '延迟时间',
  `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '监控记录生成时间',
  PRIMARY KEY (`id`),
  KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
--  查看结果
select * from monitor_result;
'''

#定期更新源库表 monitor_time 数据:更新为当前时间
def monitor_time():
    """Write the current time into the heartbeat table on the upstream MySQL.

    Sets ``monitor_time.t`` (every row; the table is expected to hold a single
    row) to the current wall-clock time as integer epoch milliseconds, and
    prints that value.

    A fresh connection is opened on each call and is always closed before
    returning, so repeated scheduled invocations do not accumulate open
    connections on the server.

    Raises:
        pymysql.MySQLError: propagated unchanged if connecting, updating or
            committing fails.
    """
    info_src = {'user': "abc", 'passwd': "111", 'host': "xxx.96", 'db': "abc",
                'port': 12345}
    conn_src = pymysql.connect(user=info_src['user'], passwd=info_src['passwd'],
                               host=info_src['host'], db=info_src['db'], port=info_src['port'],
                               charset="utf8")
    try:
        # Epoch time in milliseconds, matching the BIGINT heartbeat column.
        t1 = int(round(time.time() * 1000))
        print(t1)
        with conn_src.cursor() as cur:
            # Parameters are passed as a real one-element tuple.
            cur.execute("update monitor_time set t=%s where 1=1 ", (t1,))
        conn_src.commit()
    finally:
        # Always release the connection, even when the update fails.
        conn_src.close()


#定时读取上下游的 monitor_time,比较差异,并将差值写入上游的表 monitor_result
def monitor_result():
    """Sample the replication lag and store it in the upstream ``monitor_result`` table.

    Reads the heartbeat value ``monitor_time.t`` (epoch milliseconds) from the
    upstream (source) database and from the downstream (destination) database,
    computes ``t_src - t_des`` and inserts the difference as ``lag_ms`` into
    ``monitor_result`` on the upstream database.

    If either side has no heartbeat row yet, or the value is NULL, the sample
    is skipped with a printed message instead of failing with a ``TypeError``.

    Both connections are opened per call and are always closed before
    returning, so repeated scheduled invocations do not leak connections.

    Raises:
        pymysql.MySQLError: propagated unchanged if connecting, querying,
            inserting or committing fails.
    """
    info_src = {'user': "abc", 'passwd': "111", 'host': "xxx.96", 'db': "abc", 'port': 12345}
    info_des = {'user': "abc", 'passwd': "111", 'host': "xxx.118", 'db': "abc", 'port': 3390}

    # Midnight three days ago. The value is only printed here; nothing below
    # uses it (kept so the script's console output stays the same).
    today = datetime.datetime.now()
    offset = datetime.timedelta(days=-3)
    date_ago = (today + offset).strftime('%Y-%m-%d 00:00:00')
    print(date_ago)

    sql_get_time = "select t from monitor_time "

    conn_src = pymysql.connect(user=info_src['user'], passwd=info_src['passwd'],
                               host=info_src['host'], db=info_src['db'], port=info_src['port'],
                               charset="utf8")
    try:
        conn_des = pymysql.connect(user=info_des['user'], passwd=info_des['passwd'],
                                   host=info_des['host'], db=info_des['db'], port=info_des['port'],
                                   charset="utf8")
        try:
            with conn_src.cursor() as cur_src, conn_des.cursor() as cur_des:
                # Read upstream first, then downstream, as close together as possible.
                cur_src.execute(sql_get_time)
                v_src_tuple = cur_src.fetchone()

                cur_des.execute(sql_get_time)
                v_des_tuple = cur_des.fetchone()

                # fetchone() yields None when the table is empty, e.g. before the
                # heartbeat row has been created upstream or replicated downstream.
                if (v_src_tuple is None or v_des_tuple is None
                        or v_src_tuple[0] is None or v_des_tuple[0] is None):
                    print("monitor_time heartbeat missing; skipping sample:",
                          v_src_tuple, v_des_tuple)
                    return

                t_src = v_src_tuple[0]
                t_des = v_des_tuple[0]
                print(t_src, t_des)

                # NOTE(review): the heartbeat is refreshed once per second by the
                # scheduled monitor_time job, so this difference appears to be
                # quantised to roughly that interval -- confirm when interpreting
                # the recorded values.
                t1 = t_src - t_des
                cur_src.execute("insert into monitor_result(lag_ms) select %s", (t1,))
            conn_src.commit()
        finally:
            conn_des.close()
    finally:
        conn_src.close()

if __name__ == '__main__':
    # Register the periodic jobs as (callable, period in seconds, job id),
    # then hand control to the scheduler.
    job_specs = (
        (monitor_time, 1, 'job_monitor_time'),
        (monitor_result, 5, 'monitor_result'),
    )
    for job_func, every_seconds, job_id in job_specs:
        scheduler.add_job(job_func, 'interval', seconds=every_seconds, id=job_id)
    scheduler.start()

DM的Prometheus 监控lag几乎是0: