From 06df7da335349676ff9dfe0ce9da9a10c9a9a095 Mon Sep 17 00:00:00 2001
From: Roberto Moreda
Date: Thu, 29 Oct 2020 03:21:18 +0100
Subject: [PATCH] Purge events tied to closed partitions

In some cases, after a rebalance of partitions and depending on the
late arrival of acks from HEC, we need to remove all references to
events tied to the closed partitions. This purge has to be applied to
buffered events, failed events, offsets, and topic/partition records.

This is considered safe because those events will be picked up by the
task that opens the partitions that were closed in the original task.
---
 .../kafka/connect/KafkaRecordTracker.java     | 97 +++++++++++++++----
 .../splunk/kafka/connect/SplunkSinkTask.java  | 15 +++
 .../kafka/connect/KafkaRecordTrackerTest.java | 10 +-
 .../kafka/connect/SplunkSinkTaskTest.java     | 17 ++++
 4 files changed, 119 insertions(+), 20 deletions(-)

diff --git a/src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java b/src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java
index 63c0ffed..5a554053 100644
--- a/src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java
+++ b/src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java
@@ -38,39 +38,42 @@ final class KafkaRecordTracker {
     private AtomicLong total;
     private ConcurrentLinkedQueue<EventBatch> failed;
     private volatile Map<TopicPartition, OffsetAndMetadata> offsets;
+    private Collection<TopicPartition> partitions;
 
     public KafkaRecordTracker() {
         all = new ConcurrentHashMap<>();
         failed = new ConcurrentLinkedQueue<>();
         total = new AtomicLong();
         offsets = new HashMap<>();
+        partitions = new ArrayList<>();
     }
 
+    /**
+     * Remove acked event batches and update the corresponding offsets by
+     * finding the lowest consecutive HEC-committed offsets.
+     *
+     * @param batches the acked event batches
+     */
     public void removeAckedEventBatches(final List<EventBatch> batches) {
-        for (final EventBatch batch: batches) {
-            //log.debug("Processing batch {}", batch.getUUID());
-            removeAckedEventBatch(batch);
-        }
-    }
-
-    public void removeAckedEventBatch(final EventBatch batch) {
-        final List<Event> events = batch.getEvents();
-        final Event event = events.get(0);
-        if (event.getTied() instanceof SinkRecord) {
-            final SinkRecord record = (SinkRecord) event.getTied();
-            TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
-            //log.debug("Processing topic {} partition {}", record.topic(), record.kafkaPartition());
+        log.debug("received acked event batches={}", batches);
+        /* Loop over all *assigned* partitions to find the lowest consecutive
+         * HEC-committed offsets. A batch could contain events coming from a
+         * variety of topic/partitions, and scanning those events could be
+         * expensive.
+         * Note that if some events are tied to an unassigned partition, their
+         * offsets cannot be committed.
+         */
+        for (TopicPartition tp : partitions) {
             ConcurrentNavigableMap<Long, EventBatch> tpRecords = all.get(tp);
             if (tpRecords == null) {
-                log.error("KafkaRecordTracker removing a batch in an unknown partition {} {} {}", record.topic(), record.kafkaPartition(), record.kafkaOffset());
-                return;
+                continue; // nothing to remove in this case
             }
             long offset = -1;
             Iterator<Map.Entry<Long, EventBatch>> iter = tpRecords.entrySet().iterator();
             for (; iter.hasNext();) {
                 Map.Entry<Long, EventBatch> e = iter.next();
                 if (e.getValue().isCommitted()) {
-                    //log.debug("processing offset {}", e.getKey());
+                    log.debug("processing offset {}", e.getKey());
                     offset = e.getKey();
                     iter.remove();
                     total.decrementAndGet();
@@ -107,16 +110,22 @@ public Collection<EventBatch> getAndRemoveFailedRecords() {
         Collection<EventBatch> records = new ArrayList<>();
         while (!failed.isEmpty()) {
             final EventBatch batch = failed.poll();
+            /* Don't return null batches. */
             if (batch != null) {
+                /* Purge events from closed partitions because their
+                 * offsets can no longer be committed. */
+                batch.getEvents().removeIf(e -> !partitions.contains(getPartitionFromEvent(e)));
                 records.add(batch);
             }
         }
         return records;
     }
 
-    // Loop through all SinkRecords for all topic partitions to
-    // find all lowest consecutive committed offsets, calculate
-    // the topic/partition offsets and then remove them
+    /**
+     * Return the offsets computed when event batches are acked.
+     *
+     * @return map of topic/partition to offset/metadata
+     */
     public Map<TopicPartition, OffsetAndMetadata> computeOffsets() {
         return offsets;
     }
@@ -124,4 +133,54 @@ public Map<TopicPartition, OffsetAndMetadata> computeOffsets() {
     public long totalEvents() {
         return total.get();
     }
+
+    public void open(Collection<TopicPartition> partitions) {
+        this.partitions.addAll(partitions);
+        log.debug("open partitions={} so currently assigned partitions={}",
+                partitions, this.partitions);
+    }
+
+    public void close(Collection<TopicPartition> partitions) {
+        this.partitions.removeAll(partitions);
+        log.debug("close partitions={} so currently assigned partitions={}",
+                partitions, this.partitions);
+        cleanupAfterClosedPartitions(partitions);
+    }
+
+    private TopicPartition getPartitionFromEvent(Event event) {
+        if (event.getTied() instanceof SinkRecord) {
+            final SinkRecord r = (SinkRecord) event.getTied();
+            return new TopicPartition(r.topic(), r.kafkaPartition());
+        } else return null;
+    }
+
+    /**
+     * Clean up and purge everything related to partitions that are closed
+     * (i.e. became unassigned) for this task, as reported via SinkTask.close().
+     * This avoids race conditions caused by acks that arrive late after a
+     * partition rebalance.
+     *
+     * @param partitions partitions closed and now unassigned for this task
+     */
+    public void cleanupAfterClosedPartitions(Collection<TopicPartition> partitions)
+    {
+        /* Purge offsets. */
+        offsets.keySet().removeAll(partitions);
+        log.warn("purge offsets for closed partitions={} leaving offsets={}",
+                partitions, offsets);
+
+        /* Count and purge outstanding event topic/partition records. */
+        long countOfEventsToRemove = partitions.stream()
+                .map(tp -> all.get(tp))        // get unassigned topic/partition records
+                .filter(Objects::nonNull)      // filter out null values
+                .map(tpr -> tpr.size())        // get number of tp records
+                .mapToInt(Integer::intValue)   // map to int
+                .sum();
+        if (countOfEventsToRemove > 0) {
+            log.warn("purge events={} from closed partitions={}",
+                    countOfEventsToRemove, partitions);
+            all.keySet().removeAll(partitions);
+            total.addAndGet(-1L * countOfEventsToRemove);
+        }
+    }
 }
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
index 6a90c98e..306865fc 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
@@ -325,6 +325,21 @@ private EventBatch createRawEventBatch(final TopicPartition tp) {
             .build();
     }
 
+    @Override
+    public void open(Collection<TopicPartition> partitions) {
+        tracker.open(partitions);
+    }
+
+    @Override
+    public void close(Collection<TopicPartition> partitions) {
+        /* Purge buffered events tied to closed partitions because this task
+         * won't be able to commit their offsets. */
+        bufferedRecords.removeIf(r -> partitions.contains(
+                new TopicPartition(r.topic(), r.kafkaPartition())));
+        /* Tell the tracker about the now closed partitions so it can clean up. */
+        tracker.close(partitions);
+    }
+
     @Override
     public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> meta) {
         // tell Kafka Connect framework what are offsets we can safely commit to Kafka now
diff --git a/src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java b/src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java
index 94309a72..baa6f6ef 100644
--- a/src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java
+++ b/src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java
@@ -33,8 +33,9 @@ public class KafkaRecordTrackerTest {
     public void addFailedEventBatch() {
         EventBatch batch = UnitUtil.createBatch();
         batch.fail();
-
+        batch.getEvents().get(0).setTied(createSinkRecord(1));
         KafkaRecordTracker tracker = new KafkaRecordTracker();
+        tracker.open(createTopicPartitionList());
         tracker.addFailedEventBatch(batch);
         Collection<EventBatch> failed = tracker.getAndRemoveFailedRecords();
         Assert.assertEquals(1, failed.size());
@@ -55,6 +56,7 @@ public void addEventBatch() {
             EventBatch batch = UnitUtil.createBatch();
             batch.getEvents().get(0).setTied(createSinkRecord(i));
             batches.add(batch);
+            tracker.open(createTopicPartitionList());
             tracker.addEventBatch(batch);
         }
         Map<TopicPartition, OffsetAndMetadata> offsets = tracker.computeOffsets();
@@ -96,4 +98,10 @@ public void addEventBatchWithNonSinkRecord() {
     private SinkRecord createSinkRecord(long offset) {
         return new SinkRecord("t", 1, null, null, null, "ni, hao", offset);
     }
+
+    private List<TopicPartition> createTopicPartitionList() {
+        ArrayList<TopicPartition> tps = new ArrayList<>();
+        tps.add(new TopicPartition("t", 1));
+        return tps;
+    }
 }
diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
index fade05f6..b430e860 100644
--- a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
+++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
@@ -78,16 +78,21 @@ public void putWithoutMaxBatchAligned() {
 
         SplunkSinkTask task = new SplunkSinkTask();
         HecMock hec = new HecMock(task);
+        TopicPartition tp = new TopicPartition(uu.configProfile.getTopics(), 1);
+        List<TopicPartition> partitions = new ArrayList<>();
+        partitions.add(tp);
         // success
         hec.setSendReturnResult(HecMock.success);
         task.setHec(hec);
         task.start(config);
+        task.open(partitions);
         task.put(createSinkRecords(120));
         Assert.assertEquals(2, hec.getBatches().size());
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition(uu.configProfile.getTopics(), 1), new OffsetAndMetadata(120));
         Assert.assertEquals(offsets, task.preCommit(new HashMap<>()));
         Assert.assertTrue(task.getTracker().getAndRemoveFailedRecords().isEmpty());
+        task.close(partitions);
         task.stop();
     }
 
@@ -105,6 +110,7 @@ public void putWithFailure() {
         hec.setSendReturnResult(HecMock.failure);
         task.setHec(hec);
         task.start(config);
+        task.open(createTopicPartitionList());
         task.put(createSinkRecords(1000));
         Assert.assertEquals(10, hec.getBatches().size());
         Assert.assertTrue(task.getTracker().computeOffsets().isEmpty());
@@ -266,10 +272,14 @@ private void putWithSuccess(boolean raw, boolean withMeta) {
 
         SplunkSinkTask task = new SplunkSinkTask();
         HecMock hec = new HecMock(task);
+        TopicPartition tp = new TopicPartition(uu.configProfile.getTopics(), 1);
+        List<TopicPartition> partitions = new ArrayList<>();
+        partitions.add(tp);
         // success
         hec.setSendReturnResult(HecMock.success);
         task.setHec(hec);
         task.start(config);
+        task.open(partitions);
         task.put(createSinkRecords(total));
         Assert.assertEquals(10, hec.getBatches().size());
         if (raw && withMeta) {
@@ -303,6 +313,7 @@ private void putWithSuccess(boolean raw, boolean withMeta) {
         offsets.put(new TopicPartition(uu.configProfile.getTopics(), 1), new OffsetAndMetadata(1000));
         Assert.assertEquals(offsets, task.preCommit(new HashMap<>()));
         Assert.assertTrue(task.getTracker().getAndRemoveFailedRecords().isEmpty());
+        task.close(partitions);
         task.stop();
     }
 
@@ -329,4 +340,10 @@ private Collection<SinkRecord> createNullSinkRecord() {
         records.add(rec);
         return records;
     }
+
+    private List<TopicPartition> createTopicPartitionList() {
+        ArrayList<TopicPartition> tps = new ArrayList<>();
+        tps.add(new TopicPartition("mytopic", 1));
+        return tps;
+    }
 }
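
Note (not part of the patch): the failed-batch purge introduced in KafkaRecordTracker can be
exercised in isolation. The sketch below is illustrative only; it reuses the UnitUtil.createBatch()
helper and the SinkRecord shape already used in KafkaRecordTrackerTest, and it assumes it runs
alongside those tests.

    // Minimal sketch: a failed batch whose partition is closed before it is retried.
    KafkaRecordTracker tracker = new KafkaRecordTracker();
    TopicPartition tp = new TopicPartition("t", 1);
    tracker.open(java.util.Collections.singletonList(tp));

    EventBatch batch = UnitUtil.createBatch();
    batch.getEvents().get(0).setTied(new SinkRecord("t", 1, null, null, null, "ni, hao", 0));
    batch.fail();
    tracker.addFailedEventBatch(batch);

    // A rebalance closes the partition; offsets and outstanding records for it are purged.
    tracker.close(java.util.Collections.singletonList(tp));

    // The failed batch is still returned, but its events tied to the now closed
    // partition have been purged, so nothing is re-sent for them.
    for (EventBatch failed : tracker.getAndRemoveFailedRecords()) {
        Assert.assertTrue(failed.getEvents().isEmpty());
    }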