A question about the apply path in the TiKV 6.0 source code

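【TiKV version】TiKV 6.0
【Problem encountered】
When TiKV applies entries to RocksDB, it calls handle_raft_entry_normal(), which then takes one of two branches: one calls commit, the other calls process_raft_cmd(). I would like to ask:
1. Under what circumstances is commit called? In other words, what exactly does this method do?
2. After process_raft_cmd() is called, find_pending() takes out the callback and pushes it to applied_batch. Where is that callback executed?
Any guidance would be appreciated!
【Symptoms and impact】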

The handle_raft_entry_normal() method:
let index = entry.get_index();
let term = entry.get_term();
let data = entry.get_data();

    if !data.is_empty() {
        let cmd = util::parse_data_at(data, index, &self.tag);

        if apply_ctx.yield_high_latency_operation && has_high_latency_operation(&cmd) {
            self.priority = Priority::Low;
        }
        let mut has_unflushed_data =
            self.last_flush_applied_index != self.apply_state.get_applied_index();
        if has_unflushed_data && should_write_to_engine(&cmd)
            || apply_ctx.kv_wb().should_write_to_engine()
        {
            apply_ctx.commit(self); // the commit call asked about in question 1
            if let Some(start) = self.handle_start.as_ref() {
                if start.saturating_elapsed() >= apply_ctx.yield_duration {
                    return ApplyResult::Yield;
                }
            }
            has_unflushed_data = false;
        }
        if self.priority != apply_ctx.priority {
            if has_unflushed_data {
                apply_ctx.commit(self);
            }
            return ApplyResult::Yield;
        }

        return self.process_raft_cmd(apply_ctx, index, term, cmd);
    }
    // TOOD(cdc): should we observe empty cmd, aka leader change?

    self.apply_state.set_applied_index(index);
    self.applied_index_term = term;
    assert!(term > 0);

    // 1. When a peer become leader, it will send an empty entry.
    // 2. When a leader tries to read index during transferring leader,
    //    it will also propose an empty entry. But that entry will not contain
    //    any associated callback. So no need to clear callback.
    while let Some(mut cmd) = self.pending_cmds.pop_normal(u64::MAX, term - 1) {
        if let Some(cb) = cmd.cb.take() {
            apply_ctx
                .applied_batch
                .push_cb(cb, cmd_resp::err_resp(Error::StaleCommand, term));
        }
    }
    ApplyResult::None
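
The condition that guards the first `commit` call can be read in isolation. The sketch below is a toy model, not TiKV code: `ToyWriteBatch`, its `limit` field, and `must_commit_first` are hypothetical names. It assumes that `should_write_to_engine(&cmd)` flags commands that need earlier writes to be in the engine first, and that `kv_wb().should_write_to_engine()` reports a write batch that has grown past some threshold. Its main point is that `a && b || c` groups as `(a && b) || c`.

```rust
/// Hypothetical stand-in for the write batch held by the apply context.
struct ToyWriteBatch {
    keys: usize,
    limit: usize, // assumed threshold; the real value is not shown in the thread
}

impl ToyWriteBatch {
    /// Assumption: the batch asks to be written once it exceeds its limit.
    fn should_write_to_engine(&self) -> bool {
        self.keys > self.limit
    }
}

/// Same shape as the condition in the listing: `&&` binds tighter than `||`,
/// so the explicit parentheses here do not change the meaning.
fn must_commit_first(
    has_unflushed_data: bool,
    cmd_needs_flushed_state: bool,
    wb: &ToyWriteBatch,
) -> bool {
    (has_unflushed_data && cmd_needs_flushed_state) || wb.should_write_to_engine()
}
```

Under these assumptions, a small batch forces a commit only when a command needs flushed state while earlier writes are still buffered, whereas a batch over its limit forces one regardless of the command.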

The process_raft_cmd method:

fn process_raft_cmd(
    &mut self,
    apply_ctx: &mut ApplyContext<EK>,
    index: u64,
    term: u64,
    cmd: RaftCmdRequest,
) -> ApplyResult<EK::Snapshot> {
    if index == 0 {
        panic!(
            "{} processing raft command needs a none zero index",
            self.tag
        );
    }

    // Set sync log hint if the cmd requires so.
    apply_ctx.sync_log_hint |= should_sync_log(&cmd);

    apply_ctx.host.pre_apply(&self.region, &cmd);
    let (mut resp, exec_result) = self.apply_raft_cmd(apply_ctx, index, term, &cmd);
    if let ApplyResult::WaitMergeSource(_) = exec_result {
        return exec_result;
    }

    debug!(
        "applied command";
        "region_id" => self.region_id(),
        "peer_id" => self.id(),
        "index" => index
    );

    // TODO: if we have exec_result, maybe we should return this callback too. Outer
    // store will call it after handing exec result.
    cmd_resp::bind_term(&mut resp, self.term);
    let cmd_cb = self.find_pending(index, term, is_conf_change_cmd(&cmd));
    let cmd = Cmd::new(index, cmd, resp);
    apply_ctx
        .applied_batch
        .push(cmd_cb, cmd, &self.observe_info, self.region_id());
    exec_result
}
  1. commit is called when an apply FSM yields. We introduced yielding so that a single hot FSM cannot occupy the CPU for too long.
  2. The callback runs after commit has written to the KV RocksDB: https://github.com/tikv/tikv/blob/master/components/raftstore/src/store/fsm/apply.rs#L550-L568
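
Point 2 can be pictured with a toy model. The sketch below is not TiKV code: `ToyApplyContext`, `apply_put`, and the `String` response are hypothetical. It assumes only what the answer states, namely that callbacks queued in `applied_batch` run after the buffered writes have reached the KV engine.

```rust
use std::collections::BTreeMap;

/// Hypothetical callback type: receives the response for one command.
type Callback = Box<dyn FnOnce(String)>;

#[derive(Default)]
struct ToyApplyContext {
    db: BTreeMap<String, String>,           // stands in for the KV RocksDB
    write_batch: Vec<(String, String)>,     // buffered writes, not yet in `db`
    applied_batch: Vec<(Callback, String)>, // queued callbacks with their responses
}

impl ToyApplyContext {
    /// Applying a command only buffers the write and queues the callback.
    fn apply_put(&mut self, key: &str, value: &str, cb: Callback) {
        self.write_batch.push((key.to_string(), value.to_string()));
        self.applied_batch.push((cb, format!("ok: {}", key)));
    }

    /// Flush the buffered writes first, then invoke every queued callback.
    fn write_to_db(&mut self) {
        for (k, v) in self.write_batch.drain(..) {
            self.db.insert(k, v);
        }
        for (cb, resp) in self.applied_batch.drain(..) {
            cb(resp);
        }
    }
}
```

In this model a caller sees no response between `apply_put` and `write_to_db`, which matches the ordering described in the answer: the client is notified only once the data is in the engine.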

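Thank you for the answer! I still have one question, though:
If commit is not called and process_raft_cmd() is called instead, finish_for() eventually calls commit_opt() with the argument false, so write_to_db is not called.
What I would like to know is: once cb has been pushed to the ApplyContext, when is it invoked to return the result to the client?
Thanks a lot!!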

self.commit_opt(delegate, false);

fn commit_opt(&mut self, delegate: &mut ApplyDelegate<EK>, persistent: bool) {
    delegate.update_metrics(self);
    if persistent {
        self.write_to_db();
        self.prepare_for(delegate);
        delegate.last_flush_applied_index = delegate.apply_state.get_applied_index();
    }
    self.kv_wb_last_bytes = self.kv_wb().data_size() as u64;
    self.kv_wb_last_keys = self.kv_wb().count() as u64;
}
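
To make the effect of the `persistent` flag concrete, here is a minimal sketch that mirrors only the control flow of commit_opt() quoted above. `ToyContext`, `ToyDelegate`, and the `apply` helper are hypothetical stand-ins, and moving indexes into the `engine` vector is an assumed simplification of what write_to_db() does.

```rust
/// Hypothetical, trimmed-down stand-in for ApplyContext.
#[derive(Default)]
struct ToyContext {
    write_batch: Vec<u64>, // indexes of applied entries still buffered
    engine: Vec<u64>,      // indexes that have reached the "engine"
    wb_last_keys: usize,
}

/// Hypothetical, trimmed-down stand-in for ApplyDelegate.
#[derive(Default)]
struct ToyDelegate {
    applied_index: u64,
    last_flush_applied_index: u64,
}

impl ToyDelegate {
    /// Same comparison as `has_unflushed_data` in handle_raft_entry_normal().
    fn has_unflushed_data(&self) -> bool {
        self.last_flush_applied_index != self.applied_index
    }
}

impl ToyContext {
    /// Buffer one applied entry without touching the engine.
    fn apply(&mut self, delegate: &mut ToyDelegate, index: u64) {
        self.write_batch.push(index);
        delegate.applied_index = index;
    }

    /// Mirrors the branch structure of commit_opt().
    fn commit_opt(&mut self, delegate: &mut ToyDelegate, persistent: bool) {
        if persistent {
            // Stand-in for write_to_db(): move buffered writes to the engine.
            self.engine.append(&mut self.write_batch);
            delegate.last_flush_applied_index = delegate.applied_index;
        }
        // This bookkeeping runs on both paths.
        self.wb_last_keys = self.write_batch.len();
    }
}
```

In this model, `commit_opt(.., false)` leaves the entry in the write batch and `has_unflushed_data()` stays true, so the write (and anything that depends on it) is deferred until some later persistent commit.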

This is where the callback is pushed into the ApplyContext's callback batch (applied_batch):
fn process_raft_cmd(
    &mut self,
    apply_ctx: &mut ApplyContext<EK>,
    index: u64,
    term: u64,
    cmd: RaftCmdRequest,
) -> ApplyResult<EK::Snapshot> {
    // ... (earlier part of the body omitted; see the full listing above)
    let cmd = Cmd::new(index, cmd, resp);
    apply_ctx
        .applied_batch
        .push(cmd_cb, cmd, &self.observe_info, self.region_id());
    exec_result
}

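1. Is it that the end() method calls flush(), which calls write_to_db(), which in turn invokes the callback and returns to the client?
2. One more question: are the various KV APIs in src/server/service/kv.rs? I could not find gRPC APIs such as Raw_Get or Raw_Put in there.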
