Skip to content

Commit 0976f6f

Browse files
authored
HDDS-1766. ContainerStateMachine is unable to increment lastAppliedTermIndex. Contributed by Mukul Kumar Singh. (#1072)
1 parent 4a70a0d commit 0976f6f

File tree

1 file changed

+20
-19
lines changed
  • hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis

1 file changed

+20
-19
lines changed

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -197,17 +197,16 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot)
197197
if (snapshot == null) {
198198
TermIndex empty =
199199
TermIndex.newTermIndex(0, RaftLog.INVALID_LOG_INDEX);
200-
LOG.info(
201-
"The snapshot info is null." + "Setting the last applied index to:"
202-
+ empty);
200+
LOG.info("{}: The snapshot info is null. Setting the last applied index" +
201+
"to:{}", gid, empty);
203202
setLastAppliedTermIndex(empty);
204-
return RaftLog.INVALID_LOG_INDEX;
203+
return empty.getIndex();
205204
}
206205

207206
final File snapshotFile = snapshot.getFile().getPath().toFile();
208207
final TermIndex last =
209208
SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
210-
LOG.info("Setting the last applied index to " + last);
209+
LOG.info("{}: Setting the last applied index to {}", gid, last);
211210
setLastAppliedTermIndex(last);
212211

213212
// initialize the dispatcher with snapshot so that it build the missing
@@ -243,18 +242,20 @@ public void persistContainerSet(OutputStream out) throws IOException {
243242
@Override
244243
public long takeSnapshot() throws IOException {
245244
TermIndex ti = getLastAppliedTermIndex();
246-
LOG.info("Taking snapshot at termIndex:" + ti);
245+
long startTime = Time.monotonicNow();
247246
if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
248247
final File snapshotFile =
249248
storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
250-
LOG.info("Taking a snapshot to file {}", snapshotFile);
249+
LOG.info("{}: Taking a snapshot at:{} file {}", gid, ti, snapshotFile);
251250
try (FileOutputStream fos = new FileOutputStream(snapshotFile)) {
252251
persistContainerSet(fos);
253252
} catch (IOException ioe) {
254-
LOG.warn("Failed to write snapshot file \"" + snapshotFile
255-
+ "\", last applied index=" + ti);
253+
LOG.info("{}: Failed to write snapshot at:{} file {}", gid, ti,
254+
snapshotFile);
256255
throw ioe;
257256
}
257+
LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}",
258+
gid, ti, snapshotFile, (Time.monotonicNow() - startTime));
258259
return ti.getIndex();
259260
}
260261
return -1;
@@ -337,7 +338,7 @@ private ContainerCommandRequestProto getContainerCommandRequestProto(
337338

338339
private ContainerCommandResponseProto dispatchCommand(
339340
ContainerCommandRequestProto requestProto, DispatcherContext context) {
340-
LOG.trace("dispatch {} containerID={} pipelineID={} traceID={}",
341+
LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", gid,
341342
requestProto.getCmdType(), requestProto.getContainerID(),
342343
requestProto.getPipelineID(), requestProto.getTraceID());
343344
if (isBlockTokenEnabled) {
@@ -355,7 +356,7 @@ private ContainerCommandResponseProto dispatchCommand(
355356
}
356357
ContainerCommandResponseProto response =
357358
dispatcher.dispatch(requestProto, context);
358-
LOG.trace("response {}", response);
359+
LOG.trace("{}: response {}", gid, response);
359360
return response;
360361
}
361362

@@ -395,18 +396,18 @@ private CompletableFuture<Message> handleWriteChunk(
395396
.supplyAsync(() -> runCommand(requestProto, context), chunkExecutor);
396397

397398
writeChunkFutureMap.put(entryIndex, writeChunkFuture);
398-
LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID()
399-
+ " logIndex " + entryIndex + " chunkName " + write.getChunkData()
400-
.getChunkName());
399+
LOG.debug(gid + ": writeChunk writeStateMachineData : blockId " +
400+
write.getBlockID() + " logIndex " + entryIndex + " chunkName "
401+
+ write.getChunkData().getChunkName());
401402
// Remove the future once it finishes execution from the
402403
// writeChunkFutureMap.
403404
writeChunkFuture.thenApply(r -> {
404405
metrics.incNumBytesWrittenCount(
405406
requestProto.getWriteChunk().getChunkData().getLen());
406407
writeChunkFutureMap.remove(entryIndex);
407-
LOG.debug("writeChunk writeStateMachineData completed: blockId " + write
408-
.getBlockID() + " logIndex " + entryIndex + " chunkName " + write
409-
.getChunkData().getChunkName());
408+
LOG.debug(gid + ": writeChunk writeStateMachineData completed: blockId" +
409+
write.getBlockID() + " logIndex " + entryIndex + " chunkName "
410+
+ write.getChunkData().getChunkName());
410411
return r;
411412
});
412413
return writeChunkFuture;
@@ -564,12 +565,12 @@ public CompletableFuture<ByteString> readStateMachineData(
564565
}
565566
} catch (Exception e) {
566567
metrics.incNumReadStateMachineFails();
567-
LOG.error("unable to read stateMachineData:" + e);
568+
LOG.error("{} unable to read stateMachineData:", gid, e);
568569
return completeExceptionally(e);
569570
}
570571
}
571572

572-
private void updateLastApplied() {
573+
private synchronized void updateLastApplied() {
573574
Long appliedTerm = null;
574575
long appliedIndex = -1;
575576
for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) {

0 commit comments

Comments
 (0)