Kun

Kun

IT学徒、技术民工、斜杠青年,机器人爱好者、摄影爱好 PS、PR、LR、达芬奇潜在学习者


共 279 篇文章


​ 基于云原生的周边工具

Kubectl

kubectl 是 Kubernetes 的命令行工具(CLI),是 Kubernetes 用户和管理员必备的管理

工具。该kubectl工具控制Kubernetes集群管理器。它可以让您检查集群资源,创建、删除和更新组

件以及更多功能。kubectl 提供了大量的子命令,方便管理 Kubernetes 集群中的各种功能。

  • kubectl -h 查看子命令列表
  • kubectl options 查看全局选项
  • kubectl --help 查看子命令的帮助
  • kubectl command -o= 设置输出格式(如 json、yaml、jsonpath 等)
  • kubectl explain RESOURCE 查看资源的定义
## 查看版本信息
kubectl version

## 查看资源对象简写
kubectl api-resouce

## 查看集群信息
kubectl cluster-info

## 查看 master 节点状态
kubectl get componentstatuses

kubectl get cs

## 查看命令空间
## 命令空间的作用:允许不同命令空间的相同类型的资源重名
kubectl get namespace

kubectl get ns

## 查看 default 命名空间的所有资源
kubectl get all 

## 创建命名空间
kubectl create ns app

## 删除命名空间
kubectl delete namespace app

## 发布
## 将资源暴露为新的Service
kubectl expose 

对于容器应用而言,kubernetes 提供了基于VIP(虚拟IP) 的网桥的方式访问Service,再由Service 重定向到相应的Pod

Service的type类型

ClusterIP: 提供一个集群内部的虚拟IP以供 Pod访问(service 默认类型)

NodePort: 在每个Node 上打开一个端口供外部访问,k8s 将会在每个Node上打开一个端口并且每个Node的端口都是一样的,通过NodeIp:NodePort的方式 K8s集群外部的程序可以访问Service

每个端口只能是一种服务,端口范围是 30000-32767

LoadBalancer: 通过设置 LoadBalancer 映射到云服务厂商提供的 LoadBalancer 地址. 这种用法仅用于在公有云服务提供商的云平台上设置Service的场景。通过外部的负载均衡器来访问,通常在云平台部署 LoadBalancer 还需要额外的费用

在service提交之后,k8s 就会调用 CloudProvider 在公有云创建一个负载均衡服务,并且把被代理的 Pod的IP地址配置给负载均衡服务做后端

externalName: 将Service 名称映射到一个 DNS 域名上,相当于 DNS 服务的CNAME 记录,用于让Pod去访问集群外部的资源,它本身没有绑定任务资源

## 查看 pod 网络状态信息和 Service 暴露的端口
kubectl get pods, svc -o wide

## 查看关联后端的节点
kubectl get endpoints

## 查看 service 的描述信息
kubectl describe svc nginx-service

## 回滚
kubectl rollout

## 回滚到上一个版本
kubectl rollout undo deployment/nginx

http://docs.kubernetes.org.cn/643.html

krew

krew 是一个用来管理 kubectl 插件的工具,类似于 apt 或 yum,支持搜索、安装和管理kubectl 插件。

安装

  set -x; cd "$(mktemp -d)" &&
  curl -fsSLO "https://github.com/kubernetes-sigs/krew/releases/download/v0.3.2/krew.{tar.gz,yaml}" &&
  tar zxvf krew.tar.gz &&
  ./krew-"$(uname | tr '[:upper:]' '[:lower:]')_amd64" install \
    --manifest=krew.yaml --archive=krew.tar.gz

添加环境变量

export PATH="${KREW_ROOT:-$HOME/.krew}/bin:$PATH"
source ~/.bashrc

使用

kubectl krew update: 插件索引更新

kubectl krew search: 插件搜索

kubectl krew install get-all: 安装插件

kubectl krew list: 插件已装插件

kubectl krew info ns: 查看插件详情

kubectl krew upgrade : 升级安装的插件

kubectl krew remove view-secret : 卸载插件

krew自身也作为一个“kubectl 插件”,因此,可以使用命令kubectl krew upgrade命令来升级krew

https://cloud.tencent.com/developer/article/1543178

Kubeadm

Kubeadm 是一个提供了 kubeadm initkubeadm join 的工具, 作为创建 Kubernetes 集群的 “快捷途径” 的最佳实践。

kubeadm 通过执行必要的操作来启动和运行最小可用集群。 按照设计,它只关注启动引导,而非配置机器。同样的, 安装各种 “锦上添花” 的扩展,例如 Kubernetes Dashboard、 监控方案、以及特定云平台的扩展,都不在讨论范围内。

相反, kubeadm 之上构建更高级别以及更加合规的工具, 理想情况下,使用 kubeadm 作为所有部署工作的基准将会更加易于创建一致性集群。

常用命令

kubeadm init: 此命令初始化一个 Kubernetes 控制平面节点。

kubeadm join: 此命令用来初始化 Kubernetes 工作节点并将其加入集群。

当节点加入 kubeadm 初始化的集群时,我们需要建立双向信任。 这个过程可以分解为发现(让待加入节点信任 Kubernetes 控制平面节点)和 TLS 引导(让 Kubernetes 控制平面节点信任待加入节点)两个部分。

有两种主要的发现方案。 第一种方法是使用共享令牌和 API 服务器的 IP 地址。 第二种是以文件形式提供标准 kubeconfig 文件的一个子集。 发现/kubeconfig 文件支持令牌、client-go 鉴权插件(“exec”)、“tokenFile" 和 "authProvider"。该文件可以是本地文件,也可以通过 HTTPS URL 下载。 格式是 kubeadm join --discovery-token abcdef.1234567890abcdef 1.2.3.4:6443kubeadm join --discovery-file path/to/file.conf 或者 kubeadm join --discovery-file https://url/file.conf。 只能使用其中一种。 如果发现信息是从 URL 加载的,必须使用 HTTPS。 此外,在这种情况下,主机安装的 CA 包用于验证连接。

如果使用共享令牌进行发现,还应该传递 --discovery-token-ca-cert-hash 参数来验证 Kubernetes 控制平面节点提供的根证书颁发机构(CA)的公钥。 此参数的值指定为 ":", 其中支持的哈希类型为 "sha256"。哈希是通过 Subject Public Key Info(SPKI)对象的字节计算的(如 RFC7469)。 这个值可以从 "kubeadm init" 的输出中获得,或者可以使用标准工具进行计算。 可以多次重复 --discovery-token-ca-cert-hash 参数以允许多个公钥。

如果无法提前知道 CA 公钥哈希,则可以通过 --discovery-token-unsafe-skip-ca-verification 参数禁用此验证。 这削弱了 kubeadm 安全模型,因为其他节点可能会模仿 Kubernetes 控制平面节点。

kubeadm config:在 kubeadm init 执行期间,kubeadm 将 ClusterConfiguration 对象上传 到你的集群的 kube-system 名字空间下名为 kubeadm-config 的 ConfigMap 对象中。 然后在 kubeadm joinkubeadm resetkubeadm upgrade 执行期间读取此配置。

kubeadm config print 命令打印默认静态配置, kubeadm 运行 kubeadm init and kubeadm join 时将使用此配置

kubeadm config migrate 来转换旧配置文件

kubeadm config images list : 列出 kubeadm 所需的镜像。

kubeadm config images pull : 拉取 kubeadm 所需的镜像。

kubeadm token: kubeadm init创建了一个有效期为24小时的令牌,token命令用于令牌相关的操作

kubeadm token create: 创建一个引导令牌。 你可以设置此令牌的用途,"有效时间" 和可选的人性化的描述。

kubeadm token delete: 删除指定的引导令牌列表

kubeadm token generate: 此命令将打印一个随机生成的可以被 "init" 和 "join" 命令使用的引导令牌。

kubeadm token list: 列出服务器上的引导令牌

kubeadm certs: 处理Kubernetes证书的相关命令

kubeadm certs renew: 可以使用 all 子命令来续订所有 Kubernetes 证书,也可以选择性地续订部分证书。

kubeadm certs certificate-key: 此命令可用来生成一个新的控制面证书密钥。密钥可以作为 --certificate-key 标志的取值传递给 kubeadm initkubeadm join 命令,从而在添加新的控制面节点时能够自动完成证书复制

kubeadm certs check-expiration: 此命令检查 kubeadm 所管理的本地 PKI 中的证书是否以及何时过期。

kubeadm certs generate-csr: 此命令可用来为所有控制面证书和 kubeconfig 文件生成密钥和 CSR(签名请求)。 用户可以根据自身需要选择 CA 为 CSR 签名。

kubeadm upgrade: 一个对用户友好的命令,它将复杂的升级逻辑包装在一个命令后面,支持升级的规划和实际执行。

kubeadm upgrade plan: 检查可升级到哪些版本,并验证你当前的集群是否可升级。

kubeadm upgrade apply: 将Kubernetes集群升级到指定版本

kubeadm upgrade diff: 显示哪些差异将被应用于现有的静态 pod 资源清单

kubeadm upgrade node: 升级集群中某个节点的命令

kubeadm reset: 还原kubeadm init或者kubeadm join命令对主机所做的任何更改

kubeadm init phase: kubeadm init phase 能确保调用引导过程的原子步骤。 因此,如果希望自定义应用,则可以让 kubeadm 做一些工作,然后填补空白。

kubeadm join phase: kubeadm join phase 使你能够调用 join 过程的基本原子步骤。 因此,如果希望执行自定义操作,可以让 kubeadm 做一些工作,然后由用户来补足剩余操作。

kubeadm upgrade phase: 使用此阶段,可以选择执行辅助控制平面或工作节点升级的单独步骤。请注意,kubeadm upgrade apply 命令仍然必须在主控制平面节点上调用。

kubeadm reset phase: kubeadm reset phase 使你能够调用 reset 过程的基本原子步骤。 因此,如果希望执行自定义操作,可以让 kubeadm 做一些工作,然后由用户来补足剩余操作。

kubeadm alpha:尝试试验性功能

证书过期

查看k8s证书

kubeadm alpha certs check-expiration

使用 kubeadm 部署的 k8s 集群,所以更新起证书也是比较方便的,默认的证书时间有效期是一年,我们集群的 k8s 版本是 1.15.3 版本是可以使用以下命令来更新证书的,但是一年之后还是会到期,这样就很麻烦,所以我们需要了解一下 k8s 的证书,然后我们来生成一个时间很长的证书,这样我们就可以不用去总更新证书了

kubeadm alpha certs renew all --config=kubeadm.yaml
systemctl restart kubelet
kubeadm init phase kubeconfig all --config kubeadm.yaml

然后将生成的配置文件替换,重启 kube-apiserver、kube-controller、kube-scheduler、etcd 这4个容器即可

kubeadm 会在控制面板升级的时候自动更新所有证书,所以使用 kubeadm 搭建得集群最佳的做法是经常升级集群,这样可以确保你的集群保持最新状态并保持合理的安全性。但是对于实际的生产环境我们可能并不会去频繁得升级集群,所以这个时候我们就需要去手动更新证书

Kubebuilder

Kubebuilder 是一个使用 CRDs 构建 K8s API 的 SDK,主要是:

  • 提供脚手架工具初始化 CRDs 工程,自动生成 boilerplate 代码和配置;
  • 提供代码库封装底层的 K8s go-client;

方便用户从零开始开发 CRDs,Controllers 和 Admission Webhooks 来扩展 K8s。

GVK = GroupVersionKind,GVR = GroupVersionResource。

API Group 是相关 API 功能的集合,每个 Group 拥有一或多个 Versions,用于接口的演进。

每个 GV 都包含多个 API 类型,称为 Kinds,在不同的 Versions 之间同一个 Kind 定义可能不同, Resource 是 Kind 的对象标识(resource type),一般来说 Kinds 和 Resources 是 1:1 的,比如 pods Resource 对应 Pod Kind,但是有时候相同的 Kind 可能对应多个 Resources,比如 Scale Kind 可能对应很多 Resources:deployments/scale,replicasets/scale,对于 CRD 来说,只会是 1:1 的关系。

每一个 GVK 都关联着一个 package 中给定的 root Go type,比如 apps/v1/Deployment 就关联着 K8s 源码里面 k8s.io/api/apps/v1 package 中的 Deployment struct,我们提交的各类资源定义 YAML 文件都需要写:

  • apiVersion:这个就是 GV 。
  • kind:这个就是 K。

根据 GVK K8s 就能找到你到底要创建什么类型的资源,根据你定义的 Spec 创建好资源之后就成为了 Resource,也就是 GVR。GVK/GVR 就是 K8s 资源的坐标,是我们创建/删除/修改/读取资源的基础。

每一组 Controllers 都需要一个 Scheme,提供了 Kinds 与对应 Go types 的映射,也就是说给定 Go type 就知道他的 GVK,给定 GVK 就知道他的 Go type,比如说我们给定一个 Scheme: "tutotial.kubebuilder.io/api/v1".CronJob{} 这个 Go type 映射到 batch.tutotial.kubebuilder.io/v1 的 CronJob GVK,那么从 Api Server 获取到下面的 JSON:

{

"kind": "CronJob",

"apiVersion": "batch.tutorial.kubebuilder.io/v1",

...}

就能构造出对应的 Go type了,通过这个 Go type 也能正确地获取 GVR 的一些信息,控制器可以通过该 Go type 获取到期望状态以及其他辅助信息进行调谐逻辑。

Kubebuilder 的核心组件,具有 3 个职责:

  • 负责运行所有的 Controllers;
  • 初始化共享 caches,包含 listAndWatch 功能;
  • 初始化 clients 用于与 Api Server 通信。

Cache

Kubebuilder 的核心组件,负责在 Controller 进程里面根据 Scheme 同步 Api Server 中所有该 Controller 关心 GVKs 的 GVRs,其核心是 GVK -> Informer 的映射,Informer 会负责监听对应 GVK 的 GVRs 的创建/删除/更新操作,以触发 Controller 的 Reconcile 逻辑。

Controller

Kubebuidler 为我们生成的脚手架文件,我们只需要实现 Reconcile 方法即可。

Clients

在实现 Controller 的时候不可避免地需要对某些资源类型进行创建/删除/更新,就是通过该 Clients 实现的,其中查询功能实际查询是本地的 Cache,写操作直接访问 Api Server。

Index

由于 Controller 经常要对 Cache 进行查询,Kubebuilder 提供 Index utility 给 Cache 加索引提升查询效率。

Finalizer

在一般情况下,如果资源被删除之后,我们虽然能够被触发删除事件,但是这个时候从 Cache 里面无法读取任何被删除对象的信息,这样一来,导致很多垃圾清理工作因为信息不足无法进行,K8s 的 Finalizer 字段用于处理这种情况。在 K8s 中,只要对象 ObjectMeta 里面的 Finalizers 不为空,对该对象的 delete 操作就会转变为 update 操作,具体说就是 update deletionTimestamp 字段,其意义就是告诉 K8s 的 GC“在deletionTimestamp 这个时刻之后,只要 Finalizers 为空,就立马删除掉该对象”。

所以一般的使用姿势就是在创建对象时把 Finalizers 设置好(任意 string),然后处理 DeletionTimestamp 不为空的 update 操作(实际是 delete),根据 Finalizers 的值执行完所有的 pre-delete hook(此时可以在 Cache 里面读取到被删除对象的任何信息)之后将 Finalizers 置为空即可。

OwerReference

K8s GC 在删除一个对象时,任何 ownerReference 是该对象的对象都会被清除,与此同时,Kubebuidler 支持所有对象的变更都会触发 Owner 对象 controller 的 Reconcile 方法。

命令

kubebuilder init --domain edas.io

这一步创建了一个 Go module 工程,引入了必要的依赖,创建了一些模板文件。

kubebuilder create api --group apps --version v1alpha1 --kind Application

这一步创建了对应的 CRD 和 Controller 模板文件

源码

Kubebuilder 创建的 main.go 是整个项目的入口,逻辑十分简单

var (
	scheme   = runtime.NewScheme()
	setupLog = ctrl.Log.WithName("setup")
)
func init() {
	appsv1alpha1.AddToScheme(scheme)
	// +kubebuilder:scaffold:scheme
}
func main() {
	...
        // 1、init Manager
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme, MetricsBindAddress: metricsAddr})
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}
        // 2、init Reconciler(Controller)
	err = (&controllers.ApplicationReconciler{
		Client: mgr.GetClient(),
		Log:    ctrl.Log.WithName("controllers").WithName("Application"),
		Scheme: mgr.GetScheme(),
	}).SetupWithManager(mgr)
	if err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "EDASApplication")
		os.Exit(1)
	}
	// +kubebuilder:scaffold:builder
	setupLog.Info("starting manager")
        // 3、start Manager
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}

可以看到在 init 方法里面我们将 appsv1alpha1 注册到 Scheme 里面去了,这样一来 Cache 就知道 watch 谁了,main 方法里面的逻辑基本都是 Manager 的:

  1. 初始化了一个 Manager;
  2. 将 Manager 的 Client 传给 Controller,并且调用 SetupWithManager 方法传入 Manager 进行 Controller 的初始化;
  3. 启动 Manager。

Manager初始化

// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
	...
	// Create the cache for the cached read client and registering informers
	cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
	if err != nil {
		return nil, err
	}
	apiReader, err := client.New(config, client.Options{Scheme: options.Scheme, Mapper: mapper})
	if err != nil {
		return nil, err
	}
	writeObj, err := options.NewClient(cache, config, client.Options{Scheme: options.Scheme, Mapper: mapper})
	if err != nil {
		return nil, err
	}
	...
	return &controllerManager{
		config:           config,
		scheme:           options.Scheme,
		errChan:          make(chan error),
		cache:            cache,
		fieldIndexes:     cache,
		client:           writeObj,
		apiReader:        apiReader,
		recorderProvider: recorderProvider,
		resourceLock:     resourceLock,
		mapper:           mapper,
		metricsListener:  metricsListener,
		internalStop:     stop,
		internalStopper:  stop,
		port:             options.Port,
		host:             options.Host,
		leaseDuration:    *options.LeaseDuration,
		renewDeadline:    *options.RenewDeadline,
		retryPeriod:      *options.RetryPeriod,
	}, nil
}

创建Cache

Cache初始化代码

// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
	opts, err := defaultOpts(config, opts)
	if err != nil {
		return nil, err
	}
	im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
	return &informerCache{InformersMap: im}, nil
}
// newSpecificInformersMap returns a new specificInformersMap (like
// the generical InformersMap, except that it doesn't implement WaitForCacheSync).
func newSpecificInformersMap(...) *specificInformersMap {
	ip := &specificInformersMap{
		Scheme:            scheme,
		mapper:            mapper,
		informersByGVK:    make(map[schema.GroupVersionKind]*MapEntry),
		codecs:            serializer.NewCodecFactory(scheme),
		resync:            resync,
		createListWatcher: createListWatcher,
		namespace:         namespace,
	}
	return ip
}
// MapEntry contains the cached data for an Informer
type MapEntry struct {
	// Informer is the cached informer
	Informer cache.SharedIndexInformer
	// CacheReader wraps Informer and implements the CacheReader interface for a single type
	Reader CacheReader
}
func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
        ...
	// Create a new ListWatch for the obj
	return &cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
				return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(opts)
			}
			return dynamicClient.Resource(mapping.Resource).List(opts)
		},
		// Setup the watch function
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			// Watch needs to be set to true separately
			opts.Watch = true
			if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
				return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).Watch(opts)
			}
			return dynamicClient.Resource(mapping.Resource).Watch(opts)
		},
	}, nil
}

Cache 主要就是创建了 InformersMap,Scheme 里面的每个 GVK 都创建了对应的 Informer,通过 informersByGVK 这个 map 做 GVK 到 Informer 的映射,每个 Informer 会根据 ListWatch 函数对对应的 GVK 进行 List 和 Watch

创建Clients

创建Clients 很简单

// defaultNewClient creates the default caching client
func defaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
	// Create the Client for Write operations.
	c, err := client.New(config, options)
	if err != nil {
		return nil, err
	}
	return &client.DelegatingClient{
		Reader: &client.DelegatingReader{
			CacheReader:  cache,
			ClientReader: c,
		},
		Writer:       c,
		StatusClient: c,
	}, nil
}

读操作使用上面创建的 Cache,写操作使用 K8s go-client 直连。

Controller初始化

func (r *EDASApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
		err := ctrl.NewControllerManagedBy(mgr).
		For(&appsv1alpha1.EDASApplication{}).
		Complete(r)
		return err
}

使用的是 Builder 模式,NewControllerManagerBy 和 For 方法都是给 Builder 传参,最重要的是最后一个方法 Complete,其逻辑是

func (blder *Builder) Build(r reconcile.Reconciler) (manager.Manager, error) {
...
	// Set the Manager
	if err := blder.doManager(); err != nil {
		return nil, err
	}
	// Set the ControllerManagedBy
	if err := blder.doController(r); err != nil {
		return nil, err
	}
	// Set the Watch
	if err := blder.doWatch(); err != nil {
		return nil, err
	}
...
	return blder.mgr, nil
}

主要是看看 doController 和 doWatch 方法

doController方法

func New(name string, mgr manager.Manager, options Options) (Controller, error) {
	if options.Reconciler == nil {
		return nil, fmt.Errorf("must specify Reconciler")
	}
	if len(name) == 0 {
		return nil, fmt.Errorf("must specify Name for Controller")
	}
	if options.MaxConcurrentReconciles <= 0 {
		options.MaxConcurrentReconciles = 1
	}
	// Inject dependencies into Reconciler
	if err := mgr.SetFields(options.Reconciler); err != nil {
		return nil, err
	}
	// Create controller with dependencies set
	c := &controller.Controller{
		Do:                      options.Reconciler,
		Cache:                   mgr.GetCache(),
		Config:                  mgr.GetConfig(),
		Scheme:                  mgr.GetScheme(),
		Client:                  mgr.GetClient(),
		Recorder:                mgr.GetEventRecorderFor(name),
		Queue:                   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
		MaxConcurrentReconciles: options.MaxConcurrentReconciles,
		Name:                    name,
	}
	// Add the controller as a Manager components
	return c, mgr.Add(c)
}

该方法初始化了一个 Controller,传入了一些很重要的参数:

  • Do:Reconcile 逻辑;
  • Cache:找 Informer 注册 Watch;
  • Client:对 K8s 资源进行 CRUD;
  • Queue:Watch 资源的 CUD 事件缓存;
  • Recorder:事件收集。

doWatch方法

func (blder *Builder) doWatch() error {
	// Reconcile type
	src := &source.Kind{Type: blder.apiType}
	hdler := &handler.EnqueueRequestForObject{}
	err := blder.ctrl.Watch(src, hdler, blder.predicates...)
	if err != nil {
		return err
	}
	// Watches the managed types
	for _, obj := range blder.managedObjects {
		src := &source.Kind{Type: obj}
		hdler := &handler.EnqueueRequestForOwner{
			OwnerType:    blder.apiType,
			IsController: true,
		}
		if err := blder.ctrl.Watch(src, hdler, blder.predicates...); err != nil {
			return err
		}
	}
	// Do the watch requests
	for _, w := range blder.watchRequest {
		if err := blder.ctrl.Watch(w.src, w.eventhandler, blder.predicates...); err != nil {
			return err
		}
	}
	return nil
}

可以看到该方法对本 Controller 负责的 CRD 进行了 watch,同时底下还会 watch 本 CRD 管理的其他资源,这个 managedObjects 可以通过 Controller 初始化 Buidler 的 Owns 方法传入,说到 Watch 我们关心两个逻辑:

  1. 注册的 handler

可以看到 Kubebuidler 为我们注册的 Handler 就是将发生变更的对象的 NamespacedName 入队列,如果在 Reconcile 逻辑中需要判断创建/更新/删除,需要有自己的判断逻辑。

  1. 注册的流程
// Watch implements controller.Controller
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
	...
	log.Info("Starting EventSource", "controller", c.Name, "source", src)
	return src.Start(evthdler, c.Queue, prct...)
}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
	...
	is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
	return nil
}

我们的 Handler 实际注册到 Informer 上面,这样整个逻辑就串起来了,通过 Cache 我们创建了所有 Scheme 里面 GVKs 的 Informers,然后对应 GVK 的 Controller 注册了 Watch Handler 到对应的 Informer,这样一来对应的 GVK 里面的资源有变更都会触发 Handler,将变更事件写到 Controller 的事件队列中,之后触发我们的 Reconcile 方法。

Manager启动

func (cm *controllerManager) Start(stop <-chan struct{}) error {
	...
	go cm.startNonLeaderElectionRunnables()
	...
}
func (cm *controllerManager) startNonLeaderElectionRunnables() {
	...
	// Start the Cache. Allow the function to start the cache to be mocked out for testing
	if cm.startCache == nil {
		cm.startCache = cm.cache.Start
	}
	go func() {
		if err := cm.startCache(cm.internalStop); err != nil {
			cm.errChan <- err
		}
	}()
        ...
        // Start Controllers
	for _, c := range cm.nonLeaderElectionRunnables {
		ctrl := c
		go func() {
			cm.errChan <- ctrl.Start(cm.internalStop)
		}()
	}
	cm.started = true
}

Cache启动

func (ip *specificInformersMap) Start(stop <-chan struct{}) {
	func() {
		...
		// Start each informer
		for _, informer := range ip.informersByGVK {
			go informer.Informer.Run(stop)
		}
	}()
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
        ...
        // informer push resource obj CUD delta to this fifo queue
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,
    // handler to process delta
		Process: s.HandleDeltas,
	}
	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
    // this is internal controller process delta generate by reflector
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()
        ...
	wg.StartWithChannel(processorStopCh, s.processor.run)
	s.controller.Run(stopCh)
}
func (c *controller) Run(stopCh <-chan struct{}) {
	...
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	...
        // reflector is delta producer
	wg.StartWithChannel(stopCh, r.Run)
        // internal controller's processLoop is comsume logic
	wait.Until(c.processLoop, time.Second, stopCh)
}

Cache 的初始化核心是初始化所有的 Informer,Informer 的初始化核心是创建了 reflector 和内部 controller,reflector 负责监听 Api Server 上指定的 GVK,将变更写入 delta 队列中,可以理解为变更事件的生产者,内部 controller 是变更事件的消费者,他会负责更新本地 indexer,以及计算出 CUD 事件推给我们之前注册的 Watch Handler。

Controller启动

// Start implements controller.Controller
func (c *Controller) Start(stop <-chan struct{}) error {
	...
	for i := 0; i < c.MaxConcurrentReconciles; i++ {
		// Process work items
		go wait.Until(func() {
			for c.processNextWorkItem() {
			}
		}, c.JitterPeriod, stop)
	}
	...
}
func (c *Controller) processNextWorkItem() bool {
	...
	obj, shutdown := c.Queue.Get()
	...
	var req reconcile.Request
	var ok bool
	if req, ok = obj.(reconcile.Request); 
        ...
	// RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
	// resource to be synced.
	if result, err := c.Do.Reconcile(req); err != nil {
		c.Queue.AddRateLimited(req)
		...
	} 
        ...
}

Controller 的初始化是启动 goroutine 不断地查询队列,如果有变更消息则触发到我们自定义的 Reconcile 逻辑

Operator Framework

https://github.com/operator-framework/operator-sdk

Kruise

Kruise 对 Kubernetes 扩展的控制器中有一个重要功能就是支持原地升级

原地升级(In-place update)是一种 Kubernetes 中的 Pod 升级方式,这种升级方式可以更新 Pod 中某一个或多个容器的镜像版本,而不影响 Pod 中其余容器的运行,同时保持 Pod 的网络和存储状态不变。

Kubernetes 原生工作负载,不论是 Deployment、StatefulSet 还是 Pod 本身,如果你想升级 Pod 中的镜像,那么 Kubernetes 就会重新销毁该 Pod 并重新调度并创建一个 Pod,对于 StatefulSet 虽然可以保持原有 Pod 的名字,但是实际 UIDPod IP 都将发生改变。如果你还使用了 Istio,那么在更新 Sidecar 容器的时候,所有植入 Sidecar 容器的 Pod 都需要销毁、重新调度和重建,这将带来极大的开销,同时也影响了业务的稳定性。

原地升级的优势:

原地升级的模式极大地提升了应用发布的效率,

  1. 节省了调度的耗时,Pod 的位置、资源都不发生变化;
  2. 节省了分配网络的耗时,Pod 还使用原有的 IP;
  3. 节省了分配、挂载远程盘的耗时,Pod 还使用原有的 PV(且都是已经在 Node 上挂载好的);
  4. 节省了大部分拉取镜像的耗时,因为 Node 上已经存在了应用的旧镜像,当拉取新版本镜像时只需要下载很少的几层 layer;

而且用户要想在 Kubernetes 中使用原地升级也是极其容易的,只需要安装 OpenKruise,并在使用 Kruise 的 CRD 并在 策略中设置即可。

配置原地升级:

https://developer.aliyun.com/article/765421

https://www.cnblogs.com/xunweidezui/p/16646045.html

https://openkruise.io/zh/docs/

集群联邦

集群联邦 Federation 的目的是实现单一集群统一管理多个kubernetes集群的机制。这些集群可以是跨地域的,跨云厂商的或者是用户内部自建集群。一旦集群建立联邦后,就可以使用集群 Federation API 来管理多个集群的 kubernetes API 资源。

kubernetes集群联邦主要是实现以下目标

1)简化管理多个联邦集群的Kubernetes API 资源

2)在多个集群之间分散工作负载(容器),以提升应用(服务)的可靠性

3)在不同集群中,能更快速更容易地迁移应用(服务)

4)跨集群的服务发现,服务可以就近访问,以降低延迟

5)实践多云(Multi-cloud)或混合云(Hybird Cloud)的部署

集群联邦最初是 v1 版本,因为方案设计问题,导致可扩展性比较差,已经被废弃,目前社区提出了新的方案 Federation V2

从上图架构中得知Federation v1 的设计沿用类似Kubernetes 模型,其主要组件有以下:

federation-apiserver :提供Federation API资源,只支持部分Kubernetes API resources。
federation-controller-manager :协调不同集群之间的状态,如同步Federated资源与策略,并建立Kubernetes组件至对应集群上。
etcd :储存Federation的状态。

该方案设计之初没有考虑到crd的特性,支持的 Federation API 资源类型都是写死的,无法有效的扩展,不能兼容新的api资源,不支持跨集群权限管理,如不支持RBAC。联邦层级的设定与策略依赖API 资源的Annotations 内容,这使得弹性不佳。

Federation V2

Federation V2 是Kubernetes SIG Multi-Cluster团队新提出的集群联邦架构( Architecture Doc与Brainstorming Doc ),新架构在Federation v1基础之上,简化扩展Federated API过程,并加强跨集群服务发现与编排的功能。

相较于V1,Federation V2移除了 federation-apiserver 组件,通过 crd 机制来完成 federated resource的扩充,kubefed-controller 组件通过监听crd的变化来完成联邦资源的同步和调度等功能。

集群联邦安装文档 DOC,集群成员注册后,会在管理集群创建一个 KubeFedCluster 资源来存储集群的基本信息,如API Endpoint、CA Bundle等,kubefed controller 通过这些信息来访问和管理联邦集群成员。

联邦化资源

Federation V2 可以联邦化任意资源,包括自定义的crd资源。对集群资源联邦化的实现主要是通过两种CRD来完成,分别是 FederatedTypeConfig 和 Federated ,FederatedTypeConfig定义了 Federated 和kubernetes api资源的关联关系。而 Federated 用来定义怎么去联邦化对应的kubernetes api资源。

KubeFed提供了一种自动化机制来将工作负载实例分散到不同的集群中,且能够基于总副本数与集群的定义策略来将Deployment或ReplicaSet资源进行编排。编排策略是通过创建ReplicaSchedulingPreference(RSP),再由KubeFed RSP Controller监听与获取RSP内容来将工作负载实例建立到指定的集群上。

管理K8s集群的工具

miniKube

Kind

Cloud Providers

如果你觉得我的文章对你有帮助的话,希望可以推荐和交流一下。欢迎關注和 Star 本博客或者关注我的 Github