@@ -183,12 +183,11 @@ protected void readInternal(ByteBuffer b) throws IOException {
 
             // Cache miss may be that the cache is completely unavailable (no point in populating it) or that the blob is
             // definitely absent. TODO only bother populating the cache in the latter case.
-
         }
 
         // We would have liked to find a cached entry but we did not find anything: the cache on the disk will be requested so
-        // we compute the regions of the file we would like to have the next time. The regions are expressed as tuple of
-        // {position, length} where position is relative to the file.
+        // we compute the regions of the file we would like to have the next time. The regions are expressed as tuples of
+        // {start, end} ranges where positions are relative to the whole file.
         if (canBeFullyCached) {
             // if the index input is smaller than twice the size of the blob cache, it will be fully indexed
             indexCacheMisses = List.of(Tuple.tuple(0L, fileInfo.length()));
@@ -200,109 +199,148 @@ protected void readInternal(ByteBuffer b) throws IOException {
             indexCacheMisses = List.of();
         }
 
-        int totalBytesRead = 0;
-        while (totalBytesRead < length) {
-            // TODO lose this loop once confirmed that it really isn't necessary
-            assert totalBytesRead == 0 : "readInternal read only [" + totalBytesRead + "] of [" + length + "] bytes for " + this;
-            final long pos = position + totalBytesRead;
-            final int len = length - totalBytesRead;
-            int bytesRead = 0;
-            try {
-                final CacheFile cacheFile = getCacheFileSafe();
-                try (Releasable ignored = cacheFile.fileLock()) {
-
-                    // Read all target ranges in one go, including any cache misses identified above.
-                    final Tuple<Long, Long> startRangeToWrite = computeRange(pos);
-                    final Tuple<Long, Long> endRangeToWrite = computeRange(pos + len - 1);
-                    assert startRangeToWrite.v2() <= endRangeToWrite.v2() : startRangeToWrite + " vs " + endRangeToWrite;
-                    final Tuple<Long, Long> rangeToWrite = Tuple.tuple(
-                        Math.min(startRangeToWrite.v1(), indexCacheMisses.stream().mapToLong(Tuple::v1).max().orElse(Long.MAX_VALUE)),
-                        Math.max(endRangeToWrite.v2(), indexCacheMisses.stream().mapToLong(Tuple::v2).max().orElse(Long.MIN_VALUE))
-                    );
+        try {
+            final CacheFile cacheFile = getCacheFileSafe();
+            try (Releasable ignored = cacheFile.fileLock()) {
 
-                    final Tuple<Long, Long> rangeToRead = Tuple.tuple(pos, Math.min(pos + len, rangeToWrite.v2()));
-
-                    final CompletableFuture<Integer> populateCacheFuture = cacheFile.populateAndRead(rangeToWrite, rangeToRead, channel -> {
-                        final int read;
-                        if ((rangeToRead.v2() - rangeToRead.v1()) < b.remaining()) {
-                            final ByteBuffer duplicate = b.duplicate();
-                            duplicate.limit(duplicate.position() + Math.toIntExact(rangeToRead.v2() - rangeToRead.v1()));
-                            read = readCacheFile(channel, pos, duplicate);
-                            assert duplicate.position() <= b.limit();
-                            b.position(duplicate.position());
-                        } else {
-                            read = readCacheFile(channel, pos, b);
-                        }
-                        return read;
-                    }, this::writeCacheFile, directory.cacheFetchAsyncExecutor());
-
-                    for (Tuple<Long, Long> indexCacheMiss : indexCacheMisses) {
-                        cacheFile.populateAndRead(indexCacheMiss, indexCacheMiss, channel -> {
-                            final int indexCacheMissLength = Math.toIntExact(indexCacheMiss.v2() - indexCacheMiss.v1());
-
-                            // We assume that we only cache small portions of blobs so that we do not need to:
-                            // - use a BigArrays for allocation
-                            // - use an intermediate copy buffer to read the file in sensibly-sized chunks
-                            // - release the buffer once the indexing operation is complete
-                            assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss;
-
-                            final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength);
-                            Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.v1(), byteBuffer);
-                            // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats
-                            byteBuffer.flip();
-                            final BytesReference content = BytesReference.fromByteBuffer(byteBuffer);
-                            directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.v1(), content);
-                            return indexCacheMissLength;
-                        }, (channel, from, to, progressUpdater) -> {
-                            // normally doesn't happen, we're already obtaining a range covering all cache misses above, but this
-                            // can happen if the real populateAndRead call already failed to obtain this range of the file. In that
-                            // case, we don't retry, we simply fail to populate the index cache.
-                            logger.debug(
-                                "failed to fill index cache miss [{}-{}] of {} due to earlier failure",
-                                from,
-                                to,
-                                CachedBlobContainerIndexInput.this
-                            );
-                            throw new IOException(
-                                "failed to fill index cache miss ["
-                                    + from
-                                    + "-"
-                                    + to
-                                    + "] of ["
-                                    + CachedBlobContainerIndexInput.this
-                                    + "] due to earlier failure"
-                            );
-                        },
-                            EsExecutors.newDirectExecutorService() // if ranges are still missing, fail immediately, so no need to fork
-                        );
+                // Read all target ranges in one go, including any cache misses identified above.
+                final Tuple<Long, Long> startRangeToWrite = computeRange(position);
+                final Tuple<Long, Long> endRangeToWrite = computeRange(position + length - 1);
+                assert startRangeToWrite.v2() <= endRangeToWrite.v2() : startRangeToWrite + " vs " + endRangeToWrite;
+                final Tuple<Long, Long> rangeToWrite = Tuple.tuple(
+                    Math.min(startRangeToWrite.v1(), indexCacheMisses.stream().mapToLong(Tuple::v1).max().orElse(Long.MAX_VALUE)),
+                    Math.max(endRangeToWrite.v2(), indexCacheMisses.stream().mapToLong(Tuple::v2).max().orElse(Long.MIN_VALUE))
+                );
 
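+                // the range to write must fully cover the range the caller asked to read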
+                assert rangeToWrite.v1() <= position && position + length <= rangeToWrite.v2() : "["
+                    + position
+                    + "-"
+                    + (position + length)
+                    + "] vs "
+                    + rangeToWrite;
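+                // the caller only needs [position, position + length); any extra bytes in rangeToWrite are fetched solely to fill the cache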
+                final Tuple<Long, Long> rangeToRead = Tuple.tuple(position, position + length);
+
+                final CompletableFuture<Integer> populateCacheFuture = cacheFile.populateAndRead(rangeToWrite, rangeToRead, channel -> {
+                    final int read;
+                    if ((rangeToRead.v2() - rangeToRead.v1()) < b.remaining()) {
+                        final ByteBuffer duplicate = b.duplicate();
+                        duplicate.limit(duplicate.position() + Math.toIntExact(rangeToRead.v2() - rangeToRead.v1()));
+                        read = readCacheFile(channel, position, duplicate);
+                        assert duplicate.position() <= b.limit();
+                        b.position(duplicate.position());
+                    } else {
+                        read = readCacheFile(channel, position, b);
                     }
-
-                    bytesRead = populateCacheFuture.get();
+                    return read;
+                }, this::writeCacheFile, directory.cacheFetchAsyncExecutor());
+
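+                // write each range identified above as a cache-index miss into the blob store cache index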
+                for (Tuple<Long, Long> indexCacheMiss : indexCacheMisses) {
+                    cacheFile.populateAndRead(indexCacheMiss, indexCacheMiss, channel -> {
+                        final int indexCacheMissLength = Math.toIntExact(indexCacheMiss.v2() - indexCacheMiss.v1());
+
+                        // We assume that we only cache small portions of blobs so that we do not need to:
+                        // - use a BigArrays for allocation
+                        // - use an intermediate copy buffer to read the file in sensibly-sized chunks
+                        // - release the buffer once the indexing operation is complete
+                        assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss;
+
+                        final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength);
+                        Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.v1(), byteBuffer);
+                        // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats
+                        byteBuffer.flip();
+                        final BytesReference content = BytesReference.fromByteBuffer(byteBuffer);
+                        directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.v1(), content);
+                        return indexCacheMissLength;
+                    }, (channel, from, to, progressUpdater) -> {
+                        // normally doesn't happen, we're already obtaining a range covering all cache misses above, but this
+                        // can happen if the real populateAndRead call already failed to obtain this range of the file. In that
+                        // case, we don't retry, we simply fail to populate the index cache.
+                        logger.debug(
+                            "failed to fill index cache miss [{}-{}] of {} due to earlier failure",
+                            from,
+                            to,
+                            CachedBlobContainerIndexInput.this
+                        );
+                        throw new IOException(
+                            "failed to fill index cache miss ["
+                                + from
+                                + "-"
+                                + to
+                                + "] of ["
+                                + CachedBlobContainerIndexInput.this
+                                + "] due to earlier failure"
+                        );
+                    },
+                        EsExecutors.newDirectExecutorService() // if ranges are still missing, fail immediately, so no need to fork
+                    );
 
                 }
-            } catch (final Exception e) {
-                if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) {
-                    try {
-                        // cache file was evicted during the range fetching, read bytes directly from source
-                        bytesRead = readDirectly(pos, pos + len, b);
-                        continue;
-                    } catch (Exception inner) {
-                        e.addSuppressed(inner);
-                    }
-                }
-                throw new IOException("Fail to read data from cache", e);
 
-            } finally {
-                totalBytesRead += bytesRead;
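+                // block until the bytes for the requested range have been read from the cache file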
+                final int bytesRead = populateCacheFuture.get();
+                assert bytesRead == length : bytesRead + " vs " + length;
             }
+        } catch (final Exception e) {
+            // may have partially filled the buffer before the exception was thrown, so try and get the remainder directly.
+            final int alreadyRead = length - b.remaining();
+            final int bytesRead = readDirectlyIfAlreadyClosed(position + alreadyRead, b, e);
+            assert alreadyRead + bytesRead == length : alreadyRead + " + " + bytesRead + " vs " + length;
         }
-        assert totalBytesRead == length : "partial read operation, read [" + totalBytesRead + "] bytes of [" + length + "]";
-        stats.incrementBytesRead(lastReadPosition, position, totalBytesRead);
-        lastReadPosition = position + totalBytesRead;
+
+        stats.incrementBytesRead(lastReadPosition, position, length);
+        lastReadPosition = position + length;
         lastSeekPosition = lastReadPosition;
     }
 
+    private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e) throws IOException {
+        if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) {
+            try {
+                // cache file was evicted during the range fetching, read bytes directly from blob container
+                final long length = b.remaining();
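+                // cap the on-heap copy buffer at COPY_BUFFER_SIZE so large reads are streamed in small chunks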
+                final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))];
+                logger.trace(
+                    () -> new ParameterizedMessage(
+                        "direct reading of range [{}-{}] for cache file [{}]",
+                        position,
+                        position + length,
+                        cacheFileReference
+                    )
+                );
+
+                int bytesCopied = 0;
+                final long startTimeNanos = stats.currentTimeNanos();
+                try (InputStream input = openInputStreamFromBlobStore(position, length)) {
+                    long remaining = length;
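+                    // copy the blob store stream into b, one bounded chunk at a time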
+                    while (remaining > 0) {
+                        final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length;
+                        int bytesRead = input.read(copyBuffer, 0, len);
+                        if (bytesRead == -1) {
+                            throw new EOFException(
+                                String.format(
+                                    Locale.ROOT,
+                                    "unexpected EOF reading [%d-%d] ([%d] bytes remaining) from %s",
+                                    position,
+                                    position + length,
+                                    remaining,
+                                    cacheFileReference
+                                )
+                            );
+                        }
+                        b.put(copyBuffer, 0, bytesRead);
+                        bytesCopied += bytesRead;
+                        remaining -= bytesRead;
+                        assert remaining == b.remaining() : remaining + " vs " + b.remaining();
+                    }
+                    final long endTimeNanos = stats.currentTimeNanos();
+                    stats.addDirectBytesRead(bytesCopied, endTimeNanos - startTimeNanos);
+                }
+                return bytesCopied;
+            } catch (Exception inner) {
+                e.addSuppressed(inner);
+            }
+        }
+        throw new IOException("failed to read data from cache", e);
+    }
+
     private boolean readChecksumFromFileInfo(ByteBuffer b) throws IOException {
         assert isClone == false;
         byte[] footer;
@@ -656,40 +694,6 @@ public String toString() {
             + '}';
     }
 
-    private int readDirectly(long start, long end, ByteBuffer b) throws IOException {
-        final long length = end - start;
-        final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))];
-        logger.trace(() -> new ParameterizedMessage("direct reading of range [{}-{}] for cache file [{}]", start, end, cacheFileReference));
-
-        int bytesCopied = 0;
-        final long startTimeNanos = stats.currentTimeNanos();
-        try (InputStream input = openInputStreamFromBlobStore(start, length)) {
-            long remaining = end - start;
-            while (remaining > 0) {
-                final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length;
-                int bytesRead = input.read(copyBuffer, 0, len);
-                if (bytesRead == -1) {
-                    throw new EOFException(
-                        String.format(
-                            Locale.ROOT,
-                            "unexpected EOF reading [%d-%d] ([%d] bytes remaining) from %s",
-                            start,
-                            end,
-                            remaining,
-                            cacheFileReference
-                        )
-                    );
-                }
-                b.put(copyBuffer, 0, bytesRead);
-                bytesCopied += bytesRead;
-                remaining -= bytesRead;
-            }
-            final long endTimeNanos = stats.currentTimeNanos();
-            stats.addDirectBytesRead(bytesCopied, endTimeNanos - startTimeNanos);
-        }
-        return bytesCopied;
-    }
-
     private static class CacheFileReference implements CacheFile.EvictionListener {
 
         private final long fileLength;