From 7921ca14de3fc2626be80cc5fab1e51e7f6c9b7d Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 8 Sep 2023 16:10:01 -0400 Subject: [PATCH 1/3] GH-2971: Add `LockRegistry.executeLocked()` API Fixes https://github.com/spring-projects/spring-integration/issues/2971 * Following best practice and well-known patterns with `Jdbc`, `Rest` or `Jms` templates, introduce `default` methods into `LockRegistry` interface to make it easier to perform tasks when within a lock. * Since all the required logic is now covered by those `LockRegistry.executeLocked()` methods, there is no need in the dedicated abstract `WhileLockedProcessor` class. Deprecated it for removal in the next version * Use a new `LockRegistry.executeLocked()` API in the `FileWritingMessageHandler` instead of just deprecated `WhileLockedProcessor` * To satisfy Java limitations for checked lambdas, introduce `CheckedCallable` and `CheckedRunnable` utilities similar to interfaces in the `io.micrometer.observation.Observation` * Change existing `CheckedFunction` to expose extra generic argument for `Throwable` * Add dedicated chapter for distributed lock into docs * Fix some links and typos in the docs --- .../support/locks/LockRegistry.java | 99 +++++++++++- .../integration/util/CheckedCallable.java | 53 ++++++ .../integration/util/CheckedFunction.java | 4 +- .../integration/util/CheckedRunnable.java | 52 ++++++ .../util/WhileLockedProcessor.java | 5 +- .../locks/DefaultLockRegistryTests.java | 92 ++++++++++- .../file/FileWritingMessageHandler.java | 151 ++++++++---------- .../jms/dsl/JmsInboundGatewaySpec.java | 3 +- src/reference/antora/modules/ROOT/nav.adoc | 1 + .../modules/ROOT/pages/distributed-locks.adoc | 37 +++++ .../modules/ROOT/pages/message-store.adoc | 4 +- .../modules/ROOT/pages/meta-data-store.adoc | 2 +- .../modules/ROOT/pages/router/namespace.adoc | 3 +- .../modules/ROOT/pages/router/spel.adoc | 6 +- .../modules/ROOT/pages/scatter-gather.adoc | 4 +- .../antora/modules/ROOT/pages/whats-new.adoc | 9 +- 16 files changed, 417 insertions(+), 108 deletions(-) create mode 100644 spring-integration-core/src/main/java/org/springframework/integration/util/CheckedCallable.java create mode 100644 spring-integration-core/src/main/java/org/springframework/integration/util/CheckedRunnable.java create mode 100644 src/reference/antora/modules/ROOT/pages/distributed-locks.adoc diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/LockRegistry.java b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/LockRegistry.java index 7fa229fb995..b4ccaa7947c 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/LockRegistry.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/LockRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,13 +16,20 @@ package org.springframework.integration.support.locks; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; +import org.springframework.integration.util.CheckedCallable; +import org.springframework.integration.util.CheckedRunnable; + /** * Strategy for maintaining a registry of shared locks. * * @author Oleg Zhurakousky * @author Gary Russell + * @author Artem Bilan * * @since 2.1.1 */ @@ -30,10 +37,98 @@ public interface LockRegistry { /** - * Obtains the lock associated with the parameter object. + * Obtain the lock associated with the parameter object. * @param lockKey The object with which the lock is associated. * @return The associated lock. */ Lock obtain(Object lockKey); + /** + * Perform a provided task when lock for the key is locked. + * @param lockKey the lock key to use + * @param runnable the {@link CheckedRunnable} to execute within a lock + * @param type of exception runnable throws + * @throws InterruptedException from a lock operation + * @since 6.2 + */ + default void executeLocked(Object lockKey, CheckedRunnable runnable) + throws E, InterruptedException { + + executeLocked(lockKey, + () -> { + runnable.run(); + return null; + }); + } + + /** + * Perform a provided task when lock for the key is locked. + * @param lockKey the lock key to use + * @param callable the {@link CheckedCallable} to execute within a lock + * @param type of callable result + * @param type of exception callable throws + * @return the result of callable + * @throws InterruptedException from a lock operation + * @since 6.2 + */ + default T executeLocked(Object lockKey, CheckedCallable callable) + throws E, InterruptedException { + + Lock lock = obtain(lockKey); + lock.lockInterruptibly(); + try { + return callable.call(); + } + finally { + lock.unlock(); + } + } + + /** + * Perform a provided task when lock for the key is locked. + * @param lockKey the lock key to use + * @param waitLockDuration the {@link Duration} for {@link Lock#tryLock(long, TimeUnit)} + * @param runnable the {@link CheckedRunnable} to execute within a lock + * @param type of exception runnable throws + * @throws InterruptedException from a lock operation + * @throws TimeoutException when {@link Lock#tryLock(long, TimeUnit)} has elapsed + * @since 6.2 + */ + default void executeLocked(Object lockKey, Duration waitLockDuration, + CheckedRunnable runnable) throws E, InterruptedException, TimeoutException { + + executeLocked(lockKey, waitLockDuration, + () -> { + runnable.run(); + return null; + }); + } + + /** + * Perform a provided task when lock for the key is locked. + * @param lockKey the lock key to use + * @param waitLockDuration the {@link Duration} for {@link Lock#tryLock(long, TimeUnit)} + * @param callable the {@link CheckedCallable} to execute within a lock + * @param type of exception callable throws + * @throws InterruptedException from a lock operation + * @throws TimeoutException when {@link Lock#tryLock(long, TimeUnit)} has elapsed + * @since 6.2 + */ + default T executeLocked(Object lockKey, Duration waitLockDuration, + CheckedCallable callable) throws E, InterruptedException, TimeoutException { + + Lock lock = obtain(lockKey); + if (!lock.tryLock(waitLockDuration.toMillis(), TimeUnit.MILLISECONDS)) { + throw new TimeoutException( + "The lock [%s] was not acquired in time: %s".formatted(lockKey, waitLockDuration)); + } + + try { + return callable.call(); + } + finally { + lock.unlock(); + } + } + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedCallable.java b/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedCallable.java new file mode 100644 index 00000000000..26be34099c5 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedCallable.java @@ -0,0 +1,53 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.util; + +/** + * A Callable-like interface which allows throwing Error. + * + * @param the output type. + * @param the throwable type. + * + * @author Artem Bilan + * + * @since 6.2 + */ +@FunctionalInterface +public interface CheckedCallable { + + T call() throws E; + + default Runnable unchecked() { + return () -> { + try { + call(); + } + catch (Throwable t) { // NOSONAR + if (t instanceof RuntimeException runtimeException) { // NOSONAR + throw runtimeException; + } + else if (t instanceof Error error) { // NOSONAR + throw error; + } + else { + throw new IllegalStateException(t); + } + } + }; + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedFunction.java b/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedFunction.java index 72f18dc9743..3ba74a3185b 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedFunction.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedFunction.java @@ -29,9 +29,9 @@ * @since 6.1 */ @FunctionalInterface -public interface CheckedFunction { +public interface CheckedFunction { - R apply(T t) throws Throwable; // NOSONAR + R apply(T t) throws E; default Function unchecked() { return t1 -> { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedRunnable.java b/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedRunnable.java new file mode 100644 index 00000000000..f48cbd7356e --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedRunnable.java @@ -0,0 +1,52 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.util; + +/** + * A Runnable-like interface which allows throwing Error. + * + * @param the throwable type. + * + * @author Artem Bilan + * + * @since 6.2 + */ +@FunctionalInterface +public interface CheckedRunnable { + + void run() throws E; + + default Runnable unchecked() { + return () -> { + try { + run(); + } + catch (Throwable t) { // NOSONAR + if (t instanceof RuntimeException runtimeException) { // NOSONAR + throw runtimeException; + } + else if (t instanceof Error error) { // NOSONAR + throw error; + } + else { + throw new IllegalStateException(t); + } + } + }; + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/util/WhileLockedProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/util/WhileLockedProcessor.java index 7f9d1fbd9cf..c076a0f76ba 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/util/WhileLockedProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/util/WhileLockedProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,10 +30,13 @@ * then call {@link #doWhileLocked()}. * * @author Oleg Zhurakousky + * @author Artem Bilan * * @since 2.2 * + * @deprecated since 6.2 in favor of {@link LockRegistry#executeLocked}. */ +@Deprecated(since = "6.2", forRemoval = true) public abstract class WhileLockedProcessor { private final Object key; diff --git a/spring-integration-core/src/test/java/org/springframework/integration/support/locks/DefaultLockRegistryTests.java b/spring-integration-core/src/test/java/org/springframework/integration/support/locks/DefaultLockRegistryTests.java index d9de078724d..c6420e500b5 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/support/locks/DefaultLockRegistryTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/support/locks/DefaultLockRegistryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,28 +16,42 @@ package org.springframework.integration.support.locks; +import java.time.Duration; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; -import org.junit.Test; +import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; /** * @author Gary Russell * @author Oleg Zhurakousky + * @author Artem Bilan + * * @since 2.1.1 * */ public class DefaultLockRegistryTests { - @Test(expected = IllegalArgumentException.class) + @Test public void testBadMask() { - new DefaultLockRegistry(4); + assertThatIllegalArgumentException() + .isThrownBy(() -> new DefaultLockRegistry(4)); } - @Test(expected = IllegalArgumentException.class) + @Test public void testBadMaskOutOfRange() { // 32bits - new DefaultLockRegistry(0xffffffff); + assertThatIllegalArgumentException() + .isThrownBy(() -> new DefaultLockRegistry(0xffffffff)); } @Test @@ -197,4 +211,70 @@ public int hashCode() { assertThat(moreLocks[3]).isSameAs(locks[3]); } + @Test + public void cyclicBarrierIsBrokenWhenExecutedConcurrentlyInLock() throws Exception { + LockRegistry registry = new DefaultLockRegistry(1); + + CyclicBarrier cyclicBarrier = new CyclicBarrier(2); + CountDownLatch brokenBarrierLatch = new CountDownLatch(2); + + Runnable runnableLocked = () -> { + try { + registry.executeLocked("lockKey", + () -> { + try { + cyclicBarrier.await(1, TimeUnit.SECONDS); + } + catch (BrokenBarrierException | TimeoutException e) { + brokenBarrierLatch.countDown(); + } + }); + } + catch (Exception e) { + throw new RuntimeException(e); + } + }; + + ExecutorService executorService = Executors.newCachedThreadPool(); + + executorService.execute(runnableLocked); + executorService.execute(runnableLocked); + + assertThat(brokenBarrierLatch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + @Test + public void executeLockedIsTimedOutInOtherThread() throws Exception { + LockRegistry registry = new DefaultLockRegistry(1); + + String lockKey = "lockKey"; + Duration waitLockDuration = Duration.ofMillis(100); + + CountDownLatch timeoutExceptionLatch = new CountDownLatch(1); + AtomicReference exceptionAtomicReference = new AtomicReference<>(); + + Runnable runnable = () -> { + try { + registry.executeLocked(lockKey, waitLockDuration, () -> Thread.sleep(200)); + } + catch (TimeoutException e) { + exceptionAtomicReference.set(e); + timeoutExceptionLatch.countDown(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + }; + + ExecutorService executorService = Executors.newCachedThreadPool(); + + executorService.execute(runnable); + executorService.execute(runnable); + + assertThat(timeoutExceptionLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(exceptionAtomicReference.get()) + .hasMessage("The lock [%s] was not acquired in time: %s".formatted(lockKey, waitLockDuration)); + } + } + diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/FileWritingMessageHandler.java b/spring-integration-file/src/main/java/org/springframework/integration/file/FileWritingMessageHandler.java index 66225d26c26..7d6f9f7ba6c 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/FileWritingMessageHandler.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/FileWritingMessageHandler.java @@ -19,6 +19,7 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.BufferedWriter; +import java.io.Closeable; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -61,9 +62,9 @@ import org.springframework.integration.support.locks.PassThruLockRegistry; import org.springframework.integration.support.management.ManageableLifecycle; import org.springframework.integration.support.utils.IntegrationUtils; -import org.springframework.integration.util.WhileLockedProcessor; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandlingException; +import org.springframework.messaging.MessagingException; import org.springframework.scheduling.TaskScheduler; import org.springframework.util.Assert; import org.springframework.util.StreamUtils; @@ -644,28 +645,29 @@ private File handleInputStreamMessage(InputStream sourceFileInputStream, File or FileExistsMode.APPEND.equals(this.fileExistsMode) || FileExistsMode.APPEND_NO_FLUSH.equals(this.fileExistsMode); - if (append) { - final File fileToWriteTo = determineFileToWrite(resultFile, tempFile); - - WhileLockedProcessor whileLockedProcessor = new WhileLockedProcessor(this.lockRegistry, - fileToWriteTo.getAbsolutePath()) { + File fileToCleanUpAfterCopy = tempFile; - @Override - protected void whileLocked() throws IOException { - if (FileWritingMessageHandler.this.newFileCallback != null && !fileToWriteTo.exists()) { - FileWritingMessageHandler.this.newFileCallback.accept(fileToWriteTo, requestMessage); - } + if (append) { + File fileToWriteTo = determineFileToWrite(resultFile, tempFile); - appendStreamToFile(fileToWriteTo, sourceFileInputStream); - } + try { + this.lockRegistry.executeLocked(fileToWriteTo.getAbsolutePath(), + () -> { + if (this.newFileCallback != null && !fileToWriteTo.exists()) { + this.newFileCallback.accept(fileToWriteTo, requestMessage); + } + + appendStreamToFile(fileToWriteTo, sourceFileInputStream); + }); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new MessagingException(requestMessage, "Thread was interrupted while performing task", ex); + } - }; - whileLockedProcessor.doWhileLocked(); - cleanUpAfterCopy(fileToWriteTo, resultFile, originalFile); - return resultFile; + fileToCleanUpAfterCopy = fileToWriteTo; } else { - try (InputStream inputStream = sourceFileInputStream; OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile), this.bufferSize)) { @@ -679,9 +681,10 @@ protected void whileLocked() throws IOException { } outputStream.flush(); } - cleanUpAfterCopy(tempFile, resultFile, originalFile); - return resultFile; } + + cleanUpAfterCopy(fileToCleanUpAfterCopy, resultFile, originalFile); + return resultFile; } private void appendStreamToFile(File fileToWriteTo, InputStream sourceFileInputStream) throws IOException { @@ -694,25 +697,29 @@ private void appendStreamToFile(File fileToWriteTo, InputStream sourceFileInputS while ((bytesRead = inputStream.read(buffer)) != -1) { // NOSONAR bos.write(buffer, 0, bytesRead); } - if (FileWritingMessageHandler.this.appendNewLine) { + if (this.appendNewLine) { bos.write(System.lineSeparator().getBytes()); } } finally { - try { - if (state == null || FileWritingMessageHandler.this.flushTask == null) { - if (bos != null) { - bos.close(); - } - clearState(fileToWriteTo, state); - } - else { - state.lastWrite = System.currentTimeMillis(); + cleanUpFileState(fileToWriteTo, state, bos); + } + } + + private void cleanUpFileState(File fileToWriteTo, FileState state, Closeable closeable) { + try { + if (state == null || this.flushTask == null) { + if (closeable != null) { + closeable.close(); } + clearState(fileToWriteTo, state); } - catch (IOException ex) { + else { + state.lastWrite = System.currentTimeMillis(); } } + catch (IOException ex) { + } } private File handleByteArrayMessage(byte[] bytes, File originalFile, File tempFile, File resultFile, @@ -723,20 +730,21 @@ private File handleByteArrayMessage(byte[] bytes, File originalFile, File tempFi boolean append = FileExistsMode.APPEND.equals(this.fileExistsMode) || FileExistsMode.APPEND_NO_FLUSH.equals(this.fileExistsMode); - WhileLockedProcessor whileLockedProcessor = new WhileLockedProcessor(this.lockRegistry, - fileToWriteTo.getAbsolutePath()) { - - @Override - protected void whileLocked() throws IOException { - if (append && FileWritingMessageHandler.this.newFileCallback != null && !fileToWriteTo.exists()) { - FileWritingMessageHandler.this.newFileCallback.accept(fileToWriteTo, requestMessage); - } - - writeBytesToFile(fileToWriteTo, append, bytes); - } + try { + this.lockRegistry.executeLocked(fileToWriteTo.getAbsolutePath(), + () -> { + if (append && this.newFileCallback != null && !fileToWriteTo.exists()) { + this.newFileCallback.accept(fileToWriteTo, requestMessage); + } + + writeBytesToFile(fileToWriteTo, append, bytes); + }); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new MessagingException(requestMessage, "Thread was interrupted while performing task", ex); + } - }; - whileLockedProcessor.doWhileLocked(); cleanUpAfterCopy(fileToWriteTo, resultFile, originalFile); return resultFile; } @@ -752,19 +760,7 @@ private void writeBytesToFile(File fileToWriteTo, boolean append, byte[] bytes) } } finally { - try { - if (state == null || this.flushTask == null) { - if (bos != null) { - bos.close(); - } - clearState(fileToWriteTo, state); - } - else { - state.lastWrite = System.currentTimeMillis(); - } - } - catch (IOException ex) { - } + cleanUpFileState(fileToWriteTo, state, bos); } } @@ -776,20 +772,21 @@ private File handleStringMessage(String content, File originalFile, File tempFil boolean append = FileExistsMode.APPEND.equals(this.fileExistsMode) || FileExistsMode.APPEND_NO_FLUSH.equals(this.fileExistsMode); - WhileLockedProcessor whileLockedProcessor = new WhileLockedProcessor(this.lockRegistry, - fileToWriteTo.getAbsolutePath()) { - - @Override - protected void whileLocked() throws IOException { - if (append && FileWritingMessageHandler.this.newFileCallback != null && !fileToWriteTo.exists()) { - FileWritingMessageHandler.this.newFileCallback.accept(fileToWriteTo, requestMessage); - } - - writeStringToFile(fileToWriteTo, append, content); - } - }; - whileLockedProcessor.doWhileLocked(); + try { + this.lockRegistry.executeLocked(fileToWriteTo.getAbsolutePath(), + () -> { + if (append && this.newFileCallback != null && !fileToWriteTo.exists()) { + this.newFileCallback.accept(fileToWriteTo, requestMessage); + } + + writeStringToFile(fileToWriteTo, append, content); + }); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new MessagingException(requestMessage, "Thread was interrupted while performing task", ex); + } cleanUpAfterCopy(fileToWriteTo, resultFile, originalFile); return resultFile; @@ -806,19 +803,7 @@ private void writeStringToFile(File fileToWriteTo, boolean append, String conten } } finally { - try { - if (state == null || FileWritingMessageHandler.this.flushTask == null) { - if (writer != null) { - writer.close(); - } - clearState(fileToWriteTo, state); - } - else { - state.lastWrite = System.currentTimeMillis(); - } - } - catch (IOException ex) { - } + cleanUpFileState(fileToWriteTo, state, writer); } } diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundGatewaySpec.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundGatewaySpec.java index d84d45a7033..22e1d4548ec 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundGatewaySpec.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundGatewaySpec.java @@ -19,6 +19,7 @@ import java.util.function.Consumer; import jakarta.jms.Destination; +import jakarta.jms.JMSException; import jakarta.jms.Message; import org.springframework.expression.Expression; @@ -162,7 +163,7 @@ public S replyToExpression(String replyToExpression) { * @since 6.1 * @see ChannelPublishingJmsMessageListener#setReplyToExpression(Expression) */ - public S replyToFunction(CheckedFunction replyToFunction) { + public S replyToFunction(CheckedFunction replyToFunction) { return replyToExpression(new FunctionExpression<>(replyToFunction.unchecked())); } diff --git a/src/reference/antora/modules/ROOT/nav.adoc b/src/reference/antora/modules/ROOT/nav.adoc index 3fa30b40deb..e641cfacb6b 100644 --- a/src/reference/antora/modules/ROOT/nav.adoc +++ b/src/reference/antora/modules/ROOT/nav.adoc @@ -94,6 +94,7 @@ ** xref:message-history.adoc[] ** xref:message-store.adoc[] ** xref:meta-data-store.adoc[] +** xref:distributed-locks.adoc[] ** xref:control-bus.adoc[] ** xref:shutdown.adoc[] ** xref:graph.adoc[] diff --git a/src/reference/antora/modules/ROOT/pages/distributed-locks.adoc b/src/reference/antora/modules/ROOT/pages/distributed-locks.adoc new file mode 100644 index 00000000000..be633698441 --- /dev/null +++ b/src/reference/antora/modules/ROOT/pages/distributed-locks.adoc @@ -0,0 +1,37 @@ +[[distributed-locks]] += Distributed Locks + +In many situations the action against some context (or even single message) has to be performed in an exclusive manner. +One of the example is an aggregator component where we have to check the message group state for the current message to device whether we can release group or just add that message for future consideration. +For this purpose Java provides a good API with `java.util.concurrent.locks.Lock` implementations. +However, the problem becomes more complex when our system is distributed and/or run in the cluster. +The locking in this case is challenging and requires some shared and its specific approach to achieve an exclusivity requirement. + +Spring Integration suggests a `LockRegistrty` abstraction with an in-memory `DefaultLockRegistry` implementation based on the `ReentrantLock` API. +The `obtain(Object)` method of `LockRegistrty` requires some `lock key` for specific context. +For example, an aggregator uses a `correlationKey` to lock operations around its group. +This way different locks can be used concurrently. +This `obtain(Object)` method returns a `java.util.concurrent.locks.Lock` instance (depending on the `LockRegistrty` implementation), therefore the rest of the logic is the same as standard Java Concurrency algorithm. + +Starting with version 6.2, the `LockRegistry` provides an `executeLocked()` API (`default` methods in this interface) to perform some task while locked. +The behavior of this API is similar to well-known `JdbcTemplate`, `JmsTemplate` or `RestTemplate`. +The following example demonstrates the usage of this API: + +[source,java] +---- +LockRegistry registry = new DefaultLockRegistry(); +... +registry.executeLocked("someLockKey", () -> someExclusiveResourceCall()); +---- + +The method rethrows an exception from the task call, throws an `InterruptedException` if `Lock` is interrupted. +In addition, a variant with `Duration` throws a `java.util.concurrent.TimeoutException` when `lock.tryLock()` returns `false`. + +Spring Integration provides these `LockRegistrty` implementations for distributed locks: + +* xref:hazelcast.adoc#hazelcast-lock-registry[Hazelcast] +* xref:jdbc/lock-registry.adoc[JDBC] +* xref:redis.adoc#redis-lock-registry[Redis] +* xref:zookeeper.adoc#zk-lock-registry[Zookeeper] + +https://github.com/spring-projects/spring-integration-aws[Spring Integration AWS] extension also implements a `DynamoDbLockRegistry`. \ No newline at end of file diff --git a/src/reference/antora/modules/ROOT/pages/message-store.adoc b/src/reference/antora/modules/ROOT/pages/message-store.adoc index 8c8d5f26a3d..3688e452bdf 100644 --- a/src/reference/antora/modules/ROOT/pages/message-store.adoc +++ b/src/reference/antora/modules/ROOT/pages/message-store.adoc @@ -27,7 +27,7 @@ The following pair of examples show how to add a reference to a message store fo .Aggregator [source,xml] ---- - + ---- By default, messages are stored in-memory by using `o.s.i.store.SimpleMessageStore`, an implementation of `MessageStore`. @@ -151,7 +151,7 @@ The `MessageGroupStore` exposes a `setGroupCondition(Object groupId, String cond For this purpose a `setGroupConditionSupplier(BiFunction, String, String>)` option has been added to the `AbstractCorrelatingMessageHandler`. This function is evaluated against each message after it has been added to the group as well as the existing condition of the group. The implementation may decide to return a new value, the existing value, or reset the target condition to `null`. -The value for a `condition` can be a JSON, SpEL expression, number or anything what can be serialized as a string and parsed afterwards. +The value for a `condition` can be a JSON, SpEL expression, number or anything what can be serialized as a string and parsed afterward. For example, the `FileMarkerReleaseStrategy` from the xref:file/aggregator.adoc[File Aggregator] component, populates a condition into a group from the `FileHeaders.LINE_COUNT` header of the `FileSplitter.FileMarker.Mark.END` message and consults with it from its `canRelease()` comparing a group size with the value in this condition. This way it doesn't iterate all the messages in group to find a `FileSplitter.FileMarker.Mark.END` message with the `FileHeaders.LINE_COUNT` header. It also allows the end marker to arrive at the aggregator before all the other records; for example when processing a file in a multi-threaded environment. diff --git a/src/reference/antora/modules/ROOT/pages/meta-data-store.adoc b/src/reference/antora/modules/ROOT/pages/meta-data-store.adoc index 33c06004f7a..6d99d43b831 100644 --- a/src/reference/antora/modules/ROOT/pages/meta-data-store.adoc +++ b/src/reference/antora/modules/ROOT/pages/meta-data-store.adoc @@ -15,7 +15,7 @@ If you need to persist metadata between application context restarts, the framew * `PropertiesPersistingMetadataStore` * xref:hazelcast.adoc#hazelcast-metadata-store[Hazelcast Metadata Store] -* xref:jdbc.adoc#jdbc-metadata-store[JDBC Metadata Store] +* xref:jdbc/metadata-store.adoc[JDBC Metadata Store] * xref:mongodb.adoc#mongodb-metadata-store[MongoDB Metadata Store] * xref:redis.adoc#redis-metadata-store[Redis Metadata Store] * xref:zookeeper.adoc#zk-metadata-store[Zookeeper Metadata Store] diff --git a/src/reference/antora/modules/ROOT/pages/router/namespace.adoc b/src/reference/antora/modules/ROOT/pages/router/namespace.adoc index 441cb882ee7..854804f55f4 100644 --- a/src/reference/antora/modules/ROOT/pages/router/namespace.adoc +++ b/src/reference/antora/modules/ROOT/pages/router/namespace.adoc @@ -4,8 +4,7 @@ Spring Integration provides a generic router. You can use it for general-purpose routing (as opposed to the other routers provided by Spring Integration, each of which has some form of specialization). -[[configuring-a-content-based-router-with-xml]] -== Configuring a Content-based Router with XML +The following section explains a router configuration with an XML components. The `router` element provides a way to connect a router to an input channel and also accepts the optional `default-output-channel` attribute. The `ref` attribute references the bean name of a custom router implementation (which must extend `AbstractMessageRouter`). diff --git a/src/reference/antora/modules/ROOT/pages/router/spel.adoc b/src/reference/antora/modules/ROOT/pages/router/spel.adoc index 5e3ebee298c..994f629c86a 100644 --- a/src/reference/antora/modules/ROOT/pages/router/spel.adoc +++ b/src/reference/antora/modules/ROOT/pages/router/spel.adoc @@ -4,7 +4,7 @@ Sometimes, the routing logic may be simple, and writing a separate class for it and configuring it as a bean may seem like overkill. As of Spring Integration 2.0, we offer an alternative that lets you use SpEL to implement simple computations that previously required a custom POJO router. -NOTE: For more information about the Spring Expression Language, see the https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#expressions[relevant chapter in the Spring Framework Reference Guide]. +NOTE: For more information about the Spring Expression Language, see the https://docs.spring.io/spring-framework/reference/core/expressions.html[relevant chapter in the Spring Framework Reference Guide]. Generally, a SpEL expression is evaluated and its result is mapped to a channel, as the following example shows: @@ -69,6 +69,6 @@ In the above configuration, if the message includes a header with a name of 'cha You may also find collection projection and collection selection expressions useful when you need to select multiple channels. For further information, see: -* https://docs.spring.io/spring-framework/docs/current/spring-framework-reference/core.html#expressions-collection-projection[Collection Projection] -* https://docs.spring.io/spring-framework/docs/current/spring-framework-reference/core.html#expressions-collection-selection[Collection Selection] +* https://docs.spring.io/spring-framework/reference/core/expressions/language-ref/collection-projection.html[Collection Projection] +* https://docs.spring.io/spring-framework/reference/core/expressions/language-ref/collection-selection.html[Collection Selection] diff --git a/src/reference/antora/modules/ROOT/pages/scatter-gather.adoc b/src/reference/antora/modules/ROOT/pages/scatter-gather.adoc index 7137846eaa3..9f78c5e2595 100644 --- a/src/reference/antora/modules/ROOT/pages/scatter-gather.adoc +++ b/src/reference/antora/modules/ROOT/pages/scatter-gather.adoc @@ -108,7 +108,7 @@ The following example shows how to configure the `` endpoint by <1> The id of the endpoint. The `ScatterGatherHandler` bean is registered with an alias of `id + '.handler'`. The `RecipientListRouter` bean is registered with an alias of `id + '.scatterer'`. -The `AggregatingMessageHandler`bean is registered with an alias of `id + '.gatherer'`. +The `AggregatingMessageHandler` bean is registered with an alias of `id + '.gatherer'`. Optional. (The `BeanFactory` generates a default `id` value.) <2> Lifecycle attribute signaling whether the endpoint should be started during application context initialization. @@ -171,7 +171,7 @@ This way all other sub-flows will work for nothing and their replies are going t This might be an expected behavior sometimes, but in most cases it would be better to handle the error in the particular sub-flow without impacting all others and the expectations in the gatherer. Starting with version 5.1.3, the `ScatterGatherHandler` is supplied with the `errorChannelName` option. -It is populated to the `errorChannel` header of the scatter message and is used in the when async error happens or can be used in the regular synchronous sub-flow for directly sending an error message. +It is populated to the `errorChannel` header of the scatter message and is used when an async error happens or can be used in the regular synchronous sub-flow for directly sending an error message. The sample configuration below demonstrates async error handling by returning a compensation message: diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index c7fd7eb5f5d..d3c28f2578f 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -31,13 +31,16 @@ See xref:debezium.adoc[Debezium Support] for more information. See xref:endpoint.adoc#endpoint-pollingconsumer[Polling Consumer] for more information. - Java, Groovy and Kotlin DSLs have now context-specific methods in the `IntegrationFlowDefinition` with a single `Consumer` argument to configure an endpoint and its handler with one builder and readable options. -See, for example, `transformWith()`, `splitWith()` in xref:dsl.adoc#java-dsl[ Java DSL Chapter]. +See, for example, `transformWith()`, `splitWith()` in xref:dsl.adoc#java-dsl[Java DSL Chapter]. - A new `spring.integration.endpoints.defaultTimeout` global property has been introduced to override the default 30 seconds timeout for all the endpoints in the application. See xref:configuration/global-properties.adoc[Global Properties] for more information. - The `@MessagingGateway` and `GatewayEndpointSpec` provided by the Java DSL now expose the `errorOnTimeout` property of the internal `MethodInvocationGateway` extension of the `MessagingGatewaySupport`. -See xref:gateway.adoc#gateway-no-response[ Gateway Behavior When No response Arrives] for more information. +See xref:gateway.adoc#gateway-no-response[Gateway Behavior When No response Arrives] for more information. + +- The `LockRegistry` provides template-like API to execute provided task while locked. +See xref:distributed-locks.adoc[Distributed Locks] for more information. [[x6.2-websockets]] === WebSockets Changes @@ -55,7 +58,7 @@ See xref:kafka.adoc#kafka-inbound-pollable[Kafka Inbound Channel Adapter] for mo [[x6.2-jdbc]] === JDBC Support Changes -The `JdbcMessageStore`, `JdbcChannelMessageStore`, `JdbcMetadataStore`, and `DefaultLockRepository` implement `SmartLifecycle` and perform a`SELECT COUNT` query, on their respective tables, in the `start()` method to ensure that the required table (according to the provided prefix) is present in the target database. +The `JdbcMessageStore`, `JdbcChannelMessageStore`, `JdbcMetadataStore`, and `DefaultLockRepository` implement `SmartLifecycle` and perform a `SELECT COUNT` query, on their respective tables, in the `start()` method to ensure that the required table (according to the provided prefix) is present in the target database. See xref:jdbc/message-store.adoc#jdbc-db-init[Initializing the Database] for more information. [[x6.2-mongodb]] From 384fc9b2b6ea4e7d0b5ebcde2aee24d67bfaa413 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 8 Sep 2023 16:17:16 -0400 Subject: [PATCH 2/3] * Fix Javadoc for `CheckedFunction` --- .../org/springframework/integration/util/CheckedFunction.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedFunction.java b/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedFunction.java index 3ba74a3185b..d3890ab1d2a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedFunction.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedFunction.java @@ -23,6 +23,7 @@ * * @param the input type. * @param the output type. + * @param the throwable type. * * @author Artem Bilan * From 7567c9b19bbe71f0fd0957ac3d67140d8ea77305 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 11 Sep 2023 12:14:06 -0400 Subject: [PATCH 3/3] Fix language in docs Co-authored-by: Gary Russell --- .../integration/support/locks/LockRegistry.java | 8 ++++---- .../integration/util/CheckedCallable.java | 3 ++- .../integration/util/CheckedRunnable.java | 3 ++- .../modules/ROOT/pages/distributed-locks.adoc | 14 +++++++------- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/LockRegistry.java b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/LockRegistry.java index b4ccaa7947c..74d3cb80bbd 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/LockRegistry.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/LockRegistry.java @@ -44,7 +44,7 @@ public interface LockRegistry { Lock obtain(Object lockKey); /** - * Perform a provided task when lock for the key is locked. + * Perform the provided task when the lock for the key is locked. * @param lockKey the lock key to use * @param runnable the {@link CheckedRunnable} to execute within a lock * @param type of exception runnable throws @@ -62,7 +62,7 @@ default void executeLocked(Object lockKey, CheckedRunnable } /** - * Perform a provided task when lock for the key is locked. + * Perform the provided task when the lock for the key is locked. * @param lockKey the lock key to use * @param callable the {@link CheckedCallable} to execute within a lock * @param type of callable result @@ -85,7 +85,7 @@ default T executeLocked(Object lockKey, CheckedCallable } /** - * Perform a provided task when lock for the key is locked. + * Perform the provided task when the lock for the key is locked. * @param lockKey the lock key to use * @param waitLockDuration the {@link Duration} for {@link Lock#tryLock(long, TimeUnit)} * @param runnable the {@link CheckedRunnable} to execute within a lock @@ -105,7 +105,7 @@ default void executeLocked(Object lockKey, Duration waitLo } /** - * Perform a provided task when lock for the key is locked. + * Perform the provided task when the lock for the key is locked. * @param lockKey the lock key to use * @param waitLockDuration the {@link Duration} for {@link Lock#tryLock(long, TimeUnit)} * @param callable the {@link CheckedCallable} to execute within a lock diff --git a/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedCallable.java b/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedCallable.java index 26be34099c5..cb6200926f4 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedCallable.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedCallable.java @@ -17,7 +17,8 @@ package org.springframework.integration.util; /** - * A Callable-like interface which allows throwing Error. + * A Callable-like interface which allows throwing any Throwable. + * Checked exceptions are wrapped in an IllegalStateException. * * @param the output type. * @param the throwable type. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedRunnable.java b/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedRunnable.java index f48cbd7356e..12b7ee757e5 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedRunnable.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedRunnable.java @@ -17,7 +17,8 @@ package org.springframework.integration.util; /** - * A Runnable-like interface which allows throwing Error. + * A Runnable-like interface which allows throwing any Throwable. + * Checked exceptions are wrapped in an IllegalStateException. * * @param the throwable type. * diff --git a/src/reference/antora/modules/ROOT/pages/distributed-locks.adoc b/src/reference/antora/modules/ROOT/pages/distributed-locks.adoc index be633698441..fca279b8f46 100644 --- a/src/reference/antora/modules/ROOT/pages/distributed-locks.adoc +++ b/src/reference/antora/modules/ROOT/pages/distributed-locks.adoc @@ -2,16 +2,16 @@ = Distributed Locks In many situations the action against some context (or even single message) has to be performed in an exclusive manner. -One of the example is an aggregator component where we have to check the message group state for the current message to device whether we can release group or just add that message for future consideration. -For this purpose Java provides a good API with `java.util.concurrent.locks.Lock` implementations. -However, the problem becomes more complex when our system is distributed and/or run in the cluster. -The locking in this case is challenging and requires some shared and its specific approach to achieve an exclusivity requirement. +One example is an aggregator component where we have to check the message group state for the current message to determine whether we can release the group or just add that message for future consideration. +For this purpose Java provides an API with `java.util.concurrent.locks.Lock` implementations. +However, the problem becomes more complex when an application is distributed and/or run in the cluster. +The locking in this case is challenging and requires some shared state and its specific approach to achieve the exclusivity requirement. -Spring Integration suggests a `LockRegistrty` abstraction with an in-memory `DefaultLockRegistry` implementation based on the `ReentrantLock` API. -The `obtain(Object)` method of `LockRegistrty` requires some `lock key` for specific context. +Spring Integration provides a `LockRegistrty` abstraction with an in-memory `DefaultLockRegistry` implementation based on the `ReentrantLock` API. +The `obtain(Object)` method of the `LockRegistrty` requires a `lock key` for specific context. For example, an aggregator uses a `correlationKey` to lock operations around its group. This way different locks can be used concurrently. -This `obtain(Object)` method returns a `java.util.concurrent.locks.Lock` instance (depending on the `LockRegistrty` implementation), therefore the rest of the logic is the same as standard Java Concurrency algorithm. +This `obtain(Object)` method returns a `java.util.concurrent.locks.Lock` instance (depending on the `LockRegistry` implementation), therefore the rest of the logic is the same as standard Java Concurrency algorithm. Starting with version 6.2, the `LockRegistry` provides an `executeLocked()` API (`default` methods in this interface) to perform some task while locked. The behavior of this API is similar to well-known `JdbcTemplate`, `JmsTemplate` or `RestTemplate`.