--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -226,7 +226,7 @@ public ReplicationSourceManager getReplicationManager() {
     return this.replicationManager;
   }
 
-  void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
+  public void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws IOException {
     try {
       this.replicationManager.addHFileRefs(tableName, family, pairs);
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
@@ -32,7 +32,6 @@
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -60,6 +59,8 @@ public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironmen
     RegionCoprocessorEnvironment env = ctx.getEnvironment();
     Configuration c = env.getConfiguration();
     if (pairs == null || pairs.isEmpty() ||
+        env.getRegion().getTableDescriptor().getColumnFamily(family).getScope()
+          != HConstants.REPLICATION_SCOPE_GLOBAL ||
         !c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
           HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
       LOG.debug("Skipping recording bulk load entries in preCommitStoreFile for bulkloaded "
@@ -70,7 +71,7 @@
     // just going to break. This is all private. Not allowed. Regions shouldn't assume they are
     // hosted in a RegionServer. TODO: fix.
     RegionServerServices rss = ((HasRegionServerServices)env).getRegionServerServices();
-    Replication rep = (Replication)((HRegionServer)rss).getReplicationSourceService();
+    Replication rep = (Replication)rss.getReplicationSourceService();
     rep.addHFileRefsToQueue(env.getRegionInfo().getTable(), family, pairs);
   }
 }
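
Context for the new condition in preCommitStoreFile: HFile refs are now queued for replication only when the bulk-loaded family's replication scope is GLOBAL. As a rough illustration of how a family opts in (a sketch only, not part of this diff; the table and family names are made up):

    // Families default to HConstants.REPLICATION_SCOPE_LOCAL (0), which the
    // observer now skips; only GLOBAL (1) families get their HFile refs queued.
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("replicated_table"));
    HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("cf"));
    hcd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    htd.addFamily(hcd);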
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -39,6 +39,8 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.math.BigDecimal;
@@ -61,6 +63,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -128,6 +131,7 @@
 import org.apache.hadoop.hbase.filter.SubstringComparator;
 import org.apache.hadoop.hbase.filter.ValueFilter;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -139,8 +143,10 @@
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
 import org.apache.hadoop.hbase.security.User;
+
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
@@ -150,6 +156,7 @@
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.FaultyFSLog;
@@ -161,6 +168,7 @@
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.wal.WALSplitter;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -169,6 +177,7 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
@@ -220,6 +229,8 @@ public class TestHRegion {
   HRegion region = null;
   // Do not run unit tests in parallel (? Why not? It don't work? Why not? St.Ack)
   protected static HBaseTestingUtility TEST_UTIL;
+  @ClassRule
+  public static TemporaryFolder testFolder = new TemporaryFolder();
   public static Configuration CONF ;
   private String dir;
   private final int MAX_VERSIONS = 2;
@@ -6281,6 +6292,44 @@ public void testBulkLoadReplicationEnabled() throws IOException {
         getCoprocessors().contains(ReplicationObserver.class.getSimpleName()));
   }
 
+  @Test
+  public void testSkipAddHFileRefNode() throws IOException { // test for HBASE-22335
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
+    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
+    Replication replication = Mockito.mock(Replication.class);
+    Mockito.doThrow(
+      new IOException("REPLICATION_SCOPE is 0, should skip recording bulk load entries."))
+      .when(replication).addHFileRefsToQueue(Mockito.any(), Mockito.any(), Mockito.any());
+    when(rss.getReplicationSourceService()).thenReturn(replication);
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
+    htd.addFamily(new HColumnDescriptor(fam1));
+    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
+      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
+    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(),
+      rss, null);
+
+    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
+    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(
+      TEST_UTIL.getConfiguration());
+    File hFileLocation = testFolder.newFile();
+    try (FSDataOutputStream out = new FSDataOutputStream(
+        new FileOutputStream(hFileLocation), null)) {
+      hFileFactory.withOutputStream(out);
+      hFileFactory.withFileContext(new HFileContext());
+      HFile.Writer writer = hFileFactory.create();
+      try {
+        byte[] randomBytes = new byte[100];
+        writer.append(new KeyValue(CellUtil.createCell(randomBytes, fam1, randomBytes,
+          0L, KeyValue.Type.Put.getCode(), randomBytes)));
+      } finally {
+        writer.close();
+      }
+    }
+    familyPaths.add(new Pair<>(fam1, hFileLocation.getAbsoluteFile().getAbsolutePath()));
+    region.bulkLoadHFiles(familyPaths, false, null);
+  }
+
   /**
    * The same as HRegion class, the only difference is that instantiateHStore will
    * create a different HStore - HStoreForTesting. [HBASE-8518]
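
How the new test asserts the skip: the mocked Replication throws on any addHFileRefsToQueue call, so region.bulkLoadHFiles completing without an IOException proves ReplicationObserver never tried to record refs for the scope-0 family. An equivalent, arguably more direct assertion (a sketch only, not in the patch) would use Mockito's verify:

    // After region.bulkLoadHFiles(familyPaths, false, null) returns, assert the
    // observer never handed HFile refs to the replication service.
    Mockito.verify(replication, Mockito.never())
        .addHFileRefsToQueue(Mockito.any(), Mockito.any(), Mockito.any());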