// Create creates a new container and returns its pid if it was created successfully
func(r*Runc)Create(contextcontext.Context,id,bundlestring,opts*CreateOpts)error{args:=[]string{"create","--bundle",bundle}ifopts!=nil{oargs,err:=opts.args()iferr!=nil{returnerr}args=append(args,oargs...)}cmd:=r.command(context,append(args,id)...)// shim 处理 runc 的输入输出,具体看下面
// nerdctl 的-d 模式,使用binaryIO
// nerdctl 的 -it 模式,使用 fifo
// docker 的所有模式,都使用fifo
ifopts!=nil&&opts.IO!=nil{opts.Set(cmd)}cmd.ExtraFiles=opts.ExtraFilesifcmd.Stdout==nil&&cmd.Stderr==nil{data,err:=cmdOutput(cmd,true,nil)deferputBuf(data)iferr!=nil{returnfmt.Errorf("%s: %s",err,data.String())}returnnil}ec,err:=Monitor.Start(cmd)iferr!=nil{returnerr}ifopts!=nil&&opts.IO!=nil{ifc,ok:=opts.IO.(StartCloser);ok{iferr:=c.CloseAfterStart();err!=nil{returnerr}}}status,err:=Monitor.Wait(cmd,ec)iferr==nil&&status!=0{err=fmt.Errorf("%s did not terminate successfully: %w",cmd.Args[0],&ExitError{status})}returnerr}
socket 传递到 runc 的 --console-socket 参数。
这里查看一下 pio 的几种处理方式。
fifo
1
2
3
4
5
6
7
8
9
10
11
12
// Set sets the io to the exec.Cmd
func(i*pipeIO)Set(cmd*exec.Cmd){ifi.in!=nil{cmd.Stdin=i.in.r}ifi.out!=nil{cmd.Stdout=i.out.w}ifi.err!=nil{cmd.Stderr=i.err.w}}
// NOTE(review): abridged excerpt from Init.Create. Terminal containers get a
// temporary console socket (later passed to runc via --console-socket);
// non-terminal containers get pipe/fifo based process I/O (pio) instead.
ifr.Terminal{ifsocket,err=runc.NewTempConsoleSocket();err!=nil{returnfmt.Errorf("failed to create OCI runtime console socket: %w",err)}defersocket.Close()}else{ifpio,err=createIO(ctx,p.id,p.IoUID,p.IoGID,p.stdio);err!=nil{returnfmt.Errorf("failed to create init process I/O: %w",err)}p.io=pio}
// Create the process with the provided config
// NOTE(review): abridged excerpt ("..." marks elided code), kept verbatim.
// Flow: create a temp console socket when a terminal is requested, pass it to
// runc through opts.ConsoleSocket, then after runc create succeeds receive
// the console master fd back over the socket and hand it to
// Platform.CopyConsole for stdio forwarding.
func(p*Init)Create(ctxcontext.Context,r*CreateConfig)error{var(errerrorsocket*runc.Socketpio*processIOpidFile=newPidFile(p.Bundle))ifr.Terminal{ifsocket,err=runc.NewTempConsoleSocket();err!=nil{returnfmt.Errorf("failed to create OCI runtime console socket: %w",err)}defersocket.Close()}else{...}...ifsocket!=nil{opts.ConsoleSocket=socket}iferr:=p.runtime.Create(ctx,r.ID,r.Bundle,opts);err!=nil{returnp.runtimeError(err,"OCI runtime create failed")}......ifsocket!=nil{console,err:=socket.ReceiveMaster()iferr!=nil{returnfmt.Errorf("failed to retrieve console master: %w",err)}console,err=p.Platform.CopyConsole(ctx,console,p.id,r.Stdin,r.Stdout,r.Stderr,&p.wg)iferr!=nil{returnfmt.Errorf("failed to start console copy: %w",err)}p.console=console}......returnnil}
// NOTE(review): abridged excerpt ("......" marks elided code), kept verbatim.
// CopyConsole registers the console master with the epoller, then copies the
// stdin fifo into the console and the console into the stdout fifo (or, in
// the elided "binary" case, into a logging binary). The stdout fifo is also
// opened read-only so the fifo stays usable; cwg waits for the copy
// goroutines to have started before returning.
func(p*linuxPlatform)CopyConsole(ctxcontext.Context,consoleconsole.Console,id,stdin,stdout,stderrstring,wg*sync.WaitGroup)(consconsole.Console,retErrerror){ifp.epoller==nil{returnnil,errors.New("uninitialized epoller")}epollConsole,err:=p.epoller.Add(console)iferr!=nil{returnnil,err}varcwgsync.WaitGroupifstdin!=""{in,err:=fifo.OpenFifo(context.Background(),stdin,syscall.O_RDONLY|syscall.O_NONBLOCK,0)iferr!=nil{returnnil,err}cwg.Add(1)gofunc(){cwg.Done()bp:=bufPool.Get().(*[]byte)deferbufPool.Put(bp)io.CopyBuffer(epollConsole,in,*bp)// we need to shutdown epollConsole when pipe broken
epollConsole.Shutdown(p.epoller.CloseConsole)epollConsole.Close()}()}uri,err:=url.Parse(stdout)iferr!=nil{returnnil,fmt.Errorf("unable to parse stdout uri: %w",err)}// uri.Scheme = ""
switchuri.Scheme{case"binary":......default://
outw,err:=fifo.OpenFifo(ctx,stdout,syscall.O_WRONLY,0)iferr!=nil{returnnil,err}outr,err:=fifo.OpenFifo(ctx,stdout,syscall.O_RDONLY,0)iferr!=nil{returnnil,err}wg.Add(1)cwg.Add(1)gofunc(){cwg.Done()buf:=bufPool.Get().(*[]byte)deferbufPool.Put(buf)io.CopyBuffer(outw,epollConsole,*buf)outw.Close()outr.Close()wg.Done()}()cwg.Wait()}returnepollConsole,nil}
// Create a new initial process and container with the underlying OCI runtime
func(s*service)Create(ctxcontext.Context,r*taskAPI.CreateTaskRequest)(_*taskAPI.CreateTaskResponse,errerror){s.mu.Lock()defers.mu.Unlock()container,err:=runc.NewContainer(ctx,s.platform,r)iferr!=nil{returnnil,err}s.containers[r.ID]=containers.send(&eventstypes.TaskCreate{ContainerID:r.ID,Bundle:r.Bundle,Rootfs:r.Rootfs,IO:&eventstypes.TaskIO{Stdin:r.Stdin,Stdout:r.Stdout,Stderr:r.Stderr,Terminal:r.Terminal,},Checkpoint:r.Checkpoint,Pid:uint32(container.Pid()),})return&taskAPI.CreateTaskResponse{Pid:uint32(container.Pid()),},nil}
// NOTE(review): same excerpt as shown earlier in this article — terminal
// containers get a temporary console socket, non-terminal containers get
// pipe/fifo based process I/O (pio). Kept verbatim.
ifr.Terminal{ifsocket,err=runc.NewTempConsoleSocket();err!=nil{returnfmt.Errorf("failed to create OCI runtime console socket: %w",err)}defersocket.Close()}else{ifpio,err=createIO(ctx,p.id,p.IoUID,p.IoGID,p.stdio);err!=nil{returnfmt.Errorf("failed to create init process I/O: %w",err)}p.io=pio}
// NewBinaryIO runs a custom binary process for pluggable shim logging
funcNewBinaryIO(ctxcontext.Context,idstring,uri*url.URL)(_runc.IO,errerror){ns,err:=namespaces.NamespaceRequired(ctx)iferr!=nil{returnnil,err}varclosers[]func()errordeferfunc(){iferr==nil{return}result:=multierror.Append(err)for_,fn:=rangeclosers{result=multierror.Append(result,fn())}err=multierror.Flatten(result)}()out,err:=newPipe()iferr!=nil{returnnil,fmt.Errorf("failed to create stdout pipes: %w",err)}closers=append(closers,out.Close)serr,err:=newPipe()iferr!=nil{returnnil,fmt.Errorf("failed to create stderr pipes: %w",err)}closers=append(closers,serr.Close)r,w,err:=os.Pipe()iferr!=nil{returnnil,err}closers=append(closers,r.Close,w.Close)// 1, 配置日志驱动子进程
cmd:=NewBinaryCmd(uri,id,ns)// 2, 配置子进程额外文件描述符
cmd.ExtraFiles=append(cmd.ExtraFiles,out.r,serr.r,w)// don't need to register this with the reaper or wait when
// running inside a shim
iferr:=cmd.Start();err!=nil{returnnil,fmt.Errorf("failed to start binary process: %w",err)}closers=append(closers,func()error{returncmd.Process.Kill()})// close our side of the pipe after start
iferr:=w.Close();err!=nil{returnnil,fmt.Errorf("failed to close write pipe after start: %w",err)}// wait for the logging binary to be ready
b:=make([]byte,1)if_,err:=r.Read(b);err!=nil&&err!=io.EOF{returnnil,fmt.Errorf("failed to read from logging binary: %w",err)}return&binaryIO{cmd:cmd,out:out,err:serr,},nil}
// nerdctl/pkg/logging/logging.go
funcgetLoggerFunc(dataStorestring)(logging.LoggerFunc,error){ifdataStore==""{returnnil,errors.New("got empty data store")}returnfunc(_context.Context,config*logging.Config,readyfunc()error)error{ifconfig.Namespace==""||config.ID==""{returnerrors.New("got invalid config")}logConfigFilePath:=LogConfigFilePath(dataStore,config.Namespace,config.ID)if_,err:=os.Stat(logConfigFilePath);err==nil{logConfig,err:=LoadLogConfig(dataStore,config.Namespace,config.ID)iferr!=nil{returnerr}driver,err:=GetDriver(logConfig.Driver,logConfig.Opts)iferr!=nil{returnerr}iferr:=ready();err!=nil{returnerr}returndriver.Process(dataStore,config)}elseif!errors.Is(err,os.ErrNotExist){// the file does not exist if the container was created with nerdctl < 0.20
returnerr}returnnil},nil}
func(jsonLogger*JSONLogger)Process(dataStorestring,config*logging.Config)error{varjsonFilePathstringiflogPath,ok:=jsonLogger.Opts[LogPath];ok{jsonFilePath=logPath}else{jsonFilePath=jsonfile.Path(dataStore,config.Namespace,config.ID)}l:=&logrotate.Logger{Filename:jsonFilePath,}//maxSize Defaults to unlimited.
varcapValint64capVal=-1ifcapacity,ok:=jsonLogger.Opts[MaxSize];ok{varerrerrorcapVal,err=units.FromHumanSize(capacity)iferr!=nil{returnerr}ifcapVal<=0{returnfmt.Errorf("max-size must be a positive number")}}l.MaxBytes=capValmaxFile:=1ifmaxFileString,ok:=jsonLogger.Opts[MaxFile];ok{varerrerrormaxFile,err=strconv.Atoi(maxFileString)iferr!=nil{returnerr}ifmaxFile<1{returnfmt.Errorf("max-file cannot be less than 1")}}// MaxBackups does not include file to write logs to
l.MaxBackups=maxFile-1returnjsonfile.Encode(l,config.Stdout,config.Stderr)}
最终的日志处理,是使用 jsonfile 框架,将日志保存到 jsonFilePath 路径中。
小结
关于 runc 与 shim 之间的日志处理关系做一个小结:
这里不考虑runc的前台模式(因为containerd不会使用该模式)。
NewTerminal & Detached
shim创建sock文件,传递给runc。
外部驱动再与shim交互
非 Terminal & Detached(无终端的后台模式)
shim直接将 runc 的输入输出,与外部驱动交互。
nerdctl-log-example
下面通过简单的代码,模拟 nerdctl run -d --name runcdev1 q946666800/runcdev日志的搜集过程。
// drive.go
packagemainimport("bufio""context""encoding/json""fmt""io""os""os/signal""sync""time""github.com/fahedouch/go-logrotate""github.com/sirupsen/logrus""golang.org/x/sys/unix")funcmain(){fmt.Println("log drive start!!!")ctx,cancel:=context.WithCancel(context.Background())defercancel()var(sigCh=make(chanos.Signal,32)errCh=make(chanerror,1))signal.Notify(sigCh,unix.SIGTERM)// cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w)
out:=os.NewFile(3,"CONTAINER_STDOUT")serr:=os.NewFile(4,"CONTAINER_STDERR")wait:=os.NewFile(5,"CONTAINER_WAIT")gofunc(){errCh<-logger(ctx,out,serr,wait.Close)}()for{select{case<-sigCh:cancel()caseerr:=<-errCh:iferr!=nil{fmt.Fprintln(os.Stderr,err)os.Exit(1)}fmt.Println("log drive exit 0")os.Exit(0)}}}funclogger(_context.Context,out*os.File,serr*os.File,readyfunc()error)error{// Notify the shim that it is ready
// call wait.Close
// r will receive io.EOF error
iferr:=ready();err!=nil{returnerr}// log path
jsonFilePath:="app.log"l:=&logrotate.Logger{Filename:jsonFilePath,}returnEncode(l,out,serr)}// Entry is compatible with Docker "json-file" logs
typeEntrystruct{Logstring`json:"log,omitempty"`// line, including "\r\n"
Streamstring`json:"stream,omitempty"`// "stdout" or "stderr"
Timetime.Time`json:"time"`// e.g. "2020-12-11T20:29:41.939902251Z"
}funcEncode(wio.WriteCloser,stdout,stderrio.Reader)error{enc:=json.NewEncoder(w)varencMusync.Mutexvarwgsync.WaitGroupwg.Add(2)f:=func(rio.Reader,namestring){deferwg.Done()br:=bufio.NewReader(r)e:=&Entry{Stream:name,}for{line,err:=br.ReadString(byte('\n'))iferr!=nil{logrus.WithError(err).Errorf("failed to read line from %q",name)return}e.Log=linee.Time=time.Now().UTC()encMu.Lock()encErr:=enc.Encode(e)encMu.Unlock()ifencErr!=nil{logrus.WithError(err).Errorf("failed to encode JSON")return}}}gof(stdout,"stdout")gof(stderr,"stderr")wg.Wait()returnnil}
// shim.go
packagemainimport("fmt""io""log""os""os/exec")funcmain(){// start log driver
pio,err:=driveIO()iferr!=nil{log.Fatal(err)}// start app
cmd:=exec.Command("./app-example")cmd.Stdout=pio.out.wcmd.Stderr=pio.err.werr=cmd.Start()iferr!=nil{log.Fatal(err)}err=cmd.Wait()iferr!=nil{log.Fatal(err)}}funcnewPipe()(*pipe,error){r,w,err:=os.Pipe()iferr!=nil{returnnil,err}return&pipe{r:r,w:w,},nil}typepipestruct{r*os.Filew*os.File}typebinaryIOstruct{cmd*exec.Cmdout,err*pipe}func(p*pipe)Close()error{iferr:=p.w.Close();err!=nil{}iferr:=p.r.Close();err!=nil{}returnfmt.Errorf("pipe close error")}funcdriveIO()(_*binaryIO,errerror){varclosers[]func()error// app out pipe
out,err:=newPipe()iferr!=nil{returnnil,err}closers=append(closers,out.Close)// app err pipe
serr,err:=newPipe()iferr!=nil{returnnil,err}closers=append(closers,serr.Close)// drive ready pipe
r,w,err:=os.Pipe()iferr!=nil{returnnil,err}closers=append(closers,r.Close,w.Close)cmd:=exec.Command("./drive-example")cmd.Stdout=os.Stdoutcmd.Stderr=os.Stderrcmd.ExtraFiles=append(cmd.ExtraFiles,out.r,serr.r,w)iferr:=cmd.Start();err!=nil{returnnil,err}closers=append(closers,func()error{returncmd.Process.Kill()})// close our side of the pipe after start
iferr:=w.Close();err!=nil{returnnil,fmt.Errorf("failed to close write pipe after start: %w",err)}// wait for the logging binary to be ready
b:=make([]byte,1)if_,err:=r.Read(b);err!=nil&&err!=io.EOF{returnnil,fmt.Errorf("failed to read from logging binary: %w",err)}return&binaryIO{cmd:cmd,out:out,err:serr,},nil}
模拟 nerdctl run -d --name runcdev1 q946666800/runcdev 的日志收集过程。