From 7d3deb178af5e5afad4621a8378584ef94ddea46 Mon Sep 17 00:00:00 2001 From: Wenqi Li Date: Tue, 13 Aug 2024 00:00:58 +0800 Subject: [PATCH 1/3] [ARR] RouterSnapshot supports asynchronous rpc. --- .../router/RouterAsyncSnapshot.java | 204 +++++++++++++++++ .../federation/router/RouterRpcServer.java | 18 ++ .../router/TestRouterAsyncSnapshot.java | 207 ++++++++++++++++++ 3 files changed, 429 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncSnapshot.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncSnapshot.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncSnapshot.java new file mode 100644 index 0000000000000..7ef353f89d6a9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncSnapshot.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing; +import org.apache.hadoop.hdfs.protocol.SnapshotStatus; +import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil; +import org.apache.hadoop.hdfs.server.namenode.NameNode; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; + +/** + * Module that implements all the asynchronous RPC calls related to snapshots in + * {@link ClientProtocol} in the {@link RouterRpcServer}. + */ +public class RouterAsyncSnapshot extends RouterSnapshot { + /** RPC server to receive client calls. */ + private final RouterRpcServer rpcServer; + /** RPC clients to connect to the Namenodes. */ + private final RouterRpcClient rpcClient; + /** Find generic locations. */ + private final ActiveNamenodeResolver namenodeResolver; + + public RouterAsyncSnapshot(RouterRpcServer server) { + super(server); + this.rpcServer = server; + this.rpcClient = this.rpcServer.getRPCClient(); + this.namenodeResolver = rpcServer.getNamenodeResolver(); + } + + @Override + public String createSnapshot(String snapshotRoot, String snapshotName) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + final List locations = + rpcServer.getLocationsForPath(snapshotRoot, true, false); + RemoteMethod method = new RemoteMethod("createSnapshot", + new Class[] {String.class, String.class}, new RemoteParam(), + snapshotName); + + if (rpcServer.isInvokeConcurrent(snapshotRoot)) { + rpcClient.invokeConcurrent(locations, method, String.class); + asyncApply((ApplyFunction, String>) + results -> { + Map.Entry firstelement = + results.entrySet().iterator().next(); + RemoteLocation loc = firstelement.getKey(); + String result = firstelement.getValue(); + return result.replaceFirst(loc.getDest(), loc.getSrc()); + }); + } else { + rpcClient.invokeSequential(method, locations, String.class, null); + asyncApply((ApplyFunction, String>) + response -> { + RemoteLocation loc = response.getLocation(); + String invokedResult = response.getResult(); + return invokedResult.replaceFirst(loc.getDest(), loc.getSrc()); + }); + } + return AsyncUtil.asyncReturn(String.class); + } + + @Override + public SnapshottableDirectoryStatus[] getSnapshottableDirListing() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getSnapshottableDirListing"); + Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent( + nss, method, true, false, SnapshottableDirectoryStatus[].class); + asyncApply((ApplyFunction, + SnapshottableDirectoryStatus[]>) + ret -> RouterRpcServer.merge(ret, SnapshottableDirectoryStatus.class)); + return AsyncUtil.asyncReturn(SnapshottableDirectoryStatus[].class); + } + + @Override + public SnapshotStatus[] getSnapshotListing(String snapshotRoot) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + final List locations = + rpcServer.getLocationsForPath(snapshotRoot, true, false); + RemoteMethod remoteMethod = new RemoteMethod("getSnapshotListing", + new Class[]{String.class}, + new RemoteParam()); + if (rpcServer.isInvokeConcurrent(snapshotRoot)) { + rpcClient.invokeConcurrent( + locations, remoteMethod, true, false, SnapshotStatus[].class); + asyncApply((ApplyFunction, SnapshotStatus[]>) + ret -> { + SnapshotStatus[] response = ret.values().iterator().next(); + String src = ret.keySet().iterator().next().getSrc(); + String dst = ret.keySet().iterator().next().getDest(); + for (SnapshotStatus s : response) { + String mountPath = DFSUtil.bytes2String(s.getParentFullPath()). + replaceFirst(src, dst); + s.setParentFullPath(DFSUtil.string2Bytes(mountPath)); + } + return response; + }); + } else { + rpcClient + .invokeSequential(remoteMethod, locations, SnapshotStatus[].class, + null); + asyncApply((ApplyFunction, SnapshotStatus[]>) + invokedResponse -> { + RemoteLocation loc = invokedResponse.getLocation(); + SnapshotStatus[] response = invokedResponse.getResult(); + for (SnapshotStatus s : response) { + String mountPath = DFSUtil.bytes2String(s.getParentFullPath()). + replaceFirst(loc.getDest(), loc.getSrc()); + s.setParentFullPath(DFSUtil.string2Bytes(mountPath)); + } + return response; + }); + } + return asyncReturn(SnapshotStatus[].class); + } + + @Override + public SnapshotDiffReport getSnapshotDiffReport( + String snapshotRoot, String earlierSnapshotName, + String laterSnapshotName) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + final List locations = + rpcServer.getLocationsForPath(snapshotRoot, true, false); + RemoteMethod remoteMethod = new RemoteMethod("getSnapshotDiffReport", + new Class[] {String.class, String.class, String.class}, + new RemoteParam(), earlierSnapshotName, laterSnapshotName); + + if (rpcServer.isInvokeConcurrent(snapshotRoot)) { + rpcClient.invokeConcurrent( + locations, remoteMethod, true, false, SnapshotDiffReport.class); + asyncApply((ApplyFunction, SnapshotDiffReport>) + ret -> ret.values().iterator().next()); + return asyncReturn(SnapshotDiffReport.class); + } else { + return rpcClient.invokeSequential( + locations, remoteMethod, SnapshotDiffReport.class, null); + } + } + + @Override + public SnapshotDiffReportListing getSnapshotDiffReportListing( + String snapshotRoot, String earlierSnapshotName, String laterSnapshotName, + byte[] startPath, int index) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + final List locations = + rpcServer.getLocationsForPath(snapshotRoot, true, false); + Class[] params = new Class[] { + String.class, String.class, String.class, + byte[].class, int.class}; + RemoteMethod remoteMethod = new RemoteMethod( + "getSnapshotDiffReportListing", params, + new RemoteParam(), earlierSnapshotName, laterSnapshotName, + startPath, index); + + if (rpcServer.isInvokeConcurrent(snapshotRoot)) { + rpcClient.invokeConcurrent(locations, remoteMethod, false, false, + SnapshotDiffReportListing.class); + asyncApply((ApplyFunction, + SnapshotDiffReportListing>) ret -> { + Collection listings = ret.values(); + SnapshotDiffReportListing listing0 = listings.iterator().next(); + return listing0; + }); + return asyncReturn(SnapshotDiffReportListing.class); + } else { + return rpcClient.invokeSequential( + locations, remoteMethod, SnapshotDiffReportListing.class, null); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index c23c21c6dfb67..916d7d52880bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -449,6 +449,24 @@ public RouterRpcServer(Configuration conf, Router router, initRouterFedRename(); } + protected void initAsyncThreadPool() { + int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, + DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT); + int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, + DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT); + if (asyncRouterHandler == null) { + LOG.info("init router async handler count: {}", asyncHandlerCount); + asyncRouterHandler = Executors.newFixedThreadPool( + asyncHandlerCount, new AsyncThreadFactory("router async handler ")); + } + if (asyncRouterResponder == null) { + LOG.info("init router async responder count: {}", asyncResponderCount); + asyncRouterResponder = Executors.newFixedThreadPool( + asyncResponderCount, new AsyncThreadFactory("router async responder ")); + } + AsyncRpcProtocolPBUtil.setWorker(asyncRouterResponder); + } + /** * Init router async handlers and router async responders. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncSnapshot.java new file mode 100644 index 0000000000000..49a682cea4b97 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncSnapshot.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing; +import org.apache.hadoop.hdfs.protocol.SnapshotException; +import org.apache.hadoop.hdfs.protocol.SnapshotStatus; +import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.ipc.CallerContext; +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.MODIFY; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestRouterAsyncSnapshot { + private static Configuration routerConf; + /** Federated HDFS cluster. */ + private static MiniRouterDFSCluster cluster; + private static String ns0; + + /** Random Router for this federated cluster. */ + private MiniRouterDFSCluster.RouterContext router; + private FileSystem routerFs; + private RouterRpcServer routerRpcServer; + private RouterAsyncSnapshot asyncSnapshot; + + @BeforeClass + public static void setUpCluster() throws Exception { + cluster = new MiniRouterDFSCluster(true, 1, 2, + DEFAULT_HEARTBEAT_INTERVAL_MS, 1000); + cluster.setNumDatanodesPerNameservice(3); + + cluster.startCluster(); + + // Making one Namenode active per nameservice + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + cluster.switchToStandby(ns, NAMENODES[1]); + } + } + // Start routers with only an RPC service + routerConf = new RouterConfigBuilder() + .rpc() + .build(); + + // Reduce the number of RPC clients threads to overload the Router easy + routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1); + routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1); + routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1); + // We decrease the DN cache times to make the test faster + routerConf.setTimeDuration( + RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); + cluster.addRouterOverrides(routerConf); + // Start routers with only an RPC service + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + cluster.waitActiveNamespaces(); + ns0 = cluster.getNameservices().get(0); + } + + @AfterClass + public static void shutdownCluster() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Before + public void setUp() throws IOException { + router = cluster.getRandomRouter(); + routerFs = router.getFileSystem(); + routerRpcServer = router.getRouterRpcServer(); + routerRpcServer.initAsyncThreadPool(); + RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient( + routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), + routerRpcServer.getRPCMonitor(), + routerRpcServer.getRouterStateIdContext()); + RouterRpcServer spy = Mockito.spy(routerRpcServer); + Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient); + asyncSnapshot = new RouterAsyncSnapshot(spy); + + // Create mock locations + MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver(); + resolver.addLocation("/", ns0, "/"); + FsPermission permission = new FsPermission("705"); + routerFs.mkdirs(new Path("/testdir"), permission); + FSDataOutputStream fsDataOutputStream = routerFs.create( + new Path("/testdir/testSnapshot.file"), true); + fsDataOutputStream.write(new byte[1024]); + fsDataOutputStream.close(); + } + + @After + public void tearDown() throws IOException { + // clear client context + CallerContext.setCurrent(null); + boolean delete = routerFs.delete(new Path("/testdir")); + assertTrue(delete); + if (routerFs != null) { + routerFs.close(); + } + } + + @Test + public void testRouterAsyncSnapshot() throws Exception { + asyncSnapshot.allowSnapshot("/testdir"); + syncReturn(null); + asyncSnapshot.createSnapshot("/testdir", "testdirSnapshot"); + String snapshotName = syncReturn(String.class); + assertEquals("/testdir/.snapshot/testdirSnapshot", snapshotName); + asyncSnapshot.getSnapshottableDirListing(); + SnapshottableDirectoryStatus[] snapshottableDirectoryStatuses = + syncReturn(SnapshottableDirectoryStatus[].class); + assertEquals(1, snapshottableDirectoryStatuses.length); + asyncSnapshot.getSnapshotListing("/testdir"); + SnapshotStatus[] snapshotStatuses = syncReturn(SnapshotStatus[].class); + assertEquals(1, snapshotStatuses.length); + + FSDataOutputStream fsDataOutputStream = routerFs.append( + new Path("/testdir/testSnapshot.file"), true); + fsDataOutputStream.write(new byte[1024]); + fsDataOutputStream.close(); + + asyncSnapshot.createSnapshot("/testdir", "testdirSnapshot1"); + snapshotName = syncReturn(String.class); + assertEquals("/testdir/.snapshot/testdirSnapshot1", snapshotName); + + asyncSnapshot.getSnapshotDiffReport("/testdir", + "testdirSnapshot", "testdirSnapshot1"); + SnapshotDiffReport snapshotDiffReport = syncReturn(SnapshotDiffReport.class); + assertEquals(MODIFY, snapshotDiffReport.getDiffList().get(0).getType()); + + asyncSnapshot.getSnapshotDiffReportListing("/testdir", + "testdirSnapshot", "testdirSnapshot1", new byte[]{}, -1); + SnapshotDiffReportListing snapshotDiffReportListing = + syncReturn(SnapshotDiffReportListing.class); + assertEquals(1, snapshotDiffReportListing.getModifyList().size()); + + LambdaTestUtils.intercept(SnapshotException.class, () -> { + asyncSnapshot.disallowSnapshot("/testdir"); + syncReturn(null); + }); + + asyncSnapshot.renameSnapshot("/testdir", + "testdirSnapshot1", "testdirSnapshot2"); + syncReturn(null); + + LambdaTestUtils.intercept(SnapshotException.class, + "Cannot delete snapshot testdirSnapshot1 from path /testdir", + () -> { + asyncSnapshot.deleteSnapshot("/testdir", "testdirSnapshot1"); + syncReturn(null); + }); + + asyncSnapshot.deleteSnapshot("/testdir", "testdirSnapshot2"); + syncReturn(null); + + asyncSnapshot.deleteSnapshot("/testdir", "testdirSnapshot"); + syncReturn(null); + + asyncSnapshot.disallowSnapshot("/testdir"); + syncReturn(null); + } +} \ No newline at end of file From 428c52d91875bfd3a688c31467d7037f19c1b760 Mon Sep 17 00:00:00 2001 From: Wenqi Li Date: Fri, 27 Sep 2024 19:29:30 +0800 Subject: [PATCH 2/3] [ARR] RouterSnapshot supports asynchronous rpc. --- .../federation/router/RouterRpcServer.java | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 916d7d52880bd..c23c21c6dfb67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -449,24 +449,6 @@ public RouterRpcServer(Configuration conf, Router router, initRouterFedRename(); } - protected void initAsyncThreadPool() { - int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, - DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT); - int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, - DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT); - if (asyncRouterHandler == null) { - LOG.info("init router async handler count: {}", asyncHandlerCount); - asyncRouterHandler = Executors.newFixedThreadPool( - asyncHandlerCount, new AsyncThreadFactory("router async handler ")); - } - if (asyncRouterResponder == null) { - LOG.info("init router async responder count: {}", asyncResponderCount); - asyncRouterResponder = Executors.newFixedThreadPool( - asyncResponderCount, new AsyncThreadFactory("router async responder ")); - } - AsyncRpcProtocolPBUtil.setWorker(asyncRouterResponder); - } - /** * Init router async handlers and router async responders. */ From 50a13b3a9bd6d832713496c43dded767c1fc8bc1 Mon Sep 17 00:00:00 2001 From: Wenqi Li Date: Fri, 27 Sep 2024 19:39:09 +0800 Subject: [PATCH 3/3] delete AsyncUtil --- .../hdfs/server/federation/router/RouterAsyncSnapshot.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncSnapshot.java index 7ef353f89d6a9..8d830b8427147 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAsyncSnapshot.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; -import org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil; import org.apache.hadoop.hdfs.server.namenode.NameNode; import java.io.IOException; @@ -87,7 +86,7 @@ public String createSnapshot(String snapshotRoot, String snapshotName) throws IO return invokedResult.replaceFirst(loc.getDest(), loc.getSrc()); }); } - return AsyncUtil.asyncReturn(String.class); + return asyncReturn(String.class); } @Override @@ -101,7 +100,7 @@ public SnapshottableDirectoryStatus[] getSnapshottableDirListing() throws IOExce asyncApply((ApplyFunction, SnapshottableDirectoryStatus[]>) ret -> RouterRpcServer.merge(ret, SnapshottableDirectoryStatus.class)); - return AsyncUtil.asyncReturn(SnapshottableDirectoryStatus[].class); + return asyncReturn(SnapshottableDirectoryStatus[].class); } @Override