|
18 | 18 |
|
19 | 19 | package org.apache.hadoop.hdfs.protocolPB; |
20 | 20 |
|
| 21 | +import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext; |
21 | 22 | import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; |
22 | 23 | import org.apache.hadoop.io.Writable; |
23 | | -import org.apache.hadoop.ipc.CallerContext; |
24 | 24 | import org.apache.hadoop.ipc.Client; |
25 | 25 | import org.apache.hadoop.ipc.ProtobufRpcEngine2; |
26 | | -import org.apache.hadoop.ipc.Server; |
27 | 26 | import org.apache.hadoop.ipc.internal.ShadedProtobufHelper; |
28 | 27 | import org.apache.hadoop.util.concurrent.AsyncGet; |
29 | 28 | import org.slf4j.Logger; |
30 | 29 | import org.slf4j.LoggerFactory; |
31 | 30 |
|
32 | 31 | import java.io.IOException; |
33 | 32 | import java.util.concurrent.CompletableFuture; |
| 33 | +import java.util.concurrent.Executor; |
34 | 34 |
|
35 | 35 | import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException; |
36 | | -import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; |
37 | 36 | import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith; |
38 | 37 | import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; |
39 | 38 | import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc; |
40 | 39 |
|
| 40 | +/** |
| 41 | + * <p>This utility class encapsulates the logic required to initiate asynchronous RPCs, |
| 42 | + * handle responses, and propagate exceptions. It works in conjunction with |
| 43 | + * {@link ProtobufRpcEngine2} and {@link Client} to facilitate the asynchronous |
| 44 | + * nature of the operations. |
| 45 | + * |
| 46 | + * @see ProtobufRpcEngine2 |
| 47 | + * @see Client |
| 48 | + * @see CompletableFuture |
| 49 | + */ |
41 | 50 | public final class AsyncRpcProtocolPBUtil { |
42 | 51 | public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class); |
| 52 | + /** The executor used for handling responses asynchronously. */ |
| 53 | + private static Executor worker; |
43 | 54 |
|
44 | 55 | private AsyncRpcProtocolPBUtil() {} |
45 | 56 |
|
| 57 | + /** |
| 58 | + * Asynchronously invokes an RPC call and applies a response transformation function |
| 59 | + * to the result. This method is generic and can be used to handle any type of |
| 60 | + * RPC call. |
| 61 | + * |
| 62 | + * <p>The method uses the {@link ShadedProtobufHelper.IpcCall} to prepare the RPC call |
| 63 | + * and the {@link ApplyFunction} to process the response. It also handles exceptions |
| 64 | + * that may occur during the RPC call and wraps them in a user-friendly manner. |
| 65 | + * |
| 66 | + * @param call The IPC call encapsulating the RPC request. |
| 67 | + * @param response The function to apply to the response of the RPC call. |
| 68 | + * @param clazz The class object representing the type {@code R} of the response. |
| 69 | + * @param <T> Type of the call's result. |
| 70 | + * @param <R> Type of method return. |
| 71 | + * @return An object of type {@code R} that is the result of applying the response |
| 72 | + * function to the RPC call result. |
| 73 | + * @throws IOException If an I/O error occurs during the asynchronous RPC call. |
| 74 | + */ |
46 | 75 | public static <T, R> R asyncIpcClient( |
47 | 76 | ShadedProtobufHelper.IpcCall<T> call, ApplyFunction<T, R> response, |
48 | 77 | Class<R> clazz) throws IOException { |
49 | 78 | ipc(call); |
50 | 79 | AsyncGet<T, Exception> asyncReqMessage = |
51 | 80 | (AsyncGet<T, Exception>) ProtobufRpcEngine2.getAsyncReturnMessage(); |
52 | 81 | CompletableFuture<Writable> responseFuture = Client.getResponseFuture(); |
53 | | - // transfer originCall & callerContext to worker threads of executor. |
54 | | - final Server.Call originCall = Server.getCurCall().get(); |
55 | | - final CallerContext originContext = CallerContext.getCurrent(); |
56 | | - asyncCompleteWith(responseFuture); |
57 | | - asyncApply(o -> { |
| 82 | + // transfer thread local context to worker threads of executor. |
| 83 | + ThreadLocalContext threadLocalContext = new ThreadLocalContext(); |
| 84 | + asyncCompleteWith(responseFuture.handleAsync((result, e) -> { |
| 85 | + threadLocalContext.transfer(); |
| 86 | + if (e != null) { |
| 87 | + throw warpCompletionException(e); |
| 88 | + } |
58 | 89 | try { |
59 | | - Server.getCurCall().set(originCall); |
60 | | - CallerContext.setCurrent(originContext); |
61 | 90 | T res = asyncReqMessage.get(-1, null); |
62 | 91 | return response.apply(res); |
63 | | - } catch (Exception e) { |
64 | | - throw warpCompletionException(e); |
| 92 | + } catch (Exception ex) { |
| 93 | + throw warpCompletionException(ex); |
65 | 94 | } |
66 | | - }); |
| 95 | + }, worker)); |
67 | 96 | return asyncReturn(clazz); |
68 | 97 | } |
| 98 | + |
| 99 | + /** |
| 100 | + * Sets the executor used for handling responses asynchronously within |
| 101 | + * the utility class. |
| 102 | + * |
| 103 | + * @param worker The executor to be used for handling responses asynchronously. |
| 104 | + */ |
| 105 | + public static void setWorker(Executor worker) { |
| 106 | + AsyncRpcProtocolPBUtil.worker = worker; |
| 107 | + } |
69 | 108 | } |
0 commit comments