1、Client 客户端

client-go 支持 4 种 Client 客户端对象与 Kubernetes API Server 交互的方式(ClientSet、DynamicClient、DiscoveryClient、RESTClient),前三个都是基于 RestClient 做的二次封装。

1.1 kubeconfig

改文件中存储了集群、用户、命名空间和身份验证等信息,在默认的情况下,存放在 $HOME/.kube/config 路径下。其用于管理访问 kube-apiserver 的配置信息,同时也支持访问多 kube-apiserver 的配置管理,可以在不同的环境下管理不同的集群配置,不同的业务线也可以拥有不同的集群。

文件中主要定义三种信息:

  • clusters:定义 Kubernetes 集群信息,例如 kube-apiserver 的服务地址及集群的证书信息等;

  • users:定义 Kubernetes 集群用户身份验证的客户端凭据,例如 client-certificate、client-key、token 及 username/password 等;

  • contexts:定义 Kubernetes 集群用户信息和命名空间等,用于将请求发送到指定的集群。

其中还有一个字段,current-context 表示当前所使用的上下文,以此来划分多集群。

config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)

在实际使用中,通过 BuildConfigFromFlags 方法加载 config 文件,实例化为 rest.Config 对象。其中最核心的功能是管理多个访问 kube-apiserver 集群的配置信息,将多个配置信息合并(merge)成一份,在合并的过程中会解决多个配置文件字段冲突的问题。

if len(rules.ExplicitPath) > 0 {
    if _, err := os.Stat(rules.ExplicitPath); os.IsNotExist(err) {
        return nil, err
    }
    kubeConfigFiles = append(kubeConfigFiles, rules.ExplicitPath)
} else {
    kubeConfigFiles = append(kubeConfigFiles, rules.Precedence...)
}

可以通过两种方式获取配置文件:

  • 文件路径(即rules.ExplicitPath);

  • 环境变量(通过KUBECONFIG变量,即rules.Precedence,可指定多个路径)。

最后通过文件路径加载文件,并反序列化为 Config 对象,给后续客户端创建使用。

1.2 RESTClient

RESTClient 对 HTTP Request 进行了封装,实现了 RESTful 风格的 API。它具有很高的灵活性,数据不依赖于方法和资源,因此 RESTClient 能够处理多种类型的调用,返回不同的数据格式。

通过以下几步可以创建一个客户端来使用:

  1. 加载 kubeconfig 文件;

  2. 设置 config.APIPath 路径;

  3. 设置 config.GroupVersion 资源版本;

  4. 设置 config.NegotiatedSerializer 编解码器;

  5. 实例化 rest.RESTClientFor 客户端对象;

  6. 构建执行请求,执行命令。

请求发送之前需要通过 r.URL.String 函数拼接请求参数生成请求的 RESTful URL,最后通过 Go 语言标准库 net/http 向 kube-apiserver 发送请求,得到的结果通过 fn 函数(即transformResponse)转换为资源对象。

1.3 ClientSet

ClientSet 在 RESTClient 的基础上封装了对内置 Resource 和 Version 的管理方法,每一个 Resource 和 Version 都以函数的方式暴露给开发者,例如,ClientSet 提供的 RbacV1、CoreV1、NetworkingV1 等接口函数。

client-set.png

1.4 DynamicClient

DynamicClient 是一种动态客户端,它可以对任意 Kubernetes 资源进行 RESTful 操作,包括 CRD 自定义资源。与 ClientSet 操作类似,但是对于资源的访问层面还存在差异,其内部实现了 Unstructured,用于处理非结构化数据结构,从而能够访问 CRD 资源。

dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
    log.Fatalln(err)
}
​
gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
dynamicClient.Resource(gvr).Namespace(corev1.NamespaceDefault).List(nil, metav1.ListOptions{Limit: 400})

1.5 DiscoveryClient

DiscoveryClient 是发现客户端,它主要用于发现 Kubernetes API Server 所支持的资源组、资源版本、资源信息,并且可以将这些信息存储到本地,用于本地缓存(Cache),以减轻对 Kubernetes API Server 访问的压力(缓存信息默认存储于~/.kube/cache和~/.kube/http-cache)。

存在缓存,则有缓存失效的时间, 默认每10分 钟与 Kubernetes API Server 同步一次,缓存未命中的情况下直接请求服务,并存储缓存。

2、Informer

组件之间通过 HTTP 协议进行通信,在不依赖任何中间件的情况下需要通过 Informer 机制保证消息的实时性、可靠性、顺序性等。

2.1 架构设计

  • Reflector:监控(Watch)指定的 Kubernetes 资源,当监控的资源发生变化时,触发相应的变更事件。例如Added(资源添加)事件、Updated(资源更新)事件、Deleted(资源删除)事件,并将其资源对象存放到本地缓存 DeltaFIFO 中。

  • DeltaFIFO:FIFO 是一个先进先出的队列,而 Delta 是一个资源对象存储,它可以保存资源对象的操作类型。

  • Indexer:是用来存储资源对象并自带索引功能的本地存储,Reflector 从 DeltaFIFO 中将消费出来的资源对象存储至 Indexer,数据与 Etcd 中完全保持一致。方便从本地存储中读取相应的资源对象数据,而无须每次从远程 Etcd 集群中读取,以减轻 Kubernetes API Server 和 Etcd 集群的压力。

2.2 代码样例

c := make(chan struct{})
defer close(c)
​
// clientset是用于与Kubernetes API Server交互的客户端,time.Minute用于设置多久进行一次resync(重新同步),周期性地执行List操作,将所有的资源存放在Informer Store中,如果该参数为0,则禁用resync功能
informerFactory := informers.NewSharedInformerFactory(client, time.Minute)
// 得到具体Pod资源的informer对象
informer := informerFactory.Core().V1().Pods().Informer()
// 
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
​
    },
    UpdateFunc: func(oldObj, newObj interface{}) {
​
    },
    DeleteFunc: func(obj interface{}) {
​
    },
})
informer.Run(c)

添加资源事件回调方法,支持3种资源事件回调方法:

  • AddFunc:当创建Pod资源对象时触发的事件回调方法;

  • UpdateFunc:当更新Pod资源对象时触发的事件回调方法;

  • DeleteFunc:当删除Pod资源对象时触发的事件回调方法。

上面是针对于 pod 的监听回调,在 kubernetes 中每个资源都实现了 Informer 机制(实现 Informer 和 Lister 方法),因此所有资源都可以进行监听处理。

3、Reflector

Reflector 通过 ListerWatcher 入参来监听资源对象,其包含 List 和 Watch 两种方法,前者用于获取资源列表,后者则用于监控资源。

3.1 获取资源列表

在启动时获取该资源下所有的对象数据并将其存储至 DeltaFIFO 中,整个过程分为五个步骤执行:

  • 获取资源

    list, paginatedResult, err = pager.List(context.Background(), options)

    在这里传递了一个 options 参数,参数中 ResourceVersion 为 0 则表示获取所有的版本资源,为空则表示最新同步版本不可用,需要重新同步资源 Etcd 资源版本,否则获取特定的版本资源。

  • 获取版本

    listMetaInterface, err := meta.ListAccessor(list)
    resourceVersion = listMetaInterface.GetResourceVersion()

    每次修改当前资源对象时,Kubernetes API Server 都会更改 ResourceVersion,使得 client-go 执行 Watch 操作时可以根据 ResourceVersion 来确定当前资源对象是否发生变化。

  • 资源转换

    items, err := meta.ExtractList(list)

    将 runtime.Object 对象转换成 []runtime.Object 对象。因为获取的是资源下的所有对象的数据,例如所有的 Pod 资源数据,所以它是一个资源列表,需要进行转换使用。

  • 本地存储

    if err := r.syncWith(items, resourceVersion); err != nil {
        return fmt.Errorf("unable to sync list result: %v", err)
    }

    将资源对象列表中的资源对象和资源版本号存储至 DeltaFIFO 中,并会替换已存在的对象。

  • 更新版本

    r.setLastSyncResourceVersion(resourceVersion)

3.2 监控资源

Watch 使用 HTTP 协议的分块传输编码(Chunked Transfer Encoding),并与 Kubernetes API Server 建立长连接,当 client-go 调用 API Server 时,API Server 在 Response 的 HTTP Header 中设置 Transfer-Encoding 的值为 chunked,表示采用分块传输编码,客户端收到该信息后,便与服务端进行连接,并等待下一个数据块。

4、DeltaFIFO

type DeltaFIFO struct {
    // lock/cond protects access to 'items' and 'queue'.
    lock sync.RWMutex
    cond sync.Cond
        // Deltas 是一个 Delta 数组,其包含了对象信息 
    items map[string]Deltas
    queue []string
}

其本质上是一个先进先出的队列,有数据的生产者和消费者,其中生产者是 Reflector,通过调用 Add 方法注入数据,消费者是 Controller,通过调用 Pop 方法取出数据处理。

4.1 生产者

在 Added(资源添加)事件、Updated(资源更新)事件、Deleted(资源删除)事件中都调用了 queueActionLocked 方法,该方法实现就是往队列增加数据的。

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
        // 计算 queue key
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
       // 分装 Delta 对象
    newDeltas := append(f.items[id], Delta{actionType, obj})
       // 去重处理
    newDeltas = dedupDeltas(newDeltas)
​
    if len(newDeltas) > 0 {
        if _, exists := f.items[id]; !exists {
            f.queue = append(f.queue, id)
        }
        f.items[id] = newDeltas
               // 通知消费者解除阻塞
        f.cond.Broadcast()
    } else {
        delete(f.items, id)
    }
    return nil
}

4.2 消费者

通过传入一个处理方法,取出队列头部的对象进行处理。

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            if f.closed {
                return nil, ErrFIFOClosed
            }
                       // 空队列,等待唤醒
            f.cond.Wait()
        }
               // 取出头部对象 key
        id := f.queue[0]
               // 模拟队列出
        f.queue = f.queue[1:]
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            // Item may have been deleted subsequently.
            continue
        }
        delete(f.items, id)
        err := process(item)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        return item, err
    }
}

4.3 Resync

Resync 机制会将 Indexer 本地存储中的资源对象同步到 DeltaFIFO 中,并将这些资源对象设置为 Sync 的操作类型。

5、Indexer

Indexer是一个存储资源对象并自带索引功能的本地存储,数据与 Etcd 集群中的数据保持完全一致。client-go 可以很方便地从本地存储中读取相应的资源对象数据,而无须每次都从远程 Etcd 集群中读取,这样可以减轻 API Server 和 Etcd 集群的压力。

5.1 ThreadSafeMap

是一个内存中的存储,其每次的增、删、改、查操作都会加锁,以保证数据的一致性。将资源对象数据存储于一个 map 数据结构中,key 是 <namespace>/<name> 格式,如果没有命名空间,则是一个单独的 name,value 则是资源对象。

5.2 索引器

在每次增、删、改 ThreadSafeMap 数据时,都会通过 updateIndices 或 deleteFromIndices 函数变更 Indexer,其被设计为可以自定义索引函数,有 4 个非常重要的数据结构,分别是 Indices、Index、Indexers 及 IndexFunc。

  • Indexers:存储索引器,key 为索引器名称,value 为索引器的实现函数;

  • IndexFunc:索引器函数,接收一个资源对象,返回检索结果列表;

  • Indices:存储缓存器,key为缓存器名称,value为缓存数据。

  • Index:存储缓存数据,其结构为K/V。

6、WorkQueue

Kubernetes 的 WorkQueue 队列与普通 FIFO 队列相比,实现略显复杂,它的主要功能在于标记和去重:

  1. 去重:相同元素在同一时间不会被重复处理,不管一个元素在处理之前被添加了多少次,它只会被处理一次;

  2. 并发性:多个消费者和生产者,一般情况下都是单生产多消费;

  3. 标记机制:支持标记功能,标记一个元素是否被处理,也允许元素在处理时重新排队;

  4. 通知机制:ShutDown 方法通过信号量通知队列不再接收新的元素,并通知 metric goroutine 退出;

  5. 延迟:支持延迟队列,延迟一段时间后再将元素存入队列;

  6. 限速:支持限速队列,元素存入队列时进行速率限制。限制一个元素被重新排队(Reenqueued)的次数;

6.1 FIFO队列

FIFO 队列接口,先进先出队列,并支持去重机制,队列接收一个 Type 类型元素。

type Type struct {
    queue []t
    dirty set
    processing set
}

其中主要关注三个字段:

  • queue:存储队列,实际的数据存储;

  • dirty:也是一个存储队列,一般和 queue 保持一致,用于存储处理期间遇见其他的相同元素;

  • processing:正在处理的元素,当使用 get 从队列中取出头部元素后,从队列中删除,并且用该字段来记录当前处理的元素。

着重分析其去重能力,首先是取出队列执行操作的逻辑,从队列中取出头部元素,并且删除,之后将数据丢入 processing 中,最后元素丢给消费者去处理。

func (q *Type) Get() (item interface{}, shutdown bool) {
    item, q.queue = q.queue[0], q.queue[1:]
    q.processing.insert(item)
    q.dirty.delete(item)
    return item, false
}

其次是添加队列执行操作的逻辑,如果已经在 dirty 中存在,则直接退出;如果正在 processing 中处理也直接退出。这里存在一个逻辑是正在处理的元素会先存入到 dirty 中,等到当前的元素被标记为 done 之后,才会从 dirty 挪回 queue 当中。

func (q *Type) Add(item interface{}) {
    if q.dirty.has(item) {
        return
    }
    q.dirty.insert(item)
    if q.processing.has(item) {
        return
    }
    q.queue = append(q.queue, item)
}

6.2 延迟队列

基于 FIFO 队列接口封装,在原有功能上增加了 AddAfter 方法,其原理是延迟一段时间后再将元素插入 FIFO 队列。

6.3 限速队列

限速队列利用延迟队列的特性,延迟某个元素的插入时间,达到限速目的。

  • 令牌桶算法

    令牌桶算法内部实现了一个存放 token 的“桶”,初始时“桶”是空的,token 会以固定速率往“桶”里填充,直到将其填满为止,多余的 token 会被丢弃。每个元素都会从令牌桶得到一个 token,只有得到 token 的元素才允许通过,而没有得到token的元素处于等待。

  • 排队指数算法

    排队指数算法将相同元素的排队数作为指数,排队数增大,速率限制呈指数级增长,但其最大值不会超过 maxDelay,在一个限速周期内接收到的相同元素,会被指数时间延迟,其他的正常延迟。

  • 计数器算法

    限制一段时间内允许通过的元素数量,例如在1分钟内只允许通过100个元素,每插入一个元素,计数器自增1,当计数器数到100的阈值且还在限速周期内时,则不允许元素再通过。

7、EventBroadcaster