Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions kafka/producer/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, seri
produce_future.add_errback(self.failure)

def _produce_success(self, offset_and_timestamp):
offset, produce_timestamp_ms = offset_and_timestamp
offset, produce_timestamp_ms, log_start_offset = offset_and_timestamp

# Unpacking from args tuple is minor speed optimization
(relative_offset, timestamp_ms, checksum,
Expand All @@ -51,7 +51,7 @@ def _produce_success(self, offset_and_timestamp):
if offset != -1 and relative_offset is not None:
offset += relative_offset
tp = self._produce_future.topic_partition
metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms, log_start_offset,
checksum, serialized_key_size,
serialized_value_size, serialized_header_size)
self.success(metadata)
Expand All @@ -67,5 +67,5 @@ def get(self, timeout=None):


RecordMetadata = collections.namedtuple(
'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp', 'log_start_offset',
'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
8 changes: 4 additions & 4 deletions kafka/producer/record_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,16 @@ def try_append(self, timestamp_ms, key, value, headers):
sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
return future

def done(self, base_offset=None, timestamp_ms=None, exception=None):
def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None, global_error=None):
level = logging.DEBUG if exception is None else logging.WARNING
log.log(level, "Produced messages to topic-partition %s with base offset"
" %s and error %s.", self.topic_partition, base_offset,
exception) # trace
" %s log start offset %s and error %s.", self.topic_partition, base_offset,
log_start_offset, global_error) # trace
if self.produce_future.is_done:
log.warning('Batch is already closed -- ignoring batch.done()')
return
elif exception is None:
self.produce_future.success((base_offset, timestamp_ms))
self.produce_future.success((base_offset, timestamp_ms, log_start_offset))
else:
self.produce_future.failure(exception)

Expand Down
28 changes: 22 additions & 6 deletions kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,21 @@ def _handle_produce_response(self, node_id, send_time, batches, response):

for topic, partitions in response.topics:
for partition_info in partitions:
global_error = None
log_start_offset = None
if response.API_VERSION < 2:
partition, error_code, offset = partition_info
ts = None
else:
elif 2 <= response.API_VERSION <= 4:
partition, error_code, offset, ts = partition_info
elif 5 <= response.API_VERSION <= 7:
partition, error_code, offset, ts, log_start_offset = partition_info
else:
partition, error_code, offset, ts, log_start_offset, _, global_error = partition_info
tp = TopicPartition(topic, partition)
error = Errors.for_code(error_code)
batch = batches_by_partition[tp]
self._complete_batch(batch, error, offset, ts)
self._complete_batch(batch, error, offset, ts, log_start_offset, global_error)

if response.API_VERSION > 0:
self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)
Expand All @@ -213,14 +219,16 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
for batch in batches:
self._complete_batch(batch, None, -1, None)

def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None, global_error=None):
"""Complete or retry the given batch of records.

Arguments:
batch (RecordBatch): The record batch
error (Exception): The error (or None if none)
base_offset (int): The base offset assigned to the records if successful
timestamp_ms (int, optional): The timestamp returned by the broker for this batch
log_start_offset (int): The start offset of the log at the time this produce response was created
global_error (str): The summarising error message
"""
# Standardize no-error to None
if error is Errors.NoError:
Expand All @@ -232,15 +240,15 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
" retrying (%d attempts left). Error: %s",
batch.topic_partition,
self.config['retries'] - batch.attempts - 1,
error)
global_error or error)
self._accumulator.reenqueue(batch)
self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
else:
if error is Errors.TopicAuthorizationFailedError:
error = error(batch.topic_partition.topic)

# tell the user the result of their request
batch.done(base_offset, timestamp_ms, error)
batch.done(base_offset, timestamp_ms, error, log_start_offset, global_error)
self._accumulator.deallocate(batch)
if error is not None:
self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
Expand Down Expand Up @@ -293,7 +301,15 @@ def _produce_request(self, node_id, acks, timeout, batches):
produce_records_by_partition[topic][partition] = buf

kwargs = {}
if self.config['api_version'] >= (0, 11):
if self.config['api_version'] >= (2, 1):
version = 7
elif self.config['api_version'] >= (2, 0):
version = 6
elif self.config['api_version'] >= (1, 1):
version = 5
elif self.config['api_version'] >= (1, 0):
version = 4
elif self.config['api_version'] >= (0, 11):
version = 3
kwargs = dict(transactional_id=None)
elif self.config['api_version'] >= (0, 10):
Expand Down
79 changes: 77 additions & 2 deletions kafka/protocol/produce.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,50 @@ class ProduceResponse_v5(Response):
)


class ProduceResponse_v6(Response):
"""
The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
"""
API_KEY = 0
API_VERSION = 6
SCHEMA = ProduceResponse_v5.SCHEMA


class ProduceResponse_v7(Response):
"""
V7 bumped up to indicate ZStandard capability. (see KIP-110)
"""
API_KEY = 0
API_VERSION = 7
SCHEMA = ProduceResponse_v6.SCHEMA


class ProduceResponse_v8(Response):
"""
V8 bumped up to add two new fields record_errors offset list and error_message
(See KIP-467)
"""
API_KEY = 0
API_VERSION = 8
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16),
('offset', Int64),
('timestamp', Int64),
('log_start_offset', Int64)),
('record_errors', (Array(
('batch_index', Int32),
('batch_index_error_message', String('utf-8'))
))),
('error_message', String('utf-8'))
))),
('throttle_time_ms', Int32)
)


class ProduceRequest(Request):
API_KEY = 0

Expand Down Expand Up @@ -106,6 +150,7 @@ class ProduceRequest_v1(ProduceRequest):
RESPONSE_TYPE = ProduceResponse_v1
SCHEMA = ProduceRequest_v0.SCHEMA


class ProduceRequest_v2(ProduceRequest):
API_VERSION = 2
RESPONSE_TYPE = ProduceResponse_v2
Expand Down Expand Up @@ -147,11 +192,41 @@ class ProduceRequest_v5(ProduceRequest):
SCHEMA = ProduceRequest_v4.SCHEMA


class ProduceRequest_v6(ProduceRequest):
"""
The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
"""
API_VERSION = 6
RESPONSE_TYPE = ProduceResponse_v6
SCHEMA = ProduceRequest_v5.SCHEMA


class ProduceRequest_v7(ProduceRequest):
"""
V7 bumped up to indicate ZStandard capability. (see KIP-110)
"""
API_VERSION = 7
RESPONSE_TYPE = ProduceResponse_v7
SCHEMA = ProduceRequest_v6.SCHEMA


class ProduceRequest_v8(ProduceRequest):
"""
V8 bumped up to add two new fields record_errors offset list and error_message to PartitionResponse
(See KIP-467)
"""
API_VERSION = 8
RESPONSE_TYPE = ProduceResponse_v8
SCHEMA = ProduceRequest_v7.SCHEMA


ProduceRequest = [
ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2,
ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5
ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5,
ProduceRequest_v6, ProduceRequest_v7, ProduceRequest_v8,
]
ProduceResponse = [
ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2,
ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5
ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5,
ProduceResponse_v6, ProduceResponse_v7, ProduceResponse_v8,
]