摘要:為所有對(duì)外提供服務(wù)的資源實(shí)現(xiàn)了一套通用的符合要求的操作接口,每個(gè)服務(wù)接口負(fù)責(zé)處理一類資源對(duì)象。該接口最終返回了的和清除操作資源的接口。
源碼版本
Kubernetes v1.5.0
簡(jiǎn)介k8s的各個(gè)組件與apiServer交互操作各種資源對(duì)象,最終都會(huì)落入到etcd中。
k8s為所有對(duì)外提供服務(wù)的Restful資源實(shí)現(xiàn)了一套通用的符合Restful要求的etcd操作接口,每個(gè)服務(wù)接口負(fù)責(zé)處理一類(Kind)資源對(duì)象。
這些資源對(duì)象包括pods、bindings、podTemplates、RC、Services等。
要了解etcd操作接口的實(shí)現(xiàn),我們先需要了解下Master.GenericAPIServer.storage結(jié)構(gòu):
storage map[string]rest.Storage
該storage變量是個(gè)map,Key是REST API的path,Value是rest.Storage接口,該接口就是一個(gè)通用的符合Restful要求的資源存儲(chǔ)接口。
核心組資源列表的創(chuàng)建要查看pkg/registry/core/rest/storage_core.go中的NewLegacyRESTStorage()接口:
接口調(diào)用流程: main --> App.Run --> config.Complete().New() --> m.InstallLegacyAPI() --> NewLegacyRESTStorage()
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter genericapiserver.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) { .... // 創(chuàng)建podStorage podStorage := podetcd.NewStorage( restOptionsGetter(api.Resource("pods")), nodeStorage.KubeletConnectionInfo, c.ProxyTransport, podDisruptionClient, ) ... // 資源列表 restStorageMap := map[string]rest.Storage{ "pods": podStorage.Pod, "pods/attach": podStorage.Attach, "pods/status": podStorage.Status, "pods/log": podStorage.Log, "pods/exec": podStorage.Exec, "pods/portforward": podStorage.PortForward, "pods/proxy": podStorage.Proxy, "pods/binding": podStorage.Binding, "bindings": podStorage.Binding, "podTemplates": podTemplateStorage, "replicationControllers": controllerStorage.Controller, "replicationControllers/status": controllerStorage.Status, "services": serviceRest.Service, "services/proxy": serviceRest.Proxy, "services/status": serviceStatusStorage, "endpoints": endpointsStorage, ... "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate), } if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}) { restStorageMap["replicationControllers/scale"] = controllerStorage.Scale } if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "policy", Version: "v1beta1"}) { restStorageMap["pods/eviction"] = podStorage.Eviction } apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap return restStorage, apiGroupInfo, nil }
該接口在ApiServer源碼分析的第二章介紹資源注冊(cè)的時(shí)候已經(jīng)講過,這里我們主要分析后端存儲(chǔ)etcd操作接口的實(shí)現(xiàn)。
我們以Pod資源為例,進(jìn)行介紹:
路徑: pkg/registry/core/pod/etcd/etcd.go
func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage { // 完成prefix prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.PodList{} } // 調(diào)用接口裝飾器,返回該storage的etcd操作接口及資源delete接口 // 該opts傳參進(jìn)來的,需要到上一層查看master.go下的restOptionsFactory.NewFor storageInterface, dFunc := opts.Decorator( opts.StorageConfig, // 這一下的參數(shù)都是用于開啟cache時(shí)的接口使用 cachesize.GetWatchCacheSizeByResource(cachesize.Pods), &api.Pod{}, prefix, pod.Strategy, newListFunc, pod.NodeNameTriggerFunc, ) // 創(chuàng)建Store對(duì)象 store := ®istry.Store{ NewFunc: func() runtime.Object { return &api.Pod{} }, NewListFunc: newListFunc, KeyRootFunc: func(ctx api.Context) string { return registry.NamespaceKeyRootFunc(ctx, prefix) }, KeyFunc: func(ctx api.Context, name string) (string, error) { return registry.NamespaceKeyFunc(ctx, prefix, name) }, ObjectNameFunc: func(obj runtime.Object) (string, error) { return obj.(*api.Pod).Name, nil }, PredicateFunc: pod.MatchPod, QualifiedResource: api.Resource("pods"), EnableGarbageCollection: opts.EnableGarbageCollection, DeleteCollectionWorkers: opts.DeleteCollectionWorkers, CreateStrategy: pod.Strategy, UpdateStrategy: pod.Strategy, DeleteStrategy: pod.Strategy, ReturnDeletedObject: true, Storage: storageInterface, DestroyFunc: dFunc, } statusStore := *store statusStore.UpdateStrategy = pod.StatusStrategy return PodStorage{ Pod: &REST{store, proxyTransport}, Binding: &BindingREST{store: store}, Eviction: newEvictionStorage(store, podDisruptionBudgetClient), Status: &StatusREST{store: &statusStore}, Log: &podrest.LogREST{Store: store, KubeletConn: k}, Proxy: &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport}, Exec: &podrest.ExecREST{Store: store, KubeletConn: k}, Attach: &podrest.AttachREST{Store: store, KubeletConn: k}, PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k}, } }
該接口中調(diào)用了opts.Decorator()接口返回了關(guān)鍵的storage interface及清除操作資源的接口。
要看該接口的實(shí)現(xiàn),我們得先從opts的創(chuàng)建開始。
restOptionsGetter(api.Resource("pods"))該步完成了opts的創(chuàng)建,api.Resource("pods")其實(shí)就是拼接了一個(gè)GroupResource的結(jié)構(gòu),我們需要從頭開始介紹restOptionsGetter接口的由來。
路徑:pkg/master/master.go
func (c completedConfig) New() (*Master, error) { ... restOptionsFactory := restOptionsFactory{ deleteCollectionWorkers: c.DeleteCollectionWorkers, enableGarbageCollection: c.GenericConfig.EnableGarbageCollection, storageFactory: c.StorageFactory, } // 判斷是否使能了用于Watch的Cache // 有無cache賦值的是不同的接口實(shí)現(xiàn) // restOptionsFactory.storageDecorator:是一個(gè)各個(gè)資源的REST interface(CRUD)裝飾者 // 后面調(diào)用NewStorage()時(shí)會(huì)用到該接口,并輸出對(duì)應(yīng)的CRUD接口及銷毀接口。 // 可以參考pkg/registry/core/pod/etcd/etcd.go中的NewStorage() // 其實(shí)這里有無cache的接口差異就在于:有cache的話,就提供操作cache的接口;無cache的話,就提供直接操作etcd的接口 if c.EnableWatchCache { restOptionsFactory.storageDecorator = registry.StorageWithCacher } else { restOptionsFactory.storageDecorator = generic.UndecoratedStorage } // install legacy rest storage if c.GenericConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) { legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{ StorageFactory: c.StorageFactory, ProxyTransport: c.ProxyTransport, KubeletClientConfig: c.KubeletClientConfig, EventTTL: c.EventTTL, ServiceIPRange: c.ServiceIPRange, ServiceNodePortRange: c.ServiceNodePortRange, LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig, } m.InstallLegacyAPI(c.Config, restOptionsFactory.NewFor, legacyRESTStorageProvider) } ... }
該接口初始化了一個(gè)restOptionsFactory變量,里面指定了最大的刪除回收資源的協(xié)程數(shù),是否使能GC和storageFactory,還根據(jù)是否使能了WatchCache來完成NewStorage()接口中調(diào)用的裝飾器接口的賦值。
restOptionsFactory.NewForj接口一直被往下傳,直到NewLegacyRESTStorage()接口中被調(diào)用然后創(chuàng)建了opts,我們看下該接口實(shí)現(xiàn):
路徑: pkg/master/master.go
type restOptionsFactory struct { deleteCollectionWorkers int enableGarbageCollection bool storageFactory genericapiserver.StorageFactory storageDecorator generic.StorageDecorator } func (f restOptionsFactory) NewFor(resource unversioned.GroupResource) generic.RESTOptions { // 創(chuàng)建該資源的Storage Config storageConfig, err := f.storageFactory.NewConfig(resource) if err != nil { glog.Fatalf("Unable to find storage destination for %v, due to %v", resource, err.Error()) } // 最終返回的就是RESTOptions, 就是前面的opts的類型 // 需要關(guān)注f.storageDecorator的由來 return generic.RESTOptions{ // 用于生成Storage的config StorageConfig: storageConfig, Decorator: f.storageDecorator, DeleteCollectionWorkers: f.deleteCollectionWorkers, EnableGarbageCollection: f.enableGarbageCollection, ResourcePrefix: f.storageFactory.ResourcePrefix(resource), } }
該接口比較簡(jiǎn)單,初始化了一個(gè)generic.RESTOptions變量,即opts。我們需要找出opts.Decorator的由來,就只需要看下上一個(gè)接口判斷EnableWatchCache時(shí)就明白了。
opts.Decorator該接口最終返回了storage的interface和清除操作資源的接口??梢韵胍幌聨Ь彌_和不帶緩沖的接口實(shí)現(xiàn)肯定不一致,所以這里需要進(jìn)行區(qū)分:
registry.StorageWithCacher:該接口是返回了操作cache的接口,和清除cache的操作接口
generic.UndecoratedStorage: 該接口會(huì)根據(jù)你配置的后端類型(etcd2/etcd3等),來返回不同的etcd操作接口,其實(shí)是為所有的資源對(duì)象創(chuàng)建了etcd的鏈接,然后通過該鏈接發(fā)送不同的命令,最后還返回了斷開該鏈接的接口。
所以實(shí)現(xiàn)完全不一樣,一個(gè)操作cache,一個(gè)操作實(shí)際的etcd。
先看registry.StorageWithCacher()接口實(shí)現(xiàn):
路徑: pkg/registry/generic/registry/storage_factory.go
func StorageWithCacher( storageConfig *storagebackend.Config, capacity int, objectType runtime.Object, resourcePrefix string, scopeStrategy rest.NamespaceScopedStrategy, newListFunc func() runtime.Object, triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) { // storageConfig是后端存儲(chǔ)的config,定義了存儲(chǔ)類型,存儲(chǔ)服務(wù)器List,TLS證書信息,Cache大小等。 // 該接口就是generic.UndecoratedStorage()接口的實(shí)現(xiàn),StorageWithCacher()接口就是多了下面的cacher操作 s, d := generic.NewRawStorage(storageConfig) // TODO: we would change this later to make storage always have cacher and hide low level KV layer inside. // Currently it has two layers of same storage interface -- cacher and low level kv. cacherConfig := storage.CacherConfig{ CacheCapacity: capacity, Storage: s, Versioner: etcdstorage.APIObjectVersioner{}, Type: objectType, ResourcePrefix: resourcePrefix, NewListFunc: newListFunc, TriggerPublisherFunc: triggerFunc, Codec: storageConfig.Codec, } // 根據(jù)是否有namespace來進(jìn)行區(qū)分賦值 // KeyFunc函數(shù)用于獲取該object的Key: // 有namespace的話,key的格式:prefix + "/" + Namespace + "/" + name // 無namespace的話,key的格式:prefix + "/" + name if scopeStrategy.NamespaceScoped() { cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(resourcePrefix, obj) } } else { cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(resourcePrefix, obj) } } // 根據(jù)之前初始化的Cacher的config,進(jìn)行cacher創(chuàng)建 // 比較關(guān)鍵,后面進(jìn)行介紹 cacher := storage.NewCacherFromConfig(cacherConfig) destroyFunc := func() { cacher.Stop() d() } return cacher, destroyFunc }
先調(diào)用NewRawStorage()接口創(chuàng)建了一個(gè)存儲(chǔ)后端,我們先看下這個(gè)接口實(shí)現(xiàn):
路徑: pkg/registry/generic/storage_decorator.go
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) { s, d, err := factory.Create(*config) if err != nil { glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err) } return s, d }
沒啥好說的,繼續(xù)看Create():
路徑: pkg/storage/storagebackend/factory/factory.go
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { // 判斷下存儲(chǔ)類型:etcd2 、etcd3 switch c.Type { case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD2: return newETCD2Storage(c) case storagebackend.StorageTypeETCD3: // TODO: We have the following features to implement: // - Support secure connection by using key, cert, and CA files. // - Honor "https" scheme to support secure connection in gRPC. // - Support non-quorum read. return newETCD3Storage(c) default: return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type) } }
挑個(gè)etcd2看下實(shí)現(xiàn):
路徑: pkg/storage/storagebackend/factory/etcd2.go
func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { // 根據(jù)配置的TLS證書信息創(chuàng)建http.Transport tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile) if err != nil { return nil, nil, err } // 創(chuàng)建etcd2 client,返回的是httpClusterClient結(jié)構(gòu) client, err := newETCD2Client(tr, c.ServerList) if err != nil { return nil, nil, err } // 根據(jù)入?yún)⒊跏蓟粋€(gè)實(shí)現(xiàn)了storage.Interface接口的etcdHelper變量 s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize) // 返回etcdHelper變量,及關(guān)閉鏈接的函數(shù) return s, tr.CloseIdleConnections, nil }
前兩步都是為了創(chuàng)建與etcd連接的client,后一步比較關(guān)鍵:
路徑: pkg/storage/etcd/etcd_helper.go
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface { return &etcdHelper{ // 創(chuàng)建一個(gè)httpMembersAPI變量,附帶很多方法 etcdMembersAPI: etcd.NewMembersAPI(client), // 創(chuàng)建一個(gè)httpKeysAPI變量,同樣附帶各類方法 etcdKeysAPI: etcd.NewKeysAPI(client), // 編解碼使用 codec: codec, versioner: APIObjectVersioner{}, // 用于序列化反序列化,版本間轉(zhuǎn)換,兼容等 copier: api.Scheme, pathPrefix: path.Join("/", prefix), quorum: quorum, // 創(chuàng)建cache結(jié)構(gòu) cache: utilcache.NewCache(cacheSize), } }
該接口很簡(jiǎn)單的初始化,需要關(guān)注的是etcdHelper附帶的通用的RESTFul 方法:
可以看到storage.Interface接口所需要的方法都實(shí)現(xiàn)了。
繼續(xù)回到StorageWithCacher()接口,在往下走就是CacherConfig的初始化,就不介紹了,直接進(jìn)入cacher的創(chuàng)建接口:
路徑: pkg/storage/cacher.go
func NewCacherFromConfig(config CacherConfig) *Cacher { watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc) listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) // Give this error when it is constructed rather than when you get the // first watch item, because it"s much easier to track down that way. if obj, ok := config.Type.(runtime.Object); ok { if err := runtime.CheckCodec(config.Codec, obj); err != nil { panic("storage codec doesn"t seem to match given type: " + err.Error()) } } cacher := &Cacher{ ready: newReady(), storage: config.Storage, objectType: reflect.TypeOf(config.Type), watchCache: watchCache, reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0), versioner: config.Versioner, triggerFunc: config.TriggerPublisherFunc, watcherIdx: 0, watchers: indexedWatchers{ allWatchers: make(map[int]*cacheWatcher), valueWatchers: make(map[string]watchersMap), }, // TODO: Figure out the correct value for the buffer size. incoming: make(chan watchCacheEvent, 100), // We need to (potentially) stop both: // - wait.Until go-routine // - reflector.ListAndWatch // and there are no guarantees on the order that they will stop. // So we will be simply closing the channel, and synchronizing on the WaitGroup. stopCh: make(chan struct{}), } watchCache.SetOnEvent(cacher.processEvent) go cacher.dispatchEvents() stopCh := cacher.stopCh cacher.stopWg.Add(1) go func() { defer cacher.stopWg.Done() wait.Until( func() { if !cacher.isStopped() { cacher.startCaching(stopCh) } }, time.Second, stopCh, ) }() return cacher }
該接口主要用于開啟cacher,而該cache只用于WATCH和LIST的request。
我們?cè)诳聪翪acher結(jié)構(gòu)體:
該接口必然也實(shí)現(xiàn)了storage.Interface接口所需要的方法。
因?yàn)樵揅acher只用于WATCH和LIST的request,所以你可以看下cacher提供的API,除了WATCH和LIST相關(guān)的之外的接口都是調(diào)用了之前創(chuàng)建的storage的API。
查看下cacher.Create和Delete:
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { return c.storage.Create(ctx, key, obj, out, ttl) } func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error { return c.storage.Delete(ctx, key, out, preconditions) }
到這里registry.StorageWithCacher()接口就結(jié)束了,我們繼續(xù)回到前面講的另外一個(gè)接口generic.UndecoratedStorage():
路徑:pkg/registry/generic/storage_decorator.go
func UndecoratedStorage( config *storagebackend.Config, capacity int, objectType runtime.Object, resourcePrefix string, scopeStrategy rest.NamespaceScopedStrategy, newListFunc func() runtime.Object, trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) { return NewRawStorage(config) }
發(fā)現(xiàn)registry.StorageWithCacher()接口也是調(diào)用了NewRawStorage()接口,其實(shí)現(xiàn)就少了cache。
這里接觸到了cache,下節(jié)會(huì)專門介紹該cache實(shí)現(xiàn)。
用戶配置--watch-cache: 該apiServer的參數(shù)默認(rèn)就是true的,用于打開watch cache
--watch-cache-sizes: 既然有enable cache,那就少不了cache sizes,而且該size可以指定各類資源所使用的cache size。格式: resource#size
--storage-backend: 后端持久化存儲(chǔ)類型,可選項(xiàng)為etcd2(默認(rèn))、etcd3
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/32554.html
摘要:源碼版本簡(jiǎn)介是最重要的組成部分,不論是命令操作還是通過進(jìn)行控制,實(shí)際都需要經(jīng)過。僅用于長(zhǎng)時(shí)間執(zhí)行的請(qǐng)求最小請(qǐng)求處理超時(shí)時(shí)間,默認(rèn)僅用于該文件內(nèi)設(shè)置鑒權(quán)機(jī)構(gòu)一組用于運(yùn)行時(shí)的配置信息。在最后會(huì)啟動(dòng)服務(wù)。 源碼版本 Kubernetes v1.5.0 簡(jiǎn)介 apiserver是K8S最重要的組成部分,不論是命令操作還是通過remote API進(jìn)行控制,實(shí)際都需要經(jīng)過apiserver。api...
摘要:我們先將上面的接口解析放放,先看下是如何初始化的路徑定義了,再看路徑定義空的創(chuàng)建,用于不同版本對(duì)象轉(zhuǎn)換增加一些轉(zhuǎn)換函數(shù)上面就創(chuàng)建了一個(gè)空的。其實(shí)就是向添加了轉(zhuǎn)換函數(shù),比如將轉(zhuǎn)換為,將轉(zhuǎn)換為。 源碼版本 Kubernetes v1.5.0 簡(jiǎn)介 k8s里面有各種資源,如Pod、Service、RC、namespaces等資源,用戶操作的其實(shí)也就是這一大堆資源。但這些資源并不是雜亂無章的,...
摘要:它包括一組和一個(gè)對(duì)象,使用進(jìn)行請(qǐng)求派發(fā)。流程基本就是這樣,接著我們直接進(jìn)入接口看實(shí)現(xiàn)拼裝然后填充并返回一個(gè)對(duì)象創(chuàng)建一個(gè)這個(gè)是關(guān)鍵,會(huì)對(duì)各種進(jìn)行注冊(cè)增加一個(gè)的將該加入到前兩個(gè)調(diào)用函數(shù)比較簡(jiǎn)單,這里不進(jìn)行介紹了。 源碼版本 Kubernetes v1.5.0 go-restful 簡(jiǎn)介 go-restful是用于構(gòu)建REST-style web服務(wù)的golang包。它是出現(xiàn)時(shí)因?yàn)橐粋€(gè)jav...
摘要:前言了解的同學(xué)都知道,對(duì)外提供接口提供查詢,監(jiān)聽集群資源狀態(tài)的服務(wù),主要就做一件事,就是如何將接口調(diào)用映射到對(duì)后端存儲(chǔ)比如的增刪改查訪問,在設(shè)計(jì)的時(shí)候考慮到是個(gè)快速迭代的開源項(xiàng)目,很多接口版本可能在未來版本發(fā)生變化,因此如何設(shè)計(jì)一個(gè)擴(kuò)展 前言 了解 k8s 的同學(xué)都知道,kube-apiserver 對(duì)外提供 RESTful API 接口提供 查詢,監(jiān)聽集群(資源)狀態(tài)的服務(wù),kube...
摘要:前言了解的同學(xué)都知道,對(duì)外提供接口提供查詢,監(jiān)聽集群資源狀態(tài)的服務(wù),主要就做一件事,就是如何將接口調(diào)用映射到對(duì)后端存儲(chǔ)比如的增刪改查訪問,在設(shè)計(jì)的時(shí)候考慮到是個(gè)快速迭代的開源項(xiàng)目,很多接口版本可能在未來版本發(fā)生變化,因此如何設(shè)計(jì)一個(gè)擴(kuò)展 前言 了解 k8s 的同學(xué)都知道,kube-apiserver 對(duì)外提供 RESTful API 接口提供 查詢,監(jiān)聽集群(資源)狀態(tài)的服務(wù),kube...
閱讀 2553·2021-10-11 10:58
閱讀 1037·2019-08-29 13:58
閱讀 1673·2019-08-26 13:32
閱讀 838·2019-08-26 10:40
閱讀 3265·2019-08-26 10:18
閱讀 1764·2019-08-23 14:18
閱讀 1113·2019-08-23 10:54
閱讀 442·2019-08-22 18:39