Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.atomic.AtomicReference;import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
Expand Down Expand Up @@ -349,7 +349,10 @@ public class HRegionServer extends HasThread implements
protected final int numRegionsToReport;

// Stub to do region server status calls against the master.
private volatile RegionServerStatusService.BlockingInterface rssStub;
private final Object rssStubLock = new Object();
private RegionServerStatusService.BlockingInterface rssStub;
private boolean resetRssStub;

private volatile LockService.BlockingInterface lockStub;
// RPC client. Used to make the stub above that does region server status checking.
RpcClient rpcClient;
Expand Down Expand Up @@ -1140,9 +1143,10 @@ public void run() {
shutdownWAL(!abortRequested);
}

// Make sure the proxy is down.
if (this.rssStub != null) {
// Make sure the proxy is down
synchronized (rssStubLock) {
this.rssStub = null;
this.resetRssStub = false;
}
if (this.lockStub != null) {
this.lockStub = null;
Expand Down Expand Up @@ -1212,7 +1216,7 @@ private long getWriteRequestCount() {
@VisibleForTesting
protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
throws IOException {
RegionServerStatusService.BlockingInterface rss = rssStub;
RegionServerStatusService.BlockingInterface rss = ensureRssStub();
if (rss == null) {
// the current server could be stopping.
return;
Expand All @@ -1229,12 +1233,10 @@ protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
// This will be caught and handled as a fatal error in run()
throw ioe;
}
if (rssStub == rss) {
rssStub = null;
}
resetRssStubOnServiceException(rss);
// Couldn't connect to the master, get location from zk and reconnect
// Method blocks until new master is found or we are stopped
createRegionServerStatusStub(true);
ensureRssStub();
}
}

Expand All @@ -1245,7 +1247,7 @@ protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
* @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
*/
public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
RegionServerStatusService.BlockingInterface rss = rssStub;
RegionServerStatusService.BlockingInterface rss = ensureRssStub();
if (rss == null) {
// the current server could be stopping.
LOG.trace("Skipping Region size report to HMaster as stub is null");
Expand All @@ -1261,10 +1263,8 @@ public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
// The Master is coming up. Will retry the report later. Avoid re-creating the stub.
return true;
}
if (rssStub == rss) {
rssStub = null;
}
createRegionServerStatusStub(true);
resetRssStubOnServiceException(rss);
ensureRssStub();
if (ioe instanceof DoNotRetryIOException) {
DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
if (doNotRetryEx.getCause() != null) {
Expand Down Expand Up @@ -2308,12 +2308,11 @@ public boolean reportRegionStateTransition(final RegionStateTransitionContext co
// Only go down if clusterConnection is null. It is set to null almost as last thing as the
// HRegionServer does down.
while (this.clusterConnection != null && !this.clusterConnection.isClosed()) {
RegionServerStatusService.BlockingInterface rss = rssStub;
RegionServerStatusService.BlockingInterface rss = ensureRssStub(Integer.MAX_VALUE);
if (rss == null) {
break; // That means cluster connection is closed.
}
try {
if (rss == null) {
createRegionServerStatusStub();
continue;
}
ReportRegionStateTransitionResponse response =
rss.reportRegionStateTransition(null, request);
if (response.hasErrorMessage()) {
Expand Down Expand Up @@ -2345,9 +2344,7 @@ public boolean reportRegionStateTransition(final RegionStateTransitionContext co
ioe);
if (pause) Threads.sleep(pauseTime);
tries++;
if (rssStub == rss) {
rssStub = null;
}
resetRssStubOnServiceException(rss);
}
}
return false;
Expand Down Expand Up @@ -2425,7 +2422,7 @@ public void abort(String reason, Throwable cause) {
msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
}
// Report to the master but only if we have already registered with the master.
RegionServerStatusService.BlockingInterface rss = rssStub;
RegionServerStatusService.BlockingInterface rss = ensureRssStub();
if (rss != null && this.serverName != null) {
ReportRSFatalErrorRequest.Builder builder =
ReportRSFatalErrorRequest.newBuilder();
Expand Down Expand Up @@ -2563,9 +2560,9 @@ public ReplicationSinkService getReplicationSinkService() {
* @return master + port, or null if server has been stopped
*/
@VisibleForTesting
protected synchronized ServerName createRegionServerStatusStub() {
protected ServerName createRegionServerStatusStub() {
// Create RS stub without refreshing the master node from ZK, use cached data
return createRegionServerStatusStub(false);
return createRegionServerStatusStub(true);
}

/**
Expand Down Expand Up @@ -2663,9 +2660,11 @@ private boolean keepLooping() {
*/
private RegionServerStartupResponse reportForDuty() throws IOException {
if (this.masterless) return RegionServerStartupResponse.getDefaultInstance();
ServerName masterServerName = createRegionServerStatusStub(true);
RegionServerStatusService.BlockingInterface rss = rssStub;
if (masterServerName == null || rss == null) return null;
RegionServerStatusService.BlockingInterface rss = ensureRssStub();
ServerName masterServerName = rss == null ? null : masterAddressTracker.getMasterAddress();
if (masterServerName == null) {
return null;
}
RegionServerStartupResponse result = null;
try {
rpcServices.requestCount.reset();
Expand Down Expand Up @@ -2697,7 +2696,7 @@ private RegionServerStartupResponse reportForDuty() throws IOException {
} else {
LOG.warn("error telling master we are up", se);
}
rssStub = null;
resetRssStubOnServiceException(rss);
}
return result;
}
Expand All @@ -2707,16 +2706,11 @@ public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
try {
GetLastFlushedSequenceIdRequest req =
RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
RegionServerStatusService.BlockingInterface rss = rssStub;
if (rss == null) { // Try to connect one more time
createRegionServerStatusStub();
rss = rssStub;
if (rss == null) {
// Still no luck, we tried
LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
.build();
}
RegionServerStatusService.BlockingInterface rss = ensureRssStub(1);
if (rss == null) {
LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
.build();
}
GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
return RegionStoreSequenceIds.newBuilder()
Expand Down Expand Up @@ -3813,12 +3807,7 @@ public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
@Override
public boolean reportFileArchivalForQuotas(TableName tableName,
Collection<Entry<String, Long>> archivedFiles) {
RegionServerStatusService.BlockingInterface rss = rssStub;
if (rss == null || rsSpaceQuotaManager == null) {
// the current server could be stopping.
LOG.trace("Skipping file archival reporting to HMaster as stub is null");
return false;
}
RegionServerStatusService.BlockingInterface rss = ensureRssStub();
try {
RegionServerStatusProtos.FileArchiveNotificationRequest request =
rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
Expand All @@ -3833,11 +3822,9 @@ public boolean reportFileArchivalForQuotas(TableName tableName,
// The Master is coming up. Will retry the report later. Avoid re-creating the stub.
return false;
}
if (rssStub == rss) {
rssStub = null;
}
resetRssStubOnServiceException(rss);
// re-create the stub if we failed to report the archival
createRegionServerStatusStub(true);
ensureRssStub();
LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
return false;
}
Expand All @@ -3864,21 +3851,43 @@ public void remoteProcedureComplete(long procId, Throwable error) {
}

void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
RegionServerStatusService.BlockingInterface rss = rssStub;
for (;;) {
rss = rssStub;
if (rss != null) {
break;
}
createRegionServerStatusStub();
RegionServerStatusService.BlockingInterface rss = ensureRssStub(Integer.MAX_VALUE);
if (rss == null) {
throw new IOException("Cannot connect to master to report; cluster shutting down?");
}
try {
rss.reportProcedureDone(null, request);
} catch (ServiceException se) {
resetRssStubOnServiceException(rss);
throw ProtobufUtil.getRemoteException(se);
}
}

/**
 * Single-attempt variant of {@link #ensureRssStub(int)}: requests the stub with zero retries.
 *
 * @return whatever {@code ensureRssStub(0)} returns, which may be null
 */
private RegionServerStatusService.BlockingInterface ensureRssStub() {
  final int noRetries = 0;
  return ensureRssStub(noRetries);
}

private RegionServerStatusService.BlockingInterface ensureRssStub(int maxRetries) {
RegionServerStatusService.BlockingInterface rss = null;
while (rss == null && maxRetries-- >= 0
&& (clusterConnection != null && !clusterConnection.isClosed())) {
synchronized (rssStubLock) {
if (resetRssStub) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not fully understand what's the usage of this resetRssStub flag... We do not test it in the condition of the while loop?

createRegionServerStatusStub();
resetRssStub = (rssStub != null); // Ensure we try again in some other call.
}
rss = rssStub;
}
}
return rss;
}

/**
 * Invalidates the cached stub after a failed call made through {@code rss}.
 *
 * <p>Under {@code rssStubLock}, the cached {@code rssStub} is cleared and {@code resetRssStub}
 * is set only when the cached stub is still the same instance as {@code rss}. If the cached
 * stub has since been replaced, nothing changes.
 *
 * @param rss the stub instance the failed call was made on
 */
private void resetRssStubOnServiceException(RegionServerStatusService.BlockingInterface rss) {
  synchronized (rssStubLock) {
    // Identity comparison: only discard the stub that actually failed.
    if (rssStub == rss) {
      rssStub = null;
      resetRssStub = true;
    }
  }
}

Expand Down