Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.ozone.container.common.transport.server.ratis
.ContainerStateMachine;
import org.apache.hadoop.ozone.om.OzoneManager;
Expand All @@ -38,6 +43,7 @@
import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
Expand Down Expand Up @@ -70,6 +76,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
private RaftGroupId raftGroupId;
private long lastAppliedIndex = 0;
private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
private final ExecutorService executorService;

public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
this.omRatisServer = ratisServer;
Expand All @@ -79,6 +86,9 @@ public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
this::updateLastAppliedIndex);
this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager,
ozoneManagerDoubleBuffer);
ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("OM StateMachine ApplyTransaction Thread - %d").build();
this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
}

/**
Expand Down Expand Up @@ -132,8 +142,36 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
OMRequest request = OMRatisHelper.convertByteStringToOMRequest(
trx.getStateMachineLogEntry().getLogData());
long trxLogIndex = trx.getLogEntry().getIndex();
CompletableFuture<Message> future = CompletableFuture
.supplyAsync(() -> runCommand(request, trxLogIndex));
// In the current approach we have a single global executor with a single
// thread. This is done for correctness: applyTransaction runs on multiple
// OMs, and we want to execute the transactions in the same order on all
// OMs; otherwise there is a chance that the OM replicas go out of sync.
// TODO: This makes all applyTransaction calls in OM execute in serial
// order. Revisit this in the future to use multiple executors per
// volume/bucket.

// The reason for not immediately implementing an executor per volume is
// that if one executor's operations are slow, we cannot update the
// lastAppliedIndex in the OzoneManager StateMachine, even if another
// executor has already completed transactions with higher indexes.

// Example: we have 300 transactions, 150 for each volume. Volume1 has
// transactions 0 - 149 and Volume2 has transactions 150 - 299.
// Executor1 - Volume1 - 100 (current completed transaction)
// Executor2 - Volume2 - 299 (current completed transaction)

// Now we have applied transactions 0 - 100 and 150 - 299. We cannot
// update lastAppliedIndex to 299; we need to update it to 100, since
// 101 - 149 are not yet applied. When OM restarts, it will apply
// transactions starting from lastAppliedIndex.
// We can update the lastAppliedIndex to 100, and update it to 299 only
// after completing 101 - 149. In the initial stage, we are starting
// with a single global executor. We will revisit this when needed.

CompletableFuture<Message> future = CompletableFuture.supplyAsync(
() -> runCommand(request, trxLogIndex), executorService);
return future;
} catch (IOException e) {
return completeExceptionally(e);
Expand Down Expand Up @@ -301,6 +339,7 @@ public void setRaftGroupId(RaftGroupId raftGroupId) {

/**
 * Stops the components owned by this state machine.
 *
 * <p>The double buffer is stopped first, and then the executor service that
 * runs the applyTransaction tasks is shut down.
 */
public void stop() {
ozoneManagerDoubleBuffer.stop();
// NOTE(review): presumably this waits up to 5 seconds for already submitted
// tasks to finish before giving up; confirm against HadoopExecutors.shutdown.
HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
}

}