Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object> impl

private Duration closeTimeout = Duration.ofSeconds(DEFAULT_CLOSE_TIMEOUT);

public boolean newAssignment;

private volatile Consumer<K, V> consumer;

private volatile boolean pausing;
Expand All @@ -146,6 +148,8 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object> impl

private volatile Iterator<ConsumerRecord<K, V>> recordsIterator;

private volatile boolean stopped;

/**
* Construct an instance with the supplied parameters. Fetching multiple
* records per poll will be disabled.
Expand Down Expand Up @@ -386,12 +390,14 @@ public synchronized boolean isRunning() {
@Override
public synchronized void start() {
this.running = true;
this.stopped = false;
}

@Override
public synchronized void stop() {
stopConsumer();
this.running = false;
this.stopped = true;
}

@Override
Expand All @@ -411,6 +417,10 @@ public boolean isPaused() {

@Override
protected synchronized Object doReceive() {
if (this.stopped) {
this.logger.debug("Message source is stopped; no records will be returned");
return null;
}
if (this.consumer == null) {
createConsumer();
this.running = true;
Expand Down Expand Up @@ -511,14 +521,27 @@ private ConsumerRecord<K, V> pollRecord() {
}
else {
synchronized (this.consumerMonitor) {
ConsumerRecords<K, V> records = this.consumer
.poll(this.assignedPartitions.isEmpty() ? this.assignTimeout : this.pollTimeout);
if (records == null || records.count() == 0) {
try {
ConsumerRecords<K, V> records = this.consumer
.poll(this.assignedPartitions.isEmpty() ? this.assignTimeout : this.pollTimeout);
this.logger.debug(() -> records == null
? "Received null"
: "Received " + records.count() + " records");
if (records == null || records.count() == 0) {
return null;
}
this.remainingCount.set(records.count());
this.recordsIterator = records.iterator();
return nextRecord();
}
catch (WakeupException ex) {
this.logger.debug("Woken");
if (this.newAssignment) {
this.newAssignment = false;
return pollRecord();
}
return null;
}
this.remainingCount.set(records.count());
this.recordsIterator = records.iterator();
return nextRecord();
}
}
}
Expand Down Expand Up @@ -632,6 +655,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
this.providedRebalanceListener.onPartitionsAssigned(partitions);
}
}
KafkaMessageSource.this.consumer.wakeup();
KafkaMessageSource.this.newAssignment = true;
}

}
Expand Down