Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions src/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
) {
this.watch = watch;
this.listFn = listFn;
this.callbackCache[ADD] = [];
this.callbackCache[UPDATE] = [];
this.callbackCache[DELETE] = [];
if (autoStart) {
this.doneHandler(null);
}
Expand All @@ -30,15 +33,25 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
}

public on(verb: string, cb: ObjectCallback<T>) {
if (verb !== ADD && verb !== UPDATE && verb !== DELETE) {
if (this.callbackCache[verb] === undefined) {
throw new Error(`Unknown verb: ${verb}`);
}
if (!this.callbackCache[verb]) {
this.callbackCache[verb] = [];
}
this.callbackCache[verb].push(cb);
}

public off(verb: string, cb: ObjectCallback<T>) {
if (this.callbackCache[verb] === undefined) {
throw new Error(`Unknown verb: ${verb}`);
}
const indexToRemove: number = this.callbackCache[verb].findIndex(
(cachedCb: ObjectCallback<T>) => cachedCb === cb,
);
if (indexToRemove === -1) {
return;
}
this.callbackCache[verb].splice(indexToRemove, 1);
}

public get(name: string, namespace?: string): T | undefined {
return this.objects.find(
(obj: T): boolean => {
Expand All @@ -58,7 +71,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
const promise = this.listFn();
const result = await promise;
const list = result.body;
deleteItems(this.objects, list.items, this.callbackCache[DELETE]);
deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice());
this.addOrUpdateItems(list.items);
this.watch.watch(
this.path,
Expand All @@ -70,7 +83,12 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In

private addOrUpdateItems(items: T[]) {
items.forEach((obj: T) => {
addOrUpdateObject(this.objects, obj, this.callbackCache[ADD], this.callbackCache[UPDATE]);
addOrUpdateObject(
this.objects,
obj,
this.callbackCache[ADD].slice(),
this.callbackCache[UPDATE].slice(),
);
if (obj.metadata!.namespace) {
this.indexObj(obj);
}
Expand All @@ -90,13 +108,18 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
switch (phase) {
case 'ADDED':
case 'MODIFIED':
addOrUpdateObject(this.objects, obj, this.callbackCache[ADD], this.callbackCache[UPDATE]);
addOrUpdateObject(
this.objects,
obj,
this.callbackCache[ADD].slice(),
this.callbackCache[UPDATE].slice(),
);
if (obj.metadata!.namespace) {
this.indexObj(obj);
}
break;
case 'DELETED':
deleteObject(this.objects, obj, this.callbackCache[DELETE]);
deleteObject(this.objects, obj, this.callbackCache[DELETE].slice());
if (obj.metadata!.namespace) {
const namespaceList = this.indexCache[obj.metadata!.namespace!] as T[];
if (namespaceList) {
Expand Down
107 changes: 107 additions & 0 deletions src/cache_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,113 @@ describe('ListWatchCache', () => {
} as V1Pod);
expect(list.length).to.equal(1);
});
it('should not call handlers which have been unregistered', async () => {
const fakeWatch = mock.mock(Watch);
const list: V1Namespace[] = [];
const listObj = {
metadata: {
resourceVersion: '12345',
} as V1ListMeta,
items: list,
} as V1NamespaceList;
const listFn: ListPromise<V1Namespace> = function(): Promise<{
response: http.IncomingMessage;
body: V1NamespaceList;
}> {
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>((resolve) => {
resolve({ response: {} as http.IncomingMessage, body: listObj });
});
};
const watchCalled = new Promise((resolve) => {
mock.when(
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(resolve);
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);

const addedList1: V1Namespace[] = [];
const addToList1Fn = function(obj: V1Namespace) {
addedList1.push(obj);
};
const addedList2: V1Namespace[] = [];
const addToList2Fn = function(obj: V1Namespace) {
addedList2.push(obj);
};

informer.start();

await watchCalled;
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();

informer.on(ADD, addToList1Fn);
informer.on(ADD, addToList2Fn);

watchHandler('ADDED', {
metadata: {
name: 'name1',
} as V1ObjectMeta,
} as V1Namespace);

informer.off(ADD, addToList2Fn);

watchHandler('ADDED', {
metadata: {
name: 'name2',
} as V1ObjectMeta,
} as V1Namespace);

expect(addedList1.length).to.equal(2);
expect(addedList2.length).to.equal(1);
});

it('mutating handlers in a callback should not affect those which remain', async () => {
const fakeWatch = mock.mock(Watch);
const list: V1Namespace[] = [];
const listObj = {
metadata: {
resourceVersion: '12345',
} as V1ListMeta,
items: list,
} as V1NamespaceList;
const listFn: ListPromise<V1Namespace> = function(): Promise<{
response: http.IncomingMessage;
body: V1NamespaceList;
}> {
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>((resolve) => {
resolve({ response: {} as http.IncomingMessage, body: listObj });
});
};
const watchCalled = new Promise((resolve) => {
mock.when(
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(resolve);
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);

const addedList: V1Namespace[] = [];
const addToListFn = function(obj: V1Namespace) {
addedList.push(obj);
};
const removeSelf = function() {
informer.off(ADD, removeSelf);
};

informer.start();

await watchCalled;
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();

informer.on(ADD, removeSelf);
informer.on(ADD, addToListFn);

watchHandler('ADDED', {
metadata: {
name: 'name1',
} as V1ObjectMeta,
} as V1Namespace);

expect(addedList.length).to.equal(1);
});
});

describe('delete items', () => {
Expand Down
1 change: 1 addition & 0 deletions src/informer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export const DELETE: string = 'delete';

export interface Informer<T> {
on(verb: string, fn: ObjectCallback<T>);
off(verb: string, fn: ObjectCallback<T>);
start();
}

Expand Down