Skip to content

Commit 55a6256

Browse files
committed
HBASE-27216 Revisit the ReplicationSyncUp tool
1 parent 069d1ca commit 55a6256

File tree

11 files changed

+579
-52
lines changed

11 files changed

+579
-52
lines changed

hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -717,6 +717,7 @@ message ModifyColumnFamilyStoreFileTrackerStateData {
717717
enum AssignReplicationQueuesState {
718718
ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 1;
719719
ASSIGN_REPLICATION_QUEUES_CLAIM = 2;
720+
ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES = 3;
720721
}
721722

722723
message AssignReplicationQueuesStateData {

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.replication;
1919

2020
import java.io.IOException;
21+
import java.lang.reflect.Constructor;
2122
import org.apache.hadoop.conf.Configuration;
2223
import org.apache.hadoop.hbase.Coprocessor;
2324
import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -27,20 +28,27 @@
2728
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
2829
import org.apache.hadoop.hbase.client.TableDescriptor;
2930
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
31+
import org.apache.hadoop.hbase.util.ReflectionUtils;
3032
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
3133
import org.apache.yetus.audience.InterfaceAudience;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
3236

3337
/**
3438
* Used to create replication storage (peer, queue) classes.
3539
*/
3640
@InterfaceAudience.Private
3741
public final class ReplicationStorageFactory {
3842

43+
private static final Logger LOG = LoggerFactory.getLogger(ReplicationStorageFactory.class);
44+
3945
public static final String REPLICATION_QUEUE_TABLE_NAME = "hbase.replication.queue.table.name";
4046

4147
public static final TableName REPLICATION_QUEUE_TABLE_NAME_DEFAULT =
4248
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");
4349

50+
public static final String REPLICATION_QUEUE_IMPL = "hbase.replication.queue.impl";
51+
4452
public static TableDescriptor createReplicationQueueTableDescriptor(TableName tableName)
4553
throws IOException {
4654
return TableDescriptorBuilder.newBuilder(tableName)
@@ -72,15 +80,26 @@ public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Con
7280
*/
7381
public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn,
7482
Configuration conf) {
75-
return getReplicationQueueStorage(conn, TableName.valueOf(conf.get(REPLICATION_QUEUE_TABLE_NAME,
76-
REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())));
83+
return getReplicationQueueStorage(conn, conf, TableName.valueOf(conf
84+
.get(REPLICATION_QUEUE_TABLE_NAME, REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())));
7785
}
7886

7987
/**
8088
* Create a new {@link ReplicationQueueStorage}.
8189
*/
8290
public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn,
83-
TableName tableName) {
84-
return new TableReplicationQueueStorage(conn, tableName);
91+
Configuration conf, TableName tableName) {
92+
Class<? extends ReplicationQueueStorage> clazz = conf.getClass(REPLICATION_QUEUE_IMPL,
93+
TableReplicationQueueStorage.class, ReplicationQueueStorage.class);
94+
try {
95+
Constructor<? extends ReplicationQueueStorage> c =
96+
clazz.getConstructor(Connection.class, TableName.class);
97+
return c.newInstance(conn, tableName);
98+
} catch (Exception e) {
99+
LOG.debug(
100+
"failed to create ReplicationQueueStorage with Connection, try creating with Configuration",
101+
e);
102+
return ReflectionUtils.newInstance(clazz, conf, tableName);
103+
}
85104
}
86105
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import java.util.List;
2525
import java.util.Set;
2626
import java.util.stream.Collectors;
27+
import org.apache.hadoop.fs.Path;
2728
import org.apache.hadoop.hbase.ServerName;
29+
import org.apache.hadoop.hbase.master.MasterFileSystem;
2830
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
2931
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
3032
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
@@ -37,6 +39,7 @@
3739
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
3840
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
3941
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
42+
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
4043
import org.apache.hadoop.hbase.util.RetryCounter;
4144
import org.apache.yetus.audience.InterfaceAudience;
4245
import org.slf4j.Logger;
@@ -102,7 +105,7 @@ private void addMissingQueues(MasterProcedureEnv env) throws ReplicationExceptio
102105
}
103106
}
104107

105-
private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException {
108+
private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException, IOException {
106109
Set<String> existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream()
107110
.map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet());
108111
ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
@@ -130,18 +133,51 @@ private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException {
130133
return Flow.HAS_MORE_STATE;
131134
}
132135

136+
// check whether ReplicationSyncUp has already done the work for us, if so, we should skip
137+
// claiming the replication queues and delete them instead.
138+
private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
139+
MasterFileSystem mfs = env.getMasterFileSystem();
140+
Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
141+
return mfs.getFileSystem().exists(new Path(syncUpDir, crashedServer.getServerName()));
142+
}
143+
144+
private void removeQueues(MasterProcedureEnv env) throws ReplicationException, IOException {
145+
ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
146+
for (ReplicationQueueId queueId : storage.listAllQueueIds(crashedServer)) {
147+
storage.removeQueue(queueId);
148+
}
149+
MasterFileSystem mfs = env.getMasterFileSystem();
150+
Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
151+
// remove the region server record file
152+
mfs.getFileSystem().delete(new Path(syncUpDir, crashedServer.getServerName()), false);
153+
}
154+
133155
@Override
134156
protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesState state)
135157
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
136158
try {
137159
switch (state) {
138160
case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES:
139-
addMissingQueues(env);
140-
retryCounter = null;
141-
setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_CLAIM);
142-
return Flow.HAS_MORE_STATE;
161+
if (shouldSkip(env)) {
162+
setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES);
163+
return Flow.HAS_MORE_STATE;
164+
} else {
165+
addMissingQueues(env);
166+
retryCounter = null;
167+
setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_CLAIM);
168+
return Flow.HAS_MORE_STATE;
169+
}
143170
case ASSIGN_REPLICATION_QUEUES_CLAIM:
144-
return claimQueues(env);
171+
if (shouldSkip(env)) {
172+
retryCounter = null;
173+
setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES);
174+
return Flow.HAS_MORE_STATE;
175+
} else {
176+
return claimQueues(env);
177+
}
178+
case ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES:
179+
removeQueues(env);
180+
return Flow.NO_MORE_STATE;
145181
default:
146182
throw new UnsupportedOperationException("unhandled state=" + state);
147183
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,22 @@
1919

2020
import java.io.IOException;
2121
import java.util.Optional;
22+
import org.apache.hadoop.fs.Path;
2223
import org.apache.hadoop.hbase.ServerName;
24+
import org.apache.hadoop.hbase.master.MasterFileSystem;
2325
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
2426
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
2527
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
2628
import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
29+
import org.apache.hadoop.hbase.procedure2.Procedure;
2730
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
31+
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
32+
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
2833
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
2934
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
3035
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
3136
import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable;
37+
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
3238
import org.apache.yetus.audience.InterfaceAudience;
3339
import org.slf4j.Logger;
3440
import org.slf4j.LoggerFactory;
@@ -54,6 +60,32 @@ public ClaimReplicationQueueRemoteProcedure(ReplicationQueueId queueId, ServerNa
5460
this.targetServer = targetServer;
5561
}
5662

63+
// check whether ReplicationSyncUp has already done the work for us, if so, we should skip
64+
// claiming the replication queues and delete them instead.
65+
private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
66+
MasterFileSystem mfs = env.getMasterFileSystem();
67+
Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
68+
return mfs.getFileSystem().exists(new Path(syncUpDir, getServerName().getServerName()));
69+
}
70+
71+
@Override
72+
protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
73+
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
74+
try {
75+
if (shouldSkip(env)) {
76+
LOG.info("Skip claiming {} because replication sync up has already done it for us",
77+
getServerName());
78+
return null;
79+
}
80+
} catch (IOException e) {
81+
LOG.warn("failed to check whether we should skip claiming {} due to replication sync up",
82+
getServerName(), e);
83+
// just finish the procedure here, as the AssignReplicationQueuesProcedure will reschedule
84+
return null;
85+
}
86+
return super.execute(env);
87+
}
88+
5789
@Override
5890
public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
5991
assert targetServer.equals(remote);

0 commit comments

Comments
 (0)