@@ -404,6 +404,63 @@ def __iter_partition__(self, partition, offset):
404404 offset = next_offset + 1
405405
406406
407+ def _mp_consume (client , group , topic , chunk , queue , start , exit , pause , size ):
408+ """
409+ A child process worker which consumes messages based on the
410+ notifications given by the controller process
411+
412+ NOTE: Ideally, this should have been a method inside the Consumer
413+ class. However, multiprocessing module has issues in windows. The
414+ functionality breaks unless this function is kept outside of a class
415+ """
416+
417+ # Make the child processes open separate socket connections
418+ client .reinit ()
419+
420+ # We will start consumers without auto-commit. Auto-commit will be
421+ # done by the master controller process.
422+ consumer = SimpleConsumer (client , group , topic ,
423+ partitions = chunk ,
424+ auto_commit = False ,
425+ auto_commit_every_n = None ,
426+ auto_commit_every_t = None )
427+
428+ # Ensure that the consumer provides the partition information
429+ consumer .provide_partition_info ()
430+
431+ while True :
432+ # Wait till the controller indicates us to start consumption
433+ start .wait ()
434+
435+ # If we are asked to quit, do so
436+ if exit .is_set ():
437+ break
438+
439+ # Consume messages and add them to the queue. If the controller
440+ # indicates a specific number of messages, follow that advice
441+ count = 0
442+
443+ for partition , message in consumer :
444+ queue .put ((partition , message ))
445+ count += 1
446+
447+ # We have reached the required size. The controller might have
448+ # more than what he needs. Wait for a while.
449+ # Without this logic, it is possible that we run into a big
450+ # loop consuming all available messages before the controller
451+ # can reset the 'start' event
452+ if count == size .value :
453+ pause .wait ()
454+ break
455+
456+ # In case we did not receive any message, give up the CPU for
457+ # a while before we try again
458+ if count == 0 :
459+ time .sleep (0.1 )
460+
461+ consumer .stop ()
462+
463+
407464class MultiProcessConsumer (Consumer ):
408465 """
409466 A consumer implementation that consumes partitions for a topic in
@@ -468,63 +525,16 @@ def __init__(self, client, group, topic, auto_commit=True,
468525 self .procs = []
469526 for chunk in chunks :
470527 chunk = filter (lambda x : x is not None , chunk )
471- proc = Process (target = self ._consume , args = (chunk ,))
528+ args = (client .copy (),
529+ group , topic , chunk ,
530+ self .queue , self .start , self .exit ,
531+ self .pause , self .size )
532+
533+ proc = Process (target = _mp_consume , args = args )
472534 proc .daemon = True
473535 proc .start ()
474536 self .procs .append (proc )
475537
476- def _consume (self , partitions ):
477- """
478- A child process worker which consumes messages based on the
479- notifications given by the controller process
480- """
481-
482- # Make the child processes open separate socket connections
483- self .client .reinit ()
484-
485- # We will start consumers without auto-commit. Auto-commit will be
486- # done by the master controller process.
487- consumer = SimpleConsumer (self .client , self .group , self .topic ,
488- partitions = partitions ,
489- auto_commit = False ,
490- auto_commit_every_n = None ,
491- auto_commit_every_t = None )
492-
493- # Ensure that the consumer provides the partition information
494- consumer .provide_partition_info ()
495-
496- while True :
497- # Wait till the controller indicates us to start consumption
498- self .start .wait ()
499-
500- # If we are asked to quit, do so
501- if self .exit .is_set ():
502- break
503-
504- # Consume messages and add them to the queue. If the controller
505- # indicates a specific number of messages, follow that advice
506- count = 0
507-
508- for partition , message in consumer :
509- self .queue .put ((partition , message ))
510- count += 1
511-
512- # We have reached the required size. The controller might have
513- # more than what he needs. Wait for a while.
514- # Without this logic, it is possible that we run into a big
515- # loop consuming all available messages before the controller
516- # can reset the 'start' event
517- if count == self .size .value :
518- self .pause .wait ()
519- break
520-
521- # In case we did not receive any message, give up the CPU for
522- # a while before we try again
523- if count == 0 :
524- time .sleep (0.1 )
525-
526- consumer .stop ()
527-
528538 def stop (self ):
529539 # Set exit and start off all waiting consumers
530540 self .exit .set ()
0 commit comments