Skip to content

Commit d435f7f

Browse files
committed
HBASE-22404 Open/Close region request may be executed twice when master restart
1 parent 0b8493f commit d435f7f

File tree

4 files changed

+92
-22
lines changed

4 files changed

+92
-22
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.concurrent.ConcurrentHashMap;
5252
import java.util.concurrent.ConcurrentMap;
5353
import java.util.concurrent.ConcurrentSkipListMap;
54+
import java.util.concurrent.TimeUnit;
5455
import java.util.concurrent.atomic.AtomicBoolean;
5556
import java.util.concurrent.locks.ReentrantReadWriteLock;
5657
import java.util.function.Function;
@@ -191,6 +192,8 @@
191192
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
192193
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
193194
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
195+
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
196+
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
194197
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
195198
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
196199
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
@@ -255,6 +258,16 @@ public class HRegionServer extends HasThread implements
255258
protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
256259
new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
257260

261+
/**
262+
* Used to cache the open/close region procedures which already submitted.
263+
*/
264+
private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
265+
/**
266+
* Used to cache the open/close region procedures which already executed.
267+
*/
268+
private final Cache<Long, Long> executedRegionProcedures =
269+
CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();
270+
258271
// Cache flushing
259272
protected MemStoreFlusher cacheFlusher;
260273

@@ -3882,6 +3895,52 @@ void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException
38823895
}
38833896
}
38843897

3898+
/**
3899+
* Will ignore the open/close region procedures which already submitted or executed.
3900+
*
3901+
* When master had unfinished open/close region procedure and restarted, new active master may
3902+
* send duplicate open/close region request to regionserver. The open/close request is submitted
3903+
* to a thread pool and execute. So first need a cache for submitted open/close region procedures.
3904+
*
3905+
* After the open/close region request executed and report region transition succeed, cache it in
3906+
* executed region procedures cache. See {@link #finishRegionProcedure(long)}. And the cache will
3907+
* expire after 600 seconds. Because the duplicate open/close region request should not be delayed
3908+
* more than 600 seconds. And the open/close region procedures finished after report region
3909+
* transition succeed, so there are no more duplicate open/close region request when master
3910+
* restart again.
3911+
*
3912+
* See HBASE-22404 for more details.
3913+
*
3914+
* @param procId the id of the open/close region procedure
3915+
* @return true if the procedure can be submitted.
3916+
*/
3917+
boolean submitRegionProcedure(long procId) {
3918+
if (procId == -1) {
3919+
return true;
3920+
}
3921+
// Ignore the region procedures which already submitted.
3922+
Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
3923+
if (previous != null) {
3924+
LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
3925+
return false;
3926+
}
3927+
// Ignore the region procedures which already executed.
3928+
if (executedRegionProcedures.getIfPresent(procId) != null) {
3929+
LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
3930+
return false;
3931+
}
3932+
return true;
3933+
}
3934+
3935+
/**
3936+
* See {@link #submitRegionProcedure(long)}.
3937+
* @param procId the id of the open/close region procedure
3938+
*/
3939+
public void finishRegionProcedure(long procId) {
3940+
executedRegionProcedures.put(procId, procId);
3941+
submittedRegionProcedures.remove(procId);
3942+
}
3943+
38853944
public boolean isShutDown() {
38863945
return shutDown;
38873946
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3721,8 +3721,12 @@ private void executeOpenRegionProcedures(OpenRegionRequest request,
37213721
regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
37223722
regionOpenInfo.getFavoredNodesList());
37233723
}
3724-
regionServer.executorService.submit(AssignRegionHandler.create(regionServer, regionInfo,
3725-
regionOpenInfo.getOpenProcId(), tableDesc, masterSystemTime));
3724+
long procId = regionOpenInfo.getOpenProcId();
3725+
if (regionServer.submitRegionProcedure(procId)) {
3726+
regionServer.executorService.submit(AssignRegionHandler
3727+
.create(regionServer, regionInfo, procId, tableDesc,
3728+
masterSystemTime));
3729+
}
37263730
}
37273731
}
37283732

@@ -3733,11 +3737,14 @@ private void executeCloseRegionProcedures(CloseRegionRequest request) {
37333737
} catch (DoNotRetryIOException e) {
37343738
throw new UncheckedIOException("Should not happen", e);
37353739
}
3736-
ServerName destination =
3737-
request.hasDestinationServer() ? ProtobufUtil.toServerName(request.getDestinationServer())
3738-
: null;
3739-
regionServer.executorService.submit(UnassignRegionHandler.create(regionServer, encodedName,
3740-
request.getCloseProcId(), false, destination));
3740+
ServerName destination = request.hasDestinationServer() ?
3741+
ProtobufUtil.toServerName(request.getDestinationServer()) :
3742+
null;
3743+
long procId = request.getCloseProcId();
3744+
if (regionServer.submitRegionProcedure(procId)) {
3745+
regionServer.executorService.submit(UnassignRegionHandler
3746+
.create(regionServer, encodedName, procId, false, destination));
3747+
}
37413748
}
37423749

37433750
private void executeProcedures(RemoteProcedureRequest request) {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import org.apache.hadoop.hbase.executor.EventHandler;
2727
import org.apache.hadoop.hbase.executor.EventType;
2828
import org.apache.hadoop.hbase.regionserver.HRegion;
29+
import org.apache.hadoop.hbase.regionserver.HRegionServer;
2930
import org.apache.hadoop.hbase.regionserver.Region;
30-
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
3131
import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
3232
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
3333
import org.apache.hadoop.hbase.util.RetryCounter;
@@ -59,7 +59,7 @@ public class AssignRegionHandler extends EventHandler {
5959

6060
private final RetryCounter retryCounter;
6161

62-
public AssignRegionHandler(RegionServerServices server, RegionInfo regionInfo, long openProcId,
62+
public AssignRegionHandler(HRegionServer server, RegionInfo regionInfo, long openProcId,
6363
@Nullable TableDescriptor tableDesc, long masterSystemTime, EventType eventType) {
6464
super(server, eventType);
6565
this.regionInfo = regionInfo;
@@ -69,14 +69,14 @@ public AssignRegionHandler(RegionServerServices server, RegionInfo regionInfo, l
6969
this.retryCounter = HandlerUtil.getRetryCounter();
7070
}
7171

72-
private RegionServerServices getServer() {
73-
return (RegionServerServices) server;
72+
private HRegionServer getServer() {
73+
return (HRegionServer) server;
7474
}
7575

7676
private void cleanUpAndReportFailure(IOException error) throws IOException {
7777
LOG.warn("Failed to open region {}, will report to master", regionInfo.getRegionNameAsString(),
7878
error);
79-
RegionServerServices rs = getServer();
79+
HRegionServer rs = getServer();
8080
rs.getRegionsInTransitionInRS().remove(regionInfo.getEncodedNameAsBytes(), Boolean.TRUE);
8181
if (!rs.reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.FAILED_OPEN,
8282
HConstants.NO_SEQNUM, openProcId, masterSystemTime, regionInfo))) {
@@ -87,7 +87,7 @@ private void cleanUpAndReportFailure(IOException error) throws IOException {
8787

8888
@Override
8989
public void process() throws IOException {
90-
RegionServerServices rs = getServer();
90+
HRegionServer rs = getServer();
9191
String encodedName = regionInfo.getEncodedName();
9292
byte[] encodedNameBytes = regionInfo.getEncodedNameAsBytes();
9393
String regionName = regionInfo.getRegionNameAsString();
@@ -101,7 +101,6 @@ public void process() throws IOException {
101101
// reportRegionStateTransition any more.
102102
return;
103103
}
104-
LOG.info("Open {}", regionName);
105104
Boolean previous = rs.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);
106105
if (previous != null) {
107106
if (previous) {
@@ -121,6 +120,7 @@ public void process() throws IOException {
121120
}
122121
return;
123122
}
123+
LOG.info("Open {}", regionName);
124124
HRegion region;
125125
try {
126126
TableDescriptor htd =
@@ -139,6 +139,8 @@ public void process() throws IOException {
139139
rs.postOpenDeployTasks(new PostOpenDeployContext(region, openProcId, masterSystemTime));
140140
rs.addRegion(region);
141141
LOG.info("Opened {}", regionName);
142+
// Cache the open region procedure id after report region transition succeed.
143+
rs.finishRegionProcedure(openProcId);
142144
Boolean current = rs.getRegionsInTransitionInRS().remove(regionInfo.getEncodedNameAsBytes());
143145
if (current == null) {
144146
// Should NEVER happen, but let's be paranoid.
@@ -158,7 +160,7 @@ protected void handleException(Throwable t) {
158160
"Failed to open region " + regionInfo.getRegionNameAsString() + " and can not recover", t);
159161
}
160162

161-
public static AssignRegionHandler create(RegionServerServices server, RegionInfo regionInfo,
163+
public static AssignRegionHandler create(HRegionServer server, RegionInfo regionInfo,
162164
long openProcId, TableDescriptor tableDesc, long masterSystemTime) {
163165
EventType eventType;
164166
if (regionInfo.isMetaRegion()) {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import org.apache.hadoop.hbase.executor.EventHandler;
2626
import org.apache.hadoop.hbase.executor.EventType;
2727
import org.apache.hadoop.hbase.regionserver.HRegion;
28+
import org.apache.hadoop.hbase.regionserver.HRegionServer;
2829
import org.apache.hadoop.hbase.regionserver.Region;
29-
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
3030
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
3131
import org.apache.hadoop.hbase.util.Bytes;
3232
import org.apache.hadoop.hbase.util.RetryCounter;
@@ -60,7 +60,7 @@ public class UnassignRegionHandler extends EventHandler {
6060

6161
private final RetryCounter retryCounter;
6262

63-
public UnassignRegionHandler(RegionServerServices server, String encodedName, long closeProcId,
63+
public UnassignRegionHandler(HRegionServer server, String encodedName, long closeProcId,
6464
boolean abort, @Nullable ServerName destination, EventType eventType) {
6565
super(server, eventType);
6666
this.encodedName = encodedName;
@@ -70,13 +70,13 @@ public UnassignRegionHandler(RegionServerServices server, String encodedName, lo
7070
this.retryCounter = HandlerUtil.getRetryCounter();
7171
}
7272

73-
private RegionServerServices getServer() {
74-
return (RegionServerServices) server;
73+
private HRegionServer getServer() {
74+
return (HRegionServer) server;
7575
}
7676

7777
@Override
7878
public void process() throws IOException {
79-
RegionServerServices rs = getServer();
79+
HRegionServer rs = getServer();
8080
byte[] encodedNameBytes = Bytes.toBytes(encodedName);
8181
Boolean previous = rs.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.FALSE);
8282
if (previous != null) {
@@ -94,7 +94,7 @@ public void process() throws IOException {
9494
}
9595
return;
9696
}
97-
HRegion region = (HRegion) rs.getRegion(encodedName);
97+
HRegion region = rs.getRegion(encodedName);
9898
if (region == null) {
9999
LOG.debug(
100100
"Received CLOSE for a region {} which is not online, and we're not opening/closing.",
@@ -124,6 +124,8 @@ public void process() throws IOException {
124124
HConstants.NO_SEQNUM, closeProcId, -1, region.getRegionInfo()))) {
125125
throw new IOException("Failed to report close to master: " + regionName);
126126
}
127+
// Cache the close region procedure id after report region transition succeed.
128+
rs.finishRegionProcedure(closeProcId);
127129
rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
128130
LOG.info("Closed {}", regionName);
129131
}
@@ -134,7 +136,7 @@ protected void handleException(Throwable t) {
134136
getServer().abort("Failed to close region " + encodedName + " and can not recover", t);
135137
}
136138

137-
public static UnassignRegionHandler create(RegionServerServices server, String encodedName,
139+
public static UnassignRegionHandler create(HRegionServer server, String encodedName,
138140
long closeProcId, boolean abort, @Nullable ServerName destination) {
139141
// Just try our best to determine whether it is for closing meta. It is not the end of the world
140142
// if we put the handler into a wrong executor.

0 commit comments

Comments
 (0)