File: AllocateResponse.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
@@ -147,6 +148,23 @@ public static AllocateResponse newInstance(int responseId,
.collectorInfo(collectorInfo).build();
}

@Private
@Unstable
public static AllocateResponse newInstance(int responseId,
List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
List<UpdatedContainer> updatedContainers, CollectorInfo collectorInfo,
EnhancedHeadroom enhancedHeadroom) {
AllocateResponse response =
newInstance(responseId, completedContainers, allocatedContainers,
updatedNodes, availResources, command, numClusterNodes, preempt,
nmTokens, amRMToken, updatedContainers, collectorInfo);
response.setEnhancedHeadroom(enhancedHeadroom);
return response;
}

/**
* If the <code>ResourceManager</code> needs the
* <code>ApplicationMaster</code> to take some action then it will send an
@@ -439,6 +457,14 @@ public static AllocateResponseBuilder newBuilder() {
return new AllocateResponseBuilder();
}

@Public
@Unstable
public abstract EnhancedHeadroom getEnhancedHeadroom();

@Private
@Unstable
public abstract void setEnhancedHeadroom(EnhancedHeadroom enhancedHeadroom);

/**
* Class to construct instances of {@link AllocateResponse} with specific
* options.
@@ -666,6 +692,18 @@ public AllocateResponseBuilder containersFromPreviousAttempt(
return this;
}

@Public
@Unstable
public EnhancedHeadroom getEnhancedHeadroom() {
return allocateResponse.getEnhancedHeadroom();
}

@Private
@Unstable
public void setEnhancedHeadroom(EnhancedHeadroom enhancedHeadroom) {
allocateResponse.setEnhancedHeadroom(enhancedHeadroom);
}

/**
* Return generated {@link AllocateResponse} object.
* @return {@link AllocateResponse}
File: EnhancedHeadroom.java (new file)
@@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.api.records;

import org.apache.hadoop.yarn.util.Records;

/**
* Enhanced headroom in AllocateResponse.
* This provides a channel for RMs to return load information for AMRMProxy
* decision making when rerouting resource requests.
*
* Contains the total pending container count and active core count for the cluster.
*/
public abstract class EnhancedHeadroom {
public static EnhancedHeadroom newInstance(int totalPendingCount,
int totalActiveCores) {
EnhancedHeadroom enhancedHeadroom =
Records.newRecord(EnhancedHeadroom.class);
enhancedHeadroom.setTotalPendingCount(totalPendingCount);
enhancedHeadroom.setTotalActiveCores(totalActiveCores);
return enhancedHeadroom;
}

/**
* Set total pending container count.
* @param totalPendingCount the pending container count
*/
public abstract void setTotalPendingCount(int totalPendingCount);

/**
* Get total pending container count.
* @return the pending container count
*/
public abstract int getTotalPendingCount();

/**
* Set total active cores for the cluster.
* @param totalActiveCores the total active cores for the cluster
*/
public abstract void setTotalActiveCores(int totalActiveCores);

/**
* Get total active cores for the cluster.
* @return the total active cores for the cluster
*/
public abstract int getTotalActiveCores();

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("<pendingCount:").append(this.getTotalPendingCount());
sb.append(", activeCores:").append(this.getTotalActiveCores());
sb.append(">");
return sb.toString();
}
}
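
For context, a minimal sketch of how an AMRMProxy routing policy might consume this record when deciding where to reroute resource requests. Only EnhancedHeadroom and AllocateResponse#getEnhancedHeadroom come from this change; the helper class and the pending-per-core heuristic are hypothetical.

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;

/** Hypothetical AMRMProxy-side helper: a lower score means a less loaded RM. */
public final class HeadroomLoadScore {

  private HeadroomLoadScore() {
  }

  /** Pending containers per active core, or MAX_VALUE when there is no signal. */
  public static double score(AllocateResponse response) {
    EnhancedHeadroom headroom = response.getEnhancedHeadroom();
    if (headroom == null || headroom.getTotalActiveCores() <= 0) {
      // Older RMs (or empty clusters) report nothing; treat them as fully loaded.
      return Double.MAX_VALUE;
    }
    return (double) headroom.getTotalPendingCount()
        / headroom.getTotalActiveCores();
  }
}

Pending-per-core is just one possible heuristic; the patch only carries the raw counts and leaves the routing policy to the proxy.
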
File: yarn_service_protos.proto
@@ -106,6 +106,11 @@ message UpdatedContainerProto {
required ContainerProto container = 2;
}

message EnhancedHeadroomProto {
optional int32 total_pending_count = 1;
optional int32 total_active_cores = 2;
}

message AllocateResponseProto {
optional AMCommandProto a_m_command = 1;
optional int32 response_id = 2;
@@ -123,6 +128,7 @@ message AllocateResponseProto {
repeated UpdatedContainerProto updated_containers = 16;
repeated ContainerProto containers_from_previous_attempts = 17;
repeated RejectedSchedulingRequestProto rejected_scheduling_requests = 18;
optional EnhancedHeadroomProto enhanced_headroom = 19;
}

enum SchedulerResourceTypes {
File: AllocateResponsePBImpl.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
@@ -43,6 +44,7 @@
import org.apache.hadoop.yarn.api.records.impl.pb.CollectorInfoPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.EnhancedHeadroomPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
@@ -89,6 +91,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
private Token amrmToken = null;
private Priority appPriority = null;
private CollectorInfo collectorInfo = null;
private EnhancedHeadroom enhancedHeadroom = null;

public AllocateResponsePBImpl() {
builder = AllocateResponseProto.newBuilder();
@@ -190,6 +193,9 @@ private synchronized void mergeLocalToBuilder() {
getContainerProtoIterable(this.containersFromPreviousAttempts);
builder.addAllContainersFromPreviousAttempts(iterable);
}
if (this.enhancedHeadroom != null) {
builder.setEnhancedHeadroom(convertToProtoFormat(this.enhancedHeadroom));
}
}

private synchronized void mergeLocalToProto() {
@@ -422,6 +428,28 @@ public synchronized void setAMRMToken(Token amRMToken) {
this.amrmToken = amRMToken;
}

@Override
public synchronized EnhancedHeadroom getEnhancedHeadroom() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
if (enhancedHeadroom != null) {
return enhancedHeadroom;
}
if (!p.hasEnhancedHeadroom()) {
return null;
}
this.enhancedHeadroom = convertFromProtoFormat(p.getEnhancedHeadroom());
return enhancedHeadroom;
}

@Override
public synchronized void setEnhancedHeadroom(
EnhancedHeadroom enhancedHeadroom) {
maybeInitBuilder();
if (enhancedHeadroom == null) {
builder.clearEnhancedHeadroom();
}
this.enhancedHeadroom = enhancedHeadroom;
}

@Override
public synchronized CollectorInfo getCollectorInfo() {
@@ -933,4 +961,14 @@ private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
private PriorityProto convertToProtoFormat(Priority t) {
return ((PriorityPBImpl)t).getProto();
}

private EnhancedHeadroomPBImpl convertFromProtoFormat(
YarnServiceProtos.EnhancedHeadroomProto p) {
return new EnhancedHeadroomPBImpl(p);
}

private YarnServiceProtos.EnhancedHeadroomProto convertToProtoFormat(
EnhancedHeadroom t) {
return ((EnhancedHeadroomPBImpl) t).getProto();
}
}
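
And the producing side, roughly: a sketch of how RM-side code might attach the load signal when building a response. AllocateResponse.newBuilder(), setEnhancedHeadroom(...) and EnhancedHeadroom.newInstance(...) are shown in this diff; the wrapper class and the numbers are made up for illustration.

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;

/** Hypothetical RM-side snippet; parameter values are illustrative only. */
public final class EnhancedHeadroomProducerSketch {

  public static AllocateResponse buildResponse(int pendingContainers,
      int activeCores) {
    AllocateResponse response = AllocateResponse.newBuilder().build();
    // Attach the cluster load signal so an AMRMProxy can weigh this RM.
    response.setEnhancedHeadroom(
        EnhancedHeadroom.newInstance(pendingContainers, activeCores));
    return response;
  }
}
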
File: EnhancedHeadroomPBImpl.java (new file)
@@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.api.records.impl.pb;

import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.EnhancedHeadroomProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.EnhancedHeadroomProtoOrBuilder;

import org.apache.hadoop.thirdparty.protobuf.TextFormat;

public class EnhancedHeadroomPBImpl extends EnhancedHeadroom {

private EnhancedHeadroomProto proto =
EnhancedHeadroomProto.getDefaultInstance();
private EnhancedHeadroomProto.Builder builder = null;
private boolean viaProto = false;

public EnhancedHeadroomPBImpl() {
builder = EnhancedHeadroomProto.newBuilder();
}

public EnhancedHeadroomPBImpl(EnhancedHeadroomProto proto) {
this.proto = proto;
viaProto = true;
}

public EnhancedHeadroomProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}

@Override
public int hashCode() {
return getProto().hashCode();
}

@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}

@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}

private void mergeLocalToBuilder() {
// No local content yet
}

private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}

private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = EnhancedHeadroomProto.newBuilder(proto);
}
viaProto = false;
}

@Override
public void setTotalPendingCount(int totalPendingCount) {
maybeInitBuilder();
if (totalPendingCount == 0) {
builder.clearTotalPendingCount();
return;
}
builder.setTotalPendingCount(totalPendingCount);
}

@Override
public int getTotalPendingCount() {
EnhancedHeadroomProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasTotalPendingCount()) ? p.getTotalPendingCount() : 0;
}

@Override
public void setTotalActiveCores(int totalActiveCores) {
maybeInitBuilder();
if (totalActiveCores == 0) {
builder.clearTotalActiveCores();
return;
}
builder.setTotalActiveCores(totalActiveCores);
}

@Override
public int getTotalActiveCores() {
EnhancedHeadroomProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasTotalActiveCores()) ? p.getTotalActiveCores() : 0;
}

}
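
As a quick sanity check, a round-trip sketch in the same spirit as the new test added to the test file below; it uses only the APIs introduced here plus a cast to the PB implementation, and the values are illustrative.

import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
import org.apache.hadoop.yarn.api.records.impl.pb.EnhancedHeadroomPBImpl;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.EnhancedHeadroomProto;

public final class EnhancedHeadroomRoundTripSketch {

  public static void main(String[] args) {
    EnhancedHeadroom original = EnhancedHeadroom.newInstance(1500, 800);

    // Serialize to the proto form and rebuild the record from it.
    EnhancedHeadroomProto proto = ((EnhancedHeadroomPBImpl) original).getProto();
    EnhancedHeadroom copy = new EnhancedHeadroomPBImpl(proto);

    // Both fields should survive the round trip.
    System.out.println(copy.getTotalPendingCount() == 1500
        && copy.getTotalActiveCores() == 800);
  }
}
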
File: TestPBImplRecords.java
@@ -127,6 +127,7 @@
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
@@ -184,6 +185,7 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerRetryContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.EnhancedHeadroomPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ExecutionTypeRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
@@ -430,6 +432,7 @@ public static void setup() throws Exception {
generateByNewInstance(UpdatedContainer.class);
generateByNewInstance(ContainerUpdateRequest.class);
generateByNewInstance(ContainerUpdateResponse.class);
generateByNewInstance(EnhancedHeadroom.class);
// genByNewInstance does not apply to QueueInfo, cause
// it is recursive(has sub queues)
typeValueCache.put(QueueInfo.class, QueueInfo.
@@ -1331,4 +1334,10 @@ public void testGetNodesToAttributesResponsePBImpl() throws Exception {
validatePBImplRecord(GetNodesToAttributesResponsePBImpl.class,
YarnServiceProtos.GetNodesToAttributesResponseProto.class);
}

@Test
public void testGetEnhancedHeadroomPBImpl() throws Exception {
validatePBImplRecord(EnhancedHeadroomPBImpl.class,
YarnServiceProtos.EnhancedHeadroomProto.class);
}
}