Skip to content

Commit df2af7f

Browse files
authored
Revert "Fixes for consumer group (#1022)"
This reverts commit a5f270d.
1 parent a5f270d commit df2af7f

File tree

8 files changed

+9
-284
lines changed

8 files changed

+9
-284
lines changed

consumergroup.go

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -432,15 +432,8 @@ func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
432432
Topics: topics,
433433
}
434434

435-
resp, err := g.coord.offsetCommit(genCtx{g}, request)
435+
_, err := g.coord.offsetCommit(genCtx{g}, request)
436436
if err == nil {
437-
for _, partitions := range resp.Topics {
438-
for _, partition := range partitions {
439-
if partition.Error != nil {
440-
return partition.Error
441-
}
442-
}
443-
}
444437
// if logging is enabled, print out the partitions that were committed.
445438
g.log(func(l Logger) {
446439
var report []string
@@ -477,17 +470,14 @@ func (g *Generation) heartbeatLoop(interval time.Duration) {
477470
case <-ctx.Done():
478471
return
479472
case <-ticker.C:
480-
resp, err := g.coord.heartbeat(ctx, &HeartbeatRequest{
473+
_, err := g.coord.heartbeat(ctx, &HeartbeatRequest{
481474
GroupID: g.GroupID,
482475
GenerationID: g.ID,
483476
MemberID: g.MemberID,
484477
})
485478
if err != nil {
486479
return
487480
}
488-
if resp.Error != nil {
489-
return
490-
}
491481
}
492482
}
493483
})
@@ -1101,9 +1091,6 @@ func (cg *ConsumerGroup) fetchOffsets(subs map[string][]int) (map[string]map[int
11011091
for topic, offsets := range offsets.Topics {
11021092
offsetsByPartition := map[int]int64{}
11031093
for _, pr := range offsets {
1104-
if pr.Error != nil {
1105-
return nil, pr.Error
1106-
}
11071094
if pr.CommittedOffset < 0 {
11081095
pr.CommittedOffset = cg.config.StartOffset
11091096
}
@@ -1150,17 +1137,14 @@ func (cg *ConsumerGroup) leaveGroup(ctx context.Context, memberID string) error
11501137
log.Printf("Leaving group %s, member %s", cg.config.ID, memberID)
11511138
})
11521139

1153-
resp, err := cg.coord.leaveGroup(ctx, &LeaveGroupRequest{
1140+
_, err := cg.coord.leaveGroup(ctx, &LeaveGroupRequest{
11541141
GroupID: cg.config.ID,
11551142
Members: []LeaveGroupRequestMember{
11561143
{
11571144
ID: memberID,
11581145
},
11591146
},
11601147
})
1161-
if err == nil && resp.Error != nil {
1162-
err = resp.Error
1163-
}
11641148
if err != nil {
11651149
cg.withErrorLogger(func(log Logger) {
11661150
log.Printf("leave group failed for group, %v, and member, %v: %v", cg.config.ID, memberID, err)

consumergroup_test.go

Lines changed: 0 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -606,96 +606,3 @@ func TestGenerationStartsFunctionAfterClosed(t *testing.T) {
606606
}
607607
}
608608
}
609-
610-
func TestGenerationEndsOnHeartbeatError(t *testing.T) {
611-
gen := Generation{
612-
coord: &mockCoordinator{
613-
heartbeatFunc: func(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error) {
614-
return nil, errors.New("some error")
615-
},
616-
},
617-
done: make(chan struct{}),
618-
joined: make(chan struct{}),
619-
log: func(func(Logger)) {},
620-
logError: func(func(Logger)) {},
621-
}
622-
623-
ch := make(chan error)
624-
gen.Start(func(ctx context.Context) {
625-
<-ctx.Done()
626-
ch <- ctx.Err()
627-
})
628-
629-
gen.heartbeatLoop(time.Millisecond)
630-
631-
select {
632-
case <-time.After(time.Second):
633-
t.Fatal("timed out waiting for func to run")
634-
case err := <-ch:
635-
if !errors.Is(err, ErrGenerationEnded) {
636-
t.Fatalf("expected %v but got %v", ErrGenerationEnded, err)
637-
}
638-
}
639-
}
640-
641-
func TestGenerationEndsOnHeartbeatRebalaceInProgress(t *testing.T) {
642-
gen := Generation{
643-
coord: &mockCoordinator{
644-
heartbeatFunc: func(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error) {
645-
return &HeartbeatResponse{
646-
Error: makeError(int16(RebalanceInProgress), ""),
647-
}, nil
648-
},
649-
},
650-
done: make(chan struct{}),
651-
joined: make(chan struct{}),
652-
log: func(func(Logger)) {},
653-
logError: func(func(Logger)) {},
654-
}
655-
656-
ch := make(chan error)
657-
gen.Start(func(ctx context.Context) {
658-
<-ctx.Done()
659-
ch <- ctx.Err()
660-
})
661-
662-
gen.heartbeatLoop(time.Millisecond)
663-
664-
select {
665-
case <-time.After(time.Second):
666-
t.Fatal("timed out waiting for func to run")
667-
case err := <-ch:
668-
if !errors.Is(err, ErrGenerationEnded) {
669-
t.Fatalf("expected %v but got %v", ErrGenerationEnded, err)
670-
}
671-
}
672-
}
673-
674-
func TestGenerationOffsetCommitErrorsAreReturned(t *testing.T) {
675-
mc := mockCoordinator{
676-
offsetCommitFunc: func(context.Context, *OffsetCommitRequest) (*OffsetCommitResponse, error) {
677-
return &OffsetCommitResponse{
678-
Topics: map[string][]OffsetCommitPartition{
679-
"topic": {
680-
{
681-
Error: ErrGenerationEnded,
682-
},
683-
},
684-
},
685-
}, nil
686-
},
687-
}
688-
gen := Generation{
689-
coord: mc,
690-
log: func(func(Logger)) {},
691-
}
692-
693-
err := gen.CommitOffsets(map[string]map[int]int64{
694-
"topic": {
695-
0: 100,
696-
},
697-
})
698-
if err == nil {
699-
t.Fatal("got nil from CommitOffsets when expecting an error")
700-
}
701-
}

joingroup.go

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ package kafka
33
import (
44
"bufio"
55
"context"
6-
"errors"
76
"fmt"
8-
"io"
97
"net"
108
"time"
119

@@ -165,9 +163,7 @@ func (c *Client) JoinGroup(ctx context.Context, req *JoinGroupRequest) (*JoinGro
165163

166164
for _, member := range r.Members {
167165
var meta consumer.Subscription
168-
metaVersion := makeInt16(member.Metadata[0:2])
169-
err = protocol.Unmarshal(member.Metadata, metaVersion, &meta)
170-
err = joinGroupSubscriptionMetaError(err, metaVersion)
166+
err = protocol.Unmarshal(member.Metadata, consumer.MaxVersionSupported, &meta)
171167
if err != nil {
172168
return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err)
173169
}
@@ -192,16 +188,6 @@ func (c *Client) JoinGroup(ctx context.Context, req *JoinGroupRequest) (*JoinGro
192188
return res, nil
193189
}
194190

195-
// sarama indicates there are some misbehaving clients out there that
196-
// set the version as 1 but don't include the OwnedPartitions section
197-
// https://github.com/Shopify/sarama/blob/610514edec1825240d59b62e4d7f1aba4b1fa000/consumer_group_members.go#L43
198-
func joinGroupSubscriptionMetaError(err error, version int16) error {
199-
if version >= 1 && errors.Is(err, io.ErrUnexpectedEOF) {
200-
return nil
201-
}
202-
return err
203-
}
204-
205191
type groupMetadata struct {
206192
Version int16
207193
Topics []string

joingroup_test.go

Lines changed: 0 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,10 @@ import (
55
"bytes"
66
"context"
77
"errors"
8-
"net"
98
"reflect"
109
"testing"
1110
"time"
1211

13-
"github.com/segmentio/kafka-go/protocol"
14-
"github.com/segmentio/kafka-go/protocol/consumer"
15-
"github.com/segmentio/kafka-go/protocol/joingroup"
1612
ktesting "github.com/segmentio/kafka-go/testing"
1713
)
1814

@@ -128,84 +124,6 @@ func TestClientJoinGroup(t *testing.T) {
128124
}
129125
}
130126

131-
type roundTripFn func(context.Context, net.Addr, Request) (Response, error)
132-
133-
func (f roundTripFn) RoundTrip(ctx context.Context, addr net.Addr, req Request) (Response, error) {
134-
return f(ctx, addr, req)
135-
}
136-
137-
// https://github.com/Shopify/sarama/blob/610514edec1825240d59b62e4d7f1aba4b1fa000/consumer_group_members.go#L43
138-
func TestClientJoinGroupSaramaCompatibility(t *testing.T) {
139-
subscription := consumer.Subscription{
140-
Version: 1,
141-
Topics: []string{"topic"},
142-
}
143-
144-
// Marhsal as Verzon 0 (Without OwnedPartitions) but
145-
// with Version=1.
146-
metadata, err := protocol.Marshal(0, subscription)
147-
if err != nil {
148-
t.Fatalf("failed to marshal subscription %v", err)
149-
}
150-
151-
client := &Client{
152-
Addr: TCP("fake:9092"),
153-
Transport: roundTripFn(func(context.Context, net.Addr, Request) (Response, error) {
154-
resp := joingroup.Response{
155-
ProtocolType: "consumer",
156-
ProtocolName: RoundRobinGroupBalancer{}.ProtocolName(),
157-
LeaderID: "member",
158-
MemberID: "member",
159-
Members: []joingroup.ResponseMember{
160-
{
161-
MemberID: "member",
162-
Metadata: metadata,
163-
},
164-
},
165-
}
166-
return &resp, nil
167-
}),
168-
}
169-
170-
expResp := JoinGroupResponse{
171-
ProtocolName: RoundRobinGroupBalancer{}.ProtocolName(),
172-
ProtocolType: "consumer",
173-
LeaderID: "member",
174-
MemberID: "member",
175-
Members: []JoinGroupResponseMember{
176-
{
177-
ID: "member",
178-
Metadata: GroupProtocolSubscription{
179-
Topics: []string{"topic"},
180-
OwnedPartitions: map[string][]int{},
181-
},
182-
},
183-
},
184-
}
185-
186-
gotResp, err := client.JoinGroup(context.Background(), &JoinGroupRequest{
187-
GroupID: "group",
188-
MemberID: "member",
189-
ProtocolType: "consumer",
190-
Protocols: []GroupProtocol{
191-
{
192-
Name: RoundRobinGroupBalancer{}.ProtocolName(),
193-
Metadata: GroupProtocolSubscription{
194-
Topics: []string{"topic"},
195-
UserData: metadata,
196-
},
197-
},
198-
},
199-
})
200-
if err != nil {
201-
t.Fatalf("error calling JoinGroup: %v", err)
202-
}
203-
204-
if !reflect.DeepEqual(expResp, *gotResp) {
205-
t.Fatalf("unexpected JoinGroup resp\nexpected: %#v\n got: %#v", expResp, *gotResp)
206-
}
207-
}
208-
209127
func TestSaramaCompatibility(t *testing.T) {
210128
var (
211129
// sample data from github.com/Shopify/sarama

protocol/heartbeat/heartbeat.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ type Response struct {
2727
// type.
2828
_ struct{} `kafka:"min=v4,max=v4,tag"`
2929

30-
ThrottleTimeMs int32 `kafka:"min=v1,max=v4"`
3130
ErrorCode int16 `kafka:"min=v0,max=v4"`
31+
ThrottleTimeMs int32 `kafka:"min=v1,max=v4"`
3232
}
3333

3434
func (r *Response) ApiKey() protocol.ApiKey {

reader_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1684,7 +1684,7 @@ func TestConsumerGroupMultipleWithDefaultTransport(t *testing.T) {
16841684
recvErr2 <- err
16851685
}()
16861686

1687-
time.Sleep(conf1.MaxWait * 5)
1687+
time.Sleep(conf1.MaxWait)
16881688

16891689
totalMessages := 10
16901690

syncgroup.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,9 @@ func (c *Client) SyncGroup(ctx context.Context, req *SyncGroupRequest) (*SyncGro
127127
r := m.(*syncgroup.Response)
128128

129129
var assignment consumer.Assignment
130-
var metaVersion int16
131-
if len(r.Assignments) > 2 {
132-
metaVersion = makeInt16(r.Assignments[0:2])
133-
err = protocol.Unmarshal(r.Assignments, metaVersion, &assignment)
134-
if err != nil {
135-
return nil, fmt.Errorf("kafka.(*Client).SyncGroup: %w", err)
136-
}
130+
err = protocol.Unmarshal(r.Assignments, consumer.MaxVersionSupported, &assignment)
131+
if err != nil {
132+
return nil, fmt.Errorf("kafka.(*Client).SyncGroup: %w", err)
137133
}
138134

139135
res := &SyncGroupResponse{

0 commit comments

Comments
 (0)