目录

client-go解析(4) - cache.Controller

概述

本文中提到的 reflector, 特指 informer 中的 reflector, 即传入的 cache.Store 为 DeltaFIFO。

接口

cache.Controller

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// Controller is a low-level controller that is parameterized by a
// Config and used in sharedIndexInformer.
type Controller interface {
   // Run does two things.  One is to construct and run a Reflector
   // to pump objects/notifications from the Config's ListerWatcher
   // to the Config's Queue and possibly invoke the occasional Resync
   // on that Queue.  The other is to repeatedly Pop from the Queue
   // and process with the Config's ProcessFunc.  Both of these
   // continue until `stopCh` is closed.
   //
   // In short:
   //   1. construct and start the Reflector;
   //   2. Pop from the Queue and process with the Config's ProcessFunc.
   Run(stopCh <-chan struct{})

   // HasSynced delegates to the Config's Queue
   HasSynced() bool

   // LastSyncResourceVersion delegates to the Reflector when there
   // is one, otherwise returns the empty string
   LastSyncResourceVersion() string
}
1
2
3
4
5
6
7
// `*controller` implements Controller
type controller struct {
   // config carries the Queue, ListerWatcher, Process callback and the
   // other settings that Run and processLoop read.
   config         Config
   // reflector is created and stored by Run; it is nil until Run has
   // assigned it.
   reflector      *Reflector
   // reflectorMutex is held by Run while it stores the reflector pointer.
   reflectorMutex sync.RWMutex
   // clock is copied into the Reflector's clock field by Run.
   clock          clock.Clock
}

Run()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
//
// Run wires a Reflector to the Config's Queue (the producer side) and then
// drives processLoop (the consumer side) on the calling goroutine.
func (c *controller) Run(stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()
   // Close the queue once stopCh fires. processLoop returns when Pop
   // reports ErrFIFOClosed, so this is presumably what unblocks a
   // consumer that is waiting inside Pop.
   go func() {
      <-stopCh
      c.config.Queue.Close()
   }()
   // Build the Reflector: it reads from the Config's ListerWatcher and
   // writes into the Config's Queue.
   r := NewReflectorWithOptions(
      c.config.ListerWatcher,
      c.config.ObjectType,
      c.config.Queue,
      ReflectorOptions{
         ResyncPeriod:    c.config.FullResyncPeriod,
         TypeDescription: c.config.ObjectDescription,
      },
   )
   // Copy the remaining settings from the Config onto the Reflector.
   r.ShouldResync = c.config.ShouldResync
   r.WatchListPageSize = c.config.WatchListPageSize
   r.clock = c.clock
   // Only override the watch error handler when the Config supplies one.
   if c.config.WatchErrorHandler != nil {
      r.watchErrorHandler = c.config.WatchErrorHandler
   }

   // Publish the reflector under the mutex; the field is shared state
   // guarded by reflectorMutex.
   c.reflectorMutex.Lock()
   c.reflector = r
   c.reflectorMutex.Unlock()

   var wg wait.Group
   // Start the producer: run the Reflector in the wait group, handing it stopCh.
   wg.StartWithChannel(stopCh, r.Run)
   // Start the consumer: processLoop is driven on this goroutine via
   // wait.Until with a one-second period and stopCh as the stop signal.
   wait.Until(c.processLoop, time.Second, stopCh)
   // Wait for the Reflector started above before returning.
   wg.Wait()
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// processLoop drains the Config's Queue: every popped item is handed to the
// Config's Process callback (wrapped as a PopProcessFunc) by Pop itself.
//
// The loop only ends when Pop reports ErrFIFOClosed. Any other Pop error
// re-enqueues the item via AddIfNotPresent when RetryOnError is set, and is
// otherwise dropped.
func (c *controller) processLoop() {
	for {
		item, popErr := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		switch {
		case popErr == nil:
			// Processed successfully; fetch the next item.
			continue
		case popErr == ErrFIFOClosed:
			// The queue has been closed; stop consuming.
			return
		case c.config.RetryOnError:
			// This is the safe way to re-enqueue.
			c.config.Queue.AddIfNotPresent(item)
		}
	}
}

可以看到, Reflector 负责向队列(DeltaFIFO)中写入数据, processLoop() 则不断从队列中 Pop 出数据进行消费, 最终交给 c.config.Process 处理。下面以 newInformer 为例, 看看 Process 是如何设置的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// newInformer assembles a Controller whose queue is a DeltaFIFO and whose
// Process callback forwards every popped Deltas value to processDeltas,
// which updates clientState and notifies h.
//
// Objects popped from the queue that are not Deltas make Process return an
// error. RetryOnError is disabled for the resulting Config.
func newInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	clientState Store,
	transformer TransformFunc,
) Controller {
	// The queue holds incoming changes. clientState is handed over as the
	// known-objects view so that resync operations produce the correct set
	// of update/delete deltas.
	queue := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          clientState,
		EmitDeltaTypeReplaced: true,
	})

	// process is the consumer callback invoked for each item popped from
	// the queue.
	process := func(obj interface{}, isInInitialList bool) error {
		deltas, ok := obj.(Deltas)
		if !ok {
			return errors.New("object given as Process argument is not Deltas")
		}
		return processDeltas(h, clientState, transformer, deltas, isInInitialList)
	}

	return New(&Config{
		Queue:            queue,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,
		Process:          process,
	})
}

processDeltas

processDeltas 是一个比较重要的函数, 它更像是一个处理 Deltas 数据的通用模板, 后面的 sharedIndexInformer 中也会用到。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// processDeltas applies a list of Deltas to clientState and tells handler
// about each resulting change through OnAdd, OnUpdate or OnDelete.
//
// Deltas are handled in slice order (oldest first). When transformer is
// non-nil every object is passed through it before being stored or
// reported. The first error returned by transformer or by a store mutation
// aborts processing and is returned as-is; deltas handled before the error
// are not rolled back.
func processDeltas(
	// Object which receives event notifications from the given deltas
	handler ResourceEventHandler,
	clientState Store,
	transformer TransformFunc,
	deltas Deltas,
	isInInitialList bool,
) error {
	for _, delta := range deltas {
		item := delta.Object
		if transformer != nil {
			transformed, err := transformer(item)
			if err != nil {
				return err
			}
			item = transformed
		}

		// Collapse the delta types (Sync, Replaced, Added, Updated, Deleted)
		// into three handler events: add, update and delete.
		switch delta.Type {
		case Deleted:
			if err := clientState.Delete(item); err != nil {
				return err
			}
			handler.OnDelete(item)
		case Sync, Replaced, Added, Updated:
			previous, found, getErr := clientState.Get(item)
			if getErr != nil || !found {
				// Not known to the store (or the lookup failed): treat as an add.
				if err := clientState.Add(item); err != nil {
					return err
				}
				handler.OnAdd(item, isInInitialList)
				continue
			}
			if err := clientState.Update(item); err != nil {
				return err
			}
			handler.OnUpdate(previous, item)
		}
	}
	return nil
}

无论是什么类型的 Delta, 都是先更新 clientState(也就是 DeltaFIFO 中的 knownObjects), 然后再交给 handler 去处理。handler 的类型是 ResourceEventHandler, 接口定义如下:

1
2
3
4
5
// ResourceEventHandler receives the add/update/delete notifications that
// processDeltas emits after it has applied a delta to the client state.
type ResourceEventHandler interface {
	// OnAdd is called after obj has been added to the store.
	// isInInitialList is forwarded unchanged from the Process callback
	// that delivered the delta.
	OnAdd(obj interface{}, isInInitialList bool)
	// OnUpdate is called after newObj has replaced oldObj in the store;
	// oldObj is the value the store held before the update.
	OnUpdate(oldObj, newObj interface{})
	// OnDelete is called after obj has been removed from the store.
	OnDelete(obj interface{})
}

小结

cache.controller 逻辑比较简单。

在 Run 方法中初始化并启动 Reflector,并启动消费者进行消费。

具体的消费方法: cache.Config.Process