Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,24 +16,119 @@

package org.springframework.integration.support.locks;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;

import org.springframework.integration.util.CheckedCallable;
import org.springframework.integration.util.CheckedRunnable;

/**
* Strategy for maintaining a registry of shared locks.
*
* @author Oleg Zhurakousky
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.1.1
*/
@FunctionalInterface
public interface LockRegistry {

/**
* Obtains the lock associated with the parameter object.
* Obtain the lock associated with the parameter object.
* @param lockKey The object with which the lock is associated.
* @return The associated lock.
*/
Lock obtain(Object lockKey);

/**
* Perform the provided task when the lock for the key is locked.
* @param lockKey the lock key to use
* @param runnable the {@link CheckedRunnable} to execute within a lock
* @param <E> type of exception runnable throws
* @throws InterruptedException from a lock operation
* @since 6.2
*/
default <E extends Throwable> void executeLocked(Object lockKey, CheckedRunnable<E> runnable)
throws E, InterruptedException {

executeLocked(lockKey,
() -> {
runnable.run();
return null;
});
}

/**
* Perform the provided task when the lock for the key is locked.
* @param lockKey the lock key to use
* @param callable the {@link CheckedCallable} to execute within a lock
* @param <T> type of callable result
* @param <E> type of exception callable throws
* @return the result of callable
* @throws InterruptedException from a lock operation
* @since 6.2
*/
default <T, E extends Throwable> T executeLocked(Object lockKey, CheckedCallable<T, E> callable)
throws E, InterruptedException {

Lock lock = obtain(lockKey);
lock.lockInterruptibly();
try {
return callable.call();
}
finally {
lock.unlock();
}
}

/**
* Perform the provided task when the lock for the key is locked.
* @param lockKey the lock key to use
* @param waitLockDuration the {@link Duration} for {@link Lock#tryLock(long, TimeUnit)}
* @param runnable the {@link CheckedRunnable} to execute within a lock
* @param <E> type of exception runnable throws
* @throws InterruptedException from a lock operation
* @throws TimeoutException when {@link Lock#tryLock(long, TimeUnit)} has elapsed
* @since 6.2
*/
default <E extends Throwable> void executeLocked(Object lockKey, Duration waitLockDuration,
CheckedRunnable<E> runnable) throws E, InterruptedException, TimeoutException {

executeLocked(lockKey, waitLockDuration,
() -> {
runnable.run();
return null;
});
}

/**
* Perform the provided task when the lock for the key is locked.
* @param lockKey the lock key to use
* @param waitLockDuration the {@link Duration} for {@link Lock#tryLock(long, TimeUnit)}
* @param callable the {@link CheckedCallable} to execute within a lock
* @param <E> type of exception callable throws
* @throws InterruptedException from a lock operation
* @throws TimeoutException when {@link Lock#tryLock(long, TimeUnit)} has elapsed
* @since 6.2
*/
default <T, E extends Throwable> T executeLocked(Object lockKey, Duration waitLockDuration,
CheckedCallable<T, E> callable) throws E, InterruptedException, TimeoutException {

Lock lock = obtain(lockKey);
if (!lock.tryLock(waitLockDuration.toMillis(), TimeUnit.MILLISECONDS)) {
throw new TimeoutException(
"The lock [%s] was not acquired in time: %s".formatted(lockKey, waitLockDuration));
}

try {
return callable.call();
}
finally {
lock.unlock();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.util;

/**
* A Callable-like interface which allows throwing any Throwable.
* Checked exceptions are wrapped in an IllegalStateException.
*
* @param <T> the output type.
* @param <E> the throwable type.
*
* @author Artem Bilan
*
* @since 6.2
*/
@FunctionalInterface
public interface CheckedCallable<T, E extends Throwable> {

T call() throws E;

default Runnable unchecked() {
return () -> {
try {
call();
}
catch (Throwable t) { // NOSONAR
if (t instanceof RuntimeException runtimeException) { // NOSONAR
throw runtimeException;
}
else if (t instanceof Error error) { // NOSONAR
throw error;
}
else {
throw new IllegalStateException(t);
}
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@
*
* @param <T> the input type.
* @param <R> the output type.
* @param <E> the throwable type.
*
* @author Artem Bilan
*
* @since 6.1
*/
@FunctionalInterface
public interface CheckedFunction<T, R> {
public interface CheckedFunction<T, R, E extends Throwable> {

R apply(T t) throws Throwable; // NOSONAR
R apply(T t) throws E;

default Function<T, R> unchecked() {
return t1 -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.util;

/**
* A Runnable-like interface which allows throwing any Throwable.
* Checked exceptions are wrapped in an IllegalStateException.
*
* @param <E> the throwable type.
*
* @author Artem Bilan
*
* @since 6.2
*/
@FunctionalInterface
public interface CheckedRunnable<E extends Throwable> {

void run() throws E;

default Runnable unchecked() {
return () -> {
try {
run();
}
catch (Throwable t) { // NOSONAR
if (t instanceof RuntimeException runtimeException) { // NOSONAR
throw runtimeException;
}
else if (t instanceof Error error) { // NOSONAR
throw error;
}
else {
throw new IllegalStateException(t);
}
}
};
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,10 +30,13 @@
* then call {@link #doWhileLocked()}.
*
* @author Oleg Zhurakousky
* @author Artem Bilan
*
* @since 2.2
*
* @deprecated since 6.2 in favor of {@link LockRegistry#executeLocked}.
*/
@Deprecated(since = "6.2", forRemoval = true)
public abstract class WhileLockedProcessor {

private final Object key;
Expand Down
Loading