Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ protected void addReservationQueue(
PlanQueue planQueue = (PlanQueue)queue;
try {
ReservationQueue resQueue =
new ReservationQueue(cs, currResId, planQueue);
new ReservationQueue(cs.getQueueContext(), currResId, planQueue);
cs.addQueue(resQueue);
} catch (SchedulerDynamicEditException e) {
LOG.warn(
Expand All @@ -115,7 +115,7 @@ protected void createDefaultReservationQueue(
if (cs.getQueue(defReservationId) == null) {
try {
ReservationQueue defQueue =
new ReservationQueue(cs, defReservationId, planQueue);
new ReservationQueue(cs.getQueueContext(), defReservationId, planQueue);
cs.addQueue(defQueue);
} catch (SchedulerDynamicEditException e) {
LOG.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,15 @@
* of AbstractManagedParentQueue
*/
public class AbstractAutoCreatedLeafQueue extends AbstractLeafQueue {
private static final Logger LOG = LoggerFactory.getLogger(
AbstractAutoCreatedLeafQueue.class);

protected AbstractManagedParentQueue parent;

public AbstractAutoCreatedLeafQueue(CapacitySchedulerContext cs,
public AbstractAutoCreatedLeafQueue(CapacitySchedulerQueueContext queueContext,
String queueName, AbstractManagedParentQueue parent, CSQueue old)
throws IOException {
super(cs, queueName, parent, old);
this.parent = parent;
}

private static final Logger LOG = LoggerFactory.getLogger(
AbstractAutoCreatedLeafQueue.class);

public AbstractAutoCreatedLeafQueue(CapacitySchedulerContext cs,
CapacitySchedulerConfiguration leafQueueConfigs, String queueName,
AbstractManagedParentQueue parent, CSQueue old) throws IOException {
super(cs, leafQueueConfigs, queueName, parent, old);
super(queueContext, queueName, parent, old);
this.parent = parent;
}

Expand All @@ -71,23 +63,23 @@ public void setEntitlement(QueueEntitlement entitlement)
@Override
protected Resource getMinimumAbsoluteResource(String queuePath,
String label) {
return super.getMinimumAbsoluteResource(csContext.getConfiguration()
return super.getMinimumAbsoluteResource(queueContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()),
label);
}

@Override
protected Resource getMaximumAbsoluteResource(String queuePath,
String label) {
return super.getMaximumAbsoluteResource(csContext.getConfiguration()
return super.getMaximumAbsoluteResource(queueContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()),
label);
}

@Override
protected boolean checkConfigTypeIsAbsoluteResource(String queuePath,
String label) {
return super.checkConfigTypeIsAbsoluteResource(csContext.getConfiguration()
return super.checkConfigTypeIsAbsoluteResource(queueContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()),
label);
}
Expand Down Expand Up @@ -122,7 +114,7 @@ public void setEntitlement(String nodeLabel, QueueEntitlement entitlement)

//update queue used capacity etc
CSQueueUtils.updateQueueStatistics(resourceCalculator,
csContext.getClusterResource(),
queueContext.getClusterResource(),
this, labelManager, nodeLabel);
} finally {
writeLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public enum CapacityConfigType {

private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
protected CapacitySchedulerContext csContext;
protected CapacitySchedulerQueueContext queueContext;
protected YarnAuthorizationProvider authorizer = null;

protected ActivitiesManager activitiesManager;
Expand All @@ -131,33 +131,39 @@ public enum CapacityConfigType {
// is it a dynamic queue?
private boolean dynamicQueue = false;

public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this(cs, cs.getConfiguration(), queueName, parent, old);
}

public AbstractCSQueue(CapacitySchedulerContext cs,
CapacitySchedulerConfiguration configuration, String queueName,
public AbstractCSQueue(CapacitySchedulerQueueContext queueContext, String queueName,
CSQueue parent, CSQueue old) {
this.labelManager = cs.getRMContext().getNodeLabelManager();
this.parent = parent;
this.queuePath = createQueuePath(parent, queueName);
this.resourceCalculator = cs.getResourceCalculator();
this.activitiesManager = cs.getActivitiesManager();

this.queueContext = queueContext;
this.resourceCalculator = queueContext.getResourceCalculator();
this.activitiesManager = queueContext.getActivitiesManager();
this.labelManager = queueContext.getLabelManager();

// must be called after parent and queueName is set
CSQueueMetrics metrics = old != null ?
(CSQueueMetrics) old.getMetrics() :
CSQueueMetrics.forQueue(getQueuePath(), parent,
cs.getConfiguration().getEnableUserMetrics(), configuration);
usageTracker = new CSQueueUsageTracker(metrics);
this.csContext = cs;
this.queueAllocationSettings = new QueueAllocationSettings(csContext);
queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath());
queueCapacities = new QueueCapacities(parent == null);
queueContext.getConfiguration().getEnableUserMetrics(), queueContext.getConfiguration());
this.usageTracker = new CSQueueUsageTracker(metrics);

this.queueCapacities = new QueueCapacities(parent == null);
this.queueAllocationSettings = new QueueAllocationSettings(queueContext.getMinimumAllocation());

this.queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath());

this.resourceTypes = new HashSet<>();
for (AbsoluteResourceType type : AbsoluteResourceType.values()) {
this.resourceTypes.add(type.toString().toLowerCase());
}

ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();

LOG.debug("Initialized {}: name={}, fullname={}", this.getClass().getSimpleName(),
queueName, getQueuePath());
}

private static QueuePath createQueuePath(CSQueue parent, String queueName) {
Expand All @@ -167,11 +173,6 @@ private static QueuePath createQueuePath(CSQueue parent, String queueName) {
return new QueuePath(parent.getQueuePath(), queueName);
}

@VisibleForTesting
protected void setupConfigurableCapacities() {
setupConfigurableCapacities(csContext.getConfiguration());
}

protected void setupConfigurableCapacities(
CapacitySchedulerConfiguration configuration) {
CSQueueUtils.loadCapacitiesByLabelsFromConf(queuePath, queueCapacities,
Expand Down Expand Up @@ -262,6 +263,10 @@ public PrivilegedEntity getPrivilegedEntity() {
return queueEntity;
}

public CapacitySchedulerQueueContext getQueueContext() {
return queueContext;
}

public Set<String> getAccessibleNodeLabels() {
return queueNodeLabelsSettings.getAccessibleNodeLabels();
}
Expand Down Expand Up @@ -336,26 +341,24 @@ protected void setupQueueConfigs(Resource clusterResource,

// Collect and set the Node label configuration
this.queueNodeLabelsSettings = new QueueNodeLabelsSettings(configuration, parent,
getQueuePath(), csContext);
getQueuePath(), queueContext.getQueueManager().getConfiguredNodeLabelsForAllQueues());

// Initialize the queue capacities
setupConfigurableCapacities(configuration);
updateAbsoluteCapacities();

updateCapacityConfigType();

// Fetch minimum/maximum resource limits for this queue if
// configured
this.resourceTypes = new HashSet<>();
for (AbsoluteResourceType type : AbsoluteResourceType.values()) {
resourceTypes.add(type.toString().toLowerCase());
}
updateConfigurableResourceLimits(clusterResource);

// Setup queue's maximumAllocation respecting the global
// and the queue settings
this.queueAllocationSettings.setupMaximumAllocation(configuration, getQueuePath(),
parent, csContext);
// TODO remove the getConfiguration() param after the AQC configuration duplication
// removal is resolved
this.queueAllocationSettings.setupMaximumAllocation(configuration,
queueContext.getConfiguration(), getQueuePath(),
parent);

// Initialize the queue state based on previous state, configured state
// and its parent state
Expand All @@ -369,7 +372,8 @@ protected void setupQueueConfigs(Resource clusterResource,

this.reservationsContinueLooking =
configuration.getReservationContinueLook();
this.configuredCapacityVectors = csContext.getConfiguration()

this.configuredCapacityVectors = queueContext.getConfiguration()
.parseConfiguredResourceVector(queuePath.getFullPath(),
this.queueNodeLabelsSettings.getConfiguredNodeLabels());

Expand All @@ -378,7 +382,10 @@ protected void setupQueueConfigs(Resource clusterResource,
this, labelManager, null);

// Store preemption settings
this.preemptionSettings = new CSQueuePreemptionSettings(this, csContext, configuration);
// TODO remove the getConfiguration() param after the AQC configuration duplication
// removal is resolved
this.preemptionSettings = new CSQueuePreemptionSettings(this, configuration,
queueContext.getConfiguration());
this.priority = configuration.getQueuePriority(
getQueuePath());

Expand Down Expand Up @@ -409,12 +416,13 @@ protected void setDynamicQueueProperties(
AutoCreatedQueueTemplate.AUTO_QUEUE_TEMPLATE_PREFIX);
parentTemplate = parentTemplate.substring(0, parentTemplate.lastIndexOf(
DOT));
Set<String> parentNodeLabels = csContext
.getCapacitySchedulerQueueManager().getConfiguredNodeLabels()
Set<String> parentNodeLabels = queueContext.getQueueManager()
.getConfiguredNodeLabelsForAllQueues()
.getLabelsByQueue(parentTemplate);

if (parentNodeLabels != null && parentNodeLabels.size() > 1) {
csContext.getCapacitySchedulerQueueManager().getConfiguredNodeLabels()
queueContext.getQueueManager()
.getConfiguredNodeLabelsForAllQueues()
.setLabelsByQueue(getQueuePath(), new HashSet<>(parentNodeLabels));
}
}
Expand All @@ -436,20 +444,18 @@ private UserWeights getUserWeightsFromHierarchy(
}

protected Resource getMinimumAbsoluteResource(String queuePath, String label) {
Resource minResource = csContext.getConfiguration()
return queueContext.getConfiguration()
.getMinimumResourceRequirement(label, queuePath, resourceTypes);
return minResource;
}

protected Resource getMaximumAbsoluteResource(String queuePath, String label) {
Resource maxResource = csContext.getConfiguration()
return queueContext.getConfiguration()
.getMaximumResourceRequirement(label, queuePath, resourceTypes);
return maxResource;
}

protected boolean checkConfigTypeIsAbsoluteResource(String queuePath,
String label) {
return csContext.getConfiguration().checkConfigTypeIsAbsoluteResource(label,
return queueContext.getConfiguration().checkConfigTypeIsAbsoluteResource(label,
queuePath, resourceTypes);
}

Expand Down Expand Up @@ -743,7 +749,7 @@ protected void releaseResource(Resource clusterResource,
}

@Private
public boolean getReservationContinueLooking() {
public boolean isReservationsContinueLooking() {
return reservationsContinueLooking;
}

Expand All @@ -764,7 +770,7 @@ public boolean getPreemptionDisabled() {

@Private
public boolean getIntraQueuePreemptionDisabled() {
return preemptionSettings.getIntraQueuePreemptionDisabled();
return preemptionSettings.isIntraQueuePreemptionDisabled();
}

@Private
Expand Down Expand Up @@ -1026,12 +1032,12 @@ public Set<String> getNodeLabelsForQueue() {
}

public Resource getTotalKillableResource(String partition) {
return csContext.getPreemptionManager().getKillableResource(getQueuePath(),
return queueContext.getPreemptionManager().getKillableResource(getQueuePath(),
partition);
}

public Iterator<RMContainer> getKillableContainers(String partition) {
return csContext.getPreemptionManager().getKillableContainers(
return queueContext.getPreemptionManager().getKillableContainers(
getQueuePath(),
partition);
}
Expand Down Expand Up @@ -1383,7 +1389,7 @@ public boolean isInactiveDynamicQueue() {
long idleDurationSeconds =
(Time.monotonicNow() - getLastSubmittedTimestamp())/1000;
return isDynamicQueue() && isEligibleForAutoDeletion() &&
(idleDurationSeconds > this.csContext.getConfiguration().
(idleDurationSeconds > queueContext.getConfiguration().
getAutoExpiredDeletionTime());
}

Expand Down
Loading