1919
2020import com .google .common .annotations .VisibleForTesting ;
2121import com .google .common .base .Preconditions ;
22+ import com .google .common .util .concurrent .ThreadFactoryBuilder ;
2223import com .google .protobuf .ServiceException ;
2324import java .io .IOException ;
2425import java .util .Collection ;
2526import java .util .UUID ;
2627import java .util .concurrent .CompletableFuture ;
28+ import java .util .concurrent .ExecutorService ;
29+ import java .util .concurrent .Executors ;
30+ import java .util .concurrent .ThreadFactory ;
31+
2732import org .apache .hadoop .ozone .container .common .transport .server .ratis
2833 .ContainerStateMachine ;
2934import org .apache .hadoop .ozone .om .OzoneManager ;
@@ -70,6 +75,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
7075 private RaftGroupId raftGroupId ;
7176 private long lastAppliedIndex = 0 ;
7277 private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer ;
78+ private final ExecutorService executorService ;
7379
7480 public OzoneManagerStateMachine (OzoneManagerRatisServer ratisServer ) {
7581 this .omRatisServer = ratisServer ;
@@ -79,6 +85,9 @@ public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
7985 this ::updateLastAppliedIndex );
8086 this .handler = new OzoneManagerHARequestHandlerImpl (ozoneManager ,
8187 ozoneManagerDoubleBuffer );
88+ ThreadFactory build = new ThreadFactoryBuilder ().setDaemon (true )
89+ .setNameFormat ("OM StateMachine ApplyTransaction Thread - %d" ).build ();
90+ this .executorService = Executors .newSingleThreadExecutor (build );
8291 }
8392
8493 /**
@@ -132,8 +141,36 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
132141 OMRequest request = OMRatisHelper .convertByteStringToOMRequest (
133142 trx .getStateMachineLogEntry ().getLogData ());
134143 long trxLogIndex = trx .getLogEntry ().getIndex ();
135- CompletableFuture <Message > future = CompletableFuture
136- .supplyAsync (() -> runCommand (request , trxLogIndex ));
144+ // In the current approach we have one single global thread executor.
145+ // with single thread. Right now this is being done for correctness, as
146+ // applyTransaction will be run on multiple OM's we want to execute the
147+ // transactions in the same order on all OM's, otherwise there is a
148+ // chance that OM replica's can be out of sync.
149+ // TODO: In this way we are making all applyTransactions in
150+ // OM serial order. Revisit this in future to use multiple executors for
151+ // volume/bucket.
152+
153+ // Reason for not immediately implementing executor per volume is, if
154+ // one executor operations are slow, we cannot update the
155+ // lastAppliedIndex in OzoneManager StateMachine, even if other
156+ // executor has completed the transactions with id more.
157+
158+ // We have 300 transactions, And for each volume we have transactions
159+ // of 150. Volume1 transactions 0 - 149 and Volume2 transactions 150 -
160+ // 299.
161+ // Example: Executor1 - Volume1 - 100 (current completed transaction)
162+ // Example: Executor2 - Volume2 - 299 (current completed transaction)
163+
164+ // Now we have applied transactions of 0 - 100 and 149 - 299. We
165+ // cannot update lastAppliedIndex to 299. We need to update it to 100,
166+ // since 101 - 149 are not applied. When OM restarts it will
167+ // applyTransactions from lastAppliedIndex.
168+ // We can update the lastAppliedIndex to 100, and update it to 299,
169+ // only after completing 101 - 149. In initial stage, we are starting
170+ // with single global executor. Will revisit this when needed.
171+
172+ CompletableFuture <Message > future = CompletableFuture .supplyAsync (
173+ () -> runCommand (request , trxLogIndex ), executorService );
137174 return future ;
138175 } catch (IOException e ) {
139176 return completeExceptionally (e );
0 commit comments