Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public ResultCodes getResult() {
* Error codes to make it easy to decode these exceptions.
*/
public enum ResultCodes {
SUCCEESS,
OK,
FAILED_TO_LOAD_NODEPOOL,
FAILED_TO_FIND_NODE_IN_POOL,
FAILED_TO_FIND_HEALTHY_NODES,
Expand All @@ -120,6 +120,8 @@ public enum ResultCodes {
NO_SUCH_DATANODE,
NO_REPLICA_FOUND,
FAILED_TO_FIND_ACTIVE_PIPELINE,
FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY
FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY,
FAILED_TO_ALLOCATE_ENOUGH_BLOCKS,
INTERNAL_ERROR
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.exceptions;
// Exceptions thrown by SCM.
/**
Exception objects for the SCM Server.
*/
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.common.BlockGroup;
Expand All @@ -49,6 +51,8 @@
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

import static org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Status.OK;

/**
* This class is the client-side translator to translate the requests made on
* the {@link ScmBlockLocationProtocol} interface to the RPC server
Expand Down Expand Up @@ -85,6 +89,32 @@ private SCMBlockLocationRequest.Builder createSCMBlockRequest(Type cmdType) {
.setTraceID(TracingUtil.exportCurrentSpan());
}

/**
* Submits client request to SCM server.
* @param req client request
* @return response from SCM
* @throws IOException thrown if any Protobuf service exception occurs
*/
private SCMBlockLocationResponse submitRequest(
SCMBlockLocationRequest req) throws IOException {
try {
SCMBlockLocationResponse response =
rpcProxy.send(NULL_RPC_CONTROLLER, req);
return response;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}

private SCMBlockLocationResponse handleError(SCMBlockLocationResponse resp)
throws SCMException {
if (resp.getStatus() != OK) {
throw new SCMException(resp.getMessage(),
SCMException.ResultCodes.values()[resp.getStatus().ordinal()]);
}
return resp;
}

/**
* Asks SCM where a block should be allocated. SCM responds with the
* set of datanodes that should be used creating this block.
Expand Down Expand Up @@ -117,19 +147,10 @@ public List<AllocatedBlock> allocateBlock(long size, int num,
.setAllocateScmBlockRequest(request)
.build();

final AllocateScmBlockResponseProto response;
final SCMBlockLocationResponse wrappedResponse;
try {
wrappedResponse = rpcProxy.send(NULL_RPC_CONTROLLER, wrapper);
response = wrappedResponse.getAllocateScmBlockResponse();
} catch (ServiceException e) {
throw transformServiceException(e);
}
if (response.getErrorCode() !=
AllocateScmBlockResponseProto.Error.success) {
throw new IOException(response.hasErrorMessage() ?
response.getErrorMessage() : "Allocate block failed.");
}
final SCMBlockLocationResponse wrappedResponse =
handleError(submitRequest(wrapper));
final AllocateScmBlockResponseProto response =
wrappedResponse.getAllocateScmBlockResponse();

List<AllocatedBlock> blocks = new ArrayList<>(response.getBlocksCount());
for (AllocateBlockResponse resp : response.getBlocksList()) {
Expand Down Expand Up @@ -166,14 +187,11 @@ public List<DeleteBlockGroupResult> deleteKeyBlocks(
.setDeleteScmKeyBlocksRequest(request)
.build();

final DeleteScmKeyBlocksResponseProto resp;
final SCMBlockLocationResponse wrappedResponse;
try {
wrappedResponse = rpcProxy.send(NULL_RPC_CONTROLLER, wrapper);
resp = wrappedResponse.getDeleteScmKeyBlocksResponse();
} catch (ServiceException e) {
throw transformServiceException(e);
}
final SCMBlockLocationResponse wrappedResponse =
handleError(submitRequest(wrapper));
final DeleteScmKeyBlocksResponseProto resp =
wrappedResponse.getDeleteScmKeyBlocksResponse();

List<DeleteBlockGroupResult> results =
new ArrayList<>(resp.getResultsCount());
results.addAll(resp.getResultsList().stream().map(
Expand All @@ -184,30 +202,6 @@ public List<DeleteBlockGroupResult> deleteKeyBlocks(
return results;
}

private IOException transformServiceException(
ServiceException se) throws IOException {
//TODO SCM has no perfect way to return with business exceptions. All
//the exceptions will be mapped to ServiceException.
//ServiceException is handled in a special way in hadoop rpc: the message
//contains the whole stack trace which is not required for the business
//exception. As of now I remove the stack trace (use first line only).
//Long term we need a proper way of the exception propagation.
Throwable cause = se.getCause();
if (cause == null) {
return new IOException(
new ServiceException(useFirstLine(se.getMessage()), se.getCause()));
}
return new IOException(useFirstLine(cause.getMessage()), cause.getCause());
}

private String useFirstLine(String message) {
if (message == null) {
return null;
} else {
return message.split("\n")[0];
}
}

/**
* Gets the cluster Id and Scm Id from SCM.
* @return ScmInfo
Expand All @@ -224,13 +218,9 @@ public ScmInfo getScmInfo() throws IOException {
.setGetScmInfoRequest(request)
.build();

final SCMBlockLocationResponse wrappedResponse;
try {
wrappedResponse = rpcProxy.send(NULL_RPC_CONTROLLER, wrapper);
resp = wrappedResponse.getGetScmInfoResponse();
} catch (ServiceException e) {
throw transformServiceException(e);
}
final SCMBlockLocationResponse wrappedResponse =
handleError(submitRequest(wrapper));
resp = wrappedResponse.getGetScmInfoResponse();
ScmInfo.Builder builder = new ScmInfo.Builder()
.setClusterId(resp.getClusterId())
.setScmId(resp.getScmId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
Expand Down Expand Up @@ -78,7 +79,6 @@ public ScmBlockLocationProtocolServerSideTranslatorPB(
this.impl = impl;
}


private SCMBlockLocationResponse.Builder createSCMBlockResponse(
ScmBlockLocationProtocolProtos.Type cmdType,
String traceID) {
Expand All @@ -95,100 +95,99 @@ public SCMBlockLocationResponse send(RpcController controller,
SCMBlockLocationResponse.Builder response = createSCMBlockResponse(
request.getCmdType(),
traceId);
response.setSuccess(true);
response.setStatus(Status.OK);

switch (request.getCmdType()) {
case AllocateScmBlock:
response.setAllocateScmBlockResponse(
allocateScmBlock(traceId, request.getAllocateScmBlockRequest()));
break;
case DeleteScmKeyBlocks:
response.setDeleteScmKeyBlocksResponse(
deleteScmKeyBlocks(traceId, request.getDeleteScmKeyBlocksRequest()));
break;
case GetScmInfo:
response.setGetScmInfoResponse(
getScmInfo(traceId, request.getGetScmInfoRequest()));
break;
default:
throw new ServiceException("Unknown Operation");
try(Scope scope = TracingUtil
.importAndCreateScope("ScmBlockLocationProtocol."+request.getCmdType(),
request.getTraceID())) {
switch (request.getCmdType()) {
case AllocateScmBlock:
response.setAllocateScmBlockResponse(
allocateScmBlock(request.getAllocateScmBlockRequest()));
break;
case DeleteScmKeyBlocks:
response.setDeleteScmKeyBlocksResponse(
deleteScmKeyBlocks(request.getDeleteScmKeyBlocksRequest()));
break;
case GetScmInfo:
response.setGetScmInfoResponse(
getScmInfo(request.getGetScmInfoRequest()));
break;
default:
// Should never happen
throw new IOException("Unknown Operation "+request.getCmdType()+
" in ScmBlockLocationProtocol");
}
} catch (IOException e) {
response.setSuccess(false);
response.setStatus(exceptionToResponseStatus(e));
if (e.getMessage() != null) {
response.setMessage(e.getMessage());
}
}
response.setSuccess(true)
.setStatus(Status.OK);
return response.build();
}

public AllocateScmBlockResponseProto allocateScmBlock(
String traceId, AllocateScmBlockRequestProto request)
throws ServiceException {
try(Scope scope = TracingUtil
.importAndCreateScope("ScmBlockLocationProtocol.allocateBlock",
traceId)) {
List<AllocatedBlock> allocatedBlocks =
impl.allocateBlock(request.getSize(),
request.getNumBlocks(), request.getType(),
request.getFactor(), request.getOwner(),
ExcludeList.getFromProtoBuf(request.getExcludeList()));

AllocateScmBlockResponseProto.Builder builder =
AllocateScmBlockResponseProto.newBuilder();

if (allocatedBlocks.size() < request.getNumBlocks()) {
return builder
.setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure)
.build();
}

for (AllocatedBlock block : allocatedBlocks) {
builder.addBlocks(AllocateBlockResponse.newBuilder()
.setContainerBlockID(block.getBlockID().getProtobuf())
.setPipeline(block.getPipeline().getProtobufMessage()));
}
private Status exceptionToResponseStatus(IOException ex) {
if (ex instanceof SCMException) {
return Status.values()[((SCMException) ex).getResult().ordinal()];
} else {
return Status.INTERNAL_ERROR;
}
}

return builder
.setErrorCode(AllocateScmBlockResponseProto.Error.success)
.build();
} catch (IOException e) {
throw new ServiceException(e);
public AllocateScmBlockResponseProto allocateScmBlock(
AllocateScmBlockRequestProto request)
throws IOException {
List<AllocatedBlock> allocatedBlocks =
impl.allocateBlock(request.getSize(),
request.getNumBlocks(), request.getType(),
request.getFactor(), request.getOwner(),
ExcludeList.getFromProtoBuf(request.getExcludeList()));

AllocateScmBlockResponseProto.Builder builder =
AllocateScmBlockResponseProto.newBuilder();

if (allocatedBlocks.size() < request.getNumBlocks()) {
throw new SCMException("Allocated " + allocatedBlocks.size() +
" blocks. Requested " + request.getNumBlocks() + " blocks",
SCMException.ResultCodes.FAILED_TO_ALLOCATE_ENOUGH_BLOCKS);
}
for (AllocatedBlock block : allocatedBlocks) {
builder.addBlocks(AllocateBlockResponse.newBuilder()
.setContainerBlockID(block.getBlockID().getProtobuf())
.setPipeline(block.getPipeline().getProtobufMessage()));
}

return builder.build();
}

public DeleteScmKeyBlocksResponseProto deleteScmKeyBlocks(
String traceId, DeleteScmKeyBlocksRequestProto req)
throws ServiceException {
DeleteScmKeyBlocksRequestProto req)
throws IOException {
DeleteScmKeyBlocksResponseProto.Builder resp =
DeleteScmKeyBlocksResponseProto.newBuilder();
try(Scope scope = TracingUtil
.importAndCreateScope("ScmBlockLocationProtocol.deleteKeyBlocks",
traceId)) {
List<BlockGroup> infoList = req.getKeyBlocksList().stream()
.map(BlockGroup::getFromProto).collect(Collectors.toList());
final List<DeleteBlockGroupResult> results =
impl.deleteKeyBlocks(infoList);
for (DeleteBlockGroupResult result: results) {
DeleteKeyBlocksResultProto.Builder deleteResult =
DeleteKeyBlocksResultProto
.newBuilder()
.setObjectKey(result.getObjectKey())
.addAllBlockResults(result.getBlockResultProtoList());
resp.addResults(deleteResult.build());
}
} catch (IOException ex) {
throw new ServiceException(ex);

List<BlockGroup> infoList = req.getKeyBlocksList().stream()
.map(BlockGroup::getFromProto).collect(Collectors.toList());
final List<DeleteBlockGroupResult> results =
impl.deleteKeyBlocks(infoList);
for (DeleteBlockGroupResult result: results) {
DeleteKeyBlocksResultProto.Builder deleteResult =
DeleteKeyBlocksResultProto
.newBuilder()
.setObjectKey(result.getObjectKey())
.addAllBlockResults(result.getBlockResultProtoList());
resp.addResults(deleteResult.build());
}
return resp.build();
}

public HddsProtos.GetScmInfoResponseProto getScmInfo(
String traceId, HddsProtos.GetScmInfoRequestProto req)
throws ServiceException {
ScmInfo scmInfo;
try(Scope scope = TracingUtil
.importAndCreateScope("ScmBlockLocationProtocol.getInfo",
traceId)) {
scmInfo = impl.getScmInfo();
} catch (IOException ex) {
throw new ServiceException(ex);
}
HddsProtos.GetScmInfoRequestProto req)
throws IOException {
ScmInfo scmInfo = impl.getScmInfo();
return HddsProtos.GetScmInfoResponseProto.newBuilder()
.setClusterId(scmInfo.getClusterId())
.setScmId(scmInfo.getScmId())
Expand Down
Loading