diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index a535c9f3f2aa4..13d3eedec340a 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc; @@ -31,6 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction; import org.apache.hadoop.hdds.security.exception.SCMSecurityException; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.tracing.GrpcClientInterceptor; @@ -62,7 +62,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; /** * A Client for the storageContainer protocol. @@ -83,7 +82,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { * data nodes. * * @param pipeline - Pipeline that defines the machines. - * @param config -- Ozone Config + * @param config -- Ozone Config */ public XceiverClientGrpc(Pipeline pipeline, Configuration config) { super(); @@ -91,7 +90,7 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) { Preconditions.checkNotNull(config); this.pipeline = pipeline; this.config = config; - this.secConfig = new SecurityConfig(config); + this.secConfig = new SecurityConfig(config); this.semaphore = new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config)); this.metrics = XceiverClientManager.getXceiverClientMetrics(); @@ -101,7 +100,7 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) { /** * To be used when grpc token is not enabled. - * */ + */ @Override public void connect() throws Exception { // leader by default is the 1st datanode in the datanode list of pipleline @@ -112,7 +111,7 @@ public void connect() throws Exception { /** * Passed encoded token to GRPC header when security is enabled. - * */ + */ @Override public void connect(String encodedToken) throws Exception { // leader by default is the 1st datanode in the datanode list of pipleline @@ -132,11 +131,10 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken) } // Add credential context to the client call - String userName = UserGroupInformation.getCurrentUser() - .getShortUserName(); + String userName = UserGroupInformation.getCurrentUser().getShortUserName(); LOG.debug("Connecting to server Port : " + dn.getIpAddress()); - NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(dn - .getIpAddress(), port).usePlaintext() + NettyChannelBuilder channelBuilder = + NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext() .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE) .intercept(new ClientCredentialInterceptor(userName, encodedToken), new GrpcClientInterceptor()); @@ -149,8 +147,8 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken) if (trustCertCollectionFile != null) { sslContextBuilder.trustManager(trustCertCollectionFile); } - if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFile != null && - privateKeyFile != null) { + if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFile != null + && privateKeyFile != null) { sslContextBuilder.keyManager(clientCertChainFile, privateKeyFile); } @@ -216,49 +214,45 @@ public ContainerCommandResponseProto sendCommand( } @Override - public XceiverClientReply sendCommand( - ContainerCommandRequestProto request, List excludeDns) + public ContainerCommandResponseProto sendCommand( + ContainerCommandRequestProto request, List validators) throws IOException { - Preconditions.checkState(HddsUtils.isReadOnly(request)); - return sendCommandWithTraceIDAndRetry(request, excludeDns); + try { + XceiverClientReply reply; + reply = sendCommandWithTraceIDAndRetry(request, validators); + ContainerCommandResponseProto responseProto = reply.getResponse().get(); + return responseProto; + } catch (ExecutionException | InterruptedException e) { + throw new IOException("Failed to execute command " + request, e); + } } private XceiverClientReply sendCommandWithTraceIDAndRetry( - ContainerCommandRequestProto request, List excludeDns) + ContainerCommandRequestProto request, List validators) throws IOException { try (Scope scope = GlobalTracer.get() .buildSpan("XceiverClientGrpc." + request.getCmdType().name()) .startActive(true)) { ContainerCommandRequestProto finalPayload = ContainerCommandRequestProto.newBuilder(request) - .setTraceID(TracingUtil.exportCurrentSpan()) - .build(); - return sendCommandWithRetry(finalPayload, excludeDns); + .setTraceID(TracingUtil.exportCurrentSpan()).build(); + return sendCommandWithRetry(finalPayload, validators); } } private XceiverClientReply sendCommandWithRetry( - ContainerCommandRequestProto request, List excludeDns) + ContainerCommandRequestProto request, List validators) throws IOException { ContainerCommandResponseProto responseProto = null; + IOException ioException = null; // In case of an exception or an error, we will try to read from the // datanodes in the pipeline in a round robin fashion. // TODO: cache the correct leader info in here, so that any subsequent calls // should first go to leader - List dns = pipeline.getNodes(); - List healthyDns = - excludeDns != null ? dns.stream().filter(dnId -> { - for (DatanodeDetails excludeId : excludeDns) { - if (dnId.equals(excludeId)) { - return false; - } - } - return true; - }).collect(Collectors.toList()) : dns; XceiverClientReply reply = new XceiverClientReply(null); - for (DatanodeDetails dn : healthyDns) { + for (DatanodeDetails dn : pipeline.getNodes()) { try { LOG.debug("Executing command " + request + " on datanode " + dn); // In case the command gets retried on a 2nd datanode, @@ -266,17 +260,26 @@ private XceiverClientReply sendCommandWithRetry( // in case these don't exist for the specific datanode. reply.addDatanode(dn); responseProto = sendCommandAsync(request, dn).getResponse().get(); - if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) { - break; + if (validators != null && !validators.isEmpty()) { + for (CheckedBiFunction validator : validators) { + validator.apply(request, responseProto); + } } - } catch (ExecutionException | InterruptedException e) { + break; + } catch (ExecutionException | InterruptedException | IOException e) { LOG.debug("Failed to execute command " + request + " on datanode " + dn .getUuidString(), e); - if (Status.fromThrowable(e.getCause()).getCode() - == Status.UNAUTHENTICATED.getCode()) { - throw new SCMSecurityException("Failed to authenticate with " - + "GRPC XceiverServer with Ozone block token."); + if (!(e instanceof IOException)) { + if (Status.fromThrowable(e.getCause()).getCode() + == Status.UNAUTHENTICATED.getCode()) { + throw new SCMSecurityException("Failed to authenticate with " + + "GRPC XceiverServer with Ozone block token."); + } + ioException = new IOException(e); + } else { + ioException = (IOException) e; } + responseProto = null; } } @@ -284,9 +287,10 @@ private XceiverClientReply sendCommandWithRetry( reply.setResponse(CompletableFuture.completedFuture(responseProto)); return reply; } else { - throw new IOException( - "Failed to execute command " + request + " on the pipeline " - + pipeline.getId()); + Preconditions.checkNotNull(ioException); + LOG.error("Failed to execute command " + request + " on the pipeline " + + pipeline.getId()); + throw ioException; } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index bb4a5b05633d9..82fb1063d3bd8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -21,12 +21,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; +import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.hdds.scm.XceiverClientManager; @@ -34,16 +33,17 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ReadChunkResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. + ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. + ContainerCommandRequestProto; import org.apache.hadoop.hdds.client.BlockID; - import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.ExecutionException; /** * An {@link InputStream} used by the REST service in combination with the @@ -74,7 +74,7 @@ public class BlockInputStream extends InputStream implements Seekable { private List buffers; private int bufferIndex; private long bufferPosition; - private final boolean verifyChecksum; + private boolean verifyChecksum; /** * Creates a new BlockInputStream. @@ -323,41 +323,8 @@ private boolean chunksRemaining() { private synchronized void readChunkFromContainer() throws IOException { // Read the chunk at chunkIndex final ChunkInfo chunkInfo = chunks.get(chunkIndex); - List excludeDns = null; ByteString byteString; - List dnList = getDatanodeList(); - while (true) { - List dnListFromReadChunkCall = new ArrayList<>(); - byteString = readChunk(chunkInfo, excludeDns, dnListFromReadChunkCall); - try { - if (byteString.size() != chunkInfo.getLen()) { - // Bytes read from chunk should be equal to chunk size. - throw new IOException(String - .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", - chunkInfo.getChunkName(), chunkInfo.getLen(), - byteString.size())); - } - ChecksumData checksumData = - ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData()); - if (verifyChecksum) { - Checksum.verifyChecksum(byteString, checksumData); - } - break; - } catch (IOException ioe) { - // we will end up in this situation only if the checksum mismatch - // happens or the length of the chunk mismatches. - // In this case, read should be retried on a different replica. - // TODO: Inform SCM of a possible corrupt container replica here - if (excludeDns == null) { - excludeDns = new ArrayList<>(); - } - excludeDns.addAll(dnListFromReadChunkCall); - if (excludeDns.size() == dnList.size()) { - throw ioe; - } - } - } - + byteString = readChunk(chunkInfo); buffers = byteString.asReadOnlyByteBufferList(); bufferIndex = 0; chunkIndexOfCurrentBuffer = chunkIndex; @@ -372,28 +339,20 @@ private synchronized void readChunkFromContainer() throws IOException { * Send RPC call to get the chunk from the container. */ @VisibleForTesting - protected ByteString readChunk(final ChunkInfo chunkInfo, - List excludeDns, List dnListFromReply) + protected ByteString readChunk(final ChunkInfo chunkInfo) throws IOException { - XceiverClientReply reply; - ReadChunkResponseProto readChunkResponse = null; + ReadChunkResponseProto readChunkResponse; try { - reply = ContainerProtocolCalls - .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns); - ContainerProtos.ContainerCommandResponseProto response; - response = reply.getResponse().get(); - ContainerProtocolCalls.validateContainerResponse(response); - readChunkResponse = response.getReadChunk(); - dnListFromReply.addAll(reply.getDatanodes()); + List validators = + ContainerProtocolCalls.getValidatorList(); + validators.add(validator); + readChunkResponse = ContainerProtocolCalls + .readChunk(xceiverClient, chunkInfo, blockID, traceID, validators); } catch (IOException e) { if (e instanceof StorageContainerException) { throw e; } throw new IOException("Unexpected OzoneException: " + e.toString(), e); - } catch (ExecutionException | InterruptedException e) { - throw new IOException( - "Failed to execute ReadChunk command for chunk " + chunkInfo - .getChunkName(), e); } return readChunkResponse.getData(); } @@ -403,6 +362,26 @@ protected List getDatanodeList() { return xceiverClient.getPipeline().getNodes(); } + private CheckedBiFunction validator = + (request, response) -> { + ReadChunkResponseProto readChunkResponse = response.getReadChunk(); + final ChunkInfo chunkInfo = readChunkResponse.getChunkData(); + ByteString byteString = readChunkResponse.getData(); + if (byteString.size() != chunkInfo.getLen()) { + // Bytes read from chunk should be equal to chunk size. + throw new OzoneChecksumException(String + .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", + chunkInfo.getChunkName(), chunkInfo.getLen(), + byteString.size())); + } + ChecksumData checksumData = + ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData()); + if (verifyChecksum) { + Checksum.verifyChecksum(byteString, checksumData); + } + }; + @Override public synchronized void seek(long pos) throws IOException { if (pos < 0 || (chunks.size() == 0 && pos > 0) diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java index 35c102257f4b4..b6ceb2b2af738 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java @@ -114,8 +114,7 @@ private static class DummyBlockInputStream extends BlockInputStream { } @Override - protected ByteString readChunk(final ChunkInfo chunkInfo, - List excludeDns, List dnListFromReply) + protected ByteString readChunk(final ChunkInfo chunkInfo) throws IOException { return getByteString(chunkInfo.getChunkName(), (int) chunkInfo.getLen()); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 1a183664a127f..5631badf44c93 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -25,13 +25,13 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction; /** * A Client for the storageContainer protocol. @@ -118,18 +118,21 @@ public ContainerCommandResponseProto sendCommand( * Sends a given command to server and gets the reply back along with * the server associated info. * @param request Request - * @param excludeDns list of servers on which the command won't be sent to. + * @param validators functions to validate the response * @return Response to the command * @throws IOException */ - public XceiverClientReply sendCommand( - ContainerCommandRequestProto request, List excludeDns) + public ContainerCommandResponseProto sendCommand( + ContainerCommandRequestProto request, List validators) throws IOException { try { XceiverClientReply reply; reply = sendCommandAsync(request); - reply.getResponse().get(); - return reply; + ContainerCommandResponseProto responseProto = reply.getResponse().get(); + for (CheckedBiFunction function : validators) { + function.apply(request, responseProto); + } + return responseProto; } catch (ExecutionException | InterruptedException e) { throw new IOException("Failed to command " + request, e); } @@ -156,7 +159,7 @@ public XceiverClientReply sendCommand( /** * Check if an specfic commitIndex is replicated to majority/all servers. * @param index index to watch for - * @param timeout timeout provided for the watch ipeartion to complete + * @param timeout timeout provided for the watch operation to complete * @return reply containing the min commit index replicated to all or majority * servers in case of a failure * @throws InterruptedException diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedBiFunction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedBiFunction.java new file mode 100644 index 0000000000000..df84859ab0294 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedBiFunction.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.storage; + + +import java.io.IOException; + +/** + * Defines a functional interface having two inputs which throws IOException. + */ +@FunctionalInterface +public interface CheckedBiFunction { + void apply(LEFT left, RIGHT right) throws THROWABLE; +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 5a1a75eb90fd9..08f5d87f5ec13 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.storage; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.container.common.helpers .BlockNotCommittedException; @@ -72,6 +71,7 @@ import org.apache.hadoop.hdds.client.BlockID; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; @@ -116,9 +116,8 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); - + ContainerCommandResponseProto response = + xceiverClient.sendCommand(request, getValidatorList()); return response.getGetBlock(); } @@ -153,8 +152,8 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); + ContainerCommandResponseProto response = + xceiverClient.sendCommand(request, getValidatorList()); return response.getGetCommittedBlockLength(); } @@ -184,8 +183,8 @@ public static ContainerProtos.PutBlockResponseProto putBlock( builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); + ContainerCommandResponseProto response = + xceiverClient.sendCommand(request, getValidatorList()); return response.getPutBlock(); } @@ -228,35 +227,31 @@ public static XceiverClientReply putBlockAsync( * @param chunk information about chunk to read * @param blockID ID of the block * @param traceID container protocol call args - * @param excludeDns datamode to exclude while executing the command + * @param validators functions to validate the response * @return container protocol read chunk response * @throws IOException if there is an I/O error while performing the call */ - public static XceiverClientReply readChunk(XceiverClientSpi xceiverClient, - ChunkInfo chunk, BlockID blockID, String traceID, - List excludeDns) - throws IOException { - ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto - .newBuilder() - .setBlockID(blockID.getDatanodeBlockIDProtobuf()) - .setChunkData(chunk); + public static ContainerProtos.ReadChunkResponseProto readChunk( + XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, + String traceID, List validators) throws IOException { + ReadChunkRequestProto.Builder readChunkRequest = + ReadChunkRequestProto.newBuilder() + .setBlockID(blockID.getDatanodeBlockIDProtobuf()) + .setChunkData(chunk); String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); - ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.ReadChunk) - .setContainerID(blockID.getContainerID()) - .setTraceID(traceID) - .setDatanodeUuid(id) - .setReadChunk(readChunkRequest); + ContainerCommandRequestProto.Builder builder = + ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk) + .setContainerID(blockID.getContainerID()).setTraceID(traceID) + .setDatanodeUuid(id).setReadChunk(readChunkRequest); String encodedToken = getEncodedBlockToken(new Text(blockID. getContainerBlockID().toString())); if (encodedToken != null) { builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - XceiverClientReply reply = - xceiverClient.sendCommand(request, excludeDns); - return reply; + ContainerCommandResponseProto reply = + xceiverClient.sendCommand(request, validators); + return reply.getReadChunk(); } /** @@ -291,8 +286,7 @@ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); + xceiverClient.sendCommand(request, getValidatorList()); } /** @@ -384,8 +378,8 @@ public static PutSmallFileResponseProto writeSmallFile( builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = client.sendCommand(request); - validateContainerResponse(response); + ContainerCommandResponseProto response = + client.sendCommand(request, getValidatorList()); return response.getPutSmallFile(); } @@ -416,9 +410,7 @@ public static void createContainer(XceiverClientSpi client, long containerID, request.setCreateContainer(createRequest.build()); request.setDatanodeUuid(id); request.setTraceID(traceID); - ContainerCommandResponseProto response = client.sendCommand( - request.build()); - validateContainerResponse(response); + client.sendCommand(request.build(), getValidatorList()); } /** @@ -444,12 +436,10 @@ public static void deleteContainer(XceiverClientSpi client, long containerID, request.setDeleteContainer(deleteRequest); request.setTraceID(traceID); request.setDatanodeUuid(id); - if(encodedToken != null) { + if (encodedToken != null) { request.setEncodedToken(encodedToken); } - ContainerCommandResponseProto response = - client.sendCommand(request.build()); - validateContainerResponse(response); + client.sendCommand(request.build(), getValidatorList()); } /** @@ -476,9 +466,7 @@ public static void closeContainer(XceiverClientSpi client, if(encodedToken != null) { request.setEncodedToken(encodedToken); } - ContainerCommandResponseProto response = - client.sendCommand(request.build()); - validateContainerResponse(response); + client.sendCommand(request.build(), getValidatorList()); } /** @@ -505,8 +493,7 @@ public static ReadContainerResponseProto readContainer( request.setEncodedToken(encodedToken); } ContainerCommandResponseProto response = - client.sendCommand(request.build()); - validateContainerResponse(response); + client.sendCommand(request.build(), getValidatorList()); return response.getReadContainer(); } @@ -544,9 +531,8 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = client.sendCommand(request); - validateContainerResponse(response); - + ContainerCommandResponseProto response = + client.sendCommand(request, getValidatorList()); return response.getGetSmallFile(); } @@ -598,4 +584,13 @@ private static Text getService(DatanodeBlockID blockId) { .append(blockId.getLocalID()) .toString()); } + + public static List getValidatorList() { + List validators = new ArrayList<>(1); + CheckedBiFunction + validator = (request, response) -> validateContainerResponse(response); + validators.add(validator); + return validators; + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index bd496d0803ffd..56e0eeae42050 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -867,7 +867,7 @@ private void readCorruptedKey(String volumeName, String bucketName, fail("Reading corrupted data should fail, as verify checksum is " + "enabled"); } - } catch (OzoneChecksumException e) { + } catch (IOException e) { if (!verifyChecksum) { fail("Reading corrupted data should not fail, as verify checksum is " + "disabled"); @@ -1017,7 +1017,7 @@ public void testReadKeyWithCorruptedData() throws IOException { OzoneInputStream is = bucket.readKey(keyName); is.read(new byte[100]); fail("Reading corrupted data should fail."); - } catch (OzoneChecksumException e) { + } catch (IOException e) { GenericTestUtils.assertExceptionContains("Checksum mismatch", e); } } @@ -1098,7 +1098,7 @@ public void testReadKeyWithCorruptedDataWithMutiNodes() throws IOException { byte[] b = new byte[data.length]; is.read(b); fail("Reading corrupted data should fail."); - } catch (OzoneChecksumException e) { + } catch (IOException e) { GenericTestUtils.assertExceptionContains("Checksum mismatch", e); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java index 0145c6dc9f196..1343a03a2a952 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java @@ -204,9 +204,8 @@ public void testPutKeyAndGetKeyThreeNodes() readKey(bucket, keyName, value); fail("Expected exception not thrown"); } catch (IOException e) { - Assert.assertTrue(e.getMessage().contains("Failed to execute command")); - Assert.assertTrue( - e.getMessage().contains("on the pipeline " + pipeline.getId())); + // it should throw an ioException as none of the servers + // are available } manager.releaseClient(clientSpi, false); }