Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;
import java.util.ServiceLoader;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -35,7 +37,7 @@
* The entry point for creating a {@link ConnectionRegistry}.
*/
@InterfaceAudience.Private
final class ConnectionRegistryFactory {
public final class ConnectionRegistryFactory {

private static final Logger LOG = LoggerFactory.getLogger(ConnectionRegistryFactory.class);

Expand Down Expand Up @@ -90,4 +92,46 @@ static ConnectionRegistry create(Configuration conf, User user) {
RpcConnectionRegistry.class, ConnectionRegistry.class);
return ReflectionUtils.newInstance(clazz, conf, user);
}

/**
 * Check whether the given {@code uri} is a valid connection registry URI.
 * <p/>
 * Notice that there is no fallback logic for this method, so a URI with a null or blank scheme
 * will not pass.
 * @param uri the connection URI to check
 * @throws IOException if this is not a valid connection registry URI
 */
public static void validate(URI uri) throws IOException {
  if (StringUtils.isBlank(uri.getScheme())) {
    // "scheme" is the correct URI term (the previous message said "schema")
    throw new IOException("No scheme for uri: " + uri);
  }
  // Factories are registered under lower-cased schemes, so normalize before the lookup
  ConnectionRegistryURIFactory factory = FACTORIES.get(uri.getScheme().toLowerCase(Locale.ROOT));
  if (factory == null) {
    throw new IOException(
      "No factory registered for scheme " + uri.getScheme() + ", uri: " + uri);
  }
  // Delegate scheme-specific checks (e.g. authority/path requirements) to the implementation
  factory.validate(uri);
}

/**
 * If the given {@code clusterKey} can be parsed to a {@link URI}, and the scheme of the
 * {@link URI} is supported by us, return the {@link URI}, otherwise return {@code null}.
 * @param clusterKey the cluster key, typically from replication peer config
 * @return a {@link URI} or {@code null}.
 */
public static URI tryParseAsConnectionURI(String clusterKey) {
  // The old cluster key format may not be parsed as URI if we use ip address as the zookeeper
  // address, so here we need to catch the URISyntaxException and return null
  URI uri;
  try {
    uri = new URI(clusterKey);
  } catch (URISyntaxException e) {
    LOG.debug("failed to parse cluster key to URI: {}", clusterKey, e);
    return null;
  }
  // An old style cluster key may still parse as a URI with a null scheme; guard explicitly so we
  // never pass null to the map (null-hostile implementations would throw NPE) and so the result
  // is consistent with the blank-scheme check in validate(URI).
  String scheme = uri.getScheme();
  if (scheme == null) {
    return null;
  }
  // Normalize the scheme the same way validate(URI) does before the FACTORIES lookup, so a key
  // like HBASE+ZK://... matches the factory registered under "hbase+zk".
  if (FACTORIES.containsKey(scheme.toLowerCase(Locale.ROOT))) {
    return uri;
  }
  return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,10 @@ public interface ConnectionRegistryURIFactory {
* {@link ConnectionRegistryFactory}.
*/
String getScheme();

/**
* Validate the given {@code uri}.
* @throws IOException if this is not a valid connection registry URI.
*/
void validate(URI uri) throws IOException;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this interface should be boolean isValid(URI), and then the caller is free to throw or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throwing exception could let the upper layer know the details about why this is not valid.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I understand. This enforces try-catch flow control where if/else would be better. For this kind of thing, I really like the Result based API that is catching on in other languages.

Anyway, maybe it should throw something besides IOException ? IllegalStateException, for example?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe something like UnsupportedProtocol or IllegalArgument? But IllegalArgument is a RuntimeException, so developers may forget to catch it, which could cause some fatal errors...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a RuntimeException is okay. It happens often enough in the JDK libraries, where they will also add a note about it in the javadoc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked the implementation, now we have these errors

No protocol scheme
No factory registered for the scheme

For zk based registry
Empty zk server string, i.e, empty uri authority
Empty zk parent path, i.e, empty uri path

For rpc based registry
Empty bootstrap nodes, i.e, empty uri authority

In general, there are no accurate exception types for these errors, and since we may add new checks for different registry implementations in the future, I prefer we still keep the throws IOException declaration, and can file new issues to add some specific exceptions which extend HBaseIOException for these cases.

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -3785,15 +3786,17 @@ private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableN

private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
ReplicationPeerDescription peer) {
Configuration peerConf = null;
Configuration peerConf;
try {
peerConf =
ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
peerConf = ReplicationPeerConfigUtil
.getPeerClusterConfiguration(connection.getConfiguration(), peer.getPeerConfig());
} catch (IOException e) {
return failedFuture(e);
}
URI connectionUri =
ConnectionRegistryFactory.tryParseAsConnectionURI(peer.getPeerConfig().getClusterKey());
CompletableFuture<Void> future = new CompletableFuture<>();
addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
addListener(ConnectionFactory.createAsyncConnection(connectionUri, peerConf), (conn, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.net.URI;
import org.apache.commons.lang3.StringUtils;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I forget which dependencies we expose transitively. Should we be using a shaded version of this class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me check if we have commons-lang3 shaded.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not shade commons-lang3 in hbase-thirdparty

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay thanks for checking. I wonder if we should.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commons-lang3 is not likely to introduce big conflicts. It changed its package name to commons-lang3 from commons-lang when introducing breaking changes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -46,4 +47,12 @@ public ConnectionRegistry create(URI uri, Configuration conf, User user) throws
public String getScheme() {
return "hbase+rpc";
}

@Override
public void validate(URI uri) throws IOException {
  // For the rpc based registry the URI authority carries the bootstrap server list, so an
  // absent/blank authority means there is nothing to connect to.
  String bootstrapNodes = uri.getAuthority();
  if (StringUtils.isBlank(bootstrapNodes)) {
    throw new IOException("no bootstrap nodes specified, uri: " + uri);
  }
  // TODO: add more check about the bootstrap nodes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.net.URI;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.security.User;
Expand Down Expand Up @@ -49,4 +50,15 @@ public ConnectionRegistry create(URI uri, Configuration conf, User user) throws
public String getScheme() {
return "hbase+zk";
}

@Override
public void validate(URI uri) throws IOException {
  // For the zk based registry the URI authority is the zookeeper quorum and the URI path is the
  // znode parent; both are required, so reject a blank value for either.
  String quorum = uri.getAuthority();
  if (StringUtils.isBlank(quorum)) {
    throw new IOException("no zookeeper quorum specified, uri: " + uri);
  }
  // TODO: add more check about the zookeeper quorum
  String parentPath = uri.getPath();
  if (StringUtils.isBlank(parentPath)) {
    throw new IOException("no zookeeper parent path specified, uri: " + uri);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionRegistryFactory;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
Expand Down Expand Up @@ -629,19 +630,19 @@ public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConf

/**
* Returns the configuration needed to talk to the remote slave cluster.
* @param conf the base configuration
* @param peer the description of replication peer
* @param conf the base configuration
* @param peerConfig the peer config of replication peer
* @return the configuration for the peer cluster, null if it was unable to get the configuration
* @throws IOException when create peer cluster configuration failed
*/
public static Configuration getPeerClusterConfiguration(Configuration conf,
ReplicationPeerDescription peer) throws IOException {
ReplicationPeerConfig peerConfig = peer.getPeerConfig();
ReplicationPeerConfig peerConfig) throws IOException {
Configuration otherConf;
try {
if (ConnectionRegistryFactory.tryParseAsConnectionURI(peerConfig.getClusterKey()) != null) {
otherConf = HBaseConfiguration.create(conf);
} else {
// only need to apply cluster key for old style cluster key
otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
} catch (IOException e) {
throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
}

if (!peerConfig.getConfiguration().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
Expand Down Expand Up @@ -226,11 +227,11 @@ public static Configuration createClusterConf(Configuration baseConf, String clu
public static Configuration createClusterConf(Configuration baseConf, String clusterKey,
String overridePrefix) throws IOException {
Configuration clusterConf = HBaseConfiguration.create(baseConf);
if (clusterKey != null && !clusterKey.isEmpty()) {
if (!StringUtils.isBlank(clusterKey)) {
applyClusterKeyToConf(clusterConf, clusterKey);
}

if (overridePrefix != null && !overridePrefix.isEmpty()) {
if (!StringUtils.isBlank(overridePrefix)) {
Configuration clusterSubset = HBaseConfiguration.subset(clusterConf, overridePrefix);
HBaseConfiguration.merge(clusterConf, clusterSubset);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
Expand All @@ -55,7 +56,6 @@
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
Expand Down Expand Up @@ -397,7 +397,7 @@ public boolean isAborted() {
ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), localZKW, conf);
ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
return Pair.newPair(peerConfig,
ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));
ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peerConfig));
} catch (ReplicationException e) {
throw new IOException("An error occurred while trying to connect to the remote peer cluster",
e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ public class TestVerifyReplication extends TestReplicationBase {
@Rule
public TestName name = new TestName();

@Override
protected String getClusterKey(HBaseTestingUtil util) throws Exception {
// TODO: VerifyReplication does not support connection uri yet, so here we need to use cluster
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an issue filed for this TODO?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks!

// key, as in this test we will pass the cluster key config in peer config directly to
// VerifyReplication job.
return util.getClusterKey();
}

@Before
public void setUp() throws Exception {
cleanUp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ public class TestVerifyReplicationAdjunct extends TestReplicationBase {
@Rule
public TestName name = new TestName();

@Override
protected String getClusterKey(HBaseTestingUtil util) throws Exception {
  // TODO: VerifyReplication cannot consume a connection URI yet, and this test feeds the cluster
  // key from the peer config straight into the VerifyReplication job, so keep returning the
  // classic cluster key here instead of a URI.
  String clusterKey = util.getClusterKey();
  return clusterKey;
}

@Before
public void setUp() throws Exception {
cleanUp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
*/
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
Expand Down Expand Up @@ -155,8 +157,15 @@ private ReplicationPeerImpl createPeer(String peerId) throws ReplicationExceptio
SyncReplicationState syncReplicationState = peerStorage.getPeerSyncReplicationState(peerId);
SyncReplicationState newSyncReplicationState =
peerStorage.getPeerNewSyncReplicationState(peerId);
return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
peerId, peerConfig, enabled, syncReplicationState, newSyncReplicationState);
Configuration peerClusterConf;
try {
peerClusterConf = ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peerConfig);
} catch (IOException e) {
throw new ReplicationException(
"failed to apply cluster key to configuration for peer config " + peerConfig, e);
}
return new ReplicationPeerImpl(peerClusterConf, peerId, peerConfig, enabled,
syncReplicationState, newSyncReplicationState);
}

@Override
Expand All @@ -166,8 +175,8 @@ public void onConfigurationChange(Configuration conf) {
for (ReplicationPeerImpl peer : peerCache.values()) {
try {
peer.onConfigurationChange(
ReplicationUtils.getPeerClusterConfiguration(peer.getPeerConfig(), conf));
} catch (ReplicationException e) {
ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peer.getPeerConfig()));
} catch (IOException e) {
LOG.warn("failed to reload configuration for peer {}", peer.getId(), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -61,25 +59,6 @@ public final class ReplicationUtils {
private ReplicationUtils() {
}

/**
 * Build the configuration used to talk to the peer cluster: the base configuration with the
 * peer's cluster key applied, optionally overlaid with the peer's own configuration entries.
 * @param peerConfig the replication peer config, supplies the cluster key and overrides
 * @param baseConf the local cluster's base configuration
 * @throws ReplicationException if the cluster key cannot be applied to the configuration
 */
public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
  Configuration baseConf) throws ReplicationException {
  Configuration otherConf;
  try {
    otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey());
  } catch (IOException e) {
    throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e);
  }
  // No per-peer overrides: the cluster-key-adjusted configuration is already the answer.
  if (peerConfig.getConfiguration().isEmpty()) {
    return otherConf;
  }
  // Overlay the peer's own configuration entries on top of the derived configuration.
  CompoundConfiguration compound = new CompoundConfiguration();
  compound.add(otherConf);
  compound.addStringMap(peerConfig.getConfiguration());
  return compound;
}

private static boolean isCollectionEqual(Collection<String> c1, Collection<String> c2) {
if (c1 == null) {
return c2 == null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.junit.Assert.fail;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.junit.Test;
Expand Down Expand Up @@ -86,8 +87,8 @@ public void testReplicationPeers() throws Exception {
SyncReplicationState.NONE);
assertNumberOfPeers(2);

assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
.getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeerConfigUtil
.getPeerClusterConfiguration(rp.getConf(), rp.getPeerStorage().getPeerConfig(ID_ONE))));
rp.getPeerStorage().removePeer(ID_ONE);
rp.removePeer(ID_ONE);
assertNumberOfPeers(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.User;
Expand Down Expand Up @@ -68,6 +69,20 @@ public static AsyncClusterConnection createAsyncClusterConnection(Configuration
localAddress, user);
}

/**
 * Create a new {@link AsyncClusterConnection} instance.
 * <p/>
 * This is usually used in replication, the given {@code uri} specifies the connection info of the
 * remote cluster. When {@code uri} is {@code null}, the registry is built from {@code conf} alone.
 */
public static AsyncClusterConnection createAsyncClusterConnection(URI uri, Configuration conf,
  SocketAddress localAddress, User user) throws IOException {
  ConnectionRegistry registry;
  if (uri == null) {
    // No explicit connection URI: fall back to the configuration-driven registry.
    registry = ConnectionRegistryFactory.create(conf, user);
  } else {
    registry = ConnectionRegistryFactory.create(uri, conf, user);
  }
  return createAsyncClusterConnection(conf, registry, localAddress, user);
}

/**
* Create a new {@link AsyncClusterConnection} instance to be used at server side where we have a
* {@link ConnectionRegistryEndpoint}.
Expand Down
Loading