Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type ConsumerGroupConfig struct {
// for more complex use cases.
Topics []string

// The unique identifier of the consumer instance provided by end user.
GroupInstanceID string

// A transport used to send messages to kafka clusters.
//
// If nil, DefaultTransport will be used.
Expand Down Expand Up @@ -311,6 +314,9 @@ type Generation struct {
// coordinator.
MemberID string

// The unique identifier of the consumer instance provided by end user.
GroupInstanceID string

// Assignments is the initial state of this Generation. The partition
// assignments are grouped by topic.
Assignments map[string][]PartitionAssignment
Expand Down Expand Up @@ -471,9 +477,10 @@ func (g *Generation) heartbeatLoop(interval time.Duration) {
return
case <-ticker.C:
_, err := g.coord.heartbeat(ctx, &HeartbeatRequest{
GroupID: g.GroupID,
GenerationID: g.ID,
MemberID: g.MemberID,
GroupID: g.GroupID,
GenerationID: g.ID,
MemberID: g.MemberID,
GroupInstanceID: g.GroupInstanceID,
})
if err != nil {
return
Expand Down Expand Up @@ -826,6 +833,7 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
ID: generationID,
GroupID: cg.config.ID,
MemberID: memberID,
GroupInstanceID: cg.config.GroupInstanceID,
Assignments: cg.makeAssignments(assignments, offsets),
coord: cg.coord,
done: make(chan struct{}),
Expand Down Expand Up @@ -936,6 +944,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequest(memberID string) (*JoinGroupReques
SessionTimeout: cg.config.SessionTimeout,
RebalanceTimeout: cg.config.RebalanceTimeout,
ProtocolType: defaultProtocolType,
GroupInstanceID: cg.config.GroupInstanceID,
}

for _, balancer := range cg.config.GroupBalancers {
Expand Down
4 changes: 4 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,9 @@ type ReaderConfig struct {
// The topic to read messages from.
Topic string

// The unique identifier of the consumer instance provided by end user.
GroupInstanceID string

// Partition to read messages from. Either Partition or GroupID may
// be assigned, but not both
Partition int
Expand Down Expand Up @@ -731,6 +734,7 @@ func NewReader(config ReaderConfig) *Reader {
ID: r.config.GroupID,
Brokers: r.config.Brokers,
Topics: r.getTopics(),
GroupInstanceID: r.config.GroupInstanceID,
GroupBalancers: r.config.GroupBalancers,
HeartbeatInterval: r.config.HeartbeatInterval,
PartitionWatchInterval: r.config.PartitionWatchInterval,
Expand Down