// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
	return &cache{
		cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
		keyFunc:      keyFunc,
	}
}

// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
	return &cache{
		cacheStorage: NewThreadSafeStore(indexers, Indices{}),
		keyFunc:      keyFunc,
	}
}
// Add inserts an item into the cache.
func (c *cache) Add(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	c.cacheStorage.Add(key, obj)
	return nil
}
// Update sets an item in the cache to its updated state.
func (c *cache) Update(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	c.cacheStorage.Update(key, obj)
	return nil
}
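As a quick usage sketch (mine, not from the upstream source, and assuming k8s.io/client-go plus k8s.io/api are on the module path): Add and Update of objects that map to the same key simply overwrite the single cached entry.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Keys are "namespace/name", produced by MetaNamespaceKeyFunc.
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)

	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name: "nginx", Namespace: "default",
		Labels: map[string]string{"v": "1"},
	}}
	_ = store.Add(pod)

	// Update with an object that maps to the same key overwrites the entry.
	newer := pod.DeepCopy()
	newer.Labels["v"] = "2"
	_ = store.Update(newer)

	obj, exists, _ := store.GetByKey("default/nginx")
	fmt.Println(exists, obj.(*corev1.Pod).Labels["v"]) // true 2
	fmt.Println(len(store.ListKeys()))                 // 1
}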
Delete()
// Delete removes an item from the cache.
func (c *cache) Delete(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	c.cacheStorage.Delete(key)
	return nil
}
Store & Indexer handle Replace in a very simple way: the items map is swapped out wholesale and the indices are then rebuilt.
// Replace will delete the contents of 'c', using instead the given list.
// 'c' takes ownership of the list, you should not reference the list again
// after calling this function.
func (c *cache) Replace(list []interface{}, resourceVersion string) error {
	items := make(map[string]interface{}, len(list))
	for _, item := range list {
		key, err := c.keyFunc(item)
		if err != nil {
			return KeyError{item, err}
		}
		items[key] = item
	}
	c.cacheStorage.Replace(items, resourceVersion)
	return nil
}
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.items = items

	// rebuild any index
	c.index.reset()
	for key, item := range c.items {
		c.index.updateIndices(nil, item, key)
	}
}
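A small sketch of the effect (hypothetical usage, same client-go identifiers as above): after Replace the old entries are gone and the indices answer only against the replacement list.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func pod(ns, name string) *corev1.Pod {
	return &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name}}
}

func main() {
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
	})

	_ = indexer.Add(pod("default", "a"))
	_ = indexer.Add(pod("default", "b"))

	// Replace swaps the whole items map and rebuilds every index.
	_ = indexer.Replace([]interface{}{pod("kube-system", "c")}, "")

	fmt.Println(indexer.ListKeys()) // [kube-system/c]
	inDefault, _ := indexer.ByIndex(cache.NamespaceIndex, "default")
	fmt.Println(len(inDefault)) // 0: the old index entries are gone
}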
Resync()
// Resync is meaningless for one of these
func (c *cache) Resync() error {
	return nil
}
// FIFO is a Queue in which (a) each accumulator is simply the most
// recently provided object and (b) the collection of keys to process
// is a FIFO. The accumulators all start out empty, and deleting an
// object from its accumulator empties the accumulator. The Resync
// operation is a no-op.
//
// Thus: if multiple adds/updates of a single object happen while that
// object's key is in the queue before it has been processed then it
// will only be processed once, and when it is processed the most
// recent version will be processed. This can't be done with a channel
//
// FIFO solves this use case:
// - You want to process every object (exactly) once.
// - You want to process the most recent version of the object when you process it.
// - You do not want to process deleted objects, they should be removed from the queue.
// - You do not want to periodically reprocess objects.
//
// Compare with DeltaFIFO for other use cases.
type FIFO struct {
	lock sync.RWMutex
	cond sync.Cond
	// We depend on the property that every key in `items` is also in `queue`
	items map[string]interface{}
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item insertion and retrieval, and
	// should be deterministic.
	keyFunc KeyFunc

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRUD operations.
	closed bool
}
Constructor:
// NewFIFO returns a Store which can be used to queue up items to
// process.
func NewFIFO(keyFunc KeyFunc) *FIFO {
	f := &FIFO{
		items:   map[string]interface{}{},
		queue:   []string{},
		keyFunc: keyFunc,
	}
	f.cond.L = &f.lock
	return f
}
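A minimal sketch of the coalescing behaviour described in the FIFO comment above (hypothetical usage; the two-argument Pop callback assumes the PopProcessFunc signature of the client-go version whose Pop is quoted later in this post).

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	f := cache.NewFIFO(cache.MetaNamespaceKeyFunc)

	p := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Namespace: "default", Name: "nginx",
		Labels: map[string]string{"rev": "1"},
	}}
	_ = f.Add(p)

	// A second Add/Update of the same key before Pop only replaces the
	// accumulator; the key is still queued exactly once.
	p2 := p.DeepCopy()
	p2.Labels["rev"] = "2"
	_ = f.Update(p2)

	fmt.Println(f.ListKeys()) // [default/nginx]

	// Pop hands us the most recent version of the object.
	obj, _ := f.Pop(func(obj interface{}, isInInitialList bool) error {
		return nil
	})
	fmt.Println(obj.(*corev1.Pod).Labels["rev"]) // 2
}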
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
// the Pop() method.
// Think of DeltaFIFO as a producer-consumer queue.
type DeltaFIFO struct {
	lock sync.RWMutex
	cond sync.Cond

	items map[string]Deltas
	queue []string

	populated              bool
	initialPopulationCount int

	keyFunc KeyFunc

	knownObjects KeyListerGetter

	closed bool

	emitDeltaTypeReplaced bool
}
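For orientation, a typical construction looks roughly like this (a sketch; NewDeltaFIFOWithOptions and its option fields come from the same client-go package, and KnownObjects is usually the informer's Indexer):

package main

import "k8s.io/client-go/tools/cache"

func main() {
	// KnownObjects is typically the Indexer kept in sync by the informer;
	// EmitDeltaTypeReplaced makes Replace() enqueue Replaced deltas instead
	// of the backwards-compatible Sync ones.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})

	fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
		KeyFunction:           cache.MetaNamespaceKeyFunc,
		KnownObjects:          indexer,
		EmitDeltaTypeReplaced: true,
	})
	_ = fifo
}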
// Replace atomically does two things: (1) it adds the given objects
// using the Sync or Replace DeltaType and then (2) it does some deletions.
// That is:
// 1. a Sync / Replaced delta is queued for every object in the given list, and
// 2. some Deleted deltas are queued afterwards.
//
// In particular: for every pre-existing key K that is not the key of
// an object in `list` there is the effect of
// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
// of K.
// That is: for every pre-existing key K that is not the key of any object in `list`,
// a Deleted delta carrying DeletedFinalStateUnknown{K, O} is queued, where O is the
// current object for K. What counts as "pre-existing", and which object O belongs
// to K, is spelled out below.
//
// If `f.knownObjects == nil` then the pre-existing keys are
// those in `f.items` and the current object of K is the `.Newest()`
// of the Deltas associated with K. Otherwise the pre-existing keys
// are those listed by `f.knownObjects` and the current object of K is
// what `f.knownObjects.GetByKey(K)` returns.
// 1. If f.knownObjects == nil:
//    the pre-existing keys are those in f.items, and the object for K is Newest(),
//    i.e. the last element of K's Deltas.
// 2. If f.knownObjects != nil:
//    the pre-existing keys are f.knownObjects.ListKeys(), and the object for K is
//    whatever f.knownObjects.GetByKey(K) returns.
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	keys := make(sets.String, len(list))

	// keep backwards compat for old clients
	action := Sync
	if f.emitDeltaTypeReplaced {
		action = Replaced
	}

	// Add Sync/Replaced action for each new item.
	for _, item := range list {
		key, err := f.KeyOf(item)
		if err != nil {
			return KeyError{item, err}
		}
		keys.Insert(key)
		if err := f.queueActionLocked(action, item); err != nil {
			return fmt.Errorf("couldn't enqueue object: %v", err)
		}
	}

	if f.knownObjects == nil {
		// Do deletion detection against our own list.
		queuedDeletions := 0
		for k, oldItem := range f.items {
			if keys.Has(k) {
				continue
			}
			// Delete pre-existing items not in the new list.
			// This could happen if watch deletion event was missed while
			// disconnected from apiserver.
			var deletedObj interface{}
			if n := oldItem.Newest(); n != nil {
				deletedObj = n.Object
			}
			queuedDeletions++
			if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
				return err
			}
		}

		if !f.populated {
			f.populated = true
			// While there shouldn't be any queued deletions in the initial
			// population of the queue, it's better to be on the safe side.
			f.initialPopulationCount = keys.Len() + queuedDeletions
		}

		return nil
	}

	// Detect deletions not already in the queue.
	knownKeys := f.knownObjects.ListKeys()
	queuedDeletions := 0
	for _, k := range knownKeys {
		if keys.Has(k) {
			continue
		}

		deletedObj, exists, err := f.knownObjects.GetByKey(k)
		if err != nil {
			deletedObj = nil
			klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
		} else if !exists {
			deletedObj = nil
			klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
		}
		queuedDeletions++
		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
			return err
		}
	}

	if !f.populated {
		f.populated = true
		f.initialPopulationCount = keys.Len() + queuedDeletions
	}

	return nil
}
The logic of Replace() is described in detail by the comments above.
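To make the deletion detection concrete, here is a sketch (mine, not from the post) of the f.knownObjects == nil branch: a key that was already in f.items but is absent from the new list ends up with a Deleted delta wrapping a DeletedFinalStateUnknown tombstone.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func pod(ns, name string) *corev1.Pod {
	return &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name}}
}

func main() {
	// No KnownObjects: deletion detection runs against f.items itself.
	f := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
		KeyFunction: cache.MetaNamespaceKeyFunc,
	})

	_ = f.Add(pod("default", "a")) // now in f.items
	_ = f.Replace([]interface{}{pod("default", "b")}, "")

	// "default/a" was pre-existing but is absent from the list, so its Deltas
	// now end with a Deleted delta whose object is a DeletedFinalStateUnknown.
	obj, _, _ := f.GetByKey("default/a")
	last := obj.(cache.Deltas).Newest()
	fmt.Println(last.Type) // Deleted
	_, isTombstone := last.Object.(cache.DeletedFinalStateUnknown)
	fmt.Println(isTombstone) // true
}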
Resync()
Resync() can also be viewed as a producer: it produces deltas of type Sync.
// Resync adds, with a Sync type of Delta, every object listed by
// `f.knownObjects` whose key is not already queued for processing.
// If `f.knownObjects` is `nil` then Resync does nothing.
func (f *DeltaFIFO) Resync() error {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.knownObjects == nil {
		return nil
	}

	keys := f.knownObjects.ListKeys()
	for _, k := range keys {
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}
func (f *DeltaFIFO) syncKeyLocked(key string) error {
	obj, exists, err := f.knownObjects.GetByKey(key)
	if err != nil {
		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
		return nil
	} else if !exists {
		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
		return nil
	}

	// If we are doing Resync() and there is already an event queued for that object,
	// we ignore the Resync for it. This is to avoid the race, in which the resync
	// comes with the previous value of object (since queueing an event for the object
	// doesn't trigger changing the underlying store <knownObjects>).
	// In other words: no Sync delta is added for an object that already has deltas in f.items.
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	if len(f.items[id]) > 0 {
		return nil
	}

	if err := f.queueActionLocked(Sync, obj); err != nil {
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}
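A sketch of Resync() acting as a producer (hypothetical usage): with knownObjects set and nothing queued for a key, Resync enqueues a Sync delta for that key.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	known := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
	_ = known.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx"}})

	f := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
		KeyFunction:  cache.MetaNamespaceKeyFunc,
		KnownObjects: known,
	})

	// Nothing is queued for "default/nginx", so Resync produces a Sync delta for it.
	_ = f.Resync()

	obj, exists, _ := f.GetByKey("default/nginx")
	fmt.Println(exists)                           // true
	fmt.Println(obj.(cache.Deltas).Newest().Type) // Sync
}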
// Pop blocks until the queue has some items, and then returns one. If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe to update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
// process should avoid expensive I/O operation so that other queue operations, i.e.
// Add() and Get(), won't be blocked for too long.
//
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.closed {
				return nil, ErrFIFOClosed
			}

			f.cond.Wait()
		}
		isInInitialList := !f.hasSynced_locked()
		id := f.queue[0]
		f.queue = f.queue[1:]
		depth := len(f.queue)
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// This should never happen
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
		delete(f.items, id)
		// Only log traces if the queue depth is greater than 10 and it takes more than
		// 100 milliseconds to process one item from the queue.
		// Queue depth never goes high because processing an item is locking the queue,
		// and new items can't be added until processing finish.
		// https://github.com/kubernetes/kubernetes/issues/103789
		if depth > 10 {
			trace := utiltrace.New("DeltaFIFO Pop Process",
				utiltrace.Field{Key: "ID", Value: id},
				utiltrace.Field{Key: "Depth", Value: depth},
				utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
			defer trace.LogIfLong(100 * time.Millisecond)
		}
		err := process(item, isInInitialList)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}
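Finally, a consumer-side sketch (mine; the two-argument callback assumes the PopProcessFunc of the version quoted above): the callback receives the whole Deltas slice accumulated for one key, and returning an ErrRequeue would put the item back via addIfNotPresent.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	f := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
		KeyFunction: cache.MetaNamespaceKeyFunc,
	})
	_ = f.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx"}})

	_, err := f.Pop(func(obj interface{}, isInInitialList bool) error {
		// obj is the full Deltas slice accumulated for this key.
		for _, d := range obj.(cache.Deltas) {
			fmt.Println(d.Type, d.Object.(*corev1.Pod).Name) // Added nginx
		}
		// Returning an ErrRequeue would re-add the item under the lock, e.g.:
		// return cache.ErrRequeue{Err: nil}
		return nil
	})
	fmt.Println(err) // <nil>
}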