Skip to content

Commit c0fb0de

Browse files
author
Dana Powers
committed
Dont maintain all producer args / kwargs in subclass __init__ and docstrings -- just refer to super class (Producer)
1 parent a9ddf15 commit c0fb0de

File tree

2 files changed

+24
-91
lines changed

2 files changed

+24
-91
lines changed

kafka/producer/keyed.py

Lines changed: 10 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,10 @@
33
import logging
44
import warnings
55

6-
from kafka.partitioner import HashedPartitioner
7-
from kafka.util import kafka_bytestring
6+
from .base import Producer
7+
from ..partitioner import HashedPartitioner
8+
from ..util import kafka_bytestring
89

9-
from .base import (
10-
Producer, BATCH_SEND_DEFAULT_INTERVAL,
11-
BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
12-
ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
13-
)
1410

1511
log = logging.getLogger(__name__)
1612

@@ -19,46 +15,17 @@ class KeyedProducer(Producer):
1915
"""
2016
A producer which distributes messages to partitions based on the key
2117
22-
Arguments:
23-
client: The kafka client instance
18+
See Producer class for Arguments
2419
25-
Keyword Arguments:
20+
Additional Arguments:
2621
partitioner: A partitioner class that will be used to get the partition
27-
to send the message to. Must be derived from Partitioner
28-
async: If True, the messages are sent asynchronously via another
29-
thread (process). We will not wait for a response to these
30-
ack_timeout: Value (in milliseconds) indicating a timeout for waiting
31-
for an acknowledgement
32-
batch_send: If True, messages are send in batches
33-
batch_send_every_n: If set, messages are send in batches of this size
34-
batch_send_every_t: If set, messages are send after this timeout
22+
to send the message to. Must be derived from Partitioner.
23+
Defaults to HashedPartitioner.
3524
"""
36-
def __init__(self, client, partitioner=None, async=False,
37-
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
38-
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
39-
codec=None,
40-
batch_send=False,
41-
batch_send_every_n=BATCH_SEND_MSG_COUNT,
42-
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
43-
async_retry_limit=ASYNC_RETRY_LIMIT,
44-
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
45-
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
46-
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
47-
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
48-
if not partitioner:
49-
partitioner = HashedPartitioner
50-
self.partitioner_class = partitioner
25+
def __init__(self, *args, **kwargs):
26+
self.partitioner_class = kwargs.pop('partitioner', HashedPartitioner)
5127
self.partitioners = {}
52-
53-
super(KeyedProducer, self).__init__(client, req_acks, ack_timeout,
54-
codec, async, batch_send,
55-
batch_send_every_n,
56-
batch_send_every_t,
57-
async_retry_limit,
58-
async_retry_backoff_ms,
59-
async_retry_on_timeouts,
60-
async_queue_maxsize,
61-
async_queue_put_timeout)
28+
super(KeyedProducer, self).__init__(*args, **kwargs)
6229

6330
def _next_partition(self, topic, key):
6431
if topic not in self.partitioners:

kafka/producer/simple.py

Lines changed: 14 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,34 @@
11
from __future__ import absolute_import
22

3+
from itertools import cycle
34
import logging
45
import random
56
import six
67

7-
from itertools import cycle
8-
98
from six.moves import xrange
109

11-
from .base import (
12-
Producer, BATCH_SEND_DEFAULT_INTERVAL,
13-
BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
14-
ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
15-
)
10+
from .base import Producer
11+
1612

1713
log = logging.getLogger(__name__)
1814

1915

2016
class SimpleProducer(Producer):
21-
"""
22-
A simple, round-robin producer. Each message goes to exactly one partition
23-
24-
Arguments:
25-
client: The Kafka client instance to use
26-
27-
Keyword Arguments:
28-
async: If True, the messages are sent asynchronously via another
29-
thread (process). We will not wait for a response to these
30-
req_acks: A value indicating the acknowledgements that the server must
31-
receive before responding to the request
32-
ack_timeout: Value (in milliseconds) indicating a timeout for waiting
33-
for an acknowledgement
34-
batch_send: If True, messages are send in batches
35-
batch_send_every_n: If set, messages are send in batches of this size
36-
batch_send_every_t: If set, messages are send after this timeout
37-
random_start: If true, randomize the initial partition which the
17+
"""A simple, round-robin producer.
18+
19+
See Producer class for Base Arguments
20+
21+
Additional Arguments:
22+
random_start (bool, optional): randomize the initial partition which
3823
the first message block will be published to, otherwise
3924
if false, the first message block will always publish
40-
to partition 0 before cycling through each partition
25+
to partition 0 before cycling through each partition,
26+
defaults to True.
4127
"""
42-
def __init__(self, client, async=False,
43-
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
44-
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
45-
codec=None,
46-
batch_send=False,
47-
batch_send_every_n=BATCH_SEND_MSG_COUNT,
48-
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
49-
random_start=True,
50-
async_retry_limit=ASYNC_RETRY_LIMIT,
51-
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
52-
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
53-
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
54-
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
28+
def __init__(self, *args, **kwargs):
5529
self.partition_cycles = {}
56-
self.random_start = random_start
57-
super(SimpleProducer, self).__init__(client, req_acks, ack_timeout,
58-
codec, async, batch_send,
59-
batch_send_every_n,
60-
batch_send_every_t,
61-
async_retry_limit,
62-
async_retry_backoff_ms,
63-
async_retry_on_timeouts,
64-
async_queue_maxsize,
65-
async_queue_put_timeout)
30+
self.random_start = kwargs.pop('random_start', True)
31+
super(SimpleProducer, self).__init__(*args, **kwargs)
6632

6733
def _next_partition(self, topic):
6834
if topic not in self.partition_cycles:

0 commit comments

Comments
 (0)