Kubernetes Source Code Study - Kubelet - P1 - Startup Flow


Preface

Having broadly analyzed the three control-plane components of k8s (Scheduler, Controller, APIServer), this post turns to kubelet, the daemon component of the data plane, to see how kubelet bridges the control plane and the data plane.

Startup Flow

As usual, the entrypoint lives under the project's cmd directory and uses cobra for the command wrapping (a minimal cobra sketch follows the snippet below):

cmd/kubelet/kubelet.go:39

    func main() {
        rand.Seed(time.Now().UnixNano())

        command := app.NewKubeletCommand(server.SetupSignalHandler())
        logs.InitLogs()
        defer logs.FlushLogs()

        if err := command.Execute(); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            os.Exit(1)
        }
    }
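For readers new to cobra, the pattern above (build a *cobra.Command, then Execute it) is all there is to the entrypoint. A minimal standalone sketch of the same wiring, with hypothetical names, not kubelet's actual code:

    package main

    import (
        "fmt"
        "os"

        "github.com/spf13/cobra"
    )

    func main() {
        cmd := &cobra.Command{
            Use:   "demo",
            Short: "demonstrates the cobra entrypoint pattern",
            Run: func(cmd *cobra.Command, args []string) {
                // flags have been parsed at this point; long-running work starts here
                fmt.Println("running")
            },
        }
        if err := cmd.Execute(); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            os.Exit(1)
        }
    }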

cmd/kubelet/app/server.go:112

Inside NewKubeletCommand, the NewKubeletFlags and NewKubeletConfiguration functions initialize many of kubelet's default flags and parameters. Let's look at each in turn:

cmd/kubelet/app/options/options.go:214

    func NewKubeletFlags() *KubeletFlags {
        remoteRuntimeEndpoint := ""
        if runtime.GOOS == "linux" {
            remoteRuntimeEndpoint = "unix:///var/run/dockershim.sock"
        } else if runtime.GOOS == "windows" {
            remoteRuntimeEndpoint = "npipe:./pipe/dockershim"
        }

        return &KubeletFlags{
            EnableServer: true,
            // keep an eye on this container runtime parameter
            ContainerRuntimeOptions: *NewContainerRuntimeOptions(),
            CertDirectory: "/var/lib/kubelet/pki",
            RootDirectory: defaultRootDir,
            MasterServiceNamespace: metav1.NamespaceDefault,
            MaxContainerCount: -1,
            MaxPerPodContainerCount: 1,
            MinimumGCAge: metav1.Duration{Duration: 0},
            NonMasqueradeCIDR: "10.0.0.0/8",
            RegisterSchedulable: true,
            ExperimentalKernelMemcgNotification: false,
            RemoteRuntimeEndpoint: remoteRuntimeEndpoint,
            NodeLabels: make(map[string]string),
            VolumePluginDir: "/usr/libexec/kubernetes/kubelet-plugins/volume/exec/",
            RegisterNode: true,
            SeccompProfileRoot: filepath.Join(defaultRootDir, "seccomp"),
            HostNetworkSources: []string{kubetypes.AllSource},
            HostPIDSources: []string{kubetypes.AllSource},
            HostIPCSources: []string{kubetypes.AllSource},
            // TODO(#58010:v1.13.0): Remove --allow-privileged, it is deprecated
            AllowPrivileged: true,
            // prior to the introduction of this flag, there was a hardcoded cap of 50 images
            NodeStatusMaxImages: 50,
        }
    }

ContainerRuntimeOptions is worth a look:

cmd/kubelet/app/options/container_runtime.go:41

    func NewContainerRuntimeOptions() *config.ContainerRuntimeOptions {
        dockerEndpoint := ""
        if runtime.GOOS != "windows" {
            // the default container runtime is docker
            dockerEndpoint = "unix:///var/run/docker.sock"
        }

        return &config.ContainerRuntimeOptions{
            ContainerRuntime: kubetypes.DockerContainerRuntime,
            RedirectContainerStreaming: false,
            DockerEndpoint: dockerEndpoint,
            // dockershim root; dockershim is the actual vehicle of the container runtime,
            // and every docker container gets its own shim process
            DockershimRootDirectory: "/var/lib/dockershim",
            // the pause container image
            PodSandboxImage: defaultPodSandboxImage,
            ImagePullProgressDeadline: metav1.Duration{Duration: 1 * time.Minute},
            ExperimentalDockershim: false,
            // this directory holds the executables of the network tooling
            CNIBinDir: "/opt/cni/bin",
            // CNI config files live here (pod subnet, gateway, bridge, etc.),
            // usually generated dynamically by the CNI plugin
            CNIConfDir: "/etc/cni/net.d",
        }
    }
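For reference, a typical single-plugin CNI config of the kind found under /etc/cni/net.d might look like the following; all values here are illustrative, not something kubelet ships:

    {
      "cniVersion": "0.3.1",
      "name": "mynet",
      "type": "bridge",
      "bridge": "cni0",
      "isGateway": true,
      "ipMasq": true,
      "ipam": {
        "type": "host-local",
        "subnet": "10.244.0.0/16",
        "routes": [{ "dst": "0.0.0.0/0" }]
      }
    }

The "type" field names a binary under /opt/cni/bin (here, bridge), which is exactly why the two directories above appear as a pair.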

cmd/kubelet/app/options/options.go:293

–> cmd/kubelet/app/options/options.go:311

    // the NewKubeletConfiguration function sets some parameters by default
    func applyLegacyDefaults(kc *kubeletconfig.KubeletConfiguration) {
        // --anonymous-auth
        kc.Authentication.Anonymous.Enabled = true
        // --authentication-token-webhook
        kc.Authentication.Webhook.Enabled = false
        // --authorization-mode
        // the AlwaysAllow authorization mode designed for nodes, mentioned in the
        // apiserver authentication post
        kc.Authorization.Mode = kubeletconfig.KubeletAuthorizationModeAlwaysAllow
        // 10255, the read-only port for scraping info, e.g. prometheus scraping cadvisor metrics
        kc.ReadOnlyPort = ports.KubeletReadOnlyPort
    }
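The read-only port can be poked at directly. A minimal probe, assuming a node where the read-only port is still enabled at its default 10255 (newer clusters often disable it):

    package main

    import (
        "fmt"
        "io"
        "net/http"
    )

    func main() {
        // /pods and /metrics are among the handlers served on the read-only port
        resp, err := http.Get("http://127.0.0.1:10255/pods")
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()

        body, _ := io.ReadAll(resp.Body)
        fmt.Printf("status=%d, %d bytes of pod state\n", resp.StatusCode, len(body))
    }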

Next, let's see what the command's Run function does:

–> cmd/kubelet/app/server.go:148

    Run: func(cmd *cobra.Command, args []string) {
        ...
        // the hundred-odd lines above handle default flag and config init, featureGates
        // and the like; skipped here
        // load the kubelet config file; digging in shows it is the file given by --config,
        // usually /var/lib/kubelet/config.yaml in kubeadm deployments
        if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
            kubeletConfig, err = loadConfigFile(configFile)
            if err != nil {
                klog.Fatal(err)
            }
            ...
        }

        // instantiate the KubeletServer
        kubeletServer := &options.KubeletServer{
            KubeletFlags: *kubeletFlags,
            KubeletConfiguration: *kubeletConfig,
        }

        // build some of kubelet's dependencies, e.g. nsenter and the client side of dockershim
        kubeletDeps, err := UnsecuredDependencies(kubeletServer)
        if err != nil {
            klog.Fatal(err)
        }

        // add the kubelet config controller to kubeletDeps
        kubeletDeps.KubeletConfigController = kubeletConfigController

        // start the experimental docker shim, if enabled
        if kubeletServer.KubeletFlags.ExperimentalDockershim {
            if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
                klog.Fatal(err)
            }
            return
        }

        // start the kubelet
        klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
        if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
            klog.Fatal(err)
        }
    },
    }
    // some cmd help text follows, omitted
    ...
    return cmd
–> cmd/kubelet/app/server.go:416

–> cmd/kubelet/app/server.go:479 This function is long, 200+ lines; only the key fragments are shown.

    func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
        ...
        // standalone mode means not talking to the outside world (e.g. the apiserver);
        // it is generally used for debugging, so standalone mode needs no clients
        standaloneMode := true
        if len(s.KubeConfig) > 0 {
            standaloneMode = false
        }

        if kubeDeps == nil {
            kubeDeps, err = UnsecuredDependencies(s)
            if err != nil {
                return err
            }
        }

        // determine the node name to register under
        hostName, err := nodeutil.GetHostname(s.HostnameOverride)
        if err != nil {
            return err
        }
        nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
        if err != nil {
            return err
        }

        switch {
        // standalone mode: all clients are set to nil
        case standaloneMode:
            kubeDeps.KubeClient = nil
            kubeDeps.EventClient = nil
            kubeDeps.HeartbeatClient = nil
            klog.Warningf("standalone mode, no API client")
        // normal mode: initialize the clients, namely kubeClient/eventClient/heartbeatClient
        case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
            // client config, mostly the certs used to reach the apiserver; the cert files sit
            // under /var/lib/kubelet/pki by default, and with certificate rotation enabled an
            // async goroutine keeps checking and renewing them through the cert manager. The
            // rest is timeouts, keep-alive duration and so on. closeAllConns receives a
            // function used to tear down connections.
            clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
            if err != nil {
                return err
            }
            if closeAllConns == nil {
                return errors.New("closeAllConns must be a valid function other than nil")
            }
            kubeDeps.OnHeartbeatFailure = closeAllConns

            // build a client-go clientset instance, used to access the various GV and GVR objects
            kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
            if err != nil {
                return fmt.Errorf("failed to initialize kubelet client: %v", err)
            }

            // events get a dedicated client, kept separate from the GVR client above
            eventClientConfig := *clientConfig
            eventClientConfig.QPS = float32(s.EventRecordQPS)
            eventClientConfig.Burst = int(s.EventBurst)
            kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
            if err != nil {
                return fmt.Errorf("failed to initialize kubelet event client: %v", err)
            }

            // and one more client for heartbeats
            heartbeatClientConfig := *clientConfig
            heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
            // if NodeLease is enabled (the node periodically reports its state to the apiserver),
            // the heartbeat timeout must not exceed the NodeLease duration
            if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
                leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
                if heartbeatClientConfig.Timeout > leaseTimeout {
                    heartbeatClientConfig.Timeout = leaseTimeout
                }
            }
            // a QPS of -1 disables client-side rate limiting for the heartbeat client
            heartbeatClientConfig.QPS = float32(-1)
            kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
            if err != nil {
                return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
            }
        }

        // build the authn/authz handlers used to validate requests, delegating to the apiserver
        if kubeDeps.Auth == nil {
            auth, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
            if err != nil {
                return err
            }
            kubeDeps.Auth = auth
        }

        // populate the cadvisor interface
        if kubeDeps.CAdvisorInterface == nil {
            imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
            kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
            if err != nil {
                return err
            }
        }

        // Setup event recorder if required.
        makeEventRecorder(kubeDeps, nodeName)

        if kubeDeps.ContainerManager == nil {
            if s.CgroupsPerQOS && s.CgroupRoot == "" {
                klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
                s.CgroupRoot = "/"
            }
            // /var/lib/kubelet/config.yaml can assign separate cgroups to the system and the
            // kube components, reserving resources for each
            // kubeReserved is the cgroup resource reservation for the kube components
            kubeReserved, err := parseResourceList(s.KubeReserved)
            if err != nil {
                return err
            }
            // systemReserved is the cgroup resource reservation for host system processes
            systemReserved, err := parseResourceList(s.SystemReserved)
            if err != nil {
                return err
            }

            // hard-eviction resource thresholds
            var hardEvictionThresholds []evictionapi.Threshold
            // If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
            if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
                hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
                if err != nil {
                    return err
                }
            }
            experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
            if err != nil {
                return err
            }

            devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)

            // the parameters above come together to initialize the container manager
            kubeDeps.ContainerManager, err = cm.NewContainerManager(
                kubeDeps.Mounter,
                kubeDeps.CAdvisorInterface,
                cm.NodeConfig{
                    RuntimeCgroupsName: s.RuntimeCgroups,
                    SystemCgroupsName: s.SystemCgroups,
                    KubeletCgroupsName: s.KubeletCgroups,
                    ContainerRuntime: s.ContainerRuntime,
                    CgroupsPerQOS: s.CgroupsPerQOS,
                    CgroupRoot: s.CgroupRoot,
                    CgroupDriver: s.CgroupDriver,
                    KubeletRootDir: s.RootDirectory,
                    ProtectKernelDefaults: s.ProtectKernelDefaults,
                    NodeAllocatableConfig: cm.NodeAllocatableConfig{
                        KubeReservedCgroupName: s.KubeReservedCgroup,
                        SystemReservedCgroupName: s.SystemReservedCgroup,
                        EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),
                        KubeReserved: kubeReserved,
                        SystemReserved: systemReserved,
                        HardEvictionThresholds: hardEvictionThresholds,
                    },
                    QOSReserved: *experimentalQOSReserved,
                    ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
                    ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
                    ExperimentalPodPidsLimit: s.PodPidsLimit,
                    EnforceCPULimits: s.CPUCFSQuota,
                    CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
                },
                s.FailSwapOn,
                devicePluginEnabled,
                kubeDeps.Recorder)
            if err != nil {
                return err
            }
        }

        if err := checkPermissions(); err != nil {
            klog.Error(err)
        }

        utilruntime.ReallyCrash = s.ReallyCrashForTesting
        rand.Seed(time.Now().UnixNano())

        // the OOM adjuster sets an oom score for the current process; the kernel's OOM
        // mechanism is what ultimately enforces container memory limits. It is analyzed
        // separately below.
        oomAdjuster := kubeDeps.OOMAdjuster
        if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
            klog.Warning(err)
        }

        // RunKubelet picks up the story below
        if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
            return err
        }

        // start an http server for health checks
        if s.HealthzPort > 0 {
            healthz.DefaultHealthz()
            go wait.Until(func() {
                err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
                if err != nil {
                    klog.Errorf("Starting health server failed: %v", err)
                }
            }, 5*time.Second, wait.NeverStop)
        }

        if s.RunOnce {
            return nil
        }

        // If systemd is used, notify it that we have started
        go daemon.SdNotify(false, "READY=1")

        select {
        case <-done:
            break
        case <-stopCh:
            break
        }
        return nil
    }
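The clone-one-rest.Config-into-several-clients pattern used above for the event and heartbeat clients is plain client-go and easy to reproduce. A minimal sketch, with a hypothetical kubeconfig path:

    package main

    import (
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
    )

    func main() {
        cfg, err := clientcmd.BuildConfigFromFlags("", "/var/lib/kubelet/kubeconfig")
        if err != nil {
            panic(err)
        }

        // shallow-copy the config and give the copy its own rate limits,
        // just as run() does for the event client
        eventCfg := *cfg
        eventCfg.QPS = 5
        eventCfg.Burst = 10

        eventClient, err := kubernetes.NewForConfig(&eventCfg)
        if err != nil {
            panic(err)
        }
        _ = eventClient // use the dedicated client for event traffic only
    }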

OOMAdjuster

–> pkg/util/oom/oom.go:22 The OOM adjuster mentioned above; let's analyze it here. The struct has three methods:

    // still a struct for now; per the TODO it is meant to become an interface later
    // TODO: make this an interface, and inject a mock ioutil struct for testing.
    type OOMAdjuster struct {
        pidLister func(cgroupName string) ([]int, error)
        ApplyOOMScoreAdj func(pid int, oomScoreAdj int) error
        ApplyOOMScoreAdjContainer func(cgroupName string, oomScoreAdj, maxTries int) error
    }

–> pkg/util/oom/oom_linux.go:35 the implementations:

    func NewOOMAdjuster() *OOMAdjuster {
        oomAdjuster := &OOMAdjuster{
            pidLister: getPids,
            ApplyOOMScoreAdj: applyOOMScoreAdj,
        }
        oomAdjuster.ApplyOOMScoreAdjContainer = oomAdjuster.applyOOMScoreAdjContainer
        return oomAdjuster
    }

    // get the pids of all processes under the given cgroup
    func getPids(cgroupName string) ([]int, error) {
        return cmutil.GetPids(filepath.Join("/", cgroupName))
    }

    // Adjusting the oom score on linux means writing /proc/<pid>/oom_score_adj. When
    // memory is tight, the kernel's OOM mechanism kills the process with the highest
    // oom score; by default, the more memory a process uses, the higher its score and
    // the more likely it is to be killed. applyOOMScoreAdj is what rewrites that score.
    // Writes 'value' to /proc/<pid>/oom_score_adj. PID = 0 means self
    // Returns os.ErrNotExist if the `pid` does not exist.
    func applyOOMScoreAdj(pid int, oomScoreAdj int) error {
        if pid < 0 {
            return fmt.Errorf("invalid PID %d specified for oom_score_adj", pid)
        }
        var pidStr string
        if pid == 0 {
            pidStr = "self"
        } else {
            pidStr = strconv.Itoa(pid)
        }
        maxTries := 2
        oomScoreAdjPath := path.Join("/proc", pidStr, "oom_score_adj")
        value := strconv.Itoa(oomScoreAdj)
        klog.V(4).Infof("attempting to set %q to %q", oomScoreAdjPath, value)
        var err error
        for i := 0; i < maxTries; i++ {
            err = ioutil.WriteFile(oomScoreAdjPath, []byte(value), 0700)
            if err != nil {
                if os.IsNotExist(err) {
                    klog.V(2).Infof("%q does not exist", oomScoreAdjPath)
                    return os.ErrNotExist
                }
                klog.V(3).Info(err)
                time.Sleep(100 * time.Millisecond)
                continue
            }
            return nil
        }
        if err != nil {
            klog.V(2).Infof("failed to set %q to %q: %v", oomScoreAdjPath, value, err)
        }
        return err
    }

    // Adjust the oom score of a whole container, i.e. of every process under a cgroup:
    // getPids fetches all the pids and applyOOMScoreAdj is run on each of them.
    // Writes 'value' to /proc/<pid>/oom_score_adj for all processes in cgroup cgroupName.
    // Keeps trying to write until the process list of the cgroup stabilizes, or until maxTries tries.
    func (oomAdjuster *OOMAdjuster) applyOOMScoreAdjContainer(cgroupName string, oomScoreAdj, maxTries int) error {
        adjustedProcessSet := make(map[int]bool)
        for i := 0; i < maxTries; i++ {
            continueAdjusting := false
            pidList, err := oomAdjuster.pidLister(cgroupName)
            if err != nil {
                if os.IsNotExist(err) {
                    // Nothing to do since the container doesn't exist anymore.
                    return os.ErrNotExist
                }
                continueAdjusting = true
                klog.V(10).Infof("Error getting process list for cgroup %s: %+v", cgroupName, err)
            } else if len(pidList) == 0 {
                klog.V(10).Infof("Pid list is empty")
                continueAdjusting = true
            } else {
                for _, pid := range pidList {
                    if !adjustedProcessSet[pid] {
                        klog.V(10).Infof("pid %d needs to be set", pid)
                        if err = oomAdjuster.ApplyOOMScoreAdj(pid, oomScoreAdj); err == nil {
                            adjustedProcessSet[pid] = true
                        } else if err == os.ErrNotExist {
                            continue
                        } else {
                            klog.V(10).Infof("cannot adjust oom score for pid %d - %v", pid, err)
                            continueAdjusting = true
                        }
                        // Processes can come and go while we try to apply oom score adjust value. So ignore errors here.
                    }
                }
            }
            if !continueAdjusting {
                return nil
            }
            // There's a slight race. A process might have forked just before we write its OOM score adjust.
            // The fork might copy the parent process's old OOM score, then this function might execute and
            // update the parent's OOM score, but the forked process id might not be reflected in cgroup.procs
            // for a short amount of time. So this function might return without changing the forked process's
            // OOM score. Very unlikely race, so ignoring this for now.
        }
        return fmt.Errorf("exceeded maxTries, some processes might not have desired OOM score")
    }
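The mechanism is easy to poke at outside kubelet. A minimal sketch that adjusts and reads back the current process's own score; note that raising one's own oom_score_adj needs no privileges, while lowering it below the current value requires root:

    package main

    import (
        "fmt"
        "os"
    )

    func main() {
        const p = "/proc/self/oom_score_adj"

        // make this process a preferred OOM victim (higher score = killed sooner)
        if err := os.WriteFile(p, []byte("500"), 0644); err != nil {
            fmt.Println("write failed:", err)
        }

        cur, err := os.ReadFile(p)
        if err != nil {
            panic(err)
        }
        fmt.Printf("current oom_score_adj: %s", cur)
    }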

RunKubelet

–> cmd/kubelet/app/server.go:955 Back to the main thread.

    func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
        ...
        // these sources all default to "*", meaning pod updates are accepted from the
        // api, file and http sources alike
        hostNetworkSources, err := kubetypes.GetValidatedSources(kubeServer.HostNetworkSources)
        if err != nil {
            return err
        }
        hostPIDSources, err := kubetypes.GetValidatedSources(kubeServer.HostPIDSources)
        if err != nil {
            return err
        }
        hostIPCSources, err := kubetypes.GetValidatedSources(kubeServer.HostIPCSources)
        if err != nil {
            return err
        }
        privilegedSources := capabilities.PrivilegedSources{
            HostNetworkSources: hostNetworkSources,
            HostPIDSources: hostPIDSources,
            HostIPCSources: hostIPCSources,
        }
        capabilities.Setup(kubeServer.AllowPrivileged, privilegedSources, 0)

        credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
        klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory)

        if kubeDeps.OSInterface == nil {
            kubeDeps.OSInterface = kubecontainer.RealOS{}
        }

        // kubelet initialization; this function is rather involved and is unpacked below
        k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
            kubeDeps,
            &kubeServer.ContainerRuntimeOptions,
            kubeServer.ContainerRuntime,
            kubeServer.RuntimeCgroups,
            kubeServer.HostnameOverride,
            kubeServer.NodeIP,
            kubeServer.ProviderID,
            kubeServer.CloudProvider,
            kubeServer.CertDirectory,
            kubeServer.RootDirectory,
            kubeServer.RegisterNode,
            kubeServer.RegisterWithTaints,
            kubeServer.AllowedUnsafeSysctls,
            kubeServer.RemoteRuntimeEndpoint,
            kubeServer.RemoteImageEndpoint,
            kubeServer.ExperimentalMounterPath,
            kubeServer.ExperimentalKernelMemcgNotification,
            kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
            kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
            kubeServer.MinimumGCAge,
            kubeServer.MaxPerPodContainerCount,
            kubeServer.MaxContainerCount,
            kubeServer.MasterServiceNamespace,
            kubeServer.RegisterSchedulable,
            kubeServer.NonMasqueradeCIDR,
            kubeServer.KeepTerminatedPodVolumes,
            kubeServer.NodeLabels,
            kubeServer.SeccompProfileRoot,
            kubeServer.BootstrapCheckpointPath,
            kubeServer.NodeStatusMaxImages)
        if err != nil {
            return fmt.Errorf("failed to create kubelet: %v", err)
        }

        // NewMainKubelet should have set up a pod source config if one didn't exist
        // when the builder was run. This is just a precaution.
        if kubeDeps.PodConfig == nil {
            return fmt.Errorf("failed to create kubelet, pod source config was nil")
        }
        podCfg := kubeDeps.PodConfig

        rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))

        // run once: process the pods and then exit
        if runOnce {
            if _, err := k.RunOnce(podCfg.Updates()); err != nil {
                return fmt.Errorf("runonce failed: %v", err)
            }
            klog.Info("Started kubelet as runonce")
        } else {
            // normal operation
            startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
            klog.Info("Started kubelet")
        }
        return nil
    }

createAndInitKubelet

–> cmd/kubelet/app/server.go:1078, which mainly leads into the NewMainKubelet function:

–> pkg/kubelet/kubelet.go:326 Good grief, this function is really something: 500+ lines of code… only the important parts are covered here.

    func NewMainKubelet(...) (...) {
        ...
        // set up the service informer
        serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
        if kubeDeps.KubeClient != nil {
            serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
            r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
            go r.Run(wait.NeverStop)
        }
        serviceLister := corelisters.NewServiceLister(serviceIndexer)

        // set up the node informer
        nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
        if kubeDeps.KubeClient != nil {
            fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
            nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
            r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
            go r.Run(wait.NeverStop)
        }
        nodeInfo := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}
        ...

        // initialize secretManager and configMapManager; using either means mounting
        // directories into containers, which requires kubelet's involvement
        var secretManager secret.Manager
        var configMapManager configmap.Manager
        switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
        case kubeletconfiginternal.WatchChangeDetectionStrategy:
            secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
            configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
        case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
            secretManager = secret.NewCachingSecretManager(
                kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
            configMapManager = configmap.NewCachingConfigMapManager(
                kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
        case kubeletconfiginternal.GetChangeDetectionStrategy:
            secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
            configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
        default:
            return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
        }
        klet.secretManager = secretManager
        klet.configMapManager = configMapManager

        // initialize the liveness probe manager
        klet.livenessManager = proberesults.NewManager()

        // network plugin settings carved out specifically for dockershim
        pluginSettings := dockershim.NetworkPluginSettings{
            HairpinMode: kubeletconfiginternal.HairpinMode(kubeCfg.HairpinMode),
            NonMasqueradeCIDR: nonMasqueradeCIDR,
            PluginName: crOptions.NetworkPluginName,
            PluginConfDir: crOptions.CNIConfDir, // CNI config files live under /etc/cni/net.d/ by default
            PluginBinDirString: crOptions.CNIBinDir, // CNI binaries live under /opt/cni/bin/ by default: bridge/tuning/vlan/dhcp/macvlan and so on
            MTU: int(crOptions.NetworkPluginMTU), // NIC MTU
        }

        // initialize kubelet's container runtime manager
        runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
            kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
            klet.livenessManager,
            seccompProfileRoot,
            containerRefManager,
            machineInfo,
            klet,
            kubeDeps.OSInterface,
            klet,
            httpClient,
            imageBackOff,
            kubeCfg.SerializeImagePulls,
            float32(kubeCfg.RegistryPullQPS),
            int(kubeCfg.RegistryBurst),
            kubeCfg.CPUCFSQuota,
            kubeCfg.CPUCFSQuotaPeriod,
            runtimeService,
            imageService,
            kubeDeps.ContainerManager.InternalContainerLifecycle(),
            legacyLogProvider,
            klet.runtimeClassManager,
        )
        if err != nil {
            return nil, err
        }
        klet.containerRuntime = runtime
        klet.streamingRuntime = runtime
        klet.runner = runtime

        runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
        if err != nil {
            return nil, err
        }
        klet.runtimeCache = runtimeCache

        // initialize the PLEG (Pod Lifecycle Event Generator)
        klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})

        // initialize the pod workQueue
        klet.workQueue = queue.NewBasicWorkQueue(klet.clock)

        // initialize the pod workers: a worker pops the head of the workQueue and acts
        // directly on the pod as instructed, and also keeps the pod cache up to date
        klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

        klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
        klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)

        // initialize the eviction manager
        evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)
        klet.evictionManager = evictionManager
        klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
        ...
    }
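Once a reflector has populated the indexer, the lister answers reads from the local cache without touching the apiserver. A minimal sketch of how a serviceLister like the one above would be consumed; function name and namespace are hypothetical:

    // assumed imports:
    //   "fmt"
    //   "k8s.io/apimachinery/pkg/labels"
    //   corelisters "k8s.io/client-go/listers/core/v1"
    func dumpServices(serviceLister corelisters.ServiceLister) {
        // served entirely from the indexer that the reflector keeps in sync
        svcs, err := serviceLister.Services("default").List(labels.Everything())
        if err != nil {
            return
        }
        for _, svc := range svcs {
            fmt.Printf("service %s/%s has ClusterIP %s\n", svc.Namespace, svc.Name, svc.Spec.ClusterIP)
        }
    }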

Back to the main thread once more, into the final k.Run() loop logic:

–> cmd/kubelet/app/server.go:1058

    func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
        // start the kubelet
        // run kubelet's core work loop, k.Run(), forever
        go wait.Until(func() {
            k.Run(podCfg.Updates())
        }, 0, wait.NeverStop)

        // start the kubelet server
        // serves APIs such as /metrics and /healthz
        if enableServer {
            go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
        }

        // the read-only query API
        if kubeCfg.ReadOnlyPort > 0 {
            go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
        }
        if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
            go k.ListenAndServePodResources()
        }
    }

wait.Until(), which runs a function in a loop, has been analyzed several times in earlier posts, so it won't be repeated here. Instead, let's work out what argument k.Run(podCfg.Updates()) is actually passed:

–> pkg/kubelet/config/config.go:105

    func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
        return c.updates
    }

Next, c.updates --> pkg/kubelet/config/config.go:58

    type PodConfig struct {
        pods *podStorage
        mux *config.Mux

        // the channel of denormalized changes passed to listeners
        updates chan kubetypes.PodUpdate

        // contains the list of all configured sources
        sourcesLock sync.Mutex
        sources sets.String

        checkpointManager checkpointmanager.CheckpointManager
    }

–> pkg/kubelet/types/pod_update.go:80

    type PodUpdate struct {
        Pods []*v1.Pod
        Op PodOperation
        Source string
    }

A reasonable guess: pod write requests (create/update/delete) from every source are converted into PodUpdate structs and pushed onto this channel, and k.Run() consumes them and acts on each write; a consumer sketch follows below. k.Run() itself is implemented at pkg/kubelet/kubelet.go:1382 and is left for the next post, which brings this startup-flow article to a close.
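To make that guess concrete, here is a minimal, simplified sketch of what a consumer of such a channel looks like; the real loop is kubelet's syncLoop, and the function below is hypothetical:

    // assumed imports:
    //   "fmt"
    //   kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
    func consumeUpdates(updates <-chan kubetypes.PodUpdate) {
        for u := range updates {
            // each PodUpdate carries an operation, its origin (api/file/http) and the pods affected
            switch u.Op {
            case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
                for _, pod := range u.Pods {
                    fmt.Printf("op=%v source=%s pod=%s/%s\n", u.Op, u.Source, pod.Namespace, pod.Name)
                }
            default:
                // SET, REMOVE, RECONCILE etc. are handled analogously in the real syncLoop
            }
        }
    }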

Summary

kubelet's source code really is remarkably complex: individual functions routinely run to several hundred lines. Hardly surprising, though, since as the daemon doing the data-plane work it shoulders a great many responsibilities. That's it for this installment.
