diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java index af4cf75af7fa..cc8d2c6f50bc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; +import java.util.Locale; import java.util.ServiceLoader; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -35,7 +37,7 @@ * The entry point for creating a {@link ConnectionRegistry}. */ @InterfaceAudience.Private -final class ConnectionRegistryFactory { +public final class ConnectionRegistryFactory { private static final Logger LOG = LoggerFactory.getLogger(ConnectionRegistryFactory.class); @@ -90,4 +92,46 @@ static ConnectionRegistry create(Configuration conf, User user) { RpcConnectionRegistry.class, ConnectionRegistry.class); return ReflectionUtils.newInstance(clazz, conf, user); } + + /** + * Check whether the given {@code uri} is valid. + *

+ * Notice that there is no fallback logic for this method, so passing an URI with null scheme can + * not pass. + * @throws IOException if this is not a valid connection registry URI + */ + public static void validate(URI uri) throws IOException { + if (StringUtils.isBlank(uri.getScheme())) { + throw new IOException("No schema for uri: " + uri); + } + ConnectionRegistryURIFactory factory = FACTORIES.get(uri.getScheme().toLowerCase(Locale.ROOT)); + if (factory == null) { + throw new IOException( + "No factory registered for scheme " + uri.getScheme() + ", uri: " + uri); + } + factory.validate(uri); + } + + /** + * If the given {@code clusterKey} can be parsed to a {@link URI}, and the scheme of the + * {@link URI} is supported by us, return the {@link URI}, otherwise return {@code null}. + * @param clusterKey the cluster key, typically from replication peer config + * @return a {@link URI} or {@code null}. + */ + public static URI tryParseAsConnectionURI(String clusterKey) { + // The old cluster key format may not be parsed as URI if we use ip address as the zookeeper + // address, so here we need to catch the URISyntaxException and return false + URI uri; + try { + uri = new URI(clusterKey); + } catch (URISyntaxException e) { + LOG.debug("failed to parse cluster key to URI: {}", clusterKey, e); + return null; + } + if (FACTORIES.containsKey(uri.getScheme())) { + return uri; + } else { + return null; + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java index ab2037a1c138..e5ee4c2321df 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java @@ -39,4 +39,10 @@ public interface ConnectionRegistryURIFactory { * {@link ConnectionRegistryFactory}. */ String getScheme(); + + /** + * Validate the given {@code uri}. + * @throws IOException if this is not a valid connection registry URI. + */ + void validate(URI uri) throws IOException; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 103a64e520a1..5822657fd88b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -24,6 +24,7 @@ import edu.umd.cs.findbugs.annotations.Nullable; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -3785,15 +3786,17 @@ private CompletableFuture checkAndSyncTableToPeerClusters(TableName tableN private CompletableFuture trySyncTableToPeerCluster(TableName tableName, byte[][] splits, ReplicationPeerDescription peer) { - Configuration peerConf = null; + Configuration peerConf; try { - peerConf = - ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer); + peerConf = ReplicationPeerConfigUtil + .getPeerClusterConfiguration(connection.getConfiguration(), peer.getPeerConfig()); } catch (IOException e) { return failedFuture(e); } + URI connectionUri = + ConnectionRegistryFactory.tryParseAsConnectionURI(peer.getPeerConfig().getClusterKey()); CompletableFuture future = new CompletableFuture<>(); - addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> { + addListener(ConnectionFactory.createAsyncConnection(connectionUri, peerConf), (conn, err) -> { if (err != null) { future.completeExceptionally(err); return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryURIFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryURIFactory.java index 79081ee6c649..064b6ef226a7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryURIFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryURIFactory.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.URI; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; @@ -46,4 +47,12 @@ public ConnectionRegistry create(URI uri, Configuration conf, User user) throws public String getScheme() { return "hbase+rpc"; } + + @Override + public void validate(URI uri) throws IOException { + if (StringUtils.isBlank(uri.getAuthority())) { + throw new IOException("no bootstrap nodes specified, uri: " + uri); + } + // TODO: add more check about the bootstrap nodes + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryURIFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryURIFactory.java index 939adab23b78..86af8d91d2d1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryURIFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryURIFactory.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.URI; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.security.User; @@ -49,4 +50,15 @@ public ConnectionRegistry create(URI uri, Configuration conf, User user) throws public String getScheme() { return "hbase+zk"; } + + @Override + public void validate(URI uri) throws IOException { + if (StringUtils.isBlank(uri.getAuthority())) { + throw new IOException("no zookeeper quorum specified, uri: " + uri); + } + // TODO: add more check about the zookeeper quorum + if (StringUtils.isBlank(uri.getPath())) { + throw new IOException("no zookeeper parent path specified, uri: " + uri); + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index 2fc5fa3c1152..57be558fb492 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionRegistryFactory; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -629,19 +630,19 @@ public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConf /** * Returns the configuration needed to talk to the remote slave cluster. - * @param conf the base configuration - * @param peer the description of replication peer + * @param conf the base configuration + * @param peerConfig the peer config of replication peer * @return the configuration for the peer cluster, null if it was unable to get the configuration * @throws IOException when create peer cluster configuration failed */ public static Configuration getPeerClusterConfiguration(Configuration conf, - ReplicationPeerDescription peer) throws IOException { - ReplicationPeerConfig peerConfig = peer.getPeerConfig(); + ReplicationPeerConfig peerConfig) throws IOException { Configuration otherConf; - try { + if (ConnectionRegistryFactory.tryParseAsConnectionURI(peerConfig.getClusterKey()) != null) { + otherConf = HBaseConfiguration.create(conf); + } else { + // only need to apply cluster key for old style cluster key otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey()); - } catch (IOException e) { - throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e); } if (!peerConfig.getConfiguration().isEmpty()) { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java index 5fc030581dad..c554b5f40526 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ZKConfig; @@ -226,11 +227,11 @@ public static Configuration createClusterConf(Configuration baseConf, String clu public static Configuration createClusterConf(Configuration baseConf, String clusterKey, String overridePrefix) throws IOException { Configuration clusterConf = HBaseConfiguration.create(baseConf); - if (clusterKey != null && !clusterKey.isEmpty()) { + if (!StringUtils.isBlank(clusterKey)) { applyClusterKeyToConf(clusterConf, clusterKey); } - if (overridePrefix != null && !overridePrefix.isEmpty()) { + if (!StringUtils.isBlank(overridePrefix)) { Configuration clusterSubset = HBaseConfiguration.subset(clusterConf, overridePrefix); HBaseConfiguration.merge(clusterConf, clusterSubset); } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 6e3650297bd3..d83fa1d52522 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableSnapshotScanner; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PrefixFilter; @@ -55,7 +56,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; -import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -397,7 +397,7 @@ public boolean isAborted() { ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), localZKW, conf); ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId); return Pair.newPair(peerConfig, - ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf)); + ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peerConfig)); } catch (ReplicationException e) { throw new IOException("An error occurred while trying to connect to the remote peer cluster", e); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java index 2958c5ef9114..c7b0ed4c4b05 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java @@ -86,6 +86,14 @@ public class TestVerifyReplication extends TestReplicationBase { @Rule public TestName name = new TestName(); + @Override + protected String getClusterKey(HBaseTestingUtil util) throws Exception { + // TODO: VerifyReplication does not support connection uri yet, so here we need to use cluster + // key, as in this test we will pass the cluster key config in peer config directly to + // VerifyReplication job. + return util.getClusterKey(); + } + @Before public void setUp() throws Exception { cleanUp(); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationAdjunct.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationAdjunct.java index d78b2f2e2edd..7044b002a5eb 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationAdjunct.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationAdjunct.java @@ -81,6 +81,14 @@ public class TestVerifyReplicationAdjunct extends TestReplicationBase { @Rule public TestName name = new TestName(); + @Override + protected String getClusterKey(HBaseTestingUtil util) throws Exception { + // TODO: VerifyReplication does not support connection uri yet, so here we need to use cluster + // key, as in this test we will pass the cluster key config in peer config directly to + // VerifyReplication job. + return util.getClusterKey(); + } + @Before public void setUp() throws Exception { cleanUp(); diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 56b86a6f9d13..34da4d237c20 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.replication; +import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -24,6 +25,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -155,8 +157,15 @@ private ReplicationPeerImpl createPeer(String peerId) throws ReplicationExceptio SyncReplicationState syncReplicationState = peerStorage.getPeerSyncReplicationState(peerId); SyncReplicationState newSyncReplicationState = peerStorage.getPeerNewSyncReplicationState(peerId); - return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf), - peerId, peerConfig, enabled, syncReplicationState, newSyncReplicationState); + Configuration peerClusterConf; + try { + peerClusterConf = ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peerConfig); + } catch (IOException e) { + throw new ReplicationException( + "failed to apply cluster key to configuration for peer config " + peerConfig, e); + } + return new ReplicationPeerImpl(peerClusterConf, peerId, peerConfig, enabled, + syncReplicationState, newSyncReplicationState); } @Override @@ -166,8 +175,8 @@ public void onConfigurationChange(Configuration conf) { for (ReplicationPeerImpl peer : peerCache.values()) { try { peer.onConfigurationChange( - ReplicationUtils.getPeerClusterConfiguration(peer.getPeerConfig(), conf)); - } catch (ReplicationException e) { + ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peer.getPeerConfig())); + } catch (IOException e) { LOG.warn("failed to reload configuration for peer {}", peer.getId(), e); } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index ae78781a3133..46a59f896922 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -25,8 +25,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CompoundConfiguration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -61,25 +59,6 @@ public final class ReplicationUtils { private ReplicationUtils() { } - public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig, - Configuration baseConf) throws ReplicationException { - Configuration otherConf; - try { - otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey()); - } catch (IOException e) { - throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e); - } - - if (!peerConfig.getConfiguration().isEmpty()) { - CompoundConfiguration compound = new CompoundConfiguration(); - compound.add(otherConf); - compound.addStringMap(peerConfig.getConfiguration()); - return compound; - } - - return otherConf; - } - private static boolean isCollectionEqual(Collection c1, Collection c2) { if (c1 == null) { return c2 == null; diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index dc46e4f1c7c8..bc843eb297c1 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.junit.Test; @@ -86,8 +87,8 @@ public void testReplicationPeers() throws Exception { SyncReplicationState.NONE); assertNumberOfPeers(2); - assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils - .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf()))); + assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeerConfigUtil + .getPeerClusterConfiguration(rp.getConf(), rp.getPeerStorage().getPeerConfig(ID_ONE)))); rp.getPeerStorage().removePeer(ID_ONE); rp.removePeer(ID_ONE); assertNumberOfPeers(1); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java index ed90863763a7..70a1e703c667 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.SocketAddress; +import java.net.URI; import java.security.PrivilegedExceptionAction; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.security.User; @@ -68,6 +69,20 @@ public static AsyncClusterConnection createAsyncClusterConnection(Configuration localAddress, user); } + /** + * Create a new {@link AsyncClusterConnection} instance. + *

+ * This is usually used in replication, the given {@code uri} specifies the connection info of the + * remote cluster. + */ + public static AsyncClusterConnection createAsyncClusterConnection(URI uri, Configuration conf, + SocketAddress localAddress, User user) throws IOException { + ConnectionRegistry registry = uri != null + ? ConnectionRegistryFactory.create(uri, conf, user) + : ConnectionRegistryFactory.create(conf, user); + return createAsyncClusterConnection(conf, registry, localAddress, user); + } + /** * Create a new {@link AsyncClusterConnection} instance to be used at server side where we have a * {@link ConnectionRegistryEndpoint}. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 322b5bb7fc78..ac9491834ae8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -39,11 +39,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.ConnectionRegistryFactory; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.master.MasterServices; @@ -402,6 +407,57 @@ public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationExcepti queueStorage.removePeerFromHFileRefs(peerId); } + private void checkClusterKey(String clusterKey, ReplicationEndpoint endpoint) + throws DoNotRetryIOException { + if (endpoint != null && !(endpoint instanceof HBaseReplicationEndpoint)) { + return; + } + // Endpoints implementing HBaseReplicationEndpoint need to check cluster key + URI connectionUri = ConnectionRegistryFactory.tryParseAsConnectionURI(clusterKey); + try { + if (connectionUri != null) { + ConnectionRegistryFactory.validate(connectionUri); + } else { + ZKConfig.validateClusterKey(clusterKey); + } + } catch (IOException e) { + throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e); + } + if (endpoint != null && endpoint.canReplicateToSameCluster()) { + return; + } + // make sure we do not replicate to same cluster + String peerClusterId; + try { + if (connectionUri != null) { + // fetch cluster id through standard admin API + try (Connection conn = ConnectionFactory.createConnection(connectionUri, conf); + Admin admin = conn.getAdmin()) { + peerClusterId = + admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID)).getClusterId(); + } + } else { + // Create the peer cluster config for get peer cluster id + Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey); + try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) { + peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher); + } + } + } catch (IOException | KeeperException e) { + // we just want to check whether we will replicate to the same cluster, so if we get an error + // while getting the cluster id of the peer cluster, it means we are not connecting to + // ourselves, as we are still alive. So here we just log the error and continue + LOG.warn("Can't get peerClusterId for clusterKey=" + clusterKey, e); + return; + } + // In rare case, zookeeper setting may be messed up. That leads to the incorrect + // peerClusterId value, which is the same as the source clusterId + if (clusterId.equals(peerClusterId)) { + throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey + + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint"); + } + } + private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl(); ReplicationEndpoint endpoint = null; @@ -416,14 +472,7 @@ private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetry e); } } - // Endpoints implementing HBaseReplicationEndpoint need to check cluster key - if (endpoint == null || endpoint instanceof HBaseReplicationEndpoint) { - checkClusterKey(peerConfig.getClusterKey()); - // Check if endpoint can replicate to the same cluster - if (endpoint == null || !endpoint.canReplicateToSameCluster()) { - checkSameClusterKey(peerConfig.getClusterKey()); - } - } + checkClusterKey(peerConfig.getClusterKey(), endpoint); if (peerConfig.replicateAllUserTables()) { // If replicate_all flag is true, it means all user tables will be replicated to peer cluster. @@ -563,33 +612,6 @@ private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) } } - private void checkClusterKey(String clusterKey) throws DoNotRetryIOException { - try { - ZKConfig.validateClusterKey(clusterKey); - } catch (IOException e) { - throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e); - } - } - - private void checkSameClusterKey(String clusterKey) throws DoNotRetryIOException { - String peerClusterId = ""; - try { - // Create the peer cluster config for get peer cluster id - Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey); - try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) { - peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher); - } - } catch (IOException | KeeperException e) { - throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e); - } - // In rare case, zookeeper setting may be messed up. That leads to the incorrect - // peerClusterId value, which is the same as the source clusterId - if (clusterId.equals(peerClusterId)) { - throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey - + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint"); - } - } - public List getSerialPeerIdsBelongsTo(TableName tableName) { return peers.values().stream().filter(p -> p.getPeerConfig().isSerial()) .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId()) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index f0ea993a41ba..b85ce2bad47d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.client.ClusterConnectionFactory; +import org.apache.hadoop.hbase.client.ConnectionRegistryFactory; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.ReservoirSample; @@ -55,6 +57,8 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint protected Configuration conf; + private URI clusterURI; + private final Object connLock = new Object(); private volatile AsyncClusterConnection conn; @@ -82,19 +86,23 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint private List sinkServers = new ArrayList<>(0); - /* + /** * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different * Connection implementations, or initialize it in a different way, so defining createConnection * as protected for possible overridings. */ - protected AsyncClusterConnection createConnection(Configuration conf) throws IOException { - return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent()); + protected AsyncClusterConnection createConnection(URI clusterURI, Configuration conf) + throws IOException { + return ClusterConnectionFactory.createAsyncClusterConnection(clusterURI, conf, null, + User.getCurrent()); } @Override public void init(Context context) throws IOException { super.init(context); this.conf = HBaseConfiguration.create(ctx.getConfiguration()); + this.clusterURI = ConnectionRegistryFactory + .tryParseAsConnectionURI(context.getReplicationPeer().getPeerConfig().getClusterKey()); this.ratio = ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO); this.badSinkThreshold = @@ -167,7 +175,7 @@ private AsyncClusterConnection connect() throws IOException { if (c != null) { return c; } - c = createConnection(this.conf); + c = createConnection(clusterURI, conf); conn = c; } return c; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtil.java index dcdf55a945b5..fd5b7dd729e0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtil.java @@ -3216,6 +3216,24 @@ public static String safeGetAsStr(List lst, int i) { } } + public String getRpcConnnectionURI() throws UnknownHostException { + return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf); + } + + public String getZkConnectionURI() { + return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); + } + + /** + * Get the zk based cluster key for this cluster. + * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the + * connection info of a cluster. Keep here only for compatibility. + * @see #getRpcConnnectionURI() + * @see #getZkConnectionURI() + */ + @Deprecated public String getClusterKey() { return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":" diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin4.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin4.java index e52d8ee92c3c..61e028705f88 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin4.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin4.java @@ -79,7 +79,7 @@ public void testReplicationPeerModificationSwitch() throws Exception { assertTrue(ADMIN.replicationPeerModificationSwitch(false)); IOException error = assertThrows(IOException.class, () -> ADMIN.addReplicationPeer("peer", ReplicationPeerConfig - .newBuilder().setClusterKey(TEST_UTIL.getClusterKey() + "-test").build())); + .newBuilder().setClusterKey(TEST_UTIL.getRpcConnnectionURI()).build())); assertThat(error.getCause().getMessage(), containsString("Replication peer modification disabled")); // enable again, and the previous value should be false diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java index 157277d83022..f1f47b92c9f5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java @@ -86,8 +86,8 @@ public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); TEST_UTIL.startMiniCluster(); - KEY_ONE = TEST_UTIL.getClusterKey() + "-test1"; - KEY_TWO = TEST_UTIL.getClusterKey() + "-test2"; + KEY_ONE = TEST_UTIL.getZkConnectionURI() + "-test1"; + KEY_TWO = TEST_UTIL.getZkConnectionURI() + "-test2"; ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java index 18727866bf78..3144f607a542 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java @@ -87,7 +87,7 @@ public static void setUpBeforeClass() throws Exception { admin2 = connection.getAdmin(); ReplicationPeerConfig rpc = - ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL2.getClusterKey()).build(); + ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL2.getRpcConnnectionURI()).build(); ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java index 3f23364d136c..e02403542a91 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java @@ -392,7 +392,7 @@ public void testReplicaAndReplication() throws Exception { try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); Admin admin = connection.getAdmin()) { ReplicationPeerConfig rpc = - ReplicationPeerConfig.newBuilder().setClusterKey(HTU2.getClusterKey()).build(); + ReplicationPeerConfig.newBuilder().setClusterKey(HTU2.getRpcConnnectionURI()).build(); admin.addReplicationPeer("2", rpc); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminForSyncReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminForSyncReplication.java index 5d0cc33eacb6..daa29908251a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminForSyncReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminForSyncReplication.java @@ -79,7 +79,7 @@ public void testAddPeerWithSameTable() throws Exception { Thread[] threads = new Thread[5]; for (int i = 0; i < 5; i++) { String peerId = "id" + i; - String clusterKey = TEST_UTIL.getClusterKey() + "-test" + i; + String clusterKey = TEST_UTIL.getZkConnectionURI() + "-test" + i; int index = i; threads[i] = new Thread(() -> { try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index 5aef1eaf1c6b..da1bc04d7e03 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -109,8 +109,8 @@ public static void tearDownAfterClass() throws Exception { public void setup() throws ReplicationException, IOException { root = TEST_UTIL.getDataTestDirOnTestFS(); rp.getPeerStorage().addPeer(peerId, - ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true, - SyncReplicationState.NONE); + ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getRpcConnnectionURI()).build(), + true, SyncReplicationState.NONE); } @After diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java index 7b9fa7001559..2ce61cd1b5d3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java @@ -117,7 +117,7 @@ public void testDrainProcs() throws Exception { RESUME = new CountDownLatch(1); AsyncAdmin admin = UTIL.getAsyncConnection().getAdmin(); ReplicationPeerConfig rpc = - ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test") + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getRpcConnnectionURI() + "-test") .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build(); CompletableFuture addFuture = admin.addReplicationPeer("test_peer_" + async, rpc); ARRIVE.await(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java index b711269c5ba7..7ab7578df1c2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java @@ -25,6 +25,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.UnknownHostException; import java.util.List; import java.util.Map; import java.util.Optional; @@ -170,8 +171,9 @@ public void setUpBase() throws Exception { BULK_LOADS_COUNT = new AtomicInteger(0); } - private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtil util) { - return ReplicationPeerConfig.newBuilder().setClusterKey(util.getClusterKey()) + private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtil util) + throws UnknownHostException { + return ReplicationPeerConfig.newBuilder().setClusterKey(util.getRpcConnnectionURI()) .setSerial(isSerialPeer()).build(); } @@ -185,7 +187,7 @@ private void setupCoprocessor(HBaseTestingUtil cluster) { cluster.getConfiguration()); cp = r.getCoprocessorHost() .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class); - cp.clusterName = cluster.getClusterKey(); + cp.clusterName = cluster.getRpcConnnectionURI(); } } catch (Exception e) { LOG.error(e.getMessage(), e); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplicationHFileRefs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplicationHFileRefs.java index 787784c8ec40..bfc80232792f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplicationHFileRefs.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplicationHFileRefs.java @@ -161,7 +161,7 @@ public void testWhenExcludeCF() throws Exception { Map> excludeTableCFs = Maps.newHashMap(); excludeTableCFs.put(REPLICATE_TABLE, Lists.newArrayList(Bytes.toString(CF_B))); ReplicationPeerConfig peerConfig = - ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()) + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI()) .setReplicateAllUserTables(true).setExcludeTableCFsMap(excludeTableCFs).build(); admin1.addReplicationPeer(PEER_ID2, peerConfig); Assert.assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE)); @@ -192,7 +192,7 @@ public void testWhenExcludeTable() throws Exception { Map> excludeTableCFs = Maps.newHashMap(); excludeTableCFs.put(NO_REPLICATE_TABLE, null); ReplicationPeerConfig peerConfig = - ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()) + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI()) .setReplicateAllUserTables(true).setExcludeTableCFsMap(excludeTableCFs).build(); admin1.addReplicationPeer(PEER_ID2, peerConfig); assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE)); @@ -223,7 +223,7 @@ public void testWhenExcludeNamespace() throws Exception { // Add peer, setReplicateAllUserTables true, but exclude one namespace. ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() - .setClusterKey(UTIL2.getClusterKey()).setReplicateAllUserTables(true) + .setClusterKey(UTIL2.getRpcConnnectionURI()).setReplicateAllUserTables(true) .setExcludeNamespaces(Sets.newHashSet(NO_REPLICATE_NAMESPACE)).build(); admin1.addReplicationPeer(PEER_ID2, peerConfig); assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java index 7f5df02ecfc3..f0caa7a02ba8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java @@ -125,12 +125,12 @@ public static void setUp() throws Exception { new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(), "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory()); UTIL1.getAdmin().addReplicationPeer(PEER_ID, - ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()) + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI()) .setReplicateAllUserTables(false) .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>())) .setRemoteWALDir(REMOTE_WAL_DIR2.toUri().toString()).build()); UTIL2.getAdmin().addReplicationPeer(PEER_ID, - ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey()) + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getRpcConnnectionURI()) .setReplicateAllUserTables(false) .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>())) .setRemoteWALDir(REMOTE_WAL_DIR1.toUri().toString()).build()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java index 95adc8a365cd..058564dc0ecf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java @@ -19,8 +19,10 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; +import java.net.URI; import java.util.Collection; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -36,8 +38,6 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @@ -48,22 +48,21 @@ public class TestHBaseReplicationEndpoint { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHBaseReplicationEndpoint.class); - private static final Logger LOG = LoggerFactory.getLogger(TestHBaseReplicationEndpoint.class); - private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); private HBaseReplicationEndpoint endpoint; @Before public void setUp() throws Exception { - try { - ReplicationEndpoint.Context context = new ReplicationEndpoint.Context(null, - UTIL.getConfiguration(), UTIL.getConfiguration(), null, null, null, null, null, null, null); - endpoint = new DummyHBaseReplicationEndpoint(); - endpoint.init(context); - } catch (Exception e) { - LOG.info("Failed", e); - } + ReplicationPeer replicationPeer = mock(ReplicationPeer.class); + ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); + when(replicationPeer.getPeerConfig()).thenReturn(peerConfig); + when(peerConfig.getClusterKey()).thenReturn("hbase+zk://server1:2181/hbase"); + ReplicationEndpoint.Context context = + new ReplicationEndpoint.Context(null, UTIL.getConfiguration(), UTIL.getConfiguration(), null, + null, null, replicationPeer, null, null, null); + endpoint = new DummyHBaseReplicationEndpoint(); + endpoint.init(context); } @Test @@ -205,7 +204,8 @@ public boolean replicate(ReplicateContext replicateContext) { } @Override - public AsyncClusterConnection createConnection(Configuration conf) throws IOException { + public AsyncClusterConnection createConnection(URI clusterURI, Configuration conf) + throws IOException { return null; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index f4c26a9b4562..c7a8ec7373b8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -616,7 +616,7 @@ private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber) try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]); Admin admin = conn.getAdmin()) { admin.addReplicationPeer(id, ReplicationPeerConfig.newBuilder() - .setClusterKey(utilities[slaveClusterNumber].getClusterKey()).build()); + .setClusterKey(utilities[slaveClusterNumber].getRpcConnnectionURI()).build()); } } @@ -626,7 +626,7 @@ private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, Admin admin = conn.getAdmin()) { admin.addReplicationPeer(id, ReplicationPeerConfig.newBuilder() - .setClusterKey(utilities[slaveClusterNumber].getClusterKey()) + .setClusterKey(utilities[slaveClusterNumber].getRpcConnnectionURI()) .setReplicateAllUserTables(false) .setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)).build()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMigrateRepliationPeerStorageOnline.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMigrateRepliationPeerStorageOnline.java index a824dde42a4a..30dc18f4eb18 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMigrateRepliationPeerStorageOnline.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMigrateRepliationPeerStorageOnline.java @@ -64,7 +64,7 @@ public static void tearDown() throws IOException { public void testMigrate() throws Exception { Admin admin = UTIL.getAdmin(); ReplicationPeerConfig rpc = - ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test") + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getRpcConnnectionURI() + "-test") .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build(); admin.addReplicationPeer("1", rpc); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java index 66386d275b2e..95772ee639b3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java @@ -142,7 +142,7 @@ public void testMultiSlaveReplication() throws Exception { Table htable3 = utility3.getConnection().getTable(tableName); ReplicationPeerConfigBuilder rpcBuilder = - ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()); + ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI()); admin1.addReplicationPeer("1", rpcBuilder.build()); // put "row" and wait 'til it got around, then delete @@ -159,7 +159,7 @@ public void testMultiSlaveReplication() throws Exception { // after the log was rolled put a new row putAndWait(row3, famName, htable1, htable2); - rpcBuilder.setClusterKey(utility3.getClusterKey()); + rpcBuilder.setClusterKey(utility3.getRpcConnnectionURI()); admin1.addReplicationPeer("2", rpcBuilder.build()); // put a row, check it was replicated to all clusters diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java index 5fc48b2d7298..cd6e39dc65bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java @@ -158,7 +158,7 @@ public void setUpBase() throws Exception { Set namespaces = new HashSet<>(); namespaces.add(NS1); ReplicationPeerConfig rpc4_ns = - ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getClusterKey()) + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getRpcConnnectionURI()) .setReplicateAllUserTables(false).setNamespaces(namespaces).build(); admin1.addReplicationPeer(PEER4_NS, rpc4_ns); @@ -169,7 +169,7 @@ public void setUpBase() throws Exception { Map> tableCFsMap = new HashMap<>(); tableCFsMap.put(NS2_TABLE, null); ReplicationPeerConfig rpc4_ns_table = - ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getClusterKey()) + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getRpcConnnectionURI()) .setReplicateAllUserTables(false).setTableCFsMap(tableCFsMap).build(); admin1.addReplicationPeer(PEER4_NS_TABLE, rpc4_ns_table); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java index f0df221c0fe8..62c7c8f5af27 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java @@ -404,7 +404,7 @@ public void testPerTableCFReplication() throws Exception { tableCFs.get(tabBName).add("f1"); tableCFs.get(tabBName).add("f3"); ReplicationPeerConfig rpc2 = - ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()) + ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI()) .setReplicateAllUserTables(false).setTableCFsMap(tableCFs).build(); replicationAdmin.addReplicationPeer("2", rpc2); @@ -414,7 +414,7 @@ public void testPerTableCFReplication() throws Exception { tableCFs.get(tabBName).add("f1"); tableCFs.get(tabBName).add("f2"); ReplicationPeerConfig rpc3 = - ReplicationPeerConfig.newBuilder().setClusterKey(utility3.getClusterKey()) + ReplicationPeerConfig.newBuilder().setClusterKey(utility3.getRpcConnnectionURI()) .setReplicateAllUserTables(false).setTableCFsMap(tableCFs).build(); replicationAdmin.addReplicationPeer("3", rpc3); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index 1429c3277371..70a6d73c6202 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -282,10 +282,15 @@ private boolean peerExist(String peerId) throws IOException { return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId())); } + // can be override in tests, in case you need to use zk based uri, or the old style uri + protected String getClusterKey(HBaseTestingUtil util) throws Exception { + return util.getRpcConnnectionURI(); + } + protected final void addPeer(String peerId, TableName tableName) throws Exception { if (!peerExist(peerId)) { ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder() - .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()) + .setClusterKey(getClusterKey(UTIL2)).setSerial(isSerialPeer()) .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName()); if (isSyncPeer()) { FileSystem fs2 = UTIL2.getTestFileSystem(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java index fa7548e3eccc..1faa25f116f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java @@ -21,6 +21,7 @@ import static org.junit.Assert.fail; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -43,6 +44,13 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase { private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDisableInactivePeer.class); + @Override + protected String getClusterKey(HBaseTestingUtil util) throws Exception { + // in this test we will restart the peer cluster, and the master address will be changed, so we + // need to use zk based connection uri + return util.getZkConnectionURI(); + } + /** * Test disabling an inactive peer. Add a peer which is inactive, trying to insert, disable the * peer, then activate the peer and make sure nothing is replicated. In Addition, enable the peer diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEditsDroppedWithDeletedTableCFs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEditsDroppedWithDeletedTableCFs.java index 7137395ef7c2..f9106c7ce23a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEditsDroppedWithDeletedTableCFs.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEditsDroppedWithDeletedTableCFs.java @@ -122,7 +122,7 @@ public void setup() throws Exception { } // add peer ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() - .setClusterKey(utility2.getClusterKey()).setReplicateAllUserTables(true).build(); + .setClusterKey(utility2.getRpcConnnectionURI()).setReplicateAllUserTables(true).build(); admin1.addReplicationPeer(PEER_ID, rpc); // create table createTable(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEditsDroppedWithDroppedTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEditsDroppedWithDroppedTable.java index 3b73262980ae..ed42df416e95 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEditsDroppedWithDroppedTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEditsDroppedWithDroppedTable.java @@ -125,7 +125,7 @@ public void setup() throws Exception { } // add peer ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() - .setClusterKey(utility2.getClusterKey()).setReplicateAllUserTables(true).build(); + .setClusterKey(utility2.getRpcConnnectionURI()).setReplicateAllUserTables(true).build(); admin1.addReplicationPeer(PEER_ID, rpc); // create table createTable(NORMAL_TABLE); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 3d9fa06d2e75..aae2af10264b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -266,7 +266,7 @@ public void testAddAndRemoveClusters() throws Exception { } } ReplicationPeerConfig rpc = - ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build(); + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI()).build(); hbaseAdmin.addReplicationPeer(PEER_ID, rpc); Thread.sleep(SLEEP_TIME); rowKey = Bytes.toBytes("do rep"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java index d9a90b57c2a0..c761078dfab3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.ClusterMetrics.Option; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Put; @@ -42,6 +43,13 @@ public class TestReplicationStatusAfterLagging extends TestReplicationBase { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationStatusAfterLagging.class); + @Override + protected String getClusterKey(HBaseTestingUtil util) throws Exception { + // in this test we will restart the peer cluster, and the master address will be changed, so we + // need to use zk based connection uri + return util.getZkConnectionURI(); + } + @Test public void testReplicationStatusAfterLagging() throws Exception { UTIL2.shutdownMiniHBaseCluster(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStuckWithDeletedTableCFs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStuckWithDeletedTableCFs.java index 43c15787839d..8b61c049be7a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStuckWithDeletedTableCFs.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStuckWithDeletedTableCFs.java @@ -120,7 +120,7 @@ private void createTable(TableName tableName) throws Exception { public void testEditsStuckBehindDeletedCFs() throws Exception { // add peer ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() - .setClusterKey(utility2.getClusterKey()).setReplicateAllUserTables(true).build(); + .setClusterKey(utility2.getRpcConnnectionURI()).setReplicateAllUserTables(true).build(); admin1.addReplicationPeer(PEER_ID, rpc); // create table diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStuckWithDroppedTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStuckWithDroppedTable.java index 2a0ccdb9d095..ae720dd4f15c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStuckWithDroppedTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStuckWithDroppedTable.java @@ -121,7 +121,7 @@ private void createTable(TableName tableName) throws Exception { public void testEditsStuckBehindDroppedTable() throws Exception { // add peer ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() - .setClusterKey(utility2.getClusterKey()).setReplicateAllUserTables(true).build(); + .setClusterKey(utility2.getRpcConnnectionURI()).setReplicateAllUserTables(true).build(); admin1.addReplicationPeer(PEER_ID, rpc); // create table diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java index 442582410581..9455cf567276 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java @@ -118,12 +118,12 @@ final void setupReplication() throws Exception { admin2.createTable(t2SyncupTarget); // Get HTable from Master - Connection conn1 = ConnectionFactory.createConnection(UTIL1.getConfiguration()); + conn1 = ConnectionFactory.createConnection(UTIL1.getConfiguration()); ht1Source = conn1.getTable(TN1); ht2Source = conn1.getTable(TN2); // Get HTable from Peer1 - Connection conn2 = ConnectionFactory.createConnection(UTIL2.getConfiguration()); + conn2 = ConnectionFactory.createConnection(UTIL2.getConfiguration()); ht1TargetAtPeer1 = conn2.getTable(TN1); ht2TargetAtPeer1 = conn2.getTable(TN2); @@ -131,7 +131,7 @@ final void setupReplication() throws Exception { * set M-S : Master: utility1 Slave1: utility2 */ ReplicationPeerConfig rpc = - ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build(); + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getZkConnectionURI()).build(); admin1.addReplicationPeer("1", rpc); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java index 7128b02f3c63..484206ad8387 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java @@ -144,7 +144,7 @@ public static void setUpBeforeClass() throws Exception { connection1 = ConnectionFactory.createConnection(conf1); replicationAdmin = connection1.getAdmin(); ReplicationPeerConfig rpc = - ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build(); + ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI()).build(); replicationAdmin.addReplicationPeer("2", rpc); TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java index a41d47df64d7..971b0938ccd8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java @@ -145,7 +145,7 @@ public static void setUpBeforeClass() throws Exception { connection1 = ConnectionFactory.createConnection(conf1); replicationAdmin = connection1.getAdmin(); ReplicationPeerConfig rpc = - ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build(); + ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI()).build(); replicationAdmin.addReplicationPeer("2", rpc); TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java index 66720b93606f..6d9e70f851fb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import org.apache.hadoop.fs.Path; @@ -71,7 +72,7 @@ public void testStandbyKillRegionServer() throws Exception { Thread t = new Thread(() -> { try { List regionServers = - UTIL2.getMiniHBaseCluster().getLiveRegionServerThreads(); + new ArrayList<>(UTIL2.getMiniHBaseCluster().getLiveRegionServerThreads()); LOG.debug("Going to stop {} RSes: [{}]", regionServers.size(), regionServers.stream().map(rst -> rst.getRegionServer().getServerName().getServerName()) .collect(Collectors.joining(", "))); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java index 5941e3cc0286..6b14438c8308 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java @@ -17,7 +17,8 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import java.io.IOException; +import static org.junit.Assert.assertTrue; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -30,7 +31,6 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; @@ -40,12 +40,10 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; @@ -55,6 +53,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; + @Category({ ReplicationTests.class, LargeTests.class }) public class TestGlobalReplicationThrottler { @@ -91,21 +91,21 @@ public static void setUpBeforeClass() throws Exception { utility1 = new HBaseTestingUtil(conf1); utility1.startMiniZKCluster(); MiniZooKeeperCluster miniZK = utility1.getZkCluster(); - new ZKWatcher(conf1, "cluster1", null, true); + new ZKWatcher(conf1, "cluster1", null, true).close(); conf2 = new Configuration(conf1); conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); utility2 = new HBaseTestingUtil(conf2); utility2.setZkCluster(miniZK); - new ZKWatcher(conf2, "cluster2", null, true); - - ReplicationPeerConfig rpc = - ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build(); + new ZKWatcher(conf2, "cluster2", null, true).close(); utility1.startMiniCluster(); utility2.startMiniCluster(); + ReplicationPeerConfig rpc = + ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI()).build(); + try (Connection connection = ConnectionFactory.createConnection(utility1.getConfiguration()); Admin admin1 = connection.getAdmin()) { admin1.addReplicationPeer("peer1", rpc); @@ -121,11 +121,11 @@ public static void tearDownAfterClass() throws Exception { utility1.shutdownMiniCluster(); } - volatile private boolean testQuotaPass = false; - volatile private boolean testQuotaNonZero = false; + private volatile boolean testQuotaPass = false; + private volatile boolean testQuotaNonZero = false; @Test - public void testQuota() throws IOException { + public void testQuota() throws Exception { final TableName tableName = TableName.valueOf(name.getMethodName()); TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder @@ -143,10 +143,8 @@ public void testQuota() throws IOException { testQuotaNonZero = true; } // the reason here doing "numOfPeer + 1" is because by using method addEntryToBatch(), even - // the - // batch size (after added last entry) exceeds quota, it still keeps the last one in the - // batch - // so total used buffer size can be one "replication.total.buffer.quota" larger than + // the batch size (after added last entry) exceeds quota, it still keeps the last one in the + // batch so total used buffer size can be one "replication.total.buffer.quota" larger than // expected if (size > REPLICATION_SOURCE_QUOTA * (numOfPeer + 1)) { // We read logs first then check throttler, so if the buffer quota limiter doesn't @@ -158,35 +156,24 @@ public void testQuota() throws IOException { }); watcher.start(); - try (Table t1 = utility1.getConnection().getTable(tableName); - Table t2 = utility2.getConnection().getTable(tableName)) { + try (Table t1 = utility1.getConnection().getTable(tableName)) { for (int i = 0; i < 50; i++) { Put put = new Put(ROWS[i]); put.addColumn(famName, VALUE, VALUE); t1.put(put); } - long start = EnvironmentEdgeManager.currentTime(); - while (EnvironmentEdgeManager.currentTime() - start < 180000) { - Scan scan = new Scan(); - scan.setCaching(50); - int count = 0; - try (ResultScanner results = t2.getScanner(scan)) { - for (Result result : results) { - count++; - } - } - if (count < 50) { - LOG.info("Waiting all logs pushed to slave. Expected 50 , actual " + count); - Threads.sleep(200); - continue; - } - break; - } } + utility2.waitFor(180000, () -> { + try (Table t2 = utility2.getConnection().getTable(tableName); + ResultScanner results = t2.getScanner(new Scan().setCaching(50))) { + int count = Iterables.size(results); + return count >= 50; + } + }); watcher.interrupt(); - Assert.assertTrue(testQuotaPass); - Assert.assertTrue(testQuotaNonZero); + assertTrue(testQuotaPass); + assertTrue(testQuotaNonZero); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestHBaseInterClusterReplicationEndpointFilterEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestHBaseInterClusterReplicationEndpointFilterEdits.java index d6de3dc7a02f..cdef7de2076b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestHBaseInterClusterReplicationEndpointFilterEdits.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestHBaseInterClusterReplicationEndpointFilterEdits.java @@ -80,6 +80,7 @@ public static void setUpBeforeClass() throws Exception { ReplicationPeerConfig rpc = mock(ReplicationPeerConfig.class); when(rpc.isSerial()).thenReturn(false); when(replicationPeer.getPeerConfig()).thenReturn(rpc); + when(rpc.getClusterKey()).thenReturn("hbase+zk://localhost:2181"); Context context = new Context(null, UTIL.getConfiguration(), UTIL.getConfiguration(), null, null, null, replicationPeer, null, null, null); endpoint = new HBaseInterClusterReplicationEndpoint(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationMarker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationMarker.java index 79487ab309e6..1466f5f5a03e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationMarker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationMarker.java @@ -105,7 +105,7 @@ public static void setUpBeforeClass() throws Exception { utility1.startMiniCluster(1); Admin admin1 = utility1.getAdmin(); ReplicationPeerConfigBuilder rpcBuilder = ReplicationPeerConfig.newBuilder(); - rpcBuilder.setClusterKey(utility2.getClusterKey()); + rpcBuilder.setClusterKey(utility2.getRpcConnnectionURI()); admin1.addReplicationPeer("1", rpcBuilder.build()); ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 53996c376647..05b268d3a0a9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -337,8 +337,8 @@ public void testServerShutdownRecoveredQueue() throws Exception { final Admin admin = TEST_UTIL.getAdmin(); final String peerId = "TestPeer"; - admin.addReplicationPeer(peerId, - ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build()); + admin.addReplicationPeer(peerId, ReplicationPeerConfig.newBuilder() + .setClusterKey(TEST_UTIL_PEER.getRpcConnnectionURI()).build()); // Wait for replication sources to come up Waiter.waitFor(conf, 20000, new Waiter.Predicate() { @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java index c48755fb5f0d..979db712ef34 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java @@ -71,7 +71,7 @@ public void testReplicatorBatching() throws Exception { // Replace the peer set up for us by the base class with a wrapper for this test hbaseAdmin.addReplicationPeer("testReplicatorBatching", - ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()) + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI()) .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()).build()); ReplicationEndpointForTest.setBatchCount(0); @@ -120,7 +120,7 @@ public void testReplicatorWithErrors() throws Exception { // Replace the peer set up for us by the base class with a wrapper for this test hbaseAdmin.addReplicationPeer("testReplicatorWithErrors", - ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()) + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI()) .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()) .build()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java index ec76386046a7..091991b44997 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java @@ -137,7 +137,7 @@ public void setup() throws Exception { admin = TEST_UTIL.getAdmin(); ReplicationPeerConfig rpc = - ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL1.getClusterKey()).build(); + ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL1.getRpcConnnectionURI()).build(); admin.addReplicationPeer("2", rpc); TableDescriptor tableDescriptor = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java index ff586e2b682d..dc313d414ae8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java @@ -189,7 +189,7 @@ public void setup() throws Exception { admin = TEST_UTIL.getAdmin(); ReplicationPeerConfig rpc = - ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL1.getClusterKey()).build(); + ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL1.getRpcConnnectionURI()).build(); admin.addReplicationPeer("2", rpc); Admin hBaseAdmin = TEST_UTIL.getAdmin(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java index 20ed3796dbd9..c8e96383492a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java @@ -182,7 +182,7 @@ public void testCleanReplicationBarrierWithExistTable() throws Exception { public static void createPeer() throws IOException { ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() - .setClusterKey(UTIL.getClusterKey() + "-test").setSerial(true).build(); + .setClusterKey(UTIL.getZkConnectionURI() + "-test").setSerial(true).build(); UTIL.getAdmin().addReplicationPeer(PEER_1, rpc); UTIL.getAdmin().addReplicationPeer(PEER_2, rpc); } diff --git a/hbase-testing-util/src/main/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-testing-util/src/main/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 8cdf2719db93..a0453d7e5f38 100644 --- a/hbase-testing-util/src/main/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-testing-util/src/main/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -3528,6 +3528,24 @@ public static String safeGetAsStr(List lst, int i) { } } + public String getRpcConnnectionURI() throws UnknownHostException { + return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf); + } + + public String getZkConnectionURI() { + return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); + } + + /** + * Get the zk based cluster key for this cluster. + * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the + * connection info of a cluster. Keep here only for compatibility. + * @see #getRpcConnnectionURI() + * @see #getZkConnectionURI() + */ + @Deprecated public String getClusterKey() { return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":"