@@ -35,39 +35,42 @@ final class KafkaRecordTracker {
3535 private AtomicLong total ;
3636 private ConcurrentLinkedQueue <EventBatch > failed ;
3737 private volatile Map <TopicPartition , OffsetAndMetadata > offsets ;
38+ private Collection <TopicPartition > partitions ;
3839
/**
 * Creates an empty tracker: no tracked records, no failed batches, a zero
 * event count, no computed offsets and no assigned partitions.
 */
public KafkaRecordTracker() {
    all = new ConcurrentHashMap<>();
    failed = new ConcurrentLinkedQueue<>();
    total = new AtomicLong();
    offsets = new HashMap<>();
    /* The assigned-partition collection is mutated by open()/close() and
     * iterated when acked batches are processed. A copy-on-write list keeps
     * that iteration safe from ConcurrentModificationException if the two
     * happen on different threads, and it still accepts contains(null).
     * NOTE(review): assumes acks can arrive on a thread other than the one
     * calling open()/close() -- confirm against the callers.
     * Fully qualified because the import block is outside this change. */
    partitions = new java.util.concurrent.CopyOnWriteArrayList<>();
}
4547
48+ /**
49+ * Remove acked events and update the corresponding offsets finding the
50+ * lowest consecutive HEC-committed offsets.
51+ *
52+ * @param batches the acked event batches
53+ */
4654 public void removeAckedEventBatches (final List <EventBatch > batches ) {
47- for (final EventBatch batch : batches ) {
48- //log.debug("Processing batch {}", batch.getUUID());
49- removeAckedEventBatch (batch );
50- }
51- }
52-
53- public void removeAckedEventBatch (final EventBatch batch ) {
54- final List <Event > events = batch .getEvents ();
55- final Event event = events .get (0 );
56- if (event .getTied () instanceof SinkRecord ) {
57- final SinkRecord record = (SinkRecord ) event .getTied ();
58- TopicPartition tp = new TopicPartition (record .topic (), record .kafkaPartition ());
59- //log.debug("Processing topic {} partition {}", record.topic(), record.kafkaPartition());
55+ log .debug ("received acked event batches={}" , batches );
56+ /* Loop over all *assigned* partitions to find the lowest consecutive
57+ * HEC-committed offsets. A batch could contain events coming from a
58+ * variety of topic/partitions, and scanning those events could be
59+ * expensive.
60+ * Note that if some events are tied to an unassigned partition, their
61+ * offsets cannot be committed.
62+ */
63+ for (TopicPartition tp : partitions ) {
6064 TreeMap <Long , EventBatch > tpRecords = all .get (tp );
6165 if (tpRecords == null ) {
62- log .error ("KafkaRecordTracker removing a batch in an unknown partition {} {} {}" , record .topic (), record .kafkaPartition (), record .kafkaOffset ());
63- return ;
66+ continue ; // nothing to remove in this case
6467 }
6568 long offset = -1 ;
6669 Iterator <Map .Entry <Long , EventBatch >> iter = tpRecords .entrySet ().iterator ();
6770 for (; iter .hasNext ();) {
6871 Map .Entry <Long , EventBatch > e = iter .next ();
6972 if (e .getValue ().isCommitted ()) {
70- // log.debug("processing offset {}", e.getKey());
73+ log .debug ("processing offset {}" , e .getKey ());
7174 offset = e .getKey ();
7275 iter .remove ();
7376 total .decrementAndGet ();
@@ -76,11 +79,7 @@ public void removeAckedEventBatch(final EventBatch batch) {
7679 }
7780 }
7881 if (offset >= 0 ) {
79- if (offsets .containsKey (tp )) {
80- offsets .replace (tp , new OffsetAndMetadata (offset + 1 ));
81- } else {
82- offsets .put (tp , new OffsetAndMetadata (offset + 1 ));
83- }
82+ offsets .put (tp , new OffsetAndMetadata (offset + 1 ));
8483 }
8584 }
8685 }
@@ -116,21 +115,77 @@ public Collection<EventBatch> getAndRemoveFailedRecords() {
116115 Collection <EventBatch > records = new ArrayList <>();
117116 while (!failed .isEmpty ()) {
118117 final EventBatch batch = failed .poll ();
118+ /* Don't return null batches. */
119119 if (batch != null ) {
120+ /* Purge events from closed partitions because it won't be
121+ * possible to commit their offsets. */
122+ batch .getEvents ().removeIf (e -> !partitions .contains (getPartitionFromEvent (e )));
120123 records .add (batch );
121124 }
122125 }
123126 return records ;
124127 }
125128
126- // Loop through all SinkRecords for all topic partitions to
127- // find all lowest consecutive committed offsets, calculate
128- // the topic/partition offsets and then remove them
129+ /**
130+ * Return offsets computed when event batches are acked.
131+ *
132+ * @return map of topic/partition to offset/metadata
133+ */
129134 public Map <TopicPartition , OffsetAndMetadata > computeOffsets () {
130135 return offsets ;
131136 }
132137
133138 public long totalEvents () {
134139 return total .get ();
135140 }
141+
142+ public void open (Collection <TopicPartition > partitions ) {
143+ this .partitions .addAll (partitions );
144+ log .debug ("open partitions={} so currently assigned partitions={}" ,
145+ partitions , this .partitions );
146+ }
147+
148+ public void close (Collection <TopicPartition > partitions ) {
149+ this .partitions .removeAll (partitions );
150+ log .debug ("close partitions={} so currently assigned partitions={}" ,
151+ partitions , this .partitions );
152+ cleanupAfterClosedPartitions (partitions );
153+ }
154+
155+ private TopicPartition getPartitionFromEvent (Event event ) {
156+ if (event .getTied () instanceof SinkRecord ) {
157+ final SinkRecord r = (SinkRecord ) event .getTied ();
158+ return new TopicPartition (r .topic (), r .kafkaPartition ());
159+ } else return null ;
160+ }
161+
162+ /**
163+ * Clean up and purge all things related to a partition that's closed (i.e.
164+ * became unassigned) to this task and reported via SinkTask.close(). This
165+ * avoids race conditions related to late received acks after a partition
166+ * rebalance.
167+ *
168+ * @param partitions partition closed and now unassigned for this task
169+ */
170+ public void cleanupAfterClosedPartitions (Collection <TopicPartition > partitions )
171+ {
172+ /* Purge offsets. */
173+ offsets .keySet ().removeAll (partitions );
174+ log .warn ("purge offsets for closed partitions={} leaving offsets={}" ,
175+ partitions , offsets );
176+
177+ /* Count and purge outstanding event topic/partition records. */
178+ long countOfEventsToRemove = partitions .stream ()
179+ .map (tp -> all .get (tp )) // get unassigned topic/partition records
180+ .filter (Objects ::nonNull ) // filter out null values
181+ .map (tpr -> tpr .size ()) // get number of tp records
182+ .mapToInt (Integer ::intValue ) // map to int
183+ .sum ();
184+ if (countOfEventsToRemove > 0 ) {
185+ log .warn ("purge events={} from closed partitions={}" ,
186+ countOfEventsToRemove , partitions );
187+ all .keySet ().removeAll (partitions );
188+ total .addAndGet (-1L * countOfEventsToRemove );
189+ }
190+ }
136191}
0 commit comments