1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
package main
import (
"context"
"fmt"
"os"
"path"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
var (
namespace = "default"
label = "informer-dynamic-simple-" + rand.String(6)
IPAMBlockResource = schema.GroupVersionResource{
Group: "crd.projectcalico.org",
Version: "v1",
Resource: "ipamblocks",
}
)
func main() {
home, err := os.UserHomeDir()
if err != nil {
panic(err)
}
config, err := clientcmd.BuildConfigFromFlags("", path.Join(home, ".kube/config"))
if err != nil {
panic(err.Error())
}
client, err := dynamic.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// Create a shared informer factory.
// - A factory is essentially a struct keeping a map (type -> informer).
// - 5*time.Second is a default resync period (for all informers).
factory := dynamicinformer.NewDynamicSharedInformerFactory(client, 5*time.Second)
// When informer is requested, the factory instantiates it and keeps the
// the reference to it in the internal map before returning.
dynamicInformer := factory.ForResource(IPAMBlockResource)
dynamicInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cm := obj.(*unstructured.Unstructured)
fmt.Printf("Informer event: ADDED %s/%s\n", cm.GetNamespace(), cm.GetName())
},
UpdateFunc: func(old, new interface{}) {
cm := old.(*unstructured.Unstructured)
fmt.Printf("Informer event: UPDATED %s/%s\n", cm.GetNamespace(), cm.GetName())
},
DeleteFunc: func(obj interface{}) {
cm := obj.(*unstructured.Unstructured)
fmt.Printf("Informer event: DELETED %s/%s\n", cm.GetNamespace(), cm.GetName())
},
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start the informers' machinery.
// - Start() starts every Informer requested before using a goroutine per informer.
// - A started Informer will fetch ALL the IPAMBlocks from all the namespaces
// (using a lister) and trigger `AddFunc`` for each found IPAMBlock object.
// Use NewSharedInformerFactoryWithOptions() to make the lister fetch only
// a filtered subset of objects.
// - All IPAMBlocks added, updated, or deleted after the informer has been synced
// will trigger the corresponding callback call (using a watch).
// - Every 5*time.Second the UpdateFunc callback will be called for every
// previously fetched IPAMBlock (so-called resync period).
factory.Start(ctx.Done())
// factory.Start() releases the execution flow without waiting for all the
// internal machinery to warm up. We use factory.WaitForCacheSync() here
// to poll for cmInformer.Informer().HasSynced(). Essentially, it's just a
// fancy way to write a while-loop checking HasSynced() flags for all the
// registered informers with 100ms delay between iterations.
for gvr, ok := range factory.WaitForCacheSync(ctx.Done()) {
if !ok {
panic(fmt.Sprintf("Failed to sync cache for resource %v", gvr))
}
}
// Stay for a couple more seconds to observe resyncs.
time.Sleep(1000 * time.Second)
}
func createIPAMBlock(client dynamic.Interface) *unstructured.Unstructured {
cm := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "IPAMBlock",
"metadata": map[string]interface{}{
"namespace": namespace,
"generateName": "informer-dynamic-simple-",
"labels": map[string]interface{}{
"example": label,
},
},
"data": map[string]interface{}{
"foo": "bar",
},
},
}
cm, err := client.
Resource(IPAMBlockResource).
Namespace(namespace).
Create(context.Background(), cm, metav1.CreateOptions{})
if err != nil {
panic(err.Error())
}
fmt.Printf("Created IPAMBlock %s/%s\n", cm.GetNamespace(), cm.GetName())
return cm
}
func deleteIPAMBlock(client dynamic.Interface, cm *unstructured.Unstructured) {
err := client.
Resource(IPAMBlockResource).
Namespace(cm.GetNamespace()).
Delete(context.Background(), cm.GetName(), metav1.DeleteOptions{})
if err != nil {
panic(err.Error())
}
if err != nil {
panic(err.Error())
}
fmt.Printf("Deleted IPAMBlock %s/%s\n", cm.GetNamespace(), cm.GetName())
}
|