From e837e0f90aea4b0e8254830bc66998a26f4c698f Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Tue, 1 Oct 2019 00:43:21 +0530 Subject: [PATCH 1/3] HDDS-2210. ContainerStateMachine should not be marked unhealthy if applyTransaction fails with closed container exception. --- .../server/ratis/ContainerStateMachine.java | 20 +++-- .../TestContainerStateMachineFailures.java | 77 +++++++++++++++++++ 2 files changed, 91 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 0535763537cc3..c5a90fce93308 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -150,7 +150,7 @@ public class ContainerStateMachine extends BaseStateMachine { private final Cache stateMachineDataCache; private final boolean isBlockTokenEnabled; private final TokenVerifier tokenVerifier; - private final AtomicBoolean isStateMachineHealthy; + private final AtomicBoolean stateMachineHealthy; private final Semaphore applyTransactionSemaphore; /** @@ -190,7 +190,7 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher, ScmConfigKeys. DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT); applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions); - isStateMachineHealthy = new AtomicBoolean(true); + stateMachineHealthy = new AtomicBoolean(true); this.executors = new ExecutorService[numContainerOpExecutors]; for (int i = 0; i < numContainerOpExecutors; i++) { final int index = i; @@ -270,12 +270,16 @@ public void persistContainerSet(OutputStream out) throws IOException { // container happens outside of Ratis. IOUtils.write(builder.build().toByteArray(), out); } + + public boolean isStateMachineHealthy() { + return stateMachineHealthy.get(); + } @Override public long takeSnapshot() throws IOException { TermIndex ti = getLastAppliedTermIndex(); long startTime = Time.monotonicNow(); - if (!isStateMachineHealthy.get()) { + if (!isStateMachineHealthy()) { String msg = "Failed to take snapshot " + " for " + gid + " as the stateMachine" + " is unhealthy. The last applied index is at " + ti; @@ -731,7 +735,11 @@ public CompletableFuture applyTransaction(TransactionContext trx) { metrics.incPipelineLatency(cmdType, Time.monotonicNowNanos() - startTime); } - if (r.getResult() != ContainerProtos.Result.SUCCESS) { + // ignore close container exception while marking the stateMachine + // unhealthy + if (r.getResult() != ContainerProtos.Result.SUCCESS + && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN + && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) { StorageContainerException sce = new StorageContainerException(r.getMessage(), r.getResult()); LOG.error( @@ -744,7 +752,7 @@ public CompletableFuture applyTransaction(TransactionContext trx) { // caught in stateMachineUpdater in Ratis and ratis server will // shutdown. applyTransactionFuture.completeExceptionally(sce); - isStateMachineHealthy.compareAndSet(true, false); + stateMachineHealthy.compareAndSet(true, false); ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole()); } else { LOG.debug( @@ -759,7 +767,7 @@ public CompletableFuture applyTransaction(TransactionContext trx) { // add the entry to the applyTransactionCompletionMap only if the // stateMachine is healthy i.e, there has been no applyTransaction // failures before. - if (isStateMachineHealthy.get()) { + if (stateMachineHealthy.get()) { final Long previous = applyTransactionCompletionMap .put(index, trx.getLogEntry().getTerm()); Preconditions.checkState(previous == null); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java index 7b908151f487f..476323c26536f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java @@ -353,6 +353,83 @@ public void testApplyTransactionFailure() throws Exception { Assert.assertTrue(snapshot.getPath().equals(latestSnapshot.getPath())); } + @Test + public void testApplyTransactionIdempotencyWithClosedContainer() + throws Exception { + OzoneOutputStream key = + objectStore.getVolume(volumeName).getBucket(bucketName) + .createKey("ratis", 1024, ReplicationType.RATIS, + ReplicationFactor.ONE, new HashMap<>()); + // First write and flush creates a container in the datanode + key.write("ratis".getBytes()); + key.flush(); + key.write("ratis".getBytes()); + KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream(); + List locationInfoList = + groupOutputStream.getLocationInfoList(); + Assert.assertEquals(1, locationInfoList.size()); + OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0); + ContainerData containerData = + cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() + .getContainer().getContainerSet() + .getContainer(omKeyLocationInfo.getContainerID()) + .getContainerData(); + Assert.assertTrue(containerData instanceof KeyValueContainerData); + KeyValueContainerData keyValueContainerData = + (KeyValueContainerData) containerData; + key.close(); + ContainerStateMachine stateMachine = + (ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster); + SimpleStateMachineStorage storage = + (SimpleStateMachineStorage) stateMachine.getStateMachineStorage(); + Path parentPath = storage.findLatestSnapshot().getFile().getPath(); + // Since the snapshot threshold is set to 1, since there are + // applyTransactions, we should see snapshots + Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0); + FileInfo snapshot = storage.findLatestSnapshot().getFile(); + Assert.assertNotNull(snapshot); + long containerID = omKeyLocationInfo.getContainerID(); + Pipeline pipeline = cluster.getStorageContainerLocationClient() + .getContainerWithPipeline(containerID).getPipeline(); + XceiverClientSpi xceiverClient = + xceiverClientManager.acquireClient(pipeline); + ContainerProtos.ContainerCommandRequestProto.Builder request = + ContainerProtos.ContainerCommandRequestProto.newBuilder(); + request.setDatanodeUuid(pipeline.getFirstNode().getUuidString()); + request.setCmdType(ContainerProtos.Type.CloseContainer); + request.setContainerID(containerID); + request.setCloseContainer( + ContainerProtos.CloseContainerRequestProto.getDefaultInstance()); + // close container transaction will fail over Ratis and will initiate + // a pipeline close action + + // Since the applyTransaction failure is propagated to Ratis, + // stateMachineUpdater will it exception while taking the next snapshot + // and should shutdown the RaftServerImpl. The client request will fail + // with RaftRetryFailureException. + try { + xceiverClient.sendCommand(request.build()); + } catch (IOException e) { + Assert.fail("Exceptionn should not be thrown"); + } + // Make sure the container is marked unhealthy + Assert.assertTrue( + cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() + .getContainer().getContainerSet().getContainer(containerID) + .getContainerState() + == ContainerProtos.ContainerDataProto.State.CLOSED); + Assert.assertTrue(stateMachine.isStateMachineHealthy()); + try { + // try to take a new snapshot, ideally it should just fail + stateMachine.takeSnapshot(); + } catch (IOException ioe) { + Assert.assertTrue(ioe instanceof StateMachineException); + } + // Make sure the latest snapshot is same as the previous one + FileInfo latestSnapshot = storage.findLatestSnapshot().getFile(); + Assert.assertFalse(snapshot.getPath().equals(latestSnapshot.getPath())); + } + @Test public void testValidateBCSIDOnDnRestart() throws Exception { OzoneOutputStream key = From 22a0b1056017bc1f4f442c11760063d640d38506 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Tue, 1 Oct 2019 18:04:06 +0530 Subject: [PATCH 2/3] Addressed Review Comments. --- .../server/ratis/ContainerStateMachine.java | 2 +- .../rpc/TestContainerStateMachineFailures.java | 16 ++-------------- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index c5a90fce93308..11545461493c6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -767,7 +767,7 @@ public CompletableFuture applyTransaction(TransactionContext trx) { // add the entry to the applyTransactionCompletionMap only if the // stateMachine is healthy i.e, there has been no applyTransaction // failures before. - if (stateMachineHealthy.get()) { + if (isStateMachineHealthy()) { final Long previous = applyTransactionCompletionMap .put(index, trx.getLogEntry().getTerm()); Preconditions.checkState(previous == null); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java index 476323c26536f..9ac45b8811682 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java @@ -375,8 +375,6 @@ public void testApplyTransactionIdempotencyWithClosedContainer() .getContainer(omKeyLocationInfo.getContainerID()) .getContainerData(); Assert.assertTrue(containerData instanceof KeyValueContainerData); - KeyValueContainerData keyValueContainerData = - (KeyValueContainerData) containerData; key.close(); ContainerStateMachine stateMachine = (ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster); @@ -400,19 +398,11 @@ public void testApplyTransactionIdempotencyWithClosedContainer() request.setContainerID(containerID); request.setCloseContainer( ContainerProtos.CloseContainerRequestProto.getDefaultInstance()); - // close container transaction will fail over Ratis and will initiate - // a pipeline close action - - // Since the applyTransaction failure is propagated to Ratis, - // stateMachineUpdater will it exception while taking the next snapshot - // and should shutdown the RaftServerImpl. The client request will fail - // with RaftRetryFailureException. try { xceiverClient.sendCommand(request.build()); } catch (IOException e) { - Assert.fail("Exceptionn should not be thrown"); + Assert.fail("Exception should not be thrown"); } - // Make sure the container is marked unhealthy Assert.assertTrue( cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() .getContainer().getContainerSet().getContainer(containerID) @@ -420,12 +410,10 @@ public void testApplyTransactionIdempotencyWithClosedContainer() == ContainerProtos.ContainerDataProto.State.CLOSED); Assert.assertTrue(stateMachine.isStateMachineHealthy()); try { - // try to take a new snapshot, ideally it should just fail stateMachine.takeSnapshot(); } catch (IOException ioe) { - Assert.assertTrue(ioe instanceof StateMachineException); + Assert.fail("Exception should not be thrown"); } - // Make sure the latest snapshot is same as the previous one FileInfo latestSnapshot = storage.findLatestSnapshot().getFile(); Assert.assertFalse(snapshot.getPath().equals(latestSnapshot.getPath())); } From 96bf3ca46866c0991192d5d166a56516e8c52f2e Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Tue, 1 Oct 2019 22:05:56 +0530 Subject: [PATCH 3/3] Addressed checkstyle issues. --- .../common/transport/server/ratis/ContainerStateMachine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 11545461493c6..7b638a3cd8cb3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -270,7 +270,7 @@ public void persistContainerSet(OutputStream out) throws IOException { // container happens outside of Ratis. IOUtils.write(builder.build().toByteArray(), out); } - + public boolean isStateMachineHealthy() { return stateMachineHealthy.get(); }