Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3342,6 +3342,18 @@ public void setInitialized(boolean isInitialized) {
procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
}

/**
* Mainly used in procedure related tests, where we will restart ProcedureExecutor and
* AssignmentManager, but we do not want to restart master(to speed up the test), so we need to
* disable rpc for a while otherwise some critical rpc requests such as
* reportRegionStateTransition could fail and cause region server to abort.
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public void setServiceStarted(boolean started) {
this.serviceStarted = started;
}

@Override
public ProcedureEvent<?> getInitializedEvent() {
return initialized;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void upgrade(Set<ServerName> deadServersFromPE, Set<ServerName> liveServe
.forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s));
// create ServerNode for all possible live servers from wal directory and master local region
liveServersBeforeRestart
.forEach(sn -> server.getAssignmentManager().getRegionStates().getOrCreateServer(sn));
.forEach(sn -> server.getAssignmentManager().getRegionStates().createServer(sn));
ServerManager serverManager = server.getServerManager();
synchronized (this) {
Set<ServerName> liveServers = regionServers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ public ServerName findServerWithSameHostnamePortWithLock(final ServerName server
void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
LOG.info("Registering regionserver=" + serverName);
this.onlineServers.put(serverName, sl);
master.getAssignmentManager().getRegionStates().createServer(serverName);
}

public ConcurrentNavigableMap<byte[], Long> getFlushedSequenceIdByRegion() {
Expand Down Expand Up @@ -603,6 +604,10 @@ synchronized long expireServer(final ServerName serverName, boolean force) {
}
LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
if (pid == Procedure.NO_PROC_ID) {
// skip later processing as we failed to submit SCP
return Procedure.NO_PROC_ID;
}
storage.expired(serverName);
// Tell our listeners that a server was removed
if (!this.listeners.isEmpty()) {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ private boolean include(final RegionStateNode node, final boolean offline) {
// ============================================================================================

private void setServerState(ServerName serverName, ServerState state) {
ServerStateNode serverNode = getOrCreateServer(serverName);
ServerStateNode serverNode = getServerNode(serverName);
synchronized (serverNode) {
serverNode.setState(state);
}
Expand Down Expand Up @@ -746,18 +746,16 @@ public List<RegionState> getRegionFailedOpen() {
// ==========================================================================

/**
* Be judicious calling this method. Do it on server register ONLY otherwise you could mess up
* online server accounting. TOOD: Review usage and convert to {@link #getServerNode(ServerName)}
* where we can.
* Create the ServerStateNode when registering a new region server
*/
public ServerStateNode getOrCreateServer(final ServerName serverName) {
return serverMap.computeIfAbsent(serverName, key -> new ServerStateNode(key));
public void createServer(ServerName serverName) {
serverMap.computeIfAbsent(serverName, key -> new ServerStateNode(key));
}

/**
* Called by SCP at end of successful processing.
*/
public void removeServer(final ServerName serverName) {
public void removeServer(ServerName serverName) {
serverMap.remove(serverName);
}

Expand All @@ -776,17 +774,24 @@ public double getAverageLoad() {
return numServers == 0 ? 0.0 : (double) totalLoad / (double) numServers;
}

public ServerStateNode addRegionToServer(final RegionStateNode regionNode) {
ServerStateNode serverNode = getOrCreateServer(regionNode.getRegionLocation());
public void addRegionToServer(final RegionStateNode regionNode) {
ServerStateNode serverNode = getServerNode(regionNode.getRegionLocation());
serverNode.addRegion(regionNode);
return serverNode;
}

public ServerStateNode removeRegionFromServer(final ServerName serverName,
public void removeRegionFromServer(final ServerName serverName,
final RegionStateNode regionNode) {
ServerStateNode serverNode = getOrCreateServer(serverName);
serverNode.removeRegion(regionNode);
return serverNode;
ServerStateNode serverNode = getServerNode(serverName);
// here the server node could be null. For example, if there is already a TRSP for a region and
// at the same time, the target server is crashed and there is a SCP. The SCP will interrupt the
// TRSP and the TRSP will first set the region as abnormally closed and remove it from the
// server node. But here, this TRSP is not a child procedure of the SCP, so it is possible that
// the SCP finishes, thus removes the server node for this region server, before the TRSP wakes
// up and enter here to remove the region node from the server node, then we will get a null
// server node here.
if (serverNode != null) {
serverNode.removeRegion(regionNode);
}
}

// ==========================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
public class ServerStateNode implements Comparable<ServerStateNode> {
private final Set<RegionStateNode> regions;
private final ServerName serverName;
// the lock here is for fencing SCP and TRSP, so not all operations need to hold this lock
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private volatile ServerState state = ServerState.ONLINE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ List<RegionInfo> getRegionsOnCrashedServer(MasterProcedureEnv env) {
LOG.warn("Failed scan of hbase:meta for 'Unknown Servers'", ioe);
return ris;
}
// create the server state node too
env.getAssignmentManager().getRegionStates().createServer(getServerName());
LOG.info("Found {} mentions of {} in hbase:meta of OPEN/OPENING Regions: {}",
visitor.getReassigns().size(), getServerName(), visitor.getReassigns().stream()
.map(RegionInfo::getEncodedName).collect(Collectors.joining(",")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void testAssignmentsForBalancer() throws Exception {
AssignmentManager assignmentManager = master.getAssignmentManager();
RegionStates regionStates = assignmentManager.getRegionStates();
ServerName sn1 = ServerName.parseServerName("asf903.gq1.ygridcore.net,52690,1517835491385");
regionStates.getOrCreateServer(sn1);
regionStates.createServer(sn1);

TableStateManager tableStateManager = master.getTableStateManager();
ServerManager serverManager = master.getServerManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
package org.apache.hadoop.hbase.master;

import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.net.InetAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
Expand All @@ -44,11 +48,28 @@ public class TestClockSkewDetection {

private static final Logger LOG = LoggerFactory.getLogger(TestClockSkewDetection.class);

private static final class DummyMasterServices extends MockNoopMasterServices {

private final AssignmentManager am;

public DummyMasterServices(Configuration conf) {
super(conf);
am = mock(AssignmentManager.class);
RegionStates rss = mock(RegionStates.class);
when(am.getRegionStates()).thenReturn(rss);
}

@Override
public AssignmentManager getAssignmentManager() {
return am;
}
}

@Test
public void testClockSkewDetection() throws Exception {
final Configuration conf = HBaseConfiguration.create();
ServerManager sm =
new ServerManager(new MockNoopMasterServices(conf), new DummyRegionServerList());
new ServerManager(new DummyMasterServices(conf), new DummyRegionServerList());

LOG.debug("regionServerStartup 1");
InetAddress ia1 = InetAddress.getLocalHost();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.master;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand All @@ -33,6 +33,7 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
Expand Down Expand Up @@ -105,36 +106,44 @@ public void test() throws Exception {
UTIL.waitFor(60000, () -> UTIL.getHBaseCluster().getMaster().isInitialized());
LOG.info("Started cluster master, waiting for {}", SERVER_FOR_TEST);
UTIL.waitFor(60000, () -> getServerStateNode(SERVER_FOR_TEST) != null);
serverNode = getServerStateNode(SERVER_FOR_TEST);
assertFalse("serverNode should not be ONLINE during SCP processing",
serverNode.isInState(ServerState.ONLINE));
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {

@Override
public boolean evaluate() throws Exception {
return !getServerStateNode(SERVER_FOR_TEST).isInState(ServerState.ONLINE);
}

@Override
public String explainFailure() throws Exception {
return "serverNode should not be ONLINE during SCP processing";
}
});
Optional<Procedure<?>> procedure = UTIL.getHBaseCluster().getMaster().getProcedures().stream()
.filter(p -> (p instanceof ServerCrashProcedure)
&& ((ServerCrashProcedure) p).getServerName().equals(SERVER_FOR_TEST))
.findAny();
assertTrue("Should have one SCP for " + SERVER_FOR_TEST, procedure.isPresent());
assertTrue("Submit the SCP for the same serverName " + SERVER_FOR_TEST + " which should fail",
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST)
== Procedure.NO_PROC_ID);
assertEquals("Submit the SCP for the same serverName " + SERVER_FOR_TEST + " which should fail",
Procedure.NO_PROC_ID,
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST));

// Wait the SCP to finish
LOG.info("Waiting on latch");
SCP_LATCH.countDown();
UTIL.waitFor(60000, () -> procedure.get().isFinished());
assertNull("serverNode should be deleted after SCP finished",
getServerStateNode(SERVER_FOR_TEST));

assertFalse(
assertEquals(
"Even when the SCP is finished, the duplicate SCP should not be scheduled for "
+ SERVER_FOR_TEST,
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST)
== Procedure.NO_PROC_ID);
serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
.getServerNode(SERVER_FOR_TEST);
assertNull("serverNode should be deleted after SCP finished", serverNode);
Procedure.NO_PROC_ID,
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST));

MetricsMasterSource masterSource =
UTIL.getHBaseCluster().getMaster().getMasterMetrics().getMetricsSource();
metricsHelper.assertCounter(MetricsMasterSource.SERVER_CRASH_METRIC_PREFIX + "SubmittedCount",
4, masterSource);
3, masterSource);
}

private void setupCluster() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public static void restartMasterProcedureExecutor(ProcedureExecutor<MasterProced
new Callable<Void>() {
@Override
public Void call() throws Exception {
master.setServiceStarted(false);
AssignmentManager am = env.getAssignmentManager();
// try to simulate a master restart by removing the ServerManager states about seqIDs
for (RegionState regionState : am.getRegionStates().getRegionStates()) {
Expand All @@ -109,6 +110,10 @@ public Void call() throws Exception {
am.setupRIT(procExec.getActiveProceduresNoCopy().stream().filter(p -> !p.isSuccess())
.filter(p -> p instanceof TransitRegionStateProcedure)
.map(p -> (TransitRegionStateProcedure) p).collect(Collectors.toList()));
// create server state node, to simulate master start up
env.getMasterServices().getServerManager().getOnlineServersList()
.forEach(am.getRegionStates()::createServer);
master.setServiceStarted(true);
return null;
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void setUp() throws Exception {
master.start(2, rsDispatcher);
am = master.getAssignmentManager();
master.getServerManager().getOnlineServersList().stream()
.forEach(serverName -> am.getRegionStates().getOrCreateServer(serverName));
.forEach(serverName -> am.getRegionStates().createServer(serverName));
}

@After
Expand Down