Etcd 集群是分布式 K/V 存储集群,提供了可靠的强一致性服务发现,存储 Kubernetes 系统的集群状态和元数据,包括所有资源对象的信息、状态、节点信息等。Kubernetes 将所有数据存储至 Etcd 集群前缀为 /registry 的目录下。

1、架构设计

1.1 分层设计

etcd-structure.png

  • RESTStorage:实现了 RESTful 风格的对外资源存储服务的 API 接口;

  • RegistryStore:实现了资源存储的通用操作,如同代理一样,存储之前需要执行函数,存储之后也要执行处理函数;

  • Storage.Interface:通用存储接口,该接口定义了资源的操作方法(Create、Delete、Watch......);

  • CacherStorage:带有缓存功能的资源存储对象,是 Storage.Interface 的一个实现;

  • UnderlyingStorage:同样也是 Storage.Interface 的一个实现,与 Etcd 集群直接交互层。

1.2 RESTStorage

Kubernetes 的每种资源(包括子资源)都提供了 RESTful 风格的对外资源存储服务 API 接口,所有通过 RESTful API 对外暴露的资源都必须实现 RESTStorage 接口。

type Storage interface {
    New() runtime.Object
    Destroy()
}

从而封装 RegistryStore 操作,最终的操作实际上都是调用了该接口的通用实现。

1.3 RegistryStore

当通过RegistryStore存储了一个资源对象时,RegistryStore中定义了如下两种函数。

  • Before Func:也称 Strategy 预处理,它被定义为在创建资源对象之前调用,做一些预处理工作。

  • After Func:它被定义为在创建资源对象之后调用,做一些收尾工作(实际上没有使用上)。

type Store struct {
    CreateStrategy rest.RESTCreateStrategy
    BeginCreate BeginCreateFunc
    AfterCreate AfterCreateFunc
    UpdateStrategy rest.RESTUpdateStrategy
    BeginUpdate BeginUpdateFunc
    AfterUpdate AfterUpdateFunc
    DeleteStrategy rest.RESTDeleteStrategy
    AfterDelete AfterDeleteFunc
}

定义了三种预处理方法,CreateStrategy(创建资源对象时的预处理操作)、UpdateStrategy(更新资源对象是的预处理操作)、DeleteStrategy(删除资源对象时的预处理操作);五种后置处理操作。

1.4 Storage.Interface

type Interface interface {
    Versioner() Versioner
    Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
    Delete(
        ctx context.Context, key string, out runtime.Object, preconditions *Preconditions,
        validateDeletion ValidateObjectFunc, cachedExistingObject runtime.Object) error
    Watch(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
    Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error
    GetList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
    GuaranteedUpdate(
        ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
        preconditions *Preconditions, tryUpdate UpdateFunc, cachedExistingObject runtime.Object) error
    Count(key string) (int64, error)
}
  • Versioner:资源版本管理器,用于管理 Etcd 集群中的数据版本对象;

  • GuaranteedUpdate:保证传入的 tryUpdate 函数运行成功;

其中, CacherStorageUnderlyingStorage 都实现了上述接口定义,CacherStorage 实际上是在 UnderlyingStorage 之上封装了一层缓存层,在 genericregistry.StorageWithCacher 函数实例化的过程中,也会创建 UnderlyingStorage 底层存储对象(通过参数 --watch-cache 开启,默认开启缓存的)。

2、CacherStorage

不是所有的操作都配置缓存,Get、GetList、GuaranteedUpdate、Watch 配置了缓存,其中 Watch 的缓存使用缓存滑动窗口来保证历史事件不会丢失。

etcd-cache.png

2.1 watchCache

通过 Reflector 框架与 UnderlyingStorage 底层存储对象交互,并将回调事件分别存储至 w.onEvent、w.cache、cache.Store 中。

  • w.onEvent:将事件回调给 CacherStorage,将其分发给目前所有已连接的观察者,该过程通过非阻塞机制实现;

  • w.cache:将事件存储至缓存滑动窗口,它提供了对 Watch 操作的缓存数据,防止因网络或其他原因观察者连接中断,导致事件丢失;

  • cache.Store:将事件存储至本地缓存,cache.Store 与 client-go 下的 Indexer 功能相同。

2.2 cacher

Cacher 接收到 watchCache 回调的事件,遍历目前所有已连接的观察者,并将事件逐个分发给每个观察者,该过程通过非阻塞机制实现,不会阻塞任何一个观察者。

2.3 cacheWatcher

每一个发送 Watch 请求的客户端都会分配一个 cacheWatcher,用于客户端接收 Watch 事件。

当客户端发起 Watch 请求时,通过 newCacheWatcher 函数实例化 cacheWatcher 对象,并为其分配一个 id,该 id 是唯一的,从 0 开始计数,每次有新的客户端发送 Watch 请求时,该 id 会自增1,但在Kubernetes API Server重启时其会被清零。

2.4 ResourceVersion

所有 Kubernetes 资源都有一个资源版本号,在资源发生变更时,该版本号会变化,从而告诉其他的客户端资源变动,需要更新。

其依赖于 Etcd 集群中的全局 Index 机制来管理的,包括 createdIndex 和 modifiedIndex,它们用于跟踪 Etcd 集群中的数据发生了什么。

  • createdIndex:全局唯一且递增的正整数,每次在 Etcd 集群中创建 key 时其会递增;

  • modifiedIndex:与前者功能类似,每次在 Etcd 集群中修改 key 时其会递增。

2.5 滑动缓存窗口

提供了对 Watch 操作的缓存数据(事件的历史数据),防止客户端因网络或其他原因连接中断,导致事件丢失。

type watchCache struct {
    capacity int
    cache      []*watchCacheEvent
    startIndex int
    endIndex   int
}
  • capacity:窗口大小,可缓存的数量(通过--default-watch-cache-size参数指定,默认的缓存滑动窗口的大小为100。如果将其设置为0,则表示禁用watchCache);

  • cache:缓存窗口;

  • startIndex:开始下标;

  • endIndex:结束下标。

FIFO 的缓存方式类似,当缓存窗口满了优先淘汰最先进入的缓存对象。

3、Strategy

type GenericStore interface {
       // 创建资源对象时的预处理操作
    GetCreateStrategy() rest.RESTCreateStrategy
       // 更新资源对象时的预处理操作
    GetUpdateStrategy() rest.RESTUpdateStrategy
       // 删除资源对象时的预处理操作
    GetDeleteStrategy() rest.RESTDeleteStrategy
}

3.1 GetCreateStrategy

type RESTCreateStrategy interface {
    NamespaceScoped() bool
    PrepareForCreate(ctx context.Context, obj runtime.Object)
    Validate(ctx context.Context, obj runtime.Object) field.ErrorList
    Canonicalize(obj runtime.Object)
}
  • NamespaceScoped:判断当前资源对象是否拥有所属的命名空间,如有所属的命名空间,则返回true,否则返回false;

  • PrepareForCreate:创建当前资源对象之前的处理函数;

  • Validate:创建当前资源对象之前的验证函数。验证资源对象的字段信息,此方法不会修改资源对象;

  • Canonicalize:在创建当前资源对象之前将存储的资源对象规范化。在当前的 Kubernetes 系统中,并未使用该方法。

3.2 GetUpdateStrategy

type RESTUpdateStrategy interface {
    NamespaceScoped() bool
    AllowCreateOnUpdate() bool
    PrepareForUpdate(ctx context.Context, obj, old runtime.Object)
    ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList
    Canonicalize(obj runtime.Object)
    AllowUnconditionalUpdate() bool
}
  • NamespaceScoped:判断当前资源对象是否拥有所属的命名空间,如有所属的命名空间,则返回true,否则返回false;

  • AllowCreateOnUpdate:在更新当前资源对象时,如果资源对象已存在,确定是否允许重新创建资源对象;

  • PrepareForUpdate:更新当前资源对象之前的处理函数;

  • ValidateUpdate:更新当前资源对象之前的验证函数。验证资源对象的字段信息,此方法不会修改资源对象;

  • Canonicalize:在更新当前资源对象之前将存储的资源对象规范化。在当前的Kubernetes系统中,并未使用该方法;

  • AllowUnconditionalUpdate:在更新当前资源对象时,如果未指定资源版本,确定是否运行更新操作。