diff --git a/consumergroup.go b/consumergroup.go index 0c9843a45..05a708a32 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -73,6 +73,9 @@ type ConsumerGroupConfig struct { // for more complex use cases. Topics []string + // The unique identifier of the consumer instance provided by end user. + GroupInstanceID string + // A transport used to send messages to kafka clusters. // // If nil, DefaultTransport will be used. @@ -311,6 +314,9 @@ type Generation struct { // coordinator. MemberID string + // The unique identifier of the consumer instance provided by end user. + GroupInstanceID string + // Assignments is the initial state of this Generation. The partition // assignments are grouped by topic. Assignments map[string][]PartitionAssignment @@ -471,9 +477,10 @@ func (g *Generation) heartbeatLoop(interval time.Duration) { return case <-ticker.C: _, err := g.coord.heartbeat(ctx, &HeartbeatRequest{ - GroupID: g.GroupID, - GenerationID: g.ID, - MemberID: g.MemberID, + GroupID: g.GroupID, + GenerationID: g.ID, + MemberID: g.MemberID, + GroupInstanceID: g.GroupInstanceID, }) if err != nil { return @@ -826,6 +833,7 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) { ID: generationID, GroupID: cg.config.ID, MemberID: memberID, + GroupInstanceID: cg.config.GroupInstanceID, Assignments: cg.makeAssignments(assignments, offsets), coord: cg.coord, done: make(chan struct{}), @@ -936,6 +944,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequest(memberID string) (*JoinGroupReques SessionTimeout: cg.config.SessionTimeout, RebalanceTimeout: cg.config.RebalanceTimeout, ProtocolType: defaultProtocolType, + GroupInstanceID: cg.config.GroupInstanceID, } for _, balancer := range cg.config.GroupBalancers { diff --git a/reader.go b/reader.go index 6a965c07a..dd68ec9f4 100644 --- a/reader.go +++ b/reader.go @@ -367,6 +367,9 @@ type ReaderConfig struct { // The topic to read messages from. Topic string + // The unique identifier of the consumer instance provided by end user. + GroupInstanceID string + // Partition to read messages from. Either Partition or GroupID may // be assigned, but not both Partition int @@ -731,6 +734,7 @@ func NewReader(config ReaderConfig) *Reader { ID: r.config.GroupID, Brokers: r.config.Brokers, Topics: r.getTopics(), + GroupInstanceID: r.config.GroupInstanceID, GroupBalancers: r.config.GroupBalancers, HeartbeatInterval: r.config.HeartbeatInterval, PartitionWatchInterval: r.config.PartitionWatchInterval,