From 162d0264911038612cc074ee60fb6dd3a15a17a0 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Wed, 5 Jul 2023 22:47:55 +0000 Subject: [PATCH 1/2] feat: add support for logging events from ReplicaSets Fixes #3 --- .github/workflows/ci.yaml | 2 - logger.go | 111 ++++++++++++++++++++++++++++++++++---- logger_test.go | 93 +++++++++++++++++++++++++++++++- templates/service.yaml | 3 ++ 4 files changed, 195 insertions(+), 14 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e973a75..37a8de3 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -3,8 +3,6 @@ name: ci on: push: - pull_request: - workflow_dispatch: permissions: diff --git a/logger.go b/logger.go index a143549..6eb02c2 100644 --- a/logger.go +++ b/logger.go @@ -13,6 +13,7 @@ import ( "github.com/coder/coder/codersdk" "github.com/coder/coder/codersdk/agentsdk" "github.com/fatih/color" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" @@ -52,8 +53,9 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve errChan: make(chan error, 16), ctx: ctx, cancelFunc: cancelFunc, - agentTokenToLogger: map[string]agentLogger{}, + agentTokenToLogger: map[string]*agentLogger{}, podToAgentTokens: map[string][]string{}, + replicaSetToTokens: map[string][]string{}, } return reporter, reporter.init() } @@ -67,8 +69,9 @@ type podEventLogger struct { ctx context.Context cancelFunc context.CancelFunc mutex sync.RWMutex - agentTokenToLogger map[string]agentLogger + agentTokenToLogger map[string]*agentLogger podToAgentTokens map[string][]string + replicaSetToTokens map[string][]string } // init starts the informer factory and registers event handlers. @@ -91,6 +94,7 @@ func (p *podEventLogger) init() error { // When a Pod is created, it's added to the map of Pods we're // interested in. When a Pod is deleted, it's removed from the map. podInformer := podFactory.Core().V1().Pods().Informer() + replicaInformer := podFactory.Apps().V1().ReplicaSets().Informer() eventInformer := eventFactory.Core().V1().Events().Informer() _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -130,7 +134,7 @@ func (p *podEventLogger) init() error { } } if registered { - p.logger.Info(p.ctx, "registered agent pod", slog.F("pod", pod.Name)) + p.logger.Info(p.ctx, "registered agent pod", slog.F("name", pod.Name), slog.F("namespace", pod.Namespace)) } }, DeleteFunc: func(obj interface{}) { @@ -153,13 +157,92 @@ func (p *podEventLogger) init() error { Level: codersdk.LogLevelError, }) } - p.logger.Info(p.ctx, "unregistered agent pod", slog.F("pod", pod.Name)) + p.logger.Info(p.ctx, "unregistered agent pod", slog.F("name", pod.Name)) }, }) if err != nil { return fmt.Errorf("register pod handler: %w", err) } + _, err = replicaInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + replica, ok := obj.(*appsv1.ReplicaSet) + if !ok { + p.errChan <- fmt.Errorf("unexpected replica object type: %T", obj) + return + } + + // We don't want to add logs to workspaces that are already started! + if !replica.CreationTimestamp.After(startTime) { + return + } + + p.mutex.Lock() + defer p.mutex.Unlock() + + var registered bool + for _, container := range replica.Spec.Template.Spec.Containers { + for _, env := range container.Env { + if env.Name != "CODER_AGENT_TOKEN" { + continue + } + registered = true + tokens, ok := p.replicaSetToTokens[replica.Name] + if !ok { + tokens = make([]string, 0) + } + tokens = append(tokens, env.Value) + p.replicaSetToTokens[replica.Name] = tokens + + p.sendLog(replica.Name, env.Value, agentsdk.StartupLog{ + CreatedAt: time.Now(), + Output: fmt.Sprintf("🐳 %s: %s", newColor(color.Bold).Sprint("Queued pod from ReplicaSet"), replica.Name), + Level: codersdk.LogLevelInfo, + }) + } + } + if registered { + p.logger.Info(p.ctx, "registered agent pod from ReplicaSet", slog.F("name", replica.Name)) + } + }, + DeleteFunc: func(obj interface{}) { + replicaSet, ok := obj.(*appsv1.ReplicaSet) + if !ok { + p.errChan <- fmt.Errorf("unexpected replica set delete object type: %T", obj) + return + } + p.mutex.Lock() + defer p.mutex.Unlock() + _, ok = p.replicaSetToTokens[replicaSet.Name] + if !ok { + return + } + delete(p.replicaSetToTokens, replicaSet.Name) + for _, pod := range replicaSet.Spec.Template.Spec.Containers { + name := pod.Name + if name == "" { + name = replicaSet.Spec.Template.Name + } + tokens, ok := p.podToAgentTokens[name] + if !ok { + continue + } + delete(p.podToAgentTokens, name) + for _, token := range tokens { + p.sendLog(pod.Name, token, agentsdk.StartupLog{ + CreatedAt: time.Now(), + Output: fmt.Sprintf("🗑️ %s: %s", newColor(color.Bold).Sprint("Deleted ReplicaSet"), replicaSet.Name), + Level: codersdk.LogLevelError, + }) + } + } + p.logger.Info(p.ctx, "unregistered ReplicaSet", slog.F("name", replicaSet.Name)) + }, + }) + if err != nil { + return fmt.Errorf("register replicaset handler: %w", err) + } + _, err = eventInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { event, ok := obj.(*corev1.Event) @@ -175,8 +258,14 @@ func (p *podEventLogger) init() error { p.mutex.Lock() defer p.mutex.Unlock() - tokens, ok := p.podToAgentTokens[event.InvolvedObject.Name] - if !ok { + var tokens []string + switch event.InvolvedObject.Kind { + case "Pod": + tokens, ok = p.podToAgentTokens[event.InvolvedObject.Name] + case "ReplicaSet": + tokens, ok = p.replicaSetToTokens[event.InvolvedObject.Name] + } + if tokens == nil || !ok { return } @@ -210,15 +299,15 @@ func (p *podEventLogger) init() error { // loggerForToken returns a logger for the given pod name and agent token. // If a logger already exists for the token, it's returned. Otherwise a new // logger is created and returned. -func (p *podEventLogger) sendLog(podName, token string, log agentsdk.StartupLog) { +func (p *podEventLogger) sendLog(resourceName, token string, log agentsdk.StartupLog) { logger, ok := p.agentTokenToLogger[token] if !ok { client := agentsdk.New(p.coderURL) client.SetSessionToken(token) - client.SDK.Logger = p.logger.Named(podName) + client.SDK.Logger = p.logger.Named(resourceName) sendLog, closer := client.QueueStartupLogs(p.ctx, p.logDebounce) - logger = agentLogger{ + logger = &agentLogger{ sendLog: sendLog, closer: closer, closeTimer: time.AfterFunc(p.logDebounce*5, func() { @@ -226,7 +315,7 @@ func (p *podEventLogger) sendLog(podName, token string, log agentsdk.StartupLog) // We want to have two close cycles for loggers! err := closer.Close() if err != nil { - p.logger.Error(p.ctx, "close agent logger", slog.Error(err), slog.F("pod", podName)) + p.logger.Error(p.ctx, "close agent logger", slog.Error(err), slog.F("pod", resourceName)) } p.mutex.Lock() delete(p.agentTokenToLogger, token) @@ -239,7 +328,7 @@ func (p *podEventLogger) sendLog(podName, token string, log agentsdk.StartupLog) // If the logger was already closed, we await the close before // creating a new logger. This is to ensure all loggers get sent in order! _ = logger.closer.Close() - p.sendLog(podName, token, log) + p.sendLog(resourceName, token, log) return } // We make this 5x the debounce because it's low-cost to persist a few diff --git a/logger_test.go b/logger_test.go index d635528..7da27fb 100644 --- a/logger_test.go +++ b/logger_test.go @@ -13,12 +13,103 @@ import ( "github.com/coder/coder/codersdk/agentsdk" "github.com/stretchr/testify/require" "github.com/zeebo/assert" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" ) -func TestPodEventLogger(t *testing.T) { +func TestReplicaSetEvents(t *testing.T) { + t.Parallel() + + queued := make(chan agentsdk.PatchStartupLogs, 1) + agent := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req agentsdk.PatchStartupLogs + err := json.NewDecoder(r.Body).Decode(&req) + assert.NoError(t, err) + queued <- req + })) + agentURL, err := url.Parse(agent.URL) + require.NoError(t, err) + namespace := "test-namespace" + client := fake.NewSimpleClientset() + ctx := context.Background() + reporter, err := newPodEventLogger(ctx, podEventLoggerOptions{ + client: client, + coderURL: agentURL, + namespace: namespace, + logger: slogtest.Make(t, nil), + logDebounce: time.Millisecond, + }) + require.NoError(t, err) + + rs := &appsv1.ReplicaSet{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-rs", + CreationTimestamp: v1.Time{ + Time: time.Now().Add(time.Hour), + }, + }, + Spec: appsv1.ReplicaSetSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-pod", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Env: []corev1.EnvVar{ + { + Name: "CODER_AGENT_TOKEN", + Value: "test-token", + }, + }, + }}, + }, + }, + }, + } + _, err = client.AppsV1().ReplicaSets(namespace).Create(ctx, rs, v1.CreateOptions{}) + require.NoError(t, err) + + log := <-queued + require.Len(t, log.Logs, 1) + require.Contains(t, log.Logs[0].Output, "Queued pod from ReplicaSet") + + event := &corev1.Event{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-event", + CreationTimestamp: v1.Time{ + Time: time.Now().Add(time.Hour), + }, + }, + InvolvedObject: corev1.ObjectReference{ + Kind: "Pod", + Name: "test-pod", + }, + Reason: "Test", + Message: "Test event", + } + _, err = client.CoreV1().Events(namespace).Create(ctx, event, v1.CreateOptions{}) + require.NoError(t, err) + + log = <-queued + require.Len(t, log.Logs, 1) + require.Contains(t, log.Logs[0].Output, event.Message) + + err = client.AppsV1().ReplicaSets(namespace).Delete(ctx, rs.Name, v1.DeleteOptions{}) + require.NoError(t, err) + + require.Eventually(t, func() bool { + reporter.mutex.Lock() + defer reporter.mutex.Unlock() + return len(reporter.podToAgentTokens) == 0 && len(reporter.replicaSetToTokens) == 0 + }, time.Second, time.Millisecond) + + err = reporter.Close() + require.NoError(t, err) +} + +func TestPodEvents(t *testing.T) { t.Parallel() queued := make(chan agentsdk.PatchStartupLogs, 1) diff --git a/templates/service.yaml b/templates/service.yaml index 547aab9..b215cbb 100644 --- a/templates/service.yaml +++ b/templates/service.yaml @@ -6,6 +6,9 @@ rules: - apiGroups: [""] resources: ["pods", "events"] verbs: ["get", "watch", "list"] +- apiGroups: ["apps"] + resources: ["replicasets", "events"] + verbs: ["get", "watch", "list"] --- apiVersion: v1 kind: ServiceAccount From 9b3d780db57e05335ce7c73218d69688b0a45d2b Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 6 Jul 2023 15:57:44 +0000 Subject: [PATCH 2/2] Fix replicaset tests event --- logger_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/logger_test.go b/logger_test.go index 7da27fb..a3f95ed 100644 --- a/logger_test.go +++ b/logger_test.go @@ -83,8 +83,8 @@ func TestReplicaSetEvents(t *testing.T) { }, }, InvolvedObject: corev1.ObjectReference{ - Kind: "Pod", - Name: "test-pod", + Kind: "ReplicaSet", + Name: "test-rs", }, Reason: "Test", Message: "Test event",