package main

import (
    "context"
    "fmt"
    "log"
    "syscall"
    "time"

    "github.com/containerd/containerd"
    "github.com/containerd/containerd/cio"
    "github.com/containerd/containerd/oci"
    "github.com/containerd/containerd/namespaces"
)

func main() {
    if err := redisExample(); err != nil {
        log.Fatal(err)
    }
}

func redisExample() error {
    // create a new client connected to the default socket path for containerd
    client, err := containerd.New("/run/containerd/containerd.sock")
    if err != nil {
        return err
    }
    defer client.Close()

    // create a new context with an "example" namespace
    // (the namespace is changed from "example" to "default" here)
    ctx := namespaces.WithNamespace(context.Background(), "default")

    // pull the redis image from DockerHub
    image, err := client.Pull(ctx, "docker.io/library/redis:alpine", containerd.WithPullUnpack)
    if err != nil {
        return err
    }

    // create a container
    container, err := client.NewContainer(
        ctx,
        "redis-server",
        containerd.WithImage(image),
        containerd.WithNewSnapshot("redis-server-snapshot", image),
        containerd.WithNewSpec(oci.WithImageConfig(image)),
    )
    if err != nil {
        return err
    }
    defer container.Delete(ctx, containerd.WithSnapshotCleanup)

    // create a task from the container
    task, err := container.NewTask(ctx, cio.NewCreator(cio.WithStdio))
    if err != nil {
        return err
    }
    defer task.Delete(ctx)

    // make sure we wait before calling start
    exitStatusC, err := task.Wait(ctx)
    if err != nil {
        fmt.Println(err)
    }

    // call start on the task to execute the redis server
    if err := task.Start(ctx); err != nil {
        return err
    }

    // sleep for a lil bit to see the logs
    time.Sleep(3 * time.Second)

    // kill the process and get the exit status
    if err := task.Kill(ctx, syscall.SIGTERM); err != nil {
        return err
    }

    // wait for the process to fully exit and print out the exit status
    status := <-exitStatusC
    code, _, err := status.Result()
    if err != nil {
        return err
    }
    fmt.Printf("redis-server exited with status: %d\n", code)

    return nil
}
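After this runs, the image and the container record both live in the "default" namespace. As a quick sanity check, the same client can list what was created (a minimal sketch using the client's ListImages and Containers methods; run it before the deferred cleanup fires, or from a separate program):

// list what the example left behind in the "default" namespace
imgs, err := client.ListImages(ctx)
if err != nil {
    return err
}
for _, img := range imgs {
    fmt.Println("image:", img.Name())
}

cs, err := client.Containers(ctx)
if err != nil {
    return err
}
for _, c := range cs {
    fmt.Println("container:", c.ID())
}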
[client] client.Pull()
// pull the redis image from DockerHub
image, err := client.Pull(ctx, "docker.io/library/redis:alpine", containerd.WithPullUnpack)
if err != nil {
    return err
}
// Pull downloads the provided content into containerd's content store
// and returns a platform specific image object
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Image, retErr error) {
    pullCtx := defaultRemoteContext()
    for _, o := range opts {
        if err := o(c, pullCtx); err != nil {
            return nil, err
        }
    }

    if pullCtx.PlatformMatcher == nil {
        if len(pullCtx.Platforms) > 1 {
            return nil, errors.New("cannot pull multiplatform image locally, try Fetch")
        } else if len(pullCtx.Platforms) == 0 {
            // MatchComparer can match and compare platforms, for filtering and sorting them.
            pullCtx.PlatformMatcher = c.platform
        } else {
            p, err := platforms.Parse(pullCtx.Platforms[0])
            if err != nil {
                return nil, fmt.Errorf("invalid platform %s: %w", pullCtx.Platforms[0], err)
            }
            pullCtx.PlatformMatcher = platforms.Only(p)
        }
    }

    ctx, done, err := c.WithLease(ctx)
    if err != nil {
        return nil, err
    }
    defer done(ctx)

    var unpacks int32
    var unpackEg *errgroup.Group
    var unpackWrapper func(f images.Handler) images.Handler

    if pullCtx.Unpack {
        // unpacker only supports schema 2 image, for schema 1 this is noop.
        u, err := c.newUnpacker(ctx, pullCtx)
        if err != nil {
            return nil, fmt.Errorf("create unpacker: %w", err)
        }
        unpackWrapper, unpackEg = u.handlerWrapper(ctx, pullCtx, &unpacks)
        defer func() {
            if err := unpackEg.Wait(); err != nil {
                if retErr == nil {
                    retErr = fmt.Errorf("unpack: %w", err)
                }
            }
        }()
        wrapper := pullCtx.HandlerWrapper
        pullCtx.HandlerWrapper = func(h images.Handler) images.Handler {
            if wrapper == nil {
                return unpackWrapper(h)
            }
            return unpackWrapper(wrapper(h))
        }
    }

    // the main logic for fetching the image lives in the fetch method
    img, err := c.fetch(ctx, pullCtx, ref, 1)
    if err != nil {
        return nil, err
    }

    // NOTE(fuweid): unpacker defers blobs download. before create image
    // record in ImageService, should wait for unpacking(including blobs
    // download).
    if pullCtx.Unpack {
        if unpackEg != nil {
            // wait for the image blobs to finish downloading
            if err := unpackEg.Wait(); err != nil {
                return nil, err
            }
        }
    }

    // call the containerd metadata service and write an image record into the
    // /var/lib/containerd/io.containerd.metadata.v1.bolt/meta.db database
    img, err = c.createNewImage(ctx, img)
    if err != nil {
        return nil, err
    }

    i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher)

    if pullCtx.Unpack {
        if unpacks == 0 {
            // Try to unpack if none is done previously.
            // This is at least required for schema 1 image.
            if err := i.Unpack(ctx, pullCtx.Snapshotter, pullCtx.UnpackOpts...); err != nil {
                return nil, fmt.Errorf("failed to unpack image on snapshotter %s: %w", pullCtx.Snapshotter, err)
            }
        }
    }

    return i, nil
}
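The PlatformMatcher branch above is what callers usually drive indirectly through RemoteOpts. A small hedged sketch, assuming your containerd client version exposes containerd.WithPlatform (which appends to pullCtx.Platforms and therefore hits the platforms.Only branch):

// pull only the linux/amd64 variant of the image
image, err := client.Pull(ctx,
    "docker.io/library/redis:alpine",
    containerd.WithPullUnpack,
    containerd.WithPlatform("linux/amd64"),
)
if err != nil {
    return err
}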
func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, limit int) (images.Image, error) {
    store := c.ContentStore()

    // resolve the digest via https://registry-1.docker.io/v2/library/redis/manifests/5.0.9
    // the result looks roughly like: sha256:2a9865e55c37293b71df051922022898d8e4ec0f579c9b53a0caee1b170bc81c
    name, desc, err := rCtx.Resolver.Resolve(ctx, ref)
    if err != nil {
        return images.Image{}, fmt.Errorf("failed to resolve reference %q: %w", ref, err)
    }

    fetcher, err := rCtx.Resolver.Fetcher(ctx, name)
    if err != nil {
        return images.Image{}, fmt.Errorf("failed to get fetcher for %q: %w", name, err)
    }

    var (
        handler images.Handler

        isConvertible bool
        converterFunc func(context.Context, ocispec.Descriptor) (ocispec.Descriptor, error)
        limiter       *semaphore.Weighted
    )

    if desc.MediaType == images.MediaTypeDockerSchema1Manifest && rCtx.ConvertSchema1 {
        schema1Converter := schema1.NewConverter(store, fetcher)

        handler = images.Handlers(append(rCtx.BaseHandlers, schema1Converter)...)

        isConvertible = true

        converterFunc = func(ctx context.Context, _ ocispec.Descriptor) (ocispec.Descriptor, error) {
            return schema1Converter.Convert(ctx)
        }
    } else {
        // Get all the children for a descriptor
        childrenHandler := images.ChildrenHandler(store)
        // Set any children labels for that content
        childrenHandler = images.SetChildrenMappedLabels(store, childrenHandler, rCtx.ChildLabelMap)
        if rCtx.AllMetadata {
            // Filter manifests by platforms but allow to handle manifest
            // and configuration for not-target platforms
            childrenHandler = remotes.FilterManifestByPlatformHandler(childrenHandler, rCtx.PlatformMatcher)
        } else {
            // Filter children by platforms if specified.
            childrenHandler = images.FilterPlatforms(childrenHandler, rCtx.PlatformMatcher)
        }
        // Sort and limit manifests if a finite number is needed
        if limit > 0 {
            childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit)
        }

        // set isConvertible to true if there is application/octet-stream media type
        convertibleHandler := images.HandlerFunc(
            func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
                if desc.MediaType == docker.LegacyConfigMediaType {
                    isConvertible = true
                }

                return []ocispec.Descriptor{}, nil
            },
        )

        appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, ref)
        if err != nil {
            return images.Image{}, err
        }

        handlers := append(rCtx.BaseHandlers,
            remotes.FetchHandler(store, fetcher),
            convertibleHandler,
            childrenHandler,
            appendDistSrcLabelHandler,
        )

        handler = images.Handlers(handlers...)

        converterFunc = func(ctx context.Context, desc ocispec.Descriptor) (ocispec.Descriptor, error) {
            return docker.ConvertManifest(ctx, store, desc)
        }
    }

    if rCtx.HandlerWrapper != nil {
        handler = rCtx.HandlerWrapper(handler)
    }

    if rCtx.MaxConcurrentDownloads > 0 {
        limiter = semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads))
    }

    // walk the descriptor tree and download the config and the compressed layer blobs;
    // the main download work happens in this Dispatch call
    if err := images.Dispatch(ctx, handler, limiter, desc); err != nil {
        return images.Image{}, err
    }

    if isConvertible {
        if desc, err = converterFunc(ctx, desc); err != nil {
            return images.Image{}, err
        }
    }

    return images.Image{
        Name:   name,
        Target: desc,
        Labels: rCtx.Labels,
    }, nil
}
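To watch which descriptors Dispatch actually walks, you can hang your own handler onto the pull. A hedged sketch, assuming containerd.WithImageHandler and images.HandlerFunc are available in the client version you are using (ocispec is the OCI image-spec Go package):

// logHandler only prints descriptors; it downloads nothing itself and returns no children
logHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
    fmt.Printf("visit %s %s\n", desc.MediaType, desc.Digest)
    return nil, nil
})

image, err := client.Pull(ctx,
    "docker.io/library/redis:alpine",
    containerd.WithPullUnpack,
    containerd.WithImageHandler(logHandler), // wrapped into the handler chain next to FetchHandler
)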
// create a container
container, err := client.NewContainer(
    ctx,
    "redis-server",
    containerd.WithImage(image),
    containerd.WithNewSnapshot("redis-server-snapshot", image),
    containerd.WithNewSpec(oci.WithImageConfig(image)),
)
// NewContainer will create a new container with the provided id.
// The id must be unique within the namespace.
func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
    ctx, done, err := c.WithLease(ctx)
    if err != nil {
        return nil, err
    }
    defer done(ctx)

    container := containers.Container{
        ID: id,
        Runtime: containers.RuntimeInfo{
            Name: c.runtime,
        },
    }
    for _, o := range opts {
        if err := o(ctx, c, &container); err != nil {
            return nil, err
        }
    }
    r, err := c.ContainerService().Create(ctx, container)
    if err != nil {
        return nil, err
    }
    return containerFromRecord(c, r), nil
}
// WithImage sets the provided image as the base for the container
func WithImage(i Image) NewContainerOpts {
    return func(ctx context.Context, client *Client, c *containers.Container) error {
        c.Image = i.Name()
        return nil
    }
}
WithNewSpec:
// WithNewSpec generates a new spec for a new container
func WithNewSpec(opts ...oci.SpecOpts) NewContainerOpts {
    return func(ctx context.Context, client *Client, c *containers.Container) error {
        s, err := oci.GenerateSpec(ctx, client, c, opts...)
        if err != nil {
            return err
        }
        c.Spec, err = typeurl.MarshalAny(s)
        return err
    }
}
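WithNewSpec simply composes whatever oci.SpecOpts you hand it on top of the defaults. A hedged sketch of stacking a few extra options on the example's container (oci.WithHostname and oci.WithEnv exist in the oci package, though availability may vary by containerd version):

container, err := client.NewContainer(
    ctx,
    "redis-server",
    containerd.WithImage(image),
    containerd.WithNewSnapshot("redis-server-snapshot", image),
    containerd.WithNewSpec(
        oci.WithImageConfig(image),     // start from the image's own config
        oci.WithHostname("redis-demo"), // then layer extra opts on top, applied in order
        oci.WithEnv([]string{"DEMO=1"}),
    ),
)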
// WithNewSnapshot allocates a new snapshot to be used by the container as the
// root filesystem in read-write mode
func WithNewSnapshot(id string, i Image, opts ...snapshots.Opt) NewContainerOpts {
    return func(ctx context.Context, client *Client, c *containers.Container) error {
        // read the image config from meta.db to obtain the diffIDs
        diffIDs, err := i.RootFS(ctx)
        if err != nil {
            return err
        }

        // derive the chain IDs from the diffIDs; the chain ID of the image's final
        // layer becomes the parent of the new read-write layer
        parent := identity.ChainID(diffIDs).String()

        c.Snapshotter, err = client.resolveSnapshotterName(ctx, c.Snapshotter)
        if err != nil {
            return err
        }
        s, err := client.getSnapshotter(ctx, c.Snapshotter)
        if err != nil {
            return err
        }
        // Prepare the read-write snapshot on top of that parent
        if _, err := s.Prepare(ctx, id, parent, opts...); err != nil {
            return err
        }
        c.SnapshotKey = id
        c.Image = i.Name()
        return nil
    }
}
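The diffID-to-chainID step follows the OCI image spec: each chain ID is the digest of the previous chain ID concatenated with the next diff ID. A minimal sketch using the same identity package (github.com/opencontainers/image-spec/identity) with illustrative digests:

// parentFor mirrors what WithNewSnapshot does: fold the config's diff_ids
// into the final chain ID and use it as the snapshot parent key.
func parentFor(diffIDs []digest.Digest) string {
    // chainID(n) = sha256(chainID(n-1) + " " + diffID(n)); ChainID returns the last one
    return identity.ChainID(diffIDs).String()
}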
➜ ~ sudo nerdctl ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
redis-server docker.io/library/redis:5.0.9 "docker-entrypoint.s…" 8 minutes ago Created
func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts) (_ Task, err error) {
    i, err := ioCreate(c.id)
    if err != nil {
        return nil, err
    }
    defer func() {
        if err != nil && i != nil {
            i.Cancel()
            i.Close()
        }
    }()

    // the three std streams are wired up to FIFOs, e.g.
    // Stdin=/run/containerd/fifo/908671294/redis-server-stdin
    // Stdout=/run/containerd/fifo/908671294/redis-server-stdout
    // Stderr=/run/containerd/fifo/908671294/redis-server-stderr
    cfg := i.Config()
    request := &tasks.CreateTaskRequest{
        ContainerID: c.id,
        Terminal:    cfg.Terminal,
        Stdin:       cfg.Stdin,
        Stdout:      cfg.Stdout,
        Stderr:      cfg.Stderr,
    }
    r, err := c.get(ctx)
    if err != nil {
        return nil, err
    }
    if r.SnapshotKey != "" {
        if r.Snapshotter == "" {
            return nil, fmt.Errorf("unable to resolve rootfs mounts without snapshotter on container: %w", errdefs.ErrInvalidArgument)
        }

        // get the rootfs from the snapshotter and add it to the request
        s, err := c.client.getSnapshotter(ctx, r.Snapshotter)
        if err != nil {
            return nil, err
        }
        // Options[0]: index=off
        // Options[1]: workdir=/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/7/work
        // Options[2]: upperdir=/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/7/fs
        // Options[3]: lowerdir=/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/6/fs:/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/5/fs:/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/4/fs:/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/3/fs:/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/2/fs:/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/1/fs
        mounts, err := s.Mounts(ctx, r.SnapshotKey)
        if err != nil {
            return nil, err
        }
        spec, err := c.Spec(ctx)
        if err != nil {
            return nil, err
        }
        for _, m := range mounts {
            if spec.Linux != nil && spec.Linux.MountLabel != "" {
                context := label.FormatMountLabel("", spec.Linux.MountLabel)
                if context != "" {
                    m.Options = append(m.Options, context)
                }
            }
            request.Rootfs = append(request.Rootfs, &types.Mount{
                Type:    m.Type,
                Source:  m.Source,
                Options: m.Options,
            })
        }
    }
    info := TaskInfo{
        runtime: r.Runtime.Name,
    }
    for _, o := range opts {
        if err := o(ctx, c.client, &info); err != nil {
            return nil, err
        }
    }
    if info.RootFS != nil {
        for _, m := range info.RootFS {
            request.Rootfs = append(request.Rootfs, &types.Mount{
                Type:    m.Type,
                Source:  m.Source,
                Options: m.Options,
            })
        }
    }
    if info.Options != nil {
        any, err := typeurl.MarshalAny(info.Options)
        if err != nil {
            return nil, err
        }
        request.Options = any
    }
    t := &task{
        client: c.client,
        io:     i,
        id:     c.id,
        c:      c,
    }
    if info.Checkpoint != nil {
        request.Checkpoint = info.Checkpoint
    }
    // hand the assembled request over to containerd's tasks service
    response, err := c.client.TaskService().Create(ctx, request)
    if err != nil {
        return nil, errdefs.FromGRPC(err)
    }
    t.pid = response.Pid
    return t, nil
}
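In the example program the task's IO came from cio.NewCreator(cio.WithStdio), which is what produces the FIFO paths noted above. A hedged sketch of two other common wirings from the cio package (cio.LogFile and cio.WithStreams exist, but check your containerd version):

// write the container's output straight to a log file instead of the caller's stdio
task, err := container.NewTask(ctx, cio.LogFile("/tmp/redis-server.log"))

// or attach explicit readers/writers (this particular combination is equivalent to cio.WithStdio)
task, err = container.NewTask(ctx, cio.NewCreator(
    cio.WithStreams(os.Stdin, os.Stdout, os.Stderr),
))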
func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {
    container, err := l.getContainer(ctx, r.ContainerID)
    if err != nil {
        return nil, errdefs.ToGRPC(err)
    }
    checkpointPath, err := getRestorePath(container.Runtime.Name, r.Options)
    if err != nil {
        return nil, err
    }
    // jump get checkpointPath from checkpoint image
    if checkpointPath == "" && r.Checkpoint != nil {
        checkpointPath, err = os.MkdirTemp(os.Getenv("XDG_RUNTIME_DIR"), "ctrd-checkpoint")
        if err != nil {
            return nil, err
        }
        if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint {
            return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType)
        }
        reader, err := l.store.ReaderAt(ctx, ocispec.Descriptor{
            MediaType:   r.Checkpoint.MediaType,
            Digest:      digest.Digest(r.Checkpoint.Digest),
            Size:        r.Checkpoint.Size,
            Annotations: r.Checkpoint.Annotations,
        })
        if err != nil {
            return nil, err
        }
        _, err = archive.Apply(ctx, checkpointPath, content.NewReader(reader))
        reader.Close()
        if err != nil {
            return nil, err
        }
    }
    opts := runtime.CreateOpts{
        Spec: container.Spec,
        IO: runtime.IO{
            Stdin:    r.Stdin,
            Stdout:   r.Stdout,
            Stderr:   r.Stderr,
            Terminal: r.Terminal,
        },
        Checkpoint:     checkpointPath,
        Runtime:        container.Runtime.Name,
        RuntimeOptions: container.Runtime.Options,
        TaskOptions:    r.Options,
        SandboxID:      container.SandboxID,
    }
    if r.RuntimePath != "" {
        opts.Runtime = r.RuntimePath
    }
    for _, m := range r.Rootfs {
        opts.Rootfs = append(opts.Rootfs, mount.Mount{
            Type:    m.Type,
            Source:  m.Source,
            Options: m.Options,
        })
    }
    if strings.HasPrefix(container.Runtime.Name, "io.containerd.runtime.v1.") {
        log.G(ctx).Warn("runtime v1 is deprecated since containerd v1.4, consider using runtime v2")
    } else if container.Runtime.Name == plugin.RuntimeRuncV1 {
        log.G(ctx).Warnf("%q is deprecated since containerd v1.4, consider using %q", plugin.RuntimeRuncV1, plugin.RuntimeRuncV2)
    }
    rtime, err := l.getRuntime(container.Runtime.Name)
    if err != nil {
        return nil, err
    }
    _, err = rtime.Get(ctx, r.ContainerID)
    if err != nil && !errdefs.IsNotFound(err) {
        return nil, errdefs.ToGRPC(err)
    }
    if err == nil {
        return nil, errdefs.ToGRPC(fmt.Errorf("task %s: %w", r.ContainerID, errdefs.ErrAlreadyExists))
    }
    // Create starts a new shim instance and creates the new task
    c, err := rtime.Create(ctx, r.ContainerID, opts)
    if err != nil {
        return nil, errdefs.ToGRPC(err)
    }
    labels := map[string]string{"runtime": container.Runtime.Name}
    if err := l.monitor.Monitor(c, labels); err != nil {
        return nil, fmt.Errorf("monitor task: %w", err)
    }
    pid, err := c.PID(ctx)
    if err != nil {
        return nil, fmt.Errorf("failed to get task pid: %w", err)
    }
    return &api.CreateTaskResponse{
        ContainerID: r.ContainerID,
        Pid:         pid,
    }, nil
}
// Create launches new shim instance and creates new task
func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) {
    // 1. start a shim instance for this task
    shim, err := m.manager.Start(ctx, taskID, opts)
    if err != nil {
        return nil, fmt.Errorf("failed to start shim: %w", err)
    }

    // Cast to shim task and call task service to create a new container task instance.
    // This will not be required once shim service / client implemented.
    shimTask := newShimTask(shim)
    // 2. ask the shim's task service to create the container task
    t, err := shimTask.Create(ctx, opts)
    if err != nil {
        // NOTE: ctx contains required namespace information.
        m.manager.shims.Delete(ctx, taskID)

        dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
        defer cancel()

        sandboxed := opts.SandboxID != ""
        _, errShim := shimTask.delete(dctx, sandboxed, func(context.Context, string) {})
        if errShim != nil {
            if errdefs.IsDeadlineExceeded(errShim) {
                dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout)
                defer cancel()
            }

            shimTask.Shutdown(dctx)
            shimTask.Client().Close()
        }

        return nil, fmt.Errorf("failed to create shim task: %w", err)
    }

    return t, nil
}
// Start launches a new shim instance
func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimInstance, retErr error) {
    // create the container's working directories under /run/containerd/io.containerd.runtime.v2.task/, e.g.
    // /run/containerd/io.containerd.runtime.v2.task/default/redis-server/work -> /var/lib/containerd/io.containerd.runtime.v2.task/default/redis-server
    // and write config.json into /run/containerd/io.containerd.runtime.v2.task/default/redis-server/
    // in other words, this prepares the bundle that runc will be run from
    bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec)
    if err != nil {
        return nil, err
    }
    defer func() {
        if retErr != nil {
            bundle.Delete()
        }
    }()

    // This container belongs to sandbox which supposed to be already started via sandbox API.
    if opts.SandboxID != "" {
        process, err := m.Get(ctx, opts.SandboxID)
        if err != nil {
            return nil, fmt.Errorf("can't find sandbox %s", opts.SandboxID)
        }

        // Write sandbox ID this task belongs to.
        if err := os.WriteFile(filepath.Join(bundle.Path, "sandbox"), []byte(opts.SandboxID), 0600); err != nil {
            return nil, err
        }

        address, err := shimbinary.ReadAddress(filepath.Join(m.state, process.Namespace(), opts.SandboxID, "address"))
        if err != nil {
            return nil, fmt.Errorf("failed to get socket address for sandbox %q: %w", opts.SandboxID, err)
        }

        // Use sandbox's socket address to handle task requests for this container.
        if err := shimbinary.WriteAddress(filepath.Join(bundle.Path, "address"), address); err != nil {
            return nil, err
        }

        shim, err := loadShim(ctx, bundle, func() {})
        if err != nil {
            return nil, fmt.Errorf("failed to load sandbox task %q: %w", opts.SandboxID, err)
        }

        if err := m.shims.Add(ctx, shim); err != nil {
            return nil, err
        }

        return shim, nil
    }

    shim, err := m.startShim(ctx, bundle, id, opts)
    if err != nil {
        return nil, err
    }
    defer func() {
        if retErr != nil {
            m.cleanupShim(shim)
        }
    }()

    if err := m.shims.Add(ctx, shim); err != nil {
        return nil, fmt.Errorf("failed to add task: %w", err)
    }

    return shim, nil
}
// NewBundle returns a new bundle on disk
func NewBundle(ctx context.Context, root, state, id string, spec typeurl.Any) (b *Bundle, err error) {
    if err := identifiers.Validate(id); err != nil {
        return nil, fmt.Errorf("invalid task id %s: %w", id, err)
    }

    ns, err := namespaces.NamespaceRequired(ctx)
    if err != nil {
        return nil, err
    }
    work := filepath.Join(root, ns, id)
    b = &Bundle{
        ID:        id,
        Path:      filepath.Join(state, ns, id),
        Namespace: ns,
    }
    var paths []string
    defer func() {
        if err != nil {
            for _, d := range paths {
                os.RemoveAll(d)
            }
        }
    }()
    // create state directory for the bundle
    if err := os.MkdirAll(filepath.Dir(b.Path), 0711); err != nil {
        return nil, err
    }
    if err := os.Mkdir(b.Path, 0700); err != nil {
        return nil, err
    }
    if typeurl.Is(spec, &specs.Spec{}) {
        if err := prepareBundleDirectoryPermissions(b.Path, spec.GetValue()); err != nil {
            return nil, err
        }
    }
    paths = append(paths, b.Path)
    // create working directory for the bundle
    if err := os.MkdirAll(filepath.Dir(work), 0711); err != nil {
        return nil, err
    }
    rootfs := filepath.Join(b.Path, "rootfs")
    if err := os.MkdirAll(rootfs, 0711); err != nil {
        return nil, err
    }
    paths = append(paths, rootfs)
    if err := os.Mkdir(work, 0711); err != nil {
        if !os.IsExist(err) {
            return nil, err
        }
        os.RemoveAll(work)
        if err := os.Mkdir(work, 0711); err != nil {
            return nil, err
        }
    }
    paths = append(paths, work)
    // symlink workdir
    // /run/containerd/io.containerd.runtime.v2.task/default/redis-server/work -> /var/lib/containerd/io.containerd.runtime.v2.task/default/redis-server
    if err := os.Symlink(work, filepath.Join(b.Path, "work")); err != nil {
        return nil, err
    }
    if spec := spec.GetValue(); spec != nil {
        // write the spec to the bundle
        err = os.WriteFile(filepath.Join(b.Path, configFilename), spec, 0666)
        if err != nil {
            return nil, fmt.Errorf("failed to write %s", configFilename)
        }
    }
    return b, nil
}
The newly created directories look like this:
xiu-desktop# pwd
/run/containerd/io.containerd.runtime.v2.task/default
xiu-desktop# tree
.
└── redis-server
├── config.json
├── rootfs
└── work -> /var/lib/containerd/io.containerd.runtime.v2.task/default/redis-server
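The config.json in that directory is the OCI runtime spec that WithNewSpec marshalled earlier. A small hedged sketch of inspecting it (the path comes from the tree above; specs is github.com/opencontainers/runtime-spec/specs-go, and encoding/json, fmt, log, os are assumed imports):

data, err := os.ReadFile("/run/containerd/io.containerd.runtime.v2.task/default/redis-server/config.json")
if err != nil {
    log.Fatal(err)
}

var spec specs.Spec
if err := json.Unmarshal(data, &spec); err != nil {
    log.Fatal(err)
}
// e.g. print the process args and the rootfs path recorded in the bundle
fmt.Println(spec.Process.Args, spec.Root.Path)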
func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) {
    ns, err := namespaces.NamespaceRequired(ctx)
    if err != nil {
        return nil, err
    }

    topts := opts.TaskOptions
    if topts == nil || topts.GetValue() == nil {
        topts = opts.RuntimeOptions
    }

    runtimePath, err := m.resolveRuntimePath(opts.Runtime)
    if err != nil {
        return nil, fmt.Errorf("failed to resolve runtime path: %w", err)
    }

    b := shimBinary(bundle, shimBinaryConfig{
        runtime:      runtimePath,
        address:      m.containerdAddress,
        ttrpcAddress: m.containerdTTRPCAddress,
        schedCore:    m.schedCore,
    })
    // launch the shim binary
    shim, err := b.Start(ctx, protobuf.FromAny(topts), func() {
        log.G(ctx).WithField("id", id).Info("shim disconnected")

        cleanupAfterDeadShim(context.Background(), id, ns, m.shims, m.events, b)
        // Remove self from the runtime task list. Even though the cleanupAfterDeadShim()
        // would publish taskExit event, but the shim.Delete() would always failed with ttrpc
        // disconnect and there is no chance to remove this dead task from runtime task lists.
        // Thus it's better to delete it here.
        m.shims.Delete(ctx, id)
    })
    if err != nil {
        return nil, fmt.Errorf("start failed: %w", err)
    }

    return shim, nil
}
func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
    args := []string{"-id", b.bundle.ID}
    switch logrus.GetLevel() {
    case logrus.DebugLevel, logrus.TraceLevel:
        args = append(args, "-debug")
    }
    args = append(args, "start")

    cmd, err := client.Command(
        ctx,
        &client.CommandConfig{
            Runtime:      b.runtime,
            Address:      b.containerdAddress,
            TTRPCAddress: b.containerdTTRPCAddress,
            Path:         b.bundle.Path,
            Opts:         opts,
            Args:         args,
            SchedCore:    b.schedCore,
        })
    if err != nil {
        return nil, err
    }
    // Windows needs a namespace when openShimLog
    ns, _ := namespaces.Namespace(ctx)
    shimCtx, cancelShimLog := context.WithCancel(namespaces.WithNamespace(context.Background(), ns))
    defer func() {
        if err != nil {
            cancelShimLog()
        }
    }()
    f, err := openShimLog(shimCtx, b.bundle, client.AnonDialer)
    if err != nil {
        return nil, fmt.Errorf("open shim log pipe: %w", err)
    }
    defer func() {
        if err != nil {
            f.Close()
        }
    }()
    // open the log pipe and block until the writer is ready
    // this helps with synchronization of the shim
    // copy the shim's logs to containerd's output
    go func() {
        defer f.Close()
        _, err := io.Copy(os.Stderr, f)
        // To prevent flood of error messages, the expected error
        // should be reset, like os.ErrClosed or os.ErrNotExist, which
        // depends on platform.
        err = checkCopyShimLogError(ctx, err)
        if err != nil {
            log.G(ctx).WithError(err).Error("copy shim log")
        }
    }()
    // containerd execs the shim binary; the shim starts a small RPC service and prints its socket address, e.g.
    // unix:///run/containerd/s/41107b8f6663c77e690f1e545ff41ce9039b6106896f6cf5a137e23c73c363c1
    out, err := cmd.CombinedOutput()
    if err != nil {
        return nil, fmt.Errorf("%s: %w", out, err)
    }
    address := strings.TrimSpace(string(out))
    // connect to the shim
    conn, err := client.Connect(address, client.AnonDialer)
    if err != nil {
        return nil, err
    }
    onCloseWithShimLog := func() {
        onClose()
        cancelShimLog()
        f.Close()
    }
    // Save runtime binary path for restore.
    // xiu-desktop# cat /run/containerd/io.containerd.runtime.v2.task/default/redis-server/shim-binary-path
    // /usr/bin/containerd-shim-runc-v2
    if err := os.WriteFile(filepath.Join(b.bundle.Path, "shim-binary-path"), []byte(b.runtime), 0600); err != nil {
        return nil, err
    }
    client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog))
    return &shim{
        bundle: b.bundle,
        client: client,
    }, nil
}
// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    container, err := runc.NewContainer(ctx, s.platform, r)
    if err != nil {
        return nil, err
    }

    s.containers[r.ID] = container

    s.send(&eventstypes.TaskCreate{
        ContainerID: r.ID,
        Bundle:      r.Bundle,
        Rootfs:      r.Rootfs,
        IO: &eventstypes.TaskIO{
            Stdin:    r.Stdin,
            Stdout:   r.Stdout,
            Stderr:   r.Stderr,
            Terminal: r.Terminal,
        },
        Checkpoint: r.Checkpoint,
        Pid:        uint32(container.Pid()),
    })

    return &taskAPI.CreateTaskResponse{
        Pid: uint32(container.Pid()),
    }, nil
}
// NewContainer returns a new runc container
func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (_ *Container, retErr error) {
    ns, err := namespaces.NamespaceRequired(ctx)
    if err != nil {
        return nil, fmt.Errorf("create namespace: %w", err)
    }

    opts := &options.Options{}
    if r.Options.GetValue() != nil {
        v, err := typeurl.UnmarshalAny(r.Options)
        if err != nil {
            return nil, err
        }
        if v != nil {
            opts = v.(*options.Options)
        }
    }

    var mounts []process.Mount
    for _, m := range r.Rootfs {
        mounts = append(mounts, process.Mount{
            Type:    m.Type,
            Source:  m.Source,
            Target:  m.Target,
            Options: m.Options,
        })
    }

    rootfs := ""
    if len(mounts) > 0 {
        rootfs = filepath.Join(r.Bundle, "rootfs")
        if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) {
            return nil, err
        }
    }

    config := &process.CreateConfig{
        ID:               r.ID,
        Bundle:           r.Bundle,
        Runtime:          opts.BinaryName,
        Rootfs:           mounts,
        Terminal:         r.Terminal,
        Stdin:            r.Stdin,
        Stdout:           r.Stdout,
        Stderr:           r.Stderr,
        Checkpoint:       r.Checkpoint,
        ParentCheckpoint: r.ParentCheckpoint,
        Options:          r.Options,
    }

    if err := WriteOptions(r.Bundle, opts); err != nil {
        return nil, err
    }
    // For historical reason, we write opts.BinaryName as well as the entire opts
    if err := WriteRuntime(r.Bundle, opts.BinaryName); err != nil {
        return nil, err
    }
    defer func() {
        if retErr != nil {
            if err := mount.UnmountAll(rootfs, 0); err != nil {
                logrus.WithError(err).Warn("failed to cleanup rootfs mount")
            }
        }
    }()

    // the rootfs is mounted here
    for _, rm := range mounts {
        m := &mount.Mount{
            Type:    rm.Type,
            Source:  rm.Source,
            Options: rm.Options,
        }
        if err := m.Mount(rootfs); err != nil {
            return nil, fmt.Errorf("failed to mount rootfs component %v: %w", m, err)
        }
    }

    p, err := newInit(
        ctx,
        r.Bundle,
        filepath.Join(r.Bundle, "work"),
        ns,
        platform,
        config,
        opts,
        rootfs,
    )
    if err != nil {
        return nil, errdefs.ToGRPC(err)
    }
    // p and config are assembled; p.Create is where runc gets invoked
    if err := p.Create(ctx, config); err != nil {
        return nil, errdefs.ToGRPC(err)
    }
    container := &Container{
        ID:              r.ID,
        Bundle:          r.Bundle,
        process:         p,
        processes:       make(map[string]process.Process),
        reservedProcess: make(map[string]struct{}),
    }
    pid := p.Pid()
    if pid > 0 {
        var cg interface{}
        if cgroups.Mode() == cgroups.Unified {
            g, err := cgroupsv2.PidGroupPath(pid)
            if err != nil {
                logrus.WithError(err).Errorf("loading cgroup2 for %d", pid)
                return container, nil
            }
            cg, err = cgroupsv2.LoadManager("/sys/fs/cgroup", g)
            if err != nil {
                logrus.WithError(err).Errorf("loading cgroup2 for %d", pid)
            }
        } else {
            cg, err = cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
            if err != nil {
                logrus.WithError(err).Errorf("loading cgroup for %d", pid)
            }
        }
        container.cgroup = cg
    }
    return container, nil
}
for _, rm := range mounts {
    m := &mount.Mount{
        Type:    rm.Type,
        Source:  rm.Source,
        Options: rm.Options,
    }
    if err := m.Mount(rootfs); err != nil {
        return nil, fmt.Errorf("failed to mount rootfs component %v: %w", m, err)
    }
}
// Create the process with the provided config
func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
    var (
        err     error
        socket  *runc.Socket
        pio     *processIO
        pidFile = newPidFile(p.Bundle)
    )

    if r.Terminal {
        if socket, err = runc.NewTempConsoleSocket(); err != nil {
            return fmt.Errorf("failed to create OCI runtime console socket: %w", err)
        }
        defer socket.Close()
    } else {
        if pio, err = createIO(ctx, p.id, p.IoUID, p.IoGID, p.stdio); err != nil {
            return fmt.Errorf("failed to create init process I/O: %w", err)
        }
        p.io = pio
    }
    if r.Checkpoint != "" {
        return p.createCheckpointedState(r, pidFile)
    }
    opts := &runc.CreateOpts{
        PidFile:      pidFile.Path(),
        NoPivot:      p.NoPivotRoot,
        NoNewKeyring: p.NoNewKeyring,
    }
    // wire the IO created above into the runc create options
    if p.io != nil {
        opts.IO = p.io.IO()
    }
    if socket != nil {
        opts.ConsoleSocket = socket
    }
    // this is where `runc create` is executed
    if err := p.runtime.Create(ctx, r.ID, r.Bundle, opts); err != nil {
        return p.runtimeError(err, "OCI runtime create failed")
    }
    if r.Stdin != "" {
        if err := p.openStdin(r.Stdin); err != nil {
            return err
        }
    }
    ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()
    if socket != nil {
        console, err := socket.ReceiveMaster()
        if err != nil {
            return fmt.Errorf("failed to retrieve console master: %w", err)
        }
        console, err = p.Platform.CopyConsole(ctx, console, p.id, r.Stdin, r.Stdout, r.Stderr, &p.wg)
        if err != nil {
            return fmt.Errorf("failed to start console copy: %w", err)
        }
        p.console = console
    } else {
        if err := pio.Copy(ctx, &p.wg); err != nil {
            return fmt.Errorf("failed to start io pipe copy: %w", err)
        }
    }
    pid, err := pidFile.Read()
    if err != nil {
        return fmt.Errorf("failed to retrieve OCI runtime container pid: %w", err)
    }
    p.pid = pid
    return nil
}
// Create creates a new container and returns its pid if it was created successfully
func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOpts) error {
    args := []string{"create", "--bundle", bundle}
    if opts != nil {
        oargs, err := opts.args()
        if err != nil {
            return err
        }
        args = append(args, oargs...)
    }
    cmd := r.command(context, append(args, id)...)
    if opts != nil && opts.IO != nil {
        opts.Set(cmd)
    }
    cmd.ExtraFiles = opts.ExtraFiles

    if cmd.Stdout == nil && cmd.Stderr == nil {
        data, err := cmdOutput(cmd, true, nil)
        defer putBuf(data)
        if err != nil {
            return fmt.Errorf("%s: %s", err, data.String())
        }
        return nil
    }
    ec, err := Monitor.Start(cmd)
    if err != nil {
        return err
    }
    if opts != nil && opts.IO != nil {
        if c, ok := opts.IO.(StartCloser); ok {
            if err := c.CloseAfterStart(); err != nil {
                return err
            }
        }
    }
    status, err := Monitor.Wait(cmd, ec)
    if err == nil && status != 0 {
        err = fmt.Errorf("%s did not terminate successfully: %w", cmd.Args[0], &ExitError{status})
    }
    return err
}
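The same go-runc package (github.com/containerd/go-runc) can be driven directly, outside the shim. A hedged sketch against your own bundle (the bundle path and container ID are purely illustrative; do not point this at containerd's own state directories):

// construct a go-runc client much like the shim does
r := &runc.Runc{
    Command: "runc", // path to the OCI runtime binary (opts.BinaryName above)
}

// roughly equivalent to: runc create --bundle /tmp/mybundle mycontainer
if err := r.Create(ctx, "mycontainer", "/tmp/mybundle", &runc.CreateOpts{}); err != nil {
    log.Fatal(err)
}
if err := r.Start(ctx, "mycontainer"); err != nil {
    log.Fatal(err)
}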
ec, err := Monitor.Start(cmd)
// Start starts the command and registers the process with the reaper
func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) {
    ec := m.Subscribe()
    if err := c.Start(); err != nil {
        m.Unsubscribe(ec)
        return nil, err
    }
    return ec, nil
}
// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
    container, err := s.getContainer(r.ID)
    if err != nil {
        return nil, err
    }

    // hold the send lock so that the start events are sent before any exit events in the error case
    s.eventSendMu.Lock()
    // start the requested container process
    p, err := container.Start(ctx, r)
    if err != nil {
        s.eventSendMu.Unlock()
        return nil, errdefs.ToGRPC(err)
    }

    switch r.ExecID {
    case "":
        switch cg := container.Cgroup().(type) {
        case cgroups.Cgroup:
            if err := s.ep.Add(container.ID, cg); err != nil {
                logrus.WithError(err).Error("add cg to OOM monitor")
            }
        case *cgroupsv2.Manager:
            allControllers, err := cg.RootControllers()
            if err != nil {
                logrus.WithError(err).Error("failed to get root controllers")
            } else {
                if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil {
                    if userns.RunningInUserNS() {
                        logrus.WithError(err).Debugf("failed to enable controllers (%v)", allControllers)
                    } else {
                        logrus.WithError(err).Errorf("failed to enable controllers (%v)", allControllers)
                    }
                }
            }
            if err := s.ep.Add(container.ID, cg); err != nil {
                logrus.WithError(err).Error("add cg to OOM monitor")
            }
        }

        s.send(&eventstypes.TaskStart{
            ContainerID: container.ID,
            Pid:         uint32(p.Pid()),
        })
    default:
        s.send(&eventstypes.TaskExecStarted{
            ContainerID: container.ID,
            ExecID:      r.ExecID,
            Pid:         uint32(p.Pid()),
        })
    }
    s.eventSendMu.Unlock()

    return &taskAPI.StartResponse{
        Pid: uint32(p.Pid()),
    }, nil
}
// Start a container process
func (c *Container) Start(ctx context.Context, r *task.StartRequest) (process.Process, error) {
    p, err := c.Process(r.ExecID)
    if err != nil {
        return nil, err
    }
    // start the resolved process
    if err := p.Start(ctx); err != nil {
        return nil, err
    }
    if c.Cgroup() == nil && p.Pid() > 0 {
        var cg interface{}
        if cgroups.Mode() == cgroups.Unified {
            g, err := cgroupsv2.PidGroupPath(p.Pid())
            if err != nil {
                logrus.WithError(err).Errorf("loading cgroup2 for %d", p.Pid())
            }
            cg, err = cgroupsv2.LoadManager("/sys/fs/cgroup", g)
            if err != nil {
                logrus.WithError(err).Errorf("loading cgroup2 for %d", p.Pid())
            }
        } else {
            cg, err = cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
            if err != nil {
                logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
            }
        }
        c.cgroup = cg
    }
    return p, nil
}
// Start the init process
func (p *Init) Start(ctx context.Context) error {
    p.mu.Lock()
    defer p.mu.Unlock()

    // dispatch through the init process's current state machine
    return p.initState.Start(ctx)
}
// Start will start an already created container
func (r *Runc) Start(context context.Context, id string) error {
    // runs `runc start <id>`
    return r.runOrError(r.command(context, "start", id))
}
// runOrError will run the provided command. If an error is
// encountered and neither Stdout nor Stderr was set, the error and the
// stderr of the command will be returned in the format of <error>:
// <stderr>
func (r *Runc) runOrError(cmd *exec.Cmd) error {
    if cmd.Stdout != nil || cmd.Stderr != nil {
        ec, err := Monitor.Start(cmd)
        if err != nil {
            return err
        }
        status, err := Monitor.Wait(cmd, ec)
        if err == nil && status != 0 {
            err = fmt.Errorf("%s did not terminate successfully: %w", cmd.Args[0], &ExitError{status})
        }
        return err
    }
    // run the command through the reaper and capture its combined output
    data, err := cmdOutput(cmd, true, nil)
    defer putBuf(data)
    if err != nil {
        return fmt.Errorf("%s: %s", err, data.String())
    }
    return nil
}
// callers of cmdOutput are expected to call putBuf on the returned Buffer
// to ensure it is released back to the shared pool after use.
func cmdOutput(cmd *exec.Cmd, combined bool, started chan<- int) (*bytes.Buffer, error) {
    b := getBuf()

    cmd.Stdout = b
    if combined {
        cmd.Stderr = b
    }
    // start the command through the reaper's Monitor so its exit status is observed
    ec, err := Monitor.Start(cmd)
    if err != nil {
        return nil, err
    }
    if started != nil {
        started <- cmd.Process.Pid
    }

    status, err := Monitor.Wait(cmd, ec)
    if err == nil && status != 0 {
        err = fmt.Errorf("%s did not terminate successfully: %w", cmd.Args[0], &ExitError{status})
    }

    return b, err
}
// Start starts the command and registers the process with the reaper
func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) {
    ec := m.Subscribe()
    // fork/exec the runc command here
    if err := c.Start(); err != nil {
        m.Unsubscribe(ec)
        return nil, err
    }
    return ec, nil
}
// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
    container, err := s.getContainer(r.ID)
    if err != nil {
        return nil, err
    }
    // forward the signal to the container
    if err := container.Kill(ctx, r); err != nil {
        return nil, errdefs.ToGRPC(err)
    }
    return empty, nil
}
// Kill a process
func (c *Container) Kill(ctx context.Context, r *task.KillRequest) error {
    p, err := c.Process(r.ExecID)
    if err != nil {
        return err
    }
    // send the signal to the resolved process
    return p.Kill(ctx, r.Signal, r.All)
}
// Kill the init process
func (p *Init) Kill(ctx context.Context, signal uint32, all bool) error {
    p.mu.Lock()
    defer p.mu.Unlock()

    // dispatch through the init process's current state machine
    return p.initState.Kill(ctx, signal, all)
}
// Kill sends the specified signal to the container
func (r *Runc) Kill(context context.Context, id string, sig int, opts *KillOpts) error {
    args := []string{
        "kill",
    }
    if opts != nil {
        args = append(args, opts.args()...)
    }
    // runs `runc kill <id> <signal>`
    return r.runOrError(r.command(context, append(args, id, strconv.Itoa(sig))...))
}
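Back at the client, the example only sent SIGTERM to the init process. A hedged sketch of the other common variant, signalling every process in the task with containerd.WithKillAll (this is what ultimately sets the All field seen in the KillRequest above):

// send SIGKILL to the init process and every other process in the task
if err := task.Kill(ctx, syscall.SIGKILL, containerd.WithKillAll); err != nil {
    return err
}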
// runOrError will run the provided command. If an error is
// encountered and neither Stdout nor Stderr was set, the error and the
// stderr of the command will be returned in the format of <error>:
// <stderr>
func (r *Runc) runOrError(cmd *exec.Cmd) error {
    if cmd.Stdout != nil || cmd.Stderr != nil {
        ec, err := Monitor.Start(cmd)
        if err != nil {
            return err
        }
        status, err := Monitor.Wait(cmd, ec)
        if err == nil && status != 0 {
            err = fmt.Errorf("%s did not terminate successfully: %w", cmd.Args[0], &ExitError{status})
        }
        return err
    }
    // run the command through the reaper and capture its combined output
    data, err := cmdOutput(cmd, true, nil)
    defer putBuf(data)
    if err != nil {
        return fmt.Errorf("%s: %s", err, data.String())
    }
    return nil
}
// callers of cmdOutput are expected to call putBuf on the returned Buffer
// to ensure it is released back to the shared pool after use.
func cmdOutput(cmd *exec.Cmd, combined bool, started chan<- int) (*bytes.Buffer, error) {
    b := getBuf()

    cmd.Stdout = b
    if combined {
        cmd.Stderr = b
    }
    // start the command through the reaper's Monitor so its exit status is observed
    ec, err := Monitor.Start(cmd)
    if err != nil {
        return nil, err
    }
    if started != nil {
        started <- cmd.Process.Pid
    }

    status, err := Monitor.Wait(cmd, ec)
    if err == nil && status != 0 {
        err = fmt.Errorf("%s did not terminate successfully: %w", cmd.Args[0], &ExitError{status})
    }

    return b, err
}
// Start starts the command and registers the process with the reaper
func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) {
    ec := m.Subscribe()
    if err := c.Start(); err != nil {
        m.Unsubscribe(ec)
        return nil, err
    }
    return ec, nil
}