@@ -23,89 +23,14 @@ import (
2323 "github.com/confluentinc/confluent-kafka-go/v2/kafka"
2424 "github.com/stretchr/testify/assert"
2525 "github.com/stretchr/testify/require"
26+ "go.uber.org/goleak"
2627)
2728
2829var (
2930 testGroupID = "gotest"
3031 testTopic = "gotest"
3132)
3233
33- type consumerActionFn func (c * Consumer ) (* kafka.Message , error )
34-
35- func produceThenConsume (t * testing.T , consumerAction consumerActionFn , producerOpts []Option , consumerOpts []Option ) ([]mocktracer.Span , * kafka.Message ) {
36- if _ , ok := os .LookupEnv ("INTEGRATION" ); ! ok {
37- t .Skip ("to enable integration test, set the INTEGRATION environment variable" )
38- }
39- mt := mocktracer .Start ()
40- defer mt .Stop ()
41-
42- // first write a message to the topic
43- p , err := NewProducer (& kafka.ConfigMap {
44- "bootstrap.servers" : "127.0.0.1:9092" ,
45- "go.delivery.reports" : true ,
46- }, producerOpts ... )
47- require .NoError (t , err )
48-
49- delivery := make (chan kafka.Event , 1 )
50- err = p .Produce (& kafka.Message {
51- TopicPartition : kafka.TopicPartition {
52- Topic : & testTopic ,
53- Partition : 0 ,
54- },
55- Key : []byte ("key2" ),
56- Value : []byte ("value2" ),
57- }, delivery )
58- require .NoError (t , err )
59-
60- msg1 , _ := (<- delivery ).(* kafka.Message )
61- p .Close ()
62-
63- // next attempt to consume the message
64- c , err := NewConsumer (& kafka.ConfigMap {
65- "group.id" : testGroupID ,
66- "bootstrap.servers" : "127.0.0.1:9092" ,
67- "fetch.wait.max.ms" : 500 ,
68- "socket.timeout.ms" : 1500 ,
69- "session.timeout.ms" : 1500 ,
70- "enable.auto.offset.store" : false ,
71- }, consumerOpts ... )
72- require .NoError (t , err )
73-
74- err = c .Assign ([]kafka.TopicPartition {
75- {Topic : & testTopic , Partition : 0 , Offset : msg1 .TopicPartition .Offset },
76- })
77- require .NoError (t , err )
78-
79- msg2 , err := consumerAction (c )
80- require .NoError (t , err )
81- _ , err = c .CommitMessage (msg2 )
82- require .NoError (t , err )
83- assert .Equal (t , msg1 .String (), msg2 .String ())
84- err = c .Close ()
85- require .NoError (t , err )
86-
87- spans := mt .FinishedSpans ()
88- require .Len (t , spans , 2 )
89- // they should be linked via headers
90- assert .Equal (t , spans [0 ].TraceID (), spans [1 ].TraceID ())
91-
92- if c .cfg .dataStreamsEnabled {
93- backlogs := mt .SentDSMBacklogs ()
94- toMap := func (b []internaldsm.Backlog ) map [string ]struct {} {
95- m := make (map [string ]struct {})
96- for _ , b := range backlogs {
97- m [strings .Join (b .Tags , "" )] = struct {}{}
98- }
99- return m
100- }
101- backlogsMap := toMap (backlogs )
102- require .Contains (t , backlogsMap , "consumer_group:" + testGroupID + "partition:0" + "topic:" + testTopic + "type:kafka_commit" )
103- require .Contains (t , backlogsMap , "partition:0" + "topic:" + testTopic + "type:kafka_high_watermark" )
104- require .Contains (t , backlogsMap , "partition:0" + "topic:" + testTopic + "type:kafka_produce" )
105- }
106- return spans , msg2
107- }
108-
10934func TestConsumerChannel (t * testing.T ) {
11035 // we can test consuming via the Events channel by artifically sending
11136 // messages. Testing .Poll is done via an integration test.
@@ -320,7 +245,7 @@ func TestCustomTags(t *testing.T) {
320245 "socket.timeout.ms" : 10 ,
321246 "session.timeout.ms" : 10 ,
322247 "enable.auto.offset.store" : false ,
323- }, WithCustomTag ("foo" , func (msg * kafka.Message ) interface {} {
248+ }, WithCustomTag ("foo" , func (_ * kafka.Message ) interface {} {
324249 return "bar"
325250 }), WithCustomTag ("key" , func (msg * kafka.Message ) interface {} {
326251 return msg .Key
@@ -370,3 +295,123 @@ func TestNamingSchema(t *testing.T) {
370295 }
371296 namingschematest .NewKafkaTest (genSpans )(t )
372297}
298+
299+ // Test we don't leak goroutines and properly close the span when Produce returns an error.
300+ func TestProduceError (t * testing.T ) {
301+ defer func () {
302+ err := goleak .Find ()
303+ if err != nil {
304+ // if a goroutine is leaking, ensure it is not coming from this package
305+ assert .NotContains (t , err .Error (), "contrib/confluentinc/confluent-kafka-go" )
306+ }
307+ }()
308+
309+ mt := mocktracer .Start ()
310+ defer mt .Stop ()
311+
312+ // first write a message to the topic
313+ p , err := NewProducer (& kafka.ConfigMap {
314+ "bootstrap.servers" : "127.0.0.1:9092" ,
315+ "go.delivery.reports" : true ,
316+ })
317+ require .NoError (t , err )
318+ defer p .Close ()
319+
320+ // this empty message should cause an error in the Produce call.
321+ topic := ""
322+ msg := & kafka.Message {
323+ TopicPartition : kafka.TopicPartition {
324+ Topic : & topic ,
325+ },
326+ }
327+ deliveryChan := make (chan kafka.Event , 1 )
328+ err = p .Produce (msg , deliveryChan )
329+ require .Error (t , err )
330+ require .EqualError (t , err , "Local: Invalid argument or configuration" )
331+
332+ select {
333+ case <- deliveryChan :
334+ assert .Fail (t , "there should be no events in the deliveryChan" )
335+ case <- time .After (1 * time .Second ):
336+ // assume there is no event
337+ }
338+
339+ spans := mt .FinishedSpans ()
340+ assert .Len (t , spans , 1 )
341+ }
342+
343+ type consumerActionFn func (c * Consumer ) (* kafka.Message , error )
344+
345+ func produceThenConsume (t * testing.T , consumerAction consumerActionFn , producerOpts []Option , consumerOpts []Option ) ([]mocktracer.Span , * kafka.Message ) {
346+ if _ , ok := os .LookupEnv ("INTEGRATION" ); ! ok {
347+ t .Skip ("to enable integration test, set the INTEGRATION environment variable" )
348+ }
349+ mt := mocktracer .Start ()
350+ defer mt .Stop ()
351+
352+ // first write a message to the topic
353+ p , err := NewProducer (& kafka.ConfigMap {
354+ "bootstrap.servers" : "127.0.0.1:9092" ,
355+ "go.delivery.reports" : true ,
356+ }, producerOpts ... )
357+ require .NoError (t , err )
358+
359+ delivery := make (chan kafka.Event , 1 )
360+ err = p .Produce (& kafka.Message {
361+ TopicPartition : kafka.TopicPartition {
362+ Topic : & testTopic ,
363+ Partition : 0 ,
364+ },
365+ Key : []byte ("key2" ),
366+ Value : []byte ("value2" ),
367+ }, delivery )
368+ require .NoError (t , err )
369+
370+ msg1 , _ := (<- delivery ).(* kafka.Message )
371+ p .Close ()
372+
373+ // next attempt to consume the message
374+ c , err := NewConsumer (& kafka.ConfigMap {
375+ "group.id" : testGroupID ,
376+ "bootstrap.servers" : "127.0.0.1:9092" ,
377+ "fetch.wait.max.ms" : 500 ,
378+ "socket.timeout.ms" : 1500 ,
379+ "session.timeout.ms" : 1500 ,
380+ "enable.auto.offset.store" : false ,
381+ }, consumerOpts ... )
382+ require .NoError (t , err )
383+
384+ err = c .Assign ([]kafka.TopicPartition {
385+ {Topic : & testTopic , Partition : 0 , Offset : msg1 .TopicPartition .Offset },
386+ })
387+ require .NoError (t , err )
388+
389+ msg2 , err := consumerAction (c )
390+ require .NoError (t , err )
391+ _ , err = c .CommitMessage (msg2 )
392+ require .NoError (t , err )
393+ assert .Equal (t , msg1 .String (), msg2 .String ())
394+ err = c .Close ()
395+ require .NoError (t , err )
396+
397+ spans := mt .FinishedSpans ()
398+ require .Len (t , spans , 2 )
399+ // they should be linked via headers
400+ assert .Equal (t , spans [0 ].TraceID (), spans [1 ].TraceID ())
401+
402+ if c .cfg .dataStreamsEnabled {
403+ backlogs := mt .SentDSMBacklogs ()
404+ toMap := func (_ []internaldsm.Backlog ) map [string ]struct {} {
405+ m := make (map [string ]struct {})
406+ for _ , b := range backlogs {
407+ m [strings .Join (b .Tags , "" )] = struct {}{}
408+ }
409+ return m
410+ }
411+ backlogsMap := toMap (backlogs )
412+ require .Contains (t , backlogsMap , "consumer_group:" + testGroupID + "partition:0" + "topic:" + testTopic + "type:kafka_commit" )
413+ require .Contains (t , backlogsMap , "partition:0" + "topic:" + testTopic + "type:kafka_high_watermark" )
414+ require .Contains (t , backlogsMap , "partition:0" + "topic:" + testTopic + "type:kafka_produce" )
415+ }
416+ return spans , msg2
417+ }
0 commit comments