From 73da3f25f3b548f5871ecbb25cc533bee18081a2 Mon Sep 17 00:00:00 2001 From: rhansen2 Date: Wed, 2 Nov 2022 11:48:17 -0700 Subject: [PATCH 01/10] parse join group response meta based on version in metadata --- joingroup.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/joingroup.go b/joingroup.go index 13adc71d2..8cb8fb500 100644 --- a/joingroup.go +++ b/joingroup.go @@ -3,7 +3,9 @@ package kafka import ( "bufio" "context" + "errors" "fmt" + "io" "net" "time" @@ -163,7 +165,9 @@ func (c *Client) JoinGroup(ctx context.Context, req *JoinGroupRequest) (*JoinGro for _, member := range r.Members { var meta consumer.Subscription - err = protocol.Unmarshal(member.Metadata, consumer.MaxVersionSupported, &meta) + metaVersion := makeInt16(member.Metadata[0:2]) + err = protocol.Unmarshal(member.Metadata, metaVersion, &meta) + err = joinGroupSubscriptionMetaError(err, metaVersion) if err != nil { return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err) } @@ -188,6 +192,16 @@ func (c *Client) JoinGroup(ctx context.Context, req *JoinGroupRequest) (*JoinGro return res, nil } +// sarama indicates there are some misbehaving clients out there that +// set the version as 1 but don't include the OwnedPartitions section +// https://github.com/Shopify/sarama/blob/610514edec1825240d59b62e4d7f1aba4b1fa000/consumer_group_members.go#L43 +func joinGroupSubscriptionMetaError(err error, version int16) error { + if version >= 1 && errors.Is(err, io.ErrUnexpectedEOF) { + return nil + } + return err +} + type groupMetadata struct { Version int16 Topics []string From abf0777cf7ab760f8c0bd9cd936dc9e6d5e90d5b Mon Sep 17 00:00:00 2001 From: rhansen2 Date: Wed, 2 Nov 2022 19:42:53 -0700 Subject: [PATCH 02/10] decode sync versionp properly --- syncgroup.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/syncgroup.go b/syncgroup.go index e649f0db9..173b4e4b9 100644 --- a/syncgroup.go +++ b/syncgroup.go @@ -127,7 +127,8 @@ func (c *Client) SyncGroup(ctx context.Context, req *SyncGroupRequest) (*SyncGro r := m.(*syncgroup.Response) var assignment consumer.Assignment - err = protocol.Unmarshal(r.Assignments, consumer.MaxVersionSupported, &assignment) + metaVersion := makeInt16(r.Assignments[0:2]) + err = protocol.Unmarshal(r.Assignments, metaVersion, &assignment) if err != nil { return nil, fmt.Errorf("kafka.(*Client).SyncGroup: %w", err) } From bed5be8e623ed2c837de538bb8669e164caa04a5 Mon Sep 17 00:00:00 2001 From: rhansen2 Date: Wed, 2 Nov 2022 20:28:17 -0700 Subject: [PATCH 03/10] check for assigment in syncgroup before unmarshaling --- syncgroup.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/syncgroup.go b/syncgroup.go index 173b4e4b9..9f4766376 100644 --- a/syncgroup.go +++ b/syncgroup.go @@ -127,10 +127,13 @@ func (c *Client) SyncGroup(ctx context.Context, req *SyncGroupRequest) (*SyncGro r := m.(*syncgroup.Response) var assignment consumer.Assignment - metaVersion := makeInt16(r.Assignments[0:2]) - err = protocol.Unmarshal(r.Assignments, metaVersion, &assignment) - if err != nil { - return nil, fmt.Errorf("kafka.(*Client).SyncGroup: %w", err) + var metaVersion int16 + if len(r.Assignments) > 2 { + metaVersion = makeInt16(r.Assignments[0:2]) + err = protocol.Unmarshal(r.Assignments, metaVersion, &assignment) + if err != nil { + return nil, fmt.Errorf("kafka.(*Client).SyncGroup: %w", err) + } } res := &SyncGroupResponse{ From 57038f4b25663865cae39691d5c722895b355f16 Mon Sep 17 00:00:00 2001 From: rhansen2 Date: Wed, 2 Nov 2022 20:51:49 -0700 Subject: [PATCH 04/10] fix heartbeat reqs --- consumergroup.go | 5 ++++- protocol/heartbeat/heartbeat.go | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/consumergroup.go b/consumergroup.go index 0c9843a45..4192eb836 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -470,7 +470,7 @@ func (g *Generation) heartbeatLoop(interval time.Duration) { case <-ctx.Done(): return case <-ticker.C: - _, err := g.coord.heartbeat(ctx, &HeartbeatRequest{ + resp, err := g.coord.heartbeat(ctx, &HeartbeatRequest{ GroupID: g.GroupID, GenerationID: g.ID, MemberID: g.MemberID, @@ -478,6 +478,9 @@ func (g *Generation) heartbeatLoop(interval time.Duration) { if err != nil { return } + if resp.Error != nil { + return + } } } }) diff --git a/protocol/heartbeat/heartbeat.go b/protocol/heartbeat/heartbeat.go index cf4c11185..962d6f467 100644 --- a/protocol/heartbeat/heartbeat.go +++ b/protocol/heartbeat/heartbeat.go @@ -27,8 +27,8 @@ type Response struct { // type. _ struct{} `kafka:"min=v4,max=v4,tag"` - ErrorCode int16 `kafka:"min=v0,max=v4"` ThrottleTimeMs int32 `kafka:"min=v1,max=v4"` + ErrorCode int16 `kafka:"min=v0,max=v4"` } func (r *Response) ApiKey() protocol.ApiKey { From dfbf95463ea9bcea31deb1373f419ca73baba273 Mon Sep 17 00:00:00 2001 From: rhansen2 Date: Wed, 2 Nov 2022 21:11:44 -0700 Subject: [PATCH 05/10] test for heartbeat triggering generation end --- consumergroup_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/consumergroup_test.go b/consumergroup_test.go index 3bc72b68e..da1c9e7ba 100644 --- a/consumergroup_test.go +++ b/consumergroup_test.go @@ -606,3 +606,67 @@ func TestGenerationStartsFunctionAfterClosed(t *testing.T) { } } } + +func TestGenerationEndsOnHeartbeatError(t *testing.T) { + gen := Generation{ + coord: &mockCoordinator{ + heartbeatFunc: func(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error) { + return nil, errors.New("some error") + }, + }, + done: make(chan struct{}), + joined: make(chan struct{}), + log: func(func(Logger)) {}, + logError: func(func(Logger)) {}, + } + + ch := make(chan error) + gen.Start(func(ctx context.Context) { + <-ctx.Done() + ch <- ctx.Err() + }) + + gen.heartbeatLoop(time.Millisecond) + + select { + case <-time.After(time.Second): + t.Fatal("timed out waiting for func to run") + case err := <-ch: + if !errors.Is(err, ErrGenerationEnded) { + t.Fatalf("expected %v but got %v", ErrGenerationEnded, err) + } + } +} + +func TestGenerationEndsOnHeartbeatRebalaceInProgress(t *testing.T) { + gen := Generation{ + coord: &mockCoordinator{ + heartbeatFunc: func(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error) { + return &HeartbeatResponse{ + Error: makeError(int16(RebalanceInProgress), ""), + }, nil + }, + }, + done: make(chan struct{}), + joined: make(chan struct{}), + log: func(func(Logger)) {}, + logError: func(func(Logger)) {}, + } + + ch := make(chan error) + gen.Start(func(ctx context.Context) { + <-ctx.Done() + ch <- ctx.Err() + }) + + gen.heartbeatLoop(time.Millisecond) + + select { + case <-time.After(time.Second): + t.Fatal("timed out waiting for func to run") + case err := <-ch: + if !errors.Is(err, ErrGenerationEnded) { + t.Fatalf("expected %v but got %v", ErrGenerationEnded, err) + } + } +} From 8807d7422fa09f351dd324699e0555f2217a67a3 Mon Sep 17 00:00:00 2001 From: rhansen2 Date: Thu, 3 Nov 2022 18:04:47 -0700 Subject: [PATCH 06/10] check more errors --- consumergroup.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/consumergroup.go b/consumergroup.go index 4192eb836..103311fe0 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -432,8 +432,15 @@ func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error { Topics: topics, } - _, err := g.coord.offsetCommit(genCtx{g}, request) + resp, err := g.coord.offsetCommit(genCtx{g}, request) if err == nil { + for _, partitions := range resp.Topics { + for _, partition := range partitions { + if partition.Error != nil { + return partition.Error + } + } + } // if logging is enabled, print out the partitions that were committed. g.log(func(l Logger) { var report []string @@ -1094,6 +1101,9 @@ func (cg *ConsumerGroup) fetchOffsets(subs map[string][]int) (map[string]map[int for topic, offsets := range offsets.Topics { offsetsByPartition := map[int]int64{} for _, pr := range offsets { + if pr.Error != nil { + return nil, pr.Error + } if pr.CommittedOffset < 0 { pr.CommittedOffset = cg.config.StartOffset } @@ -1140,7 +1150,7 @@ func (cg *ConsumerGroup) leaveGroup(ctx context.Context, memberID string) error log.Printf("Leaving group %s, member %s", cg.config.ID, memberID) }) - _, err := cg.coord.leaveGroup(ctx, &LeaveGroupRequest{ + resp, err := cg.coord.leaveGroup(ctx, &LeaveGroupRequest{ GroupID: cg.config.ID, Members: []LeaveGroupRequestMember{ { @@ -1148,6 +1158,9 @@ func (cg *ConsumerGroup) leaveGroup(ctx context.Context, memberID string) error }, }, }) + if err == nil && resp.Error != nil { + err = resp.Error + } if err != nil { cg.withErrorLogger(func(log Logger) { log.Printf("leave group failed for group, %v, and member, %v: %v", cg.config.ID, memberID, err) From 02feefaa45c6587dc5c4ea4a2800050342c6afa2 Mon Sep 17 00:00:00 2001 From: rhansen2 Date: Sun, 6 Nov 2022 20:33:21 -0800 Subject: [PATCH 07/10] Client.JoinGroup sarama compatibility test --- joingroup_test.go | 82 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/joingroup_test.go b/joingroup_test.go index 926f5b4a6..f2c273da9 100644 --- a/joingroup_test.go +++ b/joingroup_test.go @@ -5,10 +5,14 @@ import ( "bytes" "context" "errors" + "net" "reflect" "testing" "time" + "github.com/segmentio/kafka-go/protocol" + "github.com/segmentio/kafka-go/protocol/consumer" + "github.com/segmentio/kafka-go/protocol/joingroup" ktesting "github.com/segmentio/kafka-go/testing" ) @@ -124,6 +128,84 @@ func TestClientJoinGroup(t *testing.T) { } } +type roundTripFn func(context.Context, net.Addr, Request) (Response, error) + +func (f roundTripFn) RoundTrip(ctx context.Context, addr net.Addr, req Request) (Response, error) { + return f(ctx, addr, req) +} + +// https://github.com/Shopify/sarama/blob/610514edec1825240d59b62e4d7f1aba4b1fa000/consumer_group_members.go#L43 +func TestClientJoinGroupSaramaCompatibility(t *testing.T) { + subscription := consumer.Subscription{ + Version: 1, + Topics: []string{"topic"}, + } + + // Marhsal as Verzon 0 (Without OwnedPartitions) but + // with Version=1. + metadata, err := protocol.Marshal(0, subscription) + if err != nil { + t.Fatalf("failed to marshal subscription %v", err) + } + + client := &Client{ + Addr: TCP("fake:9092"), + Transport: roundTripFn(func(_ context.Context, _ net.Addr, _ Request) (Response, error) { + resp := joingroup.Response{ + ProtocolType: "consumer", + ProtocolName: RoundRobinGroupBalancer{}.ProtocolName(), + LeaderID: "member", + MemberID: "member", + Members: []joingroup.ResponseMember{ + { + MemberID: "member", + Metadata: metadata, + }, + }, + } + return &resp, nil + }), + } + + expResp := JoinGroupResponse{ + ProtocolName: RoundRobinGroupBalancer{}.ProtocolName(), + ProtocolType: "consumer", + LeaderID: "member", + MemberID: "member", + Members: []JoinGroupResponseMember{ + { + ID: "member", + Metadata: GroupProtocolSubscription{ + Topics: []string{"topic"}, + OwnedPartitions: map[string][]int{}, + }, + }, + }, + } + + gotResp, err := client.JoinGroup(context.Background(), &JoinGroupRequest{ + GroupID: "group", + MemberID: "member", + ProtocolType: "consumer", + Protocols: []GroupProtocol{ + { + Name: RoundRobinGroupBalancer{}.ProtocolName(), + Metadata: GroupProtocolSubscription{ + Topics: []string{"topic"}, + UserData: metadata, + }, + }, + }, + }) + if err != nil { + t.Fatalf("error calling JoinGroup: %v", err) + } + + if !reflect.DeepEqual(expResp, *gotResp) { + t.Fatalf("unexpected JoinGroup resp\nexpected: %#v\n got: %#v", expResp, *gotResp) + } +} + func TestSaramaCompatibility(t *testing.T) { var ( // sample data from github.com/Shopify/sarama From d5efa2f06f3ed709c60cf4b70e0722b3e29c4ea0 Mon Sep 17 00:00:00 2001 From: rhansen2 Date: Sun, 6 Nov 2022 21:23:22 -0800 Subject: [PATCH 08/10] Client.SyncGroup: teset to ensure v0 compatibility --- syncgroup_test.go | 68 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/syncgroup_test.go b/syncgroup_test.go index 930696bde..0cf8a334a 100644 --- a/syncgroup_test.go +++ b/syncgroup_test.go @@ -6,11 +6,79 @@ import ( "context" "errors" "io" + "net" "reflect" "testing" "time" + + "github.com/segmentio/kafka-go/protocol" + "github.com/segmentio/kafka-go/protocol/consumer" + "github.com/segmentio/kafka-go/protocol/syncgroup" ) +func TestClientSyncGroupAssignmentV0(t *testing.T) { + client := &Client{ + Addr: TCP("fake:9092"), + Transport: roundTripFn(func(_ context.Context, _ net.Addr, _ Request) (Response, error) { + assigments := consumer.Assignment{ + Version: 0, + AssignedPartitions: []consumer.TopicPartition{ + { + Topic: "topic", + Partitions: []int32{0, 1, 2}, + }, + }, + UserData: nil, + } + assignmentBytes, err := protocol.Marshal(0, assigments) + if err != nil { + t.Fatalf("failed to marshal assigments: %v", err) + } + resp := syncgroup.Response{ + ProtocolType: "consumer", + ProtocolName: RoundRobinGroupBalancer{}.ProtocolName(), + Assignments: assignmentBytes, + } + + return &resp, nil + }), + } + + expResp := SyncGroupResponse{ + ProtocolType: "consumer", + ProtocolName: RoundRobinGroupBalancer{}.ProtocolName(), + Assignment: GroupProtocolAssignment{ + AssignedPartitions: map[string][]int{ + "topic": {0, 1, 2}, + }, + }, + } + + gotResp, err := client.SyncGroup(context.Background(), &SyncGroupRequest{ + GroupID: "group", + MemberID: "member", + ProtocolType: "consumer", + ProtocolName: "roundrobin", + Assignments: []SyncGroupRequestAssignment{ + { + MemberID: "member", + Assignment: GroupProtocolAssignment{ + AssignedPartitions: map[string][]int{ + "topic": {0, 1, 2}, + }, + }, + }, + }, + }) + if err != nil { + t.Fatalf("error calling SyncGroup: %v", err) + } + + if !reflect.DeepEqual(expResp, *gotResp) { + t.Fatalf("unexpected SyncGroup resp\nexpected: %#v\n got: %#v", expResp, *gotResp) + } +} + func TestClientSyncGroup(t *testing.T) { // In order to get to a sync group call we need to first // join a group. From e1830fe189255bf7426b2d694dae32848e619acf Mon Sep 17 00:00:00 2001 From: rhansen2 Date: Sun, 6 Nov 2022 21:46:32 -0800 Subject: [PATCH 09/10] Generation.OffsetCommit: return errors from api responses make unused parameters in functions are consistent --- consumergroup_test.go | 29 +++++++++++++++++++++++++++++ joingroup_test.go | 2 +- syncgroup_test.go | 4 +--- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/consumergroup_test.go b/consumergroup_test.go index da1c9e7ba..8c5216ef2 100644 --- a/consumergroup_test.go +++ b/consumergroup_test.go @@ -670,3 +670,32 @@ func TestGenerationEndsOnHeartbeatRebalaceInProgress(t *testing.T) { } } } + +func TestGenerationOffsetCommitErrorsAreReturned(t *testing.T) { + mc := mockCoordinator{ + offsetCommitFunc: func(context.Context, *OffsetCommitRequest) (*OffsetCommitResponse, error) { + return &OffsetCommitResponse{ + Topics: map[string][]OffsetCommitPartition{ + "topic": { + { + Error: ErrGenerationEnded, + }, + }, + }, + }, nil + }, + } + gen := Generation{ + coord: mc, + log: func(func(Logger)) {}, + } + + err := gen.CommitOffsets(map[string]map[int]int64{ + "topic": { + 0: 100, + }, + }) + if err == nil { + t.Fatal("got nil from CommitOffsets when expecting an error") + } +} diff --git a/joingroup_test.go b/joingroup_test.go index f2c273da9..a8695e196 100644 --- a/joingroup_test.go +++ b/joingroup_test.go @@ -150,7 +150,7 @@ func TestClientJoinGroupSaramaCompatibility(t *testing.T) { client := &Client{ Addr: TCP("fake:9092"), - Transport: roundTripFn(func(_ context.Context, _ net.Addr, _ Request) (Response, error) { + Transport: roundTripFn(func(context.Context, net.Addr, Request) (Response, error) { resp := joingroup.Response{ ProtocolType: "consumer", ProtocolName: RoundRobinGroupBalancer{}.ProtocolName(), diff --git a/syncgroup_test.go b/syncgroup_test.go index 0cf8a334a..435a3875f 100644 --- a/syncgroup_test.go +++ b/syncgroup_test.go @@ -19,7 +19,7 @@ import ( func TestClientSyncGroupAssignmentV0(t *testing.T) { client := &Client{ Addr: TCP("fake:9092"), - Transport: roundTripFn(func(_ context.Context, _ net.Addr, _ Request) (Response, error) { + Transport: roundTripFn(func(context.Context, net.Addr, Request) (Response, error) { assigments := consumer.Assignment{ Version: 0, AssignedPartitions: []consumer.TopicPartition{ @@ -28,7 +28,6 @@ func TestClientSyncGroupAssignmentV0(t *testing.T) { Partitions: []int32{0, 1, 2}, }, }, - UserData: nil, } assignmentBytes, err := protocol.Marshal(0, assigments) if err != nil { @@ -39,7 +38,6 @@ func TestClientSyncGroupAssignmentV0(t *testing.T) { ProtocolName: RoundRobinGroupBalancer{}.ProtocolName(), Assignments: assignmentBytes, } - return &resp, nil }), } From 105fbb979829ebfd94fd0945789bf64c15e66a41 Mon Sep 17 00:00:00 2001 From: rhansen2 Date: Sun, 6 Nov 2022 21:53:15 -0800 Subject: [PATCH 10/10] boost test duration --- reader_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reader_test.go b/reader_test.go index 7aa4ca9e1..edf4bc6c3 100644 --- a/reader_test.go +++ b/reader_test.go @@ -1684,7 +1684,7 @@ func TestConsumerGroupMultipleWithDefaultTransport(t *testing.T) { recvErr2 <- err }() - time.Sleep(conf1.MaxWait) + time.Sleep(conf1.MaxWait * 5) totalMessages := 10