1717 */
1818package org .apache .hadoop .hbase .io .hfile ;
1919
20- import java .io .File ;
21- import java .io .FileInputStream ;
22- import java .io .FileOutputStream ;
23- import java .io .IOException ;
24- import java .util .HashMap ;
2520import java .util .Map ;
2621import java .util .concurrent .ConcurrentSkipListMap ;
2722import java .util .concurrent .Future ;
3631import org .apache .hadoop .fs .Path ;
3732import org .apache .hadoop .hbase .HBaseConfiguration ;
3833import org .apache .hadoop .hbase .HConstants ;
34+ import org .apache .hadoop .hbase .util .EnvironmentEdgeManager ;
3935import org .apache .yetus .audience .InterfaceAudience ;
4036import org .slf4j .Logger ;
4137import org .slf4j .LoggerFactory ;
4238
43- import org .apache .hadoop .hbase .shaded .protobuf .generated .PersistentPrefetchProtos ;
44-
4539@ InterfaceAudience .Private
4640public final class PrefetchExecutor {
4741
4842 private static final Logger LOG = LoggerFactory .getLogger (PrefetchExecutor .class );
4943
5044 /** Futures for tracking block prefetch activity */
5145 private static final Map <Path , Future <?>> prefetchFutures = new ConcurrentSkipListMap <>();
52- /** Set of files for which prefetch is completed */
53- @ edu .umd .cs .findbugs .annotations .SuppressWarnings (value = "MS_SHOULD_BE_FINAL" )
54- private static HashMap <String , Boolean > prefetchCompleted = new HashMap <>();
5546 /** Executor pool shared among all HFiles for block prefetch */
5647 private static final ScheduledExecutorService prefetchExecutorPool ;
5748 /** Delay before beginning prefetch */
5849 private static final int prefetchDelayMillis ;
5950 /** Variation in prefetch delay times, to mitigate stampedes */
6051 private static final float prefetchDelayVariation ;
61- static String prefetchedFileListPath ;
6252 static {
6353 // Consider doing this on demand with a configuration passed in rather
6454 // than in a static initializer.
@@ -71,7 +61,7 @@ public final class PrefetchExecutor {
7161 prefetchExecutorPool = new ScheduledThreadPoolExecutor (prefetchThreads , new ThreadFactory () {
7262 @ Override
7363 public Thread newThread (Runnable r ) {
74- String name = "hfile-prefetch-" + System . currentTimeMillis ();
64+ String name = "hfile-prefetch-" + EnvironmentEdgeManager . currentTime ();
7565 Thread t = new Thread (r , name );
7666 t .setDaemon (true );
7767 return t ;
@@ -88,13 +78,6 @@ public Thread newThread(Runnable r) {
8878 + HConstants .HREGION_COMPACTIONDIR_NAME .replace ("." , "\\ ." ) + Path .SEPARATOR_CHAR + ")" );
8979
9080 public static void request (Path path , Runnable runnable ) {
91- if (prefetchCompleted != null ) {
92- if (isFilePrefetched (path .getName ())) {
93- LOG .info (
94- "File has already been prefetched before the restart, so skipping prefetch : " + path );
95- return ;
96- }
97- }
9881 if (!prefetchPathExclude .matcher (path .toString ()).find ()) {
9982 long delay ;
10083 if (prefetchDelayMillis > 0 ) {
@@ -120,9 +103,8 @@ public static void request(Path path, Runnable runnable) {
120103 public static void complete (Path path ) {
121104 prefetchFutures .remove (path );
122105 if (LOG .isDebugEnabled ()) {
123- LOG .debug ("Prefetch completed for " + path );
106+ LOG .debug ("Prefetch completed for {}" , path . getName () );
124107 }
125- prefetchCompleted .put (path .getName (), true );
126108 }
127109
128110 public static void cancel (Path path ) {
@@ -131,14 +113,8 @@ public static void cancel(Path path) {
131113 // ok to race with other cancellation attempts
132114 future .cancel (true );
133115 prefetchFutures .remove (path );
134- if (LOG .isDebugEnabled ()) {
135- LOG .debug ("Prefetch cancelled for " + path );
136- }
116+ LOG .debug ("Prefetch cancelled for {}" , path );
137117 }
138- if (LOG .isDebugEnabled ()) {
139- LOG .debug ("Removing filename from the prefetched persistence list: {}" , path .getName ());
140- }
141- removePrefetchedFileWhileEvict (path .getName ());
142118 }
143119
144120 public static boolean isCompleted (Path path ) {
@@ -149,68 +125,6 @@ public static boolean isCompleted(Path path) {
149125 return true ;
150126 }
151127
152- public static void persistToFile (String path ) throws IOException {
153- prefetchedFileListPath = path ;
154- if (prefetchedFileListPath == null ) {
155- LOG .info ("Exception while persisting prefetch!" );
156- throw new IOException ("Error persisting prefetched HFiles set!" );
157- }
158- if (!prefetchCompleted .isEmpty ()) {
159- try (FileOutputStream fos = new FileOutputStream (prefetchedFileListPath , false )) {
160- PrefetchProtoUtils .toPB (prefetchCompleted ).writeDelimitedTo (fos );
161- }
162- }
163- }
164-
165- public static void retrieveFromFile (String path ) throws IOException {
166- prefetchedFileListPath = path ;
167- File prefetchPersistenceFile = new File (prefetchedFileListPath );
168- if (!prefetchPersistenceFile .exists ()) {
169- LOG .warn ("Prefetch persistence file does not exist!" );
170- return ;
171- }
172- LOG .info ("Retrieving from prefetch persistence file " + path );
173- assert (prefetchedFileListPath != null );
174- try (FileInputStream fis = deleteFileOnClose (prefetchPersistenceFile )) {
175- PersistentPrefetchProtos .PrefetchedHfileName proto =
176- PersistentPrefetchProtos .PrefetchedHfileName .parseDelimitedFrom (fis );
177- Map <String , Boolean > protoPrefetchedFilesMap = proto .getPrefetchedFilesMap ();
178- prefetchCompleted .putAll (protoPrefetchedFilesMap );
179- }
180- }
181-
182- private static FileInputStream deleteFileOnClose (final File file ) throws IOException {
183- return new FileInputStream (file ) {
184- private File myFile ;
185-
186- private FileInputStream init (File file ) {
187- myFile = file ;
188- return this ;
189- }
190-
191- @ Override
192- public void close () throws IOException {
193- if (myFile == null ) {
194- return ;
195- }
196-
197- super .close ();
198- if (!myFile .delete ()) {
199- throw new IOException ("Failed deleting persistence file " + myFile .getAbsolutePath ());
200- }
201- myFile = null ;
202- }
203- }.init (file );
204- }
205-
206- public static void removePrefetchedFileWhileEvict (String hfileName ) {
207- prefetchCompleted .remove (hfileName );
208- }
209-
210- public static boolean isFilePrefetched (String hfileName ) {
211- return prefetchCompleted .containsKey (hfileName );
212- }
213-
214128 private PrefetchExecutor () {
215129 }
216130}
0 commit comments