Skip to content

Commit 72960e1

Browse files
authored
GH-7971: Add FileReadingMessageSource.directoryExpression (#10486)
Fixes: #7971 Extend a `FileReadingMessageSource.directory` logic to the SpEL expression to evaluate on each scan * Introduce an internal `record DirFile` to keep the `File` and scanned directory per file * Propagate a new directory down to the `WatchServiceDirectoryScanner` and restart it * Calculate a root directory from the failed `Message` we offer back to the queue * Fix tests for a new `FileReadingMessageSource.directoryExpression` property * Expose a `Supplier<File>` configuration for Java DSL * Modify `FileTests.MyService.pollDirectories()` for a `Supplier<File>` configuration * Document this new feature
1 parent b162e2b commit 72960e1

File tree

13 files changed

+177
-92
lines changed

13 files changed

+177
-92
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/dsl/FileInboundChannelAdapterSpec.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,16 @@
2323
import java.util.Map;
2424
import java.util.function.Function;
2525
import java.util.function.Predicate;
26+
import java.util.function.Supplier;
2627

2728
import org.jspecify.annotations.Nullable;
2829

2930
import org.springframework.beans.DirectFieldAccessor;
31+
import org.springframework.expression.Expression;
3032
import org.springframework.integration.dsl.ComponentsRegistration;
3133
import org.springframework.integration.dsl.MessageSourceSpec;
3234
import org.springframework.integration.expression.FunctionExpression;
35+
import org.springframework.integration.expression.SupplierExpression;
3336
import org.springframework.integration.file.DirectoryScanner;
3437
import org.springframework.integration.file.FileLocker;
3538
import org.springframework.integration.file.RecursiveDirectoryScanner;
@@ -70,13 +73,13 @@ protected FileInboundChannelAdapterSpec(@Nullable Comparator<File> receptionOrde
7073
}
7174

7275
/**
73-
* Specify the input directory.
74-
* @param directory the directory.
76+
* Specify the Supplier for input directory.
77+
* @param directorySupplier the Supplier for directory to poll.
7578
* @return the spec.
76-
* @see FileReadingMessageSource#setDirectory(File)
79+
* @see FileReadingMessageSource#setDirectoryExpression(Expression)
7780
*/
78-
FileInboundChannelAdapterSpec directory(File directory) {
79-
this.target.setDirectory(directory);
81+
FileInboundChannelAdapterSpec directory(Supplier<File> directorySupplier) {
82+
this.target.setDirectoryExpression(new SupplierExpression<>(directorySupplier));
8083
return _this();
8184
}
8285

spring-integration-file/src/main/java/org/springframework/integration/file/dsl/Files.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.File;
2020
import java.util.Comparator;
2121
import java.util.function.Function;
22+
import java.util.function.Supplier;
2223

2324
import org.jspecify.annotations.Nullable;
2425

@@ -42,7 +43,17 @@ public abstract class Files {
4243
* @return the {@link FileInboundChannelAdapterSpec} instance.
4344
*/
4445
public static FileInboundChannelAdapterSpec inboundAdapter(File directory) {
45-
return inboundAdapter(directory, null);
46+
return inboundAdapter(() -> directory);
47+
}
48+
49+
/**
50+
* Create a {@link FileInboundChannelAdapterSpec} builder for the {@code FileReadingMessageSource}.
51+
* @param directorySupplier the Supplier for directory to scan files.
52+
* @return the {@link FileInboundChannelAdapterSpec} instance.
53+
* @since 7.0
54+
*/
55+
public static FileInboundChannelAdapterSpec inboundAdapter(Supplier<File> directorySupplier) {
56+
return inboundAdapter(directorySupplier, null);
4657
}
4758

4859
/**
@@ -54,7 +65,20 @@ public static FileInboundChannelAdapterSpec inboundAdapter(File directory) {
5465
public static FileInboundChannelAdapterSpec inboundAdapter(File directory,
5566
@Nullable Comparator<File> receptionOrderComparator) {
5667

57-
return new FileInboundChannelAdapterSpec(receptionOrderComparator).directory(directory);
68+
return inboundAdapter(() -> directory, null);
69+
}
70+
71+
/**
72+
* Create a {@link FileInboundChannelAdapterSpec} builder for the {@code FileReadingMessageSource}.
73+
* @param directorySupplier the Supplier for directory to scan files.
74+
* @param receptionOrderComparator the {@link Comparator} for ordering file objects.
75+
* @return the {@link FileInboundChannelAdapterSpec} instance.
76+
* @since 7.0
77+
*/
78+
public static FileInboundChannelAdapterSpec inboundAdapter(Supplier<File> directorySupplier,
79+
@Nullable Comparator<File> receptionOrderComparator) {
80+
81+
return new FileInboundChannelAdapterSpec(receptionOrderComparator).directory(directorySupplier);
5882
}
5983

6084
/**

spring-integration-file/src/main/java/org/springframework/integration/file/inbound/FileReadingMessageSource.java

Lines changed: 110 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@
4646
import org.jspecify.annotations.Nullable;
4747

4848
import org.springframework.context.Lifecycle;
49+
import org.springframework.expression.Expression;
4950
import org.springframework.integration.endpoint.AbstractMessageSource;
51+
import org.springframework.integration.expression.ValueExpression;
5052
import org.springframework.integration.file.DefaultDirectoryScanner;
5153
import org.springframework.integration.file.DirectoryScanner;
5254
import org.springframework.integration.file.FileHeaders;
@@ -78,7 +80,7 @@
7880
* {@link org.springframework.integration.file.filters.AcceptOnceFileListFilter}
7981
* would allow for this.
8082
* <p>
81-
* If a external {@link DirectoryScanner} is used, then the {@link FileLocker}
83+
* If an external {@link DirectoryScanner} is used, then the {@link FileLocker}
8284
* and {@link FileListFilter} objects should be set on the external
8385
* {@link DirectoryScanner}, not the instance of FileReadingMessageSource. An
8486
* {@link IllegalStateException} will result otherwise.
@@ -108,15 +110,10 @@ public class FileReadingMessageSource extends AbstractMessageSource<File> implem
108110

109111
private final AtomicBoolean running = new AtomicBoolean();
110112

111-
/*
112-
* {@link PriorityBlockingQueue#iterator()} throws
113-
* {@link java.util.ConcurrentModificationException} in Java 5.
114-
* There is no locking around the queue, so there is also no iteration.
115-
*/
116-
private final Queue<File> toBeReceived;
113+
private final Queue<DirFile> toBeReceived;
117114

118115
@SuppressWarnings("NullAway.Init")
119-
private File directory;
116+
private Expression directoryExpression;
120117

121118
private DirectoryScanner scanner = new DefaultDirectoryScanner();
122119

@@ -174,7 +171,11 @@ public FileReadingMessageSource(int internalQueueCapacity) {
174171
* @param receptionOrderComparator the comparator to be used to order the files in the internal queue
175172
*/
176173
public FileReadingMessageSource(@Nullable Comparator<File> receptionOrderComparator) {
177-
this.toBeReceived = new PriorityBlockingQueue<>(DEFAULT_INTERNAL_QUEUE_CAPACITY, receptionOrderComparator);
174+
Comparator<DirFile> comparatorToUse = null;
175+
if (receptionOrderComparator != null) {
176+
comparatorToUse = (dirFile1, dirFile2) -> receptionOrderComparator.compare(dirFile1.file, dirFile2.file);
177+
}
178+
this.toBeReceived = new PriorityBlockingQueue<>(DEFAULT_INTERNAL_QUEUE_CAPACITY, comparatorToUse);
178179
}
179180

180181
/**
@@ -183,7 +184,18 @@ public FileReadingMessageSource(@Nullable Comparator<File> receptionOrderCompara
183184
*/
184185
public void setDirectory(File directory) {
185186
Assert.notNull(directory, "directory must not be null");
186-
this.directory = directory;
187+
setDirectoryExpression(new ValueExpression<>(directory));
188+
}
189+
190+
/**
191+
* Specify a SpEL expression for an input directory.
192+
* This expression is evaluated on each scan, but not each poll.
193+
* @param directoryExpression the SpEL expression to resolve a directory to monitor on each scan.
194+
* @since 7.0
195+
*/
196+
public void setDirectoryExpression(Expression directoryExpression) {
197+
Assert.notNull(directoryExpression, "'directoryExpression' must not be null");
198+
this.directoryExpression = directoryExpression;
187199
}
188200

189201
/**
@@ -321,15 +333,23 @@ public String getComponentType() {
321333
@Override
322334
public void start() {
323335
if (!this.running.getAndSet(true)) {
324-
if (!this.directory.exists() && this.autoCreateDirectory && !this.directory.mkdirs()) {
325-
throw new IllegalStateException("Cannot create directory or its parents: " + this.directory);
336+
if (this.directoryExpression instanceof ValueExpression) {
337+
File directoryToCreate = this.directoryExpression.getValue(File.class);
338+
if (directoryToCreate == null ||
339+
(!directoryToCreate.exists() && this.autoCreateDirectory && !directoryToCreate.mkdirs())) {
340+
341+
throw new IllegalStateException("Cannot create directory or its parents: " + directoryToCreate);
342+
}
343+
Assert.isTrue(directoryToCreate.exists(),
344+
() -> "Source directory [" + directoryToCreate + "] does not exist.");
345+
Assert.isTrue(directoryToCreate.isDirectory(),
346+
() -> "Source path [" + directoryToCreate + "] does not point to a directory.");
347+
Assert.isTrue(directoryToCreate.canRead(),
348+
() -> "Source directory [" + directoryToCreate + "] is not readable.");
349+
if (this.scanner instanceof WatchServiceDirectoryScanner watchServiceDirectoryScanner) {
350+
watchServiceDirectoryScanner.directory = directoryToCreate;
351+
}
326352
}
327-
Assert.isTrue(this.directory.exists(),
328-
() -> "Source directory [" + this.directory + "] does not exist.");
329-
Assert.isTrue(this.directory.isDirectory(),
330-
() -> "Source path [" + this.directory + "] does not point to a directory.");
331-
Assert.isTrue(this.directory.canRead(),
332-
() -> "Source directory [" + this.directory + "] is not readable.");
333353
if (this.scanner instanceof Lifecycle lifecycle) {
334354
lifecycle.start();
335355
}
@@ -350,7 +370,7 @@ public boolean isRunning() {
350370

351371
@Override
352372
protected void onInit() {
353-
Assert.notNull(this.directory, "'directory' must not be null");
373+
Assert.notNull(this.directoryExpression, "'directoryExpression' must not be null");
354374

355375
Assert.state(!(this.scannerExplicitlySet && this.useWatchService),
356376
() -> "The 'scanner' and 'useWatchService' options are mutually exclusive: " + this.scanner);
@@ -380,31 +400,42 @@ protected void onInit() {
380400
scanInputDirectory();
381401
}
382402

383-
File file = this.toBeReceived.poll();
403+
DirFile dirFile = this.toBeReceived.poll();
384404

385-
// file == null means the queue was empty
386-
// we can't rely on isEmpty for concurrency reasons
387-
while ((file != null) && !this.scanner.tryClaim(file)) {
388-
file = this.toBeReceived.poll();
405+
while ((dirFile != null) && !this.scanner.tryClaim(dirFile.file)) {
406+
dirFile = this.toBeReceived.poll();
389407
}
390408

391-
if (file != null) {
409+
if (dirFile != null) {
392410
return getMessageBuilderFactory()
393-
.withPayload(file)
394-
.setHeader(FileHeaders.RELATIVE_PATH, this.directory.toPath().relativize(file.toPath()).toString())
395-
.setHeader(FileHeaders.FILENAME, file.getName())
396-
.setHeader(FileHeaders.ORIGINAL_FILE, file);
411+
.withPayload(dirFile.file)
412+
.setHeader(FileHeaders.RELATIVE_PATH,
413+
dirFile.root.toPath().relativize(dirFile.file.toPath()).toString())
414+
.setHeader(FileHeaders.FILENAME, dirFile.file.getName())
415+
.setHeader(FileHeaders.ORIGINAL_FILE, dirFile.file);
397416
}
398417

399418
return null;
400419
}
401420

402421
private void scanInputDirectory() {
403-
List<File> filteredFiles = this.scanner.listFiles(this.directory);
404-
Set<File> freshFiles = new LinkedHashSet<>(filteredFiles);
405-
if (!freshFiles.isEmpty()) {
406-
this.toBeReceived.addAll(freshFiles);
407-
logger.debug(() -> "Added to queue: " + freshFiles);
422+
File directory = this.directoryExpression.getValue(getEvaluationContext(), File.class);
423+
Assert.notNull(directory, "'directoryExpression' must not evaluate to null");
424+
if (this.scanner instanceof WatchServiceDirectoryScanner watchServiceDirectoryScanner) {
425+
if (!watchServiceDirectoryScanner.directory.equals(directory)) {
426+
watchServiceDirectoryScanner.stop();
427+
watchServiceDirectoryScanner.directory = directory;
428+
watchServiceDirectoryScanner.start();
429+
}
430+
}
431+
List<File> filteredFiles = this.scanner.listFiles(directory);
432+
433+
for (File file : filteredFiles) {
434+
this.toBeReceived.add(new DirFile(file, directory));
435+
}
436+
437+
if (!filteredFiles.isEmpty()) {
438+
logger.debug(() -> "Added to queue: " + filteredFiles);
408439
}
409440
}
410441

@@ -414,7 +445,18 @@ private void scanInputDirectory() {
414445
*/
415446
public void onFailure(Message<File> failedMessage) {
416447
logger.warn(() -> "Failed to send: " + failedMessage);
417-
this.toBeReceived.offer(failedMessage.getPayload());
448+
String relativePath = failedMessage.getHeaders().get(FileHeaders.RELATIVE_PATH, String.class);
449+
File file = failedMessage.getPayload();
450+
File root;
451+
if (relativePath != null) {
452+
String absolutePath = file.getAbsolutePath();
453+
String rootPath = absolutePath.substring(0, absolutePath.length() - relativePath.length());
454+
root = new File(rootPath);
455+
}
456+
else {
457+
root = file.getParentFile();
458+
}
459+
this.toBeReceived.offer(new DirFile(file, root));
418460
}
419461

420462
public enum WatchEventType {
@@ -444,6 +486,9 @@ private final class WatchServiceDirectoryScanner extends DefaultDirectoryScanner
444486
@SuppressWarnings("NullAway.Init")
445487
private WatchService watcher;
446488

489+
@SuppressWarnings("NullAway.Init")
490+
private volatile File directory;
491+
447492
WatchServiceDirectoryScanner() {
448493
this.kinds =
449494
Arrays.stream(FileReadingMessageSource.this.watchEvents)
@@ -461,25 +506,33 @@ public void setFilter(FileListFilter<File> filter) {
461506

462507
@Override
463508
public void start() {
509+
if (this.directory == null) {
510+
File directoryToSet =
511+
FileReadingMessageSource.this.directoryExpression.getValue(getEvaluationContext(), File.class);
512+
Assert.notNull(directoryToSet, "'directoryExpression' must not evaluate to null");
513+
this.directory = directoryToSet;
514+
}
464515
try {
465516
this.watcher = FileSystems.getDefault().newWatchService();
466-
Set<File> initialFiles = walkDirectory(FileReadingMessageSource.this.directory.toPath(), null);
517+
Set<File> initialFiles = walkDirectory(this.directory.toPath(), null);
467518
initialFiles.addAll(filesFromEvents());
468519
this.filesToPoll.addAll(initialFiles);
469520
}
470521
catch (IOException ex) {
471-
logger.error(ex, () -> "Failed to create watcher for " + FileReadingMessageSource.this.directory);
522+
logger.error(ex, () -> "Failed to create watcher for " + this.directory);
472523
}
473524
}
474525

475526
@Override
476527
public void stop() {
477528
try {
529+
this.pathKeys.forEach((path, watchKey) -> watchKey.cancel());
478530
this.watcher.close();
479531
this.pathKeys.clear();
532+
this.filesToPoll.clear();
480533
}
481534
catch (IOException ex) {
482-
logger.error(ex, () -> "Failed to close watcher for " + FileReadingMessageSource.this.directory);
535+
logger.error(ex, () -> "Failed to close watcher for " + this.directory);
483536
}
484537
}
485538

@@ -508,13 +561,14 @@ private Set<File> filesFromEvents() {
508561
while (key != null) {
509562
File parentDir = ((Path) key.watchable()).toAbsolutePath().toFile();
510563
for (WatchEvent<?> event : key.pollEvents()) {
511-
if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind()) ||
512-
StandardWatchEventKinds.ENTRY_MODIFY.equals(event.kind()) ||
513-
StandardWatchEventKinds.ENTRY_DELETE.equals(event.kind())) {
564+
WatchEvent.Kind<?> watchEventKind = event.kind();
565+
if (StandardWatchEventKinds.ENTRY_CREATE.equals(watchEventKind) ||
566+
StandardWatchEventKinds.ENTRY_MODIFY.equals(watchEventKind) ||
567+
StandardWatchEventKinds.ENTRY_DELETE.equals(watchEventKind)) {
514568

515569
processFilesFromNormalEvent(files, parentDir, event);
516570
}
517-
else if (StandardWatchEventKinds.OVERFLOW.equals(event.kind())) {
571+
else if (StandardWatchEventKinds.OVERFLOW.equals(watchEventKind)) {
518572
processFilesFromOverflowEvent(files, event);
519573
}
520574
}
@@ -574,15 +628,15 @@ private void processFilesFromOverflowEvent(Set<File> files, WatchEvent<?> event)
574628
files.addAll(walkDirectory(path, event.kind()));
575629
}
576630
else {
577-
files.addAll(walkDirectory(FileReadingMessageSource.this.directory.toPath(), event.kind()));
631+
files.addAll(walkDirectory(this.directory.toPath(), event.kind()));
578632
}
579633
}
580634

581-
private Set<File> walkDirectory(Path directory, WatchEvent.@Nullable Kind<?> kind) {
635+
private Set<File> walkDirectory(Path directoryToWalk, WatchEvent.@Nullable Kind<?> kind) {
582636
final Set<File> walkedFiles = new LinkedHashSet<>();
583637
try {
584-
registerWatch(directory);
585-
Files.walkFileTree(directory, Collections.emptySet(), FileReadingMessageSource.this.watchMaxDepth,
638+
registerWatch(directoryToWalk);
639+
Files.walkFileTree(directoryToWalk, Collections.emptySet(), FileReadingMessageSource.this.watchMaxDepth,
586640
new SimpleFileVisitor<>() {
587641

588642
@Override
@@ -610,7 +664,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
610664
});
611665
}
612666
catch (IOException ex) {
613-
logger.error(ex, () -> "Failed to walk directory: " + directory.toString());
667+
logger.error(ex, () -> "Failed to walk directory: " + directoryToWalk);
614668
}
615669
return walkedFiles;
616670
}
@@ -625,4 +679,13 @@ private void registerWatch(Path dir) throws IOException {
625679

626680
}
627681

682+
private record DirFile(File file, File root) implements Comparable<DirFile> {
683+
684+
@Override
685+
public int compareTo(DirFile other) {
686+
return this.file.compareTo(other.file);
687+
}
688+
689+
}
690+
628691
}

0 commit comments

Comments
 (0)