2020
2121import  org .apache .hadoop .hdfs .server .federation .router .ThreadLocalContext ;
2222import  org .apache .hadoop .hdfs .server .federation .router .async .utils .ApplyFunction ;
23+ import  org .apache .hadoop .hdfs .server .federation .router .async .utils .AsyncUtil ;
2324import  org .apache .hadoop .io .Writable ;
25+ import  org .apache .hadoop .ipc .CallerContext ;
2426import  org .apache .hadoop .ipc .Client ;
2527import  org .apache .hadoop .ipc .ProtobufRpcEngine2 ;
28+ import  org .apache .hadoop .ipc .ProtobufRpcEngineCallback2 ;
2629import  org .apache .hadoop .ipc .internal .ShadedProtobufHelper ;
30+ import  org .apache .hadoop .thirdparty .protobuf .Message ;
2731import  org .apache .hadoop .util .concurrent .AsyncGet ;
2832import  org .slf4j .Logger ;
2933import  org .slf4j .LoggerFactory ;
3034
3135import  java .io .IOException ;
3236import  java .util .concurrent .CompletableFuture ;
37+ import  java .util .concurrent .CompletionException ;
3338import  java .util .concurrent .Executor ;
3439
3540import  static  org .apache .hadoop .hdfs .server .federation .router .async .utils .Async .warpCompletionException ;
@@ -96,6 +101,45 @@ public static <T, R> R asyncIpcClient(
96101    return  asyncReturn (clazz );
97102  }
98103
104+   /** 
105+    * Asynchronously invokes an RPC call and applies a response transformation function 
106+    * to the result on server-side. 
107+    * @param req The IPC call encapsulating the RPC request on server-side. 
108+    * @param res The function to apply to the response of the RPC call on server-side. 
109+    * @param <T> Type of the call's result. 
110+    */ 
111+   public  static  <T > void  asyncRouterServer (ServerReq <T > req , ServerRes <T > res ) {
112+     final  ProtobufRpcEngineCallback2  callback  =
113+         ProtobufRpcEngine2 .Server .registerForDeferredResponse2 ();
114+ 
115+     CompletableFuture <Object > completableFuture  =
116+         CompletableFuture .completedFuture (null );
117+     completableFuture .thenCompose (o  -> {
118+       try  {
119+         req .req ();
120+         return  (CompletableFuture <T >) AsyncUtil .getAsyncUtilCompletableFuture ();
121+       } catch  (Exception  e ) {
122+         throw  new  CompletionException (e );
123+       }
124+     }).handle ((result , e ) -> {
125+       LOG .debug ("Async response, callback: {}, CallerContext: {}, result: [{}], exception: [{}]" ,
126+           callback , CallerContext .getCurrent (), result , e );
127+       if  (e  == null ) {
128+         Message  value  = null ;
129+         try  {
130+           value  = res .res (result );
131+         } catch  (Exception  re ) {
132+           callback .error (re );
133+           return  null ;
134+         }
135+         callback .setResponse (value );
136+       } else  {
137+         callback .error (e .getCause ());
138+       }
139+       return  null ;
140+     });
141+   }
142+ 
99143  /** 
100144   * Sets the executor used for handling responses asynchronously within 
101145   * the utility class. 
@@ -105,4 +149,14 @@ public static <T, R> R asyncIpcClient(
105149  public  static  void  setWorker (Executor  worker ) {
106150    AsyncRpcProtocolPBUtil .worker  = worker ;
107151  }
152+ 
153+   @ FunctionalInterface 
154+   interface  ServerReq <T > {
155+     T  req () throws  Exception ;
156+   }
157+ 
158+   @ FunctionalInterface 
159+   interface  ServerRes <T > {
160+     Message  res (T  result ) throws  Exception ;
161+   }
108162}
0 commit comments