Skip to content

Commit 6a6b737

Browse files
authored
Merge pull request kubernetes-client#323 from benbuzbee/benbuz/off
Add 'off' method to ListWatch and Informer which allows for the removal of callbacks
2 parents f119da0 + 1623876 commit 6a6b737

File tree

3 files changed

+139
-8
lines changed

3 files changed

+139
-8
lines changed

src/cache.ts

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
2020
) {
2121
this.watch = watch;
2222
this.listFn = listFn;
23+
this.callbackCache[ADD] = [];
24+
this.callbackCache[UPDATE] = [];
25+
this.callbackCache[DELETE] = [];
2326
if (autoStart) {
2427
this.doneHandler(null);
2528
}
@@ -30,15 +33,25 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
3033
}
3134

3235
public on(verb: string, cb: ObjectCallback<T>) {
33-
if (verb !== ADD && verb !== UPDATE && verb !== DELETE) {
36+
if (this.callbackCache[verb] === undefined) {
3437
throw new Error(`Unknown verb: ${verb}`);
3538
}
36-
if (!this.callbackCache[verb]) {
37-
this.callbackCache[verb] = [];
38-
}
3939
this.callbackCache[verb].push(cb);
4040
}
4141

42+
public off(verb: string, cb: ObjectCallback<T>) {
43+
if (this.callbackCache[verb] === undefined) {
44+
throw new Error(`Unknown verb: ${verb}`);
45+
}
46+
const indexToRemove: number = this.callbackCache[verb].findIndex(
47+
(cachedCb: ObjectCallback<T>) => cachedCb === cb,
48+
);
49+
if (indexToRemove === -1) {
50+
return;
51+
}
52+
this.callbackCache[verb].splice(indexToRemove, 1);
53+
}
54+
4255
public get(name: string, namespace?: string): T | undefined {
4356
return this.objects.find(
4457
(obj: T): boolean => {
@@ -58,7 +71,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
5871
const promise = this.listFn();
5972
const result = await promise;
6073
const list = result.body;
61-
deleteItems(this.objects, list.items, this.callbackCache[DELETE]);
74+
deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice());
6275
this.addOrUpdateItems(list.items);
6376
this.watch.watch(
6477
this.path,
@@ -70,7 +83,12 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
7083

7184
private addOrUpdateItems(items: T[]) {
7285
items.forEach((obj: T) => {
73-
addOrUpdateObject(this.objects, obj, this.callbackCache[ADD], this.callbackCache[UPDATE]);
86+
addOrUpdateObject(
87+
this.objects,
88+
obj,
89+
this.callbackCache[ADD].slice(),
90+
this.callbackCache[UPDATE].slice(),
91+
);
7492
if (obj.metadata!.namespace) {
7593
this.indexObj(obj);
7694
}
@@ -90,13 +108,18 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
90108
switch (phase) {
91109
case 'ADDED':
92110
case 'MODIFIED':
93-
addOrUpdateObject(this.objects, obj, this.callbackCache[ADD], this.callbackCache[UPDATE]);
111+
addOrUpdateObject(
112+
this.objects,
113+
obj,
114+
this.callbackCache[ADD].slice(),
115+
this.callbackCache[UPDATE].slice(),
116+
);
94117
if (obj.metadata!.namespace) {
95118
this.indexObj(obj);
96119
}
97120
break;
98121
case 'DELETED':
99-
deleteObject(this.objects, obj, this.callbackCache[DELETE]);
122+
deleteObject(this.objects, obj, this.callbackCache[DELETE].slice());
100123
if (obj.metadata!.namespace) {
101124
const namespaceList = this.indexCache[obj.metadata!.namespace!] as T[];
102125
if (namespaceList) {

src/cache_test.ts

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,113 @@ describe('ListWatchCache', () => {
549549
} as V1Pod);
550550
expect(list.length).to.equal(1);
551551
});
552+
it('should not call handlers which have been unregistered', async () => {
553+
const fakeWatch = mock.mock(Watch);
554+
const list: V1Namespace[] = [];
555+
const listObj = {
556+
metadata: {
557+
resourceVersion: '12345',
558+
} as V1ListMeta,
559+
items: list,
560+
} as V1NamespaceList;
561+
const listFn: ListPromise<V1Namespace> = function(): Promise<{
562+
response: http.IncomingMessage;
563+
body: V1NamespaceList;
564+
}> {
565+
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>((resolve) => {
566+
resolve({ response: {} as http.IncomingMessage, body: listObj });
567+
});
568+
};
569+
const watchCalled = new Promise((resolve) => {
570+
mock.when(
571+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
572+
).thenCall(resolve);
573+
});
574+
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
575+
576+
const addedList1: V1Namespace[] = [];
577+
const addToList1Fn = function(obj: V1Namespace) {
578+
addedList1.push(obj);
579+
};
580+
const addedList2: V1Namespace[] = [];
581+
const addToList2Fn = function(obj: V1Namespace) {
582+
addedList2.push(obj);
583+
};
584+
585+
informer.start();
586+
587+
await watchCalled;
588+
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();
589+
590+
informer.on(ADD, addToList1Fn);
591+
informer.on(ADD, addToList2Fn);
592+
593+
watchHandler('ADDED', {
594+
metadata: {
595+
name: 'name1',
596+
} as V1ObjectMeta,
597+
} as V1Namespace);
598+
599+
informer.off(ADD, addToList2Fn);
600+
601+
watchHandler('ADDED', {
602+
metadata: {
603+
name: 'name2',
604+
} as V1ObjectMeta,
605+
} as V1Namespace);
606+
607+
expect(addedList1.length).to.equal(2);
608+
expect(addedList2.length).to.equal(1);
609+
});
610+
611+
it('mutating handlers in a callback should not affect those which remain', async () => {
612+
const fakeWatch = mock.mock(Watch);
613+
const list: V1Namespace[] = [];
614+
const listObj = {
615+
metadata: {
616+
resourceVersion: '12345',
617+
} as V1ListMeta,
618+
items: list,
619+
} as V1NamespaceList;
620+
const listFn: ListPromise<V1Namespace> = function(): Promise<{
621+
response: http.IncomingMessage;
622+
body: V1NamespaceList;
623+
}> {
624+
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>((resolve) => {
625+
resolve({ response: {} as http.IncomingMessage, body: listObj });
626+
});
627+
};
628+
const watchCalled = new Promise((resolve) => {
629+
mock.when(
630+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
631+
).thenCall(resolve);
632+
});
633+
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
634+
635+
const addedList: V1Namespace[] = [];
636+
const addToListFn = function(obj: V1Namespace) {
637+
addedList.push(obj);
638+
};
639+
const removeSelf = function() {
640+
informer.off(ADD, removeSelf);
641+
};
642+
643+
informer.start();
644+
645+
await watchCalled;
646+
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();
647+
648+
informer.on(ADD, removeSelf);
649+
informer.on(ADD, addToListFn);
650+
651+
watchHandler('ADDED', {
652+
metadata: {
653+
name: 'name1',
654+
} as V1ObjectMeta,
655+
} as V1Namespace);
656+
657+
expect(addedList.length).to.equal(1);
658+
});
552659
});
553660

554661
describe('delete items', () => {

src/informer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ export const DELETE: string = 'delete';
1818

1919
export interface Informer<T> {
2020
on(verb: string, fn: ObjectCallback<T>);
21+
off(verb: string, fn: ObjectCallback<T>);
2122
start();
2223
}
2324

0 commit comments

Comments
 (0)