@@ -41,6 +41,7 @@ public class FadvisedFileRegion extends DefaultFileRegion {
4141  private  static  final  Logger  LOG  =
4242      LoggerFactory .getLogger (FadvisedFileRegion .class );
4343
44+   private  final  Object  closeLock  = new  Object ();
4445  private  final  boolean  manageOsCache ;
4546  private  final  int  readaheadLength ;
4647  private  final  ReadaheadPool  readaheadPool ;
@@ -51,12 +52,12 @@ public class FadvisedFileRegion extends DefaultFileRegion {
5152  private  final  int  shuffleBufferSize ;
5253  private  final  boolean  shuffleTransferToAllowed ;
5354  private  final  FileChannel  fileChannel ;
54-    
55-   private  ReadaheadRequest  readaheadRequest ;
55+ 
56+   private  volatile   ReadaheadRequest  readaheadRequest ;
5657
5758  public  FadvisedFileRegion (RandomAccessFile  file , long  position , long  count ,
5859      boolean  manageOsCache , int  readaheadLength , ReadaheadPool  readaheadPool ,
59-       String  identifier , int  shuffleBufferSize ,  
60+       String  identifier , int  shuffleBufferSize ,
6061      boolean  shuffleTransferToAllowed ) throws  IOException  {
6162    super (file .getChannel (), position , count );
6263    this .manageOsCache  = manageOsCache ;
@@ -73,97 +74,110 @@ public FadvisedFileRegion(RandomAccessFile file, long position, long count,
7374
7475  @ Override 
7576  public  long  transferTo (WritableByteChannel  target , long  position )
76-       throws  IOException  {
77-     if  (readaheadPool  != null  && readaheadLength  > 0 ) {
78-       readaheadRequest  = readaheadPool .readaheadStream (identifier , fd ,
79-           position () + position , readaheadLength ,
80-           position () + count (), readaheadRequest );
77+           throws  IOException  {
78+     synchronized  (closeLock ) {
79+       if  (fd .valid ()) {
80+         if  (readaheadPool  != null  && readaheadLength  > 0 ) {
81+           readaheadRequest  = readaheadPool .readaheadStream (identifier , fd ,
82+                   position () + position , readaheadLength ,
83+                   position () + count (), readaheadRequest );
84+         }
85+ 
86+         if (this .shuffleTransferToAllowed ) {
87+           return  super .transferTo (target , position );
88+         } else  {
89+           return  customShuffleTransfer (target , position );
90+         }
91+       } else  {
92+         return  0L ;
93+       }
8194    }
82-     
83-     if (this .shuffleTransferToAllowed ) {
84-       return  super .transferTo (target , position );
85-     } else  {
86-       return  customShuffleTransfer (target , position );
87-     } 
95+ 
8896  }
8997
9098  /** 
91-    * This method transfers data using local buffer. It transfers data from   
92-    * a disk to a local buffer in memory, and then it transfers data from the   
99+    * This method transfers data using local buffer. It transfers data from 
100+    * a disk to a local buffer in memory, and then it transfers data from the 
93101   * buffer to the target. This is used only if transferTo is disallowed in 
94-    * the configuration file. super.TransferTo does not perform well on Windows   
95-    * due to a small IO request generated. customShuffleTransfer can control   
96-    * the size of the IO requests by changing the size of the intermediate   
102+    * the configuration file. super.TransferTo does not perform well on Windows 
103+    * due to a small IO request generated. customShuffleTransfer can control 
104+    * the size of the IO requests by changing the size of the intermediate 
97105   * buffer. 
98106   */ 
99107  @ VisibleForTesting 
100108  long  customShuffleTransfer (WritableByteChannel  target , long  position )
101-       throws  IOException  {
109+            throws  IOException  {
102110    long  actualCount  = this .count  - position ;
103111    if  (actualCount  < 0  || position  < 0 ) {
104112      throw  new  IllegalArgumentException (
105-           "position out of range: "  + position  +
106-           " (expected: 0 - "  + (this .count  - 1 ) + ')' );
113+                "position out of range: "  + position  +
114+                        " (expected: 0 - "  + (this .count  - 1 ) + ')' );
107115    }
108116    if  (actualCount  == 0 ) {
109117      return  0L ;
110118    }
111-      
119+ 
112120    long  trans  = actualCount ;
113121    int  readSize ;
114122    ByteBuffer  byteBuffer  = ByteBuffer .allocate (
115-         Math .min (
116-             this .shuffleBufferSize ,
117-             trans  > Integer .MAX_VALUE  ? Integer .MAX_VALUE  : (int ) trans ));
118-      
123+              Math .min (
124+                      this .shuffleBufferSize ,
125+                      trans  > Integer .MAX_VALUE  ? Integer .MAX_VALUE  : (int ) trans ));
126+ 
119127    while (trans  > 0L  &&
120-         (readSize  = fileChannel .read (byteBuffer , this .position +position )) > 0 ) {
128+              (readSize  = fileChannel .read (byteBuffer , this .position +position )) > 0 ) {
121129      //adjust counters and buffer limit 
122130      if (readSize  < trans ) {
123131        trans  -= readSize ;
124132        position  += readSize ;
125133        byteBuffer .flip ();
126134      } else  {
127-         //We can read more than we need if the actualCount is not multiple   
135+         //We can read more than we need if the actualCount is not multiple 
128136        //of the byteBuffer size and file is big enough. In that case we cannot 
129137        //use flip method but we need to set buffer limit manually to trans. 
130138        byteBuffer .limit ((int )trans );
131139        byteBuffer .position (0 );
132-         position  += trans ;  
140+         position  += trans ;
133141        trans  = 0 ;
134142      }
135-        
143+ 
136144      //write data to the target 
137145      while (byteBuffer .hasRemaining ()) {
138146        target .write (byteBuffer );
139147      }
140-        
148+ 
141149      byteBuffer .clear ();
142150    }
143-      
151+ 
144152    return  actualCount  - trans ;
145153  }
146154
147-    
155+ 
148156  @ Override 
149157  protected  void  deallocate () {
150-     if  (readaheadRequest  != null ) {
151-       readaheadRequest .cancel ();
158+     synchronized  (closeLock ) {
159+       if  (readaheadRequest  != null ) {
160+         readaheadRequest .cancel ();
161+         readaheadRequest  = null ;
162+       }
163+       super .deallocate ();
152164    }
153-     super .deallocate ();
154165  }
155-    
166+ 
156167  /** 
157168   * Call when the transfer completes successfully so we can advise the OS that 
158169   * we don't need the region to be cached anymore. 
159170   */ 
160171  public  void  transferSuccessful () {
161-     if  (manageOsCache  && count () > 0 ) {
162-       try  {
163-         NativeIO .POSIX .getCacheManipulator ().posixFadviseIfPossible (identifier ,
164-             fd , position (), count (), POSIX_FADV_DONTNEED );
165-       } catch  (Throwable  t ) {
166-         LOG .warn ("Failed to manage OS cache for "  + identifier , t );
172+     synchronized  (closeLock ) {
173+       if  (fd .valid () && manageOsCache  && count () > 0 ) {
174+         try  {
175+           NativeIO .POSIX .getCacheManipulator ().posixFadviseIfPossible (identifier ,
176+                   fd , position (), count (), POSIX_FADV_DONTNEED );
177+         } catch  (Throwable  t ) {
178+           LOG .warn ("Failed to manage OS cache for "  + identifier  +
179+                   " fd "  + fd , t );
180+         }
167181      }
168182    }
169183  }
0 commit comments