kubebuilder 进阶使用
上一篇文章中介绍了 kubebuilder 的简单使用方法以及实现一个简单的逻辑。本篇文章则深入kubebuilder源码,深入学习kubebuilder开发。
Status
在开发 operator,每种自定义资源只能包含有两种子资源——Status 和 Scale。这里探究一下Status的使用。
在 Kubebuilder 自动生成的CR结构体中,已经为我们生成了 Status 的结构体,但此时 Status 仍非 CR 的子资源。
type SampleSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
// Foo is an example field of Sample. Edit Sample_types.go to remove/update
Foo string `json:"foo,omitempty"`
}
// SampleStatus defines the observed state of Sample
type SampleStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
}
// +kubebuilder:object:root=true
// Sample is the Schema for the samples API
type Sample struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec SampleSpec `json:"spec,omitempty"`
Status SampleStatus `json:"status,omitempty"`
}
这里 Spec 和 Status 都是 Sample 的成员变量,Status并不像Pod.Status一样,是Pod的subResource.因此,如果我们在controller的代码中调用到Status().Update(), 会触发panic,并报错:the server could not find the requested resource
如果我们想像k8s中的设计那样,那么就要遵循k8s中status subresource的使用规范:
- 用户只能指定一个CRD实例的spec部分;
- CRD实例的status部分由控制器进行变更。
此时我们需要在 Sample 中添加一行注解// +kubebuilder
,标明 status 是 Sample 的子资源。status
其实 Status 是一个非常重要的字段,在 Controller 中实现业务代码时,需要遵循 K8s 的声明式API的思想,因此需要依据 Status 的字段去进行 CR 处理的逻辑,即判断当前CR所处状态,对比其期望状态,然后做出一定的动作。
eventRecorder
Status 字段用于 CR 的逻辑处理,但在后期维护过程中,则一部分会依赖于 CR 所产生的 Event,这有些类似于在 CR 的业务逻辑中添加部分重要的日志,最终可以帮助我们定位问题。后期运行过程中,可以通过 kubectl describe cr -n xxx
看到这些 Event。
首先需要在 SampleReconciler
中加入 recorder.EventRecorder
成员变量:
type SampleReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
}
// 在业务逻辑中记录
func (r *SampleReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
sample := &samplev1.Sample{}
_ := r.Get(ctx, client.ObjectKey{Name: req.Name, Namespace: req.Namespace}, sample)
// do something
r.Recorder.Eventf(sample, corev1.EventTypeWarning, "Error", "some error")
}
K8s 为我们提供了2种等级的 event,分别是 Normal 和 Warning。
finalizer
finalizer即终结器,存在于每一个k8s内的资源实例中,即**.metadata.finalizers
,它是一个字符串数组,每一个成员表示一个finalizer。
控制器在删除某个资源时,K8s 会更新 CR 的 DeletionTimestamp
字段,这样会触发一个 Update 动作,我们的 controller 可以监听这个字段的变更,实现 CR 的预删除处理,比如删除由 CR 创建的某些资源。
当我们需要设计这类finalizer时,就可以自定义一个controller来实现。
func (c *SampleReconciler) finalize(ctx context.Context, sample *samplev1.Sample) error {
logs := c.getLog(sample)
// 这里我们自定义一个 finalizer 字段
myFinalizerName := "sample.finalizers.io"
if sample.ObjectMeta.DeletionTimestamp.IsZero() {
//这里由于 DeletionTimestamp 是0,即没有删除操作,则不进行处理,只检查 CR 中是否含有自定义 finalizer 字符,若没有则增加。
if !containsString(sample.ObjectMeta.Finalizers, myFinalizerName) {
sample.ObjectMeta.Finalizers = append(sample.ObjectMeta.Finalizers, myFinalizerName)
}
} else {
//进行预删除操作
if containsString(sample.ObjectMeta.Finalizers, myFinalizerName) {
// do something
// 从 CR 中删除自定义 finalizer 字段。
sample.ObjectMeta.Finalizers = removeString(sample.ObjectMeta.Finalizers, myFinalizerName)
}
return nil
}
return nil
}
扩展 Reconciler
在涉及下列操作之前,我们需要先了解 controller-runtime
包的处理逻辑。Informer 机制在包中已经实现,我们就不再过多关注,假设现在已经监听到 CR 的变化事件(包括 创建、更新、删除、扩展),这个事件则会进入 WorkQueue 中。在进入 WorkQueue 之前, controller-runtime
会进行一些过滤处理和业务处理。主要涉及接口是 EventHandler
和 Predicate
。
其中 EventHandler
可以在事件入队列之前加入其他逻辑,其定义如下:
//controller-runtime@v0.5.0\pkg\handler\eventhandler.go
//此处定义了针对不同事件的处理接口,我们可以通过实现此接口完成扩展业务逻辑
type EventHandler interface {
// Create is called in response to an create event - e.g. Pod Creation.
Create(event.CreateEvent, workqueue.RateLimitingInterface)
// Update is called in response to an update event - e.g. Pod Updated.
Update(event.UpdateEvent, workqueue.RateLimitingInterface)
// Delete is called in response to a delete event - e.g. Pod Deleted.
Delete(event.DeleteEvent, workqueue.RateLimitingInterface)
// Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or
// external trigger request - e.g. reconcile Autoscaling, or a Webhook.
Generic(event.GenericEvent, workqueue.RateLimitingInterface)
}
Predicate
则是对监听到的事件进行过滤,让我们只关注我们想要的时间,其结构体如下:
type Predicate interface {
// Create returns true if the Create event should be processed
Create(event.CreateEvent) bool
// Delete returns true if the Delete event should be processed
Delete(event.DeleteEvent) bool
// Update returns true if the Update event should be processed
Update(event.UpdateEvent) bool
// Generic returns true if the Generic event should be processed
Generic(event.GenericEvent) bool
}
在入队列之前,controller-runtime 的处理逻辑如下:
//controller-runtime@v0.5.0\pkg\source\internal\eventsource.go
type EventHandler struct {
EventHandler handler.EventHandler
Queue workqueue.RateLimitingInterface
Predicates []predicate.Predicate
}
func (e EventHandler) OnAdd(obj interface{}) {
c := event.CreateEvent{}
...
// 这里可以自定义 Predicates,将事件进行过滤
for _, p := range e.Predicates {
if !p.Create(c) {
return
}
}
// 调用了上面的 EventHandler 对应的逻辑
e.EventHandler.Create(c, e.Queue)
}
// 除了 OnAdd 外,还有 OnUpdate OnDelete
注意,最终入队列的数据结构如下,即只有 namespace 和name,并没有资源的类型。
type NamespacedName struct {
Namespace string
Name string
}
在 controller-runtime
包中的 controller.Start()
方法中,则会循环从队列中拿取一个事件
// controller.go
func (c *Controller) Start(stop <-chan struct{}) error {
...
// 启动多个 worker 线程,处理事件
log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go wait.Until(c.worker, c.JitterPeriod, stop)
}
...
}
func (c *Controller) worker() {
for c.processNextWorkItem() {
}
}
func (c *Controller) processNextWorkItem() bool {
// 拿取一个事件
obj, shutdown := c.Queue.Get()
if shutdown {
// Stop working
return false
}
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.Queue.Done(obj)
//处理事件
return c.reconcileHandler(obj)
}
监控多个资源变化
在我们开发时,可能会遇到 CR -> Deployment -> Pod 的逻辑,即由CR 创建deployment,最终落入pod。这时,我们不仅需要监听 CR 的变化, deployment 以及 pod 的变化我们也需要关注,这就意味着我们在 reconciler 中也需要根据deployment变化进行相应的处理。我们在SampleReconciler
的SetupWithManager
方法中,可以看到:
func (r *SampleReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&samplev1.Sample{}).
Complete(r)
}
这里的 For
则可以指定需要监听的资源。进一步,我们看NewControllerManagedBy()
的源码可以发现pkg\builder\controller.go
中实现了 构建者模式,controller-runtime
提供了一个 builder
方便我们进行配置。其中监听资源的有 For()
Owns()
和Watch()
,For()
Owns()
都是基于Watch()
实现,只不过是入队列前的 eventHandler
不同。
简单来说,我们可以调用Watch()
实现监听deployment
func (r *SampleReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&samplev1.Sample{}).
// 这里可以供我们指定 eventhandler
Watches(&source.Kind{Type: &apps.Deployment{}}, &handler.EnqueueRequestForObject{}).
Complete(r)
但这样做会监听所有 deployment 事件变化,如果我们想只关注由 CR 创建的 Deployment 因此我们可以采用 Owns()
方法。
func (r *SampleReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&samplev1.Sample{}).
// 这里可以供我们指定 eventhandler
Owns(&apps.Deployment{}).
Complete(r)
当我们使用 CR 创建 deployment 时,可以为他塞入一个从属关系,类似于 Pod 资源的Metadata
里会有一个OnwerReference
字段
_ = controllerutil.SetControllerReference(sample, &deployment, r.Scheme)
这样在监听时,只会监听到带有 OwnerShip 的 deployment。
其实 Owns() 方法另外定义了一个 eventHandler做了处理,感兴趣的小伙伴可以研读下源码
监听多资源下evnetHandler 和 Reconciler 的逻辑
我们的 controller Reconciler 业务逻辑,实际上只应该处理 CR 的变化,但有时是 CR 所拥有的 Deployment 发生了变化,但对应的 CR 并不会有更新事件,因此我们需要在自定义eventHandler
中,对资源进行判断。若是 CR 的变化,则直接向队列写入 namespace 和 name,若是 deployment 的变化,则向队列写入 deployment 对应 CR 的namespace 和name,以出发 Reconciler 的逻辑。
func (e *EnqueueRequestForDP) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Meta == nil {
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
_, ok := evt.Object.(*samplev1.Sample)
if ok {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Meta.Name,
Namespace: evt.Meta.GetNamespace(),
}})
return
}
deploy ,_:= evt.Object.(*v1.Deployment)
for _, owner := range deploy.OwnerReferences {
if owner.Kind == "Sample" {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: owner.Name,
Namespace: evt.Meta.GetNamespace(),
}})
}
}
}
监听指定字段变化
依据上文,自定义 predicate 即可实现。 比如我们只对 CR 改变 label 的事件感兴趣,我们此时可以自定义 Predicate 以及其 OnUpdate() 方法
func (rl *ResourceLabelChangedPredicate) Update (e event.UpdateEvent) bool{
_, ok1 = e.ObjectOld.(*samplev1.Sample)
_, ok2 = e.ObjectNew.(*samplev1.Sample)
if ok1 && ok2 {
if !compareMaps(e.MetaOld.GetLabels(), e.MetaNew.GetLabels()) {
return true
}
}
return false
}
注意,当我们监听多个资源后, deployment 的更新事件也会进入到这个方法,所以在方法中,需要通过 _, ok = e.Object(*xxx)
判断资源的类型。
多版本切换
在crd的开发和演进过程中,必然会存在一个crd的不同版本。 kubebuilder支持以一个conversion webhook的方式,支持对一个crd资源以不同版本进行读取。简单地描述就是:
kubectl apply -f config/samples/batch_v2_cronjob.yaml
创建一个v2的cronjob后,可以通过v1和v2两种版本进行读取:
kubectl get cronjobs.v2.batch.tutorial.kubebuilder.io -o yaml
kubectl get cronjobs.v1.batch.tutorial.kubebuilder.io -o yaml
显然,get命令得到的v1和v2版本的cronjob会存在一些字段上的不同,conversion webhook会负责进行不同版本的cronjob之间的数据转换。
webhook
有时候我们需要在某个对象的webhook中查询集群中的其他资源,比如某个operator规定了一个PodBox,规定每个PodBox中只能有一个Pod,那么在validatecreate的webhook中就要ListPod By PodBox并计数。
kubebuilder 2.X 将webhook封装得太过简介,所以我们需要搞个新法子:
我们在types和webhook的目录下新建一个文件, 在里面构建一个全局client:
package v1
import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
var globalClient client.Client
var globalReader client.Reader
func InitClient(mgr ctrl.Manager) {
globalClient = mgr.GetClient()
globalReader = mgr.GetAPIReader()
}
在 main.go中, 各种SetupWithManager之前,先执行InitClient,初始化这些client, validateCreate方法中可以直接使用这些client。
自定义webhook
我们开发的operator可能会需要对用户新建的pod进行注入,比如注入一些信息到annotations中, 也有可能要对原生对象的更新/删除操作进行判断,那么如何在我们的项目中添加这些对象的webhook?
社区提供了一个案例:https://github.com/kubernetes-sigs/controller-runtime/blob/master/examples/builtins/validatingwebhook.go
但是在该案例下,每次执行make generate时 会报错:
invalid field type interface{sigs.k8s.io/controller-runtime/pkg/client.Reader; sigs.k8s.io/controller-runtime/pkg/client.StatusClient; sigs.k8s.io/controller-runtime/pkg/client.Writer}
不过测试了一下 只要不执行generate,其他步骤都可以正常执行, 比如 make docker-build
kubebuilder 注解
我们在生成的 CR 结构体代码中会发现由很多 kubebuilder 自定义的注解,例如// +kubebuilder
等,其实这些在编译时会增加对应的功能,更多注解见root=true
Markers for Config/Code Generationbook.kubebuilder.io
还没有评论,来说两句吧...