|
19 | 19 | STOP_ASYNC_PRODUCER = -1 |
20 | 20 |
|
21 | 21 |
|
| 22 | +def _send_upstream(topic, queue, client, batch_time, batch_size, |
| 23 | + req_acks, ack_timeout): |
| 24 | + """ |
| 25 | + Listen on the queue for a specified number of messages or till |
| 26 | + a specified timeout and send them upstream to the brokers in one |
| 27 | + request |
| 28 | +
|
| 29 | + NOTE: Ideally, this should have been a method inside the Producer |
| 30 | + class. However, multiprocessing module has issues in windows. The |
| 31 | + functionality breaks unless this function is kept outside of a class |
| 32 | + """ |
| 33 | + stop = False |
| 34 | + client.reinit() |
| 35 | + |
| 36 | + while not stop: |
| 37 | + timeout = batch_time |
| 38 | + count = batch_size |
| 39 | + send_at = datetime.now() + timedelta(seconds=timeout) |
| 40 | + msgset = defaultdict(list) |
| 41 | + |
| 42 | + # Keep fetching till we gather enough messages or a |
| 43 | + # timeout is reached |
| 44 | + while count > 0 and timeout >= 0: |
| 45 | + try: |
| 46 | + partition, msg = queue.get(timeout=timeout) |
| 47 | + except Empty: |
| 48 | + break |
| 49 | + |
| 50 | + # Check if the controller has requested us to stop |
| 51 | + if partition == STOP_ASYNC_PRODUCER: |
| 52 | + stop = True |
| 53 | + break |
| 54 | + |
| 55 | + # Adjust the timeout to match the remaining period |
| 56 | + count -= 1 |
| 57 | + timeout = (send_at - datetime.now()).total_seconds() |
| 58 | + msgset[partition].append(msg) |
| 59 | + |
| 60 | + # Send collected requests upstream |
| 61 | + reqs = [] |
| 62 | + for partition, messages in msgset.items(): |
| 63 | + req = ProduceRequest(topic, partition, messages) |
| 64 | + reqs.append(req) |
| 65 | + |
| 66 | + try: |
| 67 | + client.send_produce_request(reqs, |
| 68 | + acks=req_acks, |
| 69 | + timeout=ack_timeout) |
| 70 | + except Exception as exp: |
| 71 | + log.exception("Unable to send message") |
| 72 | + |
| 73 | + |
22 | 74 | class Producer(object): |
23 | 75 | """ |
24 | 76 | Base class to be used by producers |
@@ -62,60 +114,22 @@ def __init__(self, client, async=False, |
62 | 114 | self.async = async |
63 | 115 | self.req_acks = req_acks |
64 | 116 | self.ack_timeout = ack_timeout |
65 | | - self.batch_send = batch_send |
66 | | - self.batch_size = batch_send_every_n |
67 | | - self.batch_time = batch_send_every_t |
68 | 117 |
|
69 | 118 | if self.async: |
70 | 119 | self.queue = Queue() # Messages are sent through this queue |
71 | | - self.proc = Process(target=self._send_upstream, args=(self.queue,)) |
72 | | - self.proc.daemon = True # Process will die if main thread exits |
| 120 | + self.proc = Process(target=_send_upstream, |
| 121 | + args=(self.topic, |
| 122 | + self.queue, |
| 123 | + self.client.copy(), |
| 124 | + batch_send_every_t, |
| 125 | + batch_send_every_n, |
| 126 | + self.req_acks, |
| 127 | + self.ack_timeout)) |
| 128 | + |
| 129 | + # Process will die if main thread exits |
| 130 | + self.proc.daemon = True |
73 | 131 | self.proc.start() |
74 | 132 |
|
75 | | - def _send_upstream(self, queue): |
76 | | - """ |
77 | | - Listen on the queue for a specified number of messages or till |
78 | | - a specified timeout and send them upstream to the brokers in one |
79 | | - request |
80 | | - """ |
81 | | - stop = False |
82 | | - |
83 | | - while not stop: |
84 | | - timeout = self.batch_time |
85 | | - send_at = datetime.now() + timedelta(seconds=timeout) |
86 | | - count = self.batch_size |
87 | | - msgset = defaultdict(list) |
88 | | - |
89 | | - # Keep fetching till we gather enough messages or a |
90 | | - # timeout is reached |
91 | | - while count > 0 and timeout >= 0: |
92 | | - try: |
93 | | - partition, msg = queue.get(timeout=timeout) |
94 | | - except Empty: |
95 | | - break |
96 | | - |
97 | | - # Check if the controller has requested us to stop |
98 | | - if partition == STOP_ASYNC_PRODUCER: |
99 | | - stop = True |
100 | | - break |
101 | | - |
102 | | - # Adjust the timeout to match the remaining period |
103 | | - count -= 1 |
104 | | - timeout = (send_at - datetime.now()).total_seconds() |
105 | | - msgset[partition].append(msg) |
106 | | - |
107 | | - # Send collected requests upstream |
108 | | - reqs = [] |
109 | | - for partition, messages in msgset.items(): |
110 | | - req = ProduceRequest(self.topic, partition, messages) |
111 | | - reqs.append(req) |
112 | | - |
113 | | - try: |
114 | | - self.client.send_produce_request(reqs, acks=self.req_acks, |
115 | | - timeout=self.ack_timeout) |
116 | | - except Exception: |
117 | | - log.exception("Unable to send message") |
118 | | - |
119 | 133 | def send_messages(self, partition, *msg): |
120 | 134 | """ |
121 | 135 | Helper method to send produce requests |
|
0 commit comments