144144import org .apache .hadoop .hdfs .DFSClient ;
145145import org .apache .hadoop .hdfs .DistributedFileSystem ;
146146import org .apache .hadoop .hdfs .MiniDFSCluster ;
147+ import org .apache .hadoop .hdfs .server .datanode .DataNode ;
148+ import org .apache .hadoop .hdfs .server .datanode .fsdataset .FsDatasetSpi ;
147149import org .apache .hadoop .hdfs .server .namenode .EditLogFileOutputStream ;
148150import org .apache .hadoop .mapred .JobConf ;
149151import org .apache .hadoop .mapred .MiniMRCluster ;
@@ -196,6 +198,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
196198 public static final boolean PRESPLIT_TEST_TABLE = true ;
197199
198200 private MiniDFSCluster dfsCluster = null ;
201+ private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null ;
199202
200203 private volatile HBaseCluster hbaseCluster = null ;
201204 private MiniMRCluster mrCluster = null ;
@@ -574,6 +577,56 @@ public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException
574577 return getTestFileSystem ().delete (cpath , true );
575578 }
576579
580+ // Workaround to avoid IllegalThreadStateException
581+ // See HBASE-27148 for more details
582+ private static final class FsDatasetAsyncDiskServiceFixer extends Thread {
583+
584+ private volatile boolean stopped = false ;
585+
586+ private final MiniDFSCluster cluster ;
587+
588+ FsDatasetAsyncDiskServiceFixer (MiniDFSCluster cluster ) {
589+ super ("FsDatasetAsyncDiskServiceFixer" );
590+ setDaemon (true );
591+ this .cluster = cluster ;
592+ }
593+
594+ @ Override
595+ public void run () {
596+ while (!stopped ) {
597+ try {
598+ Thread .sleep (30000 );
599+ } catch (InterruptedException e ) {
600+ Thread .currentThread ().interrupt ();
601+ continue ;
602+ }
603+ // we could add new datanodes during tests, so here we will check every 30 seconds, as the
604+ // timeout of the thread pool executor is 60 seconds by default.
605+ try {
606+ for (DataNode dn : cluster .getDataNodes ()) {
607+ FsDatasetSpi <?> dataset = dn .getFSDataset ();
608+ Field service = dataset .getClass ().getDeclaredField ("asyncDiskService" );
609+ service .setAccessible (true );
610+ Object asyncDiskService = service .get (dataset );
611+ Field group = asyncDiskService .getClass ().getDeclaredField ("threadGroup" );
612+ group .setAccessible (true );
613+ ThreadGroup threadGroup = (ThreadGroup ) group .get (asyncDiskService );
614+ if (threadGroup .isDaemon ()) {
615+ threadGroup .setDaemon (false );
616+ }
617+ }
618+ } catch (Exception e ) {
619+ LOG .warn ("failed to reset thread pool timeout for FsDatasetAsyncDiskService" , e );
620+ }
621+ }
622+ }
623+
624+ void shutdown () {
625+ stopped = true ;
626+ interrupt ();
627+ }
628+ }
629+
577630 /**
578631 * Start a minidfscluster.
579632 * @param servers How many DNs to start. n * @see #shutdownMiniDFSCluster()
@@ -632,7 +685,8 @@ public MiniDFSCluster startMiniDFSCluster(int servers, final String racks[], Str
632685
633686 this .dfsCluster =
634687 new MiniDFSCluster (0 , this .conf , servers , true , true , true , null , racks , hosts , null );
635-
688+ this .dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer (dfsCluster );
689+ this .dfsClusterFixer .start ();
636690 // Set this just-started cluster as our filesystem.
637691 setFs ();
638692
@@ -656,6 +710,8 @@ public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOE
656710 "ERROR" );
657711 dfsCluster =
658712 new MiniDFSCluster (namenodePort , conf , 5 , false , true , true , null , null , null , null );
713+ this .dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer (dfsCluster );
714+ this .dfsClusterFixer .start ();
659715 return dfsCluster ;
660716 }
661717
@@ -778,6 +834,12 @@ public void shutdownMiniDFSCluster() throws IOException {
778834 // The below throws an exception per dn, AsynchronousCloseException.
779835 this .dfsCluster .shutdown ();
780836 dfsCluster = null ;
837+ // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
838+ // have a fixer
839+ if (dfsClusterFixer != null ) {
840+ this .dfsClusterFixer .shutdown ();
841+ dfsClusterFixer = null ;
842+ }
781843 dataTestDirOnTestFS = null ;
782844 CommonFSUtils .setFsDefault (this .conf , new Path ("file:///" ));
783845 }
0 commit comments