Skip to content

Commit e3761ba

Browse files
authored
HBASE-28436 Use connection url to specify the connection registry information (#5770)
Signed-off-by: Istvan Toth <[email protected]> Signed-off-by: Nick Dimiduk <[email protected]> Reviewed-by: Bryan Beaudreault <[email protected]>
1 parent 5a404c4 commit e3761ba

20 files changed

+828
-74
lines changed

hbase-client/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,11 @@
170170
<artifactId>mockito-core</artifactId>
171171
<scope>test</scope>
172172
</dependency>
173+
<dependency>
174+
<groupId>org.mockito</groupId>
175+
<artifactId>mockito-inline</artifactId>
176+
<scope>test</scope>
177+
</dependency>
173178
<dependency>
174179
<groupId>org.hamcrest</groupId>
175180
<artifactId>hamcrest-library</artifactId>

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java

Lines changed: 261 additions & 52 deletions
Large diffs are not rendered by default.

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,77 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20-
import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
21-
20+
import java.io.IOException;
21+
import java.net.URI;
22+
import java.util.ServiceLoader;
23+
import org.apache.commons.lang3.StringUtils;
2224
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.hbase.HConstants;
2326
import org.apache.hadoop.hbase.security.User;
2427
import org.apache.hadoop.hbase.util.ReflectionUtils;
2528
import org.apache.yetus.audience.InterfaceAudience;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
2633

2734
/**
28-
* Factory class to get the instance of configured connection registry.
35+
* The entry point for creating a {@link ConnectionRegistry}.
2936
*/
3037
@InterfaceAudience.Private
3138
final class ConnectionRegistryFactory {
3239

40+
private static final Logger LOG = LoggerFactory.getLogger(ConnectionRegistryFactory.class);
41+
42+
private static final ImmutableMap<String, ConnectionRegistryURIFactory> CREATORS;
43+
static {
44+
ImmutableMap.Builder<String, ConnectionRegistryURIFactory> builder = ImmutableMap.builder();
45+
for (ConnectionRegistryURIFactory factory : ServiceLoader
46+
.load(ConnectionRegistryURIFactory.class)) {
47+
builder.put(factory.getScheme().toLowerCase(), factory);
48+
}
49+
// throw IllegalArgumentException if there are duplicated keys
50+
CREATORS = builder.buildOrThrow();
51+
}
52+
3353
private ConnectionRegistryFactory() {
3454
}
3555

36-
/** Returns The connection registry implementation to use. */
37-
static ConnectionRegistry getRegistry(Configuration conf, User user) {
56+
/**
57+
* Returns the connection registry implementation to use, for the given connection url
58+
* {@code uri}.
59+
* <p/>
60+
* We use {@link ServiceLoader} to load different implementations, and use the scheme of the given
61+
* {@code uri} to select. And if there is no protocol specified, or we can not find a
62+
* {@link ConnectionRegistryURIFactory} implementation for the given scheme, we will fallback to
63+
* use the old way to create the {@link ConnectionRegistry}. Notice that, if fallback happens, the
64+
* specified connection url {@code uri} will not take effect, we will load all the related
65+
* configurations from the given Configuration instance {@code conf}
66+
*/
67+
static ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException {
68+
if (StringUtils.isBlank(uri.getScheme())) {
69+
LOG.warn("No scheme specified for {}, fallback to use old way", uri);
70+
return create(conf, user);
71+
}
72+
ConnectionRegistryURIFactory creator = CREATORS.get(uri.getScheme().toLowerCase());
73+
if (creator == null) {
74+
LOG.warn("No creator registered for {}, fallback to use old way", uri);
75+
return create(conf, user);
76+
}
77+
return creator.create(uri, conf, user);
78+
}
79+
80+
/**
81+
* Returns the connection registry implementation to use.
82+
* <p/>
83+
* This is used when we do not have a connection url, we will use the old way to load the
84+
* connection registry, by checking the
85+
* {@literal HConstants#CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY} configuration.
86+
*/
87+
static ConnectionRegistry create(Configuration conf, User user) {
3888
Class<? extends ConnectionRegistry> clazz =
39-
conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, RpcConnectionRegistry.class,
40-
ConnectionRegistry.class);
89+
conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
90+
RpcConnectionRegistry.class, ConnectionRegistry.class);
4191
return ReflectionUtils.newInstance(clazz, conf, user);
4292
}
4393
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import java.io.IOException;
21+
import java.net.URI;
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.hbase.security.User;
24+
import org.apache.yetus.audience.InterfaceAudience;
25+
26+
/**
27+
* For creating different {@link ConnectionRegistry} implementation.
28+
*/
29+
@InterfaceAudience.Private
30+
public interface ConnectionRegistryURIFactory {
31+
32+
/**
33+
* Instantiate the {@link ConnectionRegistry} using the given parameters.
34+
*/
35+
ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException;
36+
37+
/**
38+
* The scheme for this implementation. Used to register this URI factory to the
39+
* {@link ConnectionRegistryFactory}.
40+
*/
41+
String getScheme();
42+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import java.io.IOException;
21+
import java.net.URI;
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.hbase.security.User;
24+
import org.apache.yetus.audience.InterfaceAudience;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
/**
29+
* Connection registry creator implementation for creating {@link RpcConnectionRegistry}.
30+
*/
31+
@InterfaceAudience.Private
32+
public class RpcConnectionRegistryCreator implements ConnectionRegistryURIFactory {
33+
34+
private static final Logger LOG = LoggerFactory.getLogger(RpcConnectionRegistryCreator.class);
35+
36+
@Override
37+
public ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException {
38+
assert getScheme().equals(uri.getScheme());
39+
LOG.debug("connect to hbase cluster with rpc bootstrap servers='{}'", uri.getAuthority());
40+
Configuration c = new Configuration(conf);
41+
c.set(RpcConnectionRegistry.BOOTSTRAP_NODES, uri.getAuthority());
42+
return new RpcConnectionRegistry(c, user);
43+
}
44+
45+
@Override
46+
public String getScheme() {
47+
return "hbase+rpc";
48+
}
49+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import java.io.IOException;
21+
import java.net.URI;
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.hbase.HConstants;
24+
import org.apache.hadoop.hbase.security.User;
25+
import org.apache.yetus.audience.InterfaceAudience;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
/**
30+
* Connection registry creator implementation for creating {@link ZKConnectionRegistry}.
31+
*/
32+
@InterfaceAudience.Private
33+
public class ZKConnectionRegistryCreator implements ConnectionRegistryURIFactory {
34+
35+
private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistryCreator.class);
36+
37+
@Override
38+
public ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException {
39+
assert getScheme().equals(uri.getScheme());
40+
LOG.debug("connect to hbase cluster with zk quorum='{}' and parent='{}'", uri.getAuthority(),
41+
uri.getPath());
42+
Configuration c = new Configuration(conf);
43+
c.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, uri.getAuthority());
44+
c.set(HConstants.ZOOKEEPER_ZNODE_PARENT, uri.getPath());
45+
return new ZKConnectionRegistry(c, user);
46+
}
47+
48+
@Override
49+
public String getScheme() {
50+
return "hbase+zk";
51+
}
52+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
org.apache.hadoop.hbase.client.RpcConnectionRegistryCreator
17+
org.apache.hadoop.hbase.client.ZKConnectionRegistryCreator

0 commit comments

Comments
 (0)