概述
cache.Reflector
可以说是k8s最重要的组件,它串联起k8s的整个流程。
在服务端(apiserver
) ,使用 reflector 向 etcd 获取资源数据。
在连接端(informer
、kubelet
…),使用 reflector 向 apiserver 获取资源数据。
k8s的整个逻辑流程中,所有这些获取资源数据相关的操作,都封装在 reflector 里面,可以看出 reflector 对于理解 k8s 的重要性。
定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
name string
// The name of the type we expect to place in the store. The name
// will be the stringification of expectedGVK if provided, and the
// stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison.
typeDescription string
// An example object of the type we expect to place in the store.
// Only the type needs to be right, except that when that is
// `unstructured.Unstructured` the object's `"apiVersion"` and
// `"kind"` must also be right.
expectedType reflect.Type
// The GVK of the object we expect to place in the store if unstructured.
expectedGVK *schema.GroupVersionKind
// The destination to sync up with the watch source
store Store
// listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher
// backoff manages backoff of ListWatch
backoffManager wait.BackoffManager
// initConnBackoffManager manages backoff the initial connection with the Watch call of ListAndWatch.
initConnBackoffManager wait.BackoffManager
// MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch.
MaxInternalErrorRetryDuration time.Duration
resyncPeriod time.Duration
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
ShouldResync func() bool
// clock allows tests to manipulate time
clock clock.Clock
// paginatedResult defines whether pagination should be forced for list calls.
// It is set based on the result of the initial list call.
paginatedResult bool
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string
// isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
// lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
isLastSyncResourceVersionUnavailable bool
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
// it will turn off pagination to allow serving them from watch cache.
// NOTE: It should be used carefully as paginated lists are always served directly from
// etcd, which is significantly less efficient and may lead to serious performance and
// scalability problems.
WatchListPageSize int64
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
}
|
使用
看一下 reflector 在k8s中的几处应用
apiserver
apiserver 中,Reflector 使用 watchCache 作为 store,向 etcd 中获取数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func NewCacherFromConfig(config Config) (*Cacher, error) {
...
...
cacher := &Cacher{
...
...
}
...
...
watchCache := newWatchCache(
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource)
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
...
...
cacher.watchCache = watchCache
cacher.reflector = reflector
...
...
return cacher, nil
}
|
kubelet
kubelet 中,Reflector 使用 UndeltaStore 作为 store,向 apiserver 中获取数据
1
2
3
4
5
6
7
8
9
|
/home/xiu/Github/kubernetes/pkg/kubelet/config/apiserver.go
// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
...
///
r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
go r.Run(wait.NeverStop)
}
|
informer(cache.controller) 中, Reflector 使用 DeltaFIFO 作为 store,向 apiserver 中获取数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflectorWithOptions(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
ReflectorOptions{
ResyncPeriod: c.config.FullResyncPeriod,
TypeDescription: c.config.ObjectDescription,
},
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
|
Run()
1
2
3
4
5
6
7
8
9
10
11
12
|
// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
}
|
在 wait.BackoffUntil(func(){}, , r.backoffManager, true, stopCh)
调用 ListAndWatch
.
先了解一下 BackoffUntil
的运行规则:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
func TestBackoffUntil(t *testing.T) {
stopCh := make(chan struct{})
realClock := &clock.RealClock{}
backoffManager := NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock)
i := 1
t1 := time.Now()
BackoffUntil(func() {
t.Logf("第 %d 次运行, 时间差: %v", i, time.Now().Sub(t1))
time.Sleep(time.Second)
t1 = time.Now()
i++
return
}, backoffManager, true, stopCh)
}
=== RUN TestBackoffUntil
wait_test.go:41: 第 1 次运行, 时间差: 185ns
wait_test.go:41: 第 2 次运行, 时间差: 1.284438667s
wait_test.go:41: 第 3 次运行, 时间差: 3.105450176s
wait_test.go:41: 第 4 次运行, 时间差: 5.330910046s
wait_test.go:41: 第 5 次运行, 时间差: 9.201774039s
wait_test.go:41: 第 6 次运行, 时间差: 18.253303837s
wait_test.go:41: 第 7 次运行, 时间差: 43.22295292s
wait_test.go:41: 第 8 次运行, 时间差: 31.998912477s
wait_test.go:41: 第 9 次运行, 时间差: 34.72983897s
wait_test.go:41: 第 10 次运行, 时间差: 32.941101108s
wait_test.go:41: 第 11 次运行, 时间差: 39.06361719s
wait_test.go:41: 第 12 次运行, 时间差: 45.498957001s
wait_test.go:41: 第 13 次运行, 时间差: 54.462938656s
wait_test.go:41: 第 14 次运行, 时间差: 36.463043438s
wait_test.go:41: 第 15 次运行, 时间差: 41.460279724s
|
所以,当 ListAndWatch
错误退出的时候, reflector会根据配置, 在一段之间后重新执行.
ListAndWatch()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
...
...
// 获取资源数据,然后调用 cache.Store.Replace()
// 设置 ResourceVersion
err := r.list(stopCh)
...
...
for {
...
...
w, err := r.listerWatcher.Watch(options)
...
....
// 获取增量数据
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
...
...
}
}
|
r.list(stopCh)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
// list simply lists all items and records a resource version obtained from the server at the moment of the call.
// the resource version can be used for further progress notification (aka. watch).
func (r *Reflector) list(stopCh <-chan struct{}) error {
...
...
var list runtime.Object
var paginatedResult bool
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
...
...
list, paginatedResult, err = pager.List(context.Background(), options)
...
...
}()
...
...
listMetaInterface, err := meta.ListAccessor(list)
...
resourceVersion = listMetaInterface.GetResourceVersion()
...
items, err := meta.ExtractList(list)
...
...
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
...
r.setLastSyncResourceVersion(resourceVersion)
...
return nil
}
|
1
2
3
4
5
6
7
8
|
// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
return r.store.Replace(found, resourceVersion)
}
|
watchHandler()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
// watchHandler watches w and sets setLastSyncResourceVersion
func watchHandler(start time.Time,
w watch.Interface,
store Store,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
clock clock.Clock,
errc chan error,
stopCh <-chan struct{},
) error {
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
...
...
switch event.Type {
case watch.Added:
err := store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Modified:
err := store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
}
...
...
}
}
...
...
return nil
}
|
小结
Relector 在 Run 起来之后, 通过传入的 lw, 先获取指定资源, 然后存入对应的 store 中.
信息
具体到 informer
中的reflector:
list 会产生一些 Sync/Replace
的 Delta
数据, 然后通过watch, 根据 event.Type, 产生 Added / Updated / Deleted
的 Delta
数据.
reflector Run()
起来之后,就作为生产者
产生 Delta
数据, 至于怎么消费(Pop)
这些 Delta
数据, 可以关注cache.controller 怎么调用Pop()