PD TSO Allocation: A Source Code Analysis

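Analysis of tsoAllocatorManager
TSO allocation is handled by a management module, tsoAllocatorManager, which manages every TSO allocator. PD currently supports two kinds of TSO allocator. The Global TSO allocator guarantees that the timestamps of global transactions increase linearly across the whole cluster. The DC-location (Local) TSO allocator guarantees that the timestamps of transactions inside a specific DC increase linearly.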

Initialization and definition of the tsoAllocatorManager module:
s.tsoAllocatorManager = tso.NewAllocatorManager(
s.member, s.rootPath, s.cfg.TSOSaveInterval.Duration, s.cfg.TSOUpdatePhysicalInterval.Duration,
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },
s.GetTLSConfig())

// NewAllocatorManager creates a new TSO Allocator Manager.
func NewAllocatorManager(
m *member.Member,
rootPath string,
saveInterval time.Duration,
updatePhysicalInterval time.Duration,
maxResetTSGap func() time.Duration,
sc *grpcutil.TLSConfig,
) *AllocatorManager {
allocatorManager := &AllocatorManager{
member: m,
rootPath: rootPath,
saveInterval: saveInterval,
updatePhysicalInterval: updatePhysicalInterval,
maxResetTSGap: maxResetTSGap,
securityConfig: sc,
}
allocatorManager.mu.allocatorGroups = make(map[string]*allocatorGroup)
allocatorManager.mu.clusterDCLocations = make(map[string]*DCLocationInfo)
allocatorManager.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn)
return allocatorManager
}

// AllocatorManager is used to manage the TSO Allocators a PD server holds.
// It is in charge of maintaining TSO allocators’ leadership, checking election
// priority, and forwarding TSO allocation requests to correct TSO Allocators.
type AllocatorManager struct {
mu struct {
sync.RWMutex
// There are two kinds of TSO Allocators:
// 1. Global TSO Allocator, as a global single point to allocate
// TSO for global transactions, such as cross-region cases.
// 2. Local TSO Allocator, servers for DC-level transactions.
// dc-location/global (string) -> TSO Allocator
allocatorGroups map[string]*allocatorGroup
clusterDCLocations map[string]*DCLocationInfo
// The max suffix sign we have so far, it will be used to calculate
// the number of suffix bits we need in the TSO logical part.
maxSuffix int32
}
wg sync.WaitGroup
// for election use
member *member.Member
// TSO config
rootPath string
saveInterval time.Duration
updatePhysicalInterval time.Duration
maxResetTSGap func() time.Duration
securityConfig *grpcutil.TLSConfig
// for gRPC use
localAllocatorConn struct {
sync.RWMutex
clientConns map[string]*grpc.ClientConn
}
}

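allocatorGroups map[string]*allocatorGroup: PD currently supports two kinds of TSO allocator, the Global TSO allocator and the DC-location-level TSO allocator. The parts of allocatorGroups mean the following:
key: the dc-location. One key is "global"; the others are the individual dc-locations.
allocatorGroup: the concrete TSO allocator for that key.
leadership: used for the leader election of the TSO allocator. For the Global allocator, leadership is the PD leader's leadership. For a dc-location TSO allocator, leadership is the result of the election inside that dc-location, and it allows linear TSOs to be generated at the DC level.
allocator: performs the actual TSO allocation. There are two implementations: GlobalTSOAllocator for Global TSO allocation and LocalTSOAllocator for DC-level TSO allocation.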

type allocatorGroup struct {
dcLocation string
// For the Global TSO Allocator, leadership is a PD leader’s
// leadership, and for the Local TSO Allocator, leadership
// is a DC-level certificate to allow an allocator to generate
// TSO for local transactions in its DC.
leadership *election.Leadership
allocator Allocator
}

clusterDCLocations map[string]*DCLocationInfo:
key: the dc-location
DCLocationInfo: records information about this DC, such as its suffix and the IDs of all servers in it
// DCLocationInfo is used to record some dc-location related info,
// such as suffix sign and server IDs in this dc-location.
type DCLocationInfo struct {
// dc-location/global (string) -> Member IDs
ServerIDs []uint64
// dc-location (string) -> Suffix sign. It is collected and maintained by the PD leader.
Suffix int32
}

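Why does the metadata of each dc-location carry a suffix, and what is it for?
Each DC allocates TSOs completely independently, so the DC-level allocators of different DCs may produce equal TSO values. To prevent this, every dc-location is given its own suffix (different DCs have different suffix values). The suffix is placed in the lowest bits of the TSO so that the TSO values of different dc-locations never collide. To keep the suffixes distinct, the PD leader maintains the suffix of every DC and persists it in etcd.

An example:

Suppose there are three DCs: dc-1, dc-2, and dc-3. The number of suffix bits is returned by the GetSuffixBits function, which derives it from the largest suffix assigned so far: it is the number of bits needed to represent that maximum. Purely for illustration, assume the suffix occupies 8 bits here (a maximum suffix of 3 would in fact need only 2 bits). The suffix is 0 for global, 1 for dc-1, 2 for dc-2, and 3 for dc-3, so the logical part of their TSOs (18 bits in total) looks like this: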

// global: xxxxxxxxxx00000000
// dc-1: xxxxxxxxxx00000001
// dc-2: xxxxxxxxxx00000010
// dc-3: xxxxxxxxxx00000011

// GetSuffixBits calculates the bits of suffix sign
// by the max number of suffix so far,
// which will be used in the TSO logical part.
func (am *AllocatorManager) GetSuffixBits() int {
am.mu.RLock()
defer am.mu.RUnlock()
return CalSuffixBits(am.mu.maxSuffix)
}
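
The suffix mechanism can be sketched in a few lines of Go. This is a minimal illustration, not PD's code: calSuffixBits and differentiateLogical are stand-ins written for this example, under the assumption that the bit count is simply the number of bits needed to represent the largest suffix and that the suffix is appended to the low bits of the logical counter.

```go
package main

import (
	"fmt"
	"math/bits"
)

// calSuffixBits returns how many bits are needed to represent maxSuffix.
// Assumption: this mirrors the intent of CalSuffixBits, whose body is not shown in this article.
func calSuffixBits(maxSuffix int32) int {
	if maxSuffix <= 0 {
		return 0
	}
	return bits.Len32(uint32(maxSuffix))
}

// differentiateLogical shifts the raw logical counter left and stores the
// DC suffix in the freed low bits, so two DCs never emit the same value.
func differentiateLogical(rawLogical int64, suffixBits int, suffix int64) int64 {
	return rawLogical<<suffixBits + suffix
}

func main() {
	names := []string{"global", "dc-1", "dc-2", "dc-3"}
	suffixBits := calSuffixBits(3) // the largest suffix in this example is 3
	for suffix, name := range names {
		// Every allocator starts from the same raw logical value 5.
		logical := differentiateLogical(5, suffixBits, int64(suffix))
		fmt.Printf("%-7s %018b\n", name, logical)
	}
}
```

Even though all four allocators start from the same raw logical value, the printed values differ only in their low suffix bits, which is what keeps them unique.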

Analysis of the TSO allocators
GlobalTSOAllocator
// GlobalTSOAllocator is the global single point TSO allocator.
type GlobalTSOAllocator struct {
// leadership is used to check the current PD server’s leadership
// to determine whether a TSO request could be processed.
leadership *election.Leadership
timestampOracle *timestampOracle
}

// timestampOracle is used to maintain the logic of TSO.
type timestampOracle struct {
client *clientv3.Client
rootPath string
// TODO: remove saveInterval
saveInterval time.Duration
updatePhysicalInterval time.Duration
maxResetTSGap func() time.Duration
// tso info stored in the memory
tsoMux struct {
sync.RWMutex
tso *tsoObject
}
// last timestamp window stored in etcd
lastSavedTime atomic.Value // stored as time.Time
suffix int
dcLocation string
}

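Analysis of the core Global TSO allocation algorithm:
GenerateTSO produces TSO values and distinguishes two cases:
1. The cluster has no DC-level TSO allocators, so the whole cluster has a single global TSO allocator. Allocation is simple: it calls gta.timestampOracle.getTS(gta.leadership, count, 0) directly.
2. The cluster also manages several DC-level TSO allocators. Allocation is then more involved, because the Global TSO must be based on the largest TSO among all dc-locations. The allocator has to communicate with every DC TSO leader more than once, which degrades performance noticeably.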

// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) {

// Check whether the current leader is still valid
if !gta.leadership.Check() {
	return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of cluster", errs.NotLeaderErr))
}

// If DC-level TSO allocators exist, fetch the information of every dc-location: how many DCs there are, the suffix of each DC, and the server IDs that belong to it
// To check if we have any dc-location configured in the cluster
dcLocationMap := gta.allocatorManager.GetClusterDCLocations()


// If there is no DC-level TSO allocator, simply call gta.timestampOracle.getTS(gta.leadership, count, 0) to obtain the TSO
// No dc-locations configured in the cluster, use the normal TSO generation way.
if len(dcLocationMap) == 0 {
	return gta.timestampOracle.getTS(gta.leadership, count, 0)
}

     
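// If DC-level TSO allocators are configured and a caller still asks for a Global TSO, performance drops considerably,
// because maxTS has to be exchanged with every Local TSO Allocator leader. The steps are:
// 1. Send a request to every Local TSO Allocator leader to collect the largest TSO currently in the cluster:
//    gta.SyncMaxTS(ctx, dcLocationMap, maxTSO)
// 2. Add count to the logical part. If that would exceed the logical upper bound, increment the physical part by 1 and set the logical part to count.
// 3. Call SyncMaxTS again to synchronize the new maximum TSO to every Local TSO Allocator.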
// Have dc-locations configured in the cluster, use the Global TSO generation way.
// Send maxTS to all Local TSO Allocator leaders to prewrite


maxTSO := &pdpb.Timestamp{}
// 1. Collect the MaxTS with all Local TSO Allocator leaders first
if err := gta.SyncMaxTS(ctx, dcLocationMap, maxTSO); err != nil {
	return pdpb.Timestamp{}, err
}
// 2. Add the count and make sure its logical part won't overflow after being differentiated.
suffixBits := gta.allocatorManager.GetSuffixBits()
gta.preprocessLogical(maxTSO, count, suffixBits)
// 3. Sync the MaxTS with all Local TSO Allocator leaders then
if err := gta.SyncMaxTS(ctx, dcLocationMap, maxTSO); err != nil {
	return pdpb.Timestamp{}, err
}
// 4. Persist MaxTS into memory, and etcd if needed
var (
	currentGlobalTSO pdpb.Timestamp
	err              error
)
if currentGlobalTSO, err = gta.getCurrentTSO(); err != nil {
	return pdpb.Timestamp{}, err
}
if tsoutil.CompareTimestamp(&currentGlobalTSO, maxTSO) < 0 {
	// Update the Global TSO in memory
	if err := gta.timestampOracle.resetUserTimestamp(gta.leadership, tsoutil.GenerateTS(maxTSO), true); err != nil {
		log.Warn("update the global tso in memory failed", errs.ZapError(err))
	}
}
// 5.Differentiate the logical part to make the TSO unique globally by giving it a unique suffix in the whole cluster
maxTSO.Logical = gta.timestampOracle.differentiateLogical(maxTSO.Logical, suffixBits)
return *maxTSO, nil

}
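
Step 2 above (adding count without overflowing the logical part) can be sketched as follows. This is a simplified stand-in for preprocessLogical, whose body is not shown in this article: the timestamp type and the explicit suffix parameter are assumptions made for the example, and maxLogical is assumed to be 1<<18 to match the 18-bit logical part.

```go
package main

import "fmt"

// maxLogical is the assumed upper bound of the 18-bit logical part.
const maxLogical = int64(1 << 18)

// timestamp is a hypothetical stand-in for pdpb.Timestamp.
type timestamp struct {
	Physical int64
	Logical  int64
}

// preprocessLogical adds count to the logical part. If the value would
// overflow once the suffix is appended, it moves to the next physical
// millisecond and restarts the logical part at count.
func preprocessLogical(ts *timestamp, count uint32, suffixBits int, suffix int64) {
	upper := ts.Logical + int64(count)
	if upper<<suffixBits+suffix >= maxLogical {
		ts.Physical++
		ts.Logical = int64(count)
		return
	}
	ts.Logical = upper
}

func main() {
	normal := &timestamp{Physical: 100, Logical: 10}
	preprocessLogical(normal, 5, 2, 0)
	fmt.Printf("no overflow: %+v\n", *normal)

	nearLimit := &timestamp{Physical: 100, Logical: 65535}
	preprocessLogical(nearLimit, 5, 2, 0)
	fmt.Printf("overflow:    %+v\n", *nearLimit)
}
```

The second call shows why the suffix bits matter: 65540 fits in 18 bits on its own, but not after being shifted left by two suffix bits, so the physical part is advanced instead.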

If no dc-location is configured in the cluster, gta.timestampOracle.getTS(gta.leadership, count, 0) is called directly to obtain the TSO. The logic is simple:
// getTS is used to get a timestamp.
func (t *timestampOracle) getTS(leadership *election.Leadership, count uint32, suffixBits int) (pdpb.Timestamp, error) {
var resp pdpb.Timestamp

for i := 0; i < maxRetryCount; i++ {
	currentPhysical, currentLogical := t.getTSO()

	// Get a new TSO result with the given count
	resp.Physical, resp.Logical = t.generateTSO(int64(count), suffixBits)
	if resp.GetPhysical() == 0 {
		return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory has been reset")
	}

	// In case lease expired after the first check.
	if !leadership.Check() {
		return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("not the pd or local tso allocator leader")
	}
	resp.SuffixBits = uint32(suffixBits)
	return resp, nil
}
return resp, errs.ErrGenerateTimestamp.FastGenByArgs("maximum number of retries exceeded")

}

// generateTSO will add the TSO’s logical part with the given count and returns the new TSO result.
func (t *timestampOracle) generateTSO(count int64, suffixBits int) (physical int64, logical int64) {
t.tsoMux.Lock()
defer t.tsoMux.Unlock()
if t.tsoMux.tso == nil {
return 0, 0
}
physical = t.tsoMux.tso.physical.UnixNano() / int64(time.Millisecond)
t.tsoMux.tso.logical += count
logical = t.tsoMux.tso.logical
if suffixBits > 0 && t.suffix >= 0 {
logical = t.differentiateLogical(logical, suffixBits)
}
return physical, logical
}

LocalTSOAllocator
The TSO allocation algorithm lives in lta.timestampOracle.getTS(lta.leadership, count, lta.allocatorManager.GetSuffixBits()), a function that was analyzed above.
// LocalTSOAllocator is the DC-level local TSO allocator,
// which is only used to allocate TSO in one DC each.
// One PD server may hold multiple Local TSO Allocators.
type LocalTSOAllocator struct {
allocatorManager *AllocatorManager
// leadership is used to campaign the corresponding DC’s Local TSO Allocator.
leadership *election.Leadership
timestampOracle *timestampOracle
// for election use, notice that the leadership that member holds is
// the leadership for PD leader. Local TSO Allocator’s leadership is for the
// election of Local TSO Allocator leader among several PD servers and
// Local TSO Allocator only use member’s some etcd and pbpd.Member info.
// So it’s not conflicted.
rootPath string
allocatorLeader atomic.Value // stored as *pdpb.Member
}

// NewLocalTSOAllocator creates a new local TSO allocator.
func NewLocalTSOAllocator(
am *AllocatorManager,
leadership *election.Leadership,
dcLocation string,
) Allocator {
return &LocalTSOAllocator{
allocatorManager: am,
leadership: leadership,
timestampOracle: &timestampOracle{
client: leadership.GetClient(),
rootPath: leadership.GetLeaderKey(),
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
maxResetTSGap: am.maxResetTSGap,
dcLocation: dcLocation,
},
rootPath: leadership.GetLeaderKey(),
}
}

// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (lta *LocalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) {
if !lta.leadership.Check() {
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of %s allocator", errs.NotLeaderErr, lta.timestampOracle.dcLocation))
}
return lta.timestampOracle.getTS(lta.leadership, count, lta.allocatorManager.GetSuffixBits())
}

Analysis of the core TSO workflow
1. Create the TSO allocator manager
s.tsoAllocatorManager = tso.NewAllocatorManager(
s.member, s.rootPath, s.cfg.TSOSaveInterval.Duration, s.cfg.TSOUpdatePhysicalInterval.Duration,
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },
s.GetTLSConfig())

2. Set up the Global TSO allocator. This stores one entry in am.mu.allocatorGroups[dcLocation]:
the key is "global" and the value is an allocatorGroup object that is later used for Global TSO allocation
s.tsoAllocatorManager.SetUpAllocator(ctx, tso.GlobalDCLocation, s.member.GetLeadership())

func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, dcLocation string, leadership *election.Leadership) {

// Create the Global TSO allocator; its leadership is the PD leader's leadership
allocator = NewGlobalTSOAllocator(am, leadership)

// Create a new allocatorGroup
ctx, cancel := context.WithCancel(parentCtx)
am.mu.allocatorGroups[dcLocation] = &allocatorGroup{
	dcLocation: dcLocation,
	ctx:        ctx,
	cancel:     cancel,
	leadership: leadership,
	allocator:  allocator,
}

}

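If the zone label of this PD is non-empty and Local TSO is enabled (EnableLocalTSO), the dc-location TSO allocation is configured.
SetLocalTSOConfig has a simple purpose: it saves this PD's dc-location to etcd so that the background service goroutines can query etcd and set up the TSO allocator for that dc-location.
key: am.member.GetDCLocationPath(serverID), which contains the server ID
value: dcLocation, the location this PD belongs to

Analysis of ClusterDCLocationChecker:
1. Fetch every dc-location stored in etcd together with the server IDs belonging to it, and update am.mu.clusterDCLocations[dcLocation].
2. If this PD is the leader, assign a suffix to each dc-location that does not have one yet, and update the largest suffix across all DCs.
3. If this PD is a follower, it only reads the largest suffix from etcd and sets am.mu.maxSuffix = maxSuffix.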
if zone, exist := s.cfg.Labels[config.ZoneLabel]; exist && zone != "" && s.cfg.EnableLocalTSO {
if err = s.tsoAllocatorManager.SetLocalTSOConfig(zone); err != nil {
return err
}
}
// SetLocalTSOConfig receives the zone label of this PD server and write it into etcd as dc-location
// to make the whole cluster know the DC-level topology for later Local TSO Allocator campaign.
func (am *AllocatorManager) SetLocalTSOConfig(dcLocation string) error {
serverName := am.member.Member().Name
serverID := am.member.ID()

	// The key-value pair in etcd will be like: serverID -> dcLocation
	dcLocationKey := am.member.GetDCLocationPath(serverID)
	resp, err := kv.
		NewSlowLogTxn(am.member.Client()).
		Then(clientv3.OpPut(dcLocationKey, dcLocation)).
		Commit()
	
	go am.ClusterDCLocationChecker()
	return nil
}

Background service management
The server starts a background goroutine that manages the TSO services:
func (s *Server) startServerLoop(ctx context.Context) {
go s.tsoAllocatorLoop()
}

// tsoAllocatorLoop is used to run the TSO Allocator updating daemon.
func (s *Server) tsoAllocatorLoop() {
s.tsoAllocatorManager.AllocatorDaemon(ctx)
log.Info("server is closed, exit allocator loop")
}

// AllocatorDaemon is used to update every allocator’s TSO and check whether we have
// any new local allocator that needs to be set up.
func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context) {
tsTicker := time.NewTicker(am.updatePhysicalInterval)

	patrolTicker := time.NewTicker(patrolStep)
	
	checkerTicker := time.NewTicker(PriorityCheck)


	for {
		select {
		case <-tsTicker.C:
			am.allocatorUpdater()
		case <-patrolTicker.C:
			am.allocatorPatroller(serverCtx)
		case <-checkerTicker.C:
			// ClusterDCLocationChecker and PriorityChecker are time consuming and low frequent to run,
			// we should run them concurrently to speed up the progress.
			go am.ClusterDCLocationChecker()
			go am.PriorityChecker()
		case <-serverCtx.Done():
			return
		}
	}
}

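Analysis of am.allocatorUpdater():
This function is called every am.updatePhysicalInterval. It is simple: for the global allocator and every dc-location TSO allocator, it calls UpdateTSO(), which checks how much of the in-memory TSO has been used and, when needed, saves a new TSO value to etcd. For example, the value saved to etcd is the current time plus 3 seconds; when the in-memory time later approaches the saved value, the saved time is pushed forward again (by another 3 seconds). In addition, if the logical part is about to run out, the in-memory physical time is advanced.

UpdateTSO() in detail. There are two variants, global and DC-level:
Global: gta.timestampOracle.UpdateTimestamp(gta.leadership)

DC-level: lta.timestampOracle.UpdateTimestamp(lta.leadership)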

// Update the Local TSO Allocator leaders TSO in memory concurrently.
func (am *AllocatorManager) allocatorUpdater() {

	// Update each allocator concurrently
	for _, ag := range allocatorGroups {
		am.wg.Add(1)
		go am.updateAllocator(ag)
	}
	am.wg.Wait()
}

// updateAllocator is used to update the allocator in the group.
func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
// Without Done, am.wg.Wait() in allocatorUpdater would block forever.
defer am.wg.Done()
ag.allocator.UpdateTSO()
}
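
The fan-out in allocatorUpdater is the standard sync.WaitGroup pattern: one goroutine per allocator, then wait for all of them. A minimal self-contained sketch follows; the allocator interface and countingAllocator type are hypothetical and exist only for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// allocator is a hypothetical, reduced version of the Allocator interface.
type allocator interface {
	UpdateTSO() error
}

// countingAllocator records how many times it has been updated.
type countingAllocator struct {
	mu      sync.Mutex
	updates int
}

func (c *countingAllocator) UpdateTSO() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.updates++
	return nil
}

// updateAll updates every allocator concurrently and returns only after
// all updates have finished.
func updateAll(allocators []allocator) {
	var wg sync.WaitGroup
	for _, a := range allocators {
		wg.Add(1)
		go func(a allocator) {
			defer wg.Done() // always release the WaitGroup, even on error
			_ = a.UpdateTSO()
		}(a)
	}
	wg.Wait()
}

func main() {
	global, local := &countingAllocator{}, &countingAllocator{}
	updateAll([]allocator{global, local})
	fmt.Println(global.updates, local.updates)
}
```

Running the updates concurrently keeps one slow allocator (for example, one waiting on an etcd write) from delaying the others within the same tick.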

Global UpdateTSO
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
func (gta *GlobalTSOAllocator) UpdateTSO() error {
return gta.timestampOracle.UpdateTimestamp(gta.leadership)
}

DC-level UpdateTSO
// UpdateTSO is used to update the TSO in memory and the time window in etcd
// for all local TSO allocators this PD server hold.
func (lta *LocalTSOAllocator) UpdateTSO() error {
return lta.timestampOracle.UpdateTimestamp(lta.leadership)
}

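UpdateTimestamp works as follows:
1. Get the current physical and logical parts of the TSO.
2. If the current time is ahead of the previous physical time by more than updateTimestampGuard, the physical time is set to now.
3. Otherwise, if the logical part exceeds half of its maximum value, the physical time is advanced by 1 ms.
4. If the time last saved in etcd is no more than updateTimestampGuard ahead of the new physical time, a new time (the new physical time plus saveInterval) is saved to etcd.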
func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error {
prevPhysical, prevLogical := t.getTSO()

now := time.Now()


jetLag := typeutil.SubTimeByWallClock(now, prevPhysical)


var next time.Time
// If the system time is greater, it will be synchronized with the system time.
if jetLag > updateTimestampGuard {
	next = now
} else if prevLogical > maxLogical/2 {
	// The reason choosing maxLogical/2 here is that it's big enough for common cases.
	// Because there is enough timestamp can be allocated before next update.
	log.Warn("the logical time may be not enough", zap.Int64("prev-logical", prevLogical))
	next = prevPhysical.Add(time.Millisecond)
} else {
	// It will still use the previous physical time to alloc the timestamp.
	tsoCounter.WithLabelValues("skip_save", t.dcLocation).Inc()
	return nil
}

// It is not safe to increase the physical time to `next`.
// The time window needs to be updated and saved to etcd.
if typeutil.SubTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= updateTimestampGuard {
	save := next.Add(t.saveInterval)
	if err := t.saveTimestamp(leadership, save); err != nil {
		tsoCounter.WithLabelValues("err_save_update_ts", t.dcLocation).Inc()
		return err
	}
}
// save into memory
t.setTSOPhysical(next)

return nil

}

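Analysis of am.allocatorPatroller(serverCtx):
This function is called every patrolStep.
If a new DC has joined, it sets up a TSO allocator for that DC. If a DC has left, it deletes the corresponding TSO allocator. The mechanism is simple. As described earlier, when a new PD starts with DC TSO allocation configured, a key-value pair is saved in etcd (the key is the server ID, the value is the dc-location).
When a new DC is found, am.SetUpAllocator is called to set up its DC TSO allocator.

The analysis below therefore focuses on am.SetUpAllocator (it was covered earlier, but with the focus on the global setup). Note that the leadership does not reuse PD's leadership. A new leadership is created whose path is am.getAllocatorPath(dcLocation), meaning the leader election is scoped to this dc-location.
am.SetUpAllocator creates the allocatorGroup for this DC and then starts the leader election inside the DC by running am.allocatorLeaderLoop(parentCtx, localTSOAllocator).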
// Check if we have any new dc-location configured, if yes,
// then set up the corresponding local allocator.
func (am *AllocatorManager) allocatorPatroller(serverCtx context.Context) {
// Collect all dc-locations
dcLocations := am.GetClusterDCLocations()
// Get all Local TSO Allocators
allocatorGroups := am.getAllocatorGroups(FilterDCLocation(GlobalDCLocation))
// Set up the new one
for dcLocation := range dcLocations {
if slice.NoneOf(allocatorGroups, func(i int) bool {
return allocatorGroups[i].dcLocation == dcLocation
}) {
am.SetUpAllocator(serverCtx, dcLocation, election.NewLeadership(
am.member.Client(),
am.getAllocatorPath(dcLocation),
fmt.Sprintf("%s local allocator leader election", dcLocation),
))
}
}
// Clean up the unused one
for _, ag := range allocatorGroups {
if _, exist := dcLocations[ag.dcLocation]; !exist {
am.deleteAllocatorGroup(ag.dcLocation)
}
}
}
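
At its core the patroller computes a set difference in both directions. The sketch below shows only that part. diffDCLocations is a name invented for this example, and the results are sorted purely to make the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// diffDCLocations compares the dc-locations known to the cluster with the
// dc-locations that already have a local allocator group on this server.
// It returns the ones that still need an allocator and the ones whose
// allocator is no longer needed.
func diffDCLocations(cluster map[string]struct{}, local []string) (toSetUp, toDelete []string) {
	localSet := make(map[string]struct{}, len(local))
	for _, dc := range local {
		localSet[dc] = struct{}{}
	}
	for dc := range cluster {
		if _, ok := localSet[dc]; !ok {
			toSetUp = append(toSetUp, dc)
		}
	}
	for _, dc := range local {
		if _, ok := cluster[dc]; !ok {
			toDelete = append(toDelete, dc)
		}
	}
	sort.Strings(toSetUp)
	sort.Strings(toDelete)
	return toSetUp, toDelete
}

func main() {
	cluster := map[string]struct{}{"dc-1": {}, "dc-2": {}, "dc-3": {}}
	local := []string{"dc-1", "dc-4"}
	toSetUp, toDelete := diffDCLocations(cluster, local)
	fmt.Println("set up:", toSetUp)
	fmt.Println("delete:", toDelete)
}
```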

// SetUpAllocator is used to set up an allocator, which will initialize the allocator and put it into allocator daemon.
// One TSO Allocator should only be set once, and may be initialized and reset multiple times depending on the election.
func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, dcLocation string, leadership *election.Leadership) {
var allocator Allocator

allocator = NewLocalTSOAllocator(am, leadership, dcLocation)

// Create a new allocatorGroup
ctx, cancel := context.WithCancel(parentCtx)
am.mu.allocatorGroups[dcLocation] = &allocatorGroup{
	dcLocation: dcLocation,
	ctx:        ctx,
	cancel:     cancel,
	leadership: leadership,
	allocator:  allocator,
}

// Start election of the Local TSO Allocator here
localTSOAllocator, _ := allocator.(*LocalTSOAllocator)
go am.allocatorLeaderLoop(parentCtx, localTSOAllocator)

}

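allocatorLeaderLoop
1. Check whether a leader exists. If it does, watch that leader until it becomes invalid.
2. Check whether a next-leader key exists. This value is interesting: if it exists, only the PD whose ID equals it may campaign, and every other PD skips the campaign and checks again later (typically used to transfer the DC TSO leader).
3. Query the PD leader for this dc-location's information to make sure the leader is aware of this DC TSO allocator.
4. Call am.campaignAllocatorLeader to trigger the election:
4.1 Call allocator.CampaignAllocatorLeader(defaultAllocatorLeaderLease, cmps...) to write the election information to etcd. A successful write means the election was won.
4.2 Call go allocator.KeepAllocatorLeader(ctx) to keep renewing the leader key so that the leader stays valid.
4.3 Call allocator.Initialize(int(dcLocationInfo.Suffix)) to initialize the in-memory Local TSO, which is computed from the value last saved in etcd and the current time.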

// similar logic with leaderLoop in server/server.go
func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *LocalTSOAllocator) {

for {
	
	// Check whether the Local TSO Allocator has the leader already
	allocatorLeader, rev, checkAgain := allocator.CheckAllocatorLeader()
	if checkAgain {
		continue
	}
	if allocatorLeader != nil {
		log.Info("start to watch allocator leader",
			zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.GetDCLocation()), allocatorLeader),
			zap.String("local-tso-allocator-name", am.member.Member().Name))
		// WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed.
		allocator.WatchAllocatorLeader(ctx, allocatorLeader, rev)
		log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader",
			zap.String("dc-location", allocator.GetDCLocation()))
	}

	// Check the next-leader key
	nextLeader, err := am.getNextLeaderID(allocator.GetDCLocation())
	if err != nil {
		log.Error("get next leader from etcd failed",
			zap.String("dc-location", allocator.GetDCLocation()),
			errs.ZapError(err))
		time.Sleep(200 * time.Millisecond)
		continue
	}
	isNextLeader := false
	if nextLeader != 0 {
		if nextLeader != am.member.ID() {
			log.Info("skip campaigning of the local tso allocator leader and check later",
				zap.String("server-name", am.member.Member().Name),
				zap.Uint64("server-id", am.member.ID()),
				zap.Uint64("next-leader-id", nextLeader))
			time.Sleep(200 * time.Millisecond)
			continue
		}
		isNextLeader = true
	}

	// Make sure the leader is aware of this new dc-location in order to make the
	// Global TSO synchronization can cover up this dc-location.
	ok, dcLocationInfo, err := am.getDCLocationInfoFromLeader(ctx, allocator.GetDCLocation())

	if !ok || dcLocationInfo.Suffix <= 0 {
		log.Warn("pd leader is not aware of dc-location during allocatorLeaderLoop, wait next round",
			zap.String("dc-location", allocator.GetDCLocation()),
			zap.Any("dc-location-info", dcLocationInfo),
			zap.String("wait-duration", checkStep.String()))
		// Because the checkStep is long, we use select here to check whether the ctx is done
		// to prevent the leak of goroutine.
		if !longSleep(ctx, checkStep) {
			return
		}
		continue
	}

	am.campaignAllocatorLeader(ctx, allocator, dcLocationInfo, isNextLeader)
}

}
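
The next-leader check in the loop above reduces to a small decision table. The sketch below captures it as a pure function. campaignDecision is a name invented for this example, and an ID of 0 is taken to mean that no next-leader key exists, as in the excerpt.

```go
package main

import "fmt"

// campaignDecision reports whether this server may campaign for the Local
// TSO Allocator leadership, and whether it does so as the designated next
// leader. nextLeader == 0 means no next-leader key is set.
func campaignDecision(nextLeader, myID uint64) (campaign, isNextLeader bool) {
	if nextLeader == 0 {
		return true, false // open election: anyone may campaign
	}
	if nextLeader != myID {
		return false, false // reserved for another server: back off and retry
	}
	return true, true // reserved for this server
}

func main() {
	fmt.Println(campaignDecision(0, 7))
	fmt.Println(campaignDecision(9, 7))
	fmt.Println(campaignDecision(7, 7))
}
```

The isNextLeader flag matters later in campaignAllocatorLeader, where it selects which etcd comparison guards the campaign transaction: either the next-leader key must not exist, or its value must equal this server's ID.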

func (am *AllocatorManager) campaignAllocatorLeader(
loopCtx context.Context,
allocator *LocalTSOAllocator,
dcLocationInfo *pdpb.GetDCLocationInfoResponse,
isNextLeader bool,
) {

cmps := make([]clientv3.Cmp, 0)
nextLeaderKey := am.nextLeaderKey(allocator.GetDCLocation())
if !isNextLeader {
	cmps = append(cmps, clientv3.Compare(clientv3.CreateRevision(nextLeaderKey), "=", 0))
} else {
	nextLeaderValue := fmt.Sprintf("%v", am.member.ID())
	cmps = append(cmps, clientv3.Compare(clientv3.Value(nextLeaderKey), "=", nextLeaderValue))
}


if err := allocator.CampaignAllocatorLeader(defaultAllocatorLeaderLease, cmps...); err != nil {
	if err.Error() == errs.ErrEtcdTxnConflict.Error() {
	
	} else {

	}
	return
}



// Maintain the Local TSO Allocator leader
go allocator.KeepAllocatorLeader(ctx)

if err := allocator.Initialize(int(dcLocationInfo.Suffix)); err != nil {

	return
}
if dcLocationInfo.GetMaxTs().GetPhysical() != 0 {
	if err := allocator.WriteTSO(dcLocationInfo.GetMaxTs()); err != nil {
		log.Error("failed to write the max local TSO after member changed",
			zap.String("dc-location", allocator.GetDCLocation()),
			zap.Any("dc-location-info", dcLocationInfo),
			errs.ZapError(err))
		return
	}
}
am.compareAndSetMaxSuffix(dcLocationInfo.Suffix)
allocator.EnableAllocatorLeader()

}

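am.ClusterDCLocationChecker()
This function is called every PriorityCheck interval. It was analyzed earlier and the principle is simple: it synchronizes the dc-location information in etcd with the copy in memory, updating and deleting entries as needed.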
// ClusterDCLocationChecker collects all dc-locations of a cluster, computes some related info
// and stores them into the DCLocationInfo, then finally writes them into am.mu.clusterDCLocations.
func (am *AllocatorManager) ClusterDCLocationChecker() {
newClusterDCLocations, err := am.GetClusterDCLocationsFromEtcd()

	// May be used to rollback the updating after
	newDCLocations := make([]string, 0)
	// Update the new dc-locations
	for dcLocation, serverIDs := range newClusterDCLocations {
		if _, ok := am.mu.clusterDCLocations[dcLocation]; !ok {
			am.mu.clusterDCLocations[dcLocation] = &DCLocationInfo{
				ServerIDs: serverIDs,
				Suffix:    -1,
			}
			newDCLocations = append(newDCLocations, dcLocation)
		}
	}
	// Only leader can write the TSO suffix to etcd in order to make it consistent in the cluster
	if am.member.IsLeader() {
		for dcLocation, info := range am.mu.clusterDCLocations {
			if info.Suffix > 0 {
				continue
			}
			suffix, err := am.getOrCreateLocalTSOSuffix(dcLocation)
			if err != nil {
				
				continue
			}
			if suffix > am.mu.maxSuffix {
				am.mu.maxSuffix = suffix
			}
			am.mu.clusterDCLocations[dcLocation].Suffix = suffix
		}
	} else {
		// Follower should check and update the am.mu.maxSuffix
		maxSuffix, err := am.getMaxLocalTSOSuffix()
		
			am.mu.maxSuffix = maxSuffix
		
	}
	am.mu.Unlock()
}

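am.PriorityChecker()
This function is called every PriorityCheck interval.

It checks the election priority of the Local TSO allocators. Suppose we want to elect the leader of the Local TSO allocator for dc-1. The priority rules are:
1. PDs labeled dc-location="dc-1" have the highest priority to be elected leader.
2. Only when every PD with dc-location="dc-1" is down may a PD from another DC be elected leader.

For example, if a PD in dc-1 finds that the leader of the dc-1 allocator belongs to a dc-location other than dc-1, it triggers a leader transfer: it creates the next-leader key in etcd to tell the election that it expects itself to become the next leader of this DC's Local TSO allocator.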
func (am *AllocatorManager) PriorityChecker() {
serverID := am.member.ID()
myServerDCLocation, err := am.getServerDCLocation(serverID)

// Check all Local TSO Allocator followers to see if their priorities is higher than the leaders
// Filter out allocators with leadership and initialized
allocatorGroups := am.getAllocatorGroups(FilterDCLocation(GlobalDCLocation), FilterAvailableLeadership())
for _, allocatorGroup := range allocatorGroups {
	localTSOAllocator, _ := allocatorGroup.allocator.(*LocalTSOAllocator)
	leaderServerID := localTSOAllocator.GetAllocatorLeader().GetMemberId()

	leaderServerDCLocation, err := am.getServerDCLocation(leaderServerID)
	if err != nil {
		// (error logging elided)
		continue
	}
	// For example, an allocator leader for dc-1 is elected by a server of dc-2, then the server of dc-1 will
	// find this allocator's dc-location isn't the same with server of dc-2 but is same with itself.
	if allocatorGroup.dcLocation != leaderServerDCLocation && allocatorGroup.dcLocation == myServerDCLocation {
		err = am.transferLocalAllocator(allocatorGroup.dcLocation, am.member.ID())
	
	}
}
// Check next leader and resign
// Filter out allocators with leadership
allocatorGroups = am.getAllocatorGroups(FilterDCLocation(GlobalDCLocation), FilterUnavailableLeadership())
for _, allocatorGroup := range allocatorGroups {
	nextLeader, err := am.getNextLeaderID(allocatorGroup.dcLocation)
	if err != nil {
	
		continue
	}
	// nextLeader is not empty and isn't same with the server ID, resign the leader
	if nextLeader != 0 && nextLeader != serverID {
		log.Info("next leader key found, resign current leader", zap.Uint64("nextLeaderID", nextLeader))
		am.ResetAllocatorGroup(allocatorGroup.dcLocation)
	}
}

}
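
The two loops of PriorityChecker each boil down to one predicate. The sketch below extracts them. shouldRequestTransfer and shouldResign are names invented for this example, and the real function additionally reads the dc-locations and the next-leader key from etcd.

```go
package main

import "fmt"

// shouldRequestTransfer is the follower-side check: the allocator's
// dc-location is served by a leader from a different DC, while this
// server lives in the allocator's own DC.
func shouldRequestTransfer(allocatorDC, leaderServerDC, myServerDC string) bool {
	return allocatorDC != leaderServerDC && allocatorDC == myServerDC
}

// shouldResign is the leader-side check: a next-leader key exists and it
// names some other server. nextLeader == 0 means no key is set.
func shouldResign(nextLeader, myID uint64) bool {
	return nextLeader != 0 && nextLeader != myID
}

func main() {
	// The dc-1 allocator is led by a dc-2 server; this server is in dc-1.
	fmt.Println(shouldRequestTransfer("dc-1", "dc-2", "dc-1"))
	// The leader already sits in the right DC.
	fmt.Println(shouldRequestTransfer("dc-1", "dc-1", "dc-1"))
	// Server 7 holds the leadership, but server 9 is the designated next leader.
	fmt.Println(shouldResign(9, 7))
}
```

Together the two checks form a handshake: the higher-priority follower writes the next-leader key, the current leader notices it on its next check and resigns, and the election loop then lets only the designated server campaign.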