TiCDC replication to Kafka does not preserve data order

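【TiDB environment】Production
【TiDB version】v6.5.0
【Steps to reproduce】TiCDC replicates data to Kafka, and Logstash consumes the Kafka data and writes it to ES.
【Problem: symptoms and impact】
When TiCDC replicates data to Kafka, the order of the data is not preserved, which leaves incorrect data in ES.
The dispatcher is set to rowid, which in theory should preserve the order of each row's changes, but testing shows the events arriving out of order.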
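In outline, a row-based dispatcher hashes the row's key columns and takes the result modulo the partition count, so every change to the same row lands in the same Kafka partition. The sketch below illustrates only that idea: `crc32` and the key encoding are stand-ins rather than TiCDC's actual hash, `partition_for_row` is a hypothetical helper, and the 60 partitions match `partition-num=60` in the changefeed script.

```python
import zlib

PARTITION_NUM = 60  # matches partition-num=60 in the sink URI


def partition_for_row(table: str, key_values: tuple) -> int:
    """Hypothetical stand-in for a row-key dispatcher.

    Assumption: the key is the primary key (id, send_tm), and crc32
    replaces whatever hash function TiCDC really uses.
    """
    raw = "|".join([table, *map(str, key_values)]).encode("utf-8")
    return zlib.crc32(raw) % PARTITION_NUM


# Every change event for the same row maps to the same partition.
events = [
    ("INSERT", (8975043, "2023-10-20 10:00:00")),
    ("UPDATE", (8975043, "2023-10-20 10:00:00")),
    ("DELETE", (8975043, "2023-10-20 10:00:00")),
]
partitions = {partition_for_row("tbsendrcd", key) for _, key in events}
print(partitions)  # a set containing a single partition number
```

In this sketch, an UPDATE that changes a key column (here `send_tm` is part of the primary key) changes the hash input, so such an event is not covered by the same-partition property.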

TiDB table schema (a partitioned table):

CREATE TABLE `tbsendrcd` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'primary key ID',
		.....
  PRIMARY KEY (`id`,`send_tm`) /*T![clustered_index] CLUSTERED */,
  UNIQUE KEY `sms_id` (`sms_id`,`send_tm`),
  KEY `send_tm` (`send_tm`),
  KEY `job_id` (`idx_job_id`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin AUTO_INCREMENT=8975043 COMMENT='SMS record table'
PARTITION BY RANGE (UNIX_TIMESTAMP(`send_tm`))
(PARTITION `p20231020` VALUES LESS THAN (1697817600),
 PARTITION `p20231021` VALUES LESS THAN (1697904000),
 PARTITION `p20231022` VALUES LESS THAN (1697990400),
 PARTITION `p20231023` VALUES LESS THAN (1698076800),
 PARTITION `p20231024` VALUES LESS THAN (1698163200),
 PARTITION `p20231025` VALUES LESS THAN (1698249600),
 PARTITION `p20231026` VALUES LESS THAN (1698336000),
 PARTITION `p20231027` VALUES LESS THAN (1698422400),
 PARTITION `p20231028` VALUES LESS THAN (1698508800),
 PARTITION `p20231029` VALUES LESS THAN (1698595200),
 PARTITION `p20231030` VALUES LESS THAN (1698681600),
 PARTITION `p20231031` VALUES LESS THAN (1698768000),
 PARTITION `p20231101` VALUES LESS THAN (1698854400),
 PARTITION `p20231102` VALUES LESS THAN (1698940800),
 PARTITION `p20231103` VALUES LESS THAN (1699027200),
 PARTITION `p20231104` VALUES LESS THAN (1699113600),
 PARTITION `p20231105` VALUES LESS THAN (1699200000),
 PARTITION `p20231106` VALUES LESS THAN (1699286400),
 PARTITION `p20231107` VALUES LESS THAN (1699372800),
 PARTITION `p20231108` VALUES LESS THAN (1699459200),
 PARTITION `p20231109` VALUES LESS THAN (1699545600),
 PARTITION `p20231110` VALUES LESS THAN (1699632000),
 PARTITION `p20231111` VALUES LESS THAN (1699718400),
 PARTITION `p20231112` VALUES LESS THAN (1699804800),
 PARTITION `p20231113` VALUES LESS THAN (1699891200),
 PARTITION `p20231114` VALUES LESS THAN (1699977600),
 PARTITION `p20231115` VALUES LESS THAN (1700064000),
 PARTITION `p20231116` VALUES LESS THAN (1700150400),
 PARTITION `p20231117` VALUES LESS THAN (1700236800),
 PARTITION `p20231118` VALUES LESS THAN (1700323200),
 PARTITION `p20231119` VALUES LESS THAN (1700409600),
 PARTITION `p20231120` VALUES LESS THAN (1700496000),
 PARTITION `p20231121` VALUES LESS THAN (1700582400),
 PARTITION `p20231122` VALUES LESS THAN (1700668800),
 PARTITION `p20231123` VALUES LESS THAN (1700755200),
 PARTITION `p20231124` VALUES LESS THAN (1700841600),
 PARTITION `p20231125` VALUES LESS THAN (1700928000),
 PARTITION `p20231126` VALUES LESS THAN (1701014400),
 PARTITION `p20231127` VALUES LESS THAN (1701100800),
 PARTITION `p20231128` VALUES LESS THAN (1701187200),
 PARTITION `p20231129` VALUES LESS THAN (1701273600),
 PARTITION `p20231130` VALUES LESS THAN (1701360000),
 PARTITION `p20231201` VALUES LESS THAN (1701446400),
 PARTITION `p20231202` VALUES LESS THAN (1701532800),
 PARTITION `p20231203` VALUES LESS THAN (1701619200),
 PARTITION `p20231204` VALUES LESS THAN (1701705600),
 PARTITION `p20231205` VALUES LESS THAN (1701792000),
 PARTITION `p20231206` VALUES LESS THAN (1701878400),
 PARTITION `p20231207` VALUES LESS THAN (1701964800),
 PARTITION `p20231208` VALUES LESS THAN (1702051200),
 PARTITION `p20231209` VALUES LESS THAN (1702137600),
 PARTITION `p20231210` VALUES LESS THAN (1702224000),
 PARTITION `p20231211` VALUES LESS THAN (1702310400),
 PARTITION `p20231212` VALUES LESS THAN (1702396800),
 PARTITION `p20231213` VALUES LESS THAN (1702483200),
 PARTITION `p20231214` VALUES LESS THAN (1702569600),
 PARTITION `p20231215` VALUES LESS THAN (1702656000),
 PARTITION `p20231216` VALUES LESS THAN (1702742400),
 PARTITION `p20231217` VALUES LESS THAN (1702828800),
 PARTITION `p20231218` VALUES LESS THAN (1702915200),
 PARTITION `p20231219` VALUES LESS THAN (1703001600),
 PARTITION `p20231220` VALUES LESS THAN (1703088000),
 PARTITION `p20231221` VALUES LESS THAN (1703174400),
 PARTITION `p20231222` VALUES LESS THAN (1703260800),
 PARTITION `p20231223` VALUES LESS THAN (1703347200),
 PARTITION `p20231224` VALUES LESS THAN (1703433600),
 PARTITION `p20231225` VALUES LESS THAN (1703520000),
 PARTITION `p20231226` VALUES LESS THAN (1703606400),
 PARTITION `p20231227` VALUES LESS THAN (1703692800),
 PARTITION `p20231228` VALUES LESS THAN (1703779200),
 PARTITION `p20231229` VALUES LESS THAN (1703865600),
 PARTITION `p20231230` VALUES LESS THAN (1703952000),
 PARTITION `p20231231` VALUES LESS THAN (1704038400),
 PARTITION `p20240101` VALUES LESS THAN (1704124800),
 PARTITION `p20240102` VALUES LESS THAN (1704211200),
 PARTITION `p20240103` VALUES LESS THAN (1704297600),
 PARTITION `p20240104` VALUES LESS THAN (1704384000),
 PARTITION `p20240105` VALUES LESS THAN (1704470400),
 PARTITION `p20240106` VALUES LESS THAN (1704556800),
 PARTITION `p20240107` VALUES LESS THAN (1704643200),
 PARTITION `p20240108` VALUES LESS THAN (1704729600),
 PARTITION `p20240109` VALUES LESS THAN (1704816000),
 PARTITION `p20240110` VALUES LESS THAN (1704902400),
 PARTITION `p20240111` VALUES LESS THAN (1704988800),
 PARTITION `p20240112` VALUES LESS THAN (1705075200),
 PARTITION `p20240113` VALUES LESS THAN (1705161600),
 PARTITION `p20240114` VALUES LESS THAN (1705248000),
 PARTITION `p20240115` VALUES LESS THAN (1705334400),
 PARTITION `p20240116` VALUES LESS THAN (1705420800),
 PARTITION `p20240117` VALUES LESS THAN (1705507200),
 PARTITION `p20240118` VALUES LESS THAN (1705593600),
 PARTITION `p20240119` VALUES LESS THAN (1705680000),
 PARTITION `p20240120` VALUES LESS THAN (1705766400),
 PARTITION `p20240121` VALUES LESS THAN (1705852800),
 PARTITION `p20240122` VALUES LESS THAN (1705939200),
 PARTITION `p20240123` VALUES LESS THAN (1706025600),
 PARTITION `p20240124` VALUES LESS THAN (1706112000),
 PARTITION `p20240125` VALUES LESS THAN (1706198400),
 PARTITION `p20240126` VALUES LESS THAN (1706284800),
 PARTITION `p20240127` VALUES LESS THAN (1706371200),
 PARTITION `p20240128` VALUES LESS THAN (1706457600),
 PARTITION `p20240129` VALUES LESS THAN (1706544000),
 PARTITION `p20240130` VALUES LESS THAN (1706630400),
 PARTITION `p20240131` VALUES LESS THAN (1706716800),
 PARTITION `p20240201` VALUES LESS THAN (1706803200),
 PARTITION `p20240202` VALUES LESS THAN (1706889600),
 PARTITION `p20240203` VALUES LESS THAN (1706976000),
 PARTITION `p20240204` VALUES LESS THAN (1707062400),
 PARTITION `p20240205` VALUES LESS THAN (1707148800),
 PARTITION `p20240206` VALUES LESS THAN (1707235200),
 PARTITION `p20240207` VALUES LESS THAN (1707321600),
 PARTITION `p20240208` VALUES LESS THAN (1707408000),
 PARTITION `p20240209` VALUES LESS THAN (1707494400),
 PARTITION `p20240210` VALUES LESS THAN (1707580800),
 PARTITION `p20240211` VALUES LESS THAN (1707667200),
 PARTITION `p20240212` VALUES LESS THAN (1707753600),
 PARTITION `p20240213` VALUES LESS THAN (1707840000),
 PARTITION `p20240214` VALUES LESS THAN (1707926400),
 PARTITION `p20240215` VALUES LESS THAN (1708012800),
 PARTITION `p20240216` VALUES LESS THAN (1708099200),
 PARTITION `p20240217` VALUES LESS THAN (1708185600),
 PARTITION `p20240218` VALUES LESS THAN (1708272000),
 PARTITION `p20240219` VALUES LESS THAN (1708358400),
 PARTITION `p20240220` VALUES LESS THAN (1708444800),
 PARTITION `p20240221` VALUES LESS THAN (1708531200),
 PARTITION `p20240222` VALUES LESS THAN (1708617600),
 PARTITION `p20240223` VALUES LESS THAN (1708704000),
 PARTITION `p20240224` VALUES LESS THAN (1708790400),
 PARTITION `p20240225` VALUES LESS THAN (1708876800),
 PARTITION `p20240226` VALUES LESS THAN (1708963200),
 PARTITION `p20240227` VALUES LESS THAN (1709049600),
 PARTITION `p20240228` VALUES LESS THAN (1709136000),
 PARTITION `p20240229` VALUES LESS THAN (1709222400),
 PARTITION `p20240301` VALUES LESS THAN (1709308800),
 PARTITION `p20240302` VALUES LESS THAN (1709395200),
 PARTITION `p20240303` VALUES LESS THAN (1709481600),
 PARTITION `p20240304` VALUES LESS THAN (1709568000),
 PARTITION `p20240305` VALUES LESS THAN (1709654400),
 PARTITION `p20240306` VALUES LESS THAN (1709740800),
 PARTITION `p20240307` VALUES LESS THAN (1709827200),
 PARTITION `p20240308` VALUES LESS THAN (1709913600),
 PARTITION `p20240309` VALUES LESS THAN (1710000000));
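With `PARTITION BY RANGE (UNIX_TIMESTAMP(send_tm))`, each partition holds rows whose timestamp is below its own bound and at or above the previous partition's bound. A small sketch of that lookup, using only the first three partitions and assuming a UTC+8 session time zone (`UNIX_TIMESTAMP()` interprets a datetime in the session time zone; under UTC+8 each bound is midnight of the following day); `partition_for` is a hypothetical helper:

```python
from bisect import bisect_right
from datetime import datetime, timedelta, timezone

# First three RANGE bounds copied from the DDL; "LESS THAN" is exclusive.
BOUNDS = [1697817600, 1697904000, 1697990400]
NAMES = ["p20231020", "p20231021", "p20231022"]
# Assumption: UNIX_TIMESTAMP() is evaluated in a UTC+8 session time zone.
CST = timezone(timedelta(hours=8))


def partition_for(send_tm: datetime) -> str:
    """Return the partition whose range contains UNIX_TIMESTAMP(send_tm)."""
    ts = int(send_tm.timestamp())
    i = bisect_right(BOUNDS, ts)  # index of the first bound strictly greater than ts
    if i == len(BOUNDS):
        raise ValueError("value is beyond the last partition")
    return NAMES[i]


print(partition_for(datetime(2023, 10, 20, 12, 0, tzinfo=CST)))  # p20231020
print(partition_for(datetime(2023, 10, 21, 0, 0, tzinfo=CST)))   # p20231021
```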

TiCDC changefeed creation script:

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8300/api/v1/changefeeds -d '{"changefeed_id":"sms-record-task","sink_uri":"kafka://127.0.0.1:9092/sms-record-sync?protocol=canal-json&kafka-version=2.8.1&partition-num=60&max-message-bytes=67108864&replication-factor=3","filter_rules":["itnio_sms_record.tbsendrcd"],"sink_config":{"dispatchers":[{"matcher":["itnio_sms_record.tbsendrcd"], "dispatcher":"rowid"}],"protocol":"canal-json"}}'
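Building the same request programmatically makes the JSON body easier to read and check. A sketch using only the Python standard library, with every field value copied from the command above; it builds the request with a well-formed `Content-Type` header but does not send it:

```python
import json
import urllib.request

CDC_API = "http://127.0.0.1:8300/api/v1/changefeeds"

payload = {
    "changefeed_id": "sms-record-task",
    "sink_uri": (
        "kafka://127.0.0.1:9092/sms-record-sync?protocol=canal-json"
        "&kafka-version=2.8.1&partition-num=60"
        "&max-message-bytes=67108864&replication-factor=3"
    ),
    "filter_rules": ["itnio_sms_record.tbsendrcd"],
    "sink_config": {
        "dispatchers": [
            {"matcher": ["itnio_sms_record.tbsendrcd"], "dispatcher": "rowid"}
        ],
        "protocol": "canal-json",
    },
}

# Build (but do not send) the POST request.
req = urllib.request.Request(
    CDC_API,
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
# To actually create the changefeed: urllib.request.urlopen(req)
```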

Logstash configuration (logstash.conf):

# Sample Logstash configuration for creating a simple
# kafka -> Logstash -> Elasticsearch pipeline.

# Input section
input {
	kafka{
		add_field => {"inputType" => "tbsendrcd"}
		bootstrap_servers => "127.0.0.1:9092"
		topics => "sms-record-sync"
		codec => json {
			charset => "UTF-8"
			}
		group_id => "itnio-sms-sync"
		auto_offset_reset => "earliest"
		consumer_threads => 5
	}
}
filter {
	if [inputType] == "tbsendrcd" {
		split {
			field => "data"
			remove_field => ["mysqlType","sqlType","pkNames","sql","isDdl","old","es","ts","id","@version","message","database","@timestamp"]
		}
		mutate {
			add_field => { "dataTemp" => "%{data}" }
		}
		json {
			source => "dataTemp"
			remove_field => [ "data","dataTemp","keyId" ]
		}
		mutate {
			add_field => { "keyId" => "%{id}" }
			convert => {
				"id" => "integer"
				"keyId" => "integer"
				"idxUserId" => "integer"
				"idxSupplierId" => "integer"
				"idxAgentId" => "integer"
				"srcType" => "integer"
				"sendType" => "integer"
				"idxJobId" => "integer"
				"subJobId" => "integer"
				"idxRouteId" => "integer"
				"idxChannelId" => "integer"
				"idxGoipId" => "integer"
				"portNum" => "integer"
				"flash" => "integer"
				"duration" => "integer"
				"mcc" => "integer"
				"mnc" => "integer"
				"pay" => "integer"
				"cost" => "integer"
				"chargeCnt" => "integer"
				"result" => "integer"
				"pwdKeyId" => "integer"
				"payType" => "integer"
				"statusResult" => "integer"
				"idxAppId" => "integer"
				"idxAppTypeId" => "integer"
				"backfillStatus" => "integer"
				"setMealId" => "integer"
				"rate" => "integer"
			}
		}
		date {
			match => [ "createTime",  "YYYY-MM-dd HH:mm:ss"]
			target => "createTime"
			timezone => "Etc/GMT"
		}
		date {
			match => [ "doneTm",  "YYYY-MM-dd HH:mm:ss"]
			target => "doneTm"
			timezone => "Etc/GMT"
		}
		date {
			match => [ "mdfTm",  "YYYY-MM-dd HH:mm:ss"]
			target => "mdfTm"
			timezone => "Etc/GMT"
		}
		date {
			match => [ "sendTm",  "YYYY-MM-dd HH:mm:ss"]
			target => "sendTm"
			timezone => "Etc/GMT"
		}
		date {
			match => [ "backfillStatusTm",  "YYYY-MM-dd HH:mm:ss"]
			target => "backfillStatusTm"
			timezone => "Etc/GMT"
		}
		mutate {
			add_field => {"day" => "%{sendTm}"}
		}
		ruby {
			code => "event.set('day', Date.strptime(event.get('[day]'), '%Y-%m-%dT%H:%M:%S').strftime('%Y%m'))"
		}
	}
}

output {
	if [inputType] == "tbsendrcd" and "tbsendrcd" == [table] and ![isDdl] and [smsId] != "" {
		stdout { codec => rubydebug }
		if [type] == "INSERT" {
			elasticsearch {
				document_id => "%{smsId}"
				hosts => ["https://itnio-es-01.itnio-common.svc.dev.praise:9200"]
				ssl_certificate_verification => false
				index => "send_record_%{day}"
				user => "elastic"
				password => ""
			}
		}
		if [type] == "UPDATE" {
			elasticsearch {
				document_id => "%{smsId}"
				hosts => ["https://itnio-es-01.itnio-common.svc.dev.praise:9200"]
				ssl_certificate_verification => false
				index => "send_record_%{day}"
				user => "elastic"
				password => ""
				doc_as_upsert => true
				manage_template => true
				action => "update"
			}
		}
		if [type] == "DELETE" {
			elasticsearch {
				document_id => "%{smsId}"
				action => "delete"
				hosts => ["https://itnio-es-01.itnio-common.svc.dev.praise:9200"]
				ssl_certificate_verification => false
				index => "send_record_%{day}"
				user => "elastic"
				password => ""
			}
		}
	}
}

The Logstash logs show the INSERT being executed after the UPDATE.
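Why an INSERT arriving after an UPDATE leaves a wrong document in ES follows from the actions in the output section: INSERT is a plain index (full overwrite), while UPDATE is an update with `doc_as_upsert => true` (merge, or create if missing). A toy in-memory model of those semantics, not the Elasticsearch client; `apply` is hypothetical and the field values are made up:

```python
from typing import Dict, Optional


def apply(store: Dict[str, dict], action: str, doc_id: str, doc: Optional[dict] = None) -> None:
    """Toy model of the three ES actions used in the output section."""
    if action == "index":  # INSERT: replace the whole document
        store[doc_id] = dict(doc)
    elif action == "update":  # UPDATE with doc_as_upsert: merge, or create if missing
        store.setdefault(doc_id, {}).update(doc)
    elif action == "delete":  # DELETE: remove the document if present
        store.pop(doc_id, None)
    else:
        raise ValueError(action)


insert = {"smsId": "s1", "status": 0}
update = {"smsId": "s1", "status": 2}

in_order = {}
apply(in_order, "index", "s1", insert)
apply(in_order, "update", "s1", update)

reordered = {}
apply(reordered, "update", "s1", update)  # UPDATE arrives first and upserts
apply(reordered, "index", "s1", insert)   # the late INSERT overwrites it

print(in_order["s1"]["status"], reordered["s1"]["status"])  # 2 0
```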

Are you using Kafka partitions? What is the partitioning rule? And does Logstash consume the partitions concurrently?
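With `consumer_threads => 5` under one `group_id`, the 60 partitions are split among five consumers, and each partition is read by exactly one of them, so per-partition order is kept at the point of consumption. A simplified sketch of a range-style assignment for a single topic; the real strategy depends on the Kafka client's partition assignment setting, and `range_assign` is a hypothetical helper:

```python
from __future__ import annotations


def range_assign(num_partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Simplified single-topic range assignment (assumption: a range-style assignor)."""
    per, extra = divmod(num_partitions, len(consumers))
    result, start = {}, 0
    for i, consumer in enumerate(sorted(consumers)):
        count = per + (1 if i < extra else 0)  # the first `extra` consumers get one more
        result[consumer] = list(range(start, start + count))
        start += count
    return result


threads = [f"logstash-{i}" for i in range(5)]  # consumer_threads => 5
assignment = range_assign(60, threads)
for name, parts in assignment.items():
    print(name, parts[0], "..", parts[-1])  # each thread owns 12 contiguous partitions
```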



What do you mean by Logstash consuming partitions concurrently?

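I don't follow what you mean by the dispatcher being rowid; I couldn't find it in the official docs at https://docs.pingcap.com/zh/tidb/v7.6/ticdc-sink-to-kafka#partition-分发器 . Could you post your CDC configuration? Normally, ordering within a single row should be guaranteed.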

https://docs.pingcap.com/zh/tidb/v6.5/ticdc-open-api


Take a look at this doc.
The CDC changefeed script is the same command as posted above.

The CDC script specifies "protocol":"canal-json".
The doc I followed is the open-api one.

So which protocol does the changefeed creation command use?

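About Kafka partitions: say Kafka has 4 partitions and you start four consumers to consume them concurrently. Then order is only guaranteed within each partition; the order across the 4 partitions cannot be guaranteed.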
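This can be checked with a small simulation: four partitions, each read in order by its own consumer, merged in an arbitrary interleaving. Each row's events stay in order because a row maps to a single partition, while the overall order across partitions changes from run to run. The row keys and events are made up for illustration, and `interleave` is a hypothetical helper:

```python
import random

# Events already assigned to partitions, in commit order within each partition.
partitions = {
    0: [("row-a", "INSERT"), ("row-a", "UPDATE")],
    1: [("row-b", "INSERT"), ("row-b", "UPDATE"), ("row-b", "DELETE")],
    2: [("row-c", "INSERT")],
    3: [("row-d", "INSERT"), ("row-d", "UPDATE")],
}


def interleave(parts: dict, seed: int) -> list:
    """Simulate independent consumers: each step takes the next event of a random partition."""
    rng = random.Random(seed)
    cursors = {p: 0 for p in parts}
    out = []
    while any(cursors[p] < len(parts[p]) for p in parts):
        ready = [p for p in parts if cursors[p] < len(parts[p])]
        p = rng.choice(ready)
        out.append(parts[p][cursors[p]])
        cursors[p] += 1
    return out


def per_key_order(events: list) -> dict:
    """Group the operations by row key, keeping their relative order."""
    order = {}
    for key, op in events:
        order.setdefault(key, []).append(op)
    return order


merged = interleave(partitions, seed=7)
expected = per_key_order([e for evs in partitions.values() for e in evs])
assert per_key_order(merged) == expected  # each row's own order survives
```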

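This ensures that all data for a given row goes to the same partition, right? And within that partition, the changes (insert/update/delete) to that row are in order, right?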

Yes. Is there a problem with that?
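Per-partition order only holds up to the point where events leave the Kafka consumer; if a later stage processes consecutive events in parallel, they can still be reordered. A toy model, assuming two consecutive batches from the same partition are handed to two workers that finish at different times (the durations are made up, and `emit_order` is hypothetical). In Logstash, the settings worth checking here are `pipeline.workers` and, in newer versions, `pipeline.ordered` in logstash.yml; consult the documentation for your version.

```python
def emit_order(batches: list, durations: list) -> list:
    """Toy model: batch i goes to its own worker at time 0 and finishes after
    durations[i]; output follows finish time (ties keep the original order)."""
    finished = sorted(range(len(batches)), key=lambda i: durations[i])
    return [event for i in finished for event in batches[i]]


# Two consecutive batches from the same partition, touching the same row.
batches = [[("s1", "INSERT")], [("s1", "UPDATE")]]
print(emit_order(batches, durations=[0.3, 0.1]))  # [('s1', 'UPDATE'), ('s1', 'INSERT')]
print(emit_order(batches, durations=[0.1, 0.3]))  # [('s1', 'INSERT'), ('s1', 'UPDATE')]
```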