Skip to content

Commit 5ff006b

Browse files
committed
Add an informer implementation.
1 parent 59354fc commit 5ff006b

File tree

3 files changed

+31
-11
lines changed

3 files changed

+31
-11
lines changed

examples/cache-example.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ const watch = new k8s.Watch(kc);
1010
const listFn = (fn) => {
1111
k8sApi.listNamespacedPod('default')
1212
.then((res) => {
13-
fn(res.body.items);
13+
const podList = res.body;
14+
fn(podList.items, podList.metadata.resourceVersin);
1415
})
1516
.catch((err) => {
1617
console.log(err);

src/cache.ts

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { Informer, ObjectCallback } from './informer';
12
import { KubernetesObject } from './types';
23
import { Watch } from './watch';
34

@@ -6,11 +7,12 @@ export interface ObjectCache<T> {
67
list(namespace?: string): ReadonlyArray<T>;
78
}
89

9-
export type ListCallback<T extends KubernetesObject> = (list: T[]) => void;
10+
export type ListCallback<T extends KubernetesObject> = (list: T[], ResourceVersion: string) => void;
1011

11-
export class ListWatch<T extends KubernetesObject> implements ObjectCache<T> {
12+
export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, Informer<T> {
1213
private objects: T[] = [];
1314
private readonly indexCache: { [key: string]: T[] } = {};
15+
private readonly callbackCache: { [key: string]: ObjectCallback<T> } = {};
1416

1517
public constructor(
1618
private readonly path: string,
@@ -22,6 +24,10 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T> {
2224
this.doneHandler(null);
2325
}
2426

27+
public on(verb: string, cb: ObjectCallback<T>) {
28+
this.callbackCache[verb] = cb;
29+
}
30+
2531
public get(name: string, namespace?: string): T | undefined {
2632
return this.objects.find(
2733
(obj: T): boolean => {
@@ -38,12 +44,12 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T> {
3844
}
3945

4046
private doneHandler(err: any) {
41-
this.listFn((result: T[]) => {
47+
this.listFn((result: T[], resourceVersion: string) => {
4248
this.objects = result;
4349
for (const elt of this.objects) {
4450
this.indexObj(elt);
4551
}
46-
this.watch.watch(this.path, {}, this.watchHandler.bind(this), this.doneHandler.bind(this));
52+
this.watch.watch(this.path, { resourceVersion }, this.watchHandler.bind(this), this.doneHandler.bind(this));
4753
});
4854
}
4955

@@ -53,24 +59,24 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T> {
5359
namespaceList = [];
5460
this.indexCache[obj.metadata!.namespace!] = namespaceList;
5561
}
56-
addOrUpdateObject(namespaceList, obj);
62+
addOrUpdateObject(namespaceList, obj, this.callbackCache['add'], this.callbackCache['update']);
5763
}
5864

5965
private watchHandler(phase: string, obj: T) {
6066
switch (phase) {
6167
case 'ADDED':
6268
case 'MODIFIED':
63-
addOrUpdateObject(this.objects, obj);
69+
addOrUpdateObject(this.objects, obj, this.callbackCache['add'], this.callbackCache['update']);
6470
if (obj.metadata!.namespace) {
6571
this.indexObj(obj);
6672
}
6773
break;
6874
case 'DELETED':
69-
deleteObject(this.objects, obj);
75+
deleteObject(this.objects, obj, this.callbackCache['delete']);
7076
if (obj.metadata!.namespace) {
7177
const namespaceList = this.indexCache[obj.metadata!.namespace!] as T[];
7278
if (namespaceList) {
73-
deleteObject(namespaceList, obj);
79+
deleteObject(namespaceList, obj, this.callbackCache['delete']);
7480
}
7581
}
7682
break;
@@ -79,12 +85,16 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T> {
7985
}
8086

8187
// Only public for testing.
82-
export function addOrUpdateObject<T extends KubernetesObject>(objects: T[], obj: T) {
88+
export function addOrUpdateObject<T extends KubernetesObject>(
89+
objects: T[], obj: T,
90+
addCallback: ObjectCallback<T>, updateCallback: ObjectCallback<T>) {
8391
const ix = findKubernetesObject(objects, obj);
8492
if (ix === -1) {
8593
objects.push(obj);
94+
addCallback(obj);
8695
} else {
8796
objects[ix] = obj;
97+
updateCallback(obj);
8898
}
8999
}
90100

@@ -99,9 +109,11 @@ function findKubernetesObject<T extends KubernetesObject>(objects: T[], obj: T):
99109
}
100110

101111
// Public for testing.
102-
export function deleteObject<T extends KubernetesObject>(objects: T[], obj: T) {
112+
export function deleteObject<T extends KubernetesObject>(
113+
objects: T[], obj: T, deleteCallback: ObjectCallback<T>) {
103114
const ix = findKubernetesObject(objects, obj);
104115
if (ix !== -1) {
105116
objects.splice(ix, 1);
117+
deleteCallback(obj);
106118
}
107119
}

src/informer.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import { KubernetesObject } from './types';
2+
3+
export type ObjectCallback<T extends KubernetesObject> = (obj: T) => void;
4+
5+
export interface Informer<T> {
6+
on(verb: string, fn: ObjectCallback<T>);
7+
}

0 commit comments

Comments
 (0)