Skip to content

Commit be2de08

Browse files
committed
HBASE-22810 Initialize a separate ThreadPoolExecutor for taking/restoring snapshot (addendum - use the old config key)
1 parent 105008e commit be2de08

File tree

6 files changed

+53
-122
lines changed

6 files changed

+53
-122
lines changed

hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1363,10 +1363,6 @@ public static enum Modify {
13631363
"hbase.master.executor.logreplayops.threads";
13641364
public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10;
13651365

1366-
public static final String MASTER_SNAPSHOT_OPERATIONS_THREADS =
1367-
"hbase.master.executor.snapshot.threads";
1368-
public static final int MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT = 3;
1369-
13701366
private HConstants() {
13711367
// Can't be instantiated with this ctor.
13721368
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1226,9 +1226,8 @@ private void startServiceThreads() throws IOException {
12261226
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT));
12271227
this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt(
12281228
HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT));
1229-
this.service.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS,
1230-
conf.getInt(HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS,
1231-
HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT));
1229+
this.service.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, conf.getInt(
1230+
SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT));
12321231

12331232
// We depend on there being only one instance of this executor running
12341233
// at a time. To do concurrency, would need fencing of enable/disable of

hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/EnabledTableSnapshotHandler.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,11 @@ public EnabledTableSnapshotHandler prepare() throws Exception {
6565
// enforce snapshot time constraints, but lets us be potentially a bit more robust.
6666

6767
/**
68-
* This method kicks off a snapshot procedure. Other than that it hangs around for various
69-
* phases to complete.
68+
* This method kicks off a snapshot procedure. Other than that it hangs around for various phases
69+
* to complete.
7070
*/
7171
@Override
72-
protected void snapshotRegions(List<Pair<HRegionInfo, ServerName>> regions)
73-
throws HBaseSnapshotException, IOException {
72+
protected void snapshotRegions(List<Pair<HRegionInfo, ServerName>> regions) throws IOException {
7473
Set<String> regionServers = new HashSet<String>(regions.size());
7574
for (Pair<HRegionInfo, ServerName> region : regions) {
7675
if (region != null && region.getFirst() != null && region.getSecond() != null) {

hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,10 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
137137
public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";
138138

139139
/** Conf key for # of threads used by the SnapshotManager thread pool */
140-
private static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";
140+
public static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";
141141

142142
/** Default # of threads used by the SnapshotManager thread pool */
143-
private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
143+
public static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
144144

145145
private boolean stopped;
146146
private MasterServices master; // Needed by TableEventHandlers
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.snapshot;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
22+
import org.apache.hadoop.hbase.testclassification.ClientTests;
23+
import org.apache.hadoop.hbase.testclassification.LargeTests;
24+
import org.junit.BeforeClass;
25+
import org.junit.experimental.categories.Category;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
@Category({ ClientTests.class, LargeTests.class })
30+
public class TestConcurrentFlushSnapshotFromClient extends TestFlushSnapshotFromClient {
31+
// Log under this test class's own name so its output is not attributed to the parent test.
private static final Logger LOG =
    LoggerFactory.getLogger(TestConcurrentFlushSnapshotFromClient.class);
32+
33+
@BeforeClass
34+
public static void setupCluster() throws Exception {
35+
setupConf(UTIL.getConfiguration());
36+
UTIL.startMiniCluster(3);
37+
}
38+
39+
protected static void setupConf(Configuration conf) {
40+
TestFlushSnapshotFromClient.setupConf(conf);
41+
UTIL.getConfiguration().setInt(SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, 3);
42+
LOG.info("Config the {} to be 3", SnapshotManager.SNAPSHOT_POOL_THREADS_KEY);
43+
}
44+
}

hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java

Lines changed: 2 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.hadoop.hbase.snapshot;
1919

2020
import static org.junit.Assert.assertEquals;
21-
import static org.junit.Assert.assertTrue;
2221
import static org.junit.Assert.fail;
2322

2423
import java.io.IOException;
@@ -27,7 +26,6 @@
2726
import java.util.HashMap;
2827
import java.util.List;
2928
import java.util.Map;
30-
import java.util.concurrent.CountDownLatch;
3129

3230
import org.apache.commons.logging.Log;
3331
import org.apache.commons.logging.LogFactory;
@@ -65,7 +63,7 @@
6563
@Category(LargeTests.class)
6664
public class TestFlushSnapshotFromClient {
6765
private static final Log LOG = LogFactory.getLog(TestFlushSnapshotFromClient.class);
68-
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
66+
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
6967
private static final int NUM_RS = 2;
7068
private static final byte[] TEST_FAM = Bytes.toBytes("fam");
7169
private static final TableName TABLE_NAME = TableName.valueOf("test");
@@ -86,7 +84,7 @@ public static void setupCluster() throws Exception {
8684
UTIL.startMiniCluster(NUM_RS);
8785
}
8886

89-
private static void setupConf(Configuration conf) {
87+
protected static void setupConf(Configuration conf) {
9088
// disable the ui
9189
conf.setInt("hbase.regionsever.info.port", -1);
9290
// change the flush size to a small amount, regulating number of store files
@@ -400,111 +398,6 @@ public void testFlushCreateListDestroy() throws Exception {
400398
snapshotName, rootDir, fs, true);
401399
}
402400

403-
/**
404-
* Demonstrate that we reject snapshot requests if there is a snapshot already running on the
405-
* same table and that concurrent snapshots on different tables can both
406-
* succeed concurrently.
407-
*/
408-
@Test(timeout=300000)
409-
public void testConcurrentSnapshottingAttempts() throws IOException, InterruptedException {
410-
final TableName TABLE2_NAME = TableName.valueOf(TABLE_NAME + "2");
411-
412-
int ssNum = 20;
413-
Admin admin = UTIL.getHBaseAdmin();
414-
// make sure we don't fail on listing snapshots
415-
SnapshotTestingUtils.assertNoSnapshots(admin);
416-
// create second testing table
417-
SnapshotTestingUtils.createTable(UTIL, TABLE2_NAME, TEST_FAM);
418-
// load the table so we have some data
419-
SnapshotTestingUtils.loadData(UTIL, TABLE_NAME, DEFAULT_NUM_ROWS, TEST_FAM);
420-
SnapshotTestingUtils.loadData(UTIL, TABLE2_NAME, DEFAULT_NUM_ROWS, TEST_FAM);
421-
422-
final CountDownLatch toBeSubmitted = new CountDownLatch(ssNum);
423-
// We'll have one of these per thread
424-
class SSRunnable implements Runnable {
425-
SnapshotDescription ss;
426-
SSRunnable(SnapshotDescription ss) {
427-
this.ss = ss;
428-
}
429-
430-
@Override
431-
public void run() {
432-
try {
433-
Admin admin = UTIL.getHBaseAdmin();
434-
LOG.info("Submitting snapshot request: " + ClientSnapshotDescriptionUtils.toString(ss));
435-
admin.takeSnapshotAsync(ss);
436-
} catch (Exception e) {
437-
LOG.info("Exception during snapshot request: " + ClientSnapshotDescriptionUtils.toString(
438-
ss)
439-
+ ". This is ok, we expect some", e);
440-
}
441-
LOG.info("Submitted snapshot request: " + ClientSnapshotDescriptionUtils.toString(ss));
442-
toBeSubmitted.countDown();
443-
}
444-
};
445-
446-
// build descriptions
447-
SnapshotDescription[] descs = new SnapshotDescription[ssNum];
448-
for (int i = 0; i < ssNum; i++) {
449-
SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
450-
builder.setTable(((i % 2) == 0 ? TABLE_NAME : TABLE2_NAME).getNameAsString());
451-
builder.setName("ss"+i);
452-
builder.setType(SnapshotDescription.Type.FLUSH);
453-
descs[i] = builder.build();
454-
}
455-
456-
// kick each off its own thread
457-
for (int i=0 ; i < ssNum; i++) {
458-
new Thread(new SSRunnable(descs[i])).start();
459-
}
460-
461-
// wait until all have been submitted
462-
toBeSubmitted.await();
463-
464-
// loop until all are done.
465-
while (true) {
466-
int doneCount = 0;
467-
for (SnapshotDescription ss : descs) {
468-
try {
469-
if (admin.isSnapshotFinished(ss)) {
470-
doneCount++;
471-
}
472-
} catch (Exception e) {
473-
LOG.warn("Got an exception when checking for snapshot " + ss.getName(), e);
474-
doneCount++;
475-
}
476-
}
477-
if (doneCount == descs.length) {
478-
break;
479-
}
480-
Thread.sleep(100);
481-
}
482-
483-
// dump for debugging
484-
UTIL.getHBaseCluster().getMaster().getMasterFileSystem().logFileSystemState(LOG);
485-
486-
List<SnapshotDescription> taken = admin.listSnapshots();
487-
int takenSize = taken.size();
488-
LOG.info("Taken " + takenSize + " snapshots: " + taken);
489-
assertTrue("We expect at least 1 request to be rejected because of we concurrently" +
490-
" issued many requests", takenSize < ssNum && takenSize > 0);
491-
492-
// Verify that there's at least one snapshot per table
493-
int t1SnapshotsCount = 0;
494-
int t2SnapshotsCount = 0;
495-
for (SnapshotDescription ss : taken) {
496-
if (TableName.valueOf(ss.getTable()).equals(TABLE_NAME)) {
497-
t1SnapshotsCount++;
498-
} else if (TableName.valueOf(ss.getTable()).equals(TABLE2_NAME)) {
499-
t2SnapshotsCount++;
500-
}
501-
}
502-
assertTrue("We expect at least 1 snapshot of table1 ", t1SnapshotsCount > 0);
503-
assertTrue("We expect at least 1 snapshot of table2 ", t2SnapshotsCount > 0);
504-
505-
UTIL.deleteTable(TABLE2_NAME);
506-
}
507-
508401
private void waitRegionsAfterMerge(final long numRegionsAfterMerge)
509402
throws IOException, InterruptedException {
510403
Admin admin = UTIL.getHBaseAdmin();

0 commit comments

Comments
 (0)