Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.Map;
import java.util.Collections;

import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
Expand All @@ -34,6 +35,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;

/**
* Base abstract class for {@link FederationRouterPolicy} implementations, that
Expand Down Expand Up @@ -98,7 +100,17 @@ protected Map<SubClusterId, SubClusterInfo> prefilterSubClusters(

// if a reservation exists limit scope to the sub-cluster this
// reservation is mapped to
// TODO: Implemented in YARN-11236
if (reservationId != null) {
// note this might throw YarnException if the reservation is
// unknown. This is to be expected, and should be handled by
// policy invoker.
FederationStateStoreFacade stateStoreFacade =
getPolicyContext().getFederationStateStoreFacade();
SubClusterId resSubCluster = stateStoreFacade.getReservationHomeSubCluster(reservationId);
SubClusterInfo subClusterInfo = activeSubClusters.get(resSubCluster);
return Collections.singletonMap(resSubCluster, subClusterInfo);
}

return activeSubClusters;
}

Expand Down Expand Up @@ -129,7 +141,7 @@ public SubClusterId getHomeSubcluster(ApplicationSubmissionContext appContext,

// apply filtering based on reservation location and active sub-clusters
Map<SubClusterId, SubClusterInfo> filteredSubClusters = prefilterSubClusters(
appContext.getReservationID(), getActiveSubclusters());
appContext.getReservationID(), getActiveSubclusters());

FederationPolicyUtils.validateSubClusterAvailability(filteredSubClusters.keySet(), blackLists);

Expand Down Expand Up @@ -167,8 +179,7 @@ public SubClusterId getReservationHomeSubcluster(ReservationSubmissionRequest re
}

// apply filtering based on reservation location and active sub-clusters
Map<SubClusterId, SubClusterInfo> filteredSubClusters = prefilterSubClusters(
request.getReservationId(), getActiveSubclusters());
Map<SubClusterId, SubClusterInfo> filteredSubClusters = getActiveSubclusters();

// pick the chosen subCluster from the active ones
return chooseSubCluster(request.getQueue(), filteredSubClusters);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.hadoop.yarn.server.federation.store;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;

/**
* FederationReservationHomeSubClusterStore maintains the state of all
* <em>Reservations</em> that have been submitted to the federated cluster.
*
* *
* <p>
* The mapping details contains:
* <ul>
* <li>{@code ReservationId}</li>
* <li>{@code SubClusterId}</li>
* </ul>
*
*/
@Private
@Unstable
public interface FederationReservationHomeSubClusterStore {

/**
* Register the home {@code SubClusterId} of the newly submitted
* {@code ReservationId}. Currently response is empty if the operation was
* successful, if not an exception reporting reason for a failure. If a
* mapping for the Reservation already existed, the {@code SubClusterId} in
* this response will return the existing mapping which might be different
* from that in the {@code AddReservationHomeSubClusterRequest}.
*
* @param request the request to register a new Reservation with its home
* sub-cluster
* @return upon successful registration of the Reservation in the StateStore,
* {@code AddReservationHomeSubClusterRequest} containing the home
* sub-cluster of the Reservation. Otherwise, an exception reporting
* reason for a failure
* @throws YarnException if the request is invalid/fails
*/
AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
AddReservationHomeSubClusterRequest request) throws YarnException;

/**
* Get information about the Reservation identified by the input
* {@code ReservationId}.
*
* @param request contains the Reservation queried
* @return {@code ReservationHomeSubCluster} containing the Reservation's home
* subcluster
* @throws YarnException if the request is invalid/fails
*/
GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
GetReservationHomeSubClusterRequest request) throws YarnException;

/**
* Get the {@code ReservationHomeSubCluster} list representing the mapping of
* all submitted Reservations to it's home sub-cluster.
*
* @param request empty representing all Reservations
* @return the mapping of all submitted Reservation to it's home sub-cluster
* @throws YarnException if the request is invalid/fails
*/
GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
GetReservationsHomeSubClusterRequest request) throws YarnException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
/**
* FederationStore extends the three interfaces used to coordinate the state of
* a federated cluster: {@link FederationApplicationHomeSubClusterStore},
* {@link FederationMembershipStateStore}, and {@link FederationPolicyStore}.
* {@link FederationMembershipStateStore}, {@link FederationPolicyStore}, and
* {@link FederationReservationHomeSubClusterStore}.
*
*/
public interface FederationStateStore
extends FederationApplicationHomeSubClusterStore,
FederationMembershipStateStore, FederationPolicyStore {
public interface FederationStateStore extends
FederationApplicationHomeSubClusterStore, FederationMembershipStateStore,
FederationPolicyStore, FederationReservationHomeSubClusterStore {

/**
* Initialize the FederationStore.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
Expand Down Expand Up @@ -59,7 +60,15 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
Expand All @@ -75,6 +84,7 @@ public class MemoryFederationStateStore implements FederationStateStore {

private Map<SubClusterId, SubClusterInfo> membership;
private Map<ApplicationId, SubClusterId> applications;
private Map<ReservationId, SubClusterId> reservations;
private Map<String, SubClusterPolicyConfiguration> policies;

private final MonotonicClock clock = new MonotonicClock();
Expand All @@ -86,13 +96,15 @@ public class MemoryFederationStateStore implements FederationStateStore {
public void init(Configuration conf) {
membership = new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
applications = new ConcurrentHashMap<ApplicationId, SubClusterId>();
reservations = new ConcurrentHashMap<ReservationId, SubClusterId>();
policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>();
}

@Override
public void close() {
membership = null;
applications = null;
reservations = null;
policies = null;
}

Expand Down Expand Up @@ -312,4 +324,45 @@ public Version loadVersion() {
return null;
}

@Override
public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
AddReservationHomeSubClusterRequest request) throws YarnException {
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
ReservationHomeSubCluster homeSubCluster = request.getReservationHomeSubCluster();
ReservationId reservationId = homeSubCluster.getReservationId();
if (!reservations.containsKey(reservationId)) {
reservations.put(reservationId, homeSubCluster.getHomeSubCluster());
}
return AddReservationHomeSubClusterResponse.newInstance(reservations.get(reservationId));
}

@Override
public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
GetReservationHomeSubClusterRequest request) throws YarnException {
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
ReservationId reservationId = request.getReservationId();
if (!reservations.containsKey(reservationId)) {
throw new YarnException("Reservation " + reservationId + " does not exist");
}
SubClusterId subClusterId = reservations.get(reservationId);
ReservationHomeSubCluster homeSubCluster =
ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
return GetReservationHomeSubClusterResponse.newInstance(homeSubCluster);
}

@Override
public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
GetReservationsHomeSubClusterRequest request) throws YarnException {
List<ReservationHomeSubCluster> result = new ArrayList<>();

for (Entry<ReservationId, SubClusterId> entry : reservations.entrySet()) {
ReservationId reservationId = entry.getKey();
SubClusterId subClusterId = entry.getValue();
ReservationHomeSubCluster homeSubCluster =
ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
result.add(homeSubCluster);
}

return GetReservationsHomeSubClusterResponse.newInstance(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
Expand Down Expand Up @@ -1004,4 +1010,21 @@ private static byte[] getByteArray(ByteBuffer bb) {
return ba;
}

@Override
public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
AddReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
}

@Override
public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
GetReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
}

@Override
public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
GetReservationsHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.TimeZone;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
Expand Down Expand Up @@ -65,6 +66,12 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl;
Expand Down Expand Up @@ -637,4 +644,22 @@ private static long getCurrentTime() {
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
return cal.getTimeInMillis();
}

@Override
public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
AddReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
}

@Override
public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
GetReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
}

@Override
public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
GetReservationsHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
}
}
Loading