Skip to content

Commit 6f9f887

Browse files
authored
feat: add support for logging events from ReplicaSets (#4)
* feat: add support for logging events from ReplicaSets Fixes #3 * Fix replicaset tests event
1 parent e67408b commit 6f9f887

File tree

4 files changed

+195
-14
lines changed

4 files changed

+195
-14
lines changed

.github/workflows/ci.yaml

-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ name: ci
33
on:
44
push:
55

6-
pull_request:
7-
86
workflow_dispatch:
97

108
permissions:

logger.go

+100-11
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/coder/coder/codersdk"
1414
"github.com/coder/coder/codersdk/agentsdk"
1515
"github.com/fatih/color"
16+
appsv1 "k8s.io/api/apps/v1"
1617
corev1 "k8s.io/api/core/v1"
1718
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1819
"k8s.io/client-go/informers"
@@ -52,8 +53,9 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
5253
errChan: make(chan error, 16),
5354
ctx: ctx,
5455
cancelFunc: cancelFunc,
55-
agentTokenToLogger: map[string]agentLogger{},
56+
agentTokenToLogger: map[string]*agentLogger{},
5657
podToAgentTokens: map[string][]string{},
58+
replicaSetToTokens: map[string][]string{},
5759
}
5860
return reporter, reporter.init()
5961
}
@@ -67,8 +69,9 @@ type podEventLogger struct {
6769
ctx context.Context
6870
cancelFunc context.CancelFunc
6971
mutex sync.RWMutex
70-
agentTokenToLogger map[string]agentLogger
72+
agentTokenToLogger map[string]*agentLogger
7173
podToAgentTokens map[string][]string
74+
replicaSetToTokens map[string][]string
7275
}
7376

7477
// init starts the informer factory and registers event handlers.
@@ -91,6 +94,7 @@ func (p *podEventLogger) init() error {
9194
// When a Pod is created, it's added to the map of Pods we're
9295
// interested in. When a Pod is deleted, it's removed from the map.
9396
podInformer := podFactory.Core().V1().Pods().Informer()
97+
replicaInformer := podFactory.Apps().V1().ReplicaSets().Informer()
9498
eventInformer := eventFactory.Core().V1().Events().Informer()
9599

96100
_, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -130,7 +134,7 @@ func (p *podEventLogger) init() error {
130134
}
131135
}
132136
if registered {
133-
p.logger.Info(p.ctx, "registered agent pod", slog.F("pod", pod.Name))
137+
p.logger.Info(p.ctx, "registered agent pod", slog.F("name", pod.Name), slog.F("namespace", pod.Namespace))
134138
}
135139
},
136140
DeleteFunc: func(obj interface{}) {
@@ -153,13 +157,92 @@ func (p *podEventLogger) init() error {
153157
Level: codersdk.LogLevelError,
154158
})
155159
}
156-
p.logger.Info(p.ctx, "unregistered agent pod", slog.F("pod", pod.Name))
160+
p.logger.Info(p.ctx, "unregistered agent pod", slog.F("name", pod.Name))
157161
},
158162
})
159163
if err != nil {
160164
return fmt.Errorf("register pod handler: %w", err)
161165
}
162166

167+
_, err = replicaInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
168+
AddFunc: func(obj interface{}) {
169+
replica, ok := obj.(*appsv1.ReplicaSet)
170+
if !ok {
171+
p.errChan <- fmt.Errorf("unexpected replica object type: %T", obj)
172+
return
173+
}
174+
175+
// We don't want to add logs to workspaces that are already started!
176+
if !replica.CreationTimestamp.After(startTime) {
177+
return
178+
}
179+
180+
p.mutex.Lock()
181+
defer p.mutex.Unlock()
182+
183+
var registered bool
184+
for _, container := range replica.Spec.Template.Spec.Containers {
185+
for _, env := range container.Env {
186+
if env.Name != "CODER_AGENT_TOKEN" {
187+
continue
188+
}
189+
registered = true
190+
tokens, ok := p.replicaSetToTokens[replica.Name]
191+
if !ok {
192+
tokens = make([]string, 0)
193+
}
194+
tokens = append(tokens, env.Value)
195+
p.replicaSetToTokens[replica.Name] = tokens
196+
197+
p.sendLog(replica.Name, env.Value, agentsdk.StartupLog{
198+
CreatedAt: time.Now(),
199+
Output: fmt.Sprintf("🐳 %s: %s", newColor(color.Bold).Sprint("Queued pod from ReplicaSet"), replica.Name),
200+
Level: codersdk.LogLevelInfo,
201+
})
202+
}
203+
}
204+
if registered {
205+
p.logger.Info(p.ctx, "registered agent pod from ReplicaSet", slog.F("name", replica.Name))
206+
}
207+
},
208+
DeleteFunc: func(obj interface{}) {
209+
replicaSet, ok := obj.(*appsv1.ReplicaSet)
210+
if !ok {
211+
p.errChan <- fmt.Errorf("unexpected replica set delete object type: %T", obj)
212+
return
213+
}
214+
p.mutex.Lock()
215+
defer p.mutex.Unlock()
216+
_, ok = p.replicaSetToTokens[replicaSet.Name]
217+
if !ok {
218+
return
219+
}
220+
delete(p.replicaSetToTokens, replicaSet.Name)
221+
for _, pod := range replicaSet.Spec.Template.Spec.Containers {
222+
name := pod.Name
223+
if name == "" {
224+
name = replicaSet.Spec.Template.Name
225+
}
226+
tokens, ok := p.podToAgentTokens[name]
227+
if !ok {
228+
continue
229+
}
230+
delete(p.podToAgentTokens, name)
231+
for _, token := range tokens {
232+
p.sendLog(pod.Name, token, agentsdk.StartupLog{
233+
CreatedAt: time.Now(),
234+
Output: fmt.Sprintf("🗑️ %s: %s", newColor(color.Bold).Sprint("Deleted ReplicaSet"), replicaSet.Name),
235+
Level: codersdk.LogLevelError,
236+
})
237+
}
238+
}
239+
p.logger.Info(p.ctx, "unregistered ReplicaSet", slog.F("name", replicaSet.Name))
240+
},
241+
})
242+
if err != nil {
243+
return fmt.Errorf("register replicaset handler: %w", err)
244+
}
245+
163246
_, err = eventInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
164247
AddFunc: func(obj interface{}) {
165248
event, ok := obj.(*corev1.Event)
@@ -175,8 +258,14 @@ func (p *podEventLogger) init() error {
175258

176259
p.mutex.Lock()
177260
defer p.mutex.Unlock()
178-
tokens, ok := p.podToAgentTokens[event.InvolvedObject.Name]
179-
if !ok {
261+
var tokens []string
262+
switch event.InvolvedObject.Kind {
263+
case "Pod":
264+
tokens, ok = p.podToAgentTokens[event.InvolvedObject.Name]
265+
case "ReplicaSet":
266+
tokens, ok = p.replicaSetToTokens[event.InvolvedObject.Name]
267+
}
268+
if tokens == nil || !ok {
180269
return
181270
}
182271

@@ -210,23 +299,23 @@ func (p *podEventLogger) init() error {
210299
// loggerForToken returns a logger for the given pod name and agent token.
211300
// If a logger already exists for the token, it's returned. Otherwise a new
212301
// logger is created and returned.
213-
func (p *podEventLogger) sendLog(podName, token string, log agentsdk.StartupLog) {
302+
func (p *podEventLogger) sendLog(resourceName, token string, log agentsdk.StartupLog) {
214303
logger, ok := p.agentTokenToLogger[token]
215304
if !ok {
216305
client := agentsdk.New(p.coderURL)
217306
client.SetSessionToken(token)
218-
client.SDK.Logger = p.logger.Named(podName)
307+
client.SDK.Logger = p.logger.Named(resourceName)
219308
sendLog, closer := client.QueueStartupLogs(p.ctx, p.logDebounce)
220309

221-
logger = agentLogger{
310+
logger = &agentLogger{
222311
sendLog: sendLog,
223312
closer: closer,
224313
closeTimer: time.AfterFunc(p.logDebounce*5, func() {
225314
logger.closed.Store(true)
226315
// We want to have two close cycles for loggers!
227316
err := closer.Close()
228317
if err != nil {
229-
p.logger.Error(p.ctx, "close agent logger", slog.Error(err), slog.F("pod", podName))
318+
p.logger.Error(p.ctx, "close agent logger", slog.Error(err), slog.F("pod", resourceName))
230319
}
231320
p.mutex.Lock()
232321
delete(p.agentTokenToLogger, token)
@@ -239,7 +328,7 @@ func (p *podEventLogger) sendLog(podName, token string, log agentsdk.StartupLog)
239328
// If the logger was already closed, we await the close before
240329
// creating a new logger. This is to ensure all loggers get sent in order!
241330
_ = logger.closer.Close()
242-
p.sendLog(podName, token, log)
331+
p.sendLog(resourceName, token, log)
243332
return
244333
}
245334
// We make this 5x the debounce because it's low-cost to persist a few

logger_test.go

+92-1
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,103 @@ import (
1313
"github.com/coder/coder/codersdk/agentsdk"
1414
"github.com/stretchr/testify/require"
1515
"github.com/zeebo/assert"
16+
appsv1 "k8s.io/api/apps/v1"
1617
corev1 "k8s.io/api/core/v1"
1718
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1819
"k8s.io/client-go/kubernetes/fake"
1920
)
2021

21-
func TestPodEventLogger(t *testing.T) {
22+
func TestReplicaSetEvents(t *testing.T) {
23+
t.Parallel()
24+
25+
queued := make(chan agentsdk.PatchStartupLogs, 1)
26+
agent := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
27+
var req agentsdk.PatchStartupLogs
28+
err := json.NewDecoder(r.Body).Decode(&req)
29+
assert.NoError(t, err)
30+
queued <- req
31+
}))
32+
agentURL, err := url.Parse(agent.URL)
33+
require.NoError(t, err)
34+
namespace := "test-namespace"
35+
client := fake.NewSimpleClientset()
36+
ctx := context.Background()
37+
reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{
38+
client: client,
39+
coderURL: agentURL,
40+
namespace: namespace,
41+
logger: slogtest.Make(t, nil),
42+
logDebounce: time.Millisecond,
43+
})
44+
require.NoError(t, err)
45+
46+
rs := &appsv1.ReplicaSet{
47+
ObjectMeta: v1.ObjectMeta{
48+
Name: "test-rs",
49+
CreationTimestamp: v1.Time{
50+
Time: time.Now().Add(time.Hour),
51+
},
52+
},
53+
Spec: appsv1.ReplicaSetSpec{
54+
Template: corev1.PodTemplateSpec{
55+
ObjectMeta: v1.ObjectMeta{
56+
Name: "test-pod",
57+
},
58+
Spec: corev1.PodSpec{
59+
Containers: []corev1.Container{{
60+
Env: []corev1.EnvVar{
61+
{
62+
Name: "CODER_AGENT_TOKEN",
63+
Value: "test-token",
64+
},
65+
},
66+
}},
67+
},
68+
},
69+
},
70+
}
71+
_, err = client.AppsV1().ReplicaSets(namespace).Create(ctx, rs, v1.CreateOptions{})
72+
require.NoError(t, err)
73+
74+
log := <-queued
75+
require.Len(t, log.Logs, 1)
76+
require.Contains(t, log.Logs[0].Output, "Queued pod from ReplicaSet")
77+
78+
event := &corev1.Event{
79+
ObjectMeta: v1.ObjectMeta{
80+
Name: "test-event",
81+
CreationTimestamp: v1.Time{
82+
Time: time.Now().Add(time.Hour),
83+
},
84+
},
85+
InvolvedObject: corev1.ObjectReference{
86+
Kind: "ReplicaSet",
87+
Name: "test-rs",
88+
},
89+
Reason: "Test",
90+
Message: "Test event",
91+
}
92+
_, err = client.CoreV1().Events(namespace).Create(ctx, event, v1.CreateOptions{})
93+
require.NoError(t, err)
94+
95+
log = <-queued
96+
require.Len(t, log.Logs, 1)
97+
require.Contains(t, log.Logs[0].Output, event.Message)
98+
99+
err = client.AppsV1().ReplicaSets(namespace).Delete(ctx, rs.Name, v1.DeleteOptions{})
100+
require.NoError(t, err)
101+
102+
require.Eventually(t, func() bool {
103+
reporter.mutex.Lock()
104+
defer reporter.mutex.Unlock()
105+
return len(reporter.podToAgentTokens) == 0 && len(reporter.replicaSetToTokens) == 0
106+
}, time.Second, time.Millisecond)
107+
108+
err = reporter.Close()
109+
require.NoError(t, err)
110+
}
111+
112+
func TestPodEvents(t *testing.T) {
22113
t.Parallel()
23114

24115
queued := make(chan agentsdk.PatchStartupLogs, 1)

templates/service.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ rules:
66
- apiGroups: [""]
77
resources: ["pods", "events"]
88
verbs: ["get", "watch", "list"]
9+
- apiGroups: ["apps"]
10+
resources: ["replicasets", "events"]
11+
verbs: ["get", "watch", "list"]
912
---
1013
apiVersion: v1
1114
kind: ServiceAccount

0 commit comments

Comments
 (0)