@@ -433,23 +433,21 @@ private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruptio
433433 }
434434 }
435435
436- // add the nonce to the map
437436 if (nonceKey != null ) {
438- nonceKeysToProcIdsMap .put (nonceKey , procId );
437+ nonceKeysToProcIdsMap .put (nonceKey , procId ); // add the nonce to the map
439438 }
440439 }
441440
442- // 2. Initialize the stacks
443- // In the old implementation, for procedures in FAILED state, we will push it into the
444- // ProcedureScheduler directly to execute the rollback. But this does not work after we
445- // introduce the restore lock stage.
446- // For now, when we acquire a xlock, we will remove the queue from runQueue in scheduler, and
447- // then when a procedure which has lock access, for example, a sub procedure of the procedure
448- // which has the xlock, is pushed into the scheduler, we will add the queue back to let the
449- // workers poll from it. The assumption here is that, the procedure which has the xlock should
450- // have been polled out already, so when loading we can not add the procedure to scheduler first
451- // and then call acquireLock, since the procedure is still in the queue, and since we will
452- // remove the queue from runQueue, then no one can poll it out, then there is a dead lock
441+ // 2. Initialize the stacks: In the old implementation, for procedures in FAILED state, we will
442+ // push it into the ProcedureScheduler directly to execute the rollback. But this does not work
443+ // after we introduce the restore lock stage. For now, when we acquire a xlock, we will remove
444+ // the queue from runQueue in scheduler, and then when a procedure which has lock access, for
445+ // example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler,
446+ // we will add the queue back to let the workers poll from it. The assumption here is that, the
447+ // procedure which has the xlock should have been polled out already, so when loading we can not
448+ // add the procedure to scheduler first and then call acquireLock, since the procedure is still
449+ // in the queue, and since we will remove the queue from runQueue, then no one can poll it out,
450+ // then there is a dead lock
453451 List <Procedure <TEnvironment >> runnableList = new ArrayList <>(runnableCount );
454452 List <Procedure <TEnvironment >> failedList = new ArrayList <>(failedCount );
455453 List <Procedure <TEnvironment >> waitingList = new ArrayList <>(waitingCount );
@@ -464,9 +462,7 @@ private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruptio
464462 @ SuppressWarnings ("unchecked" )
465463 Procedure <TEnvironment > proc = procIter .next ();
466464 assert !(proc .isFinished () && !proc .hasParent ()) : "unexpected completed proc=" + proc ;
467-
468465 LOG .debug ("Loading {}" , proc );
469-
470466 Long rootProcId = getRootProcedureId (proc );
471467 // The orphan procedures will be passed to handleCorrupted, so add an assert here
472468 assert rootProcId != null ;
@@ -508,14 +504,12 @@ private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruptio
508504 // 3. Check the waiting procedures to see if some of them can be added to runnable.
509505 waitingList .forEach (proc -> {
510506 if (!proc .hasChildren ()) {
511- // Normally, WAITING procedures should be waken by its children.
512- // But, there is a case that, all the children are successful and before
513- // they can wake up their parent procedure, the master was killed.
514- // So, during recovering the procedures from ProcedureWal, its children
515- // are not loaded because of their SUCCESS state.
516- // So we need to continue to run this WAITING procedure. But before
517- // executing, we need to set its state to RUNNABLE, otherwise, a exception
518- // will throw:
507+ // Normally, WAITING procedures should be waken by its children. But, there is a case that,
508+ // all the children are successful and before they can wake up their parent procedure, the
509+ // master was killed. So, during recovering the procedures from ProcedureWal, its children
510+ // are not loaded because of their SUCCESS state. So we need to continue to run this WAITING
511+ // procedure. But before executing, we need to set its state to RUNNABLE, otherwise, a
512+ // exception will throw:
519513 // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
520514 // "NOT RUNNABLE! " + procedure.toString());
521515 proc .setState (ProcedureState .RUNNABLE );
@@ -743,9 +737,9 @@ public boolean removeChore(ProcedureInMemoryChore<TEnvironment> chore) {
743737 // Nonce Procedure helpers
744738 // ==========================================================================
745739 /**
746- * Create a NoneKey from the specified nonceGroup and nonce.
747- * @param nonceGroup
748- * @param nonce
740+ * Create a NonceKey from the specified nonceGroup and nonce.
741+ * @param nonceGroup the group to use for the {@link NonceKey}
742+ * @param nonce the nonce to use in the {@link NonceKey}
749743 * @return the generated NonceKey
750744 */
751745 public NonceKey createNonceKey (final long nonceGroup , final long nonce ) {
@@ -764,7 +758,9 @@ public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
764758 * @return the procId associated with the nonce, if any otherwise an invalid procId.
765759 */
766760 public long registerNonce (final NonceKey nonceKey ) {
767- if (nonceKey == null ) return -1 ;
761+ if (nonceKey == null ) {
762+ return -1 ;
763+ }
768764
769765 // check if we have already a Reserved ID for the nonce
770766 Long oldProcId = nonceKeysToProcIdsMap .get (nonceKey );
@@ -773,7 +769,9 @@ public long registerNonce(final NonceKey nonceKey) {
773769 // and the procedure submitted with the specified nonce will use this ID.
774770 final long newProcId = nextProcId ();
775771 oldProcId = nonceKeysToProcIdsMap .putIfAbsent (nonceKey , newProcId );
776- if (oldProcId == null ) return -1 ;
772+ if (oldProcId == null ) {
773+ return -1 ;
774+ }
777775 }
778776
779777 // we found a registered nonce, but the procedure may not have been submitted yet.
@@ -795,10 +793,14 @@ public long registerNonce(final NonceKey nonceKey) {
795793 * @param nonceKey A unique identifier for this operation from the client or process.
796794 */
797795 public void unregisterNonceIfProcedureWasNotSubmitted (final NonceKey nonceKey ) {
798- if (nonceKey == null ) return ;
796+ if (nonceKey == null ) {
797+ return ;
798+ }
799799
800800 final Long procId = nonceKeysToProcIdsMap .get (nonceKey );
801- if (procId == null ) return ;
801+ if (procId == null ) {
802+ return ;
803+ }
802804
803805 // if the procedure was not submitted, remove the nonce
804806 if (!(procedures .containsKey (procId ) || completed .containsKey (procId ))) {
@@ -1295,8 +1297,9 @@ private long nextProcId() {
12951297 if (procId < 0 ) {
12961298 while (!lastProcId .compareAndSet (procId , 0 )) {
12971299 procId = lastProcId .get ();
1298- if (procId >= 0 )
1300+ if (procId >= 0 ) {
12991301 break ;
1302+ }
13001303 }
13011304 while (procedures .containsKey (procId )) {
13021305 procId = lastProcId .incrementAndGet ();
@@ -2003,7 +2006,6 @@ protected boolean keepAlive(long lastUpdate) {
20032006 // A worker thread which can be added when core workers are stuck. Will timeout after
20042007 // keepAliveTime if there is no procedure to run.
20052008 private final class KeepAliveWorkerThread extends WorkerThread {
2006-
20072009 public KeepAliveWorkerThread (ThreadGroup group ) {
20082010 super (group , "KeepAlivePEWorker-" );
20092011 }
0 commit comments