2424import com .lmax .disruptor .RingBuffer ;
2525import com .lmax .disruptor .Sequence ;
2626import com .lmax .disruptor .Sequencer ;
27- import io .opentelemetry .api .trace .Span ;
28- import io .opentelemetry .context .Scope ;
2927import java .io .IOException ;
3028import java .lang .reflect .Field ;
3129import java .util .ArrayDeque ;
5351import org .apache .hadoop .hbase .HBaseInterfaceAudience ;
5452import org .apache .hadoop .hbase .client .RegionInfo ;
5553import org .apache .hadoop .hbase .io .asyncfs .AsyncFSOutput ;
56- import org .apache .hadoop .hbase .trace .TraceUtil ;
5754import org .apache .hadoop .hbase .wal .AsyncFSWALProvider ;
5855import org .apache .hadoop .hbase .wal .WALEdit ;
5956import org .apache .hadoop .hbase .wal .WALKeyImpl ;
@@ -345,7 +342,7 @@ private void syncCompleted(AsyncWriter writer, long processedTxid, long startTim
345342 break ;
346343 }
347344 }
348- postSync (System .nanoTime () - startTimeNs , finishSync (true ));
345+ postSync (System .nanoTime () - startTimeNs , finishSync ());
349346 if (trySetReadyForRolling ()) {
350347 // we have just finished a roll, then do not need to check for log rolling, the writer will be
351348 // closed soon.
@@ -394,23 +391,14 @@ private void sync(AsyncWriter writer) {
394391 }, consumeExecutor );
395392 }
396393
397- private void addTimeAnnotation (SyncFuture future , String annotation ) {
398- Span .current ().addEvent (annotation );
399- // TODO handle htrace API change, see HBASE-18895
400- // future.setSpan(scope.getSpan());
401- }
402-
403- private int finishSyncLowerThanTxid (long txid , boolean addSyncTrace ) {
394+ private int finishSyncLowerThanTxid (long txid ) {
404395 int finished = 0 ;
405396 for (Iterator <SyncFuture > iter = syncFutures .iterator (); iter .hasNext ();) {
406397 SyncFuture sync = iter .next ();
407398 if (sync .getTxid () <= txid ) {
408399 sync .done (txid , null );
409400 iter .remove ();
410401 finished ++;
411- if (addSyncTrace ) {
412- addTimeAnnotation (sync , "writer synced" );
413- }
414402 } else {
415403 break ;
416404 }
@@ -419,7 +407,7 @@ private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
419407 }
420408
421409 // try advancing the highestSyncedTxid as much as possible
422- private int finishSync (boolean addSyncTrace ) {
410+ private int finishSync () {
423411 if (unackedAppends .isEmpty ()) {
424412 // All outstanding appends have been acked.
425413 if (toWriteAppends .isEmpty ()) {
@@ -428,9 +416,6 @@ private int finishSync(boolean addSyncTrace) {
428416 for (SyncFuture sync : syncFutures ) {
429417 maxSyncTxid = Math .max (maxSyncTxid , sync .getTxid ());
430418 sync .done (maxSyncTxid , null );
431- if (addSyncTrace ) {
432- addTimeAnnotation (sync , "writer synced" );
433- }
434419 }
435420 highestSyncedTxid .set (maxSyncTxid );
436421 int finished = syncFutures .size ();
@@ -444,23 +429,23 @@ private int finishSync(boolean addSyncTrace) {
444429 assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid ;
445430 long doneTxid = lowestUnprocessedAppendTxid - 1 ;
446431 highestSyncedTxid .set (doneTxid );
447- return finishSyncLowerThanTxid (doneTxid , addSyncTrace );
432+ return finishSyncLowerThanTxid (doneTxid );
448433 }
449434 } else {
450435 // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
451436 // first unacked append minus 1.
452437 long lowestUnackedAppendTxid = unackedAppends .peek ().getTxid ();
453438 long doneTxid = Math .max (lowestUnackedAppendTxid - 1 , highestSyncedTxid .get ());
454439 highestSyncedTxid .set (doneTxid );
455- return finishSyncLowerThanTxid (doneTxid , addSyncTrace );
440+ return finishSyncLowerThanTxid (doneTxid );
456441 }
457442 }
458443
459444 private void appendAndSync () {
460445 final AsyncWriter writer = this .writer ;
461446 // maybe a sync request is not queued when we issue a sync, so check here to see if we could
462447 // finish some.
463- finishSync (false );
448+ finishSync ();
464449 long newHighestProcessedAppendTxid = -1L ;
465450 for (Iterator <FSWALEntry > iter = toWriteAppends .iterator (); iter .hasNext ();) {
466451 FSWALEntry entry = iter .next ();
@@ -501,7 +486,7 @@ private void appendAndSync() {
501486 // stamped some region sequence id.
502487 if (unackedAppends .isEmpty ()) {
503488 highestSyncedTxid .set (highestProcessedAppendTxid );
504- finishSync (false );
489+ finishSync ();
505490 trySetReadyForRolling ();
506491 }
507492 return ;
@@ -648,74 +633,54 @@ protected boolean markerEditOnly() {
648633
649634 @ Override
650635 protected long append (RegionInfo hri , WALKeyImpl key , WALEdit edits , boolean inMemstore )
651- throws IOException {
652- if (markerEditOnly () && !edits .isMetaEdit ()) {
653- throw new IOException ("WAL is closing, only marker edit is allowed" );
654- }
655- long txid = stampSequenceIdAndPublishToRingBuffer ( hri , key , edits , inMemstore ,
656- waitingConsumePayloads );
636+ throws IOException {
637+ if (markerEditOnly () && !edits .isMetaEdit ()) {
638+ throw new IOException ("WAL is closing, only marker edit is allowed" );
639+ }
640+ long txid =
641+ stampSequenceIdAndPublishToRingBuffer ( hri , key , edits , inMemstore , waitingConsumePayloads );
657642 if (shouldScheduleConsumer ()) {
658643 consumeExecutor .execute (consumer );
659644 }
660645 return txid ;
661646 }
662647
663648 @ Override
664- public void sync () throws IOException {
665- sync (useHsync );
666- }
667-
668- @ Override
669- public void sync (long txid ) throws IOException {
670- sync (txid , useHsync );
671- }
672-
673- @ Override
674- public void sync (boolean forceSync ) throws IOException {
675- Span span = TraceUtil .getGlobalTracer ().spanBuilder ("AsyncFSWAL.sync" ).startSpan ();
676- try (Scope scope = span .makeCurrent ()) {
677- long txid = waitingConsumePayloads .next ();
678- SyncFuture future ;
679- try {
680- future = getSyncFuture (txid , forceSync );
681- RingBufferTruck truck = waitingConsumePayloads .get (txid );
682- truck .load (future );
683- } finally {
684- waitingConsumePayloads .publish (txid );
685- }
686- if (shouldScheduleConsumer ()) {
687- consumeExecutor .execute (consumer );
688- }
689- blockOnSync (future );
649+ protected void doSync (boolean forceSync ) throws IOException {
650+ long txid = waitingConsumePayloads .next ();
651+ SyncFuture future ;
652+ try {
653+ future = getSyncFuture (txid , forceSync );
654+ RingBufferTruck truck = waitingConsumePayloads .get (txid );
655+ truck .load (future );
690656 } finally {
691- span . end ( );
657+ waitingConsumePayloads . publish ( txid );
692658 }
659+ if (shouldScheduleConsumer ()) {
660+ consumeExecutor .execute (consumer );
661+ }
662+ blockOnSync (future );
693663 }
694664
695665 @ Override
696- public void sync (long txid , boolean forceSync ) throws IOException {
666+ protected void doSync (long txid , boolean forceSync ) throws IOException {
697667 if (highestSyncedTxid .get () >= txid ) {
698668 return ;
699669 }
700- Span span = TraceUtil .getGlobalTracer ().spanBuilder ("AsyncFSWAL.sync" ).startSpan ();
701- try (Scope scope = span .makeCurrent ()) {
702- // here we do not use ring buffer sequence as txid
703- long sequence = waitingConsumePayloads .next ();
704- SyncFuture future ;
705- try {
706- future = getSyncFuture (txid , forceSync );
707- RingBufferTruck truck = waitingConsumePayloads .get (sequence );
708- truck .load (future );
709- } finally {
710- waitingConsumePayloads .publish (sequence );
711- }
712- if (shouldScheduleConsumer ()) {
713- consumeExecutor .execute (consumer );
714- }
715- blockOnSync (future );
670+ // here we do not use ring buffer sequence as txid
671+ long sequence = waitingConsumePayloads .next ();
672+ SyncFuture future ;
673+ try {
674+ future = getSyncFuture (txid , forceSync );
675+ RingBufferTruck truck = waitingConsumePayloads .get (sequence );
676+ truck .load (future );
716677 } finally {
717- span .end ();
678+ waitingConsumePayloads .publish (sequence );
679+ }
680+ if (shouldScheduleConsumer ()) {
681+ consumeExecutor .execute (consumer );
718682 }
683+ blockOnSync (future );
719684 }
720685
721686 protected final AsyncWriter createAsyncWriter (FileSystem fs , Path path ) throws IOException {
0 commit comments