Skip to content

Commit e6d880b

Browse files
Umeshkumar9414ukumawat
andcommitted
HBASE-28420 Update the procedure's field to store for ServerRemoteProcedure (apache#5816)
Co-authored-by: ukumawat <[email protected]> Signed-off-by: Duo Zhang <[email protected]>
1 parent 7869f7f commit e6d880b

File tree

8 files changed

+151
-58
lines changed

8 files changed

+151
-58
lines changed

hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import "RPC.proto";
2929
import "Snapshot.proto";
3030
import "Replication.proto";
3131
import "RegionServerStatus.proto";
32+
import "ErrorHandling.proto";
3233

3334
// ============================================================================
3435
// WARNING - Compatibility rules
@@ -254,6 +255,8 @@ message SnapshotVerifyProcedureStateData {
254255
required SnapshotDescription snapshot = 1;
255256
required RegionInfo region = 2;
256257
optional ServerName target_server = 3;
258+
optional ServerRemoteProcedureState state = 4;
259+
optional ForeignExceptionMessage error = 5;
257260
}
258261

259262
message SnapshotVerifyParameter {
@@ -503,6 +506,8 @@ message RefreshPeerStateData {
503506
required string peer_id = 1;
504507
required PeerModificationType type = 2;
505508
required ServerName target_server = 3;
509+
optional ServerRemoteProcedureState state = 5;
510+
optional ForeignExceptionMessage error = 6;
506511
}
507512

508513
message RefreshPeerParameter {
@@ -586,6 +591,14 @@ enum RegionRemoteProcedureBaseState {
586591
REGION_REMOTE_PROCEDURE_SERVER_CRASH = 4;
587592
}
588593

594+
enum ServerRemoteProcedureState {
595+
SERVER_REMOTE_PROCEDURE_DISPATCH = 1;
596+
SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL = 2;
597+
SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED = 3;
598+
SERVER_REMOTE_PROCEDURE_REPORT_FAILED = 4;
599+
SERVER_REMOTE_PROCEDURE_SERVER_CRASH = 5;
600+
}
601+
589602
message RegionRemoteProcedureBaseStateData {
590603
required RegionInfo region = 1;
591604
required ServerName target_server = 2;
@@ -617,6 +630,8 @@ message SwitchRpcThrottleStateData {
617630
message SwitchRpcThrottleRemoteStateData {
618631
required ServerName target_server = 1;
619632
required bool rpc_throttle_enabled = 2;
633+
optional ServerRemoteProcedureState state = 3;
634+
optional ForeignExceptionMessage error = 4;
620635
}
621636

622637
message SplitWALParameter {
@@ -634,6 +649,8 @@ message SplitWALRemoteData {
634649
required string wal_path = 1;
635650
required ServerName crashed_server = 2;
636651
required ServerName worker = 3;
652+
optional ServerRemoteProcedureState state = 4;
653+
optional ForeignExceptionMessage error = 5;
637654
}
638655

639656
enum SplitWALState {
@@ -650,6 +667,8 @@ message ClaimReplicationQueueRemoteStateData {
650667
required ServerName crashed_server = 1;
651668
required string queue = 2;
652669
required ServerName target_server = 3;
670+
optional ServerRemoteProcedureState state = 5;
671+
optional ForeignExceptionMessage error = 6;
653672
}
654673

655674
message ClaimReplicationQueueRemoteParameter {

hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
3232

33+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
34+
3335
@InterfaceAudience.Private
3436
/**
3537
* The base class for Procedures that run {@link java.util.concurrent.Callable}s on a (remote)
@@ -63,34 +65,38 @@
6365
* <p>
6466
* If sending the operation to remote RS failed, dispatcher will call remoteCallFailed() to handle
6567
* this which calls remoteOperationDone with the exception. If the targetServer crashed but this
66-
* procedure has no response, than dispatcher will call remoteOperationFailed() which also calls
67-
* remoteOperationDone with the exception. If the operation is successful, then
68-
* remoteOperationCompleted will be called and actually calls the remoteOperationDone without
69-
* exception. In remoteOperationDone, we'll check if the procedure is already get wake up by others.
70-
* Then developer could implement complete() based on their own purpose. But basic logic is that if
71-
* operation succeed, set succ to true and do the clean work. If operation failed and require to
72-
* resend it to the same server, leave the succ as false. If operation failed and require to resend
73-
* it to another server, set succ to true and upper layer should be able to find out this operation
74-
* not work and send a operation to another server.
68+
* procedure has no response or if we receive failed response, then dispatcher will call
69+
* remoteOperationFailed() which also calls remoteOperationDone with the exception. If the operation
70+
* is successful, then remoteOperationCompleted will be called and actually calls the
71+
* remoteOperationDone without exception. In remoteOperationDone, we'll check if the procedure is
72+
* already get wake up by others. Then developer could implement complete() based on their own
73+
* purpose. But basic logic is that if operation succeed, set succ to true and do the clean work. If
74+
* operation failed and require to resend it to the same server, leave the succ as false. If
75+
* operation failed and require to resend it to another server, set succ to true and upper layer
76+
* should be able to find out this operation not work and send a operation to another server.
7577
*/
7678
public abstract class ServerRemoteProcedure extends Procedure<MasterProcedureEnv>
7779
implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName> {
7880
protected static final Logger LOG = LoggerFactory.getLogger(ServerRemoteProcedure.class);
7981
protected ProcedureEvent<?> event;
8082
protected ServerName targetServer;
81-
protected boolean dispatched;
82-
protected boolean succ;
83+
// after remoteProcedureDone we require error field to decide the next state
84+
protected Throwable remoteError;
85+
protected MasterProcedureProtos.ServerRemoteProcedureState state =
86+
MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH;
8387

84-
protected abstract void complete(MasterProcedureEnv env, Throwable error);
88+
protected abstract boolean complete(MasterProcedureEnv env, Throwable error);
8589

8690
@Override
8791
protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
8892
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
89-
if (dispatched) {
90-
if (succ) {
93+
if (
94+
state != MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
95+
) {
96+
if (complete(env, this.remoteError)) {
9197
return null;
9298
}
93-
dispatched = false;
99+
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH;
94100
}
95101
try {
96102
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
@@ -99,7 +105,6 @@ protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEn
99105
+ "be retried to send to another server", this.getProcId(), targetServer);
100106
return null;
101107
}
102-
dispatched = true;
103108
event = new ProcedureEvent<>(this);
104109
event.suspendIfNotReady(this);
105110
throw new ProcedureSuspendedException();
@@ -113,17 +118,20 @@ protected synchronized void completionCleanup(MasterProcedureEnv env) {
113118
@Override
114119
public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
115120
IOException exception) {
121+
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL;
116122
remoteOperationDone(env, exception);
117123
}
118124

119125
@Override
120126
public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
127+
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED;
121128
remoteOperationDone(env, null);
122129
}
123130

124131
@Override
125132
public synchronized void remoteOperationFailed(MasterProcedureEnv env,
126133
RemoteProcedureException error) {
134+
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_FAILED;
127135
remoteOperationDone(env, error);
128136
}
129137

@@ -137,7 +145,9 @@ synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) {
137145
getProcId());
138146
return;
139147
}
140-
complete(env, error);
148+
this.remoteError = error;
149+
// below persistence is added so that if report goes to last active master, it throws exception
150+
env.getMasterServices().getMasterProcedureExecutor().getStore().update(this);
141151
event.wake(env.getProcedureScheduler());
142152
event = null;
143153
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,15 @@
3232
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
3333
import org.apache.hadoop.hbase.regionserver.SnapshotVerifyCallable;
3434
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
35+
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
3536
import org.apache.hadoop.hbase.util.RetryCounter;
3637
import org.apache.yetus.audience.InterfaceAudience;
3738
import org.slf4j.Logger;
3839
import org.slf4j.LoggerFactory;
3940

4041
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
42+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
43+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
4144
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyParameter;
4245
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyProcedureStateData;
4346
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
@@ -75,31 +78,28 @@ protected boolean abort(MasterProcedureEnv env) {
7578
}
7679

7780
@Override
78-
protected synchronized void complete(MasterProcedureEnv env, Throwable error) {
81+
protected synchronized boolean complete(MasterProcedureEnv env, Throwable error) {
82+
boolean isProcedureCompleted = false;
7983
try {
8084
if (error != null) {
8185
if (error instanceof RemoteProcedureException) {
8286
// remote operation failed
8387
Throwable remoteEx = unwrapRemoteProcedureException((RemoteProcedureException) error);
8488
if (remoteEx instanceof CorruptedSnapshotException) {
8589
// snapshot is corrupted, will touch a flag file and finish the procedure
86-
succ = true;
90+
isProcedureCompleted = true;
8791
SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor()
8892
.getProcedure(SnapshotProcedure.class, getParentProcId());
8993
if (parent != null) {
9094
parent.markSnapshotCorrupted();
9195
}
92-
} else {
93-
// unexpected exception in remote server, will retry on other servers
94-
succ = false;
95-
}
96-
} else {
97-
// the mostly like thing is that remote call failed, will retry on other servers
98-
succ = false;
99-
}
96+
} // else unexpected exception in remote server, will retry on other servers,
97+
// procedureCompleted will stay false
98+
} // else the mostly like thing is that remote call failed, will retry on other servers,
99+
// procedureCompleted will stay false
100100
} else {
101101
// remote operation finished without error
102-
succ = true;
102+
isProcedureCompleted = true;
103103
}
104104
} catch (IOException e) {
105105
// if we can't create the flag file, then mark the current procedure as FAILED
@@ -112,6 +112,7 @@ protected synchronized void complete(MasterProcedureEnv env, Throwable error) {
112112
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer,
113113
env.getProcedureScheduler());
114114
}
115+
return isProcedureCompleted;
115116
}
116117

117118
// we will wrap remote exception into a RemoteProcedureException,
@@ -126,22 +127,29 @@ protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEn
126127
try {
127128
// if we've already known the snapshot is corrupted, then stop scheduling
128129
// the new procedures and the undispatched procedures
129-
if (!dispatched) {
130+
if (
131+
state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
132+
) {
130133
SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor()
131134
.getProcedure(SnapshotProcedure.class, getParentProcId());
132135
if (parent != null && parent.isSnapshotCorrupted()) {
133136
return null;
134137
}
135138
}
136139
// acquire a worker
137-
if (!dispatched && targetServer == null) {
140+
if (
141+
state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
142+
&& targetServer == null
143+
) {
138144
targetServer =
139145
env.getMasterServices().getSnapshotManager().acquireSnapshotVerifyWorker(this);
140146
}
141147
// send remote request
142148
Procedure<MasterProcedureEnv>[] res = super.execute(env);
143149
// retry if necessary
144-
if (!dispatched) {
150+
if (
151+
state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
152+
) {
145153
// the mostly like thing is that a FailedRemoteDispatchException is thrown.
146154
// we need to retry on another remote server
147155
targetServer = null;
@@ -177,10 +185,15 @@ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
177185
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
178186
SnapshotVerifyProcedureStateData.Builder builder =
179187
SnapshotVerifyProcedureStateData.newBuilder();
180-
builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region));
188+
builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region)).setState(state);
181189
if (targetServer != null) {
182190
builder.setTargetServer(ProtobufUtil.toServerName(targetServer));
183191
}
192+
if (this.remoteError != null) {
193+
ErrorHandlingProtos.ForeignExceptionMessage fem =
194+
ForeignExceptionUtil.toProtoForeignException(remoteError);
195+
builder.setError(fem);
196+
}
184197
serializer.serialize(builder.build());
185198
}
186199

@@ -190,9 +203,13 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
190203
serializer.deserialize(SnapshotVerifyProcedureStateData.class);
191204
this.snapshot = data.getSnapshot();
192205
this.region = ProtobufUtil.toRegionInfo(data.getRegion());
206+
this.state = data.getState();
193207
if (data.hasTargetServer()) {
194208
this.targetServer = ProtobufUtil.toServerName(data.getTargetServer());
195209
}
210+
if (data.hasError()) {
211+
this.remoteError = ForeignExceptionUtil.toException(data.getError());
212+
}
196213
}
197214

198215
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
2626
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
2727
import org.apache.hadoop.hbase.regionserver.SplitWALCallable;
28+
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
2829
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
2930
import org.apache.yetus.audience.InterfaceAudience;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
3233

3334
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
35+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
3436
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
3537

3638
/**
@@ -70,7 +72,12 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO
7072
MasterProcedureProtos.SplitWALRemoteData.Builder builder =
7173
MasterProcedureProtos.SplitWALRemoteData.newBuilder();
7274
builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(targetServer))
73-
.setCrashedServer(ProtobufUtil.toServerName(crashedServer));
75+
.setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setState(state);
76+
if (this.remoteError != null) {
77+
ErrorHandlingProtos.ForeignExceptionMessage fem =
78+
ForeignExceptionUtil.toProtoForeignException(remoteError);
79+
builder.setError(fem);
80+
}
7481
serializer.serialize(builder.build());
7582
}
7683

@@ -81,6 +88,10 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
8188
walPath = data.getWalPath();
8289
targetServer = ProtobufUtil.toServerName(data.getWorker());
8390
crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
91+
state = data.getState();
92+
if (data.hasError()) {
93+
this.remoteError = ForeignExceptionUtil.toException(data.getError());
94+
}
8495
}
8596

8697
@Override
@@ -92,21 +103,21 @@ public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(Maste
92103
}
93104

94105
@Override
95-
protected void complete(MasterProcedureEnv env, Throwable error) {
106+
protected boolean complete(MasterProcedureEnv env, Throwable error) {
96107
if (error == null) {
97108
try {
98109
env.getMasterServices().getSplitWALManager().archive(walPath);
99110
} catch (IOException e) {
100111
LOG.warn("Failed split of {}; ignore...", walPath, e);
101112
}
102-
succ = true;
113+
return true;
103114
} else {
104115
if (error instanceof DoNotRetryIOException) {
105116
LOG.warn("Sent {} to wrong server {}, try another", walPath, targetServer, error);
106-
succ = true;
117+
return true;
107118
} else {
108119
LOG.warn("Failed split of {}, retry...", walPath, error);
109-
succ = false;
120+
return false;
110121
}
111122
}
112123
}

0 commit comments

Comments
 (0)