Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/main/java/com/uber/rss/clients/DataBlockSyncWriteClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,25 @@ public void startUpload(ShuffleMapTaskAttemptId shuffleMapTaskAttemptId, int num
writeControlMessageNotWaitResponseStatus(startUploadMessage);
}

// TODO do not need mapId/taskAttemptId for StartUploadMessage
/**
 * Sends a {@code StartUploadMessage} that also carries the stage id, without waiting for a
 * response status.
 *
 * @param shuffleMapTaskAttemptId supplies the shuffle id, map id and task attempt id of the message
 * @param stageId stage id placed in the message
 * @param numMaps number of maps placed in the message
 * @param numPartitions number of partitions placed in the message
 * @param shuffleWriteConfig supplies the number of splits placed in the message
 */
public void startUpload(ShuffleMapTaskAttemptId shuffleMapTaskAttemptId, int stageId, int numMaps, int numPartitions, ShuffleWriteConfig shuffleWriteConfig) {
  // Parameterized logging: the message is only rendered when debug logging is enabled.
  logger.debug("Starting upload {}, {}", shuffleMapTaskAttemptId, connectionInfo);

  // Remember the total bytes written so far at the moment this upload starts.
  startUploadShuffleByteSnapshot = totalWriteBytes;

  StartUploadMessage startUploadMessage = new StartUploadMessage(
      shuffleMapTaskAttemptId.getShuffleId(),
      shuffleMapTaskAttemptId.getMapId(),
      shuffleMapTaskAttemptId.getTaskAttemptId(),
      numMaps,
      numPartitions,
      // NOTE(review): sixth argument is always the empty string; its meaning is not visible here.
      "",
      shuffleWriteConfig.getNumSplits(),
      stageId);

  writeControlMessageNotWaitResponseStatus(startUploadMessage);
}

public void writeData(int partitionId, long taskAttemptId, ByteBuf data) {
final int headerByteCount = Integer.BYTES + Long.BYTES + Integer.BYTES;
final int dataByteCount = data.readableBytes();
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/uber/rss/clients/NotifyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.uber.rss.exceptions.RssInvalidStateException;
import com.uber.rss.messages.FinishApplicationAttemptRequestMessage;
import com.uber.rss.messages.FinishApplicationJobRequestMessage;
import com.uber.rss.messages.FinishApplicationStageRequestMessage;
import com.uber.rss.messages.MessageConstants;
import com.uber.rss.messages.ConnectNotifyRequest;
import com.uber.rss.messages.ConnectNotifyResponse;
Expand Down Expand Up @@ -80,6 +81,12 @@ public void finishApplicationAttempt(String appId, String appAttempt) {
writeControlMessageAndWaitResponseStatus(request);
}

/**
 * Notifies the server that a stage of the given application attempt has finished and waits
 * for the response status.
 *
 * @param appId application id placed in the request
 * @param appAttempt application attempt placed in the request
 * @param stageId id of the finished stage
 */
public void finishApplicationStage(String appId, String appAttempt, int stageId) {
  writeControlMessageAndWaitResponseStatus(
      new FinishApplicationStageRequestMessage(appId, appAttempt, stageId));
}

@Override
public void close() {
super.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public ConnectUploadResponse connect() {

/**
 * Starts an upload for the given task attempt by delegating to the data block write client.
 *
 * @param appTaskAttemptId supplies the shuffle map task attempt id and the stage id
 * @param numMaps number of maps forwarded to the write client
 * @param numPartitions number of partitions forwarded to the write client
 */
public void startUpload(AppTaskAttemptId appTaskAttemptId, int numMaps, int numPartitions) {
  shuffleMapTaskAttemptId = appTaskAttemptId.getShuffleMapTaskAttemptId();
  // Exactly one start-upload call: the stage-aware overload. The older call without the
  // stage id is dropped so the start-upload message is not sent twice.
  dataBlockSyncWriteClient.startUpload(
      shuffleMapTaskAttemptId,
      appTaskAttemptId.getStageId(),
      numMaps,
      numPartitions,
      shuffleWriteConfig);
}

@Override
Expand Down
18 changes: 16 additions & 2 deletions src/main/java/com/uber/rss/common/AppTaskAttemptId.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ public class AppTaskAttemptId {
private final int mapId;
private final long taskAttemptId;

// if not associated with startUpload pipeline, value will be -1
private final int stageId;

/**
 * Builds an id from an application shuffle id plus a map id and task attempt id by
 * unpacking the shuffle id's components and delegating to another constructor.
 *
 * @param appShuffleId supplies the app id, app attempt and shuffle id
 * @param mapId map id
 * @param taskAttemptId task attempt id
 */
public AppTaskAttemptId(AppShuffleId appShuffleId, int mapId, long taskAttemptId) {
  this(
      appShuffleId.getAppId(),
      appShuffleId.getAppAttempt(),
      appShuffleId.getShuffleId(),
      mapId,
      taskAttemptId);
}
Expand All @@ -35,11 +38,16 @@ public AppTaskAttemptId(AppMapId appMapId, long taskAttemptId) {
}

/**
 * Builds an id without an explicit stage id; the stage id is set to -1.
 *
 * @param appId application id
 * @param appAttempt application attempt
 * @param shuffleId shuffle id
 * @param mapId map id
 * @param taskAttemptId task attempt id
 */
public AppTaskAttemptId(String appId, String appAttempt, int shuffleId, int mapId, long taskAttemptId) {
  this(
      appId,
      appAttempt,
      shuffleId,
      mapId,
      taskAttemptId,
      -1);
}

/**
 * Builds an id from all of its components and stores each one in the matching field.
 *
 * @param appId application id
 * @param appAttempt application attempt
 * @param shuffleId shuffle id
 * @param mapId map id
 * @param taskAttemptId task attempt id
 * @param stageId stage id
 */
public AppTaskAttemptId(String appId, String appAttempt, int shuffleId, int mapId, long taskAttemptId, int stageId) {
  // Application-level identity.
  this.appId = appId;
  this.appAttempt = appAttempt;

  // Shuffle / stage identity.
  this.shuffleId = shuffleId;
  this.stageId = stageId;

  // Task-level identity.
  this.mapId = mapId;
  this.taskAttemptId = taskAttemptId;
}

public String getAppId() {
Expand Down Expand Up @@ -74,6 +82,10 @@ public ShuffleMapTaskAttemptId getShuffleMapTaskAttemptId() {
return new ShuffleMapTaskAttemptId(shuffleId, mapId, taskAttemptId);
}

/**
 * Returns the stage id stored in this id; it is -1 when the id was built through a
 * constructor that does not take a stage id.
 *
 * @return the stage id
 */
public int getStageId() {
  return this.stageId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -83,13 +95,14 @@ public boolean equals(Object o) {
mapId == that.mapId &&
taskAttemptId == that.taskAttemptId &&
Objects.equals(appId, that.appId) &&
Objects.equals(appAttempt, that.appAttempt);
Objects.equals(appAttempt, that.appAttempt) &&
stageId == that.stageId;
}

/**
 * Computes the hash from every identifying field.
 *
 * @return hash of appId, appAttempt, shuffleId, mapId, taskAttemptId and stageId
 */
@Override
public int hashCode() {
  // Single return that includes stageId; the earlier return without stageId is removed
  // because it made this statement unreachable.
  return Objects.hash(appId, appAttempt, shuffleId, mapId, taskAttemptId, stageId);
}

@Override
Expand All @@ -100,6 +113,7 @@ public String toString() {
", shuffleId=" + shuffleId +
", mapId=" + mapId +
", taskAttemptId=" + taskAttemptId +
", stageId=" + stageId +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.uber.rss.messages.ConnectUploadResponse;
import com.uber.rss.messages.FinishApplicationAttemptRequestMessage;
import com.uber.rss.messages.FinishApplicationJobRequestMessage;
import com.uber.rss.messages.FinishApplicationStageRequestMessage;
import com.uber.rss.messages.FinishUploadMessage;
import com.uber.rss.messages.GetBusyStatusRequest;
import com.uber.rss.messages.GetBusyStatusResponse;
Expand Down Expand Up @@ -359,6 +360,8 @@ private Object getControlMessage(ChannelHandlerContext ctx,
return FinishApplicationJobRequestMessage.deserialize(in);
case MessageConstants.MESSAGE_FinishApplicationAttemptRequest:
return FinishApplicationAttemptRequestMessage.deserialize(in);
case MessageConstants.MESSAGE_FinishApplicationStageRequest:
return FinishApplicationStageRequestMessage.deserialize(in);
case MessageConstants.MESSAGE_ConnectRegistryRequest:
return ConnectRegistryRequest.deserialize(in);
case MessageConstants.MESSAGE_ConnectRegistryResponse:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private void addVersionDecoder(ChannelHandlerContext ctx, byte type, byte versio
} else if (type == MessageConstants.NOTIFY_UPLINK_MAGIC_BYTE &&
version == MessageConstants.NOTIFY_UPLINK_VERSION_3) {
newDecoder = new StreamServerMessageDecoder(null);
NotifyChannelInboundHandler channelInboundHandler = new NotifyChannelInboundHandler(serverId);
NotifyChannelInboundHandler channelInboundHandler = new NotifyChannelInboundHandler(serverId, executor);
channelInboundHandler.processChannelActive(ctx);
newHandler = channelInboundHandler;
} else if (type == MessageConstants.REGISTRY_UPLINK_MAGIC_BYTE &&
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2020 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.uber.rss.exceptions;

/**
 * Signals an error with a shuffle stage in the shuffle server, for example when the
 * stageId -> shuffleId lookup is given a stage that has no mapping.
 */
public class RssInvalidStageException extends RssException {

  /**
   * Creates the exception with a description of the stage error.
   *
   * @param message description passed on to the parent exception
   */
  public RssInvalidStageException(final String message) {
    super(message);
  }
}
22 changes: 21 additions & 1 deletion src/main/java/com/uber/rss/execution/ShuffleExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.uber.m3.tally.Gauge;
import com.uber.rss.clients.ShuffleWriteConfig;
import com.uber.rss.common.*;
import com.uber.rss.exceptions.RssInvalidStageException;
import com.uber.rss.exceptions.RssShuffleCorruptedException;
import com.uber.rss.exceptions.RssShuffleStageNotStartedException;
import com.uber.rss.exceptions.RssTooMuchDataException;
Expand Down Expand Up @@ -92,6 +93,11 @@ public class ShuffleExecutor {
private final ConcurrentHashMap<AppShuffleId, ExecutorShuffleStageState> stageStates
= new ConcurrentHashMap<>();

// TODO: should this be renamed so it's clear it's only for writing stages?
// Maps a stageId to the AppShuffleId of its associated WRITING stage.
private final ConcurrentHashMap<Integer, AppShuffleId> stageIdToShuffleIdMap
= new ConcurrentHashMap<>();

private final StateStore stateStore;

private final ShuffleStorage storage;
Expand Down Expand Up @@ -449,6 +455,20 @@ public void checkAppMaxWriteBytes(String appId) {
checkAppMaxWriteBytes(appId, appWriteBytes);
}

/**
 * Records the shuffle id associated with a stage id. The first mapping registered for a
 * stage id is kept; later registrations for the same stage id do not replace it.
 *
 * @param stageId stage id used as the lookup key
 * @param appShuffleId shuffle id to associate with the stage
 */
public void registerShuffleId(int stageId, AppShuffleId appShuffleId) {
  AppShuffleId existing = stageIdToShuffleIdMap.putIfAbsent(stageId, appShuffleId);
  // Repeated registrations of the same mapping are expected; a different shuffle id for an
  // already-registered stage is ignored by putIfAbsent, so surface it instead of dropping
  // it silently.
  if (existing != null && !existing.equals(appShuffleId)) {
    logger.warn("Stage {} is already mapped to {}, ignoring new mapping to {}",
        stageId, existing, appShuffleId);
  }
}

/**
 * Looks up the shuffle id registered for a stage id.
 *
 * @param stageId stage id to look up
 * @return the shuffle id registered for the stage
 * @throws RssInvalidStageException if no shuffle id is registered for the stage
 */
public AppShuffleId getShuffleId(int stageId) {
  final AppShuffleId mapped = stageIdToShuffleIdMap.get(stageId);
  if (mapped == null) {
    // No mapping: log the problem and report it to the caller with the same message.
    final String message = String.format("unable to get a shuffleId for stage= %s could be because this stage doesn't write", stageId);
    logger.warn(message);
    throw new RssInvalidStageException(message);
  }
  return mapped;
}

private void checkAppMaxWriteBytes(AppShuffleId appShuffleId, long currentAppWriteBytes) {
if (currentAppWriteBytes > appMaxWriteBytes) {
numTruncatedApplications.inc(1);
Expand Down Expand Up @@ -485,7 +505,7 @@ private ExecutorAppState getAppState(String appId) {
}
}

private ExecutorShuffleStageState getStageState(AppShuffleId appShuffleId) {
public ExecutorShuffleStageState getStageState(AppShuffleId appShuffleId) {
ExecutorShuffleStageState state = stageStates.get(appShuffleId);
if (state != null) {
return state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package com.uber.rss.handlers;

import com.uber.rss.exceptions.RssInvalidDataException;
import com.uber.rss.execution.ShuffleExecutor;
import com.uber.rss.messages.FinishApplicationAttemptRequestMessage;
import com.uber.rss.messages.FinishApplicationJobRequestMessage;
import com.uber.rss.messages.ConnectNotifyRequest;
import com.uber.rss.messages.FinishApplicationStageRequestMessage;
import com.uber.rss.metrics.M3Stats;
import com.uber.rss.util.NettyUtils;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -33,8 +35,8 @@ public class NotifyChannelInboundHandler extends ChannelInboundHandlerAdapter {

private final NotifyServerHandler serverHandler;

/**
 * Creates the inbound handler and the {@code NotifyServerHandler} it delegates to.
 *
 * @param serverId server id passed to the server handler
 * @param executor shuffle executor passed to the server handler
 */
public NotifyChannelInboundHandler(String serverId, ShuffleExecutor executor) {
  serverHandler = new NotifyServerHandler(serverId, executor);
}

@Override
Expand All @@ -61,6 +63,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
serverHandler.handleMessage(ctx, (FinishApplicationJobRequestMessage)msg);
} else if (msg instanceof FinishApplicationAttemptRequestMessage) {
serverHandler.handleMessage(ctx, (FinishApplicationAttemptRequestMessage)msg);
} else if (msg instanceof FinishApplicationStageRequestMessage) {
serverHandler.handleMessage(ctx, (FinishApplicationStageRequestMessage) msg);
} else {
throw new RssInvalidDataException(String.format("Unsupported message: %s, %s", msg, connectionInfo));
}
Expand Down
44 changes: 43 additions & 1 deletion src/main/java/com/uber/rss/handlers/NotifyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@

package com.uber.rss.handlers;

import com.uber.rss.common.AppShuffleId;
import com.uber.rss.exceptions.RssException;
import com.uber.rss.exceptions.RssShuffleStageNotStartedException;
import com.uber.rss.execution.ShuffleExecutor;
import com.uber.rss.messages.FinishApplicationJobRequestMessage;
import com.uber.rss.messages.FinishApplicationAttemptRequestMessage;
import com.uber.rss.messages.FinishApplicationStageRequestMessage;
import com.uber.rss.messages.MessageConstants;
import com.uber.rss.messages.ConnectNotifyRequest;
import com.uber.rss.messages.ConnectNotifyResponse;
import com.uber.rss.messages.ShuffleStageStatus;
import com.uber.rss.metrics.ApplicationJobStatusMetrics;
import com.uber.rss.metrics.ApplicationMetrics;
import com.uber.rss.metrics.NotifyServerMetricsContainer;
Expand All @@ -37,11 +43,13 @@ public class NotifyServerHandler {
private static final NotifyServerMetricsContainer metricsContainer = new NotifyServerMetricsContainer();

private final String serverId;
private final ShuffleExecutor executor;

private String user;

/**
 * Creates the handler.
 *
 * @param serverId id of this server
 * @param executor shuffle executor used when handling stage-finished notifications
 */
public NotifyServerHandler(String serverId, ShuffleExecutor executor) {
  this.serverId = serverId;
  this.executor = executor;
}

public void handleMessage(ChannelHandlerContext ctx, ConnectNotifyRequest msg) {
Expand Down Expand Up @@ -73,6 +81,40 @@ public void handleMessage(ChannelHandlerContext ctx, FinishApplicationAttemptReq
metrics.getNumApplications().inc(1);
}

/**
 * Handles a stage-finished notification: sends the OK status, resolves the shuffle id
 * registered for the stage and closes the writers of that shuffle stage. Stages with no
 * registered shuffle id are treated as non-writing stages and ignored.
 *
 * @param ctx channel context used to send the status byte
 * @param msg request carrying the app id, app attempt and stage id
 * @throws RssShuffleStageNotStartedException if the resolved shuffle stage reports the
 *         "shuffle stage not started" file status
 */
public void handleMessage(ChannelHandlerContext ctx, FinishApplicationStageRequestMessage msg) {
  // NOTE(review): the OK status is written before any of the work below runs, so the
  // sender presumably may see success before the writers are closed. Confirm this is intended.
  writeAndFlushByte(ctx, MessageConstants.RESPONSE_STATUS_OK);

  logger.info("finishApplicationStage, appId: {}, appAttempt: {}, stageId: {}",
      msg.getAppId(), msg.getAppAttempt(), msg.getStageId());

  final int finishedStageId = msg.getStageId();

  // TODO confirm that stageId -> shuffleId is strictly 1-1; initial investigation suggests
  // it is, but this has not been verified.
  AppShuffleId shuffleId;
  try {
    shuffleId = executor.getShuffleId(finishedStageId);
  } catch (RssException e) {
    // No shuffle id registered for this stage: nothing to close.
    logger.debug("Shuffle Stage {} does not do any writing", finishedStageId, e);
    return;
  }

  final boolean stageNotStarted =
      executor.getShuffleStageStatus(shuffleId).getFileStatus()
          == ShuffleStageStatus.FILE_STATUS_SHUFFLE_STAGE_NOT_STARTED;
  if (stageNotStarted) {
    // Not expected in practice unless the thread handling the upload message stalled and
    // this notification was processed first.
    String failure = String.format("Shuffle stage was not started for stage=%s shuffle=%s unable to close shuffle files", finishedStageId, shuffleId);
    logger.error(failure);
    throw new RssShuffleStageNotStartedException(failure);
  }

  // TODO check whether the next stage can start before this handler finishes; download
  // requests could then begin before the shuffle files reach a slower storage such as s3.
  executor.getStageState(shuffleId).closeWriters();

  logger.info("writing is complete for stage= {}, shuffleId= {} ", finishedStageId, shuffleId);
}

private void writeAndFlushByte(ChannelHandlerContext ctx, byte value) {
ByteBuf buf = ctx.alloc().buffer(1);
buf.writeByte(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.uber.m3.tally.Gauge;
import com.uber.rss.RssBuildInfo;
import com.uber.rss.clients.ShuffleWriteConfig;
import com.uber.rss.common.AppShuffleId;
import com.uber.rss.common.AppTaskAttemptId;
import com.uber.rss.exceptions.RssInvalidDataException;
import com.uber.rss.exceptions.RssMaxConnectionsException;
Expand Down Expand Up @@ -178,11 +179,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
appAttempt,
startUploadMessage.getShuffleId(),
startUploadMessage.getMapId(),
startUploadMessage.getAttemptId());
startUploadMessage.getAttemptId(),
startUploadMessage.getStageId());

ShuffleWriteConfig writeConfig = new ShuffleWriteConfig(startUploadMessage.getNumSplits());
uploadServerHandler.initializeAppTaskAttempt(appTaskAttemptId, startUploadMessage.getNumPartitions(),
writeConfig, ctx);
uploadServerHandler.registerStageId(startUploadMessage.getStageId(),
new AppShuffleId(appId, appAttempt, startUploadMessage.getShuffleId()));
} else if (msg instanceof FinishUploadMessage) {
logger.debug("FinishUploadMessage, {}, {}", msg, connectionInfo);
FinishUploadMessage finishUploadMessage = (FinishUploadMessage)msg;
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/uber/rss/handlers/UploadServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

import com.uber.rss.clients.ShuffleWriteConfig;
import com.uber.rss.common.AppMapId;
import com.uber.rss.common.AppShuffleId;
import com.uber.rss.common.AppTaskAttemptId;
import com.uber.rss.exceptions.RssInvalidDataException;
import com.uber.rss.exceptions.RssInvalidStageException;
import com.uber.rss.exceptions.RssInvalidStateException;
import com.uber.rss.exceptions.RssMaxConnectionsException;
import com.uber.rss.execution.ShuffleDataWrapper;
Expand Down Expand Up @@ -119,6 +121,17 @@ public void finishUpload(long taskAttemptId) {
finishUploadImpl(appTaskAttemptIdToFinishUpload);
}

/**
 * Registers the stage id -> shuffle id association with the executor.
 *
 * @param stageId stage id to register; -1 is rejected
 * @param appShuffleId shuffle id to associate with the stage
 * @throws RssInvalidStageException if {@code stageId} is -1
 */
public void registerStageId(int stageId, AppShuffleId appShuffleId) {
  // -1 is the default stage id; it shows up only when the id did not come from a
  // StartUploadMessage or was never set.
  final boolean stageIdMissing = (stageId == -1);
  if (stageIdMissing) {
    String message = String.format("registerStageId called not using StartUploadMessage or stageId never set for shuffle=%s", appShuffleId);
    logger.error(message);
    throw new RssInvalidStageException(message);
  }

  executor.registerShuffleId(stageId, appShuffleId);
}

private void finishUploadImpl(AppTaskAttemptId appTaskAttemptIdToFinishUpload) {
lazyStartUpload(appTaskAttemptIdToFinishUpload);
executor.finishUpload(appTaskAttemptIdToFinishUpload.getAppShuffleId(),
Expand Down
Loading