新聞中心
API Server源碼分析之入口點(diǎn)解析
作者:陽明 2023-03-17 07:53:20
云計(jì)算
云原生 從本文開始,我們將對(duì) K8S API Server 的代碼進(jìn)行詳細(xì)分析,并探討其應(yīng)用入口點(diǎn)、框架以及與 etcd 的通信。

在徐水等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站制作、網(wǎng)站設(shè)計(jì) 網(wǎng)站設(shè)計(jì)制作按需求定制設(shè)計(jì),公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),高端網(wǎng)站設(shè)計(jì),營(yíng)銷型網(wǎng)站建設(shè),成都外貿(mào)網(wǎng)站建設(shè),徐水網(wǎng)站建設(shè)費(fèi)用合理。
Kubernetes(K8s)集群中最關(guān)鍵的組件之一是 API Server,它是所有集群管理活動(dòng)的入口點(diǎn)。從本文開始,我們將對(duì) K8s API Server 的代碼進(jìn)行詳細(xì)分析,并探討其應(yīng)用入口點(diǎn)、框架以及與 etcd 的通信。
應(yīng)用入口點(diǎn)
K8s API Server 的主要入口點(diǎn)位于 cmd/kube-APiServer/apiserver.go 文件的。
// cmd/kube-apiserver/apiserver.go
// apiserver is the main api server and master for the cluster.
// it is responsible for serving the cluster management API.
package main
import (
"os"
"k8s.io/component-base/cli"
_ "k8s.io/component-base/logs/json/register" // 用于JSON日志格式注冊(cè)
_ "k8s.io/component-base/metrics/prometheus/clientgo" // 加載所有的 prometheus client-go 插件
_ "k8s.io/component-base/metrics/prometheus/version" // 用于版本指標(biāo)注冊(cè)
"k8s.io/kubernetes/cmd/kube-apiserver/app"
)
func main() {
command := app.NewAPIServerCommand()
code := cli.Run(command)
os.Exit(code)
}
其中的 app.NewAPIServerCommand() 是構(gòu)建的一個(gè) cobra 的命令對(duì)象,cli.Run 然后執(zhí)行該命令即可,所以我們直接查看 NewAPIServerCommand 函數(shù)是如果構(gòu)造 cobra.Command 對(duì)象的:
// cmd/kube-apiserver/app/server.go
// NewAPIServerCommand 使用默認(rèn)參數(shù)創(chuàng)建一個(gè) *cobra.Command 對(duì)象
func NewAPIServerCommand() *cobra.Command {
// NewServerRunOptions 使用默認(rèn)參數(shù)創(chuàng)建一個(gè)新的 ServerRunOptions 對(duì)象。
// ServerRunOption 對(duì)象是運(yùn)行 apiserver 需要的對(duì)象
s := options.NewServerRunOptions()
cmd := &cobra.Command{
Use: "kube-apiserver",
Long: `The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,
// ......
RunE: func(cmd *cobra.Command, args []string) error {
verflag.PrintAndExitIfRequested()
fs := cmd.Flags()
if err := s.Logs.ValidateAndApply(); err != nil {
return err
}
cliflag.PrintFlags(fs)
err := checkNonZeroInsecurePort(fs)
if err != nil {
return err
}
// 設(shè)置默認(rèn)選項(xiàng)
completedOptions, err := Complete(s)
if err != nil {
return err
}
// 校驗(yàn)選項(xiàng)
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
return Run(completedOptions, genericapiserver.SetupSignalHandler())
},
}
// ......
return cmd
}
該函數(shù)最核心的功能就是使用 Complete(s) 函數(shù)來生成 apiserver 啟動(dòng)需要的默認(rèn)參數(shù),然后將默認(rèn)參數(shù)傳遞給 Run 函數(shù)進(jìn)行啟動(dòng)。
// cmd/kube-apiserver/app/server.go
// Run 運(yùn)行指定的 APIServer,不能退出.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
// 創(chuàng)建服務(wù)鏈(包含的3個(gè)server組件)
server, err := CreateServerChain(completeOptions, stopCh)
// 服務(wù)啟動(dòng)前的準(zhǔn)備工作,包括健康檢查、存活檢查、OpenAPI路由注冊(cè)等
prepared, err := server.PrepareRun()
// 正式啟動(dòng)運(yùn)行
return prepared.Run(stopCh)
}
在 Run 函數(shù)中首先會(huì)通過 CreateServerChain 函數(shù)通過委托創(chuàng)建連接的 APIServer 對(duì)象。
// cmd/kube-apiserver/app/server.go
// CreateServerChain 通過委托創(chuàng)建連接的APIServer
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
// CreateKubeAPIServerConfig 創(chuàng)建用于運(yùn)行 APIServer 的所有配置資源,但不運(yùn)行任何資源
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
// // 創(chuàng)建 APIExtensionsServer 配置
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
// 創(chuàng)建APIExtensionsServer并注冊(cè)路由
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
// 創(chuàng)建KubeAPIServer并注冊(cè)路由
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
// // 創(chuàng)建 aggregatorServer 配置
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
// 創(chuàng)建aggregatorServer并注冊(cè)路由
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
return aggregatorServer, nil
}
上面的函數(shù)中可以看到 CreateServerChain 會(huì)創(chuàng)建3個(gè) server:APIExtensionServer、KubeAPIServer、AggregratorServer,APIServer 就是依靠這3個(gè)組件來對(duì)不同類型的請(qǐng)求進(jìn)行處理的:
- APIExtensionServer: 主要負(fù)責(zé)處理 CustomResourceDefinition(CRD)方面的請(qǐng)求。
- KubeAPIServer: 主要負(fù)責(zé)處理 K8s 內(nèi)置資源的請(qǐng)求,此外還會(huì)包括通用處理、認(rèn)證、鑒權(quán)等。
- AggregratorServer: 主要負(fù)責(zé)聚合器方面的處理,它充當(dāng)一個(gè)代理服務(wù)器,將請(qǐng)求轉(zhuǎn)發(fā)到聚合進(jìn)來的 K8s service 中。
創(chuàng)建每個(gè) server 都有對(duì)應(yīng)的 config,可以看出上面函數(shù)中的 apiExtensionServer 和 aggregatorServer 的 Config 需要依賴 kubeAPIServerConfig,而這幾個(gè) ServerConfig 都需要依賴 GenericConfig,CreateKubeAPIServerConfig 函數(shù)創(chuàng)建 kubeAPIServerConfig ,在該函數(shù)中通過調(diào)用 buildGenericConfig 來創(chuàng)建 GenericConfig 對(duì)象,如下代碼所示。
// cmd/kube-apiserver/app/server.go
// CreateKubeAPIServerConfig 創(chuàng)建用于運(yùn)行 APIServer 的所有配置資源
func CreateKubeAPIServerConfig(s completedServerRunOptions) (
*controlplane.Config,
aggregatorapiserver.ServiceResolver,
[]admission.PluginInitializer,
error,
) {
proxyTransport := CreateProxyTransport()
// 構(gòu)建通用配置
genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
// ......
config := &controlplane.Config{
GenericConfig: genericConfig,
ExtraConfig: controlplane.ExtraConfig{
APIResourceConfigSource: storageFactory.APIResourceConfigSource,
StorageFactory: storageFactory,
EventTTL: s.EventTTL,
KubeletClientConfig: s.KubeletConfig,
EnableLogsSupport: s.EnableLogsHandler,
ProxyTransport: proxyTransport,
ServiceIPRange: s.PrimaryServiceClusterIPRange,
APIServerServiceIP: s.APIServerServiceIP,
SecondaryServiceIPRange: s.SecondaryServiceClusterIPRange,
APIServerServicePort: 443,
ServiceNodePortRange: s.ServiceNodePortRange,
KubernetesServiceNodePort: s.KubernetesServiceNodePort,
EndpointReconcilerType: reconcilers.Type(s.EndpointReconcilerType),
MasterCount: s.MasterCount,
ServiceAccountIssuer: s.ServiceAccountIssuer,
ServiceAccountMaxExpiration: s.ServiceAccountTokenMaxExpiration,
ExtendExpiration: s.Authentication.ServiceAccounts.ExtendExpiration,
VersionedInformers: versionedInformers,
IdentityLeaseDurationSeconds: s.IdentityLeaseDurationSeconds,
IdentityLeaseRenewIntervalSeconds: s.IdentityLeaseRenewIntervalSeconds,
},
}
// ......
return config, serviceResolver, pluginInitializers, nil
}
func buildGenericConfig(
s *options.ServerRunOptions,
proxyTransport *http.Transport,
)(...){
//創(chuàng)建一個(gè)通用配置對(duì)象
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
// ......
//創(chuàng)建認(rèn)證實(shí)例
if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
return
}
// ...
// openapi/swagger配置,OpenAPIConfig 用于生成 OpenAPI 規(guī)范
getOpenAPIDefinitions := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
// storageFactoryConfig 對(duì)象定義了 kube-apiserver 與 etcd 的交互方式,如:etcd認(rèn)證、地址、存儲(chǔ)前綴等
// 該對(duì)象也定義了資源存儲(chǔ)方式,如:資源信息、資源編碼信息、資源狀態(tài)等
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
storageFactory, lastErr = completedStorageFactoryConfig.New()
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return
}
// ......
// 初始化 SharedInformerFactory
kubeClientConfig := genericConfig.LoopbackClientConfig
clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
// 認(rèn)證配置,內(nèi)部調(diào)用 authenticatorConfig.New()
// K8s提供了9種認(rèn)證機(jī)制,每種認(rèn)證機(jī)制被實(shí)例化后都成為認(rèn)證器
if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
return
}
// 創(chuàng)建鑒權(quán)實(shí)例,K8s也提供了6種授權(quán)機(jī)制,每種授權(quán)機(jī)制被實(shí)例化后都成為授權(quán)器
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
// ...
// 審計(jì)
lastErr = s.Audit.ApplyTo(genericConfig)
// 準(zhǔn)入控制器
// k8s資源在認(rèn)證和授權(quán)通過,被持久化到etcd之前進(jìn)入準(zhǔn)入控制邏輯
// 準(zhǔn)入控制包括:對(duì)請(qǐng)求的資源進(jìn)行自定義操作(校驗(yàn)、修改、拒絕)
// 準(zhǔn)入控制器通過 Plugins 數(shù)據(jù)結(jié)構(gòu)統(tǒng)一注冊(cè)、存放、管理
admissionConfig := &kubeapiserveradmission.Config{
ExternalInformers: versionedInformers,
LoopbackClientConfig: genericConfig.LoopbackClientConfig,
CloudConfigFile: s.CloudProvider.CloudConfigFile,
}
serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver, genericConfig.TracerProvider)
err = s.Admission.ApplyTo(
genericConfig,
versionedInformers,
kubeClientConfig,
feature.DefaultFeatureGate,
pluginInitializers...)
// ...
}
然后我們?cè)賮矸謩e看看這3個(gè) Server 是如何構(gòu)建的。
go-restful框架
這里我們就不得不先了解下 go-restful 這個(gè)框架了,因?yàn)?APIServer 就使用的這個(gè)框架。下面的代碼是 go-restful 官方的一個(gè)示例,這個(gè) demo 了解后基本上就知道 go-restful 框架是如何使用的了:
package main
import (
"log"
"net/http"
restfulspec "github.com/emicklei/go-restful-openapi/v2"
restful "github.com/emicklei/go-restful/v3"
"github.com/go-openapi/spec"
)
// UserResource is the REST layer to the User domain
type UserResource struct {
// normally one would use DAO (data access object)
users map[string]User
}
// WebService creates a new service that can handle REST requests for User resources.
func (u UserResource) WebService() *restful.WebService {
ws := new(restful.WebService)
ws.
Path("/users").
Consumes(restful.MIME_XML, restful.MIME_JSON).
Produces(restful.MIME_JSON, restful.MIME_XML) // you can specify this per route as well
tags := []string{"users"}
ws.Route(ws.GET("/").To(u.findAllUsers).
// docs
Doc("get all users").
Metadata(restfulspec.KeyOpenAPITags, tags).
Writes([]User{}).
Returns(200, "OK", []User{}))
ws.Route(ws.GET("/{user-id}").To(u.findUser).
// docs
Doc("get a user").
Param(ws.PathParameter("user-id", "identifier of the user").DataType("integer").DefaultValue("1")).
Metadata(restfulspec.KeyOpenAPITags, tags).
Writes(User{}). // on the response
Returns(200, "OK", User{}).
Returns(404, "Not Found", nil))
ws.Route(ws.PUT("/{user-id}").To(u.updateUser).
// docs
Doc("update a user").
Param(ws.PathParameter("user-id", "identifier of the user").DataType("string")).
Metadata(restfulspec.KeyOpenAPITags, tags).
Reads(User{})) // from the request
ws.Route(ws.PUT("").To(u.createUser).
// docs
Doc("create a user").
Metadata(restfulspec.KeyOpenAPITags, tags).
Reads(User{})) // from the request
ws.Route(ws.DELETE("/{user-id}").To(u.removeUser).
// docs
Doc("delete a user").
Metadata(restfulspec.KeyOpenAPITags, tags).
Param(ws.PathParameter("user-id", "identifier of the user").DataType("string")))
return ws
}
// GET http://localhost:8080/users
//
func (u UserResource) findAllUsers(request *restful.Request, response *restful.Response) {
list := []User{}
for _, each := range u.users {
list = append(list, each)
}
response.WriteEntity(list)
}
// GET http://localhost:8080/users/1
//
func (u UserResource) findUser(request *restful.Request, response *restful.Response) {
id := request.PathParameter("user-id")
usr := u.users[id]
if len(usr.ID) == 0 {
response.WriteErrorString(http.StatusNotFound, "User could not be found.")
} else {
response.WriteEntity(usr)
}
}
// PUT http://localhost:8080/users/1
//1 Melissa Raspberry
//
func (u *UserResource) updateUser(request *restful.Request, response *restful.Response) {
usr := new(User)
err := request.ReadEntity(&usr)
if err == nil {
u.users[usr.ID] = *usr
response.WriteEntity(usr)
} else {
response.WriteError(http.StatusInternalServerError, err)
}
}
// PUT http://localhost:8080/users/1
//1 Melissa
//
func (u *UserResource) createUser(request *restful.Request, response *restful.Response) {
usr := User{ID: request.PathParameter("user-id")}
err := request.ReadEntity(&usr)
if err == nil {
u.users[usr.ID] = usr
response.WriteHeaderAndEntity(http.StatusCreated, usr)
} else {
response.WriteError(http.StatusInternalServerError, err)
}
}
// DELETE http://localhost:8080/users/1
//
func (u *UserResource) removeUser(request *restful.Request, response *restful.Response) {
id := request.PathParameter("user-id")
delete(u.users, id)
}
func main() {
u := UserResource{map[string]User{}}
restful.DefaultContainer.Add(u.WebService())
config := restfulspec.Config{
WebServices: restful.RegisteredWebServices(), // you control what services are visible
APIPath: "/apidocs.json",
PostBuildSwaggerObjectHandler: enrichSwaggerObject}
restful.DefaultContainer.Add(restfulspec.NewOpenAPIService(config))
// Optionally, you can install the Swagger Service which provides a nice Web UI on your REST API
// You need to download the Swagger HTML5 assets and change the FilePath location in the config below.
// Open http://localhost:8080/apidocs/?url=http://localhost:8080/apidocs.json
http.Handle("/apidocs/", http.StripPrefix("/apidocs/", http.FileServer(http.Dir("/Users/emicklei/Projects/swagger-ui/dist"))))
log.Printf("start listening on localhost:8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
func enrichSwaggerObject(swo *spec.Swagger) {
swo.Info = &spec.Info{
InfoProps: spec.InfoProps{
Title: "UserService",
Description: "Resource for managing Users",
Contact: &spec.ContactInfo{
ContactInfoProps: spec.ContactInfoProps{
Name: "john",
Email: "[email protected]",
URL: "http://johndoe.org",
},
},
License: &spec.License{
LicenseProps: spec.LicenseProps{
Name: "MIT",
URL: "http://mit.org",
},
},
Version: "1.0.0",
},
}
swo.Tags = []spec.Tag{spec.Tag{TagProps: spec.TagProps{
Name: "users",
Description: "Managing users"}}}
}
// User is just a sample type
type User struct {
ID string `json:"id" description:"identifier of the user"`
Name string `json:"name" description:"name of the user" default:"john"`
Age int `json:"age" description:"age of the user" default:"21"`
}
這個(gè)示例代碼,就是使用 go-restful 的核心功能實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的 RESTful 的 API,實(shí)現(xiàn)了對(duì) User 的增刪查改,其中有這么幾個(gè)核心概念:Container、WebService、Route。
- Container:服務(wù)器容器,包含多個(gè) WebService 和一個(gè) http.ServerMux。
- WebService:服務(wù),由多個(gè) Route 組成,一個(gè) WebService 其實(shí)代表某一個(gè)對(duì)象相關(guān)的服務(wù),如上例中的 /users,針對(duì)該 /users 要實(shí)現(xiàn)RESTful API,那么需要向其添加增刪查改的路由,即 Route,它是 Route 的集合。
- Route:路由,包含了 url,http 方法,接收和響應(yīng)的媒體類型以及處理函數(shù)。每一個(gè) Route,根據(jù) Method 和 Path,映射到對(duì)應(yīng)的方法中,即是 Method/Path 到 Function 映射關(guān)系的抽象,如上例中的 ws.Route(ws.GET("/{user-id}").To(u.findUser)),就是針對(duì) /users/{user-id}該路徑的GET請(qǐng)求,則被路由到 findUser 方法中進(jìn)行處理。
- Container 是 WebService 的集合,可以向 Container 中添加多個(gè) WebService,而 Container 因?yàn)閷?shí)現(xiàn)了 ServeHTTP() 方法,其本質(zhì)上還是一個(gè)http Handler,可以直接用在 http Server 中。
Kubernetes 中對(duì) go-restful 的使用比較基礎(chǔ),就使用到了其最基礎(chǔ)的路由功能,由于 K8s 有很多內(nèi)置的資源對(duì)象,也包括 CRD 這種自定義資源對(duì)象,所以一開始并不是直接將這些資源對(duì)應(yīng)對(duì)應(yīng)的接口硬編碼的,而是通過一系列代碼動(dòng)態(tài)注冊(cè)的,所以接下來我們分析的其實(shí)就是想辦法讓 APIServer 能夠提供如下所示的路由處理出來:
GET /apis/apps/v1/namespaces/{namespace}/deployments/{name}
POST /apis/apps/v1/namespaces/{namespace}/deployments
GET /apis/apps/v1/namespaces/{namespace}/daemonsets/{name}
POST /apis/apps/v1/namespaces/{namespace}/daemonsets對(duì) go-restful 有一個(gè)基礎(chǔ)了解后,后面就可以去了解下這3個(gè) Server 具體是如何實(shí)例化的了。
網(wǎng)站名稱:APIServer源碼分析之入口點(diǎn)解析
網(wǎng)站鏈接:http://m.fisionsoft.com.cn/article/cdhjjij.html


咨詢
建站咨詢
