新聞中心
Containerd深度剖析-CRI篇
作者:段全峰 2023-01-10 13:48:50
云計算
云原生 本文就針對K8s使用Containerd作為運行時的整個調(diào)用鏈進行介紹和源碼級別的分析。

創(chuàng)新互聯(lián)主營臨潼網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,成都app軟件開發(fā)公司,臨潼h5小程序定制開發(fā)搭建,臨潼網(wǎng)站營銷推廣歡迎臨潼等地區(qū)企業(yè)咨詢
本文經(jīng)DCOS(公眾號 ID: indagate)授權(quán)轉(zhuǎn)載,轉(zhuǎn)載請聯(lián)系出處。
撰文 | 段全鋒
編輯 | zouyee
段全鋒: 軟件工程師,熟悉K8s架構(gòu)、精通Runtime底層技術(shù)細節(jié)等。
目前我司現(xiàn)網(wǎng)的K8s集群的運行時已經(jīng)完成從docker到Containerd的切換,有小伙伴對K8s與Containerd調(diào)用鏈涉及的組件不了解,其中Containerd和RunC是什么關(guān)系,docker和containerd又有什么區(qū)別,以及K8s調(diào)用Containerd創(chuàng)建容器又是怎樣的流程,最終RunC又是如何創(chuàng)建容器的,諸如此類的疑問。本文就針對K8s使用Containerd作為運行時的整個調(diào)用鏈進行介紹和源碼級別的分析。
其中關(guān)于kubelet與運行時的分層架構(gòu)圖可以參看下圖:
那么關(guān)于各類運行時的介紹可以參看Containerd深度剖析-runtime篇
一、“運行時簡介”
容器運行時意思就是能夠管理容器運行的整個生命周期,具體一點就是如何制作容器的鏡像、容器鏡像格式是什么樣子的、管理容器的鏡像、容器鏡像的分發(fā)、如何運行一個容器以及管理創(chuàng)建的容器實例等等。
容器運行時有一個行業(yè)標(biāo)準(zhǔn)叫做OCI規(guī)范,這個規(guī)范分成兩部分:
a. 容器運行時規(guī)范:描述了如何通過一個bundle運行容器,bundle就是一個目錄,里面包括一個容器的規(guī)格文 件,文件叫config.json 和一個rootfs,rootfs中包含了一個容器運行時所需操作系統(tǒng)的文件。
b. 容器鏡像規(guī)范:定義了容器的鏡像如何打包如何將鏡像轉(zhuǎn)換成一個bundle。
的老爺子,考慮是新冠肺炎轉(zhuǎn)成重癥。雖然意識還比較清醒,但血氣分析結(jié)果很差,需要住進ICU進行呼吸監(jiān)護。從普通病房轉(zhuǎn)運至ICU,坐電梯上幾層樓就能到,但如果給氧條件不夠,風(fēng)險會非常大。他的妻女想見老爺子一面、說說話,怕以后再也見不到了。
目前流行將運行時分成low-level運行時和high-level運行時,low-level運行時專注于如何創(chuàng)建一個容器例如runc和kata,high-level包含了更多上層功能,比如鏡像管理,以docker和containerd為代表。
K8s的kubelet是調(diào)用容器運行時創(chuàng)建容器的,但是容器運行時這么多不可能逐個兼容,K8s在對接容器運行時定義了CRI接口,容器運行時只需實現(xiàn)該接口就能被使用。下圖分別是k8s使用docker和containerd的調(diào)用鏈,使用containerd時CRI接口是在containerd代碼中實現(xiàn)的;使用docker時的CRI接口是在k8s的代碼中實現(xiàn)的,叫做docker-shim(kubernetes/pkg/kubelet/dockershim/docker_service.go),這部分代碼在k8s代碼中是歷史原因,當(dāng)時docker是容器方面行業(yè)事實上的標(biāo)準(zhǔn),但隨著越來越多運行時實現(xiàn)了CRI支持,docker-shim的維護日益變成社區(qū)負擔(dān),在最新的K8s版本中,該部分代碼目前已經(jīng)移出,暫時由mirantis進行維護,下圖是插件的發(fā)展歷程。
二、“Containerd CRI簡介”
Containerd是一個行業(yè)標(biāo)準(zhǔn)的容器運行時,它是一個daemon進程,可以管理主機上容器的全部生命周期和它的文件系統(tǒng),包括:鏡像的分發(fā)和存儲、容器的運行和監(jiān)控,底層的存儲和網(wǎng)絡(luò)。
Containerd有多種客戶端,比如K8s、docker等,為了不同客戶端的容器或者鏡像能隔離開,Containerd中有namespace概念,默認情況下docker的namespace是moby,K8s的是k8s.io。
container在Containerd中代表的是一個容器的元數(shù)據(jù),containerd中的Task用于獲取容器對象并將它轉(zhuǎn)換成在操作系統(tǒng)中可運行的進程,它代表的就是容器中可運行的對象。
Containerd內(nèi)部的cri模塊實現(xiàn)K8s的CRI接口,所以K8s的kubelet可以直接使用containerd。CRI的接口包括:RuntimeService 和 ImageService
// Runtime service defines the public APIs for remote container runtimes
service RuntimeService {
// Version returns the runtime name, runtime version, and runtime API version.
rpc Version(VersionRequest) returns (VersionResponse) {}
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure
// the sandbox is in the ready state on success.
rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {}
// StopPodSandbox stops any running process that is part of the sandbox and
// reclaims network resources (e.g., IP addresses) allocated to the sandbox.
// If there are any running containers in the sandbox, they must be forcibly
// terminated.
// This call is idempotent, and must not return an error if all relevant
// resources have already been reclaimed. kubelet will call StopPodSandbox
// at least once before calling RemovePodSandbox. It will also attempt to
// reclaim resources eagerly, as soon as a sandbox is not needed. Hence,
// multiple StopPodSandbox calls are expected.
rpc StopPodSandbox(StopPodSandboxRequest) returns (StopPodSandboxResponse) {}
// RemovePodSandbox removes the sandbox. If there are any running containers
// in the sandbox, they must be forcibly terminated and removed.
// This call is idempotent, and must not return an error if the sandbox has
// already been removed.
rpc RemovePodSandbox(RemovePodSandboxRequest) returns (RemovePodSandboxResponse) {}
// PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not
// present, returns an error.
rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {}
// ListPodSandbox returns a list of PodSandboxes.
rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {}
// CreateContainer creates a new container in specified PodSandbox
rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {}
// StartContainer starts the container.
rpc StartContainer(StartContainerRequest) returns (StartContainerResponse) {}
// StopContainer stops a running container with a grace period (i.e., timeout).
// This call is idempotent, and must not return an error if the container has
// already been stopped.
// The runtime must forcibly kill the container after the grace period is
// reached.
rpc StopContainer(StopContainerRequest) returns (StopContainerResponse) {}
// RemoveContainer removes the container. If the container is running, the
// container must be forcibly removed.
// This call is idempotent, and must not return an error if the container has
// already been removed.
rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {}
// ListContainers lists all containers by filters.
rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {}
// ContainerStatus returns status of the container. If the container is not
// present, returns an error.
rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {}
// UpdateContainerResources updates ContainerConfig of the container synchronously.
// If runtime fails to transactionally update the requested resources, an error is returned.
rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {}
// ReopenContainerLog asks runtime to reopen the stdout/stderr log file
// for the container. This is often called after the log file has been
// rotated. If the container is not running, container runtime can choose
// to either create a new log file and return nil, or return an error.
// Once it returns error, new container log file MUST NOT be created.
rpc ReopenContainerLog(ReopenContainerLogRequest) returns (ReopenContainerLogResponse) {}
// ExecSync runs a command in a container synchronously.
rpc ExecSync(ExecSyncRequest) returns (ExecSyncResponse) {}
// Exec prepares a streaming endpoint to execute a command in the container.
rpc Exec(ExecRequest) returns (ExecResponse) {}
// Attach prepares a streaming endpoint to attach to a running container.
rpc Attach(AttachRequest) returns (AttachResponse) {}
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox.
rpc PortForward(PortForwardRequest) returns (PortForwardResponse) {}
// ContainerStats returns stats of the container. If the container does not
// exist, the call returns an error.
rpc ContainerStats(ContainerStatsRequest) returns (ContainerStatsResponse) {}
// ListContainerStats returns stats of all running containers.
rpc ListContainerStats(ListContainerStatsRequest) returns (ListContainerStatsResponse) {}
// PodSandboxStats returns stats of the pod sandbox. If the pod sandbox does not
// exist, the call returns an error.
rpc PodSandboxStats(PodSandboxStatsRequest) returns (PodSandboxStatsResponse) {}
// ListPodSandboxStats returns stats of the pod sandboxes matching a filter.
rpc ListPodSandboxStats(ListPodSandboxStatsRequest) returns (ListPodSandboxStatsResponse) {}
// UpdateRuntimeConfig updates the runtime configuration based on the given request.
rpc UpdateRuntimeConfig(UpdateRuntimeConfigRequest) returns (UpdateRuntimeConfigResponse) {}
// Status returns the status of the runtime.
rpc Status(StatusRequest) returns (StatusResponse) {}
}
// ImageService defines the public APIs for managing images.
service ImageService {
// ListImages lists existing images.
rpc ListImages(ListImagesRequest) returns (ListImagesResponse) {}
// ImageStatus returns the status of the image. If the image is not
// present, returns a response with ImageStatusResponse.Image set to
// nil.
rpc ImageStatus(ImageStatusRequest) returns (ImageStatusResponse) {}
// PullImage pulls an image with authentication config.
rpc PullImage(PullImageRequest) returns (PullImageResponse) {}
// RemoveImage removes the image.
// This call is idempotent, and must not return an error if the image has
// already been removed.
rpc RemoveImage(RemoveImageRequest) returns (RemoveImageResponse) {}
// ImageFSInfo returns information of the filesystem that is used to store images.
rpc ImageFsInfo(ImageFsInfoRequest) returns (ImageFsInfoResponse) {}
}
kubelet調(diào)用CRI接口創(chuàng)建一個包含A和B兩個業(yè)務(wù)container的Pod流程如下所示:
① 為Pod創(chuàng)建sandbox
② 創(chuàng)建container A
③ 啟動container A
④ 創(chuàng)建container B
⑤ 啟動container B
三、“Containerd CRI實現(xiàn)”
RunPodSandbox
RunPodSandbox的流程如下:
① 拉取sandbox的鏡像,在containerd中配置
② 獲取創(chuàng)建pod要使用的runtime,可以在創(chuàng)建pod的yaml中指定,如果沒指定使用containerd中默認的
(runtime在containerd中配置)
③ 如果pod不是hostNetwork那么添加創(chuàng)建新net namespace,并使用cni插件設(shè)置網(wǎng)絡(luò)(criService在初始化時會加載containerd中cri指定的插件信息)
④ 調(diào)用containerd客戶端創(chuàng)建一個container
⑤ 在rootDir/io.containerd.grpc.v1.cri/sandboxes下為當(dāng)前pod以pod Id為名創(chuàng)建一個目錄
(pkg/cri/cri.go)
⑥ 根據(jù)選擇的runtime為sandbox容器創(chuàng)建task
⑦ 啟動sandbox容器的task,將sandbox添加到數(shù)據(jù)庫中
代碼在containerd/pkg/cri/server/sanbox_run.go 中
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) {
config := r.GetConfig()
log.G(ctx).Debugf("Sandbox config %+v", config)
// Generate unique id and name for the sandbox and reserve the name.
id := util.GenerateID()
metadata := config.GetMetadata()
if metadata == nil {
return nil, errors.New("sandbox config must include metadata")
}
name := makeSandboxName(metadata)
log.G(ctx).WithField("podsandboxid", id).Debugf("generated id for sandbox name %q", name)
// cleanupErr records the last error returned by the critical cleanup operations in deferred functions,
// like CNI teardown and stopping the running sandbox task.
// If cleanup is not completed for some reason, the CRI-plugin will leave the sandbox
// in a not-ready state, which can later be cleaned up by the next execution of the kubelet's syncPod workflow.
var cleanupErr error
// Reserve the sandbox name to avoid concurrent `RunPodSandbox` request starting the
// same sandbox.
if err := c.sandboxNameIndex.Reserve(name, id); err != nil {
return nil, fmt.Errorf("failed to reserve sandbox name %q: %w", name, err)
}
defer func() {
// Release the name if the function returns with an error.
// When cleanupErr != nil, the name will be cleaned in sandbox_remove.
if retErr != nil && cleanupErr == nil {
c.sandboxNameIndex.ReleaseByName(name)
}
}()
var (
err error
sandboxInfo = sb.Sandbox{ID: id}
)
ociRuntime, err := c.getSandboxRuntime(config, r.GetRuntimeHandler())
if err != nil {
return nil, fmt.Errorf("unable to get OCI runtime for sandbox %q: %w", id, err)
}
sandboxInfo.Runtime.Name = ociRuntime.Type
// Retrieve runtime options
runtimeOpts, err := generateRuntimeOptions(ociRuntime, c.config)
if err != nil {
return nil, fmt.Errorf("failed to generate sandbox runtime options: %w", err)
}
...
// Create initial internal sandbox object.
sandbox := sandboxstore.NewSandbox(
...
)
if _, err := c.client.SandboxStore().Create(ctx, sandboxInfo); err != nil {
return nil, fmt.Errorf("failed to save sandbox metadata: %w", err)
}
...
// Setup the network namespace if host networking wasn't requested.
if !hostNetwork(config) {
netStart := time.Now()
// If it is not in host network namespace then create a namespace and set the sandbox
// handle. NetNSPath in sandbox metadata and NetNS is non empty only for non host network
// namespaces. If the pod is in host network namespace then both are empty and should not
// be used.
var netnsMountDir = "/var/run/netns"
if c.config.NetNSMountsUnderStateDir {
netnsMountDir = filepath.Join(c.config.StateDir, "netns")
}
sandbox.NetNS, err = netns.NewNetNS(netnsMountDir)
if err != nil {
return nil, fmt.Errorf("failed to create network namespace for sandbox %q: %w", id, err)
}
// Update network namespace in the store, which is used to generate the container's spec
sandbox.NetNSPath = sandbox.NetNS.GetPath()
defer func() {
// Remove the network namespace only if all the resource cleanup is done
if retErr != nil && cleanupErr == nil {
if cleanupErr = sandbox.NetNS.Remove(); cleanupErr != nil {
log.G(ctx).WithError(cleanupErr).Errorf("Failed to remove network namespace %s for sandbox %q", sandbox.NetNSPath, id)
return
}
sandbox.NetNSPath = ""
}
}()
if err := sandboxInfo.AddExtension(podsandbox.MetadataKey, &sandbox.Metadata); err != nil {
return nil, fmt.Errorf("unable to save sandbox %q to store: %w", id, err)
}
// Save sandbox metadata to store
if sandboxInfo, err = c.client.SandboxStore().Update(ctx, sandboxInfo, "extensions"); err != nil {
return nil, fmt.Errorf("unable to update extensions for sandbox %q: %w", id, err)
}
// Define this defer to teardownPodNetwork prior to the setupPodNetwork function call.
// This is because in setupPodNetwork the resource is allocated even if it returns error, unlike other resource
// creation functions.
defer func() {
// Remove the network namespace only if all the resource cleanup is done.
if retErr != nil && cleanupErr == nil {
deferCtx, deferCancel := ctrdutil.DeferContext()
defer deferCancel()
// Teardown network if an error is returned.
if cleanupErr = c.teardownPodNetwork(deferCtx, sandbox); cleanupErr != nil {
log.G(ctx).WithError(cleanupErr).Errorf("Failed to destroy network for sandbox %q", id)
}
}
}()
// Setup network for sandbox.
// Certain VM based solutions like clear containers (Issue containerd/cri-containerd#524)
// rely on the assumption that CRI shim will not be querying the network namespace to check the
// network states such as IP.
// In future runtime implementation should avoid relying on CRI shim implementation details.
// In this case however caching the IP will add a subtle performance enhancement by avoiding
// calls to network namespace of the pod to query the IP of the veth interface on every
// SandboxStatus request.
if err := c.setupPodNetwork(ctx, &sandbox); err != nil {
return nil, fmt.Errorf("failed to setup network for sandbox %q: %w", id, err)
}
sandboxCreateNetworkTimer.UpdateSince(netStart)
}
if err := sandboxInfo.AddExtension(podsandbox.MetadataKey, &sandbox.Metadata); err != nil {
return nil, fmt.Errorf("unable to save sandbox %q to store: %w", id, err)
}
controller, err := c.getSandboxController(config, r.GetRuntimeHandler())
if err != nil {
return nil, fmt.Errorf("failed to get sandbox controller: %w", err)
}
// Save sandbox metadata to store
if sandboxInfo, err = c.client.SandboxStore().Update(ctx, sandboxInfo, "extensions"); err != nil {
return nil, fmt.Errorf("unable to update extensions for sandbox %q: %w", id, err)
}
runtimeStart := time.Now()
if err := controller.Create(ctx, id); err != nil {
return nil, fmt.Errorf("failed to create sandbox %q: %w", id, err)
}
resp, err := controller.Start(ctx, id)
if err != nil {
sandbox.Container, _ = c.client.LoadContainer(ctx, id)
if resp != nil && resp.SandboxID == "" && resp.Pid == 0 && resp.CreatedAt == nil && len(resp.Labels) == 0 {
// if resp is a non-nil zero-value, an error occurred during cleanup
cleanupErr = fmt.Errorf("failed to cleanup sandbox")
}
return nil, fmt.Errorf("failed to start sandbox %q: %w", id, err)
}
// TODO: get rid of this. sandbox object should no longer have Container field.
if ociRuntime.SandboxMode == string(criconfig.ModePodSandbox) {
container, err := c.client.LoadContainer(ctx, id)
if err != nil {
return nil, fmt.Errorf("failed to load container %q for sandbox: %w", id, err)
}
sandbox.Container = container
}
labels := resp.GetLabels()
if labels == nil {
labels = map[string]string{}
}
sandbox.ProcessLabel = labels["selinux_label"]
if err := sandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
// Set the pod sandbox as ready after successfully start sandbox container.
status.Pid = resp.Pid
status.State = sandboxstore.StateReady
status.CreatedAt = protobuf.FromTimestamp(resp.CreatedAt)
return status, nil
}); err != nil {
return nil, fmt.Errorf("failed to update sandbox status: %w", err)
}
// Add sandbox into sandbox store in INIT state.
if err := c.sandboxStore.Add(sandbox); err != nil {
return nil, fmt.Errorf("failed to add sandbox %+v into store: %w", sandbox, err)
}
// Send CONTAINER_CREATED event with both ContainerId and SandboxId equal to SandboxId.
// Note that this has to be done after sandboxStore.Add() because we need to get
// SandboxStatus from the store and include it in the event.
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)
// start the monitor after adding sandbox into the store, this ensures
// that sandbox is in the store, when event monitor receives the TaskExit event.
//
// TaskOOM from containerd may come before sandbox is added to store,
// but we don't care about sandbox TaskOOM right now, so it is fine.
go func() {
resp, err := controller.Wait(ctrdutil.NamespacedContext(), id)
if err != nil {
log.G(ctx).WithError(err).Error("failed to wait for sandbox controller, skipping exit event")
return
}
e := &eventtypes.TaskExit{
ContainerID: id,
ID: id,
// Pid is not used
Pid: 0,
ExitStatus: resp.ExitStatus,
ExitedAt: resp.ExitedAt,
}
c.eventMonitor.backOff.enBackOff(id, e)
}()
// Send CONTAINER_STARTED event with ContainerId equal to SandboxId.
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)
sandboxRuntimeCreateTimer.WithValues(labels["oci_runtime_type"]).UpdateSince(runtimeStart)
return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
}
CreateContainer
CreateContainer在指定的PodSandbox中創(chuàng)建一個新的container元數(shù)據(jù),流程如下:
① 獲取容器的sandbox信息
② 為容器要用的鏡像初始化鏡像handler
③ 為容器在rootDir/io.containerd.grpc.v1.cri目錄下以容器Id命名的目錄
④ 從sandbox中獲取所使用的runtime
⑤ 為容器創(chuàng)建containerSpec
⑥ 使用containerd客戶端創(chuàng)建container
⑦ 保存container的信息
代碼見containerd/pkg/cri/server/container_create.go 下面是省略過的代碼。
func (c *criService) CreateContainer(ctx context.Context, r
*runtime.CreateContainerRequest) (_ *runtime.CreateContainerResponse, retErr error) {
sandbox, err := c.sandboxStore.Get(r.GetPodSandboxId())
s, err := sandbox.Container.Task(ctx, nil)
sandboxPid := s.Pid()
image, err := c.localResolve(config.GetImage().GetImage())
if err != nil {
return nil, errors.Wrapf(err, "failed to resolve image %q", config.GetImage().GetImage())
}
containerdImage, err := c.toContainerdImage(ctx, image)
// Run container using the same runtime with sandbox.
sandboxInfo, err := sandbox.Container.Info(ctx)
if err != nil {
return nil, errors.Wrapf(err, "failed to get sandbox %q info", sandboxID)
}
// Create container root directory. containerRootDir := c.getContainerRootDir(id)
if err = c.os.MkdirAll(containerRootDir, 0755)? err != nil {
return nil, errors.Wrapf(err, "failed to create container root directory %q", containerRootDir)
}
ociRuntime, err := c.getSandboxRuntime(sandboxConfig, sandbox.Metadata.RuntimeHandler)
if err != nil {
return nil, errors.Wrap(err, "failed to get sandbox runtime")
}
spec, err := c.containerSpec(id, sandboxID, sandboxPid, sandbox.NetNSPath, containerName, containerdImage.Name(), config, sandboxConfig,
&image.ImageSpec.Config, append(mounts, volumeMounts...), ociRuntime)
if err != nil {
return nil, errors.Wrapf(err, "failed to generate container %q spec", id)
}
opts = append(opts, containerd.WithSpec(spec, specOpts...),
containerd.WithRuntime(sandboxInfo.Runtime.Name, runtimeOptions), containerd.WithContainerLabels(containerLabels), containerd.WithContainerExtension(containerMetadataExtension, &meta))
var cntr containerd.Container
if cntr, err = c.client.NewContainer(ctx, id, opts...)? err != nil {
return nil, errors.Wrap(err, "failed to create containerd container")
}
// Add container into container store.
if err := c.containerStore.Add(container)? err != nil {
return nil, errors.Wrapf(err, "failed to add container %q into store", id)
}
}
StartContainer
StartContainer用于啟動一個容器,流程如下:
① 讀取保存的container元數(shù)據(jù)
② 讀取關(guān)聯(lián)的sandbox信息
③ 為容器創(chuàng)建task
④ 啟動task
代碼見containerd/pkg/cri/server/container_start.go ,下面是該部分省略過后的代碼:
func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (retRes *runtime.StartContainerResponse, retErr error) {
cntr, err := c.containerStore.Get(r.GetContainerId())
// Get sandbox config from sandbox store. sandbox, err := c.sandboxStore.Get(meta.SandboxID) ctrInfo, err := container.Info(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to get container info")
}
taskOpts := c.taskOpts(ctrInfo.Runtime.Name)
task, err := container.NewTask(ctx, ioCreation, taskOpts...)
if err != nil {
return nil, errors.Wrap(err, "failed to create containerd task")
}
// wait is a long running background request, no timeout needed. exitCh, err := task.Wait(ctrdutil.NamespacedContext())
// Start containerd task.
if err := task.Start(ctx)? err != nil {
return nil, errors.Wrapf(err, "failed to start containerd task %q", id)
}
}
創(chuàng)建task的代碼如下,調(diào)用了containerd的客戶端的TasksClient,向服務(wù)器端發(fā)送創(chuàng)建task的請求
func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts
...NewTaskOpts) (_ Task, err error) {
......
request := &tasks.CreateTaskRequest{
ContainerID: c.id,
Terminal: cfg.Terminal, Stdin: cfg.Stdin,
Stdout: cfg.Stdout,
Stderr: cfg.Stderr,
}
......
response, err := c.client.TaskService().Create(ctx, request)
......
task啟動的代碼如下,調(diào)用了containerd的客戶端的TasksClient,向服務(wù)器端發(fā)送啟動task的請求。
func (t *task) Start(ctx context.Context) error {
r, err := t.client.TaskService().Start(ctx, &tasks.StartRequest{
ContainerID: t.id,
})
if err != nil {
if t.io != nil {
t.io.Cancel()
t.io.Close()
}
return errdefs.FromGRPC(err)
}
t.pid = r.Pid
return nil
}
四、“Task Service”
Task Service創(chuàng)建task流程
下面是tasks-service處理創(chuàng)建task請求的代碼,根據(jù)容器運行時創(chuàng)建容器。
func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _
...grpc.CallOption) (*api.CreateTaskResponse, error) { container, err := l.getContainer(ctx, r.ContainerID)
......
rtime, err := l.getRuntime(container.Runtime.Name)
if err != nil {
return nil, err
}
_, err = rtime.Get(ctx, r.ContainerID)
if err != nil && err != runtime.ErrTaskNotExists {
return nil, errdefs.ToGRPC(err)
}
if err == nil {
return nil, errdefs.ToGRPC(fmt.Errorf(名稱欄目:Containerd深度剖析-CRI篇
文章位置:http://m.fisionsoft.com.cn/article/cooddsg.html


咨詢
建站咨詢
