Commit b0f6f29

fix(plugin): fix NPE from KafkaBasedLog when using multiple workers

1 parent 1ba75b1 commit b0f6f29
6 files changed: +46 −39 lines changed

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceConnector.java

Lines changed: 2 additions & 3 deletions

@@ -97,10 +97,9 @@ public void start(final Map<String, String> props) {
         final String tasksReporterGroupId = config.getTasksReporterGroupId();
         connectorGroupName = tasksReporterGroupId != null ? tasksReporterGroupId : connectName;
         StateBackingStoreRegistry.instance().register(connectorGroupName, () -> {
+            final String stateStoreTopic = config.getTaskReporterTopic();
             final Map<String, Object> configs = config.getInternalKafkaReporterConfig();
-            return new FileStateBackingStore(
-                    config.getTaskReporterTopic(),
-                    connectorGroupName, configs);
+            return new FileStateBackingStore(stateStoreTopic, connectorGroupName, configs, false);
         });
 
         final FSDirectoryWalker directoryScanner = this.config.directoryScanner();
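
The register call above hands the registry a lazy supplier rather than a built store. A minimal sketch of that get-or-create pattern, assuming a plain synchronized map (the class and field names below are illustrative, not the actual StateBackingStoreRegistry internals):

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative sketch only: a get-or-create registry keyed by group id.
// The real StateBackingStoreRegistry may add reference counting and cleanup.
final class SimpleStoreRegistry<T> {

    private final Map<String, Supplier<T>> suppliers = new HashMap<>();
    private final Map<String, T> instances = new HashMap<>();

    // Remember how to build the store for a group, without building it yet.
    synchronized void register(final String groupId, final Supplier<T> supplier) {
        suppliers.putIfAbsent(groupId, supplier);
    }

    // Build lazily on first access; all later callers share the same instance.
    synchronized T get(final String groupId) {
        return instances.computeIfAbsent(groupId, id -> suppliers.get(id).get());
    }
}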

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask.java

Lines changed: 4 additions & 3 deletions

@@ -99,9 +99,10 @@ private StateBackingStore<SourceFile> getStateStatesBackingStore() {
         final String groupId = config.getTasksReporterGroupId();
         StateBackingStoreRegistry.instance().register(groupId, () -> {
             final Map<String, Object> configs = config.getInternalKafkaReporterConfig();
-            return new FileStateBackingStore(
-                    config.getTaskReporterTopic(),
-                    groupId, configs);
+            final String stateStoreTopic = config.getTaskReporterTopic();
+            FileStateBackingStore store = new FileStateBackingStore(stateStoreTopic, groupId, configs, true);
+            store.start();
+            return store;
         });
 
         return StateBackingStoreRegistry.instance().get(groupId);
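
Note that the task-side supplier now passes true for the producer-only flag and starts the store before returning it, so the instance the registry publishes is always ready to write. A toy illustration of the multi-worker access pattern this protects (the thread bodies are placeholders, not project code):

// Toy illustration: several workers fetching the shared store for the same
// group id; each receives the same already-started, producer-only instance.
Runnable worker = () -> {
    StateBackingStore<SourceFile> store =
            StateBackingStoreRegistry.instance().get("my-group");
    // ... report file states through the store's producer ...
};
new Thread(worker).start();
new Thread(worker).start();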

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/KafkaFileStateReporter.java

Lines changed: 1 addition & 0 deletions

@@ -53,6 +53,7 @@ public class KafkaFileStateReporter implements StateListener {
 
     /**
      * Notify a state change for the specified source file.
+     *
      * @param metadata the source file metadata.
      * @param offset   the source file offset.
      * @param status   the status.

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/FileStateBackingStore.java

Lines changed: 7 additions & 5 deletions

@@ -32,13 +32,15 @@ public class FileStateBackingStore extends KafkaStateBackingStore<SourceFile> {
     /**
      * Creates a new {@link FileStateBackingStore} instance.
      *
-     * @param store   the state store name.
-     * @param groupId the group attached to the backing store.
-     * @param configs the configuration.
+     * @param store          the state store name.
+     * @param groupId        the group attached to the backing store.
+     * @param configs        the configuration.
+     * @param isProducerOnly is the backing store only used for writing data.
      */
     public FileStateBackingStore(final String store,
                                  final String groupId,
-                                 final Map<String, ?> configs) {
-        super(store, KEY_PREFIX, groupId, configs, new SourceFileSerde());
+                                 final Map<String, ?> configs,
+                                 final boolean isProducerOnly) {
+        super(store, KEY_PREFIX, groupId, configs, new SourceFileSerde(), isProducerOnly);
     }
 }
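
The new flag lets each call site pick its mode at construction time. A hedged usage sketch based on the two call sites in this commit (the topic name and configs are placeholder values, and java.util imports are assumed):

// Placeholder configuration for illustration only.
Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "localhost:9092");

// Connector side: needs to read existing state back, so keep the consumer.
FileStateBackingStore readWrite =
        new FileStateBackingStore("connect-file-pulse-status", "my-group", configs, false);

// Task side: only reports state, so skip the consumer entirely.
FileStateBackingStore producerOnly =
        new FileStateBackingStore("connect-file-pulse-status", "my-group", configs, true);
producerOnly.start();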

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaBasedLog.java

Lines changed: 27 additions & 26 deletions

@@ -73,7 +73,7 @@ enum States {
 
     /**
      * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until
-     * {@link #start()} is invoked.
+     * {@link #start(boolean)} is invoked.
      *
      * @param topic the topic to treat as a log
      * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must
@@ -86,7 +86,7 @@ enum States {
      *                        behavior of this class.
      * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
      * @param time Time interface
-     * @param initializer the component that should be run when this log is {@link #start() started}; may be null
+     * @param initializer the component that should be run when this log is {@link #start(boolean) started}; may be null
      */
     KafkaBasedLog(final String topic,
                   final Map<String, Object> producerConfigs,
@@ -105,43 +105,44 @@ enum States {
         this.state = States.CREATED;
     }
 
-    public synchronized void start() {
+    public synchronized void start(final boolean producerOnly) {
         if (state != States.CREATED) {
             throw new IllegalStateException("Cannot restart KafkaBasedLog due to state being " + state + ")");
         }
         LOG.info("Starting KafkaBasedLog with topic {}", topic);
         try {
             initializer.run();
             producer = createProducer();
-            consumer = createConsumer();
+            if (!producerOnly) {
+                consumer = createConsumer();
 
-            List<TopicPartition> partitions = new ArrayList<>();
+                List<TopicPartition> partitions = new ArrayList<>();
 
-            // We expect that the topics will have been created either manually by the user or automatically by the herder
-            List<PartitionInfo> partitionInfos = null;
-            long started = time.milliseconds();
-            while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
-                partitionInfos = consumer.partitionsFor(topic);
-                Utils.sleep(Math.min(time.milliseconds() - started, 1000));
-            }
-            if (partitionInfos == null)
-                throw new ConnectException(
-                        "Could not look up partition metadata for position backing store topic '" + topic + "' in" +
-                        " allotted period (" + CREATE_TOPIC_TIMEOUT_MS + "ms). This could indicate a connectivity issue," +
-                        " unavailable topic partitions, or if this is your first use of the topic it may have taken too long to create.");
+                List<PartitionInfo> partitionInfos = null;
+                long started = time.milliseconds();
+                while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+                    partitionInfos = consumer.partitionsFor(topic);
+                    Utils.sleep(Math.min(time.milliseconds() - started, 1000));
+                }
+                if (partitionInfos == null)
+                    throw new ConnectException(
+                            "Could not look up partition metadata for position backing store topic '" + topic + "' in" +
+                            " allotted period (" + CREATE_TOPIC_TIMEOUT_MS + "ms). This could indicate a connectivity issue," +
+                            " unavailable topic partitions, or if this is your first use of the topic it may have taken too long to create.");
 
-            for (PartitionInfo partition : partitionInfos)
-                partitions.add(new TopicPartition(partition.topic(), partition.partition()));
-            consumer.assign(partitions);
+                for (PartitionInfo partition : partitionInfos)
+                    partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+                consumer.assign(partitions);
 
-            // Always consume from the beginning of all partitions. Necessary to ensure that we don't use committed offsets
-            // when a 'group.id' is specified (if offsets happen to have been committed unexpectedly).
-            consumer.seekToBeginning(partitions);
+                // Always consume from the beginning of all partitions. Necessary to ensure that we don't use committed offsets
+                // when a 'group.id' is specified (if offsets happen to have been committed unexpectedly).
+                consumer.seekToBeginning(partitions);
 
-            readToLogEnd();
+                readToLogEnd();
 
-            thread = new WorkThread();
-            thread.start();
+                thread = new WorkThread();
+                thread.start();
+            }
             state = States.RUNNING;
             LOG.info("Finished reading KafkaBasedLog for topic {}", topic);
             LOG.info("Started KafkaBasedLog for topic {}", topic);

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaStateBackingStore.java

Lines changed: 5 additions & 2 deletions

@@ -46,6 +46,7 @@ public class KafkaStateBackingStore<T> implements StateBackingStore<T> {
     private final Map<String, T> states = new HashMap<>();
     private final StateSerde<T> serde;
     private final String keyPrefix;
+    private final boolean isProducerOnly;
     private States status = States.CREATED;
     private StateBackingStore.UpdateListener<T> updateListener;
 
@@ -62,12 +63,14 @@ public KafkaStateBackingStore(final String topic,
                                   final String keyPrefix,
                                   final String groupId,
                                   final Map<String, ?> configs,
-                                  final StateSerde<T> serde) {
+                                  final StateSerde<T> serde,
+                                  final boolean isProducerOnly) {
         KafkaBasedLogFactory factory = new KafkaBasedLogFactory(configs);
         this.configLog = factory.make(topic, new ConsumeCallback());
         this.groupId = groupId;
         this.serde = serde;
         this.keyPrefix = keyPrefix;
+        this.isProducerOnly = isProducerOnly;
     }
 
     synchronized States getState() {
@@ -89,7 +92,7 @@ public void start() {
         LOG.info("Starting {}", getBackingStoreName());
         // Before startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
         // updates can continue to occur in the background
-        configLog.start();
+        configLog.start(isProducerOnly);
         setState(States.STARTED);
         LOG.info("Started {}", getBackingStoreName());
     }
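
Putting the pieces together, the start chain after this commit runs top to bottom as sketched here (topic, group id, and configs are placeholders):

// Sketch of the end-to-end start chain introduced by this commit:
//   FileStateBackingStore.start()          (inherited from KafkaStateBackingStore)
//     -> configLog.start(isProducerOnly)   (KafkaBasedLog)
//        producer is always created; consumer, partition lookup, read-to-end,
//        and the WorkThread are all skipped when the flag is true.
Map<String, Object> configs = new HashMap<>(); // placeholder configuration
FileStateBackingStore store =
        new FileStateBackingStore("connect-file-pulse-status", "my-group", configs, true);
store.start();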
