CREATE TABLE user (
user_id bigint NOT NULL COMMENT ‘用户id’,
reg_date DATE NOT NULL COMMENT ‘注册日期’,
game_version int(8) NOT NULL DEFAULT ‘0’ COMMENT ‘游戏版本’,
batch_sn varchar(20) not null comment ‘批次号’,
amount decimal(11,2) default 0 comment ‘充值金额’,
…(等字段)
create_time
DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT ‘创建时间’,
modify_time
DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT ‘最后修改时间’,
KEY idx_key_1
(user_id, reg_date),
KEY idx_key_2
(reg_date, game_version),
… (等多个普通复合索引)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT=‘用户信息’
PARTITION BY RANGE ( TO_DAYS(reg_date
) ) (
PARTITION p2021
VALUES LESS THAN (738521),
PARTITION p2022
VALUES LESS THAN (738886),
PARTITION p2023
VALUES LESS THAN (739251),
PARTITION p2024
VALUES LESS THAN (739617),
PARTITION p2025
VALUES LESS THAN (739982),
PARTITION p2026
VALUES LESS THAN (740347),
PARTITION p2027
VALUES LESS THAN (740712),
PARTITION p2028
VALUES LESS THAN (741078),
PARTITION p2029
VALUES LESS THAN (741443),
PARTITION pother
VALUES LESS THAN (MAXVALUE)
)
表中 user_id+reg_date+game_version+batch_sn 记录是唯一的(表中未设置唯一索引,靠程序进行保证)
在 flink 作业中,业务流程如下:
kakfa source → group by user_id(按用户分组,将保证相同用户在相同 slot) → tidb sink
对于业务中的一条数据会转成如下 2条sql 在事务中进行执行:
insert into user(user_id,reg_date,game_version,batch_sn) select xx,xx,xx,xx from dual where not exists(select 1 from user where user_id=xx and reg_date=xx and game_version=xx and batch_sn=xx);
update user set amount=amount+xx where user_id=xx and reg_date=xx and game_version=xx and batch_sn=xx);
每次是100条记录,对应 200 条 sql 进行一次事务提交。