Skip to content

Commit 88722fc

Browse files
authored
contrib/segmentio/kafka.go.v0: refactor tracing code (DataDog#2885)
1 parent 93311db commit 88722fc

File tree

18 files changed

+770
-457
lines changed

18 files changed

+770
-457
lines changed

.github/workflows/unit-integration-tests.yml

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -175,20 +175,24 @@ jobs:
175175
image: memcached:1.5.9
176176
ports:
177177
- 11211:11211
178-
zookeeper:
179-
image: bitnami/zookeeper:latest
180-
env:
181-
ALLOW_ANONYMOUS_LOGIN: "yes"
182-
ports:
183-
- 2181:2181
184178
kafka:
185-
image: darccio/kafka:2.13-2.8.1
179+
image: confluentinc/confluent-local:7.5.0
186180
env:
187-
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
188-
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
189-
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
190-
KAFKA_CREATE_TOPICS: gotest:1:1,gosegtest:1:1
191-
KAFKA_BROKER_ID: 1
181+
KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094"
182+
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9093,BROKER://localhost:9092"
183+
KAFKA_REST_BOOTSTRAP_SERVERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092"
184+
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9094"
185+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
186+
KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER"
187+
KAFKA_BROKER_ID: "1"
188+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
189+
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: "1"
190+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
191+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
192+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
193+
KAFKA_NODE_ID: "1"
194+
KAFKA_PROCESS_ROLES: "broker,controller"
195+
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
192196
ports:
193197
- 9092:9092
194198
localstack:

contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,13 @@
33
// This product includes software developed at Datadog (https://www.datadoghq.com/).
44
// Copyright 2024 Datadog, Inc.
55

6+
// Package tracing contains tracing logic for the cloud.google.com/go/pubsub.v1 instrumentation.
7+
//
8+
// WARNING: this package SHOULD NOT import cloud.google.com/go/pubsub.
9+
//
10+
// The motivation of this package is to support orchestrion, which cannot use the main package because it imports
11+
// the cloud.google.com/go/pubsub package, and since orchestrion modifies the library code itself,
12+
// this would cause an import cycle.
613
package tracing
714

815
import (

contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -179,30 +179,6 @@ func TestConsumerChannel(t *testing.T) {
179179
}
180180
}
181181

182-
/*
183-
to run the integration test locally:
184-
185-
docker network create confluent
186-
187-
docker run --rm \
188-
--name zookeeper \
189-
--network confluent \
190-
-p 2181:2181 \
191-
-e ZOOKEEPER_CLIENT_PORT=2181 \
192-
confluentinc/cp-zookeeper:5.0.0
193-
194-
docker run --rm \
195-
--name kafka \
196-
--network confluent \
197-
-p 9092:9092 \
198-
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
199-
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
200-
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
201-
-e KAFKA_CREATE_TOPICS=gotest:1:1 \
202-
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
203-
confluentinc/cp-kafka:5.0.0
204-
*/
205-
206182
func TestConsumerFunctional(t *testing.T) {
207183
for _, tt := range []struct {
208184
name string

contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -179,30 +179,6 @@ func TestConsumerChannel(t *testing.T) {
179179
}
180180
}
181181

182-
/*
183-
to run the integration test locally:
184-
185-
docker network create confluent
186-
187-
docker run --rm \
188-
--name zookeeper \
189-
--network confluent \
190-
-p 2181:2181 \
191-
-e ZOOKEEPER_CLIENT_PORT=2181 \
192-
confluentinc/cp-zookeeper:5.0.0
193-
194-
docker run --rm \
195-
--name kafka \
196-
--network confluent \
197-
-p 9092:9092 \
198-
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
199-
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
200-
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
201-
-e KAFKA_CREATE_TOPICS=gotest:1:1 \
202-
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
203-
confluentinc/cp-kafka:5.0.0
204-
*/
205-
206182
func TestConsumerFunctional(t *testing.T) {
207183
for _, tt := range []struct {
208184
name string

contrib/segmentio/kafka.go.v0/example_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0"
1414
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
1515

16-
kafka "github.com/segmentio/kafka-go"
16+
"github.com/segmentio/kafka-go"
1717
)
1818

1919
func ExampleWriter() {

contrib/segmentio/kafka.go.v0/headers.go

Lines changed: 4 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,49 +6,14 @@
66
package kafka
77

88
import (
9+
"github.com/segmentio/kafka-go"
10+
11+
"gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing"
912
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
1013
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
11-
12-
"github.com/segmentio/kafka-go"
1314
)
1415

15-
// A messageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a kafka.Message
16-
type messageCarrier struct {
17-
msg *kafka.Message
18-
}
19-
20-
var _ interface {
21-
tracer.TextMapReader
22-
tracer.TextMapWriter
23-
} = (*messageCarrier)(nil)
24-
25-
// ForeachKey conforms to the TextMapReader interface.
26-
func (c messageCarrier) ForeachKey(handler func(key, val string) error) error {
27-
for _, h := range c.msg.Headers {
28-
err := handler(h.Key, string(h.Value))
29-
if err != nil {
30-
return err
31-
}
32-
}
33-
return nil
34-
}
35-
36-
// Set implements TextMapWriter
37-
func (c messageCarrier) Set(key, val string) {
38-
// ensure uniqueness of keys
39-
for i := 0; i < len(c.msg.Headers); i++ {
40-
if string(c.msg.Headers[i].Key) == key {
41-
c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...)
42-
i--
43-
}
44-
}
45-
c.msg.Headers = append(c.msg.Headers, kafka.Header{
46-
Key: key,
47-
Value: []byte(val),
48-
})
49-
}
50-
5116
// ExtractSpanContext retrieves the SpanContext from a kafka.Message
5217
func ExtractSpanContext(msg kafka.Message) (ddtrace.SpanContext, error) {
53-
return tracer.Extract(messageCarrier{&msg})
18+
return tracer.Extract(tracing.NewMessageCarrier(wrapMessage(&msg)))
5419
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2024 Datadog, Inc.
5+
6+
package tracing
7+
8+
import (
9+
"context"
10+
11+
"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
12+
"gopkg.in/DataDog/dd-trace-go.v1/datastreams/options"
13+
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
14+
)
15+
16+
func (tr *Tracer) SetConsumeDSMCheckpoint(msg Message) {
17+
if !tr.dataStreamsEnabled || msg == nil {
18+
return
19+
}
20+
edges := []string{"direction:in", "topic:" + msg.GetTopic(), "type:kafka"}
21+
if tr.kafkaCfg.ConsumerGroupID != "" {
22+
edges = append(edges, "group:"+tr.kafkaCfg.ConsumerGroupID)
23+
}
24+
carrier := NewMessageCarrier(msg)
25+
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
26+
datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
27+
options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)},
28+
edges...,
29+
)
30+
if !ok {
31+
return
32+
}
33+
datastreams.InjectToBase64Carrier(ctx, carrier)
34+
if tr.kafkaCfg.ConsumerGroupID != "" {
35+
// only track Kafka lag if a consumer group is set.
36+
// since there is no ack mechanism, we consider that messages read are committed right away.
37+
tracer.TrackKafkaCommitOffset(tr.kafkaCfg.ConsumerGroupID, msg.GetTopic(), int32(msg.GetPartition()), msg.GetOffset())
38+
}
39+
}
40+
41+
func (tr *Tracer) SetProduceDSMCheckpoint(msg Message, writer Writer) {
42+
if !tr.dataStreamsEnabled || msg == nil {
43+
return
44+
}
45+
46+
var topic string
47+
if writer.GetTopic() != "" {
48+
topic = writer.GetTopic()
49+
} else {
50+
topic = msg.GetTopic()
51+
}
52+
53+
edges := []string{"direction:out", "topic:" + topic, "type:kafka"}
54+
carrier := MessageCarrier{msg}
55+
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
56+
datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
57+
options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)},
58+
edges...,
59+
)
60+
if !ok {
61+
return
62+
}
63+
64+
// Headers will be dropped if the current protocol does not support them
65+
datastreams.InjectToBase64Carrier(ctx, carrier)
66+
}
67+
68+
func getProducerMsgSize(msg Message) (size int64) {
69+
for _, header := range msg.GetHeaders() {
70+
size += int64(len(header.GetKey()) + len(header.GetValue()))
71+
}
72+
if msg.GetValue() != nil {
73+
size += int64(len(msg.GetValue()))
74+
}
75+
if msg.GetKey() != nil {
76+
size += int64(len(msg.GetKey()))
77+
}
78+
return size
79+
}
80+
81+
func getConsumerMsgSize(msg Message) (size int64) {
82+
for _, header := range msg.GetHeaders() {
83+
size += int64(len(header.GetKey()) + len(header.GetValue()))
84+
}
85+
return size + int64(len(msg.GetValue())+len(msg.GetKey()))
86+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2024 Datadog, Inc.
5+
6+
package tracing
7+
8+
import (
9+
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
10+
)
11+
12+
// A MessageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a kafka.Message
13+
type MessageCarrier struct {
14+
msg Message
15+
}
16+
17+
var _ interface {
18+
tracer.TextMapReader
19+
tracer.TextMapWriter
20+
} = (*MessageCarrier)(nil)
21+
22+
// ForeachKey conforms to the TextMapReader interface.
23+
func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error {
24+
for _, h := range c.msg.GetHeaders() {
25+
err := handler(h.GetKey(), string(h.GetValue()))
26+
if err != nil {
27+
return err
28+
}
29+
}
30+
return nil
31+
}
32+
33+
// Set implements TextMapWriter
34+
func (c MessageCarrier) Set(key, val string) {
35+
headers := c.msg.GetHeaders()
36+
// ensure uniqueness of keys
37+
for i := 0; i < len(headers); i++ {
38+
if headers[i].GetKey() == key {
39+
headers = append(headers[:i], headers[i+1:]...)
40+
i--
41+
}
42+
}
43+
headers = append(headers, KafkaHeader{
44+
Key: key,
45+
Value: []byte(val),
46+
})
47+
c.msg.SetHeaders(headers)
48+
}
49+
50+
func NewMessageCarrier(msg Message) MessageCarrier {
51+
return MessageCarrier{msg: msg}
52+
}

0 commit comments

Comments
 (0)