Skip to content

Commit 5c7fc8b

Browse files
authored
Remove unused partial KIP-467 implementation (ProduceResponse batch error details) (#2524)
1 parent 6f8de58 commit 5c7fc8b

File tree

2 files changed

+17
-17
lines changed

2 files changed

+17
-17
lines changed

kafka/producer/record_accumulator.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,17 +68,19 @@ def try_append(self, timestamp_ms, key, value, headers):
6868
sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
6969
return future
7070

71-
def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None, global_error=None):
72-
level = logging.DEBUG if exception is None else logging.WARNING
73-
log.log(level, "Produced messages to topic-partition %s with base offset"
74-
" %s log start offset %s and error %s.", self.topic_partition, base_offset,
75-
log_start_offset, global_error) # trace
71+
def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None):
7672
if self.produce_future.is_done:
7773
log.warning('Batch is already closed -- ignoring batch.done()')
7874
return
7975
elif exception is None:
76+
log.debug("Produced messages to topic-partition %s with base offset"
77+
" %s log start offset %s.", self.topic_partition, base_offset,
78+
log_start_offset) # trace
8079
self.produce_future.success((base_offset, timestamp_ms, log_start_offset))
8180
else:
81+
log.warning("Failed to produce messages to topic-partition %s with base offset"
82+
" %s log start offset %s and error %s.", self.topic_partition, base_offset,
83+
log_start_offset, exception) # trace
8284
self.produce_future.failure(exception)
8385

8486
def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full):
@@ -109,7 +111,7 @@ def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full)
109111

110112
if error:
111113
self.records.close()
112-
self.done(-1, None, Errors.KafkaTimeoutError(
114+
self.done(base_offset=-1, exception=Errors.KafkaTimeoutError(
113115
"Batch for %s containing %s record(s) expired: %s" % (
114116
self.topic_partition, self.records.next_offset(), error)))
115117
return True

kafka/producer/sender.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ def add_topic(self, topic):
182182
def _failed_produce(self, batches, node_id, error):
183183
log.error("Error sending produce request to node %d: %s", node_id, error) # trace
184184
for batch in batches:
185-
self._complete_batch(batch, error, -1, None)
185+
self._complete_batch(batch, error, -1)
186186

187187
def _handle_produce_response(self, node_id, send_time, batches, response):
188188
"""Handle a produce response."""
@@ -194,7 +194,6 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
194194

195195
for topic, partitions in response.topics:
196196
for partition_info in partitions:
197-
global_error = None
198197
log_start_offset = None
199198
if response.API_VERSION < 2:
200199
partition, error_code, offset = partition_info
@@ -204,28 +203,27 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
204203
elif 5 <= response.API_VERSION <= 7:
205204
partition, error_code, offset, ts, log_start_offset = partition_info
206205
else:
207-
# the ignored parameter is record_error of type list[(batch_index: int, error_message: str)]
208-
partition, error_code, offset, ts, log_start_offset, _, global_error = partition_info
206+
# Currently unused / TODO: KIP-467
207+
partition, error_code, offset, ts, log_start_offset, _record_errors, _global_error = partition_info
209208
tp = TopicPartition(topic, partition)
210209
error = Errors.for_code(error_code)
211210
batch = batches_by_partition[tp]
212-
self._complete_batch(batch, error, offset, ts, log_start_offset, global_error)
211+
self._complete_batch(batch, error, offset, timestamp_ms=ts, log_start_offset=log_start_offset)
213212

214213
else:
215214
# this is the acks = 0 case, just complete all requests
216215
for batch in batches:
217-
self._complete_batch(batch, None, -1, None)
216+
self._complete_batch(batch, None, -1)
218217

219-
def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None, global_error=None):
218+
def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None):
220219
"""Complete or retry the given batch of records.
221220
222221
Arguments:
223222
batch (RecordBatch): The record batch
224223
error (Exception): The error (or None if none)
225224
base_offset (int): The base offset assigned to the records if successful
226225
timestamp_ms (int, optional): The timestamp returned by the broker for this batch
227-
log_start_offset (int): The start offset of the log at the time this produce response was created
228-
global_error (str): The summarising error message
226+
log_start_offset (int, optional): The start offset of the log at the time this produce response was created
229227
"""
230228
# Standardize no-error to None
231229
if error is Errors.NoError:
@@ -237,15 +235,15 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star
237235
" retrying (%d attempts left). Error: %s",
238236
batch.topic_partition,
239237
self.config['retries'] - batch.attempts - 1,
240-
global_error or error)
238+
error)
241239
self._accumulator.reenqueue(batch)
242240
self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
243241
else:
244242
if error is Errors.TopicAuthorizationFailedError:
245243
error = error(batch.topic_partition.topic)
246244

247245
# tell the user the result of their request
248-
batch.done(base_offset, timestamp_ms, error, log_start_offset, global_error)
246+
batch.done(base_offset, timestamp_ms, error, log_start_offset)
249247
self._accumulator.deallocate(batch)
250248
if error is not None:
251249
self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)

0 commit comments

Comments
 (0)