有没有java读取kafka数据(TiCDC同步到kafka)的例子或文档?

有没有java读取kafka数据(TiCDC同步到kafka)的例子或文档?

我看TiCDC的Open协议中,删除和修改记录时都没有给出记录的老值,那在一些情况下就比较难处理,能否在删除和修改记录时把老的值也给出来?(参考MySQL binlog)

抱歉,有没有java读取kafka数据(TiCDC同步到kafka),这句话我没有太理解。 请问,是要从tidb同步到kafka的例子吗?

是java如何读取TiCDC同步到kafka的数据,有没有这方面的例子~

java 如何消费 kafka 的实践需要用户自己去做,可以到 kafka 社区或者 java 社区看下。

TiCDC在kafka中的格式怎么读取呢?

CDC 没有提供 Kafka 消费端的实现,只提供了开放数据协议,可以依据该协议实现 consumer

那就复杂了…

您好,当前只有go 版本的解析 demo,请参考 https://github.com/pingcap/ticdc/tree/master/kafka_consumer

注意:只是 demo ,不要使用在生产环境,多谢。

请问有demo了吗可以参考下吗

我这边有个解析TiCDC Canal-Json自用的小demo,写的一般,主要是想解决消费TiDB不同表加工为业务数据的情况,而且实际并未用到oldData处理数据,仅供参考

1. 代码结构

cdc-demo
└─ src
    └─ main
        └─ java
         └─ com.example.demo
           ├─ constants
           │    └─ CdcConstants.java
           ├─ dto
           │    ├─ CdcMessage.java
           │    └─ User.java
           ├─ job
           │    ├─ CdcJob.java
           │    └─ UserCdcJob.java
           └─ service
                ├─ impl
                │   └─ UserServiceImpl.java
                └─ CdcService.java

2. CDC 常量类

public class CdcConstants {

    public enum MessageType {
        /**
         * 插入操作
         */
        INSERT,

        /**
         * 更新操作
         */
        UPDATE,

        /**
         * 删除操作
         */
        DELETE;
    }
}

3. 实体类

3.1 用户实体类

@Getter
@Setter
public class User {

    /**
     * 用户id
     */
    private Long id;

    /**
     * 用户名
     */
    private String name;

    /**
     * 年龄
     */
    private Integer age;
}

3.2 CDC 消息实体类

@Getter
@Setter
public class CdcMessage<T> {

    /**
     * 数据集合
     */
    private List<T> data;

    /**
     * 数据库名称
     */
    private String database;

    /**
     * 是否为DDL语句isDdl
     */
    private boolean isDdl;

    /**
     * 表结构的类型字段(值为字段类型,如varchar)
     */
    private T mysqlType;

    /**
     * UPDATE类型下的旧数据(未变更字段无数据)
     */
    private List<T> oldData;

    /**
     * sql语句
     */
    private String sql;

    /**
     * 值为int类型
     */
    private T sqlType;

    /**
     * 数据表名
     */
    private String table;

    /**
     * 新增(INSERT)、更新(UPDATE)、删除(DELETE)、删除表(ERASE)等
     */
    private String type;
}

4. 任务类

4.1 CDC 任务基类

@Slf4j
public class CdcJob<T> {

    protected CdcService<T> cdcService;

    /**
     * 处理消息
     *
     * @param record 消息记录
     * @param ack    消息处理标识
     */
    public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String recordString = String.format(
                "topic:%s,partition:%s,offset:%s,value:%s",
                record.topic(),
                record.partition(),
                record.offset(),
                record.value()
        );
        log.info("数据更新开始处理,消息:" + recordString);
        try {
            boolean processResult = process(record);
            String processResultString = processResult ? "成功" : "无更新";
            log.info("数据更新处理结束,处理结果:" + processResultString);
        } catch (Exception e) {
            log.error("数据更新报错,", e);
        } finally {
            // 手动提交偏移量
            ack.acknowledge();
        }
    }

    /**
     * 处理数据
     *
     * @param record kafka消费记录
     * @return 处理结果
     */
    public boolean process(ConsumerRecord<String, String> record) {
        String bizName = this.getClass().getSimpleName();
        // 服务为初始化报错
        if (null == cdcService) {
            throw new IllegalStateException("服务未初始化");
        }

        // 解析消息
        CdcMessage<T> cdcMessage = JSON.parseObject(record.value(), new TypeReference<CdcMessage<T>>() {
        });

        // 跳过DDL
        if (cdcMessage.isDdl()) {
            log.info(bizName, "DDL变更,无需处理");
            return false;
        }
        // 处理结果初始化
        boolean result = false;
        // 服务层处理数据
        List<T> dataList = cdcMessage.getData();
        if (CdcConstants.MessageType.INSERT.name().equals(cdcMessage.getType())) {
            result = cdcService.insert(dataList);
        } else if (CdcConstants.MessageType.UPDATE.name().equals(cdcMessage.getType())) {
            result = cdcService.update(cdcMessage.getOldData(), dataList);
        } else if (CdcConstants.MessageType.DELETE.name().equals(cdcMessage.getType())) {
            result = cdcService.delete(dataList);
        } else {
            log.warn(bizName, "不处理该消息,消息类型:" + cdcMessage.getType());
        }
        return result;
    }
}

4.2 用户表 CDC 消费任务类

@Component
public class UserCdcJob extends CdcJob<User> {

    public UserCdcJob(UserServiceImpl userService) {
        this.cdcService = userService;
    }

    /**
     * 消费CDC消息,并进行处理
     *
     * @param record 消息记录
     * @param ack    消息处理标识
     */
    @KafkaListener(id = "UserCdcJob", groupId = "${user-cdc.group}",
            topics = {"${user-cdc.topic}"}, containerFactory = "cdcKafkaListenerFactory")
    public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) {
        handleMessage(record, ack);
    }
}

5 服务类

5.1 CDC 消息处理服务接口

public interface CdcService<T> {

    /**
     * 插入数据
     *
     * @param data 数据
     * @return 插入结果
     */
    boolean insert(List<T> data);

    /**
     * 更新数据
     *
     * @param oldData 更新前数据
     * @param newData 更新后数据
     * @return 更新结果
     */
    boolean update(List<T> oldData, List<T> newData);

    /**
     * 删除数据
     *
     * @param data 数据
     * @return 删除数据
     */
    boolean delete(List<T> data);
}

5.2 用户服务实现类

@Service
public class UserServiceImpl implements CdcService<User> {

    @Override
    public boolean insert(List<User> data) {
        // TODO
        return false;
    }

    @Override
    public boolean update(List<User> oldData, List<User> newData) {
        // TODO
        return false;
    }

    @Override
    public boolean delete(List<User> data) {
        // TODO
        return false;
    }
}

好的谢谢,请问方便直接发源码或者git吗

这个只是我没写 Service 的具体实现(这个实现是看业务的,你这边对数据的更新、插入、删除是什么逻辑就写什么逻辑好了),逻辑是一样的