Skip to content

Commit e2bf74f

Browse files
committed
fix(plugin): fix NPE in DefaultFileSystemMonitor (#465)
Resolves: #465
1 parent fccfff6 commit e2bf74f

File tree

8 files changed

+238
-88
lines changed

8 files changed

+238
-88
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/StateSnapshot.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.storage;
2020

21+
import java.util.Collections;
2122
import java.util.Map;
2223
import java.util.Objects;
2324

@@ -26,6 +27,10 @@
2627
*/
2728
public class StateSnapshot<T> {
2829

30+
public static <T> StateSnapshot<T> empty() {
31+
return new StateSnapshot<>(-1, Collections.emptyMap());
32+
}
33+
2934
private final long offset;
3035

3136
private final Map<String, T> states;

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/SourceConnectorConfig.java

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicy;
2222
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectStatus;
23+
import java.time.Duration;
2324
import java.util.Arrays;
2425
import java.util.Map;
2526
import java.util.function.Predicate;
@@ -39,21 +40,35 @@ public class SourceConnectorConfig extends CommonSourceConfig {
3940
private static final String FS_CLEANUP_POLICY_EXECUTE_DOC = "Specify the status when a file get cleanup. Valid values are: " + Arrays.toString(FS_CLEANUP_POLICY_EXECUTE_VALID_VALUES);
4041

4142
/* Settings for FileSystemMonitorThread */
42-
public static final String FS_LISTING_INTERVAL_MS_CONFIG = "fs.listing.interval.ms";
43-
private static final String FS_LISTING_INTERVAL_MS_DOC = "The time interval, in milliseconds, in which the connector invokes the scan of the filesystem.";
44-
private static final long FS_LISTING_INTERVAL_MS_DEFAULT = 10000L;
43+
public static final String FS_LISTING_INTERVAL_MS_CONFIG = "fs.listing.interval.ms";
44+
private static final String FS_LISTING_INTERVAL_MS_DOC = "The time interval, in milliseconds, in which the connector invokes the scan of the filesystem.";
45+
private static final long FS_LISTING_INTERVAL_MS_DEFAULT = 10000L;
46+
47+
/* Settings for DefaultFileSystemMonitor */
48+
public static final String STATE_INITIAL_READ_TIMEOUT_MS_CONFIG = "state.initial.read.timeout.ms";
49+
public static final String STATE_INITIAL_READ_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds " +
50+
"the filesystem monitor thread waits to read all the file processing states before timing out. " +
51+
"This property is used only on connector startup.";
52+
public static final long STATE_INITIAL_READ_TIMEOUT_MS_DEFAULT = 300000L;
53+
54+
public static final String STATE_DEFAULT_READ_TIMEOUT_MS_CONFIG = "state.default.read.timeout.ms";
55+
public static final String STATE_DEFAULT_READ_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds " +
56+
"the filesystem monitor thread waits to read all the file processing states before timing out.";
57+
public static final long STATE_DEFAULT_READ_TIMEOUT_MS_DEFAULT = 5000L;
4558

4659
/* Settings for FilePulseSourceConnector */
47-
public static final String MAX_SCHEDULED_FILES_CONFIG = "max.scheduled.files";
48-
private static final String MAX_SCHEDULED_FILES_DOC = "Maximum number of files that can be schedules to tasks.";
49-
private static final int MAX_SCHEDULED_FILES_DEFAULT = 1000;
60+
public static final String MAX_SCHEDULED_FILES_CONFIG = "max.scheduled.files";
61+
private static final String MAX_SCHEDULED_FILES_DOC = "Maximum number of files that can be schedules to tasks.";
62+
private static final int MAX_SCHEDULED_FILES_DEFAULT = 1000;
5063

5164
public static final String FS_LISTING_TASK_DELEGATION_ENABLED_CONFIG = "fs.listing.task.delegation.enabled";
5265
private static final String FS_LISTING_TASK_DELEGATION_ENABLED_DOC = "Boolean indicating whether the file listing process should be delegated to tasks.";
5366

67+
5468
/**
5569
* Creates a new {@link SourceConnectorConfig} instance.
56-
* @param originals the originals configuration.
70+
*
71+
* @param originals the original configuration.
5772
*/
5873
public SourceConnectorConfig(final Map<?, ?> originals) {
5974
super(getConf(), originals);
@@ -106,6 +121,20 @@ public static ConfigDef getConf() {
106121
ConfigDef.ValidString.in(FS_CLEANUP_POLICY_EXECUTE_VALID_VALUES),
107122
ConfigDef.Importance.MEDIUM,
108123
FS_CLEANUP_POLICY_EXECUTE_DOC
124+
)
125+
.define(
126+
STATE_INITIAL_READ_TIMEOUT_MS_CONFIG,
127+
ConfigDef.Type.LONG,
128+
STATE_INITIAL_READ_TIMEOUT_MS_DEFAULT,
129+
ConfigDef.Importance.MEDIUM,
130+
STATE_INITIAL_READ_TIMEOUT_MS_DOC
131+
)
132+
.define(
133+
STATE_DEFAULT_READ_TIMEOUT_MS_CONFIG,
134+
ConfigDef.Type.LONG,
135+
STATE_DEFAULT_READ_TIMEOUT_MS_DEFAULT,
136+
ConfigDef.Importance.MEDIUM,
137+
STATE_DEFAULT_READ_TIMEOUT_MS_DOC
109138
);
110139
}
111140

@@ -136,4 +165,12 @@ public long getListingInterval() {
136165
public boolean isFileListingTaskDelegationEnabled() {
137166
return getBoolean(FS_LISTING_TASK_DELEGATION_ENABLED_CONFIG);
138167
}
168+
169+
public Duration getStateDefaultReadTimeoutMs() {
170+
return Duration.ofMillis(getLong(STATE_DEFAULT_READ_TIMEOUT_MS_CONFIG));
171+
}
172+
173+
public Duration getStateInitialReadTimeoutMs() {
174+
return Duration.ofMillis(getLong(STATE_INITIAL_READ_TIMEOUT_MS_CONFIG));
175+
}
139176
}

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/DefaultFileSystemMonitor.java

Lines changed: 50 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@ public class DefaultFileSystemMonitor implements FileSystemMonitor {
6060

6161
private static final Logger LOG = LoggerFactory.getLogger(DefaultFileSystemMonitor.class);
6262

63-
private static final Duration ON_START_READ_END_LOG_TIMEOUT = Duration.ofSeconds(30);
64-
private static final Duration DEFAULT_READ_END_LOG_TIMEOUT = Duration.ofSeconds(5);
6563
private static final int MAX_SCHEDULE_ATTEMPTS = 3;
6664

6765
private final FileSystemListing<?> fsListing;
@@ -99,12 +97,16 @@ public class DefaultFileSystemMonitor implements FileSystemMonitor {
9997

10098
private final TaskFileOrder taskFileOrder;
10199

100+
private Duration stateInitialReadTimeout = Duration.ofMinutes(5);
101+
102+
private Duration stateDefaultReadTimeout = Duration.ofSeconds(5);
103+
102104
/**
103105
* Creates a new {@link DefaultFileSystemMonitor} instance.
104106
*
105107
* @param allowTasksReconfigurationAfterTimeoutMs {@code true} to allow tasks reconfiguration after a timeout.
106108
* @param fsListening the {@link FileSystemListing} to be used for listing object files.
107-
* @param cleanPolicy the {@link GenericFileCleanupPolicy} to be used for cleaning object files.
109+
* @param cleanPolicy the {@link GenericFileCleanupPolicy} to be used for cleaning object files.
108110
* @param offsetPolicy the {@link SourceOffsetPolicy} to be used computing offset for object fileS.
109111
* @param store the {@link StateBackingStore} used for storing object file cursor.
110112
*/
@@ -162,9 +164,9 @@ public void onStateUpdate(final String key, final FileObject object) {
162164
final FileObjectMeta removed = scheduled.remove(objectId);
163165
if (removed == null && status.isOneOf(FileObjectStatus.CLEANED)) {
164166
LOG.debug(
165-
"Received cleaned status but no object-file currently scheduled for: '{}'. " +
166-
"This warn should only occurred during recovering step",
167-
key
167+
"Received cleaned status but no object-file currently scheduled for: '{}'. " +
168+
"This warn should only occurred during recovering step",
169+
key
168170
);
169171
}
170172
}
@@ -177,23 +179,29 @@ public void onStateUpdate(final String key, final FileObject object) {
177179
"with tasks processing files is already started. You can ignore that warning if the connector " +
178180
" is recovering from a crash or resuming after being paused.");
179181
}
180-
readStatesToEnd(ON_START_READ_END_LOG_TIMEOUT);
181-
recoverPreviouslyCompletedSources();
182-
// Trigger a cleanup during initialization to ensure that all cleanable
183-
// object-files are eventually removed before scheduling any tasks.
184-
cleanUpCompletedFiles();
182+
183+
if (readStatesToEnd(stateInitialReadTimeout)) {
184+
recoverPreviouslyCompletedSources();
185+
// Trigger a cleanup during initialization to ensure that all cleanable
186+
// object-files are eventually removed before scheduling any tasks.
187+
cleanUpCompletedFiles();
188+
} else {
189+
LOG.warn("Cannot recover completed files from previous execution. State is empty.");
190+
}
185191
LOG.info("Initialized FileSystemMonitor");
186192
}
187193

188194
private void recoverPreviouslyCompletedSources() {
189-
LOG.info("Recovering completed files from a previous execution");
190-
fileState.states()
191-
.entrySet()
192-
.stream()
193-
.map(it -> it.getValue().withKey(FileObjectKey.of(it.getKey())))
194-
.filter(it -> cleanablePredicate.test(it.status()))
195-
.forEach(cleanable::add);
196-
LOG.info("Finished recovering previously completed files : {}", cleanable);
195+
if (fileState != null && !fileState.states().isEmpty()) {
196+
LOG.info("Recovering completed files from a previous execution");
197+
fileState.states()
198+
.entrySet()
199+
.stream()
200+
.map(it -> it.getValue().withKey(FileObjectKey.of(it.getKey())))
201+
.filter(it -> cleanablePredicate.test(it.status()))
202+
.forEach(cleanable::add);
203+
LOG.info("Finished recovering completed files from previous execution: {}", cleanable);
204+
}
197205
}
198206

199207
private boolean readStatesToEnd(final Duration timeout) {
@@ -202,14 +210,23 @@ private boolean readStatesToEnd(final Duration timeout) {
202210
fileState = store.snapshot();
203211
LOG.debug(
204212
"Finished reading to end of log and updated states snapshot, new states log position: {}",
205-
fileState.offset());
213+
fileState.offset()
214+
);
206215
return true;
207216
} catch (TimeoutException e) {
208217
LOG.warn("Failed to reach end of states log quickly enough", e);
209218
return false;
210219
}
211220
}
212221

222+
public void setStateInitialReadTimeout(final Duration stateInitialReadTimeout) {
223+
this.stateInitialReadTimeout = stateInitialReadTimeout;
224+
}
225+
226+
public void setStateDefaultReadTimeout(final Duration stateDefaultReadTimeout) {
227+
this.stateDefaultReadTimeout = stateDefaultReadTimeout;
228+
}
229+
213230
/**
214231
* {@inheritDoc}
215232
*/
@@ -267,13 +284,13 @@ private synchronized boolean updateFiles() {
267284
final boolean noScheduledFiles = scheduled.isEmpty();
268285
if (!noScheduledFiles && allowTasksReconfigurationAfterTimeoutMs == Long.MAX_VALUE) {
269286
LOG.info(
270-
"Scheduled files still being processed: {}. Skip filesystem listing while waiting for tasks completion",
271-
scheduled.size()
287+
"Scheduled files still being processed: {}. Skip filesystem listing while waiting for tasks completion",
288+
scheduled.size()
272289
);
273290
return false;
274291
}
275292

276-
boolean toEnd = readStatesToEnd(DEFAULT_READ_END_LOG_TIMEOUT);
293+
boolean toEnd = readStatesToEnd(stateDefaultReadTimeout);
277294
if (noScheduledFiles && !toEnd) {
278295
LOG.warn("Failed to read state changelog. Skip filesystem listing due to timeout");
279296
return false;
@@ -315,7 +332,7 @@ private synchronized boolean updateFiles() {
315332
if (timeout > 0) {
316333
LOG.info(
317334
"Scheduled files still being processed ({}) but new files detected. " +
318-
"Waiting for {} ms before allowing task reconfiguration",
335+
"Waiting for {} ms before allowing task reconfiguration",
319336
scheduled.size(),
320337
timeout
321338
);
@@ -372,13 +389,13 @@ public List<FileObjectMeta> listFilesToSchedule(final int maxFilesToSchedule) {
372389
do {
373390
changed.set(false);
374391
LOG.info(
375-
"Preparing next scheduling using the object files found during last iteration (attempt={}/{}).",
376-
attempts + 1,
377-
MAX_SCHEDULE_ATTEMPTS
392+
"Preparing next scheduling using the object files found during last iteration (attempt={}/{}).",
393+
attempts + 1,
394+
MAX_SCHEDULE_ATTEMPTS
378395
);
379396
// Try to read states to end to make sure we do not attempt
380397
// to schedule an object file that has been cleanup.
381-
final boolean toEnd = readStatesToEnd(DEFAULT_READ_END_LOG_TIMEOUT);
398+
final boolean toEnd = readStatesToEnd(stateDefaultReadTimeout);
382399
if (!toEnd) {
383400
LOG.warn("Failed to read state changelog while scheduling object files. Timeout.");
384401
}
@@ -400,8 +417,8 @@ public List<FileObjectMeta> listFilesToSchedule(final int maxFilesToSchedule) {
400417
if (changed.get()) {
401418
if (attempts == MAX_SCHEDULE_ATTEMPTS) {
402419
LOG.warn(
403-
"Failed to prepare the object files after attempts: {}.",
404-
MAX_SCHEDULE_ATTEMPTS
420+
"Failed to prepare the object files after attempts: {}.",
421+
MAX_SCHEDULE_ATTEMPTS
405422
);
406423
// Make sure to clear the schedule list before returning.
407424
scheduled.clear();
@@ -415,8 +432,8 @@ public List<FileObjectMeta> listFilesToSchedule(final int maxFilesToSchedule) {
415432

416433
if (partitions.isEmpty()) {
417434
LOG.warn(
418-
"Filesystem could not be scanned quickly enough, " +
419-
"or no object file was detected after starting the connector."
435+
"Filesystem could not be scanned quickly enough, " +
436+
"or no object file was detected after starting the connector."
420437
);
421438
}
422439
return taskFileOrder.sort(partitions);
@@ -434,7 +451,7 @@ public void close() {
434451
if (running.compareAndSet(true, false)) {
435452
try {
436453
LOG.info("Closing FileSystemMonitor resources");
437-
readStatesToEnd(DEFAULT_READ_END_LOG_TIMEOUT);
454+
readStatesToEnd(stateDefaultReadTimeout);
438455
cleanUpCompletedFiles();
439456
LOG.info("Closed FileSystemMonitor resources");
440457
} catch (final Exception e) {

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceConnector.java

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemListing;
3232
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemMonitor;
3333
import io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreAccess;
34+
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
3435
import java.util.ArrayList;
3536
import java.util.Collections;
3637
import java.util.HashMap;
@@ -67,7 +68,7 @@ public class FilePulseSourceConnector extends SourceConnector {
6768

6869
private SourceConnectorConfig connectorConfig;
6970

70-
private FileSystemMonitor monitor;
71+
private FileSystemMonitor fsMonitor;
7172

7273
private String connectorGroupName;
7374

@@ -110,22 +111,8 @@ public void start(final Map<String, String> props) {
110111
);
111112

112113
partitioner = connectorConfig.getTaskPartitioner();
113-
114-
final FileSystemListing<?> fileSystemListing = connectorConfig.getFileSystemListing();
115-
fileSystemListing.setFilter(new CompositeFileListFilter(connectorConfig.getFileSystemListingFilter()));
116-
117-
monitor = new DefaultFileSystemMonitor(
118-
connectorConfig.allowTasksReconfigurationAfterTimeoutMs(),
119-
fileSystemListing,
120-
connectorConfig.getFsCleanupPolicy(),
121-
connectorConfig.getFsCleanupPolicyPredicate(),
122-
connectorConfig.getSourceOffsetPolicy(),
123-
sharedStore.get().getResource(),
124-
connectorConfig.getTaskFilerOrder()
125-
);
126-
127-
monitor.setFileSystemListingEnabled(!connectorConfig.isFileListingTaskDelegationEnabled());
128-
fsMonitorThread = new FileSystemMonitorThread(context, monitor, connectorConfig.getListingInterval());
114+
fsMonitor = createFileSystemMonitor(connectorConfig, sharedStore.get().getResource());
115+
fsMonitorThread = new FileSystemMonitorThread(context, fsMonitor, connectorConfig.getListingInterval());
129116
fsMonitorThread.setUncaughtExceptionHandler((t, e) -> {
130117
LOG.info("Uncaught error from file system monitoring thread [{}]", t.getName(), e);
131118
context.raiseError(new ConnectException("Unexpected error from FileSystemMonitorThread", e));
@@ -138,6 +125,29 @@ public void start(final Map<String, String> props) {
138125
}
139126
}
140127

128+
private FileSystemMonitor createFileSystemMonitor(final SourceConnectorConfig connectorConfig,
129+
final StateBackingStore<FileObject> store) {
130+
131+
final FileSystemListing<?> fileSystemListing = connectorConfig.getFileSystemListing();
132+
fileSystemListing.setFilter(new CompositeFileListFilter(connectorConfig.getFileSystemListingFilter()));
133+
134+
DefaultFileSystemMonitor monitor = new DefaultFileSystemMonitor(
135+
connectorConfig.allowTasksReconfigurationAfterTimeoutMs(),
136+
fileSystemListing,
137+
connectorConfig.getFsCleanupPolicy(),
138+
connectorConfig.getFsCleanupPolicyPredicate(),
139+
connectorConfig.getSourceOffsetPolicy(),
140+
store,
141+
connectorConfig.getTaskFilerOrder()
142+
);
143+
144+
monitor.setStateDefaultReadTimeout(connectorConfig.getStateDefaultReadTimeoutMs());
145+
monitor.setStateInitialReadTimeout(connectorConfig.getStateInitialReadTimeoutMs());
146+
monitor.setFileSystemListingEnabled(!connectorConfig.isFileListingTaskDelegationEnabled());
147+
148+
return monitor;
149+
}
150+
141151
/**
142152
* {@inheritDoc}
143153
*/
@@ -194,7 +204,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
194204
}
195205

196206
private List<List<String>> partitionAndGet(int maxTasks) {
197-
final List<FileObjectMeta> files = monitor.listFilesToSchedule(connectorConfig.getMaxScheduledFiles());
207+
final List<FileObjectMeta> files = fsMonitor.listFilesToSchedule(connectorConfig.getMaxScheduledFiles());
198208
return partitioner.partition(files, maxTasks)
199209
.stream()
200210
.map(it -> it.stream().map(Object::toString).collect(Collectors.toList()))

0 commit comments

Comments
 (0)