| 
 | 1 | +/**  | 
 | 2 | + * Licensed to the Apache Software Foundation (ASF) under one  | 
 | 3 | + * or more contributor license agreements.  See the NOTICE file  | 
 | 4 | + * distributed with this work for additional information  | 
 | 5 | + * regarding copyright ownership.  The ASF licenses this file  | 
 | 6 | + * to you under the Apache License, Version 2.0 (the  | 
 | 7 | + * "License"); you may not use this file except in compliance  | 
 | 8 | + * with the License.  You may obtain a copy of the License at  | 
 | 9 | + *  | 
 | 10 | + * http://www.apache.org/licenses/LICENSE-2.0  | 
 | 11 | + *  | 
 | 12 | + * Unless required by applicable law or agreed to in writing, software  | 
 | 13 | + * distributed under the License is distributed on an "AS IS" BASIS,  | 
 | 14 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
 | 15 | + * See the License for the specific language governing permissions and  | 
 | 16 | + * limitations under the License.  | 
 | 17 | + */  | 
 | 18 | +package org.apache.hadoop.hdfs.server.federation.router.async;  | 
 | 19 | + | 
 | 20 | +import java.io.IOException;  | 
 | 21 | +import java.util.concurrent.CompletableFuture;  | 
 | 22 | +import java.util.concurrent.Executor;  | 
 | 23 | + | 
 | 24 | +import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;  | 
 | 25 | + | 
 | 26 | +/**  | 
 | 27 | + * The AsyncApplyFunction interface represents a function that  | 
 | 28 | + * asynchronously accepts a value of type T and produces a result  | 
 | 29 | + * of type R. This interface extends {@link ApplyFunction} and is  | 
 | 30 | + * designed to be used with asynchronous computation frameworks,  | 
 | 31 | + * such as Java's {@link java.util.concurrent.CompletableFuture}.  | 
 | 32 | + *  | 
 | 33 | + * <p>An implementation of this interface is expected to perform an  | 
 | 34 | + * asynchronous operation and return a result, which is typically  | 
 | 35 | + * represented as a {@code CompletableFuture<R>}. This allows for  | 
 | 36 | + * non-blocking execution of tasks and is particularly useful for  | 
 | 37 | + * I/O operations or any operation that may take a significant amount  | 
 | 38 | + * of time to complete.</p>  | 
 | 39 | + *  | 
 | 40 | + * <p>AsyncApplyFunction is used to implement the following semantics:</p>  | 
 | 41 | + * <pre>  | 
 | 42 | + * {@code  | 
 | 43 | + *    T res = doAsync1(input);  | 
 | 44 | + *    // Can use AsyncApplyFunction  | 
 | 45 | + *    R result = doAsync2(res);  | 
 | 46 | + * }  | 
 | 47 | + * </pre>  | 
 | 48 | + *  | 
 | 49 | + * @param <T> the type of the input to the function  | 
 | 50 | + * @param <R> the type of the result of the function  | 
 | 51 | + * @see ApplyFunction  | 
 | 52 | + * @see java.util.concurrent.CompletableFuture  | 
 | 53 | + */  | 
 | 54 | +@FunctionalInterface  | 
 | 55 | +public interface AsyncApplyFunction<T, R> extends ApplyFunction<T, R> {  | 
 | 56 | + | 
 | 57 | +  /**  | 
 | 58 | +   * Asynchronously applies this function to the given argument.  | 
 | 59 | +   *  | 
 | 60 | +   * <p>This method is intended to initiate the function application  | 
 | 61 | +   * without waiting for the result. It is typically used when the  | 
 | 62 | +   * result of the operation is not required immediately or when the  | 
 | 63 | +   * operation is part of a larger asynchronous workflow.</p>  | 
 | 64 | +   *  | 
 | 65 | +   * @param t the function argument  | 
 | 66 | +   * @throws IOException if an I/O error occurs during the application  | 
 | 67 | +   *                     of the function  | 
 | 68 | +   */  | 
 | 69 | +  void applyAsync(T t) throws IOException;  | 
 | 70 | + | 
 | 71 | +  /**  | 
 | 72 | +   * Synchronously applies this function to the given argument and  | 
 | 73 | +   * returns the result.  | 
 | 74 | +   *  | 
 | 75 | +   * <p>This method waits for the asynchronous operation to complete  | 
 | 76 | +   * and returns its result. It is useful when the result is needed  | 
 | 77 | +   * immediately and the calling code cannot proceed without it.</p>  | 
 | 78 | +   *  | 
 | 79 | +   * @param t the function argument  | 
 | 80 | +   * @return the result of applying the function to the argument  | 
 | 81 | +   * @throws IOException if an I/O error occurs during the application  | 
 | 82 | +   *                     of the function  | 
 | 83 | +   */  | 
 | 84 | +  @Override  | 
 | 85 | +  default R apply(T t) throws IOException {  | 
 | 86 | +    applyAsync(t);  | 
 | 87 | +    return result();  | 
 | 88 | +  }  | 
 | 89 | + | 
 | 90 | +  /**  | 
 | 91 | +   * Initiates the asynchronous application of this function to the given result.  | 
 | 92 | +   * <p>  | 
 | 93 | +   * This method calls applyAsync to start the asynchronous operation and then retrieves  | 
 | 94 | +   * the current thread's CompletableFuture using getCurCompletableFuture.  | 
 | 95 | +   * It returns this CompletableFuture, which will be completed with the result of the  | 
 | 96 | +   * asynchronous operation once it is finished.  | 
 | 97 | +   * <p>  | 
 | 98 | +   * This method is useful for chaining with other asynchronous operations, as it allows the  | 
 | 99 | +   * current operation to be part of a larger asynchronous workflow.  | 
 | 100 | +   *  | 
 | 101 | +   * @param t the function argument  | 
 | 102 | +   * @return a CompletableFuture that will be completed with the result of the  | 
 | 103 | +   *         asynchronous operation  | 
 | 104 | +   * @throws IOException if an I/O error occurs during the initiation of the asynchronous operation  | 
 | 105 | +   */  | 
 | 106 | +  default CompletableFuture<R> async(T t) throws IOException {  | 
 | 107 | +    applyAsync(t);  | 
 | 108 | +    CompletableFuture<R> completableFuture = getCurCompletableFuture();  | 
 | 109 | +    assert completableFuture != null;  | 
 | 110 | +    return completableFuture;  | 
 | 111 | +  }  | 
 | 112 | + | 
 | 113 | +  /**  | 
 | 114 | +   * Asynchronously applies this function to the result of the given  | 
 | 115 | +   * CompletableFuture.  | 
 | 116 | +   *  | 
 | 117 | +   * <p>This method chains the function application to the completion  | 
 | 118 | +   * of the input future. It returns a new CompletableFuture that  | 
 | 119 | +   * completes with the function's result when the input future  | 
 | 120 | +   * completes.</p>  | 
 | 121 | +   *  | 
 | 122 | +   * @param in the input future  | 
 | 123 | +   * @return a new CompletableFuture that holds the result of the  | 
 | 124 | +   *         function application  | 
 | 125 | +   */  | 
 | 126 | +  @Override  | 
 | 127 | +  default CompletableFuture<R> apply(CompletableFuture<T> in) {  | 
 | 128 | +    return in.thenCompose(t -> {  | 
 | 129 | +      try {  | 
 | 130 | +        return async(t);  | 
 | 131 | +      } catch (IOException e) {  | 
 | 132 | +        throw warpCompletionException(e);  | 
 | 133 | +      }  | 
 | 134 | +    });  | 
 | 135 | +  }  | 
 | 136 | + | 
 | 137 | +  /**  | 
 | 138 | +   * Asynchronously applies this function to the result of the given  | 
 | 139 | +   * CompletableFuture, using the specified executor for the  | 
 | 140 | +   * asynchronous computation.  | 
 | 141 | +   *  | 
 | 142 | +   * <p>This method allows for more control over the execution  | 
 | 143 | +   * context of the asynchronous operation, such as running the  | 
 | 144 | +   * operation in a separate thread or thread pool.</p>  | 
 | 145 | +   *  | 
 | 146 | +   * @param in the input future  | 
 | 147 | +   * @param executor the executor to use for the asynchronous  | 
 | 148 | +   *                 computation  | 
 | 149 | +   * @return a new CompletableFuture that holds the result of the  | 
 | 150 | +   *         function application  | 
 | 151 | +   */  | 
 | 152 | +  @Override  | 
 | 153 | +  default CompletableFuture<R> apply(CompletableFuture<T> in, Executor executor) {  | 
 | 154 | +    return in.thenComposeAsync(t -> {  | 
 | 155 | +      try {  | 
 | 156 | +        return async(t);  | 
 | 157 | +      } catch (IOException e) {  | 
 | 158 | +        throw warpCompletionException(e);  | 
 | 159 | +      }  | 
 | 160 | +    }, executor);  | 
 | 161 | +  }  | 
 | 162 | +}  | 
0 commit comments