@@ -38,39 +38,42 @@ final class KafkaRecordTracker {
3838 private AtomicLong total ;
3939 private ConcurrentLinkedQueue <EventBatch > failed ;
4040 private volatile Map <TopicPartition , OffsetAndMetadata > offsets ;
41+ private Collection <TopicPartition > partitions ;
4142
4243 public KafkaRecordTracker () {
4344 all = new ConcurrentHashMap <>();
4445 failed = new ConcurrentLinkedQueue <>();
4546 total = new AtomicLong ();
4647 offsets = new HashMap <>();
48+ partitions = new ArrayList <TopicPartition >();
4749 }
4850
51+ /**
52+ * Remove acked events and update the corresponding offsets finding the
53+ * lowest consecutive HEC-commited offsets.
54+ *
55+ * @param batches the acked event batches
56+ */
4957 public void removeAckedEventBatches (final List <EventBatch > batches ) {
50- for (final EventBatch batch : batches ) {
51- //log.debug("Processing batch {}", batch.getUUID());
52- removeAckedEventBatch (batch );
53- }
54- }
55-
56- public void removeAckedEventBatch (final EventBatch batch ) {
57- final List <Event > events = batch .getEvents ();
58- final Event event = events .get (0 );
59- if (event .getTied () instanceof SinkRecord ) {
60- final SinkRecord record = (SinkRecord ) event .getTied ();
61- TopicPartition tp = new TopicPartition (record .topic (), record .kafkaPartition ());
62- //log.debug("Processing topic {} partition {}", record.topic(), record.kafkaPartition());
58+ log .debug ("received acked event batches={}" , batches );
59+ /* Loop all *assigned* partitions to find the lowest consecutive
60+ * HEC-commited offsets. A batch could contain events coming from a
61+ * variety of topic/partitions, and scanning those events coulb be
62+ * expensive.
63+ * Note that if some events are tied to an unassigned partition those
64+ * offsets won't be able to be commited.
65+ */
66+ for (TopicPartition tp : partitions ) {
6367 ConcurrentNavigableMap <Long , EventBatch > tpRecords = all .get (tp );
6468 if (tpRecords == null ) {
65- log .error ("KafkaRecordTracker removing a batch in an unknown partition {} {} {}" , record .topic (), record .kafkaPartition (), record .kafkaOffset ());
66- return ;
69+ continue ; // nothing to remove in this case
6770 }
6871 long offset = -1 ;
6972 Iterator <Map .Entry <Long , EventBatch >> iter = tpRecords .entrySet ().iterator ();
7073 for (; iter .hasNext ();) {
7174 Map .Entry <Long , EventBatch > e = iter .next ();
7275 if (e .getValue ().isCommitted ()) {
73- // log.debug("processing offset {}", e.getKey());
76+ log .debug ("processing offset {}" , e .getKey ());
7477 offset = e .getKey ();
7578 iter .remove ();
7679 total .decrementAndGet ();
@@ -107,21 +110,77 @@ public Collection<EventBatch> getAndRemoveFailedRecords() {
107110 Collection <EventBatch > records = new ArrayList <>();
108111 while (!failed .isEmpty ()) {
109112 final EventBatch batch = failed .poll ();
113+ /* Don't return null batches. */
110114 if (batch != null ) {
115+ /* Purge events from closed partitions because it won't be
116+ * possible to commit their offsets. */
117+ batch .getEvents ().removeIf (e -> !partitions .contains (getPartitionFromEvent (e )));
111118 records .add (batch );
112119 }
113120 }
114121 return records ;
115122 }
116123
117- // Loop through all SinkRecords for all topic partitions to
118- // find all lowest consecutive committed offsets, calculate
119- // the topic/partition offsets and then remove them
124+ /**
125+ * Return offsets computed when event batches are acked.
126+ *
127+ * @return map of topic/partition to offset/metadata
128+ */
120129 public Map <TopicPartition , OffsetAndMetadata > computeOffsets () {
121130 return offsets ;
122131 }
123132
124133 public long totalEvents () {
125134 return total .get ();
126135 }
136+
137+ public void open (Collection <TopicPartition > partitions ) {
138+ this .partitions .addAll (partitions );
139+ log .debug ("open partitions={} so currently assigned partitions={}" ,
140+ partitions , this .partitions );
141+ }
142+
143+ public void close (Collection <TopicPartition > partitions ) {
144+ this .partitions .removeAll (partitions );
145+ log .debug ("close partitions={} so currently assigned partitions={}" ,
146+ partitions , this .partitions );
147+ cleanupAfterClosedPartitions (partitions );
148+ }
149+
150+ private TopicPartition getPartitionFromEvent (Event event ) {
151+ if (event .getTied () instanceof SinkRecord ) {
152+ final SinkRecord r = (SinkRecord ) event .getTied ();
153+ return new TopicPartition (r .topic (), r .kafkaPartition ());
154+ } else return null ;
155+ }
156+
157+ /**
158+ * Clean up and purge all things related to a partition that's closed (i.e.
159+ * became unassigned) to this task and reported via SinkTask.close(). This
160+ * avoids race conditions related to late received acks after a partition
161+ * rebalance.
162+ *
163+ * @param partitions partition closed and now unassigned for this task
164+ */
165+ public void cleanupAfterClosedPartitions (Collection <TopicPartition > partitions )
166+ {
167+ /* Purge offsets. */
168+ offsets .keySet ().removeAll (partitions );
169+ log .warn ("purge offsets for closed partitions={} leaving offsets={}" ,
170+ partitions , offsets );
171+
172+ /* Count and purge outstanding event topic/partition records. */
173+ long countOfEventsToRemove = partitions .stream ()
174+ .map (tp -> all .get (tp )) // get unassigned topic/partition records
175+ .filter (Objects ::nonNull ) // filter out null values
176+ .map (tpr -> tpr .size ()) // get number of tp records
177+ .mapToInt (Integer ::intValue ) // map to int
178+ .sum ();
179+ if (countOfEventsToRemove > 0 ) {
180+ log .warn ("purge events={} from closed partitions={}" ,
181+ countOfEventsToRemove , partitions );
182+ all .keySet ().removeAll (partitions );
183+ total .addAndGet (-1L * countOfEventsToRemove );
184+ }
185+ }
127186}
0 commit comments