Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public class ContainerStateMachine extends BaseStateMachine {
private final Cache<Long, ByteString> stateMachineDataCache;
private final boolean isBlockTokenEnabled;
private final TokenVerifier tokenVerifier;
private final AtomicBoolean isStateMachineHealthy;
private final AtomicBoolean stateMachineHealthy;

private final Semaphore applyTransactionSemaphore;
/**
Expand Down Expand Up @@ -190,7 +190,7 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
ScmConfigKeys.
DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
isStateMachineHealthy = new AtomicBoolean(true);
stateMachineHealthy = new AtomicBoolean(true);
this.executors = new ExecutorService[numContainerOpExecutors];
for (int i = 0; i < numContainerOpExecutors; i++) {
final int index = i;
Expand Down Expand Up @@ -271,11 +271,15 @@ public void persistContainerSet(OutputStream out) throws IOException {
IOUtils.write(builder.build().toByteArray(), out);
}

public boolean isStateMachineHealthy() {
return stateMachineHealthy.get();
}

@Override
public long takeSnapshot() throws IOException {
TermIndex ti = getLastAppliedTermIndex();
long startTime = Time.monotonicNow();
if (!isStateMachineHealthy.get()) {
if (!isStateMachineHealthy()) {
String msg =
"Failed to take snapshot " + " for " + gid + " as the stateMachine"
+ " is unhealthy. The last applied index is at " + ti;
Expand Down Expand Up @@ -731,7 +735,11 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
metrics.incPipelineLatency(cmdType,
Time.monotonicNowNanos() - startTime);
}
if (r.getResult() != ContainerProtos.Result.SUCCESS) {
// ignore close container exception while marking the stateMachine
// unhealthy
if (r.getResult() != ContainerProtos.Result.SUCCESS
&& r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
&& r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(
Expand All @@ -744,7 +752,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
// caught in stateMachineUpdater in Ratis and ratis server will
// shutdown.
applyTransactionFuture.completeExceptionally(sce);
isStateMachineHealthy.compareAndSet(true, false);
stateMachineHealthy.compareAndSet(true, false);
ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
} else {
LOG.debug(
Expand All @@ -759,7 +767,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
// add the entry to the applyTransactionCompletionMap only if the
// stateMachine is healthy i.e, there has been no applyTransaction
// failures before.
if (isStateMachineHealthy.get()) {
if (isStateMachineHealthy()) {
final Long previous = applyTransactionCompletionMap
.put(index, trx.getLogEntry().getTerm());
Preconditions.checkState(previous == null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,71 @@ public void testApplyTransactionFailure() throws Exception {
Assert.assertTrue(snapshot.getPath().equals(latestSnapshot.getPath()));
}

@Test
public void testApplyTransactionIdempotencyWithClosedContainer()
throws Exception {
OzoneOutputStream key =
objectStore.getVolume(volumeName).getBucket(bucketName)
.createKey("ratis", 1024, ReplicationType.RATIS,
ReplicationFactor.ONE, new HashMap<>());
// First write and flush creates a container in the datanode
key.write("ratis".getBytes());
key.flush();
key.write("ratis".getBytes());
KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
groupOutputStream.getLocationInfoList();
Assert.assertEquals(1, locationInfoList.size());
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
ContainerData containerData =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet()
.getContainer(omKeyLocationInfo.getContainerID())
.getContainerData();
Assert.assertTrue(containerData instanceof KeyValueContainerData);
key.close();
ContainerStateMachine stateMachine =
(ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster);
SimpleStateMachineStorage storage =
(SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
Path parentPath = storage.findLatestSnapshot().getFile().getPath();
// Since the snapshot threshold is set to 1, since there are
// applyTransactions, we should see snapshots
Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
FileInfo snapshot = storage.findLatestSnapshot().getFile();
Assert.assertNotNull(snapshot);
long containerID = omKeyLocationInfo.getContainerID();
Pipeline pipeline = cluster.getStorageContainerLocationClient()
.getContainerWithPipeline(containerID).getPipeline();
XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(pipeline);
ContainerProtos.ContainerCommandRequestProto.Builder request =
ContainerProtos.ContainerCommandRequestProto.newBuilder();
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
request.setCmdType(ContainerProtos.Type.CloseContainer);
request.setContainerID(containerID);
request.setCloseContainer(
ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
try {
xceiverClient.sendCommand(request.build());
} catch (IOException e) {
Assert.fail("Exception should not be thrown");
}
Assert.assertTrue(
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet().getContainer(containerID)
.getContainerState()
== ContainerProtos.ContainerDataProto.State.CLOSED);
Assert.assertTrue(stateMachine.isStateMachineHealthy());
try {
stateMachine.takeSnapshot();
} catch (IOException ioe) {
Assert.fail("Exception should not be thrown");
}
FileInfo latestSnapshot = storage.findLatestSnapshot().getFile();
Assert.assertFalse(snapshot.getPath().equals(latestSnapshot.getPath()));
}

@Test
public void testValidateBCSIDOnDnRestart() throws Exception {
OzoneOutputStream key =
Expand Down