97 changes: 78 additions & 19 deletions src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java
Expand Up @@ -38,39 +38,42 @@ final class KafkaRecordTracker {
private AtomicLong total;
private ConcurrentLinkedQueue<EventBatch> failed;
private volatile Map<TopicPartition, OffsetAndMetadata> offsets;
private Collection<TopicPartition> partitions;

public KafkaRecordTracker() {
all = new ConcurrentHashMap<>();
failed = new ConcurrentLinkedQueue<>();
total = new AtomicLong();
offsets = new HashMap<>();
partitions = new ArrayList<TopicPartition>();
}

/**
* Remove acked events and update the corresponding offsets by finding the
* lowest consecutive HEC-committed offsets.
*
* @param batches the acked event batches
*/
public void removeAckedEventBatches(final List<EventBatch> batches) {
for (final EventBatch batch: batches) {
//log.debug("Processing batch {}", batch.getUUID());
removeAckedEventBatch(batch);
}
}

public void removeAckedEventBatch(final EventBatch batch) {
final List<Event> events = batch.getEvents();
final Event event = events.get(0);
if (event.getTied() instanceof SinkRecord) {
final SinkRecord record = (SinkRecord) event.getTied();
TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
//log.debug("Processing topic {} partition {}", record.topic(), record.kafkaPartition());
log.debug("received acked event batches={}", batches);
/* Loop over all *assigned* partitions to find the lowest consecutive
* HEC-committed offsets. A batch could contain events coming from a
* variety of topic/partitions, and scanning those events could be
* expensive.
* Note that if some events are tied to an unassigned partition, those
* offsets cannot be committed.
*/
for (TopicPartition tp : partitions) {
ConcurrentNavigableMap<Long, EventBatch> tpRecords = all.get(tp);
if (tpRecords == null) {
log.error("KafkaRecordTracker removing a batch in an unknown partition {} {} {}", record.topic(), record.kafkaPartition(), record.kafkaOffset());
return;
continue; // nothing to remove in this case
}
long offset = -1;
Iterator<Map.Entry<Long, EventBatch>> iter = tpRecords.entrySet().iterator();
for (; iter.hasNext();) {
Map.Entry<Long, EventBatch> e = iter.next();
if (e.getValue().isCommitted()) {
//log.debug("processing offset {}", e.getKey());
log.debug("processing offset {}", e.getKey());
offset = e.getKey();
iter.remove();
total.decrementAndGet();
@@ -107,21 +110,77 @@ public Collection<EventBatch> getAndRemoveFailedRecords() {
Collection<EventBatch> records = new ArrayList<>();
while (!failed.isEmpty()) {
final EventBatch batch = failed.poll();
/* Don't return null batches. */
if (batch != null) {
/* Purge events from closed partitions because it won't be
* possible to commit their offsets. */
batch.getEvents().removeIf(e -> !partitions.contains(getPartitionFromEvent(e)));
records.add(batch);
}
}
return records;
}

// Loop through all SinkRecords for all topic partitions to
// find all lowest consecutive committed offsets, calculate
// the topic/partition offsets and then remove them
/**
* Return offsets computed when event batches are acked.
*
* @return map of topic/partition to offset/metadata
*/
public Map<TopicPartition, OffsetAndMetadata> computeOffsets() {
return offsets;
}

public long totalEvents() {
return total.get();
}

public void open(Collection<TopicPartition> partitions) {
this.partitions.addAll(partitions);
log.debug("open partitions={} so currently assigned partitions={}",
partitions, this.partitions);
}

public void close(Collection<TopicPartition> partitions) {
this.partitions.removeAll(partitions);
log.debug("close partitions={} so currently assigned partitions={}",
partitions, this.partitions);
cleanupAfterClosedPartitions(partitions);
}

private TopicPartition getPartitionFromEvent(Event event) {
if (event.getTied() instanceof SinkRecord) {
final SinkRecord r = (SinkRecord) event.getTied();
return new TopicPartition(r.topic(), r.kafkaPartition());
} else return null;
}

/**
* Clean up and purge everything related to partitions that were closed
* (i.e. became unassigned) for this task, as reported via SinkTask.close().
* This avoids race conditions caused by acks that arrive late, after a
* partition rebalance.
*
* @param partitions partitions closed and now unassigned for this task
*/
public void cleanupAfterClosedPartitions(Collection<TopicPartition> partitions)
{
/* Purge offsets. */
offsets.keySet().removeAll(partitions);
log.warn("purge offsets for closed partitions={} leaving offsets={}",
partitions, offsets);

/* Count and purge outstanding event topic/partition records. */
long countOfEventsToRemove = partitions.stream()
.map(tp -> all.get(tp)) // get unassigned topic/partition records
.filter(Objects::nonNull) // filter out null values
.map(tpr -> tpr.size()) // get number of tp records
.mapToInt(Integer::intValue) // map to int
.sum();
if (countOfEventsToRemove > 0) {
log.warn("purge events={} from closed partitions={}",
countOfEventsToRemove, partitions);
all.keySet().removeAll(partitions);
total.addAndGet(-1L * countOfEventsToRemove);
}
}
}
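
For illustration, the core of the new removeAckedEventBatches() loop can be reduced to the following self-contained sketch of the per-partition scan: walk the offset-ordered map of outstanding batches, drop entries while they are committed, and stop at the first uncommitted one. The Batch class and the sample offsets are hypothetical stand-ins for EventBatch and the tracker's internal state, and the +1 at the end follows the convention visible in the tests below (committing 120 records yields OffsetAndMetadata(120)); this is a sketch of the idea, not the exact implementation.

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class LowestCommittedOffsetSketch {
    // Hypothetical stand-in for EventBatch; only the committed flag matters here.
    static final class Batch {
        private final boolean committed;
        Batch(boolean committed) { this.committed = committed; }
        boolean isCommitted() { return committed; }
    }

    public static void main(String[] args) {
        // Outstanding batches for one assigned partition, keyed by offset (sorted ascending).
        ConcurrentNavigableMap<Long, Batch> tpRecords = new ConcurrentSkipListMap<>();
        tpRecords.put(100L, new Batch(true));
        tpRecords.put(101L, new Batch(true));
        tpRecords.put(102L, new Batch(false)); // not yet acked by HEC
        tpRecords.put(103L, new Batch(true));  // acked, but blocked by 102

        long offset = -1;
        Iterator<Map.Entry<Long, Batch>> iter = tpRecords.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<Long, Batch> e = iter.next();
            if (!e.getValue().isCommitted()) {
                break;             // stop at the first gap; 102 and 103 stay tracked
            }
            offset = e.getKey();   // highest consecutive committed offset so far
            iter.remove();         // acked and safe to forget
        }

        if (offset >= 0) {
            // Kafka commits the offset of the next record to consume, hence +1.
            System.out.println("commit offset " + (offset + 1)); // prints: commit offset 102
        } else {
            System.out.println("nothing to commit yet");
        }
    }
}
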
15 changes: 15 additions & 0 deletions src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
Expand Up @@ -325,6 +325,21 @@ private EventBatch createRawEventBatch(final TopicPartition tp) {
.build();
}

@Override
public void open(Collection<TopicPartition> partitions) {
tracker.open(partitions);
}

@Override
public void close(Collection<TopicPartition> partitions) {
/* Purge buffered events tied to closed partitions because this task
* won't be able to commit their offsets. */
bufferedRecords.removeIf(r -> partitions.contains(
new TopicPartition(r.topic(), r.kafkaPartition())));
/* Tell the tracker about the now-closed partitions so it can clean up. */
tracker.close(partitions);
}

@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> meta) {
// tell the Kafka Connect framework which offsets we can safely commit to Kafka now
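
As a usage-level illustration of the purge performed by the new close() override, this standalone sketch applies the same predicate to a plain list of SinkRecord objects; the topic name, partitions, and offsets are invented for the example. The rationale mirrors the comment in close(): once a partition has been revoked, this task can no longer commit its offsets, and the new owner will re-consume those records anyway, so keeping them buffered here would only risk duplicate delivery.

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;

public class ClosePurgeSketch {
    public static void main(String[] args) {
        // Records buffered by the task but not yet sent to HEC.
        List<SinkRecord> bufferedRecords = new ArrayList<>();
        bufferedRecords.add(new SinkRecord("t", 0, null, null, null, "keep me", 7L));
        bufferedRecords.add(new SinkRecord("t", 1, null, null, null, "drop me", 9L));

        // Partitions revoked from this task during a rebalance (passed to close()).
        Collection<TopicPartition> closed =
            Collections.singleton(new TopicPartition("t", 1));

        // Same predicate as in close(): drop records whose partition is now closed,
        // because this task can no longer commit their offsets.
        bufferedRecords.removeIf(r ->
            closed.contains(new TopicPartition(r.topic(), r.kafkaPartition())));

        System.out.println(bufferedRecords.size()); // 1 -- only the record on partition 0 remains
    }
}
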
src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java
Expand Up @@ -33,8 +33,9 @@ public class KafkaRecordTrackerTest {
public void addFailedEventBatch() {
EventBatch batch = UnitUtil.createBatch();
batch.fail();

batch.getEvents().get(0).setTied(createSinkRecord(1));
KafkaRecordTracker tracker = new KafkaRecordTracker();
tracker.open(createTopicPartitionList());
tracker.addFailedEventBatch(batch);
Collection<EventBatch> failed = tracker.getAndRemoveFailedRecords();
Assert.assertEquals(1, failed.size());
Expand All @@ -55,6 +56,7 @@ public void addEventBatch() {
EventBatch batch = UnitUtil.createBatch();
batch.getEvents().get(0).setTied(createSinkRecord(i));
batches.add(batch);
tracker.open(createTopicPartitionList());
tracker.addEventBatch(batch);
}
Map<TopicPartition, OffsetAndMetadata> offsets = tracker.computeOffsets();
@@ -96,4 +98,10 @@ public void addEventBatchWithNonSinkRecord() {
private SinkRecord createSinkRecord(long offset) {
return new SinkRecord("t", 1, null, null, null, "ni, hao", offset);
}

private List<TopicPartition> createTopicPartitionList() {
ArrayList<TopicPartition> tps = new ArrayList<>();
tps.add(new TopicPartition("t", 1));
return tps;
}
}
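
A hypothetical additional test (not part of this PR) could exercise the new cleanup path directly, reusing the helpers defined in KafkaRecordTrackerTest above; it assumes addEventBatch() registers the tracked events in totalEvents(), as the existing tests suggest.

@Test
public void closePurgesTrackedBatchesAndOffsets() {
    KafkaRecordTracker tracker = new KafkaRecordTracker();
    List<TopicPartition> partitions = createTopicPartitionList();
    tracker.open(partitions);

    // Track one outstanding batch tied to topic "t", partition 1.
    EventBatch batch = UnitUtil.createBatch();
    batch.getEvents().get(0).setTied(createSinkRecord(1));
    tracker.addEventBatch(batch);

    // Revoking the partition should purge its tracked batches and offsets.
    tracker.close(partitions);
    Assert.assertEquals(0, tracker.totalEvents());
    Assert.assertTrue(tracker.computeOffsets().isEmpty());
}
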
17 changes: 17 additions & 0 deletions src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
Expand Up @@ -78,16 +78,21 @@ public void putWithoutMaxBatchAligned() {

SplunkSinkTask task = new SplunkSinkTask();
HecMock hec = new HecMock(task);
TopicPartition tp = new TopicPartition(uu.configProfile.getTopics(), 1);
List<TopicPartition> partitions = new ArrayList<>();
partitions.add(tp);
// success
hec.setSendReturnResult(HecMock.success);
task.setHec(hec);
task.start(config);
task.open(partitions);
task.put(createSinkRecords(120));
Assert.assertEquals(2, hec.getBatches().size());
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition(uu.configProfile.getTopics(), 1), new OffsetAndMetadata(120));
Assert.assertEquals(offsets, task.preCommit(new HashMap<>()));
Assert.assertTrue(task.getTracker().getAndRemoveFailedRecords().isEmpty());
task.close(partitions);
task.stop();
}

Expand All @@ -105,6 +110,7 @@ public void putWithFailure() {
hec.setSendReturnResult(HecMock.failure);
task.setHec(hec);
task.start(config);
task.open(createTopicPartitionList());
task.put(createSinkRecords(1000));
Assert.assertEquals(10, hec.getBatches().size());
Assert.assertTrue(task.getTracker().computeOffsets().isEmpty());
@@ -266,10 +272,14 @@ private void putWithSuccess(boolean raw, boolean withMeta) {

SplunkSinkTask task = new SplunkSinkTask();
HecMock hec = new HecMock(task);
TopicPartition tp = new TopicPartition(uu.configProfile.getTopics(), 1);
List<TopicPartition> partitions = new ArrayList<>();
partitions.add(tp);
// success
hec.setSendReturnResult(HecMock.success);
task.setHec(hec);
task.start(config);
task.open(partitions);
task.put(createSinkRecords(total));
Assert.assertEquals(10, hec.getBatches().size());
if (raw && withMeta) {
@@ -303,6 +313,7 @@ private void putWithSuccess(boolean raw, boolean withMeta) {
offsets.put(new TopicPartition(uu.configProfile.getTopics(), 1), new OffsetAndMetadata(1000));
Assert.assertEquals(offsets, task.preCommit(new HashMap<>()));
Assert.assertTrue(task.getTracker().getAndRemoveFailedRecords().isEmpty());
task.close(partitions);
task.stop();
}

Expand All @@ -329,4 +340,10 @@ private Collection<SinkRecord> createNullSinkRecord() {
records.add(rec);
return records;
}

private List<TopicPartition> createTopicPartitionList() {
ArrayList<TopicPartition> tps = new ArrayList<>();
tps.add(new TopicPartition("mytopic", 1));
return tps;
}
}