Please enable Javascript to view the contents

K8s API 核心对象 —— client-go

 ·  ☕ 3 分钟

API 入口

Client Sets

接收变更通知和缓存(Informers and Caching)

Client Sets可以 watch 变更,但一般我们用更高级的 Informers,因为它有缓存、索引等功能。

image-20210326154940269

  • Lister :被应用调用,返回缓存中的数据列表
  • Informer:监听器

Informer 有两个功能

  • relists:长期断连后追回增量数据
  • resync:间隔地全量同步数据

由于 Informer 对 API Server 造成了一定压力,对于同一个 GroupVersionResource 一个应用一般只使用一个Informer。通过使用informer factory,应用内可以更加方便地共享Informer

创建 Informer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import (
    ...
    "k8s.io/client-go/informers"
)
...
clientset, err := kubernetes.NewForConfig(config)
informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)
podInformer := informerFactory.Core().V1().Pods()
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(new interface{}) {...},
    UpdateFunc: func(old, new interface{}) {...},
    DeleteFunc: func(obj interface{}) {...},
})
informerFactory.Start(wait.NeverStop)//starts these Go routines
informerFactory.WaitForCacheSync(wait.NeverStop)//wait for the first List calls finish
pod, err := podInformer.Lister().Pods("programming-kubernetes").Get("client-go")

一般,上面的EventHandler只是把事件和对象加入work queue中,而不是同步处理。切记,EventHandler中不能修改传入的对象。

Work Queue

k8s.io/client-go/util/workqueue 中有一个优先队列的实现。

1
2
3
4
5
6
7
8
type Interface interface {
    Add(item interface{})
    Len() int
    Get() (item interface{}, shutdown bool)
    Done(item interface{})
    ShutDown()
    ShuttingDown() bool
}

延时队列:

1
2
3
4
5
6
type DelayingInterface interface {
    Interface
    // AddAfter adds an item to the workqueue after the
    // indicated duration has passed.
    AddAfter(item interface{}, duration time.Duration)
}

限流队列:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
type RateLimitingInterface interface {
    DelayingInterface

    // AddRateLimited adds an item to the workqueue after the rate
    // limiter says it's OK.
    AddRateLimited(item interface{})

    // Forget indicates that an item is finished being retried.
    // It doesn't matter whether it's for perm failing or success;
    // we'll stop the rate limiter from tracking it. This only clears
    // the `rateLimiter`; you still have to call `Done` on the queue.
    Forget(item interface{})

    // NumRequeues returns back how many times the item was requeued.
    NumRequeues(item interface{}) int
}

有好几种限流队列,可以分别用不同的 go 对象构造器去创建。大部分 Controller 均使用DefaultControllerRateLimiter() *RateLimiter构造器。它:

  • 实现的指数级的退让(重试间隔)。间隔从 5ms 到 1,000 秒。每次队列元素处理错误,均倍增重试时间
  • 允许最快的平均处理速度是每秒10个,时间窗口内加速爆发时最大每秒100个。

API 机制分析

Kinds

Kinds 以 GroupVersionKind 缩写 GVK 来区分。

Resources

资源也是以 Group 和 Version 来区分,GroupVersionResource 缩写 GVR

如 GVR : apps/v1.deployments 映射到 /apis/apps/v1/namespaces/namespace/deployments

REST 映射

RESTMapper 可以把 GVK 映射 到 GVR

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type RESTMapping struct {
    // Resource is the GroupVersionResource (location) for this endpoint.
    Resource schema.GroupVersionResource.

    // GroupVersionKind is the GroupVersionKind (data format) to submit
    // to this endpoint.
    GroupVersionKind schema.GroupVersionKind

    // Scope contains the information needed to deal with REST Resources
    // that are in a resource hierarchy.
    Scope RESTScope
}

// KindFor takes a partial resource and returns the single match.
// Returns an error if there are multiple matches.
KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error)

// KindsFor takes a partial resource and returns the list of potential
// kinds in priority order.
KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error)

// ResourceFor takes a partial resource and returns the single match.
// Returns an error if there are multiple matches.
ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error)

// ResourcesFor takes a partial resource and returns the list of potential
// resource in priority order.
ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error)

// RESTMappings returns all resource mappings for the provided group kind
// if no version search is provided. Otherwise identifies a preferred resource
// mapping for the provided version(s).
RESTMappings(gk schema.GroupKind, versions ...string) ([]*RESTMapping, error)

如,当执行命令kubectl get pods 时,没带上 Group 与 Version。RESTMapper 负责补全。

如,apps/v1.Deployment 映射为 apps/v1.deployments

Scheme

Schema 负责把 GVK 反射为 Golang 的类型。

image-20210327223535176

image-20210327223635930

参考

[Programming Kubernetes]

分享

Mark Zhu
作者
Mark Zhu
An old developer