3232import org .elasticsearch .common .collect .Tuple ;
3333import org .elasticsearch .common .io .Channels ;
3434import org .elasticsearch .common .io .DiskIoBufferPool ;
35- import org .elasticsearch .common .lease . Releasable ;
35+ import org .elasticsearch .common .io . stream . ReleasableBytesStreamOutput ;
3636import org .elasticsearch .common .lease .Releasables ;
3737import org .elasticsearch .common .unit .ByteSizeValue ;
38+ import org .elasticsearch .common .util .BigArrays ;
3839import org .elasticsearch .common .util .concurrent .ReleasableLock ;
3940import org .elasticsearch .core .internal .io .IOUtils ;
4041import org .elasticsearch .index .seqno .SequenceNumbers ;
4647import java .nio .channels .FileChannel ;
4748import java .nio .file .Path ;
4849import java .nio .file .StandardOpenOption ;
49- import java .util .ArrayDeque ;
50- import java .util .ArrayList ;
5150import java .util .HashMap ;
5251import java .util .Map ;
5352import java .util .Objects ;
@@ -61,6 +60,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
6160 private final ShardId shardId ;
6261 private final FileChannel checkpointChannel ;
6362 private final Path checkpointPath ;
63+ private final BigArrays bigArrays ;
6464 // the last checkpoint that was written when the translog was last synced
6565 private volatile Checkpoint lastSyncedCheckpoint ;
6666 /* the number of translog operations written to this file */
@@ -87,8 +87,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
8787
8888 private LongArrayList nonFsyncedSequenceNumbers = new LongArrayList (64 );
8989 private final int forceWriteThreshold ;
90- private final ArrayList <ReleasableBytesReference > bufferedOps = new ArrayList <>();
91- private long bufferedBytes = 0L ;
90+ private ReleasableBytesStreamOutput buffer ;
9291
9392 private final Map <Long , Tuple <BytesReference , Exception >> seenSequenceNumbers ;
9493
@@ -101,8 +100,9 @@ private TranslogWriter(
101100 final Path checkpointPath ,
102101 final ByteSizeValue bufferSize ,
103102 final LongSupplier globalCheckpointSupplier , LongSupplier minTranslogGenerationSupplier , TranslogHeader header ,
104- TragicExceptionHolder tragedy ,
105- final LongConsumer persistedSequenceNumberConsumer )
103+ final TragicExceptionHolder tragedy ,
104+ final LongConsumer persistedSequenceNumberConsumer ,
105+ final BigArrays bigArrays )
106106 throws
107107 IOException {
108108 super (initialCheckpoint .generation , channel , path , header );
@@ -123,14 +123,16 @@ private TranslogWriter(
123123 assert initialCheckpoint .trimmedAboveSeqNo == SequenceNumbers .UNASSIGNED_SEQ_NO : initialCheckpoint .trimmedAboveSeqNo ;
124124 this .globalCheckpointSupplier = globalCheckpointSupplier ;
125125 this .persistedSequenceNumberConsumer = persistedSequenceNumberConsumer ;
126+ this .bigArrays = bigArrays ;
126127 this .seenSequenceNumbers = Assertions .ENABLED ? new HashMap <>() : null ;
127128 this .tragedy = tragedy ;
128129 }
129130
130131 public static TranslogWriter create (ShardId shardId , String translogUUID , long fileGeneration , Path file , ChannelFactory channelFactory ,
131132 ByteSizeValue bufferSize , final long initialMinTranslogGen , long initialGlobalCheckpoint ,
132133 final LongSupplier globalCheckpointSupplier , final LongSupplier minTranslogGenerationSupplier ,
133- final long primaryTerm , TragicExceptionHolder tragedy , LongConsumer persistedSequenceNumberConsumer )
134+ final long primaryTerm , TragicExceptionHolder tragedy ,
135+ final LongConsumer persistedSequenceNumberConsumer , final BigArrays bigArrays )
134136 throws IOException {
135137 final Path checkpointFile = file .getParent ().resolve (Translog .CHECKPOINT_FILE_NAME );
136138
@@ -155,7 +157,7 @@ public static TranslogWriter create(ShardId shardId, String translogUUID, long f
155157 writerGlobalCheckpointSupplier = globalCheckpointSupplier ;
156158 }
157159 return new TranslogWriter (shardId , checkpoint , channel , checkpointChannel , file , checkpointFile , bufferSize ,
158- writerGlobalCheckpointSupplier , minTranslogGenerationSupplier , header , tragedy , persistedSequenceNumberConsumer );
160+ writerGlobalCheckpointSupplier , minTranslogGenerationSupplier , header , tragedy , persistedSequenceNumberConsumer , bigArrays );
159161 } catch (Exception exception ) {
160162 // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
161163 // file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation
@@ -182,15 +184,17 @@ private synchronized void closeWithTragicEvent(final Exception ex) {
182184 * @return the location the bytes were written to
183185 * @throws IOException if writing to the translog resulted in an I/O exception
184186 */
185- public Translog .Location add (final ReleasableBytesReference data , final long seqNo ) throws IOException {
187+ public Translog .Location add (final BytesReference data , final long seqNo ) throws IOException {
186188 final Translog .Location location ;
187189 final long bytesBufferedAfterAdd ;
188190 synchronized (this ) {
189191 ensureOpen ();
192+ if (buffer == null ) {
193+ buffer = new ReleasableBytesStreamOutput (bigArrays );
194+ }
190195 final long offset = totalOffset ;
191196 totalOffset += data .length ();
192- bufferedBytes += data .length ();
193- bufferedOps .add (data .retain ());
197+ data .writeTo (buffer );
194198
195199 assert minSeqNo != SequenceNumbers .NO_OPS_PERFORMED || operationCounter == 0 ;
196200 assert maxSeqNo != SequenceNumbers .NO_OPS_PERFORMED || operationCounter == 0 ;
@@ -205,7 +209,7 @@ public Translog.Location add(final ReleasableBytesReference data, final long seq
205209 assert assertNoSeqNumberConflict (seqNo , data );
206210
207211 location = new Translog .Location (generation , offset , data .length ());
208- bytesBufferedAfterAdd = bufferedBytes ;
212+ bytesBufferedAfterAdd = buffer . size () ;
209213 }
210214
211215 if (bytesBufferedAfterAdd >= forceWriteThreshold ) {
@@ -335,7 +339,7 @@ public TranslogReader closeIntoReader() throws IOException {
335339 throw ex ;
336340 }
337341 // If we reached this point, all of the buffered ops should have been flushed successfully.
338- assert bufferedOps . size () == 0 ;
342+ assert buffer == null ;
339343 assert checkChannelPositionWhileHandlingException (totalOffset );
340344 assert totalOffset == lastSyncedCheckpoint .offset ;
341345 if (closed .compareAndSet (false , true )) {
@@ -372,7 +376,7 @@ public TranslogSnapshot newSnapshot() {
372376 throw new TranslogException (shardId , "exception while syncing before creating a snapshot" , e );
373377 }
374378 // If we reached this point, all of the buffered ops should have been flushed successfully.
375- assert bufferedOps . size () == 0 ;
379+ assert buffer == null ;
376380 assert checkChannelPositionWhileHandlingException (totalOffset );
377381 assert totalOffset == lastSyncedCheckpoint .offset ;
378382 return super .newSnapshot ();
@@ -398,7 +402,7 @@ final boolean syncUpTo(long offset) throws IOException {
398402 // the lock we should check again since if this code is busy we might have fsynced enough already
399403 final Checkpoint checkpointToSync ;
400404 final LongArrayList flushedSequenceNumbers ;
401- final ArrayDeque < ReleasableBytesReference > toWrite ;
405+ final ReleasableBytesReference toWrite ;
402406 try (ReleasableLock toClose = writeLock .acquire ()) {
403407 synchronized (this ) {
404408 ensureOpen ();
@@ -449,44 +453,39 @@ private void writeBufferedOps(long offset, boolean blockOnExistingWriter) throws
449453 }
450454 }
451455
452- private synchronized ArrayDeque < ReleasableBytesReference > pollOpsToWrite () {
456+ private synchronized ReleasableBytesReference pollOpsToWrite () {
453457 ensureOpen ();
454- final ArrayDeque <ReleasableBytesReference > operationsToWrite = new ArrayDeque <>(bufferedOps .size ());
455- operationsToWrite .addAll (bufferedOps );
456- bufferedOps .clear ();
457- bufferedBytes = 0 ;
458- return operationsToWrite ;
458+ if (this .buffer != null ) {
459+ ReleasableBytesStreamOutput toWrite = this .buffer ;
460+ this .buffer = null ;
461+ return new ReleasableBytesReference (toWrite .bytes (), toWrite );
462+ } else {
463+ return ReleasableBytesReference .wrap (BytesArray .EMPTY );
464+ }
459465 }
460466
461- private void writeAndReleaseOps (final ArrayDeque < ReleasableBytesReference > operationsToWrite ) throws IOException {
462- try {
467+ private void writeAndReleaseOps (ReleasableBytesReference toWrite ) throws IOException {
468+ try ( ReleasableBytesReference toClose = toWrite ) {
463469 assert writeLock .isHeldByCurrentThread ();
464470 ByteBuffer ioBuffer = DiskIoBufferPool .getIoBuffer ();
465471
466- ReleasableBytesReference operation ;
467- while ((operation = operationsToWrite .pollFirst ()) != null ) {
468- try (Releasable toClose = operation ) {
469- BytesRefIterator iterator = operation .iterator ();
470- BytesRef current ;
471- while ((current = iterator .next ()) != null ) {
472- int currentBytesConsumed = 0 ;
473- while (currentBytesConsumed != current .length ) {
474- int nBytesToWrite = Math .min (current .length - currentBytesConsumed , ioBuffer .remaining ());
475- ioBuffer .put (current .bytes , current .offset + currentBytesConsumed , nBytesToWrite );
476- currentBytesConsumed += nBytesToWrite ;
477- if (ioBuffer .hasRemaining () == false ) {
478- ioBuffer .flip ();
479- writeToFile (ioBuffer );
480- ioBuffer .clear ();
481- }
482- }
472+ BytesRefIterator iterator = toWrite .iterator ();
473+ BytesRef current ;
474+ while ((current = iterator .next ()) != null ) {
475+ int currentBytesConsumed = 0 ;
476+ while (currentBytesConsumed != current .length ) {
477+ int nBytesToWrite = Math .min (current .length - currentBytesConsumed , ioBuffer .remaining ());
478+ ioBuffer .put (current .bytes , current .offset + currentBytesConsumed , nBytesToWrite );
479+ currentBytesConsumed += nBytesToWrite ;
480+ if (ioBuffer .hasRemaining () == false ) {
481+ ioBuffer .flip ();
482+ writeToFile (ioBuffer );
483+ ioBuffer .clear ();
483484 }
484485 }
485486 }
486487 ioBuffer .flip ();
487488 writeToFile (ioBuffer );
488- } finally {
489- Releasables .close (operationsToWrite );
490489 }
491490 }
492491
@@ -550,8 +549,8 @@ private boolean checkChannelPositionWhileHandlingException(long expectedOffset)
550549 public final void close () throws IOException {
551550 if (closed .compareAndSet (false , true )) {
552551 synchronized (this ) {
553- Releasables .closeWhileHandlingException (bufferedOps );
554- bufferedOps . clear () ;
552+ Releasables .closeWhileHandlingException (buffer );
553+ buffer = null ;
555554 }
556555 IOUtils .close (checkpointChannel , channel );
557556 }
0 commit comments