@@ -133,16 +133,15 @@ const (
133133 ReadCommitted IsolationLevel = 1
134134)
135135
136- var (
137- // DefaultClientID is the default value used as ClientID of kafka
138- // connections.
139- DefaultClientID string
140- )
136+ // DefaultClientID is the default value used as ClientID of kafka
137+ // connections.
138+ var DefaultClientID string
141139
142140func init () {
143141 progname := filepath .Base (os .Args [0 ])
144142 hostname , _ := os .Hostname ()
145143 DefaultClientID = fmt .Sprintf ("%s@%s (github.com/segmentio/kafka-go)" , progname , hostname )
144+ DefaultTransport .(* Transport ).ClientID = DefaultClientID
146145}
147146
148147// NewConn returns a new kafka connection for the given topic and partition.
@@ -263,10 +262,12 @@ func (c *Conn) Controller() (broker Broker, err error) {
263262 }
264263 for _ , brokerMeta := range res .Brokers {
265264 if brokerMeta .NodeID == res .ControllerID {
266- broker = Broker {ID : int (brokerMeta .NodeID ),
265+ broker = Broker {
266+ ID : int (brokerMeta .NodeID ),
267267 Port : int (brokerMeta .Port ),
268268 Host : brokerMeta .Host ,
269- Rack : brokerMeta .Rack }
269+ Rack : brokerMeta .Rack ,
270+ }
270271 break
271272 }
272273 }
@@ -322,7 +323,6 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
322323 err := c .readOperation (
323324 func (deadline time.Time , id int32 ) error {
324325 return c .writeRequest (findCoordinator , v0 , id , request )
325-
326326 },
327327 func (deadline time.Time , size int ) error {
328328 return expectZeroSize (func () (remain int , err error ) {
@@ -340,32 +340,6 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
340340 return response , nil
341341}
342342
343- // heartbeat sends a heartbeat message required by consumer groups
344- //
345- // See http://kafka.apache.org/protocol.html#The_Messages_Heartbeat
346- func (c * Conn ) heartbeat (request heartbeatRequestV0 ) (heartbeatResponseV0 , error ) {
347- var response heartbeatResponseV0
348-
349- err := c .writeOperation (
350- func (deadline time.Time , id int32 ) error {
351- return c .writeRequest (heartbeat , v0 , id , request )
352- },
353- func (deadline time.Time , size int ) error {
354- return expectZeroSize (func () (remain int , err error ) {
355- return (& response ).readFrom (& c .rbuf , size )
356- }())
357- },
358- )
359- if err != nil {
360- return heartbeatResponseV0 {}, err
361- }
362- if response .ErrorCode != 0 {
363- return heartbeatResponseV0 {}, Error (response .ErrorCode )
364- }
365-
366- return response , nil
367- }
368-
369343// joinGroup attempts to join a consumer group
370344//
371345// See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
@@ -752,9 +726,8 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
752726// ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured
753727// with the default values in ReadBatchConfig except for minBytes and maxBytes.
754728func (c * Conn ) ReadBatchWith (cfg ReadBatchConfig ) * Batch {
755-
756729 var adjustedDeadline time.Time
757- var maxFetch = int (c .fetchMaxBytes )
730+ maxFetch : = int (c .fetchMaxBytes )
758731
759732 if cfg .MinBytes < 0 || cfg .MinBytes > maxFetch {
760733 return & Batch {err : fmt .Errorf ("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds" , cfg .MinBytes , maxFetch )}
@@ -960,7 +933,6 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) {
960933// connection. If there are none, the method fetches all partitions of the kafka
961934// cluster.
962935func (c * Conn ) ReadPartitions (topics ... string ) (partitions []Partition , err error ) {
963-
964936 if len (topics ) == 0 {
965937 if len (c .topic ) != 0 {
966938 defaultTopics := [... ]string {c .topic }
@@ -1107,11 +1079,10 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
11071079 deadline = adjustDeadlineForRTT (deadline , now , defaultRTT )
11081080 switch produceVersion {
11091081 case v7 :
1110- recordBatch , err :=
1111- newRecordBatch (
1112- codec ,
1113- msgs ... ,
1114- )
1082+ recordBatch , err := newRecordBatch (
1083+ codec ,
1084+ msgs ... ,
1085+ )
11151086 if err != nil {
11161087 return err
11171088 }
@@ -1126,11 +1097,10 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
11261097 recordBatch ,
11271098 )
11281099 case v3 :
1129- recordBatch , err :=
1130- newRecordBatch (
1131- codec ,
1132- msgs ... ,
1133- )
1100+ recordBatch , err := newRecordBatch (
1101+ codec ,
1102+ msgs ... ,
1103+ )
11341104 if err != nil {
11351105 return err
11361106 }
@@ -1195,7 +1165,6 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
11951165 }
11961166 return size , err
11971167 }
1198-
11991168 })
12001169 if err != nil {
12011170 return size , err
@@ -1555,7 +1524,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
15551524 return nil , err
15561525 }
15571526 if version == v1 {
1558- var request = saslAuthenticateRequestV0 {Data : data }
1527+ request : = saslAuthenticateRequestV0 {Data : data }
15591528 var response saslAuthenticateResponseV0
15601529
15611530 err := c .writeOperation (
0 commit comments