Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public interface FederationRPCMBean {

long getProxyOps();

long getActiveProxyOps();

long getObserverProxyOps();

double getProxyAvg();

long getProcessingOps();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
Expand Down Expand Up @@ -49,7 +50,10 @@ public class FederationRPCMetrics implements FederationRPCMBean {
private MutableRate proxy;
@Metric("Number of operations the Router proxied to a Namenode")
private MutableCounterLong proxyOp;

@Metric("Number of operations the Router proxied to a Active Namenode")
private MutableCounterLong activeProxyOp;
@Metric("Number of operations the Router proxied to a Observer Namenode")
private MutableCounterLong observerProxyOp;
@Metric("Number of operations to hit a standby NN")
private MutableCounterLong proxyOpFailureStandby;
@Metric("Number of operations to fail to reach NN")
Expand Down Expand Up @@ -256,9 +260,15 @@ public String getAsyncCallerPool() {
* Add the time to proxy an operation from the moment the Router sends it to
* the Namenode until it replied.
* @param time Proxy time of an operation in nanoseconds.
* @param state NameNode state. May be null.
*/
public void addProxyTime(long time) {
public void addProxyTime(long time, FederationNamenodeServiceState state) {
  // Record the proxy latency sample.
  proxy.add(time);
  // Attribute the operation to the given NameNode state. A null state, or
  // any state other than ACTIVE/OBSERVER, only counts toward the total below.
  if (state != null) {
    switch (state) {
    case ACTIVE:
      activeProxyOp.incr();
      break;
    case OBSERVER:
      observerProxyOp.incr();
      break;
    default:
      break;
    }
  }
  // Every proxied operation is counted in the overall total.
  proxyOp.incr();
}

Expand All @@ -272,6 +282,16 @@ public long getProxyOps() {
return proxyOp.value();
}

@Override
public long getActiveProxyOps() {
  // Current value of the counter bumped for operations proxied with ACTIVE state.
  final long activeOps = activeProxyOp.value();
  return activeOps;
}

@Override
public long getObserverProxyOps() {
  // Current value of the counter bumped for operations proxied with OBSERVER state.
  final long observerOps = observerProxyOp.value();
  return observerOps;
}

/**
* Add the time to process a request in the Router from the time we receive
* the call until we send it to the Namenode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
Expand Down Expand Up @@ -147,12 +148,13 @@ public long proxyOp() {
}

@Override
public void proxyOpComplete(boolean success, String nsId) {
public void proxyOpComplete(boolean success, String nsId,
FederationNamenodeServiceState state) {
if (success) {
long proxyTime = getProxyTime();
if (proxyTime >= 0) {
if (metrics != null) {
metrics.addProxyTime(proxyTime);
metrics.addProxyTime(proxyTime, state);
}
if (nameserviceRPCMetricsMap != null &&
nameserviceRPCMetricsMap.containsKey(nsId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@ private List<MembershipState> getActiveNamenodeRegistrations()
// Fetch the most recent namenode registration
String nsId = nsInfo.getNameserviceId();
List<? extends FederationNamenodeContext> nns =
namenodeResolver.getNamenodesForNameserviceId(nsId);
namenodeResolver.getNamenodesForNameserviceId(nsId, false);
if (nns != null) {
FederationNamenodeContext nn = nns.get(0);
if (nn instanceof MembershipState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@
@InterfaceStability.Evolving
public interface ActiveNamenodeResolver {

/**
* Report a failed, unavailable NN address for a nameservice or blockPool.
*
* @param ns Nameservice identifier.
* @param failedAddress The address of the failed, unavailable namenode.
*
* @throws IOException If the state store cannot be accessed.
*/
void updateUnavailableNamenode(
String ns, InetSocketAddress failedAddress) throws IOException;

/**
* Report a successful, active NN address for a nameservice or blockPool.
*
Expand All @@ -56,27 +67,38 @@ void updateActiveNamenode(

/**
* Returns a prioritized list of the most recent cached registration entries
* for a single nameservice ID.
* Returns an empty list if none are found. Returns entries in preference of:
* for a single nameservice ID. Returns an empty list if none are found.
* When listObserversFirst is false, returns entries in order of preference:
* <ul>
* <li>The most recent ACTIVE NN
* <li>The most recent OBSERVER NN
* <li>The most recent STANDBY NN
* <li>The most recent UNAVAILABLE NN
* </ul>
*
* When listObserversFirst is true, returns entries in order of preference:
* <ul>
* <li>The most recent OBSERVER NN
* <li>The most recent ACTIVE NN
* <li>The most recent STANDBY NN
* <li>The most recent UNAVAILABLE NN
* </ul>
*
* @param nameserviceId Nameservice identifier.
* @param listObserversFirst Observer read case, observer NN will be ranked first
* @return Prioritized list of namenode contexts.
* @throws IOException If the state store cannot be accessed.
*/
List<? extends FederationNamenodeContext>
getNamenodesForNameserviceId(String nameserviceId) throws IOException;
List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
String nameserviceId, boolean listObserversFirst) throws IOException;

/**
* Returns a prioritized list of the most recent cached registration entries
* for a single block pool ID.
* Returns an empty list if none are found. Returns entries in preference of:
* <ul>
* <li>The most recent ACTIVE NN
* <li>The most recent OBSERVER NN
* <li>The most recent STANDBY NN
* <li>The most recent UNAVAILABLE NN
* </ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.ACTIVE;
import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.EXPIRED;
import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.OBSERVER;
import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.UNAVAILABLE;

import java.io.IOException;
Expand All @@ -32,6 +33,7 @@
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
Expand Down Expand Up @@ -73,8 +75,11 @@ public class MembershipNamenodeResolver
/** Parent router ID. */
private String routerId;

/** Cached lookup of NN for nameservice. Invalidated on cache refresh. */
private Map<String, List<? extends FederationNamenodeContext>> cacheNS;
/** Cached lookup of namenodes for nameservice. The keys are a pair of the nameservice
* name and a boolean indicating if observer namenodes should be listed first.
* If true, observer namenodes are listed first. If false, active namenodes are listed first.
* Invalidated on cache refresh. */
private Map<Pair<String,Boolean>, List<? extends FederationNamenodeContext>> cacheNS;
/** Cached lookup of NN for block pool. Invalidated on cache refresh. */
private Map<String, List<? extends FederationNamenodeContext>> cacheBP;

Expand Down Expand Up @@ -136,11 +141,21 @@ public boolean loadCache(boolean force) {
return true;
}

@Override
public void updateUnavailableNamenode(
    final String nsId, final InetSocketAddress address) throws IOException {
  // Delegate to the common state-update helper, marking the NN UNAVAILABLE.
  updateNameNodeState(nsId, address, UNAVAILABLE);
}

@Override
public void updateActiveNamenode(final String nsId,
    final InetSocketAddress address)
    throws IOException {
  // Delegate to the common state-update helper, marking the NN ACTIVE.
  updateNameNodeState(nsId, address, ACTIVE);
}

// Called when we have an RPC miss and successful hit on an alternate NN.

private void updateNameNodeState(final String nsId,
final InetSocketAddress address, FederationNamenodeServiceState state)
throws IOException {
// Temporarily update our cache, it will be overwritten on the next update.
try {
MembershipState partial = MembershipState.newInstance();
Expand All @@ -160,10 +175,11 @@ public void updateActiveNamenode(
MembershipState record = records.get(0);
UpdateNamenodeRegistrationRequest updateRequest =
UpdateNamenodeRegistrationRequest.newInstance(
record.getNameserviceId(), record.getNamenodeId(), ACTIVE);
record.getNameserviceId(), record.getNamenodeId(), state);
membership.updateNamenodeRegistration(updateRequest);

cacheNS.remove(nsId);
cacheNS.remove(Pair.of(nsId, Boolean.TRUE));
cacheNS.remove(Pair.of(nsId, Boolean.FALSE));
// Invalidating the full cacheBp since getting the blockpool id from
// namespace id is quite costly.
cacheBP.clear();
Expand All @@ -175,9 +191,9 @@ public void updateActiveNamenode(

@Override
public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
final String nsId) throws IOException {
final String nsId, boolean listObserversFirst) throws IOException {

List<? extends FederationNamenodeContext> ret = cacheNS.get(nsId);
List<? extends FederationNamenodeContext> ret = cacheNS.get(Pair.of(nsId, listObserversFirst));
if (ret != null) {
return ret;
}
Expand All @@ -189,7 +205,8 @@ public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
partial.setNameserviceId(nsId);
GetNamenodeRegistrationsRequest request =
GetNamenodeRegistrationsRequest.newInstance(partial);
result = getRecentRegistrationForQuery(request, true, false);
result = getRecentRegistrationForQuery(request, true,
false, listObserversFirst);
} catch (StateStoreUnavailableException e) {
LOG.error("Cannot get active NN for {}, State Store unavailable", nsId);
return null;
Expand Down Expand Up @@ -218,7 +235,7 @@ public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(

// Cache the response
ret = Collections.unmodifiableList(result);
cacheNS.put(nsId, result);
cacheNS.put(Pair.of(nsId, listObserversFirst), result);
return ret;
}

Expand All @@ -235,7 +252,7 @@ public List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
GetNamenodeRegistrationsRequest.newInstance(partial);

final List<MembershipState> result =
getRecentRegistrationForQuery(request, true, false);
getRecentRegistrationForQuery(request, true, false, false);
if (result == null || result.isEmpty()) {
LOG.error("Cannot locate eligible NNs for {}", bpId);
} else {
Expand Down Expand Up @@ -346,22 +363,34 @@ public Set<String> getDisabledNamespaces() throws IOException {
}

/**
* Picks the most relevant record registration that matches the query. Return
* registrations matching the query in this preference: 1) Most recently
* updated ACTIVE registration 2) Most recently updated STANDBY registration
* (if showStandby) 3) Most recently updated UNAVAILABLE registration (if
* showUnavailable). EXPIRED registrations are ignored.
* Picks the most relevant record registration that matches the query.
* If not observer read,
* return registrations matching the query in this preference:
* 1) Most recently updated ACTIVE registration
* 2) Most recently updated Observer registration
* 3) Most recently updated STANDBY registration (if showStandby)
* 4) Most recently updated UNAVAILABLE registration (if showUnavailable).
*
* If observer read,
* return registrations matching the query in this preference:
* 1) Observer registrations, shuffled to disperse queries.
* 2) Most recently updated ACTIVE registration
* 3) Most recently updated STANDBY registration (if showStandby)
* 4) Most recently updated UNAVAILABLE registration (if showUnavailable).
*
* EXPIRED registrations are ignored.
*
* @param request The select query for NN registrations.
* @param addUnavailable include UNAVAILABLE registrations.
* @param addExpired include EXPIRED registrations.
* @param observerRead Observer read case, observer NN will be ranked first
* @return List of memberships or null if no registrations that
* both match the query AND the selected states.
* @throws IOException
*/
private List<MembershipState> getRecentRegistrationForQuery(
GetNamenodeRegistrationsRequest request, boolean addUnavailable,
boolean addExpired) throws IOException {
boolean addExpired, boolean observerRead) throws IOException {

// Retrieve a list of all registrations that match this query.
// This may include all NN records for a namespace/blockpool, including
Expand All @@ -371,24 +400,34 @@ private List<MembershipState> getRecentRegistrationForQuery(
membershipStore.getNamenodeRegistrations(request);

List<MembershipState> memberships = response.getNamenodeMemberships();
if (!addExpired || !addUnavailable) {
Iterator<MembershipState> iterator = memberships.iterator();
while (iterator.hasNext()) {
MembershipState membership = iterator.next();
if (membership.getState() == EXPIRED && !addExpired) {
iterator.remove();
} else if (membership.getState() == UNAVAILABLE && !addUnavailable) {
iterator.remove();
}
List<MembershipState> observerMemberships = new ArrayList<>();
Iterator<MembershipState> iterator = memberships.iterator();
while (iterator.hasNext()) {
MembershipState membership = iterator.next();
if (membership.getState() == EXPIRED && !addExpired) {
iterator.remove();
} else if (membership.getState() == UNAVAILABLE && !addUnavailable) {
iterator.remove();
} else if (membership.getState() == OBSERVER && observerRead) {
iterator.remove();
observerMemberships.add(membership);
}
}

List<MembershipState> priorityList = new ArrayList<>();
priorityList.addAll(memberships);
Collections.sort(priorityList, new NamenodePriorityComparator());
memberships.sort(new NamenodePriorityComparator());
if(observerRead) {
List<MembershipState> ret = new ArrayList<>(
memberships.size() + observerMemberships.size());
if(observerMemberships.size() > 1) {
Collections.shuffle(observerMemberships);
}
ret.addAll(observerMemberships);
ret.addAll(memberships);
memberships = ret;
}

LOG.debug("Selected most recent NN {} for query", priorityList);
return priorityList;
LOG.debug("Selected most recent NN {} for query", memberships);
return memberships;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public AtomicInteger getClientIndex() {
}

/**
* Get the alignment context for this pool
* Get the alignment context for this pool.
* @return Alignment context
*/
public PoolAlignmentContext getPoolAlignmentContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
FEDERATION_STORE_PREFIX + "enable";
public static final boolean DFS_ROUTER_STORE_ENABLE_DEFAULT = true;

public static final String DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY =
FEDERATION_ROUTER_PREFIX + "observer.read.default";
public static final boolean DFS_ROUTER_OBSERVER_READ_DEFAULT_VALUE = false;
public static final String DFS_ROUTER_OBSERVER_READ_OVERRIDES =
FEDERATION_ROUTER_PREFIX + "observer.read.overrides";

public static final String DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE =
FEDERATION_ROUTER_PREFIX + "observer.federated.state.propagation.maxsize";
public static final int DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1918,7 +1918,10 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,

/**
 * Forwards msync to the namespaces known to the namenode resolver: checks
 * the operation as a READ, then invokes the remote "msync" method
 * concurrently on every namespace returned by
 * {@code namenodeResolver.getNamespaces()}.
 *
 * @throws IOException as declared; raised by the calls made in this method.
 */
@Override
public void msync() throws IOException {
// NOTE(review): two checkOperation calls that differ only in the boolean
// argument appear back to back; this looks like the removed and added lines
// of a diff rendered together -- confirm which one is intended.
rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
// Target set: all namespaces currently reported by the resolver.
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
// Argument-less remote call named "msync".
RemoteMethod method = new RemoteMethod("msync");
rpcClient.invokeConcurrent(nss, method);
}

@Override
Expand Down
Loading