【 TiDB 使用环境】生产环境
【 TiDB 版本】v6.5.0
【复现路径】TiCDC同步数据到kafka,logstash消费kafka数据保存到ES
【遇到的问题:问题现象及影响】
TiCDC 同步数据到 Kafka 后,数据的顺序无法保证,导致 ES 中的数据异常。
changefeed 的 dispatcher(分发器)配置为 rowid,理论上应该能保证同一行数据变更的顺序性,但测试后发现顺序异常。
TiDB 表结构如下(使用的是分区表):
-- SMS send-record table (per the table COMMENT), RANGE-partitioned by day on send_tm.
-- NOTE(review): TiCDC's rowid / index-value dispatcher is documented as hashing the
-- handle-key values. Because send_tm is part of the primary key here, an UPDATE that
-- changes send_tm could presumably be routed to a different Kafka partition than the
-- row's earlier events -- confirm whether send_tm is ever modified after insert.
CREATE TABLE `tbsendrcd` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
-- The remaining column definitions were elided in the original post (the "....." line).
.....
-- Clustered composite primary key: the partitioning column send_tm is included
-- alongside the auto-increment id.
PRIMARY KEY (`id`,`send_tm`) /*T![clustered_index] CLUSTERED */,
-- sms_id is only enforced as unique in combination with send_tm.
UNIQUE KEY `sms_id` (`sms_id`,`send_tm`),
KEY `send_tm` (`send_tm`),
KEY `job_id` (`idx_job_id`),
KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin AUTO_INCREMENT=8975043 COMMENT='短信记录表'
-- One partition per day, named pYYYYMMDD. Each upper bound is a Unix timestamp at
-- 16:00:00 UTC, i.e. 00:00:00 at UTC+8 on the day after the one in the partition name
-- (presumably the business time zone -- confirm).
-- The list ends at p20240309 and has no MAXVALUE catch-all partition, so rows with a
-- later send_tm need new partitions to be added first.
PARTITION BY RANGE (UNIX_TIMESTAMP(`send_tm`))
(PARTITION `p20231020` VALUES LESS THAN (1697817600),
PARTITION `p20231021` VALUES LESS THAN (1697904000),
PARTITION `p20231022` VALUES LESS THAN (1697990400),
PARTITION `p20231023` VALUES LESS THAN (1698076800),
PARTITION `p20231024` VALUES LESS THAN (1698163200),
PARTITION `p20231025` VALUES LESS THAN (1698249600),
PARTITION `p20231026` VALUES LESS THAN (1698336000),
PARTITION `p20231027` VALUES LESS THAN (1698422400),
PARTITION `p20231028` VALUES LESS THAN (1698508800),
PARTITION `p20231029` VALUES LESS THAN (1698595200),
PARTITION `p20231030` VALUES LESS THAN (1698681600),
PARTITION `p20231031` VALUES LESS THAN (1698768000),
PARTITION `p20231101` VALUES LESS THAN (1698854400),
PARTITION `p20231102` VALUES LESS THAN (1698940800),
PARTITION `p20231103` VALUES LESS THAN (1699027200),
PARTITION `p20231104` VALUES LESS THAN (1699113600),
PARTITION `p20231105` VALUES LESS THAN (1699200000),
PARTITION `p20231106` VALUES LESS THAN (1699286400),
PARTITION `p20231107` VALUES LESS THAN (1699372800),
PARTITION `p20231108` VALUES LESS THAN (1699459200),
PARTITION `p20231109` VALUES LESS THAN (1699545600),
PARTITION `p20231110` VALUES LESS THAN (1699632000),
PARTITION `p20231111` VALUES LESS THAN (1699718400),
PARTITION `p20231112` VALUES LESS THAN (1699804800),
PARTITION `p20231113` VALUES LESS THAN (1699891200),
PARTITION `p20231114` VALUES LESS THAN (1699977600),
PARTITION `p20231115` VALUES LESS THAN (1700064000),
PARTITION `p20231116` VALUES LESS THAN (1700150400),
PARTITION `p20231117` VALUES LESS THAN (1700236800),
PARTITION `p20231118` VALUES LESS THAN (1700323200),
PARTITION `p20231119` VALUES LESS THAN (1700409600),
PARTITION `p20231120` VALUES LESS THAN (1700496000),
PARTITION `p20231121` VALUES LESS THAN (1700582400),
PARTITION `p20231122` VALUES LESS THAN (1700668800),
PARTITION `p20231123` VALUES LESS THAN (1700755200),
PARTITION `p20231124` VALUES LESS THAN (1700841600),
PARTITION `p20231125` VALUES LESS THAN (1700928000),
PARTITION `p20231126` VALUES LESS THAN (1701014400),
PARTITION `p20231127` VALUES LESS THAN (1701100800),
PARTITION `p20231128` VALUES LESS THAN (1701187200),
PARTITION `p20231129` VALUES LESS THAN (1701273600),
PARTITION `p20231130` VALUES LESS THAN (1701360000),
PARTITION `p20231201` VALUES LESS THAN (1701446400),
PARTITION `p20231202` VALUES LESS THAN (1701532800),
PARTITION `p20231203` VALUES LESS THAN (1701619200),
PARTITION `p20231204` VALUES LESS THAN (1701705600),
PARTITION `p20231205` VALUES LESS THAN (1701792000),
PARTITION `p20231206` VALUES LESS THAN (1701878400),
PARTITION `p20231207` VALUES LESS THAN (1701964800),
PARTITION `p20231208` VALUES LESS THAN (1702051200),
PARTITION `p20231209` VALUES LESS THAN (1702137600),
PARTITION `p20231210` VALUES LESS THAN (1702224000),
PARTITION `p20231211` VALUES LESS THAN (1702310400),
PARTITION `p20231212` VALUES LESS THAN (1702396800),
PARTITION `p20231213` VALUES LESS THAN (1702483200),
PARTITION `p20231214` VALUES LESS THAN (1702569600),
PARTITION `p20231215` VALUES LESS THAN (1702656000),
PARTITION `p20231216` VALUES LESS THAN (1702742400),
PARTITION `p20231217` VALUES LESS THAN (1702828800),
PARTITION `p20231218` VALUES LESS THAN (1702915200),
PARTITION `p20231219` VALUES LESS THAN (1703001600),
PARTITION `p20231220` VALUES LESS THAN (1703088000),
PARTITION `p20231221` VALUES LESS THAN (1703174400),
PARTITION `p20231222` VALUES LESS THAN (1703260800),
PARTITION `p20231223` VALUES LESS THAN (1703347200),
PARTITION `p20231224` VALUES LESS THAN (1703433600),
PARTITION `p20231225` VALUES LESS THAN (1703520000),
PARTITION `p20231226` VALUES LESS THAN (1703606400),
PARTITION `p20231227` VALUES LESS THAN (1703692800),
PARTITION `p20231228` VALUES LESS THAN (1703779200),
PARTITION `p20231229` VALUES LESS THAN (1703865600),
PARTITION `p20231230` VALUES LESS THAN (1703952000),
PARTITION `p20231231` VALUES LESS THAN (1704038400),
PARTITION `p20240101` VALUES LESS THAN (1704124800),
PARTITION `p20240102` VALUES LESS THAN (1704211200),
PARTITION `p20240103` VALUES LESS THAN (1704297600),
PARTITION `p20240104` VALUES LESS THAN (1704384000),
PARTITION `p20240105` VALUES LESS THAN (1704470400),
PARTITION `p20240106` VALUES LESS THAN (1704556800),
PARTITION `p20240107` VALUES LESS THAN (1704643200),
PARTITION `p20240108` VALUES LESS THAN (1704729600),
PARTITION `p20240109` VALUES LESS THAN (1704816000),
PARTITION `p20240110` VALUES LESS THAN (1704902400),
PARTITION `p20240111` VALUES LESS THAN (1704988800),
PARTITION `p20240112` VALUES LESS THAN (1705075200),
PARTITION `p20240113` VALUES LESS THAN (1705161600),
PARTITION `p20240114` VALUES LESS THAN (1705248000),
PARTITION `p20240115` VALUES LESS THAN (1705334400),
PARTITION `p20240116` VALUES LESS THAN (1705420800),
PARTITION `p20240117` VALUES LESS THAN (1705507200),
PARTITION `p20240118` VALUES LESS THAN (1705593600),
PARTITION `p20240119` VALUES LESS THAN (1705680000),
PARTITION `p20240120` VALUES LESS THAN (1705766400),
PARTITION `p20240121` VALUES LESS THAN (1705852800),
PARTITION `p20240122` VALUES LESS THAN (1705939200),
PARTITION `p20240123` VALUES LESS THAN (1706025600),
PARTITION `p20240124` VALUES LESS THAN (1706112000),
PARTITION `p20240125` VALUES LESS THAN (1706198400),
PARTITION `p20240126` VALUES LESS THAN (1706284800),
PARTITION `p20240127` VALUES LESS THAN (1706371200),
PARTITION `p20240128` VALUES LESS THAN (1706457600),
PARTITION `p20240129` VALUES LESS THAN (1706544000),
PARTITION `p20240130` VALUES LESS THAN (1706630400),
PARTITION `p20240131` VALUES LESS THAN (1706716800),
PARTITION `p20240201` VALUES LESS THAN (1706803200),
PARTITION `p20240202` VALUES LESS THAN (1706889600),
PARTITION `p20240203` VALUES LESS THAN (1706976000),
PARTITION `p20240204` VALUES LESS THAN (1707062400),
PARTITION `p20240205` VALUES LESS THAN (1707148800),
PARTITION `p20240206` VALUES LESS THAN (1707235200),
PARTITION `p20240207` VALUES LESS THAN (1707321600),
PARTITION `p20240208` VALUES LESS THAN (1707408000),
PARTITION `p20240209` VALUES LESS THAN (1707494400),
PARTITION `p20240210` VALUES LESS THAN (1707580800),
PARTITION `p20240211` VALUES LESS THAN (1707667200),
PARTITION `p20240212` VALUES LESS THAN (1707753600),
PARTITION `p20240213` VALUES LESS THAN (1707840000),
PARTITION `p20240214` VALUES LESS THAN (1707926400),
PARTITION `p20240215` VALUES LESS THAN (1708012800),
PARTITION `p20240216` VALUES LESS THAN (1708099200),
PARTITION `p20240217` VALUES LESS THAN (1708185600),
PARTITION `p20240218` VALUES LESS THAN (1708272000),
PARTITION `p20240219` VALUES LESS THAN (1708358400),
PARTITION `p20240220` VALUES LESS THAN (1708444800),
PARTITION `p20240221` VALUES LESS THAN (1708531200),
PARTITION `p20240222` VALUES LESS THAN (1708617600),
PARTITION `p20240223` VALUES LESS THAN (1708704000),
PARTITION `p20240224` VALUES LESS THAN (1708790400),
PARTITION `p20240225` VALUES LESS THAN (1708876800),
PARTITION `p20240226` VALUES LESS THAN (1708963200),
PARTITION `p20240227` VALUES LESS THAN (1709049600),
PARTITION `p20240228` VALUES LESS THAN (1709136000),
PARTITION `p20240229` VALUES LESS THAN (1709222400),
PARTITION `p20240301` VALUES LESS THAN (1709308800),
PARTITION `p20240302` VALUES LESS THAN (1709395200),
PARTITION `p20240303` VALUES LESS THAN (1709481600),
PARTITION `p20240304` VALUES LESS THAN (1709568000),
PARTITION `p20240305` VALUES LESS THAN (1709654400),
PARTITION `p20240306` VALUES LESS THAN (1709740800),
PARTITION `p20240307` VALUES LESS THAN (1709827200),
PARTITION `p20240308` VALUES LESS THAN (1709913600),
PARTITION `p20240309` VALUES LESS THAN (1710000000));
TiCDC同步任务脚本
# Create the changefeed "sms-record-task" through the TiCDC OpenAPI (v1).
# It replicates itnio_sms_record.tbsendrcd to the Kafka topic "sms-record-sync"
# (60 partitions, canal-json protocol) using the "rowid" dispatcher.
#
# The Content-Type header is written in the standard "Name: value" form. The
# previous "'Content-type':'application/json'" form sent a header whose name
# contained literal quote characters, so the request was still labelled with
# curl's default form-encoded content type.
#
# NOTE(review): the dispatcher only chooses the Kafka partition for each row
# event; ordering is then only as good as what the consumer preserves within
# and across partitions -- confirm on the consumer side.
curl -X POST \
  -H "Content-Type: application/json" \
  http://127.0.0.1:8300/api/v1/changefeeds \
  -d '{"changefeed_id":"sms-record-task","sink_uri":"kafka://127.0.0.1:9092/sms-record-sync?protocol=canal-json&kafka-version=2.8.1&partition-num=60&max-message-bytes=67108864&replication-factor=3","filter_rules":["itnio_sms_record.tbsendrcd"],"sink_config":{"dispatchers":[{"matcher":["itnio_sms_record.tbsendrcd"], "dispatcher":"rowid"}],"protocol":"canal-json"}}'
logstash的logstash.conf配置
# Sample Logstash configuration for creating a simple
# kafka -> Logstash -> Elasticsearch pipeline.
# Input stage: read change events from the Kafka topic and decode each message
# as UTF-8 JSON.
#
# NOTE(review): Kafka only orders messages within a single partition. With
# several consumer threads here and (by default) more than one pipeline worker,
# Logstash does not guarantee that events reach the outputs in the order they
# were read. See pipeline.workers / pipeline.ordered in logstash.yml -- confirm
# the values used in this deployment.
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    topics => "sms-record-sync"
    group_id => "itnio-sms-sync"
    # Applies only when the consumer group has no committed offset yet.
    auto_offset_reset => "earliest"
    consumer_threads => 5
    codec => json {
      charset => "UTF-8"
    }
    # Marker field that the filter and output stages test to select these events.
    add_field => { "inputType" => "tbsendrcd" }
  }
}
# Filter stage: flatten each change event into one event per row, lift the row's
# columns to the top level, coerce numeric and date fields, and derive the
# year-month value used in the Elasticsearch index name.
#
# NOTE(review): the field names used below are camelCase (smsId, sendTm, ...)
# while the table's visible columns are snake_case (sms_id, send_tm, ...).
# Confirm the names match what actually arrives in the messages.
filter {
  if [inputType] == "tbsendrcd" {
    # Emit one event per element of "data" and drop envelope fields that are
    # not needed downstream. "table" and "type" are kept; the output stage
    # reads them.
    split {
      field => "data"
      remove_field => ["mysqlType","sqlType","pkNames","sql","isDdl","old","es","ts","id","@version","message","database","@timestamp"]
    }

    # Render the row object into a string field, then parse it back so that its
    # keys land at the top level of the event.
    # NOTE(review): presumably relies on %{data} rendering the object as JSON --
    # confirm.
    mutate {
      add_field => { "dataTemp" => "%{data}" }
    }
    json {
      source => "dataTemp"
      remove_field => [ "data","dataTemp","keyId" ]
    }

    # keyId mirrors the row's id. It is added in its own mutate because
    # add_field is applied only after a mutate's own operations have run, so
    # a convert placed in the same block never sees the new field and keyId
    # would stay a string.
    mutate {
      add_field => { "keyId" => "%{id}" }
    }
    mutate {
      convert => {
        "id" => "integer"
        "keyId" => "integer"
        "idxUserId" => "integer"
        "idxSupplierId" => "integer"
        "idxAgentId" => "integer"
        "srcType" => "integer"
        "sendType" => "integer"
        "idxJobId" => "integer"
        "subJobId" => "integer"
        "idxRouteId" => "integer"
        "idxChannelId" => "integer"
        "idxGoipId" => "integer"
        "portNum" => "integer"
        "flash" => "integer"
        "duration" => "integer"
        "mcc" => "integer"
        "mnc" => "integer"
        "pay" => "integer"
        "cost" => "integer"
        "chargeCnt" => "integer"
        "result" => "integer"
        "pwdKeyId" => "integer"
        "payType" => "integer"
        "statusResult" => "integer"
        "idxAppId" => "integer"
        "idxAppTypeId" => "integer"
        "backfillStatus" => "integer"
        "setMealId" => "integer"
        "rate" => "integer"
      }
    }

    # Parse the date-time strings in place. The timezone setting makes each
    # value be read as GMT.
    # NOTE(review): confirm the source values really are in GMT/UTC.
    date {
      match => [ "createTime", "YYYY-MM-dd HH:mm:ss"]
      target => "createTime"
      timezone => "Etc/GMT"
    }
    date {
      match => [ "doneTm", "YYYY-MM-dd HH:mm:ss"]
      target => "doneTm"
      timezone => "Etc/GMT"
    }
    date {
      match => [ "mdfTm", "YYYY-MM-dd HH:mm:ss"]
      target => "mdfTm"
      timezone => "Etc/GMT"
    }
    date {
      match => [ "sendTm", "YYYY-MM-dd HH:mm:ss"]
      target => "sendTm"
      timezone => "Etc/GMT"
    }
    date {
      match => [ "backfillStatusTm", "YYYY-MM-dd HH:mm:ss"]
      target => "backfillStatusTm"
      timezone => "Etc/GMT"
    }

    # Despite its name, "day" ends up holding a year-month string (%Y%m)
    # derived from sendTm; the output stage appends it to the index name.
    mutate {
      add_field => {"day" => "%{sendTm}"}
    }
    ruby {
      code => "event.set('day', Date.strptime(event.get('[day]'), '%Y-%m-%dT%H:%M:%S').strftime('%Y%m'))"
    }
  }
}
# Output stage: print every accepted row event to stdout, then apply it to the
# monthly Elasticsearch index send_record_<day>, using smsId as the document id.
#
# The split step in the filter stage removes isDdl, so for events that passed
# through it the ![isDdl] test is always true.
#
# NOTE(review): the INSERT branch uses the default "index" action, which
# replaces the whole document. If an INSERT is processed after an UPDATE for
# the same smsId, the older row image overwrites the newer one. Whether that
# can happen depends on event ordering upstream -- confirm.
output {
  if [inputType] == "tbsendrcd" and [table] == "tbsendrcd" and ![isDdl] and [smsId] != "" {
    stdout { codec => rubydebug }

    if [type] == "INSERT" {
      # Default action: write the full document.
      elasticsearch {
        hosts => ["https://itnio-es-01.itnio-common.svc.dev.praise:9200"]
        ssl_certificate_verification => false
        user => "elastic"
        password => ""
        index => "send_record_%{day}"
        document_id => "%{smsId}"
      }
    } else if [type] == "UPDATE" {
      # Partial update; doc_as_upsert creates the document when it is missing.
      elasticsearch {
        hosts => ["https://itnio-es-01.itnio-common.svc.dev.praise:9200"]
        ssl_certificate_verification => false
        user => "elastic"
        password => ""
        index => "send_record_%{day}"
        document_id => "%{smsId}"
        action => "update"
        doc_as_upsert => true
        manage_template => true
      }
    } else if [type] == "DELETE" {
      elasticsearch {
        hosts => ["https://itnio-es-01.itnio-common.svc.dev.praise:9200"]
        ssl_certificate_verification => false
        user => "elastic"
        password => ""
        index => "send_record_%{day}"
        document_id => "%{smsId}"
        action => "delete"
      }
    }
  }
}
从 Logstash 的日志中可以看到,INSERT 事件在 UPDATE 事件之后才被执行。