概述
在 containerd 的 event 中,主要用到了 go-events 这个包。
为 Go 实现一个可组合的事件分发包。
初始化
在containerd启动过程中,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
// New creates and initializes a new containerd server
func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
...
...
var events = exchange.NewExchange()
...
...
for _, p := range plugins {
...
initContext := plugin.NewContext(
ctx,
p,
initialized,
config.Root,
config.State,
)
initContext.Events = events
initContext.Address = config.GRPC.Address
initContext.TTRPCAddress = config.TTRPC.Address
...
...
}
}
|
使用 exchange.NewExchange()
初始化,并将它传递个各个plugin
,用于处理事件。
containerd
中的events处理逻辑与shim
中的Monitor逻辑类似,在需要订阅的地方调用Subscribe
,然后channel
,channel
会接受到对应的Envelope
(也就是相关事件)。
Subscribe
在cri
插件中的应用比较多。
containerd中的事件分发还与grpc进行了结合,可以通过grpc接口进行事件的订阅。
grpc接口的定义:
1
2
3
4
5
6
7
8
|
service Events {
rpc Publish(PublishRequest) returns (google.protobuf.Empty);
rpc Forward(ForwardRequest) returns (google.protobuf.Empty);
rpc Subscribe(SubscribeRequest) returns (stream Envelope);
}
|
服务端实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func (s *service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeServer) error {
ctx, cancel := context.WithCancel(srv.Context())
defer cancel()
eventq, errq := s.events.Subscribe(ctx, req.Filters...)
for {
select {
case ev := <-eventq:
if err := srv.Send(toProto(ev)); err != nil {
return fmt.Errorf("failed sending event to subscriber: %w", err)
}
case err := <-errq:
if err != nil {
return fmt.Errorf("subscription error: %w", err)
}
return nil
}
}
}
|