上游Mysql 各种操作都有,非仅仅插入。
监控计算方式:在上游建一张表(只有一行数据),然后定时更新为当前数据,然后读取上下游的这张表的数据,比较差异并将差异写到结果表。
import os, time,datetime
import pymysql
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
'''
上游MySQL建库表
-- 连接数据库
use abc;
-- monitor_time
CREATE TABLE `monitor_time` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`t` bigint(20) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 初始化数据,因后面监控程序定时更新该条记录
insert into monitor_time(t) select 1;
-- 创建监控结果表monitor_result;
CREATE TABLE `monitor_result` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`lag_ms` int(11) DEFAULT NULL COMMENT '延迟时间',
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '监控记录生成时间',
PRIMARY KEY (`id`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
-- 查看结果
select * from monitor_result;
'''
#定期更新源库表 monitor_time 数据:更新为当前时间
def monitor_time():
info_src = {'user': "abc", 'passwd': "111", 'host': "xxx.96", 'db': "abc",
'port': 12345}
conn_src = pymysql.connect(user=info_src['user'], passwd=info_src['passwd'],
host=info_src['host'], db=info_src['db'], port=info_src['port'],
charset="utf8")
t = time.time()
t1 = int(round(t * 1000))
print(t1)
cur = conn_src.cursor()
cur.execute("update monitor_time set t=%s where 1=1 " ,(str(t1)))
conn_src.commit()
#定时读monitor_time,比较差异数据,并写入表 monitor_time
def monitor_result():
info_src = {'user': "abc", 'passwd': "111", 'host': "xxx.96", 'db': "abc", 'port': 12345}
info_des = {'user': "abc", 'passwd': "111", 'host': "xxx.118", 'db': "abc", 'port': 3390}
# 3天前的整点
today = datetime.datetime.now()
offset = datetime.timedelta(days=-3)
date_ago = (today + offset).strftime('%Y-%m-%d 00:00:00')
print(date_ago)
conn_src = pymysql.connect(user=info_src['user'], passwd=info_src['passwd'],
host=info_src['host'], db=info_src['db'], port=info_src['port'],
charset="utf8")
cur_src = conn_src.cursor()
conn_des = pymysql.connect(user=info_des['user'], passwd=info_des['passwd'],
host=info_des['host'], db=info_des['db'], port=info_des['port'],
charset="utf8")
cur_des = conn_des.cursor()
sql_get_time = "select t from monitor_time "
cur_src.execute(sql_get_time)
v_src_tuple = cur_src.fetchone()
t_src = v_src_tuple[0]
cur_des.execute(sql_get_time)
v_des_tuple = cur_des.fetchone()
t_des = v_des_tuple[0]
print(t_src, t_des)
t1 = (t_src - t_des)
cur_src.execute("insert into monitor_result(lag_ms) select %s", (str(t1)))
conn_src.commit()
if __name__ == '__main__':
scheduler.add_job(monitor_time, 'interval', seconds=1 ,id='job_monitor_time')
scheduler.add_job(monitor_result, 'interval', seconds=5 ,id='monitor_result')
scheduler.start()
DM的Prometheus 监控lag几乎是0: