Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -783,15 +783,11 @@ public void removeVeryOldStoppedContainersFromCache() {
break;
}
if (!context.getContainers().containsKey(cid)) {
ApplicationId appId =
cid.getApplicationAttemptId().getApplicationId();
if (isApplicationStopped(appId)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current code will also delete all delegation tokens in the application, including those used for log aggregation, causing log aggregation to fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate? Are you saying there is a problem with these changes? Or something else wrong in the existing code?

Copy link
Contributor

@zeekling zeekling Sep 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I carefully reviewed the code again and there should be no problem of aggregation failure.

By the way.

Aggregate the logs of containers that completed more than 30 minutes ago (or longer) onto HDFS in advance, rather than waiting for the job to complete.

Is this implementation better than the current one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't actually change anything about how log aggregation works. It just prevents the node manager from holding onto finished containers in the store after the logs have been aggregated.

i.remove();
try {
context.getNMStateStore().removeContainer(cid);
} catch (IOException e) {
LOG.error("Unable to remove container {} in store.", cid, e);
}
i.remove();
try {
context.getNMStateStore().removeContainer(cid);
} catch (IOException e) {
LOG.error("Unable to remove container {} in store.", cid, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
Expand All @@ -165,6 +166,7 @@
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerType;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogAggregatorState;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
Expand Down Expand Up @@ -446,6 +448,11 @@ private void recover() throws IOException, URISyntaxException {
}
}

RecoveredLogAggregatorState logAggregatorState = stateStore.loadLogAggregatorState();
for (ContainerId containerId: logAggregatorState.getLogAggregators()) {
recoverLogAggregator(containerId);
}

// Recovery AMRMProxy state after apps and containers are recovered
if (this.amrmProxyEnabled) {
this.getAMRMProxyService().recover();
Expand Down Expand Up @@ -595,6 +602,11 @@ private void waitForRecoveredContainers() throws InterruptedException {
}
}

/**
 * Dispatches a {@link LogHandlerContainerRecoveredEvent} for the given
 * container so the log handler can pick it up again during recovery.
 *
 * @param containerId the container whose log aggregator is being recovered
 */
private void recoverLogAggregator(ContainerId containerId) {
  LOG.info("Recovering log aggregator for " + containerId);
  LogHandlerContainerRecoveredEvent recoveredEvent =
      new LogHandlerContainerRecoveredEvent(containerId);
  dispatcher.getEventHandler().handle(recoveredEvent);
}

protected LogHandler createLogHandler(Configuration conf, Context context,
DeletionService deletionService) {
if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;

public interface AppLogAggregator extends Runnable {

void startContainerLogAggregation(ContainerLogContext logContext);

void recoverContainerLogAggregation(ContainerId containerId);

void abortLogAggregation();

void finishLogAggregation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,11 @@ private void uploadLogsForContainers(boolean appFinished)
// remove it from containerLogAggregators.
if (finishedContainers.contains(container)) {
containerLogAggregators.remove(container);
try {
context.getNMStateStore().removeLogAggregator(container);
} catch (IOException e) {
LOG.error("Unable to remove log aggregator {} from store.", container, e);
}
}
}

Expand Down Expand Up @@ -616,10 +621,20 @@ public void startContainerLogAggregation(ContainerLogContext logContext) {
if (shouldUploadLogs(logContext)) {
LOG.info("Considering container " + logContext.getContainerId()
+ " for log-aggregation");
try {
context.getNMStateStore().storeLogAggregator(logContext.getContainerId());
} catch (IOException e) {
LOG.error("Unable to add log aggregator {} to store.", logContext.getContainerId(), e);
}
this.pendingContainers.add(logContext.getContainerId());
}
}

/**
 * Adds a recovered container to the pending containers of this aggregator.
 *
 * @param containerId the recovered container to queue
 */
@Override
public void recoverContainerLogAggregation(ContainerId containerId) {
  // Only the in-memory pending collection is touched here; this method
  // does not write to the state store.
  pendingContainers.add(containerId);
}

@Override
public synchronized void finishLogAggregation() {
LOG.info("Application just finished : " + this.applicationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;


Expand Down Expand Up @@ -336,6 +337,22 @@ private void stopContainer(ContainerId containerId,
new ContainerLogContext(containerId, containerType, exitCode));
}

/**
 * Hands a recovered container to the log aggregator of its application.
 * When no aggregator is registered for that application, the container's
 * log aggregator entry is removed from the state store instead.
 *
 * @param containerId the container recovered from the state store
 */
private void recoverContainer(ContainerId containerId) {
  ApplicationId appId =
      containerId.getApplicationAttemptId().getApplicationId();
  AppLogAggregator appAggregator = this.appLogAggregators.get(appId);

  if (appAggregator != null) {
    appAggregator.recoverContainerLogAggregation(containerId);
    return;
  }

  // No aggregator for the owning application: drop the stored entry.
  LOG.warn("Log aggregation is not initialized for " + containerId
      + " during recovery, removing from store.");
  try {
    context.getNMStateStore().removeLogAggregator(containerId);
  } catch (IOException storeFailure) {
    // Best effort: a failed removal is logged and otherwise ignored.
    LOG.error("Unable to remove log aggregator {} from store.", containerId, storeFailure);
  }
}

@SuppressWarnings("unchecked")
private void stopApp(ApplicationId appId) {

Expand Down Expand Up @@ -381,6 +398,11 @@ public void handle(LogHandlerEvent event) {
(LogHandlerAppFinishedEvent) event;
stopApp(appFinishedEvent.getApplicationId());
break;
case CONTAINER_RECOVERED:
LogHandlerContainerRecoveredEvent containerRecoveredEvent =
(LogHandlerContainerRecoveredEvent) event;
recoverContainer(containerRecoveredEvent.getContainerId());
break;
case LOG_AGG_TOKEN_UPDATE:
checkAndEnableAppAggregators();
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;

import org.apache.hadoop.yarn.api.records.ContainerId;

/**
 * Log handler event of type {@link LogHandlerEventType#CONTAINER_RECOVERED}
 * that carries the ID of the recovered container.
 */
public class LogHandlerContainerRecoveredEvent extends LogHandlerEvent {

  /** ID of the container this event refers to. */
  private final ContainerId recoveredContainerId;

  /**
   * Creates the event for the given container.
   *
   * @param containerId the recovered container
   */
  public LogHandlerContainerRecoveredEvent(ContainerId containerId) {
    super(LogHandlerEventType.CONTAINER_RECOVERED);
    recoveredContainerId = containerId;
  }

  /**
   * @return the ID of the recovered container
   */
  public ContainerId getContainerId() {
    return recoveredContainerId;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@
/**
 * Types of events delivered to the log handler.
 */
public enum LogHandlerEventType {
  APPLICATION_STARTED,
  CONTAINER_FINISHED,
  APPLICATION_FINISHED,
  /** Carried by LogHandlerContainerRecoveredEvent for a recovered container. */
  CONTAINER_RECOVERED,
  LOG_AGG_TOKEN_UPDATE
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;

private static final String LOG_DELETER_KEY_PREFIX = "LogDeleters/";
private static final String LOG_AGGREGATOR_KEY_PREFIX = "LogAggregators/";

private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/";

Expand Down Expand Up @@ -1410,6 +1411,65 @@ public void removeLogDeleter(ApplicationId appId) throws IOException {
}
}

/**
 * Scans the database for keys starting with LOG_AGGREGATOR_KEY_PREFIX and
 * collects the container IDs encoded in them. Keys whose suffix cannot be
 * parsed as a container ID are logged and skipped.
 *
 * @return the recovered log aggregator state
 * @throws IOException if the database scan fails
 */
@Override
public RecoveredLogAggregatorState loadLogAggregatorState() throws IOException {
  final int prefixLength = LOG_AGGREGATOR_KEY_PREFIX.length();
  RecoveredLogAggregatorState recovered = new RecoveredLogAggregatorState();
  recovered.logAggregators = new ArrayList<>();

  LeveldbIterator it = null;
  try {
    it = new LeveldbIterator(db);
    it.seek(bytes(LOG_AGGREGATOR_KEY_PREFIX));
    while (it.hasNext()) {
      String dbKey = asString(it.next().getKey());
      if (!dbKey.startsWith(LOG_AGGREGATOR_KEY_PREFIX)) {
        // First key without the prefix ends the scan.
        break;
      }

      String idText = dbKey.substring(prefixLength);
      ContainerId recoveredId;
      try {
        recoveredId = ContainerId.fromString(idText);
      } catch (IllegalArgumentException e) {
        LOG.warn("Skipping unknown log aggregator key " + dbKey);
        continue;
      }
      recovered.logAggregators.add(recoveredId);
    }
  } catch (DBException e) {
    throw new IOException(e);
  } finally {
    if (it != null) {
      it.close();
    }
  }
  return recovered;
}

/**
 * Writes an entry for the given container's log aggregator. The entry is
 * keyed by the log aggregator key of the container and has an empty value.
 *
 * @param containerId the container ID for the log aggregator
 * @throws IOException if the database write fails
 */
@Override
public void storeLogAggregator(ContainerId containerId) throws IOException {
  final String storeKey = getLogAggregatorKey(containerId);
  try {
    db.put(bytes(storeKey), new byte[0]);
  } catch (DBException dbFailure) {
    // Flag the store before surfacing the failure to the caller.
    markStoreUnHealthy(dbFailure);
    throw new IOException(dbFailure);
  }
}

/**
 * Deletes the entry stored for the given container's log aggregator.
 *
 * @param containerId the container ID for the log aggregator
 * @throws IOException if the database delete fails
 */
@Override
public void removeLogAggregator(ContainerId containerId)
    throws IOException {
  final String storeKey = getLogAggregatorKey(containerId);
  try {
    db.delete(bytes(storeKey));
  } catch (DBException dbFailure) {
    // Flag the store before surfacing the failure to the caller.
    markStoreUnHealthy(dbFailure);
    throw new IOException(dbFailure);
  }
}

@Override
public void storeAssignedResources(Container container,
String resourceType, List<Serializable> assignedResources)
Expand Down Expand Up @@ -1490,6 +1550,10 @@ private String getLogDeleterKey(ApplicationId appId) {
return LOG_DELETER_KEY_PREFIX + appId;
}

/**
 * Builds the database key for a container's log aggregator entry:
 * LOG_AGGREGATOR_KEY_PREFIX followed by the container ID's string form.
 *
 * @param containerId the container ID for the log aggregator
 * @return the database key
 */
private String getLogAggregatorKey(ContainerId containerId) {
  final String idText = String.valueOf(containerId);
  return LOG_AGGREGATOR_KEY_PREFIX + idText;
}

@Override
public RecoveredAMRMProxyState loadAMRMProxyState() throws IOException {
RecoveredAMRMProxyState result = new RecoveredAMRMProxyState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,22 @@ public void storeLogDeleter(ApplicationId appId, LogDeleterProto proto)
public void removeLogDeleter(ApplicationId appId) throws IOException {
}

/**
 * Always fails: this state store does not support recovery.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
public RecoveredLogAggregatorState loadLogAggregatorState()
    throws IOException {
  final String reason = "Recovery not supported by this state store";
  throw new UnsupportedOperationException(reason);
}

/**
 * No-op implementation.
 *
 * @param containerId the container ID for the log aggregator (ignored)
 */
@Override
public void storeLogAggregator(ContainerId containerId) throws IOException {
  // Intentionally empty: nothing is written for the given container.
}

/**
 * No-op implementation.
 *
 * @param containerId the container ID for the log aggregator (ignored)
 */
@Override
public void removeLogAggregator(ContainerId containerId) throws IOException {
  // Intentionally empty: there is nothing to remove.
}

@Override
public RecoveredAMRMProxyState loadAMRMProxyState() throws IOException {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,14 @@ public Map<ApplicationId, LogDeleterProto> getLogDeleterMap() {
}
}

/**
 * Recovered state for log aggregators: the IDs of the containers that have
 * a log aggregator entry in the state store.
 */
public static class RecoveredLogAggregatorState {
  // Not initialized here; the loading code assigns it directly.
  List<ContainerId> logAggregators;

  /**
   * @return the recovered container IDs, as assigned by the loader
   */
  public List<ContainerId> getLogAggregators() {
    return this.logAggregators;
  }
}

/**
* Recovered states for AMRMProxy.
*/
Expand Down Expand Up @@ -722,6 +730,30 @@ public abstract void storeLogDeleter(ApplicationId appId,
public abstract void removeLogDeleter(ApplicationId appId)
throws IOException;

/**
 * Load the state of the log aggregators.
 *
 * @return recovered log aggregator state, exposing the container IDs of
 *         the stored log aggregators
 * @throws IOException if the state cannot be loaded
 */
public abstract RecoveredLogAggregatorState loadLogAggregatorState()
throws IOException;

/**
 * Store the state of a log aggregator.
 *
 * @param containerId the container ID for the log aggregator
 * @throws IOException if the state cannot be stored
 */
public abstract void storeLogAggregator(ContainerId containerId)
throws IOException;

/**
 * Remove the state of a log aggregator.
 *
 * @param containerId the container ID for the log aggregator
 * @throws IOException if the state cannot be removed
 */
public abstract void removeLogAggregator(ContainerId containerId)
throws IOException;

/**
* Load the state of AMRMProxy.
* @return recovered state of AMRMProxy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -965,18 +965,6 @@ public void testRecentlyFinishedContainers() throws Exception {

nodeStatusUpdater.addCompletedContainer(cId);
assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));

// verify container remains even after expiration if app
// is still active
nm.getNMContext().getContainers().remove(cId);
Thread.sleep(10);
nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));

// complete the application and verify container is removed
nm.getNMContext().getApplications().remove(appId);
nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId));
}

@Test
Expand Down
Loading