kubebuilder 进阶使用

我不是女神ヾ 2022-09-02 13:57 791阅读 0赞

上一篇文章中介绍了 kubebuilder 的简单使用方法以及实现一个简单的逻辑。本篇文章则深入kubebuilder源码,深入学习kubebuilder开发。

Status

在开发 operator,每种自定义资源只能包含有两种子资源——Status 和 Scale。这里探究一下Status的使用。
在 Kubebuilder 自动生成的CR结构体中,已经为我们生成了 Status 的结构体,但此时 Status 仍非 CR 的子资源。

  1. type SampleSpec struct {
  2. // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
  3. // Important: Run "make" to regenerate code after modifying this file
  4. // Foo is an example field of Sample. Edit Sample_types.go to remove/update
  5. Foo string `json:"foo,omitempty"`
  6. }
  7. // SampleStatus defines the observed state of Sample
  8. type SampleStatus struct {
  9. // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
  10. // Important: Run "make" to regenerate code after modifying this file
  11. }
  12. // +kubebuilder:object:root=true
  13. // Sample is the Schema for the samples API
  14. type Sample struct {
  15. metav1.TypeMeta `json:",inline"`
  16. metav1.ObjectMeta `json:"metadata,omitempty"`
  17. Spec SampleSpec `json:"spec,omitempty"`
  18. Status SampleStatus `json:"status,omitempty"`
  19. }

这里 Spec 和 Status 都是 Sample 的成员变量,Status并不像Pod.Status一样,是Pod的subResource.因此,如果我们在controller的代码中调用到Status().Update(), 会触发panic,并报错:the server could not find the requested resource

如果我们想像k8s中的设计那样,那么就要遵循k8s中status subresource的使用规范:

  • 用户只能指定一个CRD实例的spec部分;
  • CRD实例的status部分由控制器进行变更。

此时我们需要在 Sample 中添加一行注解// +kubebuilder:subresource:status,标明 status 是 Sample 的子资源。

其实 Status 是一个非常重要的字段,在 Controller 中实现业务代码时,需要遵循 K8s 的声明式API的思想,因此需要依据 Status 的字段去进行 CR 处理的逻辑,即判断当前CR所处状态,对比其期望状态,然后做出一定的动作。

eventRecorder

Status 字段用于 CR 的逻辑处理,但在后期维护过程中,则一部分会依赖于 CR 所产生的 Event,这有些类似于在 CR 的业务逻辑中添加部分重要的日志,最终可以帮助我们定位问题。后期运行过程中,可以通过 kubectl describe cr -n xxx 看到这些 Event。

首先需要在 SampleReconciler 中加入 recorder.EventRecorder 成员变量:

  1. type SampleReconciler struct {
  2. client.Client
  3. Log logr.Logger
  4. Scheme *runtime.Scheme
  5. Recorder record.EventRecorder
  6. }
  7. // 在业务逻辑中记录
  8. func (r *SampleReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
  9. sample := &samplev1.Sample{}
  10. _ := r.Get(ctx, client.ObjectKey{Name: req.Name, Namespace: req.Namespace}, sample)
  11. // do something
  12. r.Recorder.Eventf(sample, corev1.EventTypeWarning, "Error", "some error")
  13. }

K8s 为我们提供了2种等级的 event,分别是 Normal 和 Warning。

finalizer

finalizer即终结器,存在于每一个k8s内的资源实例中,即**.metadata.finalizers,它是一个字符串数组,每一个成员表示一个finalizer。

控制器在删除某个资源时,K8s 会更新 CR 的 DeletionTimestamp 字段,这样会触发一个 Update 动作,我们的 controller 可以监听这个字段的变更,实现 CR 的预删除处理,比如删除由 CR 创建的某些资源。

当我们需要设计这类finalizer时,就可以自定义一个controller来实现。

  1. func (c *SampleReconciler) finalize(ctx context.Context, sample *samplev1.Sample) error {
  2. logs := c.getLog(sample)
  3. // 这里我们自定义一个 finalizer 字段
  4. myFinalizerName := "sample.finalizers.io"
  5. if sample.ObjectMeta.DeletionTimestamp.IsZero() {
  6. //这里由于 DeletionTimestamp 是0,即没有删除操作,则不进行处理,只检查 CR 中是否含有自定义 finalizer 字符,若没有则增加。
  7. if !containsString(sample.ObjectMeta.Finalizers, myFinalizerName) {
  8. sample.ObjectMeta.Finalizers = append(sample.ObjectMeta.Finalizers, myFinalizerName)
  9. }
  10. } else {
  11. //进行预删除操作
  12. if containsString(sample.ObjectMeta.Finalizers, myFinalizerName) {
  13. // do something
  14. // 从 CR 中删除自定义 finalizer 字段。
  15. sample.ObjectMeta.Finalizers = removeString(sample.ObjectMeta.Finalizers, myFinalizerName)
  16. }
  17. return nil
  18. }
  19. return nil
  20. }

扩展 Reconciler

在涉及下列操作之前,我们需要先了解 controller-runtime 包的处理逻辑。Informer 机制在包中已经实现,我们就不再过多关注,假设现在已经监听到 CR 的变化事件(包括 创建、更新、删除、扩展),这个事件则会进入 WorkQueue 中。在进入 WorkQueue 之前, controller-runtime 会进行一些过滤处理和业务处理。主要涉及接口是 EventHandlerPredicate

其中 EventHandler 可以在事件入队列之前加入其他逻辑,其定义如下:

  1. //controller-runtime@v0.5.0\pkg\handler\eventhandler.go
  2. //此处定义了针对不同事件的处理接口,我们可以通过实现此接口完成扩展业务逻辑
  3. type EventHandler interface {
  4. // Create is called in response to an create event - e.g. Pod Creation.
  5. Create(event.CreateEvent, workqueue.RateLimitingInterface)
  6. // Update is called in response to an update event - e.g. Pod Updated.
  7. Update(event.UpdateEvent, workqueue.RateLimitingInterface)
  8. // Delete is called in response to a delete event - e.g. Pod Deleted.
  9. Delete(event.DeleteEvent, workqueue.RateLimitingInterface)
  10. // Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or
  11. // external trigger request - e.g. reconcile Autoscaling, or a Webhook.
  12. Generic(event.GenericEvent, workqueue.RateLimitingInterface)
  13. }

Predicate 则是对监听到的事件进行过滤,让我们只关注我们想要的时间,其结构体如下:

  1. type Predicate interface {
  2. // Create returns true if the Create event should be processed
  3. Create(event.CreateEvent) bool
  4. // Delete returns true if the Delete event should be processed
  5. Delete(event.DeleteEvent) bool
  6. // Update returns true if the Update event should be processed
  7. Update(event.UpdateEvent) bool
  8. // Generic returns true if the Generic event should be processed
  9. Generic(event.GenericEvent) bool
  10. }

在入队列之前,controller-runtime 的处理逻辑如下:

  1. //controller-runtime@v0.5.0\pkg\source\internal\eventsource.go
  2. type EventHandler struct {
  3. EventHandler handler.EventHandler
  4. Queue workqueue.RateLimitingInterface
  5. Predicates []predicate.Predicate
  6. }
  7. func (e EventHandler) OnAdd(obj interface{}) {
  8. c := event.CreateEvent{}
  9. ...
  10. // 这里可以自定义 Predicates,将事件进行过滤
  11. for _, p := range e.Predicates {
  12. if !p.Create(c) {
  13. return
  14. }
  15. }
  16. // 调用了上面的 EventHandler 对应的逻辑
  17. e.EventHandler.Create(c, e.Queue)
  18. }
  19. // 除了 OnAdd 外,还有 OnUpdate OnDelete

注意,最终入队列的数据结构如下,即只有 namespace 和name,并没有资源的类型。

  1. type NamespacedName struct {
  2. Namespace string
  3. Name string
  4. }

controller-runtime 包中的 controller.Start()方法中,则会循环从队列中拿取一个事件

  1. // controller.go
  2. func (c *Controller) Start(stop <-chan struct{}) error {
  3. ...
  4. // 启动多个 worker 线程,处理事件
  5. log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
  6. for i := 0; i < c.MaxConcurrentReconciles; i++ {
  7. go wait.Until(c.worker, c.JitterPeriod, stop)
  8. }
  9. ...
  10. }
  11. func (c *Controller) worker() {
  12. for c.processNextWorkItem() {
  13. }
  14. }
  15. func (c *Controller) processNextWorkItem() bool {
  16. // 拿取一个事件
  17. obj, shutdown := c.Queue.Get()
  18. if shutdown {
  19. // Stop working
  20. return false
  21. }
  22. // We call Done here so the workqueue knows we have finished
  23. // processing this item. We also must remember to call Forget if we
  24. // do not want this work item being re-queued. For example, we do
  25. // not call Forget if a transient error occurs, instead the item is
  26. // put back on the workqueue and attempted again after a back-off
  27. // period.
  28. defer c.Queue.Done(obj)
  29. //处理事件
  30. return c.reconcileHandler(obj)
  31. }

监控多个资源变化

在我们开发时,可能会遇到 CR -> Deployment -> Pod 的逻辑,即由CR 创建deployment,最终落入pod。这时,我们不仅需要监听 CR 的变化, deployment 以及 pod 的变化我们也需要关注,这就意味着我们在 reconciler 中也需要根据deployment变化进行相应的处理。我们在SampleReconcilerSetupWithManager方法中,可以看到:

  1. func (r *SampleReconciler) SetupWithManager(mgr ctrl.Manager) error {
  2. return ctrl.NewControllerManagedBy(mgr).
  3. For(&samplev1.Sample{}).
  4. Complete(r)
  5. }

这里的 For 则可以指定需要监听的资源。进一步,我们看NewControllerManagedBy()的源码可以发现pkg\builder\controller.go 中实现了 构建者模式controller-runtime 提供了一个 builder 方便我们进行配置。其中监听资源的有 For() Owns()Watch() ,For() Owns()都是基于Watch()实现,只不过是入队列前的 eventHandler 不同。

简单来说,我们可以调用Watch()实现监听deployment

  1. func (r *SampleReconciler) SetupWithManager(mgr ctrl.Manager) error {
  2. return ctrl.NewControllerManagedBy(mgr).
  3. For(&samplev1.Sample{}).
  4. // 这里可以供我们指定 eventhandler
  5. Watches(&source.Kind{Type: &apps.Deployment{}}, &handler.EnqueueRequestForObject{}).
  6. Complete(r)

但这样做会监听所有 deployment 事件变化,如果我们想只关注由 CR 创建的 Deployment 因此我们可以采用 Owns() 方法。

  1. func (r *SampleReconciler) SetupWithManager(mgr ctrl.Manager) error {
  2. return ctrl.NewControllerManagedBy(mgr).
  3. For(&samplev1.Sample{}).
  4. // 这里可以供我们指定 eventhandler
  5. Owns(&apps.Deployment{}).
  6. Complete(r)

当我们使用 CR 创建 deployment 时,可以为他塞入一个从属关系,类似于 Pod 资源的Metadata 里会有一个OnwerReference字段

  1. _ = controllerutil.SetControllerReference(sample, &deployment, r.Scheme)

这样在监听时,只会监听到带有 OwnerShip 的 deployment。

其实 Owns() 方法另外定义了一个 eventHandler做了处理,感兴趣的小伙伴可以研读下源码

监听多资源下evnetHandler 和 Reconciler 的逻辑

我们的 controller Reconciler 业务逻辑,实际上只应该处理 CR 的变化,但有时是 CR 所拥有的 Deployment 发生了变化,但对应的 CR 并不会有更新事件,因此我们需要在自定义eventHandler中,对资源进行判断。若是 CR 的变化,则直接向队列写入 namespace 和 name,若是 deployment 的变化,则向队列写入 deployment 对应 CR 的namespace 和name,以出发 Reconciler 的逻辑。

  1. func (e *EnqueueRequestForDP) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
  2. if evt.Meta == nil {
  3. enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
  4. return
  5. }
  6. _, ok := evt.Object.(*samplev1.Sample)
  7. if ok {
  8. q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
  9. Name: evt.Meta.Name,
  10. Namespace: evt.Meta.GetNamespace(),
  11. }})
  12. return
  13. }
  14. deploy ,_:= evt.Object.(*v1.Deployment)
  15. for _, owner := range deploy.OwnerReferences {
  16. if owner.Kind == "Sample" {
  17. q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
  18. Name: owner.Name,
  19. Namespace: evt.Meta.GetNamespace(),
  20. }})
  21. }
  22. }
  23. }

监听指定字段变化

依据上文,自定义 predicate 即可实现。 比如我们只对 CR 改变 label 的事件感兴趣,我们此时可以自定义 Predicate 以及其 OnUpdate() 方法

  1. func (rl *ResourceLabelChangedPredicate) Update (e event.UpdateEvent) bool{
  2. _, ok1 = e.ObjectOld.(*samplev1.Sample)
  3. _, ok2 = e.ObjectNew.(*samplev1.Sample)
  4. if ok1 && ok2 {
  5. if !compareMaps(e.MetaOld.GetLabels(), e.MetaNew.GetLabels()) {
  6. return true
  7. }
  8. }
  9. return false
  10. }

注意,当我们监听多个资源后, deployment 的更新事件也会进入到这个方法,所以在方法中,需要通过 _, ok = e.Object(*xxx) 判断资源的类型。

多版本切换

在crd的开发和演进过程中,必然会存在一个crd的不同版本。 kubebuilder支持以一个conversion webhook的方式,支持对一个crd资源以不同版本进行读取。简单地描述就是:

  1. kubectl apply -f config/samples/batch_v2_cronjob.yaml

创建一个v2的cronjob后,可以通过v1和v2两种版本进行读取:

  1. kubectl get cronjobs.v2.batch.tutorial.kubebuilder.io -o yaml
  2. kubectl get cronjobs.v1.batch.tutorial.kubebuilder.io -o yaml

显然,get命令得到的v1和v2版本的cronjob会存在一些字段上的不同,conversion webhook会负责进行不同版本的cronjob之间的数据转换。

webhook

有时候我们需要在某个对象的webhook中查询集群中的其他资源,比如某个operator规定了一个PodBox,规定每个PodBox中只能有一个Pod,那么在validatecreate的webhook中就要ListPod By PodBox并计数。

kubebuilder 2.X 将webhook封装得太过简介,所以我们需要搞个新法子:

我们在types和webhook的目录下新建一个文件, 在里面构建一个全局client:

  1. package v1
  2. import (
  3. ctrl "sigs.k8s.io/controller-runtime"
  4. "sigs.k8s.io/controller-runtime/pkg/client"
  5. )
  6. var globalClient client.Client
  7. var globalReader client.Reader
  8. func InitClient(mgr ctrl.Manager) {
  9. globalClient = mgr.GetClient()
  10. globalReader = mgr.GetAPIReader()
  11. }

在 main.go中, 各种SetupWithManager之前,先执行InitClient,初始化这些client, validateCreate方法中可以直接使用这些client。

自定义webhook

我们开发的operator可能会需要对用户新建的pod进行注入,比如注入一些信息到annotations中, 也有可能要对原生对象的更新/删除操作进行判断,那么如何在我们的项目中添加这些对象的webhook?

社区提供了一个案例:https://github.com/kubernetes-sigs/controller-runtime/blob/master/examples/builtins/validatingwebhook.go

但是在该案例下,每次执行make generate时 会报错:

  1. invalid field type interface{sigs.k8s.io/controller-runtime/pkg/client.Reader; sigs.k8s.io/controller-runtime/pkg/client.StatusClient; sigs.k8s.io/controller-runtime/pkg/client.Writer}

不过测试了一下 只要不执行generate,其他步骤都可以正常执行, 比如 make docker-build

kubebuilder 注解

我们在生成的 CR 结构体代码中会发现由很多 kubebuilder 自定义的注解,例如// +kubebuilder:object:root=true 等,其实这些在编译时会增加对应的功能,更多注解见

Markers for Config/Code Generation​book.kubebuilder.io

发表评论

表情:
评论列表 (有 0 条评论,791人围观)

还没有评论,来说两句吧...

相关阅读

    相关 IDEA使用

    一、热部署 > 热部署可以使的修改代码后,无须重启服务器,就可以加载更改的代码。 使用Jrebel插件实现热部署 在线安装:菜单File -> Setting ->

    相关 linux使用(一)

    本文依据《应该知道的Linux技巧》coolshell上的一篇文章提到的Linux技巧,结合自己掌握的情况进行扩展和总结得来。主要包含下面内容: 一、日常操作 二、数

    相关 ViewPager使用指南(

    之前写了一篇ViewPager最简单的demo(有需要的小伙伴可以去参考我上一篇的博客),可以实现试图的左右滑动,但怎么确定我滑动到哪个界面呢,接下来我们就来实现这个效果。