|  | 
| 17 | 17 |  */ | 
| 18 | 18 | package org.apache.hadoop.hbase.regionserver; | 
| 19 | 19 | 
 | 
|  | 20 | +import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY; | 
| 20 | 21 | import static org.junit.Assert.assertArrayEquals; | 
| 21 | 22 | import static org.junit.Assert.assertEquals; | 
| 22 | 23 | import static org.junit.Assert.assertFalse; | 
|  | 
| 41 | 42 | import java.util.List; | 
| 42 | 43 | import java.util.ListIterator; | 
| 43 | 44 | import java.util.NavigableSet; | 
|  | 45 | +import java.util.Optional; | 
| 44 | 46 | import java.util.TreeSet; | 
| 45 | 47 | import java.util.concurrent.ConcurrentSkipListSet; | 
| 46 | 48 | import java.util.concurrent.CountDownLatch; | 
| 47 | 49 | import java.util.concurrent.CyclicBarrier; | 
| 48 | 50 | import java.util.concurrent.ExecutorService; | 
| 49 | 51 | import java.util.concurrent.Executors; | 
|  | 52 | +import java.util.concurrent.Future; | 
| 50 | 53 | import java.util.concurrent.ThreadPoolExecutor; | 
| 51 | 54 | import java.util.concurrent.TimeUnit; | 
| 52 | 55 | import java.util.concurrent.atomic.AtomicBoolean; | 
|  | 
| 103 | 106 | import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType; | 
| 104 | 107 | import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action; | 
| 105 | 108 | import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; | 
|  | 109 | +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; | 
| 106 | 110 | import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; | 
|  | 111 | +import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy; | 
| 107 | 112 | import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; | 
| 108 | 113 | import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; | 
| 109 | 114 | import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; | 
| @@ -1113,6 +1118,68 @@ public void testRefreshStoreFilesNotChanged() throws IOException { | 
| 1113 | 1118 |     verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any()); | 
| 1114 | 1119 |   } | 
| 1115 | 1120 | 
 | 
|  | 1121 | +  @Test | 
|  | 1122 | +  public void testScanWithCompactionAfterFlush() throws Exception { | 
|  | 1123 | +    TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, | 
|  | 1124 | +      EverythingPolicy.class.getName()); | 
|  | 1125 | +    init(name.getMethodName()); | 
|  | 1126 | + | 
|  | 1127 | +    assertEquals(0, this.store.getStorefilesCount()); | 
|  | 1128 | + | 
|  | 1129 | +    KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null); | 
|  | 1130 | +    // add some data, flush | 
|  | 1131 | +    this.store.add(kv, null); | 
|  | 1132 | +    flush(1); | 
|  | 1133 | +    kv = new KeyValue(row, family, qf2, 1, (byte[]) null); | 
|  | 1134 | +    // add some data, flush | 
|  | 1135 | +    this.store.add(kv, null); | 
|  | 1136 | +    flush(2); | 
|  | 1137 | +    kv = new KeyValue(row, family, qf3, 1, (byte[]) null); | 
|  | 1138 | +    // add some data, flush | 
|  | 1139 | +    this.store.add(kv, null); | 
|  | 1140 | +    flush(3); | 
|  | 1141 | + | 
|  | 1142 | +    ExecutorService service = Executors.newFixedThreadPool(2); | 
|  | 1143 | + | 
|  | 1144 | +    Scan scan = new Scan(new Get(row)); | 
|  | 1145 | +    Future<KeyValueScanner> scanFuture = service.submit(() -> { | 
|  | 1146 | +      try { | 
|  | 1147 | +        LOG.info(">>>> creating scanner"); | 
|  | 1148 | +        return this.store.createScanner(scan, | 
|  | 1149 | +          new ScanInfo(HBaseConfiguration.create(), | 
|  | 1150 | +            ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(), | 
|  | 1151 | +            Long.MAX_VALUE, 0, CellComparator.getInstance()), | 
|  | 1152 | +          scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0); | 
|  | 1153 | +      } catch (IOException e) { | 
|  | 1154 | +        e.printStackTrace(); | 
|  | 1155 | +        return null; | 
|  | 1156 | +      } | 
|  | 1157 | +    }); | 
|  | 1158 | +    Future compactFuture = service.submit(() -> { | 
|  | 1159 | +      try { | 
|  | 1160 | +        LOG.info(">>>>>> starting compaction"); | 
|  | 1161 | +        Optional<CompactionContext> opCompaction = this.store.requestCompaction(); | 
|  | 1162 | +        assertTrue(opCompaction.isPresent()); | 
|  | 1163 | +        store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent()); | 
|  | 1164 | +        LOG.info(">>>>>> Compaction is finished"); | 
|  | 1165 | +        this.store.closeAndArchiveCompactedFiles(); | 
|  | 1166 | +        LOG.info(">>>>>> Compacted files deleted"); | 
|  | 1167 | +      } catch (IOException e) { | 
|  | 1168 | +        e.printStackTrace(); | 
|  | 1169 | +      } | 
|  | 1170 | +    }); | 
|  | 1171 | + | 
|  | 1172 | +    KeyValueScanner kvs = scanFuture.get(); | 
|  | 1173 | +    compactFuture.get(); | 
|  | 1174 | +    ((StoreScanner) kvs).currentScanners.forEach(s -> { | 
|  | 1175 | +      if (s instanceof StoreFileScanner) { | 
|  | 1176 | +        assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount()); | 
|  | 1177 | +      } | 
|  | 1178 | +    }); | 
|  | 1179 | +    kvs.seek(kv); | 
|  | 1180 | +    service.shutdownNow(); | 
|  | 1181 | +  } | 
|  | 1182 | + | 
| 1116 | 1183 |   private long countMemStoreScanner(StoreScanner scanner) { | 
| 1117 | 1184 |     if (scanner.currentScanners == null) { | 
| 1118 | 1185 |       return 0; | 
|  | 
0 commit comments