Skip to content

Commit f8cc793

Browse files
author
Csaba Huszka
committed
Supports older versions of kafka server
1 parent f30a6a5 commit f8cc793

File tree

4 files changed

+289
-19
lines changed

4 files changed

+289
-19
lines changed

conn.go

Lines changed: 77 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -346,13 +346,41 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
346346
func (c *Conn) heartbeat(request heartbeatRequestV3) (heartbeatResponseV3, error) {
347347
var response heartbeatResponseV3
348348

349-
err := c.writeOperation(
349+
apiVersion, err := c.negotiateVersion(heartbeat, v0, v3)
350+
if err != nil {
351+
return heartbeatResponseV3{}, err
352+
}
353+
354+
err = c.writeOperation(
350355
func(deadline time.Time, id int32) error {
351-
return c.writeRequest(heartbeat, v3, id, request)
356+
switch apiVersion {
357+
case 0:
358+
return c.writeRequest(heartbeat, v0, id, heartbeatRequestV0{
359+
GroupID: request.GroupID,
360+
GenerationID: request.GenerationID,
361+
MemberID: request.MemberID,
362+
})
363+
case 3:
364+
return c.writeRequest(heartbeat, v3, id, request)
365+
}
366+
return fmt.Errorf("given API version is not supported: %d", apiVersion)
352367
},
353368
func(deadline time.Time, size int) error {
354369
return expectZeroSize(func() (remain int, err error) {
355-
return (&response).readFrom(&c.rbuf, size)
370+
switch apiVersion {
371+
case 0:
372+
var responseV0 heartbeatResponseV0
373+
remain, err := (&responseV0).readFrom(&c.rbuf, size)
374+
375+
response = heartbeatResponseV3{
376+
ErrorCode: responseV0.ErrorCode,
377+
}
378+
379+
return remain, err
380+
case 3:
381+
return (&response).readFrom(&c.rbuf, size)
382+
}
383+
return remain, fmt.Errorf("given API version is not supported: %d", apiVersion)
356384
}())
357385
},
358386
)
@@ -372,12 +400,56 @@ func (c *Conn) heartbeat(request heartbeatRequestV3) (heartbeatResponseV3, error
372400
func (c *Conn) joinGroup(request joinGroupRequestV5) (joinGroupResponseV5, error) {
373401
var response joinGroupResponseV5
374402

375-
err := c.writeOperation(
403+
apiVersion, err := c.negotiateVersion(joinGroup, v1, v5)
404+
if err != nil {
405+
return joinGroupResponseV5{}, err
406+
}
407+
408+
err = c.writeOperation(
376409
func(deadline time.Time, id int32) error {
377-
return c.writeRequest(joinGroup, v5, id, request)
410+
switch apiVersion {
411+
case 5:
412+
return c.writeRequest(joinGroup, v5, id, request)
413+
case 1:
414+
return c.writeRequest(joinGroup, v1, id, joinGroupRequestV1{
415+
GroupID: request.GroupID,
416+
SessionTimeout: request.SessionTimeout,
417+
RebalanceTimeout: request.RebalanceTimeout,
418+
MemberID: request.MemberID,
419+
ProtocolType: request.ProtocolType,
420+
GroupProtocols: request.GroupProtocols,
421+
})
422+
}
423+
return fmt.Errorf("given API version is not supported: %d", apiVersion)
378424
},
379425
func(deadline time.Time, size int) error {
380426
return expectZeroSize(func() (remain int, err error) {
427+
switch apiVersion {
428+
case 5:
429+
break
430+
case 1:
431+
var responseV1 joinGroupResponseV1
432+
var members []joinGroupResponseMemberV5
433+
remain, err := (&responseV1).readFrom(&c.rbuf, size)
434+
435+
for _, jgrm := range responseV1.Members {
436+
members = append(members, joinGroupResponseMemberV5{
437+
MemberID: jgrm.MemberID,
438+
MemberMetadata: jgrm.MemberMetadata,
439+
})
440+
}
441+
442+
response = joinGroupResponseV5{
443+
ErrorCode: responseV1.ErrorCode,
444+
GenerationID: responseV1.GenerationID,
445+
GroupProtocol: responseV1.GroupProtocol,
446+
LeaderID: responseV1.LeaderID,
447+
MemberID: responseV1.MemberID,
448+
Members: members,
449+
}
450+
451+
return remain, err
452+
}
381453
return (&response).readFrom(&c.rbuf, size)
382454
}())
383455
},

docker-compose-test.yml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
version: "3"
2+
services:
3+
zookeeper:
4+
image: 'bitnami/zookeeper:latest'
5+
ports:
6+
- '2181:2181'
7+
environment:
8+
- ALLOW_ANONYMOUS_LOGIN=yes
9+
kafka:
10+
image: 'bitnami/kafka:latest'
11+
ports:
12+
- '9092:9092'
13+
environment:
14+
- KAFKA_BROKER_ID=1
15+
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
16+
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
17+
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
18+
- ALLOW_PLAINTEXT_LISTENER=yes
19+
depends_on:
20+
- zookeeper

heartbeat.go

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,6 @@ type HeartbeatResponse struct {
4040
Throttle time.Duration
4141
}
4242

43-
type heartbeatRequestV3 struct {
44-
// GroupID holds the unique group identifier
45-
GroupID string
46-
47-
// GenerationID holds the generation of the group.
48-
GenerationID int32
49-
50-
// MemberID assigned by the group coordinator
51-
MemberID string
52-
53-
// The unique identifier of the consumer instance provided by end user.
54-
GroupInstanceID *string
55-
}
56-
5743
// Heartbeat sends a heartbeat request to a kafka broker and returns the response.
5844
func (c *Client) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error) {
5945
m, err := c.roundTrip(ctx, req.Addr, &heartbeatAPI.Request{
@@ -79,6 +65,43 @@ func (c *Client) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*Heartbe
7965
return ret, nil
8066
}
8167

68+
type heartbeatRequestV0 struct {
69+
// GroupID holds the unique group identifier
70+
GroupID string
71+
72+
// GenerationID holds the generation of the group.
73+
GenerationID int32
74+
75+
// MemberID assigned by the group coordinator
76+
MemberID string
77+
}
78+
79+
type heartbeatRequestV3 struct {
80+
// GroupID holds the unique group identifier
81+
GroupID string
82+
83+
// GenerationID holds the generation of the group.
84+
GenerationID int32
85+
86+
// MemberID assigned by the group coordinator
87+
MemberID string
88+
89+
// The unique identifier of the consumer instance provided by end user.
90+
GroupInstanceID *string
91+
}
92+
93+
func (t heartbeatRequestV0) size() int32 {
94+
return sizeofString(t.GroupID) +
95+
sizeofInt32(t.GenerationID) +
96+
sizeofString(t.MemberID)
97+
}
98+
99+
func (t heartbeatRequestV0) writeTo(wb *writeBuffer) {
100+
wb.writeString(t.GroupID)
101+
wb.writeInt32(t.GenerationID)
102+
wb.writeString(t.MemberID)
103+
}
104+
82105
func (t heartbeatRequestV3) size() int32 {
83106
return sizeofString(t.GroupID) +
84107
sizeofInt32(t.GenerationID) +
@@ -93,6 +116,26 @@ func (t heartbeatRequestV3) writeTo(wb *writeBuffer) {
93116
wb.writeNullableString(t.GroupInstanceID)
94117
}
95118

119+
type heartbeatResponseV0 struct {
120+
// ErrorCode holds response error code
121+
ErrorCode int16
122+
}
123+
124+
func (t heartbeatResponseV0) size() int32 {
125+
return sizeofInt16(t.ErrorCode)
126+
}
127+
128+
func (t heartbeatResponseV0) writeTo(wb *writeBuffer) {
129+
wb.writeInt16(t.ErrorCode)
130+
}
131+
132+
func (t *heartbeatResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
133+
if remain, err = readInt16(r, sz, &t.ErrorCode); err != nil {
134+
return
135+
}
136+
return
137+
}
138+
96139
type heartbeatResponseV3 struct {
97140
// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
98141
ThrottleTime int32

joingroup.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,141 @@ func (t joinGroupRequestGroupProtocolV1) writeTo(wb *writeBuffer) {
241241
wb.writeBytes(t.ProtocolMetadata)
242242
}
243243

244+
type joinGroupRequestV1 struct {
245+
// GroupID holds the unique group identifier
246+
GroupID string
247+
248+
// SessionTimeout holds the coordinator considers the consumer dead if it
249+
// receives no heartbeat after this timeout in ms.
250+
SessionTimeout int32
251+
252+
// RebalanceTimeout holds the maximum time that the coordinator will wait
253+
// for each member to rejoin when rebalancing the group in ms
254+
RebalanceTimeout int32
255+
256+
// MemberID assigned by the group coordinator or the zero string if joining
257+
// for the first time.
258+
MemberID string
259+
260+
// ProtocolType holds the unique name for class of protocols implemented by group
261+
ProtocolType string
262+
263+
// GroupProtocols holds the list of protocols that the member supports
264+
GroupProtocols []joinGroupRequestGroupProtocolV1
265+
}
266+
267+
func (t joinGroupRequestV1) size() int32 {
268+
return sizeofString(t.GroupID) +
269+
sizeofInt32(t.SessionTimeout) +
270+
sizeofInt32(t.RebalanceTimeout) +
271+
sizeofString(t.MemberID) +
272+
sizeofString(t.ProtocolType) +
273+
sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() })
274+
}
275+
276+
func (t joinGroupRequestV1) writeTo(wb *writeBuffer) {
277+
wb.writeString(t.GroupID)
278+
wb.writeInt32(t.SessionTimeout)
279+
wb.writeInt32(t.RebalanceTimeout)
280+
wb.writeString(t.MemberID)
281+
wb.writeString(t.ProtocolType)
282+
wb.writeArray(len(t.GroupProtocols), func(i int) { t.GroupProtocols[i].writeTo(wb) })
283+
}
284+
285+
type joinGroupResponseMemberV1 struct {
286+
// MemberID assigned by the group coordinator
287+
MemberID string
288+
MemberMetadata []byte
289+
}
290+
291+
func (t joinGroupResponseMemberV1) size() int32 {
292+
return sizeofString(t.MemberID) +
293+
sizeofBytes(t.MemberMetadata)
294+
}
295+
296+
func (t joinGroupResponseMemberV1) writeTo(wb *writeBuffer) {
297+
wb.writeString(t.MemberID)
298+
wb.writeBytes(t.MemberMetadata)
299+
}
300+
301+
func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
302+
if remain, err = readString(r, size, &t.MemberID); err != nil {
303+
return
304+
}
305+
if remain, err = readBytes(r, remain, &t.MemberMetadata); err != nil {
306+
return
307+
}
308+
return
309+
}
310+
311+
type joinGroupResponseV1 struct {
312+
// ErrorCode holds response error code
313+
ErrorCode int16
314+
315+
// GenerationID holds the generation of the group.
316+
GenerationID int32
317+
318+
// GroupProtocol holds the group protocol selected by the coordinator
319+
GroupProtocol string
320+
321+
// LeaderID holds the leader of the group
322+
LeaderID string
323+
324+
// MemberID assigned by the group coordinator
325+
MemberID string
326+
Members []joinGroupResponseMemberV1
327+
}
328+
329+
func (t joinGroupResponseV1) size() int32 {
330+
return sizeofInt16(t.ErrorCode) +
331+
sizeofInt32(t.GenerationID) +
332+
sizeofString(t.GroupProtocol) +
333+
sizeofString(t.LeaderID) +
334+
sizeofString(t.MemberID) +
335+
sizeofArray(len(t.MemberID), func(i int) int32 { return t.Members[i].size() })
336+
}
337+
338+
func (t joinGroupResponseV1) writeTo(wb *writeBuffer) {
339+
wb.writeInt16(t.ErrorCode)
340+
wb.writeInt32(t.GenerationID)
341+
wb.writeString(t.GroupProtocol)
342+
wb.writeString(t.LeaderID)
343+
wb.writeString(t.MemberID)
344+
wb.writeArray(len(t.Members), func(i int) { t.Members[i].writeTo(wb) })
345+
}
346+
347+
func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
348+
if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
349+
return
350+
}
351+
if remain, err = readInt32(r, remain, &t.GenerationID); err != nil {
352+
return
353+
}
354+
if remain, err = readString(r, remain, &t.GroupProtocol); err != nil {
355+
return
356+
}
357+
if remain, err = readString(r, remain, &t.LeaderID); err != nil {
358+
return
359+
}
360+
if remain, err = readString(r, remain, &t.MemberID); err != nil {
361+
return
362+
}
363+
364+
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
365+
var item joinGroupResponseMemberV1
366+
if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
367+
return
368+
}
369+
t.Members = append(t.Members, item)
370+
return
371+
}
372+
if remain, err = readArrayWith(r, remain, fn); err != nil {
373+
return
374+
}
375+
376+
return
377+
}
378+
244379
type joinGroupRequestV5 struct {
245380
// GroupID holds the unique group identifier
246381
GroupID string

0 commit comments

Comments
 (0)