diff --git a/src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java b/src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java
index 63c0ffed..5a554053 100644
--- a/src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java
+++ b/src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java
@@ -38,39 +38,42 @@ final class KafkaRecordTracker {
     private AtomicLong total;
     private ConcurrentLinkedQueue<EventBatch> failed;
    private volatile Map<TopicPartition, OffsetAndMetadata> offsets;
+    private Collection<TopicPartition> partitions;
 
     public KafkaRecordTracker() {
         all = new ConcurrentHashMap<>();
         failed = new ConcurrentLinkedQueue<>();
         total = new AtomicLong();
         offsets = new HashMap<>();
+        partitions = new ArrayList<>();
     }
 
+    /**
+     * Remove acked events and update the corresponding offsets, finding the
+     * lowest consecutive HEC-committed offsets.
+     *
+     * @param batches the acked event batches
+     */
     public void removeAckedEventBatches(final List<EventBatch> batches) {
-        for (final EventBatch batch: batches) {
-            //log.debug("Processing batch {}", batch.getUUID());
-            removeAckedEventBatch(batch);
-        }
-    }
-
-    public void removeAckedEventBatch(final EventBatch batch) {
-        final List<Event> events = batch.getEvents();
-        final Event event = events.get(0);
-        if (event.getTied() instanceof SinkRecord) {
-            final SinkRecord record = (SinkRecord) event.getTied();
-            TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
-            //log.debug("Processing topic {} partition {}", record.topic(), record.kafkaPartition());
+        log.debug("received acked event batches={}", batches);
+        /* Loop over all *assigned* partitions to find the lowest consecutive
+         * HEC-committed offsets. A batch could contain events coming from a
+         * variety of topic/partitions, and scanning those events could be
+         * expensive.
+         * Note that if some events are tied to an unassigned partition, those
+         * offsets won't be committed.
+         */
+        for (TopicPartition tp : partitions) {
             ConcurrentNavigableMap<Long, EventBatch> tpRecords = all.get(tp);
             if (tpRecords == null) {
-                log.error("KafkaRecordTracker removing a batch in an unknown partition {} {} {}", record.topic(), record.kafkaPartition(), record.kafkaOffset());
-                return;
+                continue; // nothing to remove in this case
             }
             long offset = -1;
             Iterator<Map.Entry<Long, EventBatch>> iter = tpRecords.entrySet().iterator();
             for (; iter.hasNext();) {
                 Map.Entry<Long, EventBatch> e = iter.next();
                 if (e.getValue().isCommitted()) {
-                    //log.debug("processing offset {}", e.getKey());
+                    log.debug("processing offset {}", e.getKey());
                     offset = e.getKey();
                     iter.remove();
                     total.decrementAndGet();
@@ -107,16 +110,22 @@ public Collection<EventBatch> getAndRemoveFailedRecords() {
         Collection<EventBatch> records = new ArrayList<>();
         while (!failed.isEmpty()) {
             final EventBatch batch = failed.poll();
+            /* Don't return null batches. */
             if (batch != null) {
+                /* Purge events from closed partitions because it won't be
+                 * possible to commit their offsets. */
+                batch.getEvents().removeIf(e -> !partitions.contains(getPartitionFromEvent(e)));
                 records.add(batch);
             }
         }
         return records;
     }
 
-    // Loop through all SinkRecords for all topic partitions to
-    // find all lowest consecutive committed offsets, calculate
-    // the topic/partition offsets and then remove them
+    /**
+     * Return offsets computed when event batches are acked.
+     *
+     * @return map of topic/partition to offset/metadata
+     */
     public Map<TopicPartition, OffsetAndMetadata> computeOffsets() {
         return offsets;
     }
@@ -124,4 +133,54 @@ public Map<TopicPartition, OffsetAndMetadata> computeOffsets() {
     public long totalEvents() {
         return total.get();
     }
+
+    public void open(Collection<TopicPartition> partitions) {
+        this.partitions.addAll(partitions);
+        log.debug("open partitions={} so currently assigned partitions={}",
+            partitions, this.partitions);
+    }
+
+    public void close(Collection<TopicPartition> partitions) {
+        this.partitions.removeAll(partitions);
+        log.debug("close partitions={} so currently assigned partitions={}",
+            partitions, this.partitions);
+        cleanupAfterClosedPartitions(partitions);
+    }
+
+    private TopicPartition getPartitionFromEvent(Event event) {
+        if (event.getTied() instanceof SinkRecord) {
+            final SinkRecord r = (SinkRecord) event.getTied();
+            return new TopicPartition(r.topic(), r.kafkaPartition());
+        } else return null;
+    }
+
+    /**
+     * Clean up and purge everything related to a partition that has been
+     * closed (i.e. became unassigned) for this task, as reported via
+     * SinkTask.close(). This avoids race conditions caused by acks that
+     * arrive after a partition rebalance.
+     *
+     * @param partitions partitions closed and now unassigned for this task
+     */
+    public void cleanupAfterClosedPartitions(Collection<TopicPartition> partitions)
+    {
+        /* Purge offsets. */
+        offsets.keySet().removeAll(partitions);
+        log.warn("purge offsets for closed partitions={} leaving offsets={}",
+            partitions, offsets);
+
+        /* Count and purge outstanding event topic/partition records. */
+        long countOfEventsToRemove = partitions.stream()
+            .map(tp -> all.get(tp))          // get unassigned topic/partition records
+            .filter(Objects::nonNull)        // filter out null values
+            .map(tpr -> tpr.size())          // get number of tp records
+            .mapToInt(Integer::intValue)     // map to int
+            .sum();
+        if (countOfEventsToRemove > 0) {
+            log.warn("purge events={} from closed partitions={}",
+                countOfEventsToRemove, partitions);
+            all.keySet().removeAll(partitions);
+            total.addAndGet(-1L * countOfEventsToRemove);
+        }
+    }
 }
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
index 6a90c98e..306865fc 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
@@ -325,6 +325,21 @@ private EventBatch createRawEventBatch(final TopicPartition tp) {
             .build();
     }
 
+    @Override
+    public void open(Collection<TopicPartition> partitions) {
+        tracker.open(partitions);
+    }
+
+    @Override
+    public void close(Collection<TopicPartition> partitions) {
+        /* Purge buffered events tied to closed partitions because this task
+         * won't be able to commit their offsets. */
+        bufferedRecords.removeIf(r -> partitions.contains(
+            new TopicPartition(r.topic(), r.kafkaPartition())));
+        /* Tell the tracker about the now-closed partitions so it can clean up. */
+        tracker.close(partitions);
+    }
+
     @Override
     public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> meta) {
         // tell Kafka Connect framework what are offsets we can safely commit to Kafka now
diff --git a/src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java b/src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java
index 94309a72..baa6f6ef 100644
--- a/src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java
+++ b/src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java
@@ -33,8 +33,9 @@ public class KafkaRecordTrackerTest {
     public void addFailedEventBatch() {
         EventBatch batch = UnitUtil.createBatch();
         batch.fail();
-
+        batch.getEvents().get(0).setTied(createSinkRecord(1));
         KafkaRecordTracker tracker = new KafkaRecordTracker();
+        tracker.open(createTopicPartitionList());
         tracker.addFailedEventBatch(batch);
         Collection<EventBatch> failed = tracker.getAndRemoveFailedRecords();
         Assert.assertEquals(1, failed.size());
@@ -55,6 +56,7 @@ public void addEventBatch() {
             EventBatch batch = UnitUtil.createBatch();
             batch.getEvents().get(0).setTied(createSinkRecord(i));
             batches.add(batch);
+            tracker.open(createTopicPartitionList());
             tracker.addEventBatch(batch);
         }
         Map<TopicPartition, OffsetAndMetadata> offsets = tracker.computeOffsets();
@@ -96,4 +98,10 @@ public void addEventBatchWithNonSinkRecord() {
     private SinkRecord createSinkRecord(long offset) {
         return new SinkRecord("t", 1, null, null, null, "ni, hao", offset);
     }
+
+    private List<TopicPartition> createTopicPartitionList() {
+        ArrayList<TopicPartition> tps = new ArrayList<>();
+        tps.add(new TopicPartition("t", 1));
+        return tps;
+    }
 }
diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
index fade05f6..b430e860 100644
--- a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
+++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
@@ -78,16 +78,21 @@ public void putWithoutMaxBatchAligned() {
 
         SplunkSinkTask task = new SplunkSinkTask();
         HecMock hec = new HecMock(task);
+        TopicPartition tp = new TopicPartition(uu.configProfile.getTopics(), 1);
+        List<TopicPartition> partitions = new ArrayList<>();
+        partitions.add(tp);
 
         // success
         hec.setSendReturnResult(HecMock.success);
         task.setHec(hec);
         task.start(config);
+        task.open(partitions);
         task.put(createSinkRecords(120));
         Assert.assertEquals(2, hec.getBatches().size());
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition(uu.configProfile.getTopics(), 1), new OffsetAndMetadata(120));
         Assert.assertEquals(offsets, task.preCommit(new HashMap<>()));
         Assert.assertTrue(task.getTracker().getAndRemoveFailedRecords().isEmpty());
+        task.close(partitions);
         task.stop();
     }
@@ -105,6 +110,7 @@ public void putWithFailure() {
         hec.setSendReturnResult(HecMock.failure);
         task.setHec(hec);
         task.start(config);
+        task.open(createTopicPartitionList());
         task.put(createSinkRecords(1000));
         Assert.assertEquals(10, hec.getBatches().size());
         Assert.assertTrue(task.getTracker().computeOffsets().isEmpty());
@@ -266,10 +272,14 @@ private void putWithSuccess(boolean raw, boolean withMeta) {
 
         SplunkSinkTask task = new SplunkSinkTask();
         HecMock hec = new HecMock(task);
+        TopicPartition tp = new TopicPartition(uu.configProfile.getTopics(), 1);
+        List<TopicPartition> partitions = new ArrayList<>();
+        partitions.add(tp);
         // success
         hec.setSendReturnResult(HecMock.success);
         task.setHec(hec);
         task.start(config);
+        task.open(partitions);
         task.put(createSinkRecords(total));
         Assert.assertEquals(10, hec.getBatches().size());
         if (raw && withMeta) {
@@ -303,6 +313,7 @@ private void putWithSuccess(boolean raw, boolean withMeta) {
         offsets.put(new TopicPartition(uu.configProfile.getTopics(), 1), new OffsetAndMetadata(1000));
         Assert.assertEquals(offsets, task.preCommit(new HashMap<>()));
         Assert.assertTrue(task.getTracker().getAndRemoveFailedRecords().isEmpty());
+        task.close(partitions);
         task.stop();
     }
 
@@ -329,4 +340,10 @@ private Collection<SinkRecord> createNullSinkRecord() {
         records.add(rec);
         return records;
     }
+
+    private List<TopicPartition> createTopicPartitionList() {
+        ArrayList<TopicPartition> tps = new ArrayList<>();
+        tps.add(new TopicPartition("mytopic", 1));
+        return tps;
+    }
 }
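Reviewer note, not part of the patch: the removeAckedEventBatches() hunk above scans each assigned partition's offset-ordered records and advances the committable offset only while every record in order has been HEC-committed, stopping at the first gap. The following self-contained sketch illustrates that scan with a plain TreeMap<Long, Boolean> standing in for the tracker's per-partition map of EventBatch records; the class name LowestConsecutiveOffsetSketch and the Boolean stand-in are illustrative only and do not appear in the patch.

    import java.util.Iterator;
    import java.util.Map;
    import java.util.TreeMap;

    public class LowestConsecutiveOffsetSketch {
        public static void main(String[] args) {
            // Offsets tracked for one topic/partition; true means the HEC ack has arrived.
            TreeMap<Long, Boolean> tpRecords = new TreeMap<>();
            tpRecords.put(1L, true);   // committed
            tpRecords.put(2L, true);   // committed
            tpRecords.put(3L, false);  // still outstanding
            tpRecords.put(4L, true);   // committed, but behind a gap

            long offset = -1;
            Iterator<Map.Entry<Long, Boolean>> iter = tpRecords.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<Long, Boolean> e = iter.next();
                if (e.getValue()) {
                    offset = e.getKey(); // highest consecutive committed offset so far
                    iter.remove();       // drop the acked record, as the tracker does
                } else {
                    break;               // stop at the first uncommitted offset
                }
            }
            System.out.println("committable offset = " + offset);
        }
    }

Running it prints "committable offset = 2": offsets 1 and 2 are safe to commit, while offset 4 must wait for 3 to be acked, which is why the tracker's inner loop breaks at the first uncommitted entry instead of skipping over gaps.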