@@ -111,6 +111,7 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
111111 """
112112 for (host , port ) in self .hosts :
113113 requestId = self ._next_id ()
114+ log .debug ('Request %s: %s' , requestId , payloads )
114115 try :
115116 conn = self ._get_conn (host , port )
116117 request = encoder_fn (client_id = self .client_id ,
@@ -119,7 +120,9 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
119120
120121 conn .send (requestId , request )
121122 response = conn .recv (requestId )
122- return decoder_fn (response )
123+ decoded = decoder_fn (response )
124+ log .debug ('Response %s: %s' , requestId , decoded )
125+ return decoded
123126
124127 except Exception :
125128 log .exception ('Error sending request [%s] to server %s:%s, '
@@ -150,9 +153,6 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
150153
151154 List of response objects in the same order as the supplied payloads
152155 """
153-
154- log .debug ("Sending Payloads: %s" % payloads )
155-
156156 # Group the requests by topic+partition
157157 brokers_for_payloads = []
158158 payloads_by_broker = collections .defaultdict (list )
@@ -170,6 +170,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
170170 broker_failures = []
171171 for broker , payloads in payloads_by_broker .items ():
172172 requestId = self ._next_id ()
173+ log .debug ('Request %s to %s: %s' , requestId , broker , payloads )
173174 request = encoder_fn (client_id = self .client_id ,
174175 correlation_id = requestId , payloads = payloads )
175176
@@ -222,7 +223,6 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
222223 # Return responses in the same order as provided
223224 responses_by_payload = [responses_by_broker [broker ].pop (0 )
224225 for broker in brokers_for_payloads ]
225- log .debug ('Responses: %s' % responses_by_payload )
226226 return responses_by_payload
227227
228228 def __repr__ (self ):
0 commit comments