【 TiKV 版本`】TiKV 6.0
【遇到的问题】
当TiKV apply到rocksdb时,调用了handle_raft_entry_normal()这个方法,然后会有两种分支,一种是调用的commit方法,一种是process_raft_cmd()方法。想问一下:
1.什么情况调用的commit方法呢?也就是这个方法具体是有什么作用。
2.当调用process_raft_cmd()之后,会find_pending()取出callback,push到applied_batch,那这个callback在哪里执行呢?
望大佬们赐教!
【问题现象及影响】
handle_raft_entry_normal()方法:
let index = entry.get_index();
let term = entry.get_term();
let data = entry.get_data();
if !data.is_empty() {
let cmd = util::parse_data_at(data, index, &self.tag);
if apply_ctx.yield_high_latency_operation && has_high_latency_operation(&cmd) {
self.priority = Priority::Low;
}
let mut has_unflushed_data =
self.last_flush_applied_index != self.apply_state.get_applied_index();
if has_unflushed_data && should_write_to_engine(&cmd)
|| apply_ctx.kv_wb().should_write_to_engine()
{
***apply_ctx.commit(self);***
if let Some(start) = self.handle_start.as_ref() {
if start.saturating_elapsed() >= apply_ctx.yield_duration {
return ApplyResult::Yield;
}
}
has_unflushed_data = false;
}
if self.priority != apply_ctx.priority {
if has_unflushed_data {
apply_ctx.commit(self);
}
return ApplyResult::Yield;
}
return self.process_raft_cmd(apply_ctx, index, term, cmd);
}
// TOOD(cdc): should we observe empty cmd, aka leader change?
self.apply_state.set_applied_index(index);
self.applied_index_term = term;
assert!(term > 0);
// 1. When a peer become leader, it will send an empty entry.
// 2. When a leader tries to read index during transferring leader,
// it will also propose an empty entry. But that entry will not contain
// any associated callback. So no need to clear callback.
while let Some(mut cmd) = self.pending_cmds.pop_normal(u64::MAX, term - 1) {
if let Some(cb) = cmd.cb.take() {
apply_ctx
.applied_batch
.push_cb(cb, cmd_resp::err_resp(Error::StaleCommand, term));
}
}
ApplyResult::None
process_raft_cmd方法:
fn process_raft_cmd(
&mut self,
apply_ctx: &mut ApplyContext<EK>,
index: u64,
term: u64,
cmd: RaftCmdRequest,
) -> ApplyResult<EK::Snapshot> {
if index == 0 {
panic!(
"{} processing raft command needs a none zero index",
self.tag
);
}
// Set sync log hint if the cmd requires so.
apply_ctx.sync_log_hint |= should_sync_log(&cmd);
apply_ctx.host.pre_apply(&self.region, &cmd);
let (mut resp, exec_result) = self.apply_raft_cmd(apply_ctx, index, term, &cmd);
if let ApplyResult::WaitMergeSource(_) = exec_result {
return exec_result;
}
debug!(
"applied command";
"region_id" => self.region_id(),
"peer_id" => self.id(),
"index" => index
);
// TODO: if we have exec_result, maybe we should return this callback too. Outer
// store will call it after handing exec result.
cmd_resp::bind_term(&mut resp, self.term);
let cmd_cb = self.find_pending(index, term, is_conf_change_cmd(&cmd));
let cmd = Cmd::new(index, cmd, resp);
apply_ctx
.applied_batch
.push(cmd_cb, cmd, &self.observe_info, self.region_id());
exec_result
}
- 调用 commit 的方法的情况是某个 apply fsm yield,是我们之前为了避免某个热点 fsm 占用过长时间的 cpu 时间而引入的
- 在 commit 写入 kv rocksdb 之后 https://github.com/tikv/tikv/blob/master/components/raftstore/src/store/fsm/apply.rs#L550-L568
1 个赞
感谢您的解答! 但是我还是有一个疑问:
如果不调用commit,而是调用process_raft_cmd()最终在finish_for()中调用了commit_opt(),此时参数为false,就不会调用wirte_to_db。
我想请问一下,就是把cb push到ApplyContext之后,cb在什么时候调用返回给client呢?
非常感谢!!
self.commit_opt(delegate, false);
fn commit_opt(&mut self, delegate: &mut ApplyDelegate<EK>, persistent: bool) {
delegate.update_metrics(self);
if persistent {
self.write_to_db();
self.prepare_for(delegate);
delegate.last_flush_applied_index = delegate.apply_state.get_applied_index()
}
self.kv_wb_last_bytes = self.kv_wb().data_size() as u64;
self.kv_wb_last_keys = self.kv_wb().count() as u64;
}
这里push到Applycontext的callbatch
fn process_raft_cmd(
&mut self,
apply_ctx: &mut ApplyContext<EK>,
index: u64,
term: u64,
cmd: RaftCmdRequest,
) -> ApplyResult<EK::Snapshot> {
let cmd = Cmd::new(index, cmd, resp);
apply_ctx
.applied_batch
.push(cmd_cb, cmd, &self.observe_info, self.region_id());
exec_result
}
1.请问是在end()方法中调用了flush()中调用write_to_db()来调用callback返回client嘛
2.还有一个问题就是各种KV的API在src/server/service/kv.rs中吗,在里面并没有搜到比如Raw_Get,Raw_Put这种GRPC的api呀
此话题已在最后回复的 1 分钟后被自动关闭。不再允许新回复。