cdc好像自动过滤了INSERT语句并且checkpoint没变化

【 TiDB 使用环境】生产环境
【 TiDB 版本】v7.1.5
【遇到的问题:问题现象及影响】
近期刚升级了TIDB集群从v5.4.3 → v7.1.5 引入ticdc过滤规则功能还有提升性能。
升级后创建cdc changefeed,发现任务好像自动过滤了INSERT语句,是有UPDATE的,好奇怪~
还有就是 checkpoint没变化。
大佬们帮我看下哪里的问题,是不是我配置文件有问题呀,但我的过滤规则就那几条很简单呀

配置文件:


# 指定该 Changefeed 在 Capture Server 中内存配额的上限。对于超额使用部分,
# 会在运行中被 Go runtime 优先回收。默认值为 `1073741824`,即 1 GB。
# memory-quota = 1073741824
memory-quota = 3221225472

# 指定配置文件中涉及的库名、表名是否为大小写敏感
# 该配置会同时影响 filter 和 sink 相关配置。自 v6.5.6 和 v7.1.3 起,默认值由 true 改为 false
case-sensitive = false

# 是否输出 old value,从 v4.0.5 开始支持,从 v5.0 开始默认为 true
enable-old-value = true

# 是否开启 Syncpoint 功能,从 v6.3.0 开始支持,该功能默认关闭。
# 从 v6.4.0 开始,使用 Syncpoint 功能需要同步任务拥有下游集群的 SYSTEM_VARIABLES_ADMIN 或者 SUPER 权限。
# 注意:该参数只有当下游为 TiDB 时,才会生效。
# enable-sync-point = false

# Syncpoint 功能对齐上下游 snapshot 的时间间隔
# 配置格式为 h m s,例如 "1h30m30s"
# 默认值为 10m,最小值为 30s
# 注意:该参数只有当下游为 TiDB 时,才会生效。
# sync-point-interval = "5m"

# Syncpoint 功能在下游表中保存的数据的时长,超过这个时间的数据会被清理
# 配置格式为 h m s,例如 "24h30m30s"
# 默认值为 24h
# 注意:该参数只有当下游为 TiDB 时,才会生效。
# sync-point-retention = "1h"

# 从 v6.5.6 和 v7.1.3 起引入,用于设置解析 DDL 时使用的 SQL 模式,多个模式之间用逗号分隔
# 默认值和 TiDB 的默认 SQL 模式一致
# sql-mode = "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"

[mounter]
# mounter 解码 KV 数据的线程数,默认值为 16
worker-num = 11

[filter]
# 忽略指定 start_ts 的事务
# ignore-txn-start-ts = [1, 2]

# 过滤器规则
# 过滤规则语法:https://docs.pingcap.com/zh/tidb/stable/table-filter#表库过滤语法
# rules = ['*.*', '!test.*']
rules = ['bsppr.xpost']

# 事件过滤器规则
# 事件过滤器的详细配置规则可参考:https://docs.pingcap.com/zh/tidb/stable/ticdc-filter
# 第一个事件过滤器规则
# [[filter.event-filters]]
# matcher = ["test.worker"] # matcher 是一个白名单,表示该过滤规则只应用于 test 库中的 worker 表
# ignore-event = ["insert"] # 过滤掉 insert 事件
# ignore-sql = ["^drop", "add column"] # 过滤掉以 "drop" 开头或者包含 "add column" 的 DDL
# ignore-delete-value-expr = "name = 'john'" # 过滤掉包含 name = 'john' 条件的 delete DML
# ignore-insert-value-expr = "id >= 100" # 过滤掉包含 id >= 100 条件的 insert DML
# ignore-update-old-value-expr = "age < 18" # 过滤掉旧值 age < 18 的 update DML
# ignore-update-new-value-expr = "gender = 'male'" # 过滤掉新值 gender = 'male' 的 update DML

[[filter.event-filters]]
matcher = ['bsppr.xpost']
ignore-event=["all ddl","delete"]
ignore-update-new-value-expr = "hidden=1"

# 第二个事件过滤器规则
# [[filter.event-filters]]
# matcher = ["test.fruit"] # 该事件过滤器只应用于 test.fruit 表
# ignore-event = ["drop table", "delete"] # 忽略 drop table 的 DDL 事件和 delete 类型的 DML 事件
# ignore-sql = ["^drop table", "alter table"] # 忽略以 drop table 开头的,或者包含 alter table 的 DDL 语句
# ignore-insert-value-expr = "price > 1000 and origin = 'no where'" # 忽略包含 price > 1000 和 origin = 'no where' 条件的 insert DML

[scheduler]
# 将表以 Region 为单位分配给多个 TiCDC 节点进行同步。
# 注意:该功能只在 Kafka changefeed 上生效,暂不支持 MySQL changefeed。
# 默认为 "false"。设置为 "true" 以打开该功能。
enable-table-across-nodes = true
# enable-table-across-nodes 开启后,有两种分配模式
# 1. 按 Region 的数量分配,即每个 CDC 节点处理 region 的个数基本相等。当某个表 Region 个数大于 `region-threshold` 值时,会将表分配到多个节点处理。`region-threshold` 默认值为 10000。
# region-threshold = 10000 
# 2. 按写入的流量分配,即每个 CDC 节点处理 region 总修改行数基本相当。只有当表中每分钟修改行数超过 `write-key-threshold` 值时,该表才会生效。
# write-key-threshold = 30000
write-key-threshold = 30000
# 注意:
# `write-key-threshold` 参数默认值为 0,代表默认不会采用流量的分配模式。
# 两种方式配置一种即可生效,当 `region-threshold` 和 `write-key-threshold` 同时配置时,TiCDC 将优先采用按流量分配的模式,即 `write-key-threshold`。      


[sink]
# 对于 MQ 类的 Sink,可以通过 dispatchers 配置 event 分发器
# 支持 partition 及 topic(从 v6.1 开始支持)两种 event 分发器。二者的详细说明见下一节。
# matcher 的匹配语法和过滤器规则语法相同,matcher 匹配规则的详细说明见下一节。
# 注意:该参数只有当下游为消息队列时,才会生效。
# dispatchers = [
#    {matcher = ['test1.*', 'test2.*'], topic = "Topic 表达式 1", partition = "ts" },
#    {matcher = ['test3.*', 'test4.*'], topic = "Topic 表达式 2", partition = "index-value" },
#    {matcher = ['test1.*', 'test5.*'], topic = "Topic 表达式 3", partition = "table"},
#    {matcher = ['test6.*'], partition = "ts"}
# ]
dispatchers = [
    {matcher = ['bsppr.xpost'], dispatcher = "default"},
]

# protocol 用于指定传递到下游的协议格式
# 当下游类型是 Kafka 时,支持 canal-json、avro 两种协议。
# 当下游类型是存储服务时,目前仅支持 canal-json、csv 两种协议。
# 注意:该参数只有当下游为 Kafka 或存储服务时,才会生效。
# protocol = "canal-json"
protocol = "canal-json"

# 以下三个配置项仅在同步到存储服务的 sink 中使用,在 MQ 和 MySQL 类 sink 中无需设置。
# 换行符,用来分隔两个数据变更事件。默认值为空,表示使用 "\r\n" 作为换行符。
# terminator = ''

# 文件路径的日期分隔类型。可选类型有 `none`、`year`、`month` 和 `day`。默认值为 `day`,即按天分隔。详见 <https://docs.pingcap.com/zh/tidb/v7.1/ticdc-sink-to-cloud-storage#数据变更记录>。
# 注意:该参数只有当下游为存储服务时,才会生效。
date-separator = 'day'

# 是否使用 partition 作为分隔字符串。默认值为 true,即一张表中各个 partition 的数据会分不同的目录来存储。建议保持该配置项为 true 以避免下游分区表可能丢数据的问题 <https://github.com/pingcap/tiflow/issues/8581>。使用示例详见 <https://docs.pingcap.com/zh/tidb/v7.1/ticdc-sink-to-cloud-storage#数据变更记录>。
# 注意:该参数只有当下游为存储服务时,才会生效。
enable-partition-separator = true

# Schema 注册表的 URL。
# 注意:该参数只有当下游为消息队列时,才会生效。
# schema-registry = "http://localhost:80801/subjects/{subject-name}/versions/{version-number}/schema"

# 编码数据时所用编码器的线程数。
# 默认值为 16。
# 注意:该参数只有当下游为消息队列时,才会生效。
# encoder-concurrency = 16

# 是否开启 Kafka Sink V2。Kafka Sink V2 内部使用 kafka-go 实现。
# 默认值为 false。
# 注意:该参数只有当下游为消息队列时,才会生效。
# enable-kafka-sink-v2 = false

# 是否只向下游同步有内容更新的列。从 v7.1.0 开始支持。
# 默认值为 false。
# 注意:该参数只有当下游为消息队列,并且使用 Open Protocol 或 Canal-JSON 时,才会生效。
# only-output-updated-columns = false

# 从 v6.5.0 开始,TiCDC 支持以 CSV 格式将数据变更记录保存至存储服务中,在 MQ 和 MySQL 类 sink 中无需设置。
# [sink.csv]
# 字段之间的分隔符。必须为 ASCII 字符,默认值为 `,`。
# delimiter = ','
# 用于包裹字段的引号字符。空值代表不使用引号字符。默认值为 `"`。
# quote = '"'
# CSV 中列为 NULL 时将以什么字符来表示。默认值为 `\N`。
# null = '\N'
# 是否在 CSV 行中包含 commit-ts。默认值为 false。
# include-commit-ts = false
# 二进制类型数据的编码方式,可选 'base64' 或 'hex'。从 v7.1.2 开始支持。默认值为 'base64'。
# binary-encoding-method = 'base64'

# consistent 中的字段用于配置 Changefeed 的数据一致性。详细的信息,请参考 <https://docs.pingcap.com/tidb/stable/ticdc-sink-to-mysql#eventually-consistent-replication-in-disaster-scenarios>。
# 注意:一致性相关参数只有当下游为数据库并且开启 redo log 功能时,才会生效。
[consistent]
# 数据一致性级别。默认值为 "none",可选值为 "none" 和 "eventual"。
# 设置为 "none" 时将关闭 redo log。
level = "none"
# redo log 的最大日志大小,单位为 MB。默认值为 64。
max-log-size = 64
# 两次 redo log 刷新的时间间隔,单位为毫秒。默认值为 2000。
flush-interval = 2000
# redo log 使用存储服务的 URI。默认值为空。
storage = ""
# 是否将 redo log 存储到本地文件中。默认值为 false。
use-file-backend = false
# 控制 redo 模块中编解码 worker 的数量,默认值为 16。
encoding-worker-num = 16
# 控制 redo 模块中上传文件 worker 的数量,默认值为 8。
flush-worker-num = 8
# redo log 文件的压缩行为,可选值为 "" 和 "lz4"。默认值为 "",表示不进行压缩。
compression = ""
# redo log 上传单个文件的并发数,默认值为 1,表示禁用并发。
flush-concurrency = 1

[integrity]
# 是否开启单行数据的 Checksum 校验功能,默认值为 "none",即不开启。可选值为 "none" 和 "correctness"。
integrity-check-level = "none"
# 当单行数据的 Checksum 校验失败时,Changefeed 打印错误行数据相关日志的级别。默认值为 "warn",可选值为 "warn" 和 "error"。
corruption-handle-level = "warn"

## 以下参数仅在下游为 Kafka 时生效。从 v7.1.1 开始支持。
#[sink.kafka-config]
## Kafka SASL 认证机制。该参数默认值为空,表示不使用 SASL 认证。
#sasl-mechanism = "OAUTHBEARER"
## Kafka SASL OAUTHBEARER 认证机制中的 client-id。默认值为空。在使用该认证机制时,该参数必填。
#sasl-oauth-client-id = "producer-kafka"
## Kafka SASL OAUTHBEARER 认证机制中的 client-secret。默认值为空。需要 Base64 编码。在使用该认证机制时,该参数必填。
#sasl-oauth-client-secret = "cHJvZHVjZXIta2Fma2E="
## Kafka SASL OAUTHBEARER 认证机制中的 token-url 用于获取 token。默认值为空。在使用该认证机制时,该参数必填。
#sasl-oauth-token-url = "http://127.0.0.1:4444/oauth2/token"
## Kafka SASL OAUTHBEARER 认证机制中的 scopes。默认值为空。在使用该认证机制时,该参数可选填。
#sasl-oauth-scopes = ["producer.kafka", "consumer.kafka"]
## Kafka SASL OAUTHBEARER 认证机制中的 grant-type。默认值为 "client_credentials"。在使用该认证机制时,该参数可选填。
#sasl-oauth-grant-type = "client_credentials"
## Kafka SASL OAUTHBEARER 认证机制中的 audience。默认值为空。在使用该认证机制时,该参数可选填。
#sasl-oauth-audience = "kafka"

[sink.cloud-storage-config]
# 向下游存储服务保存数据变更记录的并发度,默认值为 16。
worker-count = 16
# 向下游存储服务保存数据变更记录的间隔,默认值为 "2s"。
flush-interval = "2s"
# 单个数据变更文件的字节数超过 `file-size` 时将其保存至存储服务中,默认值为 67108864,即 64 MiB。
file-size = 67108864
# 文件保留的时长,仅在 date-separator 配置为 day 时生效,默认值为 0,表示禁用文件清理。假设 `file-expiration-days = 1` 且 `file-cleanup-cron-spec = "0 0 0 * * *"`,TiCDC 将在每天 00:00:00 时刻清理已保存超过 24 小时的文件。例如,2023/12/02 00:00:00 将清理 2023/12/01 之前(注意:不包括 2023/12/01)的文件。
file-expiration-days = 0
# 定时清理任务的运行周期,与 crontab 配置兼容,格式为 `<Second> <Minute> <Hour> <Day of the month> <Month> <Day of the week (Optional)>`,默认值为 "0 0 2 * * *",表示每天凌晨两点执行清理任务
file-cleanup-cron-spec = "0 0 2 * * *"
# 上传单个文件的并发数,默认值为 1,表示禁用并发。
flush-concurrency = 1

命令:

tiup cdc cli changefeed create -c xxxxxx --server=http://192.168.241.71:8300 --sink-uri=“kafka://192.168.241.64:9092,192.168.241.65:9092,192.168.241.66:9092/tidb-xpost-incr?protocol=canal-json&kafka-version=1.1.1&partition-num=4&max-message-bytes=67108864&replication-factor=2” --config=./xxxxxx.toml

这里有一个bug,具体的内容:ddl events should not be filtered in puller · Issue #10524 · pingcap/tiflow · GitHub

1 个赞

没太看懂,意思是我配置的规则“all ddl" 这个规则里面包括了 insert 这个BUG 吗?

你们把配置文件过滤ddl 内容去掉,重试看看是不是就正常,加了过滤条件才异常。

1 个赞

去掉了"all ddl" 规则,也生效了, 还是不行~~~ 这个必须要删除 changefeed 重新创建吗?我是update的


更新同步任务一般是 pause->update->resume 。

issue里有4个 Case ,你仔细核对一下看看有没有match的场景。我们之前遇到过过滤指定的DDL后,会导致后续的相同表的DML全部被忽略过滤,是这个bug引发的。不过官方应该时钟v7.1.4修复了的。

1 个赞

我是pause->update->resume 这样更新的。
还有我当前是7.1.5版本呀

issue里面的几个case好像没看到有对insert影响的呀,还是不太明白~

你把TiCDC的debug日志打开,上游执行SQL,然后具体的执行过程就可以在日志体现,建议跟踪来看看。

推测还是类似的问题。

1 个赞

好的。

出问题的是生产环境,我重新弄了个测试环境,一样的配置,测试环境是正常的。
生产环境是刚做过升级v5.4.3->v7.1.5。 难道是升级的问题吗
不太敢在生产上调试,怕出问题

CDC的版本确认是 v7.1.5吗?另外,如果可以的话,可以尝试下重建任务。

1 个赞

image
是v7.1.5,我反复重建过任务,一样的效果,只有UPDATE SQL,checkpoint不变

新配置文件是这个效果,但我用老的配置就是v5.4.3的配置文件建的任务好像就是好的。奇怪

看了下cdc的启动时间,升级过程中也确实是重启过的

在v7.1.5,生成一个模板对比下

1 个赞

生成cdc的配置文件模板吗?怎么生成我没找到方法

我刚才弄了个测试环境,全新部署的同版本的TIDB集群和kafka,同样的ticdc配置文件,没有问题,过滤规则都是生效的。
目前来看可能和tidb集群升级有关,但我看了下当时的升级录像,全程没有异常错误和日志。
现在该怎么查问题啊 :rofl: :rofl: :rofl:

你试试新建一个 cdc 任务到一个kafka 新的 topic,然后配置一样 tso 指定现在。看看同步是否正常。

1 个赞

配置一样的tso是排除数据问题吗? 没法配置成和测试一样的tso

他说的是主题吧

1 个赞

不指定start tso 默认以现在开始同步。

1 个赞

最终结论可能是:tiup/cluster没有升级到最新版本的问题。
官方文档在7.1.5升级中建议满足版本即可,但在7.5后就没再写了,应该是这些组件越新越好吧,向下兼容。