|
30 | 30 | import org.apache.hadoop.hbase.IntegrationTestingUtility; |
31 | 31 | import org.apache.hadoop.hbase.chaos.policies.Policy; |
32 | 32 | import org.apache.hadoop.hbase.util.Pair; |
| 33 | +import org.apache.hadoop.hbase.util.ReservoirSample; |
| 34 | +import org.slf4j.Logger; |
| 35 | +import org.slf4j.LoggerFactory; |
33 | 36 |
|
34 | 37 | import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; |
35 | 38 |
|
36 | 39 | /** |
37 | 40 | * Chaos monkey that given multiple policies will run actions against the cluster. |
38 | 41 | */ |
39 | 42 | public class PolicyBasedChaosMonkey extends ChaosMonkey { |
| 43 | + private static final Logger LOG = LoggerFactory.getLogger(PolicyBasedChaosMonkey.class); |
40 | 44 |
|
41 | 45 | private static final long ONE_SEC = 1000; |
42 | 46 | private static final long ONE_MIN = 60 * ONE_SEC; |
@@ -116,13 +120,14 @@ public static <T> T selectWeightedRandomItem(List<Pair<T, Integer>> items) { |
116 | 120 |
|
117 | 121 | /** Selects and returns ceil(ratio * items.length) random items from the given array */ |
118 | 122 | public static <T> List<T> selectRandomItems(T[] items, float ratio) { |
119 | | - int selectedNumber = (int) Math.ceil(items.length * ratio); |
120 | | - |
121 | | - List<T> originalItems = Arrays.asList(items); |
122 | | - Collections.shuffle(originalItems); |
123 | | - |
124 | | - int startIndex = ThreadLocalRandom.current().nextInt(items.length - selectedNumber); |
125 | | - return originalItems.subList(startIndex, startIndex + selectedNumber); |
| 123 | + // clamp ratio to [0.0,1.0] |
| 124 | + ratio = Math.max(Math.min(ratio, 1.0f), 0.0f); |
| 125 | + final int selectedNumber = (int) Math.ceil(items.length * ratio); |
| 126 | + final ReservoirSample<T> sample = new ReservoirSample<>(selectedNumber); |
| 127 | + sample.add(Arrays.stream(items)); |
| 128 | + final List<T> shuffledItems = sample.getSamplingResult(); |
| 129 | + Collections.shuffle(shuffledItems); |
| 130 | + return shuffledItems; |
126 | 131 | } |
127 | 132 |
|
128 | 133 | @Override |
@@ -151,7 +156,10 @@ public boolean isStopped() { |
151 | 156 |
|
152 | 157 | @Override |
153 | 158 | public void waitForStop() throws InterruptedException { |
154 | | - monkeyThreadPool.awaitTermination(1, TimeUnit.MINUTES); |
| 159 | + if (!monkeyThreadPool.awaitTermination(1, TimeUnit.MINUTES)) { |
| 160 | + LOG.warn("Some pool threads failed to terminate. Forcing. {}", monkeyThreadPool); |
| 161 | + monkeyThreadPool.shutdownNow(); |
| 162 | + } |
155 | 163 | } |
156 | 164 |
|
157 | 165 | @Override |
|
0 commit comments