Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class ReplicationPeerImpl implements ReplicationPeer {
public class ReplicationPeerImpl implements ReplicationPeer, ConfigurationObserver {

private final Configuration conf;
private volatile Configuration conf;

private final String id;

Expand Down Expand Up @@ -151,4 +152,9 @@ public long getPeerBandwidth() {
public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
this.peerConfigListeners.add(listener);
}

/**
 * Replaces the configuration held by this peer with the given one.
 * @param conf the new configuration to store in this peer
 */
@Override
public void onConfigurationChange(Configuration conf) {
this.conf = conf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,38 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This provides a class for maintaining a set of peer clusters. These peers are remote slave
* clusters that data is replicated to.
* <p>
* We implement {@link ConfigurationObserver} mainly for recreating the
* {@link ReplicationPeerStorage}, so we can change the {@link ReplicationPeerStorage} without
* restarting the region server.
*/
@InterfaceAudience.Private
public class ReplicationPeers {
public class ReplicationPeers implements ConfigurationObserver {

private final Configuration conf;
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);

private volatile Configuration conf;

// Map of peer clusters keyed by their id
private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
private final ReplicationPeerStorage peerStorage;
private final FileSystem fs;
private final ZKWatcher zookeeper;
private volatile ReplicationPeerStorage peerStorage;

/**
 * Stores the given file system, zookeeper watcher and configuration, creates an empty peer cache
 * and obtains the initial peer storage from {@link ReplicationStorageFactory}.
 * @param fs        file system passed to the peer storage factory
 * @param zookeeper zookeeper watcher passed to the peer storage factory
 * @param conf      configuration passed to the peer storage factory
 */
ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) {
this.conf = conf;
// fs and zookeeper are kept so that onConfigurationChange can recreate the peer storage later
this.fs = fs;
this.zookeeper = zookeeper;
this.peerCache = new ConcurrentHashMap<>();
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
}
Expand Down Expand Up @@ -145,4 +158,18 @@ private ReplicationPeerImpl createPeer(String peerId) throws ReplicationExceptio
return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
peerId, peerConfig, enabled, syncReplicationState, newSyncReplicationState);
}

/**
 * Switches to the given configuration: stores it, recreates the peer storage from it and hands
 * every cached peer a peer cluster configuration rebuilt on top of it.
 * <p>
 * If rebuilding the configuration of a peer fails, a warning is logged for that peer and the
 * remaining peers are still processed.
 * @param conf the new configuration
 */
@Override
public void onConfigurationChange(Configuration conf) {
  this.conf = conf;
  this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
  for (ReplicationPeerImpl cachedPeer : peerCache.values()) {
    Configuration peerClusterConf;
    try {
      peerClusterConf =
        ReplicationUtils.getPeerClusterConfiguration(cachedPeer.getPeerConfig(), conf);
    } catch (ReplicationException e) {
      LOG.warn("failed to reload configuration for peer {}", cachedPeer.getId(), e);
      // leave this peer on its previous configuration and move on to the next one
      continue;
    }
    cachedPeer.onConfigurationChange(peerClusterConf);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,12 @@ public MetaRegionLocationCache getMetaRegionLocationCache() {
return this.metaRegionLocationCache;
}

/**
 * Returns the {@code configurationManager} field. The {@code RestrictedApi} annotation limits
 * callers to paths matching {@code .*&#47;src/test/.*}, i.e. test code.
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public ConfigurationManager getConfigurationManager() {
return configurationManager;
}

/**
* Reload the configuration from disk.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
Expand Down Expand Up @@ -791,6 +790,7 @@ private void initializeZKBasedSystemTrackers()

this.replicationPeerManager =
ReplicationPeerManager.create(fileSystemManager.getFileSystem(), zooKeeper, conf, clusterId);
this.configurationManager.registerObserver(replicationPeerManager);
this.replicationPeerModificationStateStore =
new ReplicationPeerModificationStateStore(masterRegion);

Expand Down Expand Up @@ -4293,12 +4293,6 @@ static void setDisableBalancerChoreForTest(boolean disable) {
disableBalancerChoreForTest = disable;
}

/**
 * Returns the {@code configurationManager} field. The {@code RestrictedApi} annotation limits
 * callers to paths matching {@code .*&#47;src/test/.*}, i.e. test code.
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public ConfigurationManager getConfigurationManager() {
return configurationManager;
}

private void setQuotasObserver(Configuration conf) {
// Add the Observer to delete quotas on table deletion before starting all CPs by
// default with quota support, avoiding if user specifically asks to not load this Observer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
Expand Down Expand Up @@ -69,13 +70,16 @@
* Manages and performs all replication admin operations.
* <p>
* Used to add/remove a replication peer.
* <p>
* Implement {@link ConfigurationObserver} mainly for recreating {@link ReplicationPeerStorage}, for
* supporting migrating across different replication peer storages without restarting master.
*/
@InterfaceAudience.Private
public class ReplicationPeerManager {
public class ReplicationPeerManager implements ConfigurationObserver {

private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerManager.class);

private final ReplicationPeerStorage peerStorage;
private volatile ReplicationPeerStorage peerStorage;

private final ReplicationQueueStorage queueStorage;

Expand All @@ -94,10 +98,18 @@ public class ReplicationPeerManager {

private final String clusterId;

private final Configuration conf;
private volatile Configuration conf;

// for dynamic recreating ReplicationPeerStorage.
private final FileSystem fs;

private final ZKWatcher zk;

ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) {
ReplicationPeerManager(FileSystem fs, ZKWatcher zk, ReplicationPeerStorage peerStorage,
ReplicationQueueStorage queueStorage, ConcurrentMap<String, ReplicationPeerDescription> peers,
Configuration conf, String clusterId) {
this.fs = fs;
this.zk = zk;
this.peerStorage = peerStorage;
this.queueStorage = queueStorage;
this.peers = peers;
Expand Down Expand Up @@ -582,7 +594,7 @@ public static ReplicationPeerManager create(FileSystem fs, ZKWatcher zk, Configu
SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
}
return new ReplicationPeerManager(peerStorage,
return new ReplicationPeerManager(fs, zk, peerStorage,
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
}

Expand All @@ -604,4 +616,10 @@ public boolean tryAcquireSyncReplicationPeerLock() {
public void releaseSyncReplicationPeerLock() {
syncReplicationPeerLock.release();
}

/**
 * Stores the new configuration and recreates the peer storage from it, reusing the file system
 * and zookeeper watcher kept by this manager.
 * @param conf the new configuration
 */
@Override
public void onConfigurationChange(Configuration conf) {
this.conf = conf;
// the storage is built from the passed-in conf, so it matches the configuration stored above
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
Expand Down Expand Up @@ -2065,6 +2065,14 @@ private void initializeThreads() {
}

private void registerConfigurationObservers() {
// Register Replication if possible, as now we support recreating replication peer storage, for
// migrating across different replication peer storages online
if (replicationSourceHandler instanceof ConfigurationObserver) {
configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler);
}
if (!sameReplicationSourceAndSink && replicationSinkHandler instanceof ConfigurationObserver) {
configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler);
}
// Registering the compactSplitThread object with the ConfigurationManager.
configurationManager.registerObserver(this.compactSplitThread);
configurationManager.registerObserver(this.rpcServices);
Expand Down Expand Up @@ -3315,11 +3323,6 @@ public Optional<MobFileCache> getMobFileCache() {
return Optional.ofNullable(this.mobFileCache);
}

/** Returns the ConfigurationManager object, for testing purposes. */
ConfigurationManager getConfigurationManager() {
return configurationManager;
}

CacheEvictionStats clearRegionBlockCache(Region region) {
long evictedBlocks = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
Expand All @@ -50,15 +52,18 @@

/**
* Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
* <p>
* Implement {@link PropagatingConfigurationObserver} mainly for registering
* {@link ReplicationPeers}, so we can recreate the replication peer storage.
*/
@InterfaceAudience.Private
public class Replication implements ReplicationSourceService {
public class Replication implements ReplicationSourceService, PropagatingConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(Replication.class);
private boolean isReplicationForBulkLoadDataEnabled;
private ReplicationSourceManager replicationManager;
private ReplicationQueueStorage queueStorage;
private ReplicationPeers replicationPeers;
private Configuration conf;
private volatile Configuration conf;
private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider;
// Hosting server
private Server server;
Expand Down Expand Up @@ -229,4 +234,19 @@ public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
public ReplicationPeers getReplicationPeers() {
return replicationPeers;
}

/**
 * Replaces the configuration held by this instance with the given one.
 * @param conf the new configuration
 */
@Override
public void onConfigurationChange(Configuration conf) {
this.conf = conf;
}

/**
 * Registers {@code replicationPeers} as an observer on the given manager.
 * @param manager the configuration manager to register with
 */
@Override
public void registerChildren(ConfigurationManager manager) {
manager.registerObserver(replicationPeers);
}

/**
 * Deregisters {@code replicationPeers} from the given manager.
 * @param manager the configuration manager to deregister from
 */
@Override
public void deregisterChildren(ConfigurationManager manager) {
manager.deregisterObserver(replicationPeers);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Starts a mini cluster on the zookeeper based replication peer storage, copies the peer data to
 * the filesystem based storage and then notifies the configuration observers of every master and
 * region server with a configuration that selects the filesystem storage. Finally checks that a
 * peer removal issued afterwards is visible through the filesystem storage.
 */
@Category({ ReplicationTests.class, LargeTests.class })
public class TestMigrateRepliationPeerStorageOnline {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMigrateRepliationPeerStorageOnline.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  @BeforeClass
  public static void setUp() throws Exception {
    // the cluster starts on the zookeeper peer storage; the test later moves it to filesystem
    Configuration clusterConf = UTIL.getConfiguration();
    clusterConf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
      ReplicationPeerStorageType.ZOOKEEPER.name());
    UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void tearDown() throws IOException {
    UTIL.shutdownMiniCluster();
  }

  /** Returns a copy of {@code base} whose peer storage implementation is set to filesystem. */
  private static Configuration copyWithFileSystemPeerStorage(Configuration base) {
    Configuration copy = new Configuration(base);
    copy.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
      ReplicationPeerStorageType.FILESYSTEM.name());
    return copy;
  }

  @Test
  public void testMigrate() throws Exception {
    final String peerId = "1";
    Admin admin = UTIL.getAdmin();
    String clusterKey = UTIL.getClusterKey() + "-test";
    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey)
      .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build();
    admin.addReplicationPeer(peerId, peerConfig);

    // switch peer modification off before copying the peer data
    admin.replicationPeerModificationSwitch(false, true);

    // run the copy tool while the configuration copy still carries the original storage setting
    Configuration toolConf = new Configuration(UTIL.getConfiguration());
    String[] copyArgs = { "zookeeper", "filesystem" };
    int exitCode = ToolRunner.run(toolConf, new CopyReplicationPeers(toolConf), copyArgs);
    assertEquals(0, exitCode);

    // from here on the same configuration object selects the filesystem storage
    toolConf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
      ReplicationPeerStorageType.FILESYSTEM.name());
    ReplicationPeerStorage fsPeerStorage = ReplicationStorageFactory
      .getReplicationPeerStorage(UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(), toolConf);
    // the peer added above must have been copied over
    assertNotNull(fsPeerStorage.getPeerConfig(peerId));

    // push the filesystem setting to every master ...
    for (MasterThread masterThread : UTIL.getMiniHBaseCluster().getMasterThreads()) {
      Configuration updated =
        copyWithFileSystemPeerStorage(masterThread.getMaster().getConfiguration());
      masterThread.getMaster().getConfigurationManager().notifyAllObservers(updated);
    }
    // ... and to every region server
    for (RegionServerThread rsThread : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
      Configuration updated =
        copyWithFileSystemPeerStorage(rsThread.getRegionServer().getConfiguration());
      rsThread.getRegionServer().getConfigurationManager().notifyAllObservers(updated);
    }

    admin.replicationPeerModificationSwitch(true);
    admin.removeReplicationPeer(peerId);

    // confirm that the removal operated on the new peer storage
    assertThat(fsPeerStorage.listPeerIds(), empty());
  }
}