PD API 基础框架源码分析

PD 提供两类 API:一类是 RESTful API,另一类是 gRPC 服务。这两类服务都依赖于 etcd,即由(内嵌的)etcd 实例对外提供 RESTful 和 gRPC 服务。因此,我们要做的就是在启动 etcd 时,在其配置中设置以下两个参数:

// Config holds the arguments for configuring an etcd
// server.
type Config struct {
// UserHandlers is for registering users handlers and only used
// for
// embedding etcd into other applications.
// The map key is the route path for the handler, and
// you must ensure it can’t be conflicted with etcd’s.
UserHandlers map[string]http.Handler json:"-"
// ServiceRegister is for registering users’ gRPC services. A
// simple usage example:
// cfg := embed.NewConfig()
// cfg.ServerRegister = func(s *grpc.Server) {
// pb.RegisterFooServer(s, &fooServer{})
// pb.RegisterBarServer(s, &barServer{})
// }
// embed.StartEtcd(cfg)
ServiceRegister func(*grpc.Server) json:"-"
}

PD 在 CreateServer 中初始化 etcd 的这两个参数,代码如下(节选):

// CreateServer creates the UNINITIALIZED pd server with
// given configuration.
func CreateServer(ctx context.Context, cfg *config.Config,
serviceBuilders …HandlerBuilder) (*Server, error) {
s := &Server{
cfg: cfg,
persistOptions: config.NewPersistOptions(cfg),
member: &member.Member{},
ctx: ctx,
startTimestamp: time.Now().Unix(),
}
// Adjust etcd config.
etcdCfg, err := s.cfg.GenEmbedEtcdConfig()
if len(serviceBuilders) != 0 {
userHandlers, err := combineBuilderServerHTTPService(ctx,
s, serviceBuilders…)
etcdCfg.UserHandlers = userHandlers
}
etcdCfg.ServiceRegister = func(gs *grpc.Server) {
pdpb.RegisterPDServer(gs, s)
diagnosticspb.RegisterDiagnosticsServer(gs, s)
}
}

先分析 combineBuilderServerHTTPService(ctx, s, serviceBuilders...)。这个函数生成 UserHandlers 对象,用于提供 RESTful API 服务。
这个函数接收可变个数的 HandlerBuilder 参数,返回一个 map:key 是 API path,value 是 http.Handler,负责处理对应的 RESTful 请求。

这个函数本身没什么特别之处:它根据 ServiceGroup 信息组织各类 API,并为这一类 API 引入异常恢复(recovery)中间件,该中间件依赖第三方库 github.com/urfave/negroni。

func combineBuilderServerHTTPService(ctx context.Context, svr *Server, serviceBuilders …HandlerBuilder) (map[string]http.Handler, error) {
}

combineBuilderServerHTTPService 的重点是参数 serviceBuilders。它是一个 slice,元素类型是函数类型 HandlerBuilder:返回的 ServiceGroup 用于生成对应 API 的 path,返回的 http.Handler 用于处理 RESTful 请求。
// HandlerBuilder constructs the HTTP handler for one group of APIs.
//
// It receives the context handed to CreateServer and the PD server. It returns:
//   - handler: serves the group's requests.
//   - group: describes the group; it determines the API path the handler is
//     mounted under.
//   - err: non-nil if the handler could not be built.
type HandlerBuilder func(ctx context.Context, svr *Server) (handler http.Handler, group ServiceGroup, err error)

下面分析 serviceBuilders 参数的实例化代码:
serviceBuilders := []server.HandlerBuilder{api.NewHandler,
swaggerserver.NewHandler, autoscaling.NewHandler}
serviceBuilders = append(serviceBuilders,
dashboard.GetServiceBuilders()…)
svr, err := server.CreateServer(ctx, cfg, serviceBuilders…)
serviceBuilders 是一个 slice,每个元素都是一个函数,返回处理 API 请求的 http.Handler,以及用于决定 API path 的数据结构 ServiceGroup。

我们重点分析 slice 元素 api.NewHandler,一通百通,其他 API 的定义也采用类似的框架:
// NewHandler creates a HTTP handler for API.
func NewHandler(ctx context.Context, svr *server.Server) (http.Handler, server.ServiceGroup, error) {
group := server.ServiceGroup{
Name: “core”,
IsCore: true,
}
router := mux.NewRouter()
r := createRouter(ctx, apiPrefix, svr)
router.PathPrefix(apiPrefix).Handler(negroni.New(
serverapi.NewRuntimeServiceValidator(svr, group),
serverapi.NewRedirector(svr),
negroni.Wrap®),
)

	return router, group, nil
}

下面重点分析 createRouter(ctx, apiPrefix, svr)。这个函数生成一个 router,其中包含 RESTful API 的 path 及其处理函数(以下为节选):

func createRouter(ctx context.Context, prefix string, svr *server.Server) *mux.Router {
operatorHandler := newOperatorHandler(handler, rd)
apiRouter.HandleFunc("/operators", operatorHandler.List).Methods(“GET”)
apiRouter.HandleFunc("/operators", operatorHandler.Post).Methods(“POST”)
apiRouter.HandleFunc("/operators/{region_id}", operatorHandler.Get).Methods(“GET”)
apiRouter.HandleFunc("/operators/{region_id}", operatorHandler.Delete).Methods(“DELETE”)

schedulerHandler := newSchedulerHandler(svr, rd)
apiRouter.HandleFunc("/schedulers", schedulerHandler.List).Methods("GET")
apiRouter.HandleFunc("/schedulers", schedulerHandler.Post).Methods("POST")
apiRouter.HandleFunc("/schedulers/{name}", schedulerHandler.Delete).Methods("DELETE")
apiRouter.HandleFunc("/schedulers/{name}", schedulerHandler.PauseOrResume).Methods("POST") 


}

值得注意的是下面这段代码:

router.PathPrefix(apiPrefix).Handler(negroni.New(
serverapi.NewRuntimeServiceValidator(svr,
group),
serverapi.NewRedirector(svr),
negroni.Wrap®),
)
上面的代码中,Handler 接收一个由 negroni 创建的 handler 对象,可以把它理解为服务中间件(拦截器):任何一个 API 请求都会依次经过前面两个 handler 的拦截,然后才交给业务 router 处理。我们重点关注第二个拦截器:
serverapi.NewRedirector(svr)。这个拦截器也很简单,主要检查当前接收请求的 PD 实例是否是 leader;如果不是,这个 PD 就作为转发方,把 RESTful API 请求转发给 PD leader。

// ServeHTTP implements the negroni middleware interface (it is passed to
// negroni.New). The request is handled locally via next when this PD member
// is the leader. Otherwise it is forwarded, at most once, to the leader's
// client URLs through a reverse proxy.
//
// RedirectorHeader marks a request that has already been forwarded. If such
// a request reaches a non-leader (e.g. during a leader change), it is rejected
// instead of bouncing between members.
func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
	// The leader serves the request itself; proxying it would only send it
	// back to this same member.
	// NOTE(review): IsLeader is the upstream PD member API — confirm.
	if h.s.GetMember().IsLeader() {
		next(w, r)
		return
	}

	// Prevent more than one redirection.
	if name := r.Header.Get(RedirectorHeader); len(name) != 0 {
		http.Error(w, "redirect to not leader", http.StatusInternalServerError)
		return
	}
	r.Header.Set(RedirectorHeader, h.s.Name())

	leader := h.s.GetMember().GetLeader()
	if leader == nil {
		http.Error(w, "no leader", http.StatusServiceUnavailable)
		return
	}

	urls, err := config.ParseUrls(strings.Join(leader.GetClientUrls(), ","))
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	client := h.s.GetHTTPClient()
	NewCustomReverseProxies(client, urls).ServeHTTP(w, r)
}

下面分析 gRPC 服务。它同样通过 etcd 的配置参数注册;下面的代码表明 gRPC 服务由 Server 对象提供:

// Hook PD's gRPC services into the gRPC server run by embedded etcd; the same
// PD Server value s backs both services.
etcdCfg.ServiceRegister = func(grpcServer *grpc.Server) {
	pdpb.RegisterPDServer(grpcServer, s)
	diagnosticspb.RegisterDiagnosticsServer(grpcServer, s)
}

我们先概览一下 gRPC 服务提供的 API,后续分析各模块细节时,再逐个深入分析这些 API。

// PDServer is the server-side API of the PD gRPC service. PD's Server
// implements it and is registered through pdpb.RegisterPDServer.
//
// Most RPCs are unary: they take (context.Context, *XxxRequest) and return
// (*XxxResponse, error). Tso, RegionHeartbeat and SyncRegions instead take a
// PD_*Server argument. Per gRPC-Go codegen these are presumably streaming
// RPCs; confirm against pdpb.proto.
type PDServer interface {
	// GetMembers gets the member list of this cluster. It does not require
	// the cluster_id in the request to match the id of this cluster.
	GetMembers(context.Context, *GetMembersRequest) (*GetMembersResponse, error)
	Tso(PD_TsoServer) error
	Bootstrap(context.Context, *BootstrapRequest) (*BootstrapResponse, error)
	IsBootstrapped(context.Context, *IsBootstrappedRequest) (*IsBootstrappedResponse, error)
	AllocID(context.Context, *AllocIDRequest) (*AllocIDResponse, error)
	GetStore(context.Context, *GetStoreRequest) (*GetStoreResponse, error)
	PutStore(context.Context, *PutStoreRequest) (*PutStoreResponse, error)
	GetAllStores(context.Context, *GetAllStoresRequest) (*GetAllStoresResponse, error)
	StoreHeartbeat(context.Context, *StoreHeartbeatRequest) (*StoreHeartbeatResponse, error)
	RegionHeartbeat(PD_RegionHeartbeatServer) error
	GetRegion(context.Context, *GetRegionRequest) (*GetRegionResponse, error)
	// GetPrevRegion reuses the GetRegion request/response messages.
	GetPrevRegion(context.Context, *GetRegionRequest) (*GetRegionResponse, error)
	GetRegionByID(context.Context, *GetRegionByIDRequest) (*GetRegionResponse, error)
	ScanRegions(context.Context, *ScanRegionsRequest) (*ScanRegionsResponse, error)
	AskSplit(context.Context, *AskSplitRequest) (*AskSplitResponse, error)
	ReportSplit(context.Context, *ReportSplitRequest) (*ReportSplitResponse, error)
	AskBatchSplit(context.Context, *AskBatchSplitRequest) (*AskBatchSplitResponse, error)
	ReportBatchSplit(context.Context, *ReportBatchSplitRequest) (*ReportBatchSplitResponse, error)
	GetClusterConfig(context.Context, *GetClusterConfigRequest) (*GetClusterConfigResponse, error)
	PutClusterConfig(context.Context, *PutClusterConfigRequest) (*PutClusterConfigResponse, error)
	ScatterRegion(context.Context, *ScatterRegionRequest) (*ScatterRegionResponse, error)
	GetGCSafePoint(context.Context, *GetGCSafePointRequest) (*GetGCSafePointResponse, error)
	UpdateGCSafePoint(context.Context, *UpdateGCSafePointRequest) (*UpdateGCSafePointResponse, error)
	UpdateServiceGCSafePoint(context.Context, *UpdateServiceGCSafePointRequest) (*UpdateServiceGCSafePointResponse, error)
	SyncRegions(PD_SyncRegionsServer) error
	GetOperator(context.Context, *GetOperatorRequest) (*GetOperatorResponse, error)
	SyncMaxTS(context.Context, *SyncMaxTSRequest) (*SyncMaxTSResponse, error)
	SplitRegions(context.Context, *SplitRegionsRequest) (*SplitRegionsResponse, error)
	GetDCLocationInfo(context.Context, *GetDCLocationInfoRequest) (*GetDCLocationInfoResponse, error)
}