Kubernetes Source Code Study - Controller - P1: Multi-instance Leader Election



Preface

The complete k8s source code analysis series is collected in one place: the k8s源码笔记 project (project link). If it helps, a star would be appreciated, thanks~

In a multi-master Kubernetes cluster, the core components all run in a one-leader, multiple-follower mode. The earlier articles on the scheduler did not cover how a component elects its leader and divides work between instances, so this article uses the controller as an example and devotes a whole post to how multiple instances of a component work together under a leader.

Entry Point

Like the scheduler, the controller's cmd entry point is built on cobra. If you are not familiar with cobra, refer back to the earlier articles; it won't be repeated here. Following the entry point leads straight to the startup function:

==> cmd/kube-controller-manager/controller-manager.go:38

command := app.NewControllerManagerCommand()

==> cmd/kube-controller-manager/app/controllermanager.go:109

Run(c.Complete(), wait.NeverStop)

==> cmd/kube-controller-manager/app/controllermanager.go:153

func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {}

This is the entry function; the code block below is annotated section by section:

func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
    ...
    // Some code is omitted here for brevity
    // Start kube-controller-manager's HTTP server
    // unsecuredMux is the handler for these controller *after* authn/authz filters have been applied
    var unsecuredMux *mux.PathRecorderMux
    if c.SecureServing != nil {
        unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
        handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
        // TODO: handle stoppedCh returned by c.SecureServing.Serve
        if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
            return err
        }
    }
    if c.InsecureServing != nil {
        unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
        insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}}
        handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn)
        if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
            return err
        }
    }

    // The run function that drives the controllers' actual work. Note (important): it is passed
    // as a callback and only executed after this instance wins the leader election.
    run := func(ctx context.Context) {
        rootClientBuilder := controller.SimpleControllerClientBuilder{
            ClientConfig: c.Kubeconfig,
        }
        var clientBuilder controller.ControllerClientBuilder
        if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
            if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
                // It's possible another controller process is creating the tokens for us.
                // If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
                klog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
            }
            clientBuilder = controller.SAControllerClientBuilder{
                ClientConfig:         restclient.AnonymousClientConfig(c.Kubeconfig),
                CoreClient:           c.Client.CoreV1(),
                AuthenticationClient: c.Client.AuthenticationV1(),
                Namespace:            "kube-system",
            }
        } else {
            clientBuilder = rootClientBuilder
        }
        controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
        if err != nil {
            klog.Fatalf("error building controller context: %v", err)
        }
        saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
        if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
            klog.Fatalf("error starting controllers: %v", err)
        }
        controllerContext.InformerFactory.Start(controllerContext.Stop)
        close(controllerContext.InformersStarted)
        select {}
    }

    if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
        run(context.TODO())
        panic("unreachable")
    }

    id, err := os.Hostname()
    if err != nil {
        return err
    }
    // add a uniquifier so that two processes on the same host don't accidentally both become active
    id = id + "_" + string(uuid.NewUUID())
    rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
        "kube-system",
        "kube-controller-manager",
        c.LeaderElectionClient.CoreV1(),
        c.LeaderElectionClient.CoordinationV1(),
        resourcelock.ResourceLockConfig{
            Identity:      id,
            EventRecorder: c.EventRecorder,
        })
    if err != nil {
        klog.Fatalf("error creating lock: %v", err)
    }

    // Leader election starts here
    leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
        Lock:          rl,
        LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
        RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
        RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
        Callbacks: leaderelection.LeaderCallbacks{
            // Callback: after winning the election, the leader starts the run function defined above
            OnStartedLeading: run,
            OnStoppedLeading: func() {
                klog.Fatalf("leaderelection lost")
            },
        },
        WatchDog: electionChecker,
        Name:     "kube-controller-manager",
    })
    panic("unreachable")
}

As you can see, an instance only enters the work flow after being elected leader. Let's skip the concrete work flow for now and look at how the leaderelection package runs the election.

Election

Election entry point

==> cmd/kube-controller-manager/app/controllermanager.go:252

leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{...})

func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
    le, err := NewLeaderElector(lec)
    if err != nil {
        panic(err)
    }
    // Register the elector with the watchdog that backs the leader health-check HTTP endpoint
    if lec.WatchDog != nil {
        lec.WatchDog.SetLeaderElection(le)
    }
    // Enter the election
    le.Run(ctx)
}

==> vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go:196

le.Run(ctx)

// Run starts the leader election loop
func (le *LeaderElector) Run(ctx context.Context) {
    defer func() {
        runtime.HandleCrash()
        le.config.Callbacks.OnStoppedLeading()
    }()
    // 1. acquire is the campaigning function; if leadership cannot be acquired, return right here
    if !le.acquire(ctx) {
        return // ctx signalled done
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    // 2. After winning the election, start a goroutine running the run function noted above,
    //    i.e. the controller work loop
    go le.config.Callbacks.OnStartedLeading(ctx)
    // 3. Keep renewing the leader status
    le.renew(ctx)
}

This function mixes several defers and returns, so here is a quick reminder of how defer and return interact in Go (see the small demo below):

  1. Multiple defers are kept on a stack and run last-in-first-out: the defer registered later executes first.
  2. Deferred functions run after the return value has been evaluated, but before the function actually returns to its caller.
  3. Once a return is taken, any defer written after that return statement was never registered, so it never runs.
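A minimal, self-contained demo of the three points above (this is illustration code, not from the kube-controller-manager source):

package main

import "fmt"

func f() int {
    defer fmt.Println("defer A (registered first, runs last)")
    defer fmt.Println("defer B (registered second, runs first)")
    if true {
        return 1 // both registered defers still run after this return
    }
    // This defer is never reached, so it is never registered and never runs.
    defer fmt.Println("defer C (never registered)")
    return 2
}

func main() {
    fmt.Println("f() returned", f())
    // Output:
    // defer B (registered second, runs first)
    // defer A (registered first, runs last)
    // f() returned 1
}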

From this function, the election logic is roughly:

1. The instance that wins the election starts running run(), i.e. the controller work function, and also exposes an API for checking the leader's health.

2. Instances that do not win stay inside acquire and keep retrying; they only leave the election loop when their context is cancelled. The watchDog keeps running throughout, monitoring the leader's health.

3. The winner keeps refreshing its own leader status afterwards. A sketch of how the watchDog health check can be wired up is shown below.
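As a side note, the electionChecker passed as WatchDog in the controller-manager is a client-go HealthzAdaptor. Below is a minimal sketch of that wiring using client-go's NewLeaderHealthzAdaptor helper; the timeout, HTTP path, and port here are illustrative, not the controller-manager's actual values:

package main

import (
    "log"
    "net/http"
    "time"

    "k8s.io/client-go/tools/leaderelection"
)

func main() {
    // The adaptor reports an error if this process believes it is the leader
    // but has failed to renew its lease for longer than the given timeout.
    electionChecker := leaderelection.NewLeaderHealthzAdaptor(20 * time.Second)

    // RunOrDie calls WatchDog.SetLeaderElection(le) internally (see RunOrDie above),
    // which is how the adaptor learns about the elector:
    // leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{..., WatchDog: electionChecker})

    // Expose the check over HTTP, similar in spirit to the controller-manager's healthz handler.
    http.HandleFunc("/healthz/leaderElection", func(w http.ResponseWriter, r *http.Request) {
        if err := electionChecker.Check(r); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        w.Write([]byte("ok"))
    })
    log.Fatal(http.ListenAndServe(":8080", nil))
}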

The campaign (acquire) function:

vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go:212

// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
// Returns false if ctx signals done.
// The candidate keeps applying for leadership in a loop: if it succeeds it returns true,
// otherwise it stays in the loop and tries again after each retry interval.
func (le *LeaderElector) acquire(ctx context.Context) bool {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    succeeded := false
    desc := le.config.Lock.Describe()
    klog.Infof("attempting to acquire leader lease %v...", desc)
    // Loop applying for leadership; JitterUntil is a periodic-execution helper
    wait.JitterUntil(func() {
        // Try to acquire or renew the leader lock
        succeeded = le.tryAcquireOrRenew()
        le.maybeReportTransition()
        if !succeeded {
            klog.V(4).Infof("failed to acquire lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("became leader")
        le.metrics.leaderOn(le.config.Name)
        klog.Infof("successfully acquired lease %v", desc)
        // Election won: call cancel() to break out of the periodic loop and return success
        cancel()
    }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
    return succeeded
}

The periodic execution function

Let's look at the code of the periodic loop function JitterUntil:

vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:130

func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
    var t *time.Timer
    var sawTimeout bool
    for {
        select {
        case <-stopCh:
            return
        default:
        }
        jitteredPeriod := period
        if jitterFactor > 0.0 {
            jitteredPeriod = Jitter(period, jitterFactor)
        }
        // sliding controls whether f()'s own run time counts towards the interval.
        // If the interval includes f()'s run time, start the timer before f() runs
        if !sliding {
            t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
        }
        func() {
            defer runtime.HandleCrash()
            f()
        }()
        // If the interval excludes f()'s run time, start the timer only after f() returns
        if sliding {
            t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
        }
        // select gives its cases no priority, so this select may pick t.C even if stopCh is
        // already closed; the extra stop check at the top of the for loop guards against
        // running f() one more time in that case.
        select {
        case <-stopCh:
            return
        // The timer fired: mark it and enter the next iteration
        case <-t.C:
            sawTimeout = true
        }
    }
}

// resetOrReuseTimer avoids allocating a new timer if one is already in use.
// Not safe for multiple threads.
func resetOrReuseTimer(t *time.Timer, d time.Duration, sawTimeout bool) *time.Timer {
    if t == nil {
        return time.NewTimer(d)
    }
    // When reusing a timer that already fired but whose tick was never consumed,
    // drain t.C first so the next receive does not return a stale value
    if !t.Stop() && !sawTimeout {
        <-t.C
    }
    // Reset the timer
    t.Reset(d)
    return t
}

Kubernetes implements this periodic task with the plain standard-library time.Timer. t.C is essentially a chan time.Time; the consumer uses select to be woken when the interval elapses, consumes the tick, and enters the next iteration. A minimal demo of the drain-before-Reset pattern appears after the reference link below.

A few notes on how select interacts with channels (a small demo follows the list):

Inside a select, evaluation proceeds as follows:

  1. Every case is checked.
  2. If exactly one case has data ready, its body runs.
  3. If several cases are ready at the same time, one is chosen at random; cases have no priority.
  4. If no case is ready and there is a default clause, the default body runs.
  5. If no case is ready and the default body is empty, select falls through and execution continues below.
  6. If no case is ready and there is no default, select blocks until one becomes ready.
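A tiny, self-contained demo of points 3 and 4 above (illustration only, not from the k8s source): when neither channel is ready, the default branch runs; when both are ready, select picks one at random, with no priority between cases:

package main

import "fmt"

func main() {
    a := make(chan int, 1)
    b := make(chan int, 1)

    // Neither channel has data yet, so the default branch runs.
    select {
    case <-a:
        fmt.Println("from a")
    case <-b:
        fmt.Println("from b")
    default:
        fmt.Println("nothing ready, default runs")
    }

    // Both channels are ready: over many iterations select picks a and b
    // roughly evenly, showing that cases have no priority.
    counts := map[string]int{}
    for i := 0; i < 1000; i++ {
        a <- 1
        b <- 1
        select {
        case <-a:
            counts["a"]++
        case <-b:
            counts["b"]++
        }
        // Drain whichever channel was not picked so the next iteration starts clean.
        select {
        case <-a:
        case <-b:
        default:
        }
    }
    fmt.Println(counts) // e.g. map[a:497 b:503]
}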

For Go's time.Timer, this article explains it very well:

https://tonybai.com/2016/12/21/how-to-use-timer-reset-in-golang-correctly/
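And here is a minimal sketch (illustration only, not from the k8s source) of the same drain-before-Reset pattern that resetOrReuseTimer uses: if a timer may already have fired but its tick has not been consumed, drain the channel before Reset so the next receive does not see a stale tick:

package main

import (
    "fmt"
    "time"
)

func main() {
    t := time.NewTimer(10 * time.Millisecond)

    // Let the timer fire without consuming the tick, which is exactly the
    // situation a reused timer can be in.
    time.Sleep(20 * time.Millisecond)

    // Stop returns false because the timer already fired; drain the stale tick
    // so a later receive on t.C does not return it immediately.
    if !t.Stop() {
        <-t.C
    }
    t.Reset(50 * time.Millisecond)

    start := time.Now()
    <-t.C // now this really waits for the new 50ms interval
    fmt.Println("waited", time.Since(start).Round(10*time.Millisecond))
}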

The acquire/renew leader function

vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go:293

// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
// This function is called both for the initial election and for the periodic renewals afterwards.
// If the candidate is not the leader it tries to take the lease; if it already is the leader it
// tries to renew the lease; in both cases the record is refreshed at the end.
func (le *LeaderElector) tryAcquireOrRenew() bool {
    now := metav1.Now()
    leaderElectionRecord := rl.LeaderElectionRecord{
        HolderIdentity:       le.config.Lock.Identity(),
        LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
        RenewTime:            now,
        AcquireTime:          now,
    }

    // 1. obtain or create the ElectionRecord
    // Step 1: fetch the current leader election record; if none exists yet, create it
    oldLeaderElectionRecord, err := le.config.Lock.Get()
    if err != nil {
        if !errors.IsNotFound(err) {
            klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
            return false
        }
        if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
            klog.Errorf("error initially creating leader election record: %v", err)
            return false
        }
        le.observedRecord = leaderElectionRecord
        le.observedTime = le.clock.Now()
        return true
    }

    // Step 2: compare the locally observed record with the record currently stored in the lock
    // 2. Record obtained, check the Identity & Time
    if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
        // If the leader seen in the last observation differs from the current one,
        // update the observation to match the current record
        le.observedRecord = *oldLeaderElectionRecord
        le.observedTime = le.clock.Now()
    }
    if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
        // If the candidate is not the current leader and the current leader's lease has not yet
        // expired, return false: this candidate loses the election for now
        le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
        !le.IsLeader() {
        klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
        return false
    }

    // 3. We're going to try to update. The leaderElectionRecord is set to it's default
    // here. Let's correct it before updating.
    if le.IsLeader() {
        // If the candidate is the current leader itself, keep the original acquire time and
        // transition count instead of this attempt's values
        leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
    } else {
        // If the candidate is not the leader (meaning the previous leader's lease expired without
        // being renewed), this candidate becomes the new leader and the transition count increases
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
    }

    // update the lock itself
    // Write the new record into the leader lock; returning true means the attempt succeeded
    if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
        klog.Errorf("Failed to update lock: %v", err)
        return false
    }
    le.observedRecord = leaderElectionRecord
    le.observedTime = le.clock.Now()
    return true
}

This code involves several leader-record variables that are easy to mix up, so here they are spelled out for clarity (a small worked example of the expiry check follows the list):

  LeaderElector            # a candidate: every controller-manager process takes part in the election
  oldLeaderElectionRecord  # the record stored in the leader lock before this attempt, i.e. the current leader
  leaderElectionRecord     # the record built for this attempt; it is written into the lock if the attempt succeeds
  observedRecord           # each candidate periodically observes the current leader record and caches it in this field
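To make the expiry check in step 2 concrete, here is a tiny worked example. The times are made up; 15 seconds happens to be the controller-manager's default lease duration, but the numbers are otherwise hypothetical:

package main

import (
    "fmt"
    "time"
)

func main() {
    // observedTime: the last time this candidate saw the leader record change.
    observedTime := time.Date(2023, 6, 20, 10, 0, 0, 0, time.UTC)
    leaseDuration := 15 * time.Second

    // The same expression tryAcquireOrRenew uses: observedTime.Add(LeaseDuration).After(now)
    now := time.Date(2023, 6, 20, 10, 0, 12, 0, time.UTC)
    fmt.Println(observedTime.Add(leaseDuration).After(now)) // true: lease still valid, a non-leader backs off

    now = time.Date(2023, 6, 20, 10, 0, 20, 0, time.UTC)
    fmt.Println(observedTime.Add(leaseDuration).After(now)) // false: lease expired, a candidate may take over
}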

First, let's see how step 1 fetches the current leader record:

vendor/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go:39

// Get returns the election record from a Lease spec
func (ll *LeaseLock) Get() (*LeaderElectionRecord, error) {
    var err error
    // 1. Fetch the Lease object
    ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ll.LeaseMeta.Name, metav1.GetOptions{})
    if err != nil {
        return nil, err
    }
    // 2. Convert lease.Spec into a LeaderElectionRecord and return it
    return LeaseSpecToLeaderElectionRecord(&ll.lease.Spec), nil
}

The method that fetches the Lease object is here:

vendor/k8s.io/client-go/kubernetes/typed/coordination/v1/lease.go:66

func (c *leases) Get(name string, options metav1.GetOptions) (result *v1.Lease, err error) {}

The converted LeaderElectionRecord that gets returned looks like this:

LeaderElectionRecord{
    HolderIdentity:       holderIdentity,                     // identity of the current lease holder
    LeaseDurationSeconds: leaseDurationSeconds,               // how long the lease is valid
    AcquireTime:          metav1.Time{spec.AcquireTime.Time}, // when the holder became leader
    RenewTime:            metav1.Time{spec.RenewTime.Time},   // when the lease was last renewed
    LeaderTransitions:    leaseTransitions,                   // how many times leadership has changed hands
}

The returned LeaderElectionRecord is then compared as described above: if the holder is this candidate itself, the lease is renewed; otherwise the candidate checks whether the leader's lease has expired and updates the leader lock accordingly. To see what this lock actually looks like in a running cluster, there is a small sketch below.
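As an aside, when the resource lock type is configured as leases (older releases defaulted to endpoints), the lock is an ordinary coordination.k8s.io Lease object at kube-system/kube-controller-manager. A minimal sketch of reading it with client-go, using the same pre-context Get signature shown above; the kubeconfig path and the bare error handling are kept deliberately simple:

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    // The lock created in Run() lives at kube-system/kube-controller-manager.
    lease, err := client.CoordinationV1().Leases("kube-system").Get("kube-controller-manager", metav1.GetOptions{})
    if err != nil {
        panic(err)
    }
    fmt.Println("holder:     ", *lease.Spec.HolderIdentity)
    fmt.Println("renew time: ", lease.Spec.RenewTime)
    fmt.Println("transitions:", *lease.Spec.LeaseTransitions)
}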

The renew function (refreshing election status)

vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go:234

func (le *LeaderElector) renew(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    wait.Until(func() {
        timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
        defer timeoutCancel()
        // Periodically refresh leader status: on success the lease is renewed,
        // on failure (past the deadline) leadership is given up
        err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
            done := make(chan bool, 1)
            go func() {
                defer close(done)
                done <- le.tryAcquireOrRenew()
            }()
            select {
            case <-timeoutCtx.Done():
                return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
            case result := <-done:
                return result, nil
            }
        }, timeoutCtx.Done())
        le.maybeReportTransition()
        desc := le.config.Lock.Describe()
        if err == nil {
            klog.V(5).Infof("successfully renewed lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("stopped leading")
        le.metrics.leaderOff(le.config.Name)
        klog.Infof("failed to renew lease %v: %v", desc, err)
        cancel()
    }, le.config.RetryPeriod, ctx.Done())

    // if we hold the lease, give it up
    if le.config.ReleaseOnCancel {
        le.release()
    }
}

tryAcquireOrRenew() and the periodic loop helper used here are essentially the same as described above, so they are not covered again.

Summary

Component leader election can roughly be summarised as the following flow:

  • Initially every instance is a LeaderElector (a candidate); the first one to acquire the lock becomes the leader and does the actual work. The leader maintains a record (the leader lock) that every candidate can observe, covering status information, the health-check endpoint, and so on.
  • The remaining candidates stay in hot standby, watching the leader's status, and take part in the election again if the leader fails.
  • While running, the leader keeps renewing its own leader status at regular intervals.

This leader/follower working relationship is not specific to the controller; the other components work the same way between their instances. The same machinery can also be reused in your own programs, as the sketch below shows.
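To close, here is a minimal, hedged sketch of using client-go's leaderelection package directly. The lease name, namespace, and identity suffix are illustrative; 15s/10s/2s match the controller-manager's default lease duration, renew deadline, and retry period. It uses the LeaseLock struct from current client-go releases, which differs slightly from the older vendored code quoted above:

package main

import (
    "context"
    "log"
    "os"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        log.Fatal(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    // Same trick as the controller-manager: hostname plus a suffix as the identity.
    id, _ := os.Hostname()
    id = id + "_example"

    // The lock object; name and namespace are illustrative.
    lock := &resourcelock.LeaseLock{
        LeaseMeta:  metav1.ObjectMeta{Name: "my-example-lock", Namespace: "default"},
        Client:     client.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{Identity: id},
    }

    leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
        Lock:          lock,
        LeaseDuration: 15 * time.Second, // controller-manager defaults
        RenewDeadline: 10 * time.Second,
        RetryPeriod:   2 * time.Second,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                // The "work loop": only the elected leader ever gets here.
                log.Println("I am the leader, starting work")
                <-ctx.Done()
            },
            OnStoppedLeading: func() {
                log.Fatal("leader election lost")
            },
            OnNewLeader: func(identity string) {
                log.Printf("current leader: %s", identity)
            },
        },
        Name: "my-example",
    })
}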

Thanks for reading; corrections are welcome.
