Skip to content

Commit 4d332f8

Browse files
authored
Merge pull request #13229 from fabriziopandini/fix-inmemory-watch
🌱 Fix inMemory watch
2 parents a64cfe0 + e9fd4c6 commit 4d332f8

File tree

6 files changed

+284
-82
lines changed

6 files changed

+284
-82
lines changed

test/infrastructure/inmemory/pkg/runtime/cache/cache.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ type Cache interface {
4848
Update(resourceGroup string, obj client.Object) error
4949
Patch(resourceGroup string, obj client.Object, patch client.Patch) error
5050

51+
GetBookmarkResourceVersion(resourceGroup string) (string, error)
52+
5153
GetInformer(ctx context.Context, obj client.Object) (Informer, error)
5254
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
5355
}

test/infrastructure/inmemory/pkg/runtime/cache/client.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,22 @@ func (c *cache) Get(resourceGroup string, objKey client.ObjectKey, obj client.Ob
7979
return nil
8080
}
8181

82+
func (c *cache) GetBookmarkResourceVersion(resourceGroup string) (string, error) {
83+
if resourceGroup == "" {
84+
return "", apierrors.NewBadRequest("resourceGroup must not be empty")
85+
}
86+
87+
tracker := c.resourceGroupTracker(resourceGroup)
88+
if tracker == nil {
89+
return "", apierrors.NewBadRequest(fmt.Sprintf("resourceGroup %s does not exist", resourceGroup))
90+
}
91+
92+
tracker.lock.RLock()
93+
defer tracker.lock.RUnlock()
94+
95+
return fmt.Sprintf("%d", tracker.lastResourceVersion), nil
96+
}
97+
8298
func (c *cache) List(resourceGroup string, list client.ObjectList, opts ...client.ListOption) error {
8399
if resourceGroup == "" {
84100
return apierrors.NewBadRequest("resourceGroup must not be empty")

test/infrastructure/inmemory/pkg/runtime/cache/client_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,49 @@ func Test_cache_client(t *testing.T) {
277277
})
278278
})
279279

280+
t.Run("Get bookmark resourceVersion", func(t *testing.T) {
281+
c := NewCache(scheme).(*cache)
282+
c.AddResourceGroup("foo")
283+
284+
t.Run("fails if resourceGroup is empty", func(t *testing.T) {
285+
g := NewWithT(t)
286+
287+
_, err := c.GetBookmarkResourceVersion("")
288+
g.Expect(err).To(HaveOccurred())
289+
g.Expect(apierrors.IsBadRequest(err)).To(BeTrue())
290+
})
291+
292+
t.Run("fails if resourceGroup doesn't exist", func(t *testing.T) {
293+
g := NewWithT(t)
294+
295+
_, err := c.GetBookmarkResourceVersion("bar")
296+
g.Expect(err).To(HaveOccurred())
297+
g.Expect(apierrors.IsBadRequest(err)).To(BeTrue())
298+
})
299+
300+
t.Run("get when no objects exists", func(t *testing.T) {
301+
g := NewWithT(t)
302+
303+
v, err := c.GetBookmarkResourceVersion("foo")
304+
g.Expect(err).ToNot(HaveOccurred())
305+
306+
// Check all the computed fields are as expected.
307+
g.Expect(v).To(Equal("0"), "resourceVersion must be set")
308+
})
309+
310+
t.Run("get when objects exists", func(t *testing.T) {
311+
g := NewWithT(t)
312+
313+
createMachine(t, c, "foo", "bar")
314+
315+
v, err := c.GetBookmarkResourceVersion("foo")
316+
g.Expect(err).ToNot(HaveOccurred())
317+
318+
// Check all the computed fields are as expected.
319+
g.Expect(v).To(Equal("1"), "resourceVersion must be set")
320+
})
321+
})
322+
280323
t.Run("list objects", func(t *testing.T) {
281324
c := NewCache(scheme).(*cache)
282325
c.AddResourceGroup("foo")

test/infrastructure/inmemory/pkg/server/api/watch.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/emicklei/go-restful/v3"
2727
"github.com/pkg/errors"
28+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2829
"k8s.io/apimachinery/pkg/runtime/schema"
2930
"k8s.io/apimachinery/pkg/watch"
3031
ctrl "sigs.k8s.io/controller-runtime"
@@ -169,6 +170,20 @@ func (h *apiServerHandler) watchForResource(req *restful.Request, resp *restful.
169170
initialEvents = append(initialEvents, Event{Type: eventType, Object: &obj})
170171
}
171172

173+
// Add the bookmark event.
174+
bookmarkResourceVersion, err := c.GetBookmarkResourceVersion(resourceGroup)
175+
if err != nil {
176+
return err
177+
}
178+
179+
obj := &unstructured.Unstructured{}
180+
obj.SetAPIVersion(gvk.GroupVersion().String())
181+
obj.SetKind(gvk.Kind)
182+
obj.SetResourceVersion(bookmarkResourceVersion)
183+
obj.SetAnnotations(map[string]string{"k8s.io/initial-events-end": "true"})
184+
185+
initialEvents = append(initialEvents, Event{Type: watch.Bookmark, Object: obj})
186+
172187
return watcher.Run(ctx, queryTimeout, initialEvents, list.GetResourceVersion(), resp)
173188
}
174189

@@ -184,6 +199,7 @@ func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, initialE
184199
return errors.New("can't start Watch: can't get restful.Response")
185200
}
186201
w.Header().Set("Transfer-Encoding", "chunked")
202+
w.Header().Set("Content-Type", "application/json")
187203
w.WriteHeader(http.StatusOK)
188204

189205
// Write all initial events.

test/infrastructure/inmemory/pkg/server/listener.go

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ import (
2828
"k8s.io/client-go/rest"
2929
"k8s.io/client-go/tools/clientcmd"
3030
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
31-
"sigs.k8s.io/controller-runtime/pkg/client"
32-
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
3331

3432
"sigs.k8s.io/cluster-api/util/certs"
3533
)
@@ -119,28 +117,3 @@ func (s *WorkloadClusterListener) RESTConfig() (*rest.Config, error) {
119117

120118
return restConfig, nil
121119
}
122-
123-
// GetClient returns a client for a WorkloadClusterListener.
124-
func (s *WorkloadClusterListener) GetClient() (client.WithWatch, error) {
125-
restConfig, err := s.RESTConfig()
126-
if err != nil {
127-
return nil, err
128-
}
129-
130-
httpClient, err := rest.HTTPClientFor(restConfig)
131-
if err != nil {
132-
return nil, err
133-
}
134-
135-
mapper, err := apiutil.NewDynamicRESTMapper(restConfig, httpClient)
136-
if err != nil {
137-
return nil, err
138-
}
139-
140-
c, err := client.NewWithWatch(restConfig, client.Options{Scheme: s.scheme, Mapper: mapper})
141-
if err != nil {
142-
return nil, err
143-
}
144-
145-
return c, nil
146-
}

0 commit comments

Comments
 (0)