目录

Containerd解析(7) - log

概述

容器日志处理,本质上就是处理 runc 所启动的容器进程的标准输出和标准错误。

这里我们分析 containerd-shim-runc-v2 与 runc 之间的日志处理。

查看 shim 调用runc的代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
containerd/pkg/process/init.go

// Create the process with the provided config
// (Excerpt: the trailing "..." lines mark code elided by the article.)
//
// Two ways of handling the container's stdio are prepared here:
//   - terminal mode (-t): a temporary console socket, handed to runc;
//   - every other mode: a processIO (pio) built by createIO.
func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
   var (
      err     error
      socket  *runc.Socket
      pio     *processIO
      pidFile = newPidFile(p.Bundle)
   )

   // Terminal mode, i.e. the -t flag.
   if r.Terminal {
      if socket, err = runc.NewTempConsoleSocket(); err != nil {
         return fmt.Errorf("failed to create OCI runtime console socket: %w", err)
      }
      defer socket.Close()
   } else {
      if pio, err = createIO(ctx, p.id, p.IoUID, p.IoGID, p.stdio); err != nil {
         return fmt.Errorf("failed to create init process I/O: %w", err)
      }
      // Every mode other than -t is abstracted as a pio.
      p.io = pio
   }
    
   if r.Checkpoint != "" {
      return p.createCheckpointedState(r, pidFile)
   }
   opts := &runc.CreateOpts{
      PidFile:      pidFile.Path(),
      NoPivot:      p.NoPivotRoot,
      NoNewKeyring: p.NoNewKeyring,
   }
   // Hand the pio to runc (non-terminal modes).
   if p.io != nil {
      opts.IO = p.io.IO()
   }
   // Hand the console socket to runc (terminal mode); it ends up as runc's
   // --console-socket argument.
   if socket != nil {
      opts.ConsoleSocket = socket
   }
    
   // Log handling has now been abstracted as either a socket or a pio;
   // p.runtime.Create attaches it to the runc (container) process.
   if err := p.runtime.Create(ctx, r.ID, r.Bundle, opts); err != nil {
      return p.runtimeError(err, "OCI runtime create failed")
   }
   ...
   ..
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Create runs `runc create` for container id using the OCI bundle directory
// bundle, and waits for that command to finish. It returns only an error
// (despite older wording, no pid is returned).
//
// opts may be nil. When opts.IO is set, it attaches its pipe ends to the runc
// command via Set. In terminal mode opts.IO is nil and the console socket is
// used instead. As observed in practice, nerdctl -d goes through binaryIO,
// while docker goes through the fifo-backed pipeIO.
func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOpts) error {
   args := []string{"create", "--bundle", bundle}
   if opts != nil {
      oargs, err := opts.args()
      if err != nil {
         return err
      }
      args = append(args, oargs...)
   }
   cmd := r.command(context, append(args, id)...)
   // Every other use of opts in this function is nil-guarded; the ExtraFiles
   // assignment must be as well, otherwise a nil opts panics here.
   if opts != nil {
      if opts.IO != nil {
         opts.Set(cmd)
      }
      cmd.ExtraFiles = opts.ExtraFiles
   }

   // Nothing was attached to stdout/stderr (e.g. terminal mode): run runc to
   // completion through cmdOutput so its output can go into the error message.
   if cmd.Stdout == nil && cmd.Stderr == nil {
      data, err := cmdOutput(cmd, true, nil)
      defer putBuf(data)
      if err != nil {
         return fmt.Errorf("%s: %s", err, data.String())
      }
      return nil
   }
   ec, err := Monitor.Start(cmd)
   if err != nil {
      return err
   }
   // Once runc has started, give the IO a chance to release whatever it no
   // longer needs (semantics are defined by the StartCloser implementation).
   if opts != nil && opts.IO != nil {
      if c, ok := opts.IO.(StartCloser); ok {
         if err := c.CloseAfterStart(); err != nil {
            return err
         }
      }
   }
   status, err := Monitor.Wait(cmd, ec)
   if err == nil && status != 0 {
      err = fmt.Errorf("%s did not terminate successfully: %w", cmd.Args[0], &ExitError{status})
   }
   return err
}

在终端模式下,socket 会作为 runc 的 --console-socket 参数传入。

下面看一下 pio 的几种实现方式(即它们各自的 Set 方法):

fifo

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// Set attaches i's pipes to cmd: the read end of the stdin pipe becomes the
// child's stdin, and the write ends of the stdout/stderr pipes become its
// stdout/stderr. A stream without a pipe is left untouched on cmd.
func (i *pipeIO) Set(cmd *exec.Cmd) {
	if in := i.in; in != nil {
		cmd.Stdin = in.r
	}
	if out := i.out; out != nil {
		cmd.Stdout = out.w
	}
	if errPipe := i.err; errPipe != nil {
		cmd.Stderr = errPipe.w
	}
}

binary

1
2
3
4
5
6
7
8
// Set points cmd's stdout and stderr at the write ends of b's pipes; the
// logging binary holds the matching read ends. There is no stdin pipe.
func (b *binaryIO) Set(cmd *exec.Cmd) {
	if p := b.out; p != nil {
		cmd.Stdout = p.w
	}
	if p := b.err; p != nil {
		cmd.Stderr = p.w
	}
}

stdio

1
2
3
4
5
// Set makes cmd share the calling process's own stdin, stdout and stderr.
func (s *stdio) Set(cmd *exec.Cmd) {
	cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr
}

nerdctl 中的日志处理

分析两种常用的情况。

nerdctl run -t --name runcdev1 q946666800/runcdev

runc启动命令:

https://raw.githubusercontent.com/yzxiu/images/blog/2022-11/20221116-152107.png
runc启动命令

查看config.json配置

https://raw.githubusercontent.com/yzxiu/images/blog/2022-11/20221116-152355.png
terminal: true

可知该模式为 New Terminal & Detached ,需要外部传入 --console-socket

从runc启动参数可以看到

1
2
12 = {string} "--console-socket"
13 = {string} "/run/user/1000/pty4072274856/pty.sock"

查看如何创建并监听 /run/user/1000/pty4072274856/pty.sock

查看代码,可以看到在shim的 runc.NewContainer() -> p.Create() 代码中,创建了 pty.sock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// Terminal mode: create a temporary console socket (pty.sock) whose path is
// given to runc via --console-socket. Otherwise build a processIO (pio).
if r.Terminal {
	if socket, err = runc.NewTempConsoleSocket(); err != nil {
		return fmt.Errorf("failed to create OCI runtime console socket: %w", err)
	}
	// Closed when Create returns, after the pty master has been received from it.
	defer socket.Close()
} else {
	if pio, err = createIO(ctx, p.id, p.IoUID, p.IoGID, p.stdio); err != nil {
		return fmt.Errorf("failed to create init process I/O: %w", err)
	}
	p.io = pio
}

NewTempConsoleSocket()创建了 pty.sock,并将sock的路径参数传递给runc。

接下来看 shim 创建的 pty.sock 是怎样与 nerdctl 关联起来的?

nerdctl:

1
2
3
4
5
6
7
8
9
// With -t, switch nerdctl's own terminal into raw mode and restore it when the
// function returns; con is later passed as the stdout stream to cio.WithStreams.
var con console.Console
if flagT {
   con = console.Current()
   defer con.Reset()
   if err := con.SetRaw(); err != nil {
      return err
   }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// nerdctl/pkg/taskutil/taskutil.go
// Note: cio.WithStreams(in, con, nil) passes in as stdin and con as stdout,
// with no stderr (in terminal mode stdout and stderr share the terminal).
// con is the terminal of the nerdctl process itself.
ioCreator = cio.NewCreator(cio.WithStreams(in, con, nil), cio.WithTerminal)


// WithTerminal is an Opt that sets the Terminal option on the streams.
func WithTerminal(opt *Streams) { opt.Terminal = true }

// WithStreams returns an Opt that installs stdin, stdout and stderr as the
// stream reader/writers; a nil argument leaves that stream field nil.
func WithStreams(stdin io.Reader, stdout, stderr io.Writer) Opt {
	return func(s *Streams) {
		s.Stdin, s.Stdout, s.Stderr = stdin, stdout, stderr
	}
}

// NewCreator returns an IO creator from the options.
//
// The options are applied to a fresh Streams value, and FIFODir falls back to
// defaults.DefaultFIFODir. The returned Creator is called with a process id.
// It allocates a FIFO set (stream objects become path strings here) under
// FIFODir, then clears the path of every stream the caller left nil. Presumably
// copyIO then skips empty paths — confirm.
func NewCreator(opts ...Opt) Creator {
	cfg := &Streams{}
	for _, apply := range opts {
		apply(cfg)
	}
	if cfg.FIFODir == "" {
		cfg.FIFODir = defaults.DefaultFIFODir
	}
	return func(id string) (IO, error) {
		set, err := NewFIFOSetInDir(cfg.FIFODir, id, cfg.Terminal)
		if err != nil {
			return nil, err
		}
		if cfg.Stdin == nil {
			set.Stdin = ""
		}
		if cfg.Stdout == nil {
			set.Stdout = ""
		}
		if cfg.Stderr == nil {
			set.Stderr = ""
		}
		return copyIO(set, cfg)
	}
}

最终在 copyIO 中,会在 /run/containerd/fifo/ 下的一个随机目录中(目录名每次运行都不同,例如 392418153,或下面输出中的 7416553)创建相关的 fifo 文件,并与 nerdctl 终端关联起来,如下:

1
2
3
4
root@xiu-desktop:/run/containerd/fifo/7416553# tree
.
├── 2e8e9f34cedc5bcca69af8e0e98afae4452463a3b5094b03b71a970f9d88eefa-stdin
└── 2e8e9f34cedc5bcca69af8e0e98afae4452463a3b5094b03b71a970f9d88eefa-stdout

随后,nerdctl 将这两个路径传递给 containerd,containerd 再传递给 shim。

shim:

在shim中,接收到的请求如下:

https://raw.githubusercontent.com/yzxiu/images/blog/2022-11/20221116-210238.png

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Create the process with the provided config
// (Excerpt: "..." lines mark code elided by the article.)
func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
	var (
		err     error
		socket  *runc.Socket
		pio     *processIO
		pidFile = newPidFile(p.Bundle)
	)

	// Terminal mode: create the temporary console socket that runc will be
	// pointed at through opts.ConsoleSocket.
	if r.Terminal {
		if socket, err = runc.NewTempConsoleSocket(); err != nil {
			return fmt.Errorf("failed to create OCI runtime console socket: %w", err)
		}
		defer socket.Close()
	} else {
		...
	}
	...
	if socket != nil {
		opts.ConsoleSocket = socket
	}
	if err := p.runtime.Create(ctx, r.ID, r.Bundle, opts); err != nil {
		return p.runtimeError(err, "OCI runtime create failed")
	}
	...
    ...
	// After `runc create`, receive the console master over the socket, then
	// have CopyConsole shuttle data between it and the client's stdin/stdout
	// fifo paths (r.Stdin, r.Stdout, r.Stderr).
	if socket != nil {
		console, err := socket.ReceiveMaster()
		if err != nil {
			return fmt.Errorf("failed to retrieve console master: %w", err)
		}
		console, err = p.Platform.CopyConsole(ctx, console, p.id, r.Stdin, r.Stdout, r.Stderr, &p.wg)
		if err != nil {
			return fmt.Errorf("failed to start console copy: %w", err)
		}
		p.console = console
	}
    ...
    ...
	return nil
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// CopyConsole registers the console (received from runc over the console
// socket) with the platform epoller. It then starts goroutines that copy:
//   - stdin fifo -> console (only when stdin != "");
//   - console -> stdout fifo (default, non-"binary" branch).
// It returns the epoll-wrapped console. (Excerpt: the "binary" branch is
// elided, and stderr is not referenced in the visible code.)
// wg is incremented for the stdout copier and released when it finishes;
// presumably callers use it to wait for output to drain.
func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, id, stdin, stdout, stderr string, wg *sync.WaitGroup) (cons console.Console, retErr error) {
   if p.epoller == nil {
      return nil, errors.New("uninitialized epoller")
   }

   epollConsole, err := p.epoller.Add(console)
   if err != nil {
      return nil, err
   }

   // cwg only guarantees the copier goroutines have started before returning:
   // Done is the first statement of each goroutine.
   var cwg sync.WaitGroup
   if stdin != "" {
      // O_NONBLOCK: opening a fifo for reading this way does not wait for a writer.
      in, err := fifo.OpenFifo(context.Background(), stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
      if err != nil {
         return nil, err
      }
      cwg.Add(1)
      go func() {
         cwg.Done()
         bp := bufPool.Get().(*[]byte)
         defer bufPool.Put(bp)
         // NOTE(review): copy errors are ignored; the console is shut down either way.
         io.CopyBuffer(epollConsole, in, *bp)
         // we need to shutdown epollConsole when pipe broken
         epollConsole.Shutdown(p.epoller.CloseConsole)
         epollConsole.Close()
      }()
   }

   uri, err := url.Parse(stdout)
   if err != nil {
      return nil, fmt.Errorf("unable to parse stdout uri: %w", err)
   }

   // In the `nerdctl run -t` case traced in this article, stdout is a plain
   // fifo path, so uri.Scheme == "" and the default branch runs.
   switch uri.Scheme {
   case "binary":
      ...
      ...

   default:
      // Plain fifo path: open a write end to receive the console output.
      outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
      if err != nil {
         return nil, err
      }
      // A read end is opened too but never read here; it is only closed after
      // the copy ends. NOTE(review): presumably this keeps the fifo usable
      // while no client reader is attached — confirm.
      outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0)
      if err != nil {
         return nil, err
      }
      wg.Add(1)
      cwg.Add(1)
      go func() {
         cwg.Done()
         buf := bufPool.Get().(*[]byte)
         defer bufPool.Put(buf)
         io.CopyBuffer(outw, epollConsole, *buf)

         outw.Close()
         outr.Close()
         wg.Done()
      }()
      cwg.Wait()
   }

   return epollConsole, nil
}

在 shim 中,CopyConsole 使用传递进来的 stdin/stdout fifo 路径(见下图),

https://raw.githubusercontent.com/yzxiu/images/blog/2022-11/20221116-210507.png

与通过 socket 接收到的容器 console(pty master)关联起来,从而将容器的 console 传输至 nerdctl 进程的 console。

所以,使用 -it 运行容器时,可以与容器进程直接进行交互。


nerdctl run -d --name runcdev1 q946666800/runcdev

runc启动方式为:Pass-Through & Detached

首先,查看shim,create的请求参数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Create a new initial process and container with the underlying OCI runtime.
//
// While holding s.mu it builds the container with runc.NewContainer and
// registers it in s.containers under the task ID. It then publishes a
// TaskCreate event echoing the request's I/O settings and replies with the
// container's pid.
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	container, err := runc.NewContainer(ctx, s.platform, r)
	if err != nil {
		return nil, err
	}
	s.containers[r.ID] = container

	taskIO := &eventstypes.TaskIO{
		Stdin:    r.Stdin,
		Stdout:   r.Stdout,
		Stderr:   r.Stderr,
		Terminal: r.Terminal,
	}
	s.send(&eventstypes.TaskCreate{
		ContainerID: r.ID,
		Bundle:      r.Bundle,
		Rootfs:      r.Rootfs,
		IO:          taskIO,
		Checkpoint:  r.Checkpoint,
		Pid:         uint32(container.Pid()),
	})

	resp := &taskAPI.CreateTaskResponse{Pid: uint32(container.Pid())}
	return resp, nil
}

https://raw.githubusercontent.com/yzxiu/images/blog/2022-11/20221116-211251.png

信息

注意,这里的 Stdio 与上面的区别。

-it 模式中,有Stdin, Stdout,没有Stderr,是因为我们使用终端模式运行,需要输入,并且Stdout和Stderr会统一输出到终端,所以合并在Stdout中

-d模式中,因为进程是后台运行,所以不需要Stdin,Stdout和Stderr可以分开传输。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// In -d mode r.Terminal is false, so this time the else branch runs and
// createIO builds the processIO from the request's stdio.
if r.Terminal {
   if socket, err = runc.NewTempConsoleSocket(); err != nil {
      return fmt.Errorf("failed to create OCI runtime console socket: %w", err)
   }
   defer socket.Close()
} else {
   if pio, err = createIO(ctx, p.id, p.IoUID, p.IoGID, p.stdio); err != nil {
      return fmt.Errorf("failed to create init process I/O: %w", err)
   }
   p.io = pio
}

这回执行的是 createIO

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// createIO builds the processIO for a non-terminal init process from the
// stdio paths/URIs sent by the client. The stdout URI's scheme selects:
//   - stdio.IsNull(): runc.NewNullIO;
//   - "" or "fifo": runc pipes with pio.copy set (presumably data is later
//     copied between the pipes and the fifo paths — confirm);
//   - "binary": NewBinaryIO starts an external logging binary;
//   - "file": make sure the file exists, then runc pipes with both stdout and
//     stderr pointed at the file path;
//   - anything else: an "unknown STDIO scheme" error.
func createIO(ctx context.Context, id string, ioUID, ioGID int, stdio stdio.Stdio) (*processIO, error) {
	pio := &processIO{
		stdio: stdio,
	}
	if stdio.IsNull() {
		i, err := runc.NewNullIO()
		if err != nil {
			return nil, err
		}
		pio.io = i
		return pio, nil
	}
	u, err := url.Parse(stdio.Stdout)
	if err != nil {
		return nil, fmt.Errorf("unable to parse stdout uri: %w", err)
	}
	// A bare path (no scheme) is treated as a fifo.
	if u.Scheme == "" {
		u.Scheme = "fifo"
	}
	pio.uri = u
	// For `nerdctl run -d`, the scheme observed here is "binary".
	switch u.Scheme {
	case "fifo":
		pio.copy = true
		pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio))
	case "binary":
		pio.io, err = NewBinaryIO(ctx, id, u)
	case "file":
		filePath := u.Path
		if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil {
			return nil, err
		}
		var f *os.File
		// Opened only to create the file if missing; it is closed right away.
		f, err = os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
		if err != nil {
			return nil, err
		}
		f.Close() // NOTE(review): Close error is ignored.
		pio.stdio.Stdout = filePath
		pio.stdio.Stderr = filePath
		pio.copy = true
		pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio))
	default:
		return nil, fmt.Errorf("unknown STDIO scheme %s", u.Scheme)
	}
	// err here is the error from whichever constructor the switch ran.
	if err != nil {
		return nil, err
	}
	return pio, nil
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// NewBinaryIO runs a custom binary process for pluggable shim logging
//
// It creates stdout and stderr pipes plus a readiness pipe, then starts the
// logging binary. The binary receives the two read ends and the readiness
// write end as extra files (fds 3, 4 and 5). NewBinaryIO then blocks until the
// binary closes its copy of the readiness pipe, which makes r.Read return
// io.EOF. The returned binaryIO keeps the write ends, which Set later attaches
// to runc's stdout/stderr. On any error, the deferred function runs every
// closer collected so far, which includes killing the binary once started.
func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err error) {
   ns, err := namespaces.NamespaceRequired(ctx)
   if err != nil {
      return nil, err
   }

   var closers []func() error
   defer func() {
      if err == nil {
         return
      }
      result := multierror.Append(err)
      for _, fn := range closers {
         result = multierror.Append(result, fn())
      }
      err = multierror.Flatten(result)
   }()

   out, err := newPipe()
   if err != nil {
      return nil, fmt.Errorf("failed to create stdout pipes: %w", err)
   }
   closers = append(closers, out.Close)

   serr, err := newPipe()
   if err != nil {
      return nil, fmt.Errorf("failed to create stderr pipes: %w", err)
   }
   closers = append(closers, serr.Close)

   // Readiness pipe: the binary gets w and signals readiness by closing it.
   r, w, err := os.Pipe()
   if err != nil {
      return nil, err
   }
   closers = append(closers, r.Close, w.Close)

   // 1. Build the log-driver child process command.
   cmd := NewBinaryCmd(uri, id, ns)
   // 2. Pass extra fds to the child. ExtraFiles[i] becomes fd 3+i, so
   //    out.r = fd 3, serr.r = fd 4, w = fd 5.
   cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w)
   // don't need to register this with the reaper or wait when
   // running inside a shim
   if err := cmd.Start(); err != nil {
      return nil, fmt.Errorf("failed to start binary process: %w", err)
   }
   closers = append(closers, func() error { return cmd.Process.Kill() })

   // close our side of the pipe after start
   if err := w.Close(); err != nil {
      return nil, fmt.Errorf("failed to close write pipe after start: %w", err)
   }

   // wait for the logging binary to be ready
   b := make([]byte, 1)
   if _, err := r.Read(b); err != nil && err != io.EOF {
      return nil, fmt.Errorf("failed to read from logging binary: %w", err)
   }

   return &binaryIO{
      cmd: cmd,
      out: out,
      err: serr,
   }, nil
}

日志驱动cmd信息如下:

https://raw.githubusercontent.com/yzxiu/images/blog/2022-11/20221116-214344.png

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Inside Runc.Create: when an IO is provided, let it attach its pipe ends to
// the runc command.
if opts != nil && opts.IO != nil {
	opts.Set(cmd)
}

// Set hands the pipe write ends created above to the container process: cmd's
// stdout and stderr become b's pipe write ends, whose read ends the logging
// binary holds.
func (b *binaryIO) Set(cmd *exec.Cmd) {
	if p := b.out; p != nil {
		cmd.Stdout = p.w
	}
	if p := b.err; p != nil {
		cmd.Stderr = p.w
	}
}

查看日志驱动,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// nerdctl/pkg/logging/logging.go

// Main is the entry point of nerdctl's logging binary. argv2 is the data store
// directory that getLoggerFunc uses. On success, the resulting logger function
// is handed to logging.Run.
func Main(argv2 string) error {
	loggerFn, err := getLoggerFunc(argv2)
	if err == nil {
		logging.Run(loggerFn)
	}
	return err
}

所谓日志驱动,就是编写处理日志的fn函数,然后传递给 logging.Run

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// github.com/containerd/containerd@v1.7.0-beta.0/runtime/v2/logging/logging_unix.go
// The shim starts this binary with cmd.ExtraFiles = {out.r, serr.r, w}, and
// ExtraFiles[i] becomes fd 3+i, hence fds 3, 4 and 5 below.
config := &Config{
	ID:        os.Getenv("CONTAINER_ID"),
	Namespace: os.Getenv("CONTAINER_NAMESPACE"),
	Stdout:    os.NewFile(3, "CONTAINER_STDOUT"),
	Stderr:    os.NewFile(4, "CONTAINER_STDERR"),
}
var (
	sigCh = make(chan os.Signal, 32)
	errCh = make(chan error, 1)
	// Closing this (the readiness pipe's write end) signals the shim we are ready.
	wait  = os.NewFile(5, "CONTAINER_WAIT")
)
signal.Notify(sigCh, unix.SIGTERM)
go func() {
	errCh <- fn(ctx, config, wait.Close)
}()
// The file handles passed from the shim are recovered from their fd numbers,
// and the config is then handed to the custom fn function (shown next).
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

// nerdctl/pkg/logging/logging.go

// getLoggerFunc returns the logging.LoggerFunc that the logging binary runs
// for one container. The function loads the container's log config from
// dataStore and resolves the configured driver. It then calls ready to signal
// the shim, and finally runs driver.Process on the config.
// An empty dataStore is rejected immediately.
func getLoggerFunc(dataStore string) (logging.LoggerFunc, error) {
	if dataStore == "" {
		return nil, errors.New("got empty data store")
	}
	loggerFn := func(_ context.Context, config *logging.Config, ready func() error) error {
		if config.Namespace == "" || config.ID == "" {
			return errors.New("got invalid config")
		}
		cfgPath := LogConfigFilePath(dataStore, config.Namespace, config.ID)
		if _, statErr := os.Stat(cfgPath); statErr != nil {
			// The file does not exist if the container was created with
			// nerdctl < 0.20; that case is not an error, nothing is processed.
			if errors.Is(statErr, os.ErrNotExist) {
				return nil
			}
			return statErr
		}
		logConfig, err := LoadLogConfig(dataStore, config.Namespace, config.ID)
		if err != nil {
			return err
		}
		driver, err := GetDriver(logConfig.Driver, logConfig.Opts)
		if err != nil {
			return err
		}
		// Report readiness only once the driver is known to be usable.
		if err := ready(); err != nil {
			return err
		}
		return driver.Process(dataStore, config)
	}
	return loggerFn, nil
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Process writes the container's stdout/stderr (config.Stdout, config.Stderr)
// as JSON lines into a rotating log file.
//
// Options read from jsonLogger.Opts:
//   - LogPath: destination file; defaults to jsonfile.Path(dataStore, ns, id).
//   - MaxSize: human-readable size limit per file; must be > 0; unlimited if absent.
//   - MaxFile: total number of files kept (>= 1, default 1); MaxBackups
//     excludes the file being written, hence maxFile - 1.
func (jsonLogger *JSONLogger) Process(dataStore string, config *logging.Config) error {
	jsonFilePath, hasPath := jsonLogger.Opts[LogPath]
	if !hasPath {
		jsonFilePath = jsonfile.Path(dataStore, config.Namespace, config.ID)
	}

	// -1 means no size limit.
	maxBytes := int64(-1)
	if sizeOpt, ok := jsonLogger.Opts[MaxSize]; ok {
		parsed, err := units.FromHumanSize(sizeOpt)
		if err != nil {
			return err
		}
		if parsed <= 0 {
			return fmt.Errorf("max-size must be a positive number")
		}
		maxBytes = parsed
	}

	maxFile := 1
	if fileOpt, ok := jsonLogger.Opts[MaxFile]; ok {
		n, err := strconv.Atoi(fileOpt)
		if err != nil {
			return err
		}
		if n < 1 {
			return fmt.Errorf("max-file cannot be less than 1")
		}
		maxFile = n
	}

	l := &logrotate.Logger{
		Filename:   jsonFilePath,
		MaxBytes:   maxBytes,
		MaxBackups: maxFile - 1,
	}
	return jsonfile.Encode(l, config.Stdout, config.Stderr)
}

最终的日志处理,是使用 jsonfile 框架,将日志保存到 jsonFilePath 路径中。

小结

关于 runc 与 shim 之间的日志处理关系做一个小结:

这里不考虑runc的前台模式(因为containerd不会使用该模式)。

NewTerminal & Detached

shim创建sock文件,传递给runc。

外部驱动再与shim交互

Pass-Through & Detached

shim直接将 runc 的输入输出,与外部驱动交互。

nerdctl-log-example

下面通过简单的代码,模拟 nerdctl run -d --name runcdev1 q946666800/runcdev日志的搜集过程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// app.go
package main

import (
	"fmt"
	"time"
)

// main prints the current time once per second, five times, to stdout; it
// stands in for a container process producing log output.
func main() {
	for remaining := 5; remaining > 0; remaining-- {
		fmt.Println(time.Now())
		time.Sleep(time.Second)
	}
}
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// drive.go
package main

import (
   "bufio"
   "context"
   "encoding/json"
   "fmt"
   "io"
   "os"
   "os/signal"
   "sync"
   "time"

   "github.com/fahedouch/go-logrotate"
   "github.com/sirupsen/logrus"
   "golang.org/x/sys/unix"
)

// main is the demo log driver. It picks up the pipes the shim passed as extra
// files and runs logger on them. It exits 0 when logger returns nil, or 1
// after printing logger's error.
func main() {
   fmt.Println("log drive start!!!")

   ctx, cancel := context.WithCancel(context.Background())
   defer cancel()

   // NOTE: logger ignores its context, so SIGTERM (which only cancels ctx)
   // has no visible effect; the process exits when logger returns.
   sigCh := make(chan os.Signal, 32)
   errCh := make(chan error, 1)
   signal.Notify(sigCh, unix.SIGTERM)

   // The shim starts us with cmd.ExtraFiles = {out.r, serr.r, w}, and
   // os/exec maps ExtraFiles[i] to fd 3+i, hence fds 3, 4 and 5.
   stdoutPipe := os.NewFile(3, "CONTAINER_STDOUT")
   stderrPipe := os.NewFile(4, "CONTAINER_STDERR")
   readyPipe := os.NewFile(5, "CONTAINER_WAIT")

   go func() {
      errCh <- logger(ctx, stdoutPipe, stderrPipe, readyPipe.Close)
   }()

   for {
      select {
      case <-sigCh:
         cancel()
      case err := <-errCh:
         if err == nil {
            fmt.Println("log drive exit 0")
            os.Exit(0)
         }
         fmt.Fprintln(os.Stderr, err)
         os.Exit(1)
      }
   }
}

// logger is the demo driver's logger function. It first signals readiness to
// the shim by calling ready. It then encodes everything read from out (stdout)
// and serr (stderr) as JSON lines into app.log, returning once both streams
// end. The context is accepted for signature parity but unused.
func logger(_ context.Context, out *os.File, serr *os.File, ready func() error) error {
   // In this program ready is wait.Close (fd 5). Closing it closes the last
   // write end of the shim's readiness pipe, so the shim's blocking r.Read
   // returns io.EOF and the shim goes on.
   if err := ready(); err != nil {
      return err
   }

   sink := &logrotate.Logger{Filename: "app.log"}
   return Encode(sink, out, serr)
}

// Entry is compatible with Docker "json-file" logs
//
// Encode writes one Entry per line read from the container's stdout or stderr.
type Entry struct {
   Log    string    `json:"log,omitempty"`    // the raw line, including its trailing "\n" (and "\r" if the writer emitted one)
   Stream string    `json:"stream,omitempty"` // "stdout" or "stderr"
   Time   time.Time `json:"time"`             // e.g. "2020-12-11T20:29:41.939902251Z"
}

// Encode copies stdout and stderr to w as Docker "json-file" style JSON lines,
// one Entry per line read. The two streams are read concurrently, and encMu
// keeps their JSON objects from interleaving in w. Encode returns nil once both
// streams have ended. Read and encode failures are logged, not returned, and
// w is not closed.
func Encode(w io.WriteCloser, stdout, stderr io.Reader) error {
   enc := json.NewEncoder(w)
   var encMu sync.Mutex
   var wg sync.WaitGroup
   wg.Add(2)
   f := func(r io.Reader, name string) {
      defer wg.Done()
      br := bufio.NewReader(r)
      e := &Entry{
         Stream: name,
      }
      for {
         line, err := br.ReadString(byte('\n'))
         // ReadString returns the data read before an error, so a final line
         // without a trailing newline arrives together with io.EOF. Encode it
         // before handling the error instead of dropping it.
         if line != "" {
            e.Log = line
            e.Time = time.Now().UTC()
            encMu.Lock()
            encErr := enc.Encode(e)
            encMu.Unlock()
            if encErr != nil {
               // Log the encode error itself, not the read error.
               logrus.WithError(encErr).Errorf("failed to encode JSON")
               return
            }
         }
         if err != nil {
            // io.EOF is the normal end of stream once every writer has closed
            // the pipe; it is still reported here.
            logrus.WithError(err).Errorf("failed to read line from %q", name)
            return
         }
      }
   }
   go f(stdout, "stdout")
   go f(stderr, "stderr")
   wg.Wait()
   return nil
}
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// shim.go
package main

import (
	"fmt"
	"io"
	"log"
	"os"
	"os/exec"
)

// main is the demo shim. It starts the log driver, runs ./app-example with
// stdout/stderr connected to the driver's pipes, and waits for the app to exit.
// It then waits for the driver to drain the remaining output before exiting.
func main() {

	// start log driver (driveIO returns once the driver reports ready)
	pio, err := driveIO()
	if err != nil {
		log.Fatal(err)
	}

	// start app: its stdout/stderr are the write ends of the driver's pipes
	cmd := exec.Command("./app-example")
	cmd.Stdout = pio.out.w
	cmd.Stderr = pio.err.w

	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}
	appErr := cmd.Wait()

	// The driver only sees EOF once every write end is closed. The app's
	// copies closed when it exited, but this process still holds its own.
	// Close them and wait for the driver so all output reaches the log file
	// before the shim exits.
	if err := pio.out.w.Close(); err != nil {
		log.Printf("close stdout pipe: %v", err)
	}
	if err := pio.err.w.Close(); err != nil {
		log.Printf("close stderr pipe: %v", err)
	}
	if err := pio.cmd.Wait(); err != nil {
		log.Printf("log driver: %v", err)
	}

	if appErr != nil {
		log.Fatal(appErr)
	}
}

// newPipe creates an os.Pipe and wraps both of its ends.
func newPipe() (*pipe, error) {
	r, w, err := os.Pipe()
	if err != nil {
		return nil, err
	}
	return &pipe{
		r: r,
		w: w,
	}, nil
}

// pipe holds both ends of an os.Pipe. In this demo r is handed to the log
// driver and w becomes the app's stdout or stderr.
type pipe struct {
	r *os.File
	w *os.File
}

// binaryIO is what driveIO returns: the running log driver and the
// stdout/stderr pipes that feed it.
type binaryIO struct {
	cmd      *exec.Cmd
	out, err *pipe
}

// Close closes both ends of the pipe. Both closes are always attempted. The
// first failure is returned (write end checked first), or nil if both succeed.
func (p *pipe) Close() error {
	werr := p.w.Close()
	rerr := p.r.Close()
	if werr != nil {
		return fmt.Errorf("close pipe write end: %w", werr)
	}
	if rerr != nil {
		return fmt.Errorf("close pipe read end: %w", rerr)
	}
	return nil
}

// driveIO starts the log driver (./drive-example) and returns the pipes the
// app should write to. It mirrors containerd's NewBinaryIO:
//   - out/serr: stdout/stderr pipes; the driver gets their read ends as fds 3, 4.
//   - r/w: a readiness pipe; the driver gets w as fd 5 and closes it when it is
//     ready, which makes r.Read return io.EOF.
//
// On any error, everything acquired so far is released, and the driver is
// killed if it was already started.
func driveIO() (_ *binaryIO, err error) {

	var closers []func() error
	defer func() {
		if err == nil {
			return
		}
		// Best-effort cleanup; the caller only needs the original error.
		for _, closeFn := range closers {
			_ = closeFn()
		}
	}()

	// app out pipe
	out, err := newPipe()
	if err != nil {
		return nil, err
	}
	closers = append(closers, out.Close)

	// app err pipe
	serr, err := newPipe()
	if err != nil {
		return nil, err
	}
	closers = append(closers, serr.Close)

	// drive ready pipe
	r, w, err := os.Pipe()
	if err != nil {
		return nil, err
	}
	closers = append(closers, r.Close, w.Close)

	cmd := exec.Command("./drive-example")
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	// ExtraFiles[i] becomes fd 3+i in the child: out.r=3, serr.r=4, w=5.
	cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w)

	if err := cmd.Start(); err != nil {
		return nil, err
	}
	closers = append(closers, func() error { return cmd.Process.Kill() })

	// close our side of the pipe after start, so only the driver holds a write end
	if err := w.Close(); err != nil {
		return nil, fmt.Errorf("failed to close write pipe after start: %w", err)
	}

	// wait for the logging binary to be ready
	b := make([]byte, 1)
	if _, err := r.Read(b); err != nil && err != io.EOF {
		return nil, fmt.Errorf("failed to read from logging binary: %w", err)
	}
	// The readiness pipe has served its purpose; a close error here is harmless.
	_ = r.Close()

	return &binaryIO{
		cmd: cmd,
		out: out,
		err: serr,
	}, nil
}

模拟 nerdctl run -d --name runcdev1 q946666800/runcdev 的日志收集过程。

运行 ./shim-example, 首先启动驱动程序,返回相关pio,

1
2
3
4
// Start the log driver first; driveIO returns only after the driver has
// signalled that it is ready.
pio, err := driveIO()
if err != nil {
   log.Fatal(err)
}

配置应用程序,将stdio传递给上面的pio

1
2
3
4
// start app
// Its stdout/stderr are the write ends of the pipes whose read ends the driver holds.
cmd := exec.Command("./app-example")
cmd.Stdout = pio.out.w
cmd.Stderr = pio.err.w

启动应用程序,日志驱动将应用程序日志,以 json 的形式,写入到 app.log 文件中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
[xiu-desktop] nerdctl-log-example git:(master) ./shim-example
log drive start!!!
ERRO[0005] failed to read line from "stderr"             error=EOF
ERRO[0005] failed to read line from "stdout"             error=EOF
log drive exit 0
[xiu-desktop] nerdctl-log-example git:(master) cat app.log         
{"log":"2022-11-16 22:32:15.465213291 +0800 CST m=+0.000014187\n","stream":"stdout","time":"2022-11-16T14:32:15.46528788Z"}
{"log":"2022-11-16 22:32:16.466022681 +0800 CST m=+1.000823617\n","stream":"stdout","time":"2022-11-16T14:32:16.466197357Z"}
{"log":"2022-11-16 22:32:17.467090879 +0800 CST m=+2.001891785\n","stream":"stdout","time":"2022-11-16T14:32:17.467138819Z"}
{"log":"2022-11-16 22:32:18.467998254 +0800 CST m=+3.002799190\n","stream":"stdout","time":"2022-11-16T14:32:18.468071711Z"}
{"log":"2022-11-16 22:32:19.468046521 +0800 CST m=+4.002847457\n","stream":"stdout","time":"2022-11-16T14:32:19.468104589Z"}

nerdctl中日志处理方式大致如此。

demo


kubelet中的日志处理

使用命令 k run nginx --image=nginx(k 为 kubectl 的别名)创建 pod,会启动两个容器,我们只关注业务容器,即 nginx 容器。