Skip to content

Commit fd64eaa

Browse files
authored
Add DeleteGroups function to Client (#1095)
* Add DeleteGroups function to Client * goimports * set minimum Kafka version * goimports * improvements
1 parent 2304d4a commit fd64eaa

File tree

4 files changed

+218
-0
lines changed

4 files changed

+218
-0
lines changed

deletegroups.go

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"time"
8+
9+
"github.com/segmentio/kafka-go/protocol/deletegroups"
10+
)
11+
12+
// DeleteGroupsRequest represents a request sent to a kafka broker to delete
13+
// consumer groups.
14+
type DeleteGroupsRequest struct {
15+
// Address of the kafka broker to send the request to.
16+
Addr net.Addr
17+
18+
// Identifiers of groups to delete.
19+
GroupIDs []string
20+
}
21+
22+
// DeleteGroupsResponse represents a response from a kafka broker to a consumer group
23+
// deletion request.
24+
type DeleteGroupsResponse struct {
25+
// The amount of time that the broker throttled the request.
26+
Throttle time.Duration
27+
28+
// Mapping of group ids to errors that occurred while attempting to delete those groups.
29+
//
30+
// The errors contain the kafka error code. Programs may use the standard
31+
// errors.Is function to test the error against kafka error codes.
32+
Errors map[string]error
33+
}
34+
35+
// DeleteGroups sends a delete groups request and returns the response. The request is sent to the group coordinator of the first group
36+
// of the request. All deleted groups must be managed by the same group coordinator.
37+
func (c *Client) DeleteGroups(
38+
ctx context.Context,
39+
req *DeleteGroupsRequest,
40+
) (*DeleteGroupsResponse, error) {
41+
m, err := c.roundTrip(ctx, req.Addr, &deletegroups.Request{
42+
GroupIDs: req.GroupIDs,
43+
})
44+
if err != nil {
45+
return nil, fmt.Errorf("kafka.(*Client).DeleteGroups: %w", err)
46+
}
47+
48+
r := m.(*deletegroups.Response)
49+
50+
ret := &DeleteGroupsResponse{
51+
Throttle: makeDuration(r.ThrottleTimeMs),
52+
Errors: make(map[string]error, len(r.Responses)),
53+
}
54+
55+
for _, t := range r.Responses {
56+
ret.Errors[t.GroupID] = makeError(t.ErrorCode, "")
57+
}
58+
59+
return ret, nil
60+
}

deletegroups_test.go

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
ktesting "github.com/segmentio/kafka-go/testing"
10+
)
11+
12+
func TestClientDeleteGroups(t *testing.T) {
13+
if !ktesting.KafkaIsAtLeast("1.1.0") {
14+
t.Skip("Skipping test because kafka version is not high enough.")
15+
}
16+
17+
client, shutdown := newLocalClient()
18+
defer shutdown()
19+
20+
topic := makeTopic()
21+
createTopic(t, topic, 1)
22+
23+
groupID := makeGroupID()
24+
25+
group, err := NewConsumerGroup(ConsumerGroupConfig{
26+
ID: groupID,
27+
Topics: []string{topic},
28+
Brokers: []string{"localhost:9092"},
29+
HeartbeatInterval: 2 * time.Second,
30+
RebalanceTimeout: 2 * time.Second,
31+
RetentionTime: time.Hour,
32+
Logger: &testKafkaLogger{T: t},
33+
})
34+
if err != nil {
35+
t.Fatal(err)
36+
}
37+
defer group.Close()
38+
39+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
40+
defer cancel()
41+
42+
gen, err := group.Next(ctx)
43+
if gen == nil {
44+
t.Fatalf("expected generation 1 not to be nil")
45+
}
46+
if err != nil {
47+
t.Fatalf("expected no error, but got %+v", err)
48+
}
49+
50+
// delete not empty group
51+
res, err := client.DeleteGroups(ctx, &DeleteGroupsRequest{
52+
GroupIDs: []string{groupID},
53+
})
54+
55+
if err != nil {
56+
t.Fatal(err)
57+
}
58+
59+
if !errors.Is(res.Errors[groupID], NonEmptyGroup) {
60+
t.Fatalf("expected NonEmptyGroup error, but got %+v", res.Errors[groupID])
61+
}
62+
63+
err = group.Close()
64+
if err != nil {
65+
t.Fatal(err)
66+
}
67+
68+
// delete empty group
69+
res, err = client.DeleteGroups(ctx, &DeleteGroupsRequest{
70+
GroupIDs: []string{groupID},
71+
})
72+
73+
if err != nil {
74+
t.Fatal(err)
75+
}
76+
77+
if err = res.Errors[groupID]; err != nil {
78+
t.Error(err)
79+
}
80+
}

protocol/deletegroups/deletegroups.go

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package deletegroups
2+
3+
import "github.com/segmentio/kafka-go/protocol"
4+
5+
func init() {
6+
protocol.Register(&Request{}, &Response{})
7+
}
8+
9+
type Request struct {
10+
// We need at least one tagged field to indicate that this is a "flexible" message
11+
// type.
12+
_ struct{} `kafka:"min=v2,max=v2,tag"`
13+
14+
GroupIDs []string `kafka:"min=v0,max=v2"`
15+
}
16+
17+
func (r *Request) Group() string {
18+
// use first group to determine group coordinator
19+
if len(r.GroupIDs) > 0 {
20+
return r.GroupIDs[0]
21+
}
22+
return ""
23+
}
24+
25+
func (r *Request) ApiKey() protocol.ApiKey { return protocol.DeleteGroups }
26+
27+
var (
28+
_ protocol.GroupMessage = (*Request)(nil)
29+
)
30+
31+
type Response struct {
32+
// We need at least one tagged field to indicate that this is a "flexible" message
33+
// type.
34+
_ struct{} `kafka:"min=v2,max=v2,tag"`
35+
36+
ThrottleTimeMs int32 `kafka:"min=v0,max=v2"`
37+
Responses []ResponseGroup `kafka:"min=v0,max=v2"`
38+
}
39+
40+
func (r *Response) ApiKey() protocol.ApiKey { return protocol.DeleteGroups }
41+
42+
type ResponseGroup struct {
43+
GroupID string `kafka:"min=v0,max=v2"`
44+
ErrorCode int16 `kafka:"min=v0,max=v2"`
45+
}
+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package deletegroups_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/segmentio/kafka-go/protocol/deletegroups"
7+
"github.com/segmentio/kafka-go/protocol/prototest"
8+
)
9+
10+
func TestDeleteGroupsRequest(t *testing.T) {
11+
for _, version := range []int16{0, 1, 2} {
12+
prototest.TestRequest(t, version, &deletegroups.Request{
13+
GroupIDs: []string{"group1", "group2"},
14+
})
15+
}
16+
}
17+
18+
func TestDeleteGroupsResponse(t *testing.T) {
19+
for _, version := range []int16{0, 1, 2} {
20+
prototest.TestResponse(t, version, &deletegroups.Response{
21+
Responses: []deletegroups.ResponseGroup{
22+
{
23+
GroupID: "group1",
24+
ErrorCode: 0,
25+
},
26+
{
27+
GroupID: "group2",
28+
ErrorCode: 1,
29+
},
30+
},
31+
})
32+
}
33+
}

0 commit comments

Comments
 (0)