PD 调度器模块

1 调度器管理模块coordinator
初始化过程分析
因为调度器管理模块从属于 RaftCluster 模块,所以启动 RaftCluster 模块的时候,我们初始化调度器管理模块,从初始化代码,能够看出来,coordinator模块依赖上层模块raft cluster以及用于发送调作的模块HeartbeatStreams。
raft cluster:用于存储当前系统最新的region以及store信息,基于这些信息,我们分析,统计
当前cluster数据分布现状,以及冷热现状。调度器模块需要这些信息,然后基于一些
rule,或者为了系统健康或者更好的用户体验的想法,生成一系列调度操作

HeartbeatStreams:生成的调度操作,最终需要发送给tikv节点,用于实施调度操作,从而达
到用户或者系统定义健康的期望。这个模块主要用于连接管理,发现调度操作
// Start starts a cluster.
func (c *RaftCluster) Start(s Server) error {
c.coordinator = newCoordinator(c.ctx, cluster, s.GetHBStreams())
}

// newCoordinator creates a new coordinator.
func newCoordinator(ctx context.Context, cluster *RaftCluster, hbStreams *hbstream.HeartbeatStreams) *coordinator {
ctx, cancel := context.WithCancel(ctx)
opController := schedule.NewOperatorController(ctx, cluster, hbStreams)
return &coordinator{
ctx: ctx,
cancel: cancel,
cluster: cluster,
checkers: schedule.NewCheckerController(ctx, cluster, cluster.ruleManager, opController),
regionScatterer: schedule.NewRegionScatterer(ctx, cluster),
regionSplitter: schedule.NewRegionSplitter(cluster, schedule.NewSplitRegionsHandler(cluster, opController)),
schedulers: make(map[string]*scheduleController),
opController: opController,
hbStreams: hbStreams,
pluginInterface: schedule.NewPluginInterface(),
}
}

调度器管理模块,说白了,就是为了生成一系列调度操作,以及控制调度操作生成的速度,每类调度操作对应region的管理,一个region对应一个raft group,所以针对region的管理,就是针对raft group 成员的管理,例如增加/删除member,基于不同目的(负载,数据量)的raft member 位置转移等,调度模块从属关系coordinator–>RaftCluster -->Server
调度器coordinator模块和其它模块的关系
type RaftCluster struct {
coordinator *coordinator
}

type Server struct {
cluster *cluster.RaftCluster
}

调度器模块定义,从调度器定义里,发现调试器模块包含一系列其它模块
// coordinator is used to manage all schedulers and checkers to decide //if the region needs to be scheduled.
type coordinator struct {
sync.RWMutex

	wg              sync.WaitGroup
	ctx             context.Context
	cancel          context.CancelFunc
	cluster         *RaftCluster
	checkers        *schedule.CheckerController
	regionScatterer *schedule.RegionScatterer
	regionSplitter  *schedule.RegionSplitter
	schedulers      map[string]*scheduleController
	opController    *schedule.OperatorController
	hbStreams       *hbstream.HeartbeatStreams
	pluginInterface *schedule.PluginInterface
}

从上面coordinator的定义,也能看出来coordinator 包含一系列子模块,例如子模块分成三类:
不同目的的调度操作生成模块:checkers,regionScatterer,regionSplitter
限制调度速度的模块:opController
用于调度操作发送到tikv的模块:hbStreams

生成调度子模块分析
2.1 子模块 CheckerController
这个模块主要用于定期检察所有的数据,如果发现数据副本数,或者位置等不符合用户的期望,会生成一系列的调度操作,这些调度操作会不停的针对region实施调度操作,最终达到用户的期望。CheckerController 的初始化以及定义如下:
check 控制器的初始化过程
checkers: schedule.NewCheckerController(ctx, cluster, cluster.ruleManager, opController)

// NewCheckerController create a new CheckerController.
// TODO: isSupportMerge should be removed.
func NewCheckerController(ctx context.Context, cluster opt.Cluster, ruleManager *placement.RuleManager, opController *OperatorController) *CheckerController {
regionWaitingList := cache.NewDefaultCache(DefaultCacheSize)
return &CheckerController{
cluster: cluster,
opts: cluster.GetOpts(),
opController: opController,
learnerChecker: checker.NewLearnerChecker(cluster),
replicaChecker: checker.NewReplicaChecker(cluster, regionWaitingList),
ruleChecker: checker.NewRuleChecker(cluster, ruleManager, regionWaitingList),
mergeChecker: checker.NewMergeChecker(ctx, cluster),
jointStateChecker: checker.NewJointStateChecker(cluster),
regionWaitingList: regionWaitingList,
}
}

Check 控制器的定义,里面包含一系列不同目的的检察器
// CheckerController is used to manage all checkers.
type CheckerController struct {
cluster opt.Cluster
opts *config.PersistOptions
opController *OperatorController
learnerChecker *checker.LearnerChecker
replicaChecker *checker.ReplicaChecker
ruleChecker *checker.RuleChecker
mergeChecker *checker.MergeChecker
jointStateChecker *checker.JointStateChecker
regionWaitingList cache.Cache
}
从检察控制模块CheckerController 定义能够看出来,这个模块包含一系列用于不同目的检察调度模块,如果每类检察发现问题,会生成对应的调度操作。例如replicaChecker 模块,这个模块主要用于检察复制数,如果发现问题,生成调度操作,并通过stream发现到对应的TIKV,最终让region达到用户期望副本数。由于每类检察调度模块,基本框架一致,我们只要分析一个检察控制模块,其它依次推理分析,本文由于篇幅不会全部展开.
2.1.1 模块 LearnerChecker
初始化,以及模块定义

learnerChecker: checker.NewLearnerChecker(cluster)

// LearnerChecker ensures region has a learner will be promoted.
type LearnerChecker struct {
cluster opt.Cluster
}

// NewLearnerChecker creates a learner checker.
func NewLearnerChecker(cluster opt.Cluster) *LearnerChecker {
return &LearnerChecker{
cluster: cluster,
}
}

2.1.2LearnerChecker生成调度操作operator流程分析
LearnerChecker调用chek 函数分析特定的region, 如果发现这个region存在learner角色,并且这个learner角色,满足升级到voter角色要求,那么就会创建一个operator,用于升级这个region的learndr角色的副本到voter角色,从而让这个副本能够参与后期选举。
// Check verifies a region’s role, creating an Operator if need.
func (l *LearnerChecker) Check(region *core.RegionInfo) *operator.Operator {
for _, p := range region.GetLearners() {
op, err := operator.CreatePromoteLearnerOperator(“promote-learner”, l.cluster, region, p)
continue
}
return op
}
return nil
}

重点分析函数CreatePromoteLearnerOperator:
op, err := operator.CreatePromoteLearnerOperator(“promote-learner”, l.cluster, region, p)

// CreatePromoteLearnerOperator creates an operator that promotes a learner.
func CreatePromoteLearnerOperator(desc string, cluster opt.Cluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, cluster, region).
PromoteLearner(peer.GetStoreId()).
Build(0)
}
我们下面重点分析模块Builder模块,这个模块用于针对region生成相应operator
build 模块定义如下

// Builder is used to create operators. Usage:
// op, err := NewBuilder(desc, cluster, region).
// RemovePeer(store1).
// AddPeer(peer1).
// SetLeader(store2).
// Build(kind)
// The generated Operator will choose the most appropriate execution order
// according to various constraints.
type Builder struct {
// basic info
desc string
cluster opt.Cluster
regionID uint64
regionEpoch *metapb.RegionEpoch
rules []*placement.Rule
expectedRoles map[uint64]placement.PeerRoleType

	// operation record
	originPeers         peersMap
	unhealthyPeers      peersMap
	originLeaderStoreID uint64
	targetPeers         peersMap
	targetLeaderStoreID uint64
	err                 error

	// skip origin check flags
	skipOriginJointStateCheck bool

	// build flags
	allowDemote       bool
	useJointConsensus bool
	lightWeight       bool
	forceTargetLeader bool

	// intermediate states
	currentPeers                         peersMap
	currentLeaderStoreID                 uint64
	toAdd, toRemove, toPromote, toDemote peersMap       // pending tasks.
	steps                                []OpStep       // generated steps.
	peerAddStep                          map[uint64]int // record at which step a peer is created.

	// comparison function
	stepPlanPreferFuncs []func(stepPlan) int // for buildStepsWithoutJointConsensus
}

Build 模块初始化过程如下
1.根据参数region 对象,抽取出这个region对应的所有peer成员存放store,处于Pending状态存入的store,当前的leader所在的store,
// NewBuilder creates a Builder.
func NewBuilder(desc string, cluster opt.Cluster, region *core.RegionInfo, opts …BuilderOption) *Builder {
b := &Builder{
desc: desc,
cluster: cluster,
regionID: region.GetID(),
regionEpoch: region.GetRegionEpoch(),
}

	// options
	for _, option := range opts {
		option(b)
	}

	// origin peers
	err := b.err
	originPeers := newPeersMap()
	unhealthyPeers := newPeersMap()

	for _, p := range region.GetPeers() {
		originPeers.Set(p)
	}

	for _, p := range region.GetPendingPeers() {
		unhealthyPeers.Set(p)
	}

	for _, p := range region.GetDownPeers() {
		unhealthyPeers.Set(p.Peer)
	}

	// origin leader
	originLeaderStoreID := region.GetLeader().GetStoreId()
	b.rules = rules
	b.originPeers = originPeers
	b.unhealthyPeers = unhealthyPeers
	b.originLeaderStoreID = originLeaderStoreID
	b.targetPeers = originPeers.Copy()
	b.allowDemote = supportJointConsensus
	return b
}

下一步分析build的PromoteLearner方法,这个方法目的:根据你想要升级的learner peer,初始化相关build 成员 ,也就是修改build用于存放目标peer的角色 ,从之前leaner 角色,修改成metapb.PeerRole_Voter
// CreatePromoteLearnerOperator creates an operator that promotes a learner.
func CreatePromoteLearnerOperator(desc string, cluster opt.Cluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, cluster, region).
PromoteLearner(peer.GetStoreId()).
Build(0)
}

// PromoteLearner records a promote learner operation in Builder.
func (b *Builder) PromoteLearner(storeID uint64) *Builder {

		b.targetPeers.Set(&metapb.Peer{
			Id:      peer.GetId(),
			StoreId: peer.GetStoreId(),
			Role:    metapb.PeerRole_Voter,
		}

	return b
}

下一步高用build的Build(0)方法,这个方法,会根据build 成员的状态,生成相应的调试操作,原理其实也很简单,build记录两部分信息,一部分信息对应原始的region 信息,另一部分对应region期望的region目标状态信息。然后两组信息对比,最终生成调度操作(用于修正region到目标状态)。

// Build creates the Operator.
func (b *Builder) Build(kind OpKind) (*Operator, error) {
var brief string
if brief, b.err = b.prepareBuild(); b.err != nil {
return nil, b.err
}

		kind, b.err = b.buildStepsWithoutJointConsensus(kind)
	}
	

	return NewOperator(b.desc, brief, b.regionID, b.regionEpoch, kind, b.steps...), nil
}

两组信息对比算法如下,我们重点关注leaner到follower角色的转换.
b.originPeers 保存的region原始状态信息
b.targetPeers 保存的region期望的目标状态信息

在我们的例子里,如果 通过b.originPeers 发现 原始的peer是leaner, b.targetPeers对应peer的期望状态是voter角色。那么我们会保存一条记录,表示那个store的peer角色由learner升级到voter角色
// Initialize intermediate states.
// TODO: simplify the code
func (b *Builder) prepareBuild() (string, error) {
b.toPromote = newPeersMap()
// Diff originPeers and targetPeers to initialize toAdd, toRemove, toPromote, toDemote.
for _, o := range b.originPeers {
n := b.targetPeers[o.GetStoreId()]
if core.IsLearner(o) {
if !core.IsLearner(n) {
// learner -> voter
b.toPromote.Set(n)
}
}
}

	b.peerAddStep = make(map[uint64]int)

	return b.brief(), nil
}

每个调度操作,分很多steps,下面这个函数用于生成调度操作的steps

// Some special cases, and stores that do not support using joint consensus.
func (b *Builder) buildStepsWithoutJointConsensus(kind OpKind) (OpKind, error) {
for len(b.toPromote) > 0 {
plan := b.peerPlan()

		if plan.promote != nil {
			b.execPromoteLearner(plan.promote)
		}
	}	
	return kind, nil
}

调用b.peerPlan生成对应step paln, 最终基于stepplan生成操作步骤,

func (b *Builder) peerPlan() stepPlan {
if p := b.planPromotePeer(); !p.IsEmpty() {
return p
}
}

func (b *Builder) planPromotePeer() stepPlan {
for _, i := range b.toPromote.IDs() {
peer := b.toPromote[i]
return stepPlan{promote: peer}
}
}

type stepPlan struct {
promote *metapb.Peer
}
基于stepplan生成具体的steps。
PromoteLearner表示promote调度步骤,stote表示leaner对应的stroe,peerID表示对应peer
if plan.promote != nil {
b.execPromoteLearner(plan.promote)
}

func (b *Builder) execPromoteLearner(peer *metapb.Peer) {
b.steps = append(b.steps, PromoteLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId()})
}

// PromoteLearner is an OpStep that promotes a region learner peer to normal voter.
type PromoteLearner struct {
ToStore, PeerID uint64
}

基于以上步骤,调用build 函数生成opeartor,代码如下:
// CreatePromoteLearnerOperator creates an operator that promotes a learner.
func CreatePromoteLearnerOperator(desc string, cluster opt.Cluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, cluster, region).
PromoteLearner(peer.GetStoreId()).
Build(0)
}

// Build creates the Operator.
func (b *Builder) Build(kind OpKind) (*Operator, error) {
return NewOperator(b.desc, brief, b.regionID, b.regionEpoch, kind, b.steps…), nil
}

// NewOperator creates a new operator.
func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.RegionEpoch, kind OpKind, steps …OpStep) *Operator {
return &Operator{
desc: desc,
brief: brief,
regionID: regionID,
regionEpoch: regionEpoch,
kind: kind,
steps: steps,
stepsTime: make([]int64, len(steps)),
status: NewOpStatusTracker(),
level: level,
AdditionalInfos: make(map[string]string),
}
}

限制调度速度的模块opController
模块定义
opController := schedule.NewOperatorController(ctx, cluster, hbStreams)

// NewOperatorController creates a OperatorController.
func NewOperatorController(ctx context.Context, cluster opt.Cluster, hbStreams *hbstream.HeartbeatStreams) *OperatorController {
return &OperatorController{
ctx: ctx,
cluster: cluster,
operators: make(map[uint64]*operator.Operator),
hbStreams: hbStreams,
histories: list.New(),
counts: make(map[operator.OpKind]uint64),
opRecords: NewOperatorRecords(ctx),
storesLimit: make(map[uint64]map[storelimit.Type]*storelimit.StoreLimit),
wop: NewRandBuckets(),
wopStatus: NewWaitingOperatorStatus(),
opNotifierQueue: make(operatorQueue, 0),
}
}

// OperatorController is used to limit the speed of scheduling.
type OperatorController struct {
sync.RWMutex
ctx context.Context
cluster opt.Cluster
operators map[uint64]*operator.Operator
hbStreams *hbstream.HeartbeatStreams
histories *list.List
counts map[operator.OpKind]uint64
opRecords *OperatorRecords
storesLimit map[uint64]map[storelimit.Type]*storelimit.StoreLimit
wop WaitingOperator
wopStatus *WaitingOperatorStatus
opNotifierQueue operatorQueue
}

这个模块的核心功能,就是控制把opeartor 发送到TIKV的速度,所以我们找出opcontrol发送opeartor到tikv的函数:
// Dispatch is used to dispatch the operator of a region.
func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
// Check existed operator.
if op := oc.GetOperator(region.GetID()); op != nil {
// Update operator status:
// The operator status should be STARTED.
// Check will call CheckSuccess and CheckTimeout.
step := op.Check(region)

		switch op.Status() {
		case operator.STARTED:
			oc.SendScheduleCommand(region, step, source)
		}
	}
}

oc.SendScheduleCommand(region, step, source),针对指定region对应opeartor的step到tikv节点,前面分析我们的step对应PromoteLearner,也就是上升leaner到voter。生成heatbeatResponse对象,这个对象会通过region的心跳请求回复给这个tikv.从而推动tikv升级region peer 到voter
// SendScheduleCommand sends a command to the region.
func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step operator.OpStep, source string) {
var cmd *pdpb.RegionHeartbeatResponse
switch st := step.(type) {
case operator.PromoteLearner:
cmd = &pdpb.RegionHeartbeatResponse{
ChangePeer: &pdpb.ChangePeer{
// reuse AddNode type
ChangeType: eraftpb.ConfChangeType_AddNode,
Peer: &metapb.Peer{
Id: st.PeerID,
StoreId: st.ToStore,
Role: metapb.PeerRole_Voter,
},
},
}

	oc.hbStreams.SendMsg(region, cmd)
}

第三个部分,分析tikv和pd心跳流处理

用于处理心跳信息模块hbStreams
心跳回复信息发送,这个函数会把消息发送到hbstreams的chanel 。用于消息的异常发送
oc.hbStreams.SendMsg(region, cmd)

// SendMsg sends a message to related store.
func (s *HeartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) {

	select {
	case s.msgCh <- msg:
	case <-s.hbStreamCtx.Done():
	}
}

hbstream模块,会启动一个后台服务线程,用于定期从channel 模块里取出消息,处理发送到相应的tikv
func (s *HeartbeatStreams) run() {
for {
select {

		case msg := <-s.msgCh:
			storeID := msg.GetTargetPeer().GetStoreId()
			storeLabel := strconv.FormatUint(storeID, 10)
			store := s.storeInformer.GetStore(storeID)

			storeAddress := store.GetAddress()
			if stream, ok := s.streams[storeID]; ok {
				if err := stream.Send(msg); 
			}

}

HeartbeatStreams模块通过一个map结构管理和其它TIKV结点的流,这个数据结构的数据更新,通过每次心跳grpc 处理更新
/ RegionHeartbeat implements gRPC PDServer.

func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
server := &heartbeatServer{stream: stream}
s.hbStreams.BindStream(storeID, server)
}

type heartbeatServer struct {
stream pdpb.PD_RegionHeartbeatServer
closed int32
}

heartbeatServer是pdpb.PD_RegionHeartbeatServer包装器
func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {

	done := make(chan error, 1)
	go func() { done <- s.stream.Send(m) }()
	select {
	case err := <-done:
		return errors.WithStack(err)
	case <-time.After(regionHeartbeatSendTimeout):
		atomic.StoreInt32(&s.closed, 1)
		return errors.WithStack(errSendRegionHeartbeatTimeout)
	}
}