Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.rsgroup;

import com.google.protobuf.ServiceException;
Expand All @@ -33,7 +32,6 @@
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
Expand All @@ -45,7 +43,6 @@
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
Expand All @@ -59,7 +56,6 @@
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.net.Address;
Expand Down Expand Up @@ -136,7 +132,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private Set<String> prevRSGroups = new HashSet<>();
private final ServerEventsListenerThread serverEventsListenerThread =
new ServerEventsListenerThread();
private FailedOpenUpdaterThread failedOpenUpdaterThread;

private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
this.masterServices = masterServices;
Expand All @@ -150,9 +145,6 @@ private synchronized void init() throws IOException {
refresh();
serverEventsListenerThread.start();
masterServices.getServerManager().registerListener(serverEventsListenerThread);
failedOpenUpdaterThread = new FailedOpenUpdaterThread(masterServices.getConfiguration());
failedOpenUpdaterThread.start();
masterServices.getServerManager().registerListener(failedOpenUpdaterThread);
}

static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
Expand Down Expand Up @@ -625,26 +617,6 @@ private synchronized void updateDefaultServers(SortedSet<Address> servers) throw
flushConfig(newGroupMap);
}

// Called by FailedOpenUpdaterThread
private void updateFailedAssignments() {
// Kick all regions in FAILED_OPEN state
List<RegionInfo> stuckAssignments = Lists.newArrayList();
for (RegionStateNode state : masterServices.getAssignmentManager().getRegionStates()
.getRegionsInTransition()) {
if (state.isStuck()) {
stuckAssignments.add(state.getRegionInfo());
}
}
for (RegionInfo region : stuckAssignments) {
LOG.info("Retrying assignment of " + region);
try {
masterServices.getAssignmentManager().unassign(region);
} catch (IOException e) {
LOG.warn("Unable to reassign " + region, e);
}
}
}

/**
* Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
* servers. Notifications about server changes are received by registering {@link ServerListener}.
Expand Down Expand Up @@ -704,66 +676,6 @@ public void run() {
}
}

private class FailedOpenUpdaterThread extends Thread implements ServerListener {
private final long waitInterval;
private volatile boolean hasChanged = false;

public FailedOpenUpdaterThread(Configuration conf) {
this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY, DEFAULT_REASSIGN_WAIT_INTERVAL);
setDaemon(true);
}

@Override
public void serverAdded(ServerName serverName) {
serverChanged();
}

@Override
public void serverRemoved(ServerName serverName) {
}

@Override
public void run() {
while (isMasterRunning(masterServices)) {
boolean interrupted = false;
try {
synchronized (this) {
while (!hasChanged) {
wait();
}
hasChanged = false;
}
} catch (InterruptedException e) {
LOG.warn("Interrupted", e);
interrupted = true;
}
if (!isMasterRunning(masterServices) || interrupted) {
continue;
}

// First, wait a while in case more servers are about to rejoin the cluster
try {
Thread.sleep(waitInterval);
} catch (InterruptedException e) {
LOG.warn("Interrupted", e);
}
if (!isMasterRunning(masterServices)) {
continue;
}

// Kick all regions in FAILED_OPEN state
updateFailedAssignments();
}
}

public void serverChanged() {
synchronized (this) {
hasChanged = true;
this.notify();
}
}
}

private class RSGroupStartupWorker extends Thread {
private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
private volatile boolean online = false;
Expand Down