From e815258b9e301b1a3b94b62de18e6f6912be0d26 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Thu, 16 May 2019 19:21:33 +0530 Subject: [PATCH 1/6] Rebased to latest trunk. --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 96 ++++++++++--------- .../hadoop/hdds/scm/XceiverClientRatis.java | 1 + .../hdds/scm/storage/BlockInputStream.java | 85 +++++++--------- .../scm/storage/TestBlockInputStream.java | 3 +- .../hadoop/hdds/scm/XceiverClientSpi.java | 15 +-- .../hdds/scm/storage/CheckedFunction.java | 27 ++++++ .../scm/storage/ContainerProtocolCalls.java | 77 +++++++-------- .../rpc/TestOzoneRpcClientAbstract.java | 6 +- 8 files changed, 160 insertions(+), 150 deletions(-) create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedFunction.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index a535c9f3f2aa4..837c1579fa756 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -31,12 +31,14 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.storage.CheckedFunction; import org.apache.hadoop.hdds.security.exception.SCMSecurityException; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.tracing.GrpcClientInterceptor; import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; @@ -83,7 +85,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { * data nodes. * * @param pipeline - Pipeline that defines the machines. - * @param config -- Ozone Config + * @param config -- Ozone Config */ public XceiverClientGrpc(Pipeline pipeline, Configuration config) { super(); @@ -91,7 +93,7 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) { Preconditions.checkNotNull(config); this.pipeline = pipeline; this.config = config; - this.secConfig = new SecurityConfig(config); + this.secConfig = new SecurityConfig(config); this.semaphore = new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config)); this.metrics = XceiverClientManager.getXceiverClientMetrics(); @@ -101,9 +103,8 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) { /** * To be used when grpc token is not enabled. - * */ - @Override - public void connect() throws Exception { + */ + @Override public void connect() throws Exception { // leader by default is the 1st datanode in the datanode list of pipleline DatanodeDetails dn = this.pipeline.getFirstNode(); // just make a connection to the 1st datanode at the beginning @@ -112,9 +113,8 @@ public void connect() throws Exception { /** * Passed encoded token to GRPC header when security is enabled. 
- * */ - @Override - public void connect(String encodedToken) throws Exception { + */ + @Override public void connect(String encodedToken) throws Exception { // leader by default is the 1st datanode in the datanode list of pipleline DatanodeDetails dn = this.pipeline.getFirstNode(); // just make a connection to the 1st datanode at the beginning @@ -132,11 +132,10 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken) } // Add credential context to the client call - String userName = UserGroupInformation.getCurrentUser() - .getShortUserName(); + String userName = UserGroupInformation.getCurrentUser().getShortUserName(); LOG.debug("Connecting to server Port : " + dn.getIpAddress()); - NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(dn - .getIpAddress(), port).usePlaintext() + NettyChannelBuilder channelBuilder = + NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext() .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE) .intercept(new ClientCredentialInterceptor(userName, encodedToken), new GrpcClientInterceptor()); @@ -149,8 +148,8 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken) if (trustCertCollectionFile != null) { sslContextBuilder.trustManager(trustCertCollectionFile); } - if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFile != null && - privateKeyFile != null) { + if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFile != null + && privateKeyFile != null) { sslContextBuilder.keyManager(clientCertChainFile, privateKeyFile); } @@ -174,8 +173,7 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken) * * @return True if the connection is alive, false otherwise. */ - @VisibleForTesting - public boolean isConnected(DatanodeDetails details) { + @VisibleForTesting public boolean isConnected(DatanodeDetails details) { return isConnected(channels.get(details.getUuid())); } @@ -183,8 +181,7 @@ private boolean isConnected(ManagedChannel channel) { return channel != null && !channel.isTerminated() && !channel.isShutdown(); } - @Override - public void close() { + @Override public void close() { closed = true; for (ManagedChannel channel : channels.values()) { channel.shutdownNow(); @@ -216,49 +213,46 @@ public ContainerCommandResponseProto sendCommand( } @Override - public XceiverClientReply sendCommand( - ContainerCommandRequestProto request, List excludeDns) + public ContainerCommandResponseProto sendCommand( + ContainerCommandRequestProto request, CheckedFunction function) throws IOException { Preconditions.checkState(HddsUtils.isReadOnly(request)); - return sendCommandWithTraceIDAndRetry(request, excludeDns); + try { + XceiverClientReply reply; + reply = sendCommandWithTraceIDAndRetry(request, function); + ContainerCommandResponseProto responseProto = reply.getResponse().get(); + return responseProto; + } catch (ExecutionException | InterruptedException e) { + throw new IOException("Failed to execute command " + request, e); + } } private XceiverClientReply sendCommandWithTraceIDAndRetry( - ContainerCommandRequestProto request, List excludeDns) + ContainerCommandRequestProto request, CheckedFunction function) throws IOException { try (Scope scope = GlobalTracer.get() .buildSpan("XceiverClientGrpc." 
+ request.getCmdType().name()) .startActive(true)) { ContainerCommandRequestProto finalPayload = ContainerCommandRequestProto.newBuilder(request) - .setTraceID(TracingUtil.exportCurrentSpan()) - .build(); - return sendCommandWithRetry(finalPayload, excludeDns); + .setTraceID(TracingUtil.exportCurrentSpan()).build(); + return sendCommandWithRetry(finalPayload, function); } } private XceiverClientReply sendCommandWithRetry( - ContainerCommandRequestProto request, List excludeDns) + ContainerCommandRequestProto request, CheckedFunction function) throws IOException { ContainerCommandResponseProto responseProto = null; + IOException ioException = null; // In case of an exception or an error, we will try to read from the // datanodes in the pipeline in a round robin fashion. // TODO: cache the correct leader info in here, so that any subsequent calls // should first go to leader - List dns = pipeline.getNodes(); - List healthyDns = - excludeDns != null ? dns.stream().filter(dnId -> { - for (DatanodeDetails excludeId : excludeDns) { - if (dnId.equals(excludeId)) { - return false; - } - } - return true; - }).collect(Collectors.toList()) : dns; XceiverClientReply reply = new XceiverClientReply(null); - for (DatanodeDetails dn : healthyDns) { + for (DatanodeDetails dn : pipeline.getNodes()) { try { LOG.debug("Executing command " + request + " on datanode " + dn); // In case the command gets retried on a 2nd datanode, @@ -266,17 +260,24 @@ private XceiverClientReply sendCommandWithRetry( // in case these don't exist for the specific datanode. reply.addDatanode(dn); responseProto = sendCommandAsync(request, dn).getResponse().get(); - if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) { - break; + if (function != null) { + function.apply(responseProto); } - } catch (ExecutionException | InterruptedException e) { + break; + } catch (ExecutionException | InterruptedException | IOException e) { LOG.debug("Failed to execute command " + request + " on datanode " + dn .getUuidString(), e); - if (Status.fromThrowable(e.getCause()).getCode() - == Status.UNAUTHENTICATED.getCode()) { - throw new SCMSecurityException("Failed to authenticate with " - + "GRPC XceiverServer with Ozone block token."); + if (!(e instanceof IOException)) { + if (Status.fromThrowable(e.getCause()).getCode() + == Status.UNAUTHENTICATED.getCode()) { + throw new SCMSecurityException("Failed to authenticate with " + + "GRPC XceiverServer with Ozone block token."); + } + ioException = new IOException(e); + } else { + ioException = (IOException) e; } + responseProto = null; } } @@ -284,9 +285,10 @@ private XceiverClientReply sendCommandWithRetry( reply.setResponse(CompletableFuture.completedFuture(responseProto)); return reply; } else { - throw new IOException( - "Failed to execute command " + request + " on the pipeline " - + pipeline.getId()); + Preconditions.checkNotNull(ioException); + LOG.error("Failed to execute command " + request + " on the pipeline " + + pipeline.getId()); + throw ioException; } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index efd82bce7bbd5..607966d5efc7d 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.storage.CheckedFunction; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import io.opentracing.Scope; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index bb4a5b05633d9..c84d1b84ca0c8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -27,6 +27,7 @@ .StorageContainerException; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; +import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.hdds.scm.XceiverClientManager; @@ -74,7 +75,7 @@ public class BlockInputStream extends InputStream implements Seekable { private List buffers; private int bufferIndex; private long bufferPosition; - private final boolean verifyChecksum; + private boolean verifyChecksum; /** * Creates a new BlockInputStream. @@ -323,41 +324,8 @@ private boolean chunksRemaining() { private synchronized void readChunkFromContainer() throws IOException { // Read the chunk at chunkIndex final ChunkInfo chunkInfo = chunks.get(chunkIndex); - List excludeDns = null; ByteString byteString; - List dnList = getDatanodeList(); - while (true) { - List dnListFromReadChunkCall = new ArrayList<>(); - byteString = readChunk(chunkInfo, excludeDns, dnListFromReadChunkCall); - try { - if (byteString.size() != chunkInfo.getLen()) { - // Bytes read from chunk should be equal to chunk size. - throw new IOException(String - .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", - chunkInfo.getChunkName(), chunkInfo.getLen(), - byteString.size())); - } - ChecksumData checksumData = - ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData()); - if (verifyChecksum) { - Checksum.verifyChecksum(byteString, checksumData); - } - break; - } catch (IOException ioe) { - // we will end up in this situation only if the checksum mismatch - // happens or the length of the chunk mismatches. - // In this case, read should be retried on a different replica. - // TODO: Inform SCM of a possible corrupt container replica here - if (excludeDns == null) { - excludeDns = new ArrayList<>(); - } - excludeDns.addAll(dnListFromReadChunkCall); - if (excludeDns.size() == dnList.size()) { - throw ioe; - } - } - } - + byteString = readChunk(chunkInfo); buffers = byteString.asReadOnlyByteBufferList(); bufferIndex = 0; chunkIndexOfCurrentBuffer = chunkIndex; @@ -372,28 +340,17 @@ private synchronized void readChunkFromContainer() throws IOException { * Send RPC call to get the chunk from the container. 
*/ @VisibleForTesting - protected ByteString readChunk(final ChunkInfo chunkInfo, - List excludeDns, List dnListFromReply) + protected ByteString readChunk(final ChunkInfo chunkInfo) throws IOException { - XceiverClientReply reply; - ReadChunkResponseProto readChunkResponse = null; + ReadChunkResponseProto readChunkResponse; try { - reply = ContainerProtocolCalls - .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns); - ContainerProtos.ContainerCommandResponseProto response; - response = reply.getResponse().get(); - ContainerProtocolCalls.validateContainerResponse(response); - readChunkResponse = response.getReadChunk(); - dnListFromReply.addAll(reply.getDatanodes()); + readChunkResponse = ContainerProtocolCalls + .readChunk(xceiverClient, chunkInfo, blockID, traceID, validator); } catch (IOException e) { if (e instanceof StorageContainerException) { throw e; } throw new IOException("Unexpected OzoneException: " + e.toString(), e); - } catch (ExecutionException | InterruptedException e) { - throw new IOException( - "Failed to execute ReadChunk command for chunk " + chunkInfo - .getChunkName(), e); } return readChunkResponse.getData(); } @@ -403,6 +360,34 @@ protected List getDatanodeList() { return xceiverClient.getPipeline().getNodes(); } + private CheckedFunction + validator = (response) -> { + ReadChunkResponseProto readChunkResponse; + final ChunkInfo chunkInfo = chunks.get(chunkIndex); + ByteString byteString; + try { + ContainerProtocolCalls.validateContainerResponse(response); + readChunkResponse = response.getReadChunk(); + } catch (IOException e) { + if (e instanceof StorageContainerException) { + throw e; + } + throw new IOException("Unexpected OzoneException: " + e.toString(), e); + } + byteString = readChunkResponse.getData(); + if (byteString.size() != chunkInfo.getLen()) { + // Bytes read from chunk should be equal to chunk size. 
+ throw new OzoneChecksumException(String + .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", + chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size())); + } + ChecksumData checksumData = + ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData()); + if (verifyChecksum) { + Checksum.verifyChecksum(byteString, checksumData); + } + }; + @Override public synchronized void seek(long pos) throws IOException { if (pos < 0 || (chunks.size() == 0 && pos > 0) diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java index 35c102257f4b4..b6ceb2b2af738 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java @@ -114,8 +114,7 @@ private static class DummyBlockInputStream extends BlockInputStream { } @Override - protected ByteString readChunk(final ChunkInfo chunkInfo, - List excludeDns, List dnListFromReply) + protected ByteString readChunk(final ChunkInfo chunkInfo) throws IOException { return getByteString(chunkInfo.getChunkName(), (int) chunkInfo.getLen()); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 1a183664a127f..9f4ad210ee91a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdds.scm.storage.CheckedFunction; +import org.apache.hadoop.hdds.scm.storage.CheckedFunction; /** * A Client for the storageContainer protocol. @@ -118,18 +120,19 @@ public ContainerCommandResponseProto sendCommand( * Sends a given command to server and gets the reply back along with * the server associated info. * @param request Request - * @param excludeDns list of servers on which the command won't be sent to. + * @param function function to validate the response * @return Response to the command * @throws IOException */ - public XceiverClientReply sendCommand( - ContainerCommandRequestProto request, List excludeDns) + public ContainerCommandResponseProto sendCommand( + ContainerCommandRequestProto request, CheckedFunction function) throws IOException { try { XceiverClientReply reply; reply = sendCommandAsync(request); - reply.getResponse().get(); - return reply; + ContainerCommandResponseProto responseProto = reply.getResponse().get(); + function.apply(responseProto); + return responseProto; } catch (ExecutionException | InterruptedException e) { throw new IOException("Failed to command " + request, e); } @@ -156,7 +159,7 @@ public XceiverClientReply sendCommand( /** * Check if an specfic commitIndex is replicated to majority/all servers. 
* @param index index to watch for - * @param timeout timeout provided for the watch ipeartion to complete + * @param timeout timeout provided for the watch operation to complete * @return reply containing the min commit index replicated to all or majority * servers in case of a failure * @throws InterruptedException diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedFunction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedFunction.java new file mode 100644 index 0000000000000..3c722b6c33411 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedFunction.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.storage; + + +import java.io.IOException; + +@FunctionalInterface +public interface CheckedFunction { + void apply(IN input) throws THROWABLE; +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 5a1a75eb90fd9..8df94326dc8b5 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -116,9 +116,8 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); - + ContainerCommandResponseProto response = + xceiverClient.sendCommand(request, validator); return response.getGetBlock(); } @@ -153,8 +152,8 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); + ContainerCommandResponseProto response = + xceiverClient.sendCommand(request, validator); return response.getGetCommittedBlockLength(); } @@ -184,8 +183,8 @@ public static ContainerProtos.PutBlockResponseProto putBlock( builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); + ContainerCommandResponseProto response = + xceiverClient.sendCommand(request, validator); return response.getPutBlock(); } @@ -228,35 +227,31 @@ public static XceiverClientReply putBlockAsync( * @param chunk information about chunk to read * @param blockID 
ID of the block * @param traceID container protocol call args - * @param excludeDns datamode to exclude while executing the command + * @param validator function to validate the response * @return container protocol read chunk response * @throws IOException if there is an I/O error while performing the call */ - public static XceiverClientReply readChunk(XceiverClientSpi xceiverClient, - ChunkInfo chunk, BlockID blockID, String traceID, - List excludeDns) - throws IOException { - ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto - .newBuilder() - .setBlockID(blockID.getDatanodeBlockIDProtobuf()) - .setChunkData(chunk); + public static ContainerProtos.ReadChunkResponseProto readChunk( + XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, + String traceID, CheckedFunction validator) throws IOException { + ReadChunkRequestProto.Builder readChunkRequest = + ReadChunkRequestProto.newBuilder() + .setBlockID(blockID.getDatanodeBlockIDProtobuf()) + .setChunkData(chunk); String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); - ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.ReadChunk) - .setContainerID(blockID.getContainerID()) - .setTraceID(traceID) - .setDatanodeUuid(id) - .setReadChunk(readChunkRequest); + ContainerCommandRequestProto.Builder builder = + ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk) + .setContainerID(blockID.getContainerID()).setTraceID(traceID) + .setDatanodeUuid(id).setReadChunk(readChunkRequest); String encodedToken = getEncodedBlockToken(new Text(blockID. getContainerBlockID().toString())); if (encodedToken != null) { builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - XceiverClientReply reply = - xceiverClient.sendCommand(request, excludeDns); - return reply; + ContainerCommandResponseProto reply = + xceiverClient.sendCommand(request, validator); + return reply.getReadChunk(); } /** @@ -291,8 +286,7 @@ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); + xceiverClient.sendCommand(request, validator); } /** @@ -384,8 +378,8 @@ public static PutSmallFileResponseProto writeSmallFile( builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = client.sendCommand(request); - validateContainerResponse(response); + ContainerCommandResponseProto response = + client.sendCommand(request, validator); return response.getPutSmallFile(); } @@ -416,9 +410,7 @@ public static void createContainer(XceiverClientSpi client, long containerID, request.setCreateContainer(createRequest.build()); request.setDatanodeUuid(id); request.setTraceID(traceID); - ContainerCommandResponseProto response = client.sendCommand( - request.build()); - validateContainerResponse(response); + client.sendCommand(request.build(), validator); } /** @@ -444,12 +436,10 @@ public static void deleteContainer(XceiverClientSpi client, long containerID, request.setDeleteContainer(deleteRequest); request.setTraceID(traceID); request.setDatanodeUuid(id); - if(encodedToken != null) { + if (encodedToken != null) { request.setEncodedToken(encodedToken); } - ContainerCommandResponseProto response = - 
client.sendCommand(request.build()); - validateContainerResponse(response); + client.sendCommand(request.build(), validator); } /** @@ -505,8 +495,7 @@ public static ReadContainerResponseProto readContainer( request.setEncodedToken(encodedToken); } ContainerCommandResponseProto response = - client.sendCommand(request.build()); - validateContainerResponse(response); + client.sendCommand(request.build(), validator); return response.getReadContainer(); } @@ -544,9 +533,8 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = client.sendCommand(request); - validateContainerResponse(response); - + ContainerCommandResponseProto response = + client.sendCommand(request, validator); return response.getGetSmallFile(); } @@ -598,4 +586,9 @@ private static Text getService(DatanodeBlockID blockId) { .append(blockId.getLocalID()) .toString()); } + + private static CheckedFunction + + validator = (response) -> + validateContainerResponse(response); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index bd496d0803ffd..56e0eeae42050 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -867,7 +867,7 @@ private void readCorruptedKey(String volumeName, String bucketName, fail("Reading corrupted data should fail, as verify checksum is " + "enabled"); } - } catch (OzoneChecksumException e) { + } catch (IOException e) { if (!verifyChecksum) { fail("Reading corrupted data should not fail, as verify checksum is " + "disabled"); @@ -1017,7 +1017,7 @@ public void testReadKeyWithCorruptedData() throws IOException { OzoneInputStream is = bucket.readKey(keyName); is.read(new byte[100]); fail("Reading corrupted data should fail."); - } catch (OzoneChecksumException e) { + } catch (IOException e) { GenericTestUtils.assertExceptionContains("Checksum mismatch", e); } } @@ -1098,7 +1098,7 @@ public void testReadKeyWithCorruptedDataWithMutiNodes() throws IOException { byte[] b = new byte[data.length]; is.read(b); fail("Reading corrupted data should fail."); - } catch (OzoneChecksumException e) { + } catch (IOException e) { GenericTestUtils.assertExceptionContains("Checksum mismatch", e); } } From 97bcc05ea2b97d354b3c77a9a3255195bd39f13a Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Wed, 29 May 2019 23:01:38 +0530 Subject: [PATCH 2/6] Addressed Review Comments. 
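This revision replaces the single response validator with a list of CheckedBiFunction validators that receive both the request and the response, so callers can stack their own checks on top of the stock result-code validation. As an illustration only (not part of this patch; readWithExtraCheck and nonEmptyCheck are hypothetical names, and the types are the ones introduced elsewhere in this series), a caller-side sketch of the new API:

    // Assumes the classes used in this series: ContainerProtocolCalls,
    // CheckedBiFunction, XceiverClientSpi and the ContainerProtos messages.
    static ReadChunkResponseProto readWithExtraCheck(
        XceiverClientSpi xceiverClient, ChunkInfo chunkInfo, BlockID blockID,
        String traceID) throws IOException {
      // Start from the stock validator, which checks the proto result code.
      List<CheckedBiFunction> validators =
          ContainerProtocolCalls.getValidatorList();
      // Hypothetical extra check, matching the CheckedBiFunction signature.
      CheckedBiFunction<ContainerCommandRequestProto,
          ContainerCommandResponseProto, IOException> nonEmptyCheck =
          (request, response) -> {
            if (response.getReadChunk().getData().isEmpty()) {
              throw new IOException(
                  "Empty read for chunk " + chunkInfo.getChunkName());
            }
          };
      validators.add(nonEmptyCheck);
      return ContainerProtocolCalls.readChunk(
          xceiverClient, chunkInfo, blockID, traceID, validators);
    }

BlockInputStream.readChunk below follows the same pattern, appending its checksum validator to the list returned by getValidatorList().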
--- .../hadoop/hdds/scm/XceiverClientGrpc.java | 18 ++++----- .../hadoop/hdds/scm/XceiverClientRatis.java | 1 - .../hdds/scm/storage/BlockInputStream.java | 23 +++++------ .../hadoop/hdds/scm/XceiverClientSpi.java | 12 +++--- ...edFunction.java => CheckedBiFunction.java} | 4 +- .../scm/storage/ContainerProtocolCalls.java | 38 ++++++++++--------- 6 files changed, 47 insertions(+), 49 deletions(-) rename hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/{CheckedFunction.java => CheckedBiFunction.java} (86%) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 837c1579fa756..2b136a11e377b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -31,14 +31,13 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.scm.storage.CheckedFunction; +import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction; import org.apache.hadoop.hdds.security.exception.SCMSecurityException; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.tracing.GrpcClientInterceptor; import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; @@ -214,12 +213,12 @@ public ContainerCommandResponseProto sendCommand( @Override public ContainerCommandResponseProto sendCommand( - ContainerCommandRequestProto request, CheckedFunction function) + ContainerCommandRequestProto request, List validators) throws IOException { Preconditions.checkState(HddsUtils.isReadOnly(request)); try { XceiverClientReply reply; - reply = sendCommandWithTraceIDAndRetry(request, function); + reply = sendCommandWithTraceIDAndRetry(request, validators); ContainerCommandResponseProto responseProto = reply.getResponse().get(); return responseProto; } catch (ExecutionException | InterruptedException e) { @@ -228,7 +227,7 @@ public ContainerCommandResponseProto sendCommand( } private XceiverClientReply sendCommandWithTraceIDAndRetry( - ContainerCommandRequestProto request, CheckedFunction function) + ContainerCommandRequestProto request, List validators) throws IOException { try (Scope scope = GlobalTracer.get() .buildSpan("XceiverClientGrpc." + request.getCmdType().name()) @@ -236,12 +235,12 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry( ContainerCommandRequestProto finalPayload = ContainerCommandRequestProto.newBuilder(request) .setTraceID(TracingUtil.exportCurrentSpan()).build(); - return sendCommandWithRetry(finalPayload, function); + return sendCommandWithRetry(finalPayload, validators); } } private XceiverClientReply sendCommandWithRetry( - ContainerCommandRequestProto request, CheckedFunction function) + ContainerCommandRequestProto request, List validators) throws IOException { ContainerCommandResponseProto responseProto = null; IOException ioException = null; @@ -260,8 +259,9 @@ private XceiverClientReply sendCommandWithRetry( // in case these don't exist for the specific datanode. 
reply.addDatanode(dn); responseProto = sendCommandAsync(request, dn).getResponse().get(); - if (function != null) { - function.apply(responseProto); + if (validators != null && !validators.isEmpty()) { + for (CheckedBiFunction validator : validators) + validator.apply(request, responseProto); } break; } catch (ExecutionException | InterruptedException | IOException e) { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 607966d5efc7d..efd82bce7bbd5 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.scm.storage.CheckedFunction; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import io.opentracing.Scope; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index c84d1b84ca0c8..c1a94e73e1c85 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -344,8 +344,11 @@ protected ByteString readChunk(final ChunkInfo chunkInfo) throws IOException { ReadChunkResponseProto readChunkResponse; try { + List validators = + ContainerProtocolCalls.getValidatorList(); + validators.add(validator); readChunkResponse = ContainerProtocolCalls - .readChunk(xceiverClient, chunkInfo, blockID, traceID, validator); + .readChunk(xceiverClient, chunkInfo, blockID, traceID, validators); } catch (IOException e) { if (e instanceof StorageContainerException) { throw e; @@ -360,20 +363,12 @@ protected List getDatanodeList() { return xceiverClient.getPipeline().getNodes(); } - private CheckedFunction - validator = (response) -> { - ReadChunkResponseProto readChunkResponse; - final ChunkInfo chunkInfo = chunks.get(chunkIndex); + private CheckedBiFunction + validator = (request, response) -> { + ReadChunkResponseProto readChunkResponse = response.getReadChunk(); + final ChunkInfo chunkInfo = readChunkResponse.getChunkData(); ByteString byteString; - try { - ContainerProtocolCalls.validateContainerResponse(response); - readChunkResponse = response.getReadChunk(); - } catch (IOException e) { - if (e instanceof StorageContainerException) { - throw e; - } - throw new IOException("Unexpected OzoneException: " + e.toString(), e); - } byteString = readChunkResponse.getData(); if (byteString.size() != chunkInfo.getLen()) { // Bytes read from chunk should be equal to chunk size. 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 9f4ad210ee91a..5631badf44c93 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -25,15 +25,13 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hdds.scm.storage.CheckedFunction; -import org.apache.hadoop.hdds.scm.storage.CheckedFunction; +import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction; /** * A Client for the storageContainer protocol. @@ -120,18 +118,20 @@ public ContainerCommandResponseProto sendCommand( * Sends a given command to server and gets the reply back along with * the server associated info. * @param request Request - * @param function function to validate the response + * @param validators functions to validate the response * @return Response to the command * @throws IOException */ public ContainerCommandResponseProto sendCommand( - ContainerCommandRequestProto request, CheckedFunction function) + ContainerCommandRequestProto request, List validators) throws IOException { try { XceiverClientReply reply; reply = sendCommandAsync(request); ContainerCommandResponseProto responseProto = reply.getResponse().get(); - function.apply(responseProto); + for (CheckedBiFunction function : validators) { + function.apply(request, responseProto); + } return responseProto; } catch (ExecutionException | InterruptedException e) { throw new IOException("Failed to command " + request, e); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedFunction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedBiFunction.java similarity index 86% rename from hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedFunction.java rename to hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedBiFunction.java index 3c722b6c33411..3773777e918e9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedFunction.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedBiFunction.java @@ -22,6 +22,6 @@ import java.io.IOException; @FunctionalInterface -public interface CheckedFunction { - void apply(IN input) throws THROWABLE; +public interface CheckedBiFunction { + void apply(LEFT left, RIGHT right) throws THROWABLE; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 8df94326dc8b5..8d5d7006e09ee 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.storage; -import 
org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.container.common.helpers .BlockNotCommittedException; @@ -72,6 +71,7 @@ import org.apache.hadoop.hdds.client.BlockID; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; @@ -117,7 +117,7 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = - xceiverClient.sendCommand(request, validator); + xceiverClient.sendCommand(request, getValidatorList()); return response.getGetBlock(); } @@ -153,7 +153,7 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = - xceiverClient.sendCommand(request, validator); + xceiverClient.sendCommand(request, getValidatorList()); return response.getGetCommittedBlockLength(); } @@ -184,7 +184,7 @@ public static ContainerProtos.PutBlockResponseProto putBlock( } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = - xceiverClient.sendCommand(request, validator); + xceiverClient.sendCommand(request, getValidatorList()); return response.getPutBlock(); } @@ -227,13 +227,13 @@ public static XceiverClientReply putBlockAsync( * @param chunk information about chunk to read * @param blockID ID of the block * @param traceID container protocol call args - * @param validator function to validate the response + * @param validators functions to validate the response * @return container protocol read chunk response * @throws IOException if there is an I/O error while performing the call */ public static ContainerProtos.ReadChunkResponseProto readChunk( XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, - String traceID, CheckedFunction validator) throws IOException { + String traceID, List validators) throws IOException { ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto.newBuilder() .setBlockID(blockID.getDatanodeBlockIDProtobuf()) @@ -250,7 +250,7 @@ public static ContainerProtos.ReadChunkResponseProto readChunk( } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto reply = - xceiverClient.sendCommand(request, validator); + xceiverClient.sendCommand(request, validators); return reply.getReadChunk(); } @@ -286,7 +286,7 @@ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - xceiverClient.sendCommand(request, validator); + xceiverClient.sendCommand(request, getValidatorList()); } /** @@ -379,7 +379,7 @@ public static PutSmallFileResponseProto writeSmallFile( } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = - client.sendCommand(request, validator); + client.sendCommand(request, getValidatorList()); return response.getPutSmallFile(); } @@ -410,7 +410,7 @@ public static void createContainer(XceiverClientSpi client, long containerID, request.setCreateContainer(createRequest.build()); request.setDatanodeUuid(id); request.setTraceID(traceID); - client.sendCommand(request.build(), validator); + client.sendCommand(request.build(), getValidatorList()); } /** @@ -439,7 +439,7 @@ public static void deleteContainer(XceiverClientSpi client, long containerID, if 
(encodedToken != null) { request.setEncodedToken(encodedToken); } - client.sendCommand(request.build(), validator); + client.sendCommand(request.build(), getValidatorList()); } /** @@ -495,7 +495,7 @@ public static ReadContainerResponseProto readContainer( request.setEncodedToken(encodedToken); } ContainerCommandResponseProto response = - client.sendCommand(request.build(), validator); + client.sendCommand(request.build(), getValidatorList()); return response.getReadContainer(); } @@ -534,7 +534,7 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = - client.sendCommand(request, validator); + client.sendCommand(request, getValidatorList()); return response.getGetSmallFile(); } @@ -587,8 +587,12 @@ private static Text getService(DatanodeBlockID blockId) { .toString()); } - private static CheckedFunction<ContainerCommandResponseProto, IOException> - validator = (response) -> - validateContainerResponse(response); + public static List<CheckedBiFunction> getValidatorList() { + List<CheckedBiFunction> validators = new ArrayList<>(1); + CheckedBiFunction<ContainerCommandRequestProto, ContainerCommandResponseProto, IOException> + validator = (request, response) -> validateContainerResponse(response); + validators.add(validator); + return validators; + } } From 1a328b232d1a48a9dca251ad996592522fab5eb3 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Fri, 31 May 2019 15:27:29 +0530 Subject: [PATCH 3/6] Addressed Checkstyle and related unit test failures. --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 20 ++++++++++--------- .../hdds/scm/storage/BlockInputStream.java | 3 --- .../hdds/scm/storage/CheckedBiFunction.java | 6 +++++- .../scm/storage/ContainerProtocolCalls.java | 4 +--- 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 2b136a11e377b..13d3eedec340a 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc; @@ -63,7 +62,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; /** * A Client for the storageContainer protocol. @@ -103,7 +101,8 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) { /** * To be used when grpc token is not enabled. */ - @Override public void connect() throws Exception { + @Override + public void connect() throws Exception { // leader by default is the 1st datanode in the datanode list of pipleline DatanodeDetails dn = this.pipeline.getFirstNode(); // just make a connection to the 1st datanode at the beginning @@ -113,7 +112,8 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) { /** * Passed encoded token to GRPC header when security is enabled.
*/ - @Override public void connect(String encodedToken) throws Exception { + @Override + public void connect(String encodedToken) throws Exception { // leader by default is the 1st datanode in the datanode list of pipleline DatanodeDetails dn = this.pipeline.getFirstNode(); // just make a connection to the 1st datanode at the beginning @@ -172,7 +172,8 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken) * * @return True if the connection is alive, false otherwise. */ - @VisibleForTesting public boolean isConnected(DatanodeDetails details) { + @VisibleForTesting + public boolean isConnected(DatanodeDetails details) { return isConnected(channels.get(details.getUuid())); } @@ -180,7 +181,8 @@ private boolean isConnected(ManagedChannel channel) { return channel != null && !channel.isTerminated() && !channel.isShutdown(); } - @Override public void close() { + @Override + public void close() { closed = true; for (ManagedChannel channel : channels.values()) { channel.shutdownNow(); @@ -215,7 +217,6 @@ public ContainerCommandResponseProto sendCommand( public ContainerCommandResponseProto sendCommand( ContainerCommandRequestProto request, List validators) throws IOException { - Preconditions.checkState(HddsUtils.isReadOnly(request)); try { XceiverClientReply reply; reply = sendCommandWithTraceIDAndRetry(request, validators); @@ -260,8 +261,9 @@ private XceiverClientReply sendCommandWithRetry( reply.addDatanode(dn); responseProto = sendCommandAsync(request, dn).getResponse().get(); if (validators != null && !validators.isEmpty()) { - for (CheckedBiFunction validator : validators) - validator.apply(request, responseProto); + for (CheckedBiFunction validator : validators) { + validator.apply(request, responseProto); + } } break; } catch (ExecutionException | InterruptedException | IOException e) { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index c1a94e73e1c85..6e3b10b0fac68 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -22,7 +22,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.ozone.common.Checksum; @@ -41,10 +40,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.ExecutionException; /** * An {@link InputStream} used by the REST service in combination with the diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedBiFunction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedBiFunction.java index 3773777e918e9..df84859ab0294 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedBiFunction.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/CheckedBiFunction.java @@ -21,7 +21,11 @@ import java.io.IOException; +/** + * Defines a functional interface having two inputs which throws IOException. 
+ */ @FunctionalInterface -public interface CheckedBiFunction<LEFT, RIGHT, THROWABLE extends IOException> { +public interface CheckedBiFunction<LEFT, RIGHT, + THROWABLE extends IOException> { void apply(LEFT left, RIGHT right) throws THROWABLE; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 8d5d7006e09ee..08f5d87f5ec13 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -466,9 +466,7 @@ public static void closeContainer(XceiverClientSpi client, if(encodedToken != null) { request.setEncodedToken(encodedToken); } - ContainerCommandResponseProto response = - client.sendCommand(request.build()); - validateContainerResponse(response); + client.sendCommand(request.build(), getValidatorList()); } /** From 90d25c50e863b3e8ef41f233e5405aac3e89b7e6 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Mon, 3 Jun 2019 21:22:33 +0530 Subject: [PATCH 4/6] Addressed CheckStyle issues. --- .../hadoop/hdds/scm/storage/BlockInputStream.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 6e3b10b0fac68..9e98ade38b80a 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.ozone.common.Checksum; @@ -34,8 +33,11 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ReadChunkResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. + ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. + ContainerCommandRequestProto; import org.apache.hadoop.hdds.client.BlockID; - import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -360,8 +362,8 @@ protected List<DatanodeDetails> getDatanodeList() { return xceiverClient.getPipeline().getNodes(); } - private CheckedBiFunction<ContainerProtos.ContainerCommandRequestProto, ContainerProtos.ContainerCommandResponseProto, IOException> + private CheckedBiFunction<ContainerCommandRequestProto, ContainerCommandResponseProto, IOException> validator = (request, response) -> { ReadChunkResponseProto readChunkResponse = response.getReadChunk(); final ChunkInfo chunkInfo = readChunkResponse.getChunkData(); From d59a67a5b76c5c390a54170207310b9113888cc5 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Tue, 4 Jun 2019 16:58:44 +0530 Subject: [PATCH 5/6] Addressed Test failures and checkstyle issues.
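The validator now derives the ChunkInfo from the datanode's own ReadChunkResponseProto via readChunkResponse.getChunkData() instead of stream-side state, and TestReadRetries only asserts that an IOException surfaces once every replica has been tried. For reference, the length and checksum validation performed by the lambda below is equivalent to this standalone sketch (verifyChunk is a hypothetical helper name, not part of this patch):

    static void verifyChunk(ReadChunkResponseProto reply,
        boolean verifyChecksum) throws IOException {
      // Chunk metadata as echoed back by the datanode that served the read.
      ChunkInfo chunkInfo = reply.getChunkData();
      ByteString data = reply.getData();
      if (data.size() != chunkInfo.getLen()) {
        // A short read is surfaced as a checksum failure so the retry
        // loop moves on to the next replica in the pipeline.
        throw new OzoneChecksumException(String.format(
            "Inconsistent read for chunk=%s len=%d bytesRead=%d",
            chunkInfo.getChunkName(), chunkInfo.getLen(), data.size()));
      }
      if (verifyChecksum) {
        Checksum.verifyChecksum(data,
            ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData()));
      }
    }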
--- .../hdds/scm/storage/BlockInputStream.java | 34 +++++++++---------- .../ozone/client/rpc/TestReadRetries.java | 5 ++- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 9e98ade38b80a..3cb2161cad001 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -363,23 +363,23 @@ protected List getDatanodeList() { } private CheckedBiFunction - validator = (request, response) -> { - ReadChunkResponseProto readChunkResponse = response.getReadChunk(); - final ChunkInfo chunkInfo = readChunkResponse.getChunkData(); - ByteString byteString; - byteString = readChunkResponse.getData(); - if (byteString.size() != chunkInfo.getLen()) { - // Bytes read from chunk should be equal to chunk size. - throw new OzoneChecksumException(String - .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", - chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size())); - } - ChecksumData checksumData = - ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData()); - if (verifyChecksum) { - Checksum.verifyChecksum(byteString, checksumData); - } + ContainerCommandResponseProto, IOException> validator = + (request, response) -> { + ReadChunkResponseProto readChunkResponse = response.getReadChunk(); + final ChunkInfo chunkInfo = readChunkResponse.getChunkData(); + ByteString byteString; + byteString = readChunkResponse.getData(); + if (byteString.size() != chunkInfo.getLen()) { + // Bytes read from chunk should be equal to chunk size. 
+ throw new OzoneChecksumException(String + .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", + chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size())); + } + ChecksumData checksumData = + ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData()); + if (verifyChecksum) { + Checksum.verifyChecksum(byteString, checksumData); + } }; @Override diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java index 0145c6dc9f196..1343a03a2a952 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java @@ -204,9 +204,8 @@ public void testPutKeyAndGetKeyThreeNodes() readKey(bucket, keyName, value); fail("Expected exception not thrown"); } catch (IOException e) { - Assert.assertTrue(e.getMessage().contains("Failed to execute command")); - Assert.assertTrue( - e.getMessage().contains("on the pipeline " + pipeline.getId())); + // it should throw an ioException as none of the servers + // are available } manager.releaseClient(clientSpi, false); } From f030b60e06adef104f250892e740e4d7b15a50cb Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Tue, 4 Jun 2019 20:39:13 +0530 Subject: [PATCH 6/6] Fixed Checkstyle issues --- .../hdds/scm/storage/BlockInputStream.java | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 3cb2161cad001..82fb1063d3bd8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -364,23 +364,23 @@ protected List getDatanodeList() { private CheckedBiFunction validator = - (request, response) -> { - ReadChunkResponseProto readChunkResponse = response.getReadChunk(); - final ChunkInfo chunkInfo = readChunkResponse.getChunkData(); - ByteString byteString; - byteString = readChunkResponse.getData(); - if (byteString.size() != chunkInfo.getLen()) { - // Bytes read from chunk should be equal to chunk size. - throw new OzoneChecksumException(String - .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", - chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size())); - } - ChecksumData checksumData = - ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData()); - if (verifyChecksum) { - Checksum.verifyChecksum(byteString, checksumData); - } - }; + (request, response) -> { + ReadChunkResponseProto readChunkResponse = response.getReadChunk(); + final ChunkInfo chunkInfo = readChunkResponse.getChunkData(); + ByteString byteString = readChunkResponse.getData(); + if (byteString.size() != chunkInfo.getLen()) { + // Bytes read from chunk should be equal to chunk size. + throw new OzoneChecksumException(String + .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", + chunkInfo.getChunkName(), chunkInfo.getLen(), + byteString.size())); + } + ChecksumData checksumData = + ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData()); + if (verifyChecksum) { + Checksum.verifyChecksum(byteString, checksumData); + } + }; @Override public synchronized void seek(long pos) throws IOException {
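Taken together, the series moves replica exclusion out of BlockInputStream: sendCommandWithRetry in XceiverClientGrpc now walks pipeline.getNodes() in order, re-runs every validator against each datanode's response, and rethrows the last IOException once all replicas are exhausted. A minimal sketch of exercising that behavior (expectAllReplicasTried and the JUnit Assert usage are assumptions for illustration, not part of the series):

    static void expectAllReplicasTried(XceiverClientSpi client,
        ContainerCommandRequestProto readRequest) {
      List<CheckedBiFunction> validators = new ArrayList<>(1);
      // Reject every response so each datanode in the pipeline is tried.
      CheckedBiFunction<ContainerCommandRequestProto,
          ContainerCommandResponseProto, IOException> alwaysFail =
          (request, response) -> {
            throw new IOException("simulated corrupt replica");
          };
      validators.add(alwaysFail);
      try {
        client.sendCommand(readRequest, validators);
        Assert.fail("Expected exception not thrown");
      } catch (IOException e) {
        // Expected: the retry loop tried every node in the pipeline and
        // rethrew the last IOException.
      }
    }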