有没有java读取kafka数据(TiCDC同步到kafka)的例子或文档?
我看TiCDC的Open协议中,删除和修改记录时都没有给出记录的老值,那在一些情况下就比较难处理,能否在删除和修改记录时把老的值也给出来?(参考MySQL binlog)
有没有java读取kafka数据(TiCDC同步到kafka)的例子或文档?
我看TiCDC的Open协议中,删除和修改记录时都没有给出记录的老值,那在一些情况下就比较难处理,能否在删除和修改记录时把老的值也给出来?(参考MySQL binlog)
抱歉,有没有java读取kafka数据(TiCDC同步到kafka),这句话我没有太理解。 请问,是要从tidb同步到kafka的例子吗?
是java如何读取TiCDC同步到kafka的数据,有没有这方面的例子~
java 如何消费 kafka 的实践需要用户自己去做,可以到 kafka 社区或者 java 社区看下。
TiCDC在kafka中的格式怎么读取呢?
CDC 没有提供 Kafka 消费端的实现,只提供了开放数据协议,可以依据该协议实现 consumer
那就复杂了…
您好,当前只有go 版本的解析 demo,请参考 https://github.com/pingcap/ticdc/tree/master/kafka_consumer 。
注意:只是 demo ,不要使用在生产环境,多谢。
请问有demo了吗可以参考下吗
我这边有个解析TiCDC Canal-Json自用的小demo,写的一般,主要是想解决消费TiDB不同表加工为业务数据的情况,而且实际并未用到oldData处理数据,仅供参考
cdc-demo
└─ src
└─ main
└─ java
└─ com.example.demo
├─ constants
│ └─ CdcConstants.java
├─ dto
│ ├─ CdcMessage.java
│ └─ User.java
├─ job
│ ├─ CdcJob.java
│ └─ UserCdcJob.java
└─ service
├─ impl
│ └─ UserServiceImpl.java
└─ CdcService.java
public class CdcConstants {
public enum MessageType {
/**
* 插入操作
*/
INSERT,
/**
* 更新操作
*/
UPDATE,
/**
* 删除操作
*/
DELETE;
}
}
@Getter
@Setter
public class User {
/**
* 用户id
*/
private Long id;
/**
* 用户名
*/
private String name;
/**
* 年龄
*/
private Integer age;
}
@Getter
@Setter
public class CdcMessage<T> {
/**
* 数据集合
*/
private List<T> data;
/**
* 数据库名称
*/
private String database;
/**
* 是否为DDL语句isDdl
*/
private boolean isDdl;
/**
* 表结构的类型字段(值为字段类型,如varchar)
*/
private T mysqlType;
/**
* UPDATE类型下的旧数据(未变更字段无数据)
*/
private List<T> oldData;
/**
* sql语句
*/
private String sql;
/**
* 值为int类型
*/
private T sqlType;
/**
* 数据表名
*/
private String table;
/**
* 新增(INSERT)、更新(UPDATE)、删除(DELETE)、删除表(ERASE)等
*/
private String type;
}
@Slf4j
public class CdcJob<T> {
protected CdcService<T> cdcService;
/**
* 处理消息
*
* @param record 消息记录
* @param ack 消息处理标识
*/
public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
String recordString = String.format(
"topic:%s,partition:%s,offset:%s,value:%s",
record.topic(),
record.partition(),
record.offset(),
record.value()
);
log.info("数据更新开始处理,消息:" + recordString);
try {
boolean processResult = process(record);
String processResultString = processResult ? "成功" : "无更新";
log.info("数据更新处理结束,处理结果:" + processResultString);
} catch (Exception e) {
log.error("数据更新报错,", e);
} finally {
// 手动提交偏移量
ack.acknowledge();
}
}
/**
* 处理数据
*
* @param record kafka消费记录
* @return 处理结果
*/
public boolean process(ConsumerRecord<String, String> record) {
String bizName = this.getClass().getSimpleName();
// 服务为初始化报错
if (null == cdcService) {
throw new IllegalStateException("服务未初始化");
}
// 解析消息
CdcMessage<T> cdcMessage = JSON.parseObject(record.value(), new TypeReference<CdcMessage<T>>() {
});
// 跳过DDL
if (cdcMessage.isDdl()) {
log.info(bizName, "DDL变更,无需处理");
return false;
}
// 处理结果初始化
boolean result = false;
// 服务层处理数据
List<T> dataList = cdcMessage.getData();
if (CdcConstants.MessageType.INSERT.name().equals(cdcMessage.getType())) {
result = cdcService.insert(dataList);
} else if (CdcConstants.MessageType.UPDATE.name().equals(cdcMessage.getType())) {
result = cdcService.update(cdcMessage.getOldData(), dataList);
} else if (CdcConstants.MessageType.DELETE.name().equals(cdcMessage.getType())) {
result = cdcService.delete(dataList);
} else {
log.warn(bizName, "不处理该消息,消息类型:" + cdcMessage.getType());
}
return result;
}
}
@Component
public class UserCdcJob extends CdcJob<User> {
public UserCdcJob(UserServiceImpl userService) {
this.cdcService = userService;
}
/**
* 消费CDC消息,并进行处理
*
* @param record 消息记录
* @param ack 消息处理标识
*/
@KafkaListener(id = "UserCdcJob", groupId = "${user-cdc.group}",
topics = {"${user-cdc.topic}"}, containerFactory = "cdcKafkaListenerFactory")
public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) {
handleMessage(record, ack);
}
}
public interface CdcService<T> {
/**
* 插入数据
*
* @param data 数据
* @return 插入结果
*/
boolean insert(List<T> data);
/**
* 更新数据
*
* @param oldData 更新前数据
* @param newData 更新后数据
* @return 更新结果
*/
boolean update(List<T> oldData, List<T> newData);
/**
* 删除数据
*
* @param data 数据
* @return 删除数据
*/
boolean delete(List<T> data);
}
@Service
public class UserServiceImpl implements CdcService<User> {
@Override
public boolean insert(List<User> data) {
// TODO
return false;
}
@Override
public boolean update(List<User> oldData, List<User> newData) {
// TODO
return false;
}
@Override
public boolean delete(List<User> data) {
// TODO
return false;
}
}
好的谢谢,请问方便直接发源码或者git吗
这个只是我没写 Service 的具体实现(这个实现是看业务的,你这边对数据的更新、插入、删除是什么逻辑就写什么逻辑好了),逻辑是一样的