PD 客户端源码分析

客户端对象创建以及初始化
// NewClient creates a PD client.
func NewClient(pdAddrs []string, security SecurityOption, opts …ClientOption) (Client, error) {
return NewClientWithContext(context.Background(), pdAddrs, security, opts…)
}

// newBaseClient returns a new baseClient.
func newBaseClient(ctx context.Context, urls []string, security SecurityOption, opts …ClientOption) (*baseClient, error) {
ctx1, cancel := context.WithCancel(ctx)
c := &baseClient{
urls: urls,
checkLeaderCh: make(chan struct{}, 1),
checkTSODispatcherCh: make(chan struct{}, 1),
}

	c.initRetry(c.initClusterID);
	c.initRetry(c.updateMember);
	
	c.wg.Add(1)
	go c.memberLoop()

	return c, nil
}

client包含子模块basic client ,这个子模块目的很明确,基于用户的请求,尽量发送到pd leader和tso dc-location 目的地。因为部分请求,只能PD leader或者dc-location leader能够处理,如果不是leader收到请求,还是要转发到对应的leader。而basic client就是为了维护两类leader,以及建立连接.

baseClient 子模块分析,主要包含以下信息:
对应pd leader对象
对应follower对象
dc-location对应的leader对象以及grpc连接
一些channel,checkLeaderCh,checkTSODispatcherCh 。用于两类leader的管理

// baseClient is a basic client for all other complex client.
// It caches the cluster membership view: the PD leader URL, the
// follower URLs, and the per-dc-location TSO allocator leader URLs
// together with their gRPC connections.
// NOTE(review): excerpt — fields used by the methods below (e.g.
// ctx, cancel, wg) are elided here; confirm against the full source.
type baseClient struct {
urls []string
clusterID uint64
// PD leader URL
leader atomic.Value // Store as string
// PD follower URLs
followers atomic.Value // Store as []string
// dc-location -> TSO allocator leader gRPC connection
clientConns sync.Map // Store as map[string]*grpc.ClientConn
// dc-location -> TSO allocator leader URL
allocators sync.Map // Store as map[string]string
// Buffered (size 1) signal channels: checkLeaderCh triggers a
// leader re-check in memberLoop; checkTSODispatcherCh triggers a
// TSO dispatcher re-check in tsLoop.
checkLeaderCh chan struct{}
checkTSODispatcherCh chan struct{}
}

下面我们详细分析basic client的创建流程,三个核心步骤
调用c.updateMember

func (c *baseClient) updateMember() error {
for _, u := range c.urls {
members, err := c.getMembers(ctx, u)
更新c.clientConns以及c.allocators,用于匹配dc-location到grpc连接
c.switchTSOAllocatorLeader(members.GetTsoAllocatorLeaders());

        更新pd所有成员的url
		c.updateURLs(members.GetMembers())
        
        更新pd follower的urls
		c.updateFollowers(members.GetMembers(), members.GetLeader())
        更新c.leader,指定pd的leader.以及指定dc global对应的leader
		c.switchLeader(members.GetLeader().GetClientUrls()); 
        给c.checkTSODispatcherCh发送空消息,触发check tso
		c.scheduleCheckTSODispatcher()
		return nil
	}
	
}

启动后台服务线程go c.memberLoop()
这个后台服务,每隔1分钟触发一次updateMember用于更新pd成员角色。或者通过channel触发成员角色更新
// memberLoop is the background worker that keeps the membership view
// fresh: it re-runs updateMember once per minute, or immediately when
// a leader re-check is requested via checkLeaderCh, until the context
// is cancelled.
func (c *baseClient) memberLoop() {
	for {
		select {
		case <-c.checkLeaderCh: // explicit re-check requested
		case <-time.After(time.Minute): // periodic refresh
		case <-ctx.Done():
			return
		}
		if err := c.updateMember(); err != nil {
			log.Error("[pd] failed updateMember", errs.ZapError(err))
		}
	}
}

Client流程分析:
核心流程分析,当用户创建pd 客户端的时候,触发以下步骤,
1.创建client 对象,包含子模块basic client对象的创建
2.调用c.updateTSODispatcher(),
3.启动一系列后台管理线程

// NewClient creates a PD client.
func NewClient(pdAddrs []string, security SecurityOption, opts …ClientOption) (Client, error) {
return NewClientWithContext(context.Background(), pdAddrs, security, opts…)
}

// NewClientWithContext creates a PD client with context.
// It builds the baseClient (membership/leader bookkeeping), sets up
// one TSO dispatcher per known dc-location, and starts the three
// background loops (tsLoop, tsCancelLoop, leaderCheckLoop).
func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
	base, err := newBaseClient(ctx, addrsToUrls(pdAddrs), security, opts...)
	// Fix: the original excerpt ignored this error and would have
	// dereferenced a nil baseClient on failure.
	if err != nil {
		return nil, err
	}

	c := &client{
		baseClient:        base,
		checkTSDeadlineCh: make(chan struct{}),
	}

	// Dispatchers for every dc-location discovered during init.
	c.updateTSODispatcher()
	c.wg.Add(3)
	go c.tsLoop()
	go c.tsCancelLoop()
	go c.leaderCheckLoop()

	return c, nil
}

调度每个dc-location,用于批量处理发往这个dc-location的tso请求
在client,每个dc对应一个TSODispatcher,dispatcher用于批量发送请求到相应的dc-location leader

// updateTSODispatcher makes sure every known dc-location has a TSO
// dispatcher goroutine: any dc-location in c.allocators that does not
// yet have one gets a new handleDispatcher started for it.
func (c *client) updateTSODispatcher() {
	// Set up the new TSO dispatcher
	c.allocators.Range(func(dcLocationKey, _ interface{}) bool {
		dc := dcLocationKey.(string)
		if !c.checkTSODispatcher(dc) {
			// No dispatcher yet for this dc-location; start one.
			go c.handleDispatcher(dispatcherCtx, dc, tsoRequestCh)
		}
		return true
	})
}

启动后台服务handleDispatcher,这个后台服务调用processTSORequests 用于批量从PD请求一定数目的TSO
go c.handleDispatcher(dispatcherCtx, dcLocation, tsoRequestCh)

// handleDispatcher drains tsoDispatcher in batches: it takes the
// first pending request plus everything currently buffered, registers
// a deadline with the per-dc watcher goroutine, then issues one
// batched TSO RPC via processTSORequests.
// NOTE(review): excerpt — requests, cancel, err, stream and opts are
// declared in parts of the original function elided here; this block
// does not compile standalone.
func (c *client) handleDispatcher(dispatcherCtx context.Context, dc string, tsoDispatcher chan *tsoRequest) {

for {

	select {
	case first := <-tsoDispatcher:
		// Batch: the first request plus all currently queued ones.
		pendingPlus1 := len(tsoDispatcher) + 1
		requests[0] = first
		for i := 1; i < pendingPlus1; i++ {
			requests[i] = <-tsoDispatcher
		}
		// Hand a deadline to the per-dc watcher (see
		// watchTSDeadline): on timeout the batch is cancelled.
		done := make(chan struct{})
		dl := deadline{
			timer:  time.After(c.timeout),
			done:   done,
			cancel: cancel,
		}
		tsDeadlineCh, ok := c.tsDeadline.Load(dc)

		select {
		case tsDeadlineCh.(chan deadline) <- dl:
		case <-dispatcherCtx.Done():
			return
		}
	
		// One RPC for the whole batch.
		err = c.processTSORequests(stream, dc, requests[:pendingPlus1], opts)
		
	}
	
}

}

client后台服务
tsLoop流程分析:定期检查或者通过checkTSODispatcherCh 触发是否有新加入的dc-location,如果有的话,调用updateTSODispatcher,用于批量处理发往这个dc-location的tso 请求

// tsLoop periodically (or on demand via checkTSODispatcherCh) checks
// for newly joined dc-locations and spins up TSO dispatchers for
// them, until the loop context is cancelled.
func (c *client) tsLoop() {

	ticker := time.NewTicker(tsLoopDCCheckInterval)
	// Fix: the ticker was never stopped, leaking its timer for the
	// life of the process (leaderCheckLoop already does this).
	defer ticker.Stop()

	for {
		c.updateTSODispatcher()
		select {
		case <-ticker.C:
		case <-c.checkTSODispatcherCh:
		case <-loopCtx.Done():
			return
		}
	}
}

tsCancelLoop后台服务线程主要用于 tso 请求超时检查,原理比较简单,当我们处理每次TSO请求时,会生成一个对象发送给对应dc-location对应超时处理的channel. 这个对象包含超时检查以及超时处理函数。 然后在tsCancelLoop服务里,我们会watch每个dc-location,watch函数会进入一个loop. 等待channel 收到对象。如果发现对象,进入超时处理,要么超时,取消操作,要么处理完成,接收下一个对象
// tsCancelLoop makes sure every known dc-location has a deadline
// watcher goroutine (watchTSDeadline), re-checking on every tick or
// on demand via checkTSDeadlineCh, until the loop context is
// cancelled.
func (c *client) tsCancelLoop() {

	ticker := time.NewTicker(tsLoopDCCheckInterval)
	// Fix: the ticker was never stopped, leaking its timer for the
	// life of the process (leaderCheckLoop already does this).
	defer ticker.Stop()

	for {
		// Watch every dc-location's tsDeadlineCh
		c.allocators.Range(func(dcLocation, _ interface{}) bool {
			c.watchTSDeadline(tsCancelLoopCtx, dcLocation.(string))
			return true
		})
		select {
		case <-c.checkTSDeadlineCh:
			continue
		case <-ticker.C:
			continue
		case <-tsCancelLoopCtx.Done():
			return
		}
	}
}

func (c *client) watchTSDeadline(ctx context.Context, dcLocation string) {
if _, exist := c.tsDeadline.Load(dcLocation); !exist {
tsDeadlineCh := make(chan deadline, 1)
c.tsDeadline.Store(dcLocation, tsDeadlineCh)
go func(dc string, tsDeadlineCh <-chan deadline) {
for {
select {
case d := <-tsDeadlineCh:
select {
case <-d.timer:
log.Error(“tso request is canceled due to timeout”, zap.String(“dc-location”, dc), errs.ZapError(errs.ErrClientGetTSOTimeout))
d.cancel()
case <-d.done:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}(dcLocation, tsDeadlineCh)
}
}

后台leader检查线程,如果发现leader变化,更新相关信息
// leaderCheckLoop periodically verifies that the cached PD leader is
// still healthy, running until the client context is cancelled.
func (c *client) leaderCheckLoop() {

	ticker := time.NewTicker(LeaderHealthCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Probe the current leader; triggers a member refresh
			// elsewhere if it turns out to be unhealthy.
			c.checkLeaderHealth(leaderCheckLoopCtx)
		case <-c.ctx.Done():
			return
		}
	}
}