Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router.async;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;

/**
* Represents a function that accepts a value of type T and produces a result of type R.
* This interface extends {@link Async} and provides methods to apply the function
* asynchronously using {@link CompletableFuture}.
*
* <p>ApplyFunction is used to implement the following semantics:</p>
* <pre>
* {@code
* T res = doAsync(input);
* // Can use ApplyFunction
* R result = thenApply(res);
* }
* </pre>
*
* @param <T> the type of the input to the function
* @param <R> the type of the result of the function
*/
@FunctionalInterface
public interface ApplyFunction<T, R> extends Async<R>{

/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
* @throws IOException if an I/O error occurs
*/
R apply(T t) throws IOException;

/**
* Applies this function asynchronously to the result of the given {@link CompletableFuture}.
* The function is executed on the same thread as the completion of the given future.
*
* @param in the input future
* @return a new future that holds the result of the function application
*/
default CompletableFuture<R> apply(CompletableFuture<T> in) {
return in.thenApply(t -> {
try {
return ApplyFunction.this.apply(t);
} catch (IOException e) {
throw warpCompletionException(e);
}
});
}

/**
* Applies this function asynchronously to the result of the given {@link CompletableFuture},
* using the specified executor for the asynchronous computation.
*
* @param in the input future
* @param executor the executor to use for the asynchronous computation
* @return a new future that holds the result of the function application
*/
default CompletableFuture<R> apply(CompletableFuture<T> in, Executor executor) {
return in.thenApplyAsync(t -> {
try {
return ApplyFunction.this.apply(t);
} catch (IOException e) {
throw warpCompletionException(e);
}
}, executor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router.async;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CompletionException;

/**
* An interface for asynchronous operations, providing utility methods
* and constants related to asynchronous computations.
*
* @param <R> The type of the result of the asynchronous operation
*/
public interface Async<R> {

/**
* A thread-local variable to store the {@link CompletableFuture} instance for the current thread.
* <p>
* <b>Note:</b> After executing an asynchronous method, the thread stores the CompletableFuture
* of the asynchronous method in the thread's local variable
*/
ThreadLocal<CompletableFuture<Object>> CUR_COMPLETABLE_FUTURE
= new ThreadLocal<>();

/**
* Sets the {@link CompletableFuture} instance for the current thread.
*
* @param completableFuture The {@link CompletableFuture} instance to be set
* @param <T> The type of the result in the CompletableFuture
*/
default <T> void setCurCompletableFuture(CompletableFuture<T> completableFuture) {
CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) completableFuture);
}

/**
* Gets the {@link CompletableFuture} instance for the current thread.
*
* @return The {@link CompletableFuture} instance for the current thread,
* or {@code null} if not set
*/
default CompletableFuture<R> getCurCompletableFuture() {
return (CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get();
}

/**
* Blocks and retrieves the result of the {@link CompletableFuture} instance
* for the current thread.
*
* @return The result of the CompletableFuture, or {@code null} if the thread was interrupted
* @throws IOException If the completion exception to the CompletableFuture
* is an IOException or a subclass of it
*/
default R result() throws IOException {
try {
CompletableFuture<R> completableFuture =
(CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get();
assert completableFuture != null;
return completableFuture.get();
} catch (InterruptedException e) {
return null;
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException)cause;
}
throw new IOException(e);
}
}

/**
* Extracts the real cause of an exception wrapped by CompletionException.
*
* @param e The incoming exception, which may be a CompletionException.
* @return Returns the real cause of the original exception,
* or the original exception if there is no cause.
*/
static Throwable unWarpCompletionException(Throwable e) {
if (e instanceof CompletionException) {
if (e.getCause() != null) {
return e.getCause();
}
}
return e;
}

/**
* Wraps the incoming exception in a new CompletionException.
*
* @param e The incoming exception, which may be any type of Throwable.
* @return Returns a new CompletionException with the original exception as its cause.
*/
static CompletionException warpCompletionException(Throwable e) {
if (e instanceof CompletionException) {
return (CompletionException) e;
}
return new CompletionException(e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router.async;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;

/**
* The AsyncApplyFunction interface represents a function that
* asynchronously accepts a value of type T and produces a result
* of type R. This interface extends {@link ApplyFunction} and is
* designed to be used with asynchronous computation frameworks,
* such as Java's {@link java.util.concurrent.CompletableFuture}.
*
* <p>An implementation of this interface is expected to perform an
* asynchronous operation and return a result, which is typically
* represented as a {@code CompletableFuture<R>}. This allows for
* non-blocking execution of tasks and is particularly useful for
* I/O operations or any operation that may take a significant amount
* of time to complete.</p>
*
* <p>AsyncApplyFunction is used to implement the following semantics:</p>
* <pre>
* {@code
* T res = doAsync1(input);
* // Can use AsyncApplyFunction
* R result = doAsync2(res);
* }
* </pre>
*
* @param <T> the type of the input to the function
* @param <R> the type of the result of the function
* @see ApplyFunction
* @see java.util.concurrent.CompletableFuture
*/
@FunctionalInterface
public interface AsyncApplyFunction<T, R> extends ApplyFunction<T, R> {

/**
* Asynchronously applies this function to the given argument.
*
* <p>This method is intended to initiate the function application
* without waiting for the result. It is typically used when the
* result of the operation is not required immediately or when the
* operation is part of a larger asynchronous workflow.</p>
*
* @param t the function argument
* @throws IOException if an I/O error occurs during the application
* of the function
*/
void applyAsync(T t) throws IOException;

/**
* Synchronously applies this function to the given argument and
* returns the result.
*
* <p>This method waits for the asynchronous operation to complete
* and returns its result. It is useful when the result is needed
* immediately and the calling code cannot proceed without it.</p>
*
* @param t the function argument
* @return the result of applying the function to the argument
* @throws IOException if an I/O error occurs during the application
* of the function
*/
@Override
default R apply(T t) throws IOException {
applyAsync(t);
return result();
}

/**
* Initiates the asynchronous application of this function to the given result.
* <p>
* This method calls applyAsync to start the asynchronous operation and then retrieves
* the current thread's CompletableFuture using getCurCompletableFuture.
* It returns this CompletableFuture, which will be completed with the result of the
* asynchronous operation once it is finished.
* <p>
* This method is useful for chaining with other asynchronous operations, as it allows the
* current operation to be part of a larger asynchronous workflow.
*
* @param t the function argument
* @return a CompletableFuture that will be completed with the result of the
* asynchronous operation
* @throws IOException if an I/O error occurs during the initiation of the asynchronous operation
*/
default CompletableFuture<R> async(T t) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest to add some javadoc here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

applyAsync(t);
CompletableFuture<R> completableFuture = getCurCompletableFuture();
assert completableFuture != null;
return completableFuture;
}

/**
* Asynchronously applies this function to the result of the given
* CompletableFuture.
*
* <p>This method chains the function application to the completion
* of the input future. It returns a new CompletableFuture that
* completes with the function's result when the input future
* completes.</p>
*
* @param in the input future
* @return a new CompletableFuture that holds the result of the
* function application
*/
@Override
default CompletableFuture<R> apply(CompletableFuture<T> in) {
return in.thenCompose(t -> {
try {
return async(t);
} catch (IOException e) {
throw warpCompletionException(e);
}
});
}

/**
* Asynchronously applies this function to the result of the given
* CompletableFuture, using the specified executor for the
* asynchronous computation.
*
* <p>This method allows for more control over the execution
* context of the asynchronous operation, such as running the
* operation in a separate thread or thread pool.</p>
*
* @param in the input future
* @param executor the executor to use for the asynchronous
* computation
* @return a new CompletableFuture that holds the result of the
* function application
*/
@Override
default CompletableFuture<R> apply(CompletableFuture<T> in, Executor executor) {
return in.thenComposeAsync(t -> {
try {
return async(t);
} catch (IOException e) {
throw warpCompletionException(e);
}
}, executor);
}
}
Loading