Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -17,20 +17,16 @@
*/

package org.apache.hadoop.hbase.replication;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Skips WAL edits for all System tables including META
* Skips WAL edits for all System tables including hbase:meta.
*/
@InterfaceAudience.Private
public class SystemTableWALEntryFilter implements WALEntryFilter {
/**
* Drops WAL entries that belong to a system table.
* @param entry the WAL entry to examine; the table name on its key decides the outcome
* @return null when the entry's table is a system table, otherwise the same entry unchanged
*/
@Override
public Entry filter(Entry entry) {
// A null return tells replication to skip the entry (see the WALEntryFilter#filter contract).
if (entry.getKey().getTableName().isSystemTable()) {
return null;
}
return entry;
// Single-expression form of the same system-table check.
return entry.getKey().getTableName().isSystemTable()? null: entry;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -16,10 +16,9 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;

/**
* A Filter for WAL entries before being sent over to replication. Multiple
Expand All @@ -34,7 +33,6 @@
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public interface WALEntryFilter {

/**
* <p>
* Applies the filter, possibly returning a different Entry instance. If null is returned, the
Expand All @@ -49,5 +47,5 @@ public interface WALEntryFilter {
* @return a (possibly modified) Entry to use. Returning null or an entry with no cells will cause
* the entry to be skipped for replication.
*/
public Entry filter(Entry entry);
Entry filter(Entry entry);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For system tables we return null Entry. Good to consider return type Optional<Entry> here?
Maybe as follow up task?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is how it works now.... no Optional. Will leave it in this patch. Thanks.

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -18,12 +18,12 @@
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand All @@ -36,6 +36,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -63,15 +64,15 @@
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
* Class that handles the source of a replication stream.
* Currently does not handle more than 1 slave
* Currently does not handle more than 1 slave cluster.
* For each slave cluster it selects a random number of peers
* using a replication ratio. For example, if replication ratio = 0.1
* and slave cluster has 100 region servers, 10 will be selected.
Expand Down Expand Up @@ -120,8 +121,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
private int logQueueWarnThreshold;
// ReplicationEndpoint which will handle the actual replication
private volatile ReplicationEndpoint replicationEndpoint;
// A filter (or a chain of filters) for the WAL entries.

/**
* A filter (or a chain of filters) for WAL entries; filters out edits.
*/
protected volatile WALEntryFilter walEntryFilter;

// throttler
private ReplicationThrottler throttler;
private long defaultBandwidth;
Expand All @@ -139,6 +144,39 @@ public class ReplicationSource implements ReplicationSourceInterface {

private Thread initThread;

/**
* WALs to replicate.
* Predicate that returns 'true' for WALs to replicate and false for WALs to skip.
*/
private final Predicate<Path> filterInWALs;

/**
* Base WALEntry filters for this class. Unmodifiable. Set on construction.
* Filters *out* edits we do not want replicated, passed on to replication endpoints.
* This is the basic set. Down in #initializeWALEntryFilter this set is placed at the front of
* the WALEntry filter chain. The filter we pick up from the configured endpoint, and any other
* machinations, are added after these to create the final {@link #walEntryFilter}.
* @see WALEntryFilter
*/
private final List<WALEntryFilter> baseFilterOutWALEntries;

/**
 * Creates a source with the default filtering: every WAL except meta WALs is replicated, and
 * WALEntries belonging to System Tables are filtered out.
 */
ReplicationSource() {
  // Filter *in* any WAL that is not a meta WAL; filter *out* System Table WALEntries.
  this(walPath -> !AbstractFSWALProvider.isMetaFile(walPath),
    Lists.newArrayList(new SystemTableWALEntryFilter()));
}

/**
 * @param replicateWAL Pass a filter to run against WAL Path; filter *in* WALs to Replicate;
 *   i.e. return 'true' if you want to replicate the content of the WAL.
 * @param baseFilterOutWALEntries Base set of filters you want applied always; filters *out*
 *   WALEntries so they never make it out of this ReplicationSource. The list is copied on
 *   construction, so later changes to the passed-in list do not affect this source.
 */
ReplicationSource(Predicate<Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries) {
  this.filterInWALs = replicateWAL;
  // Snapshot before wrapping: Collections.unmodifiableList alone is only a read-only *view*,
  // so a caller that kept a reference to its list could still alter our base filter set.
  this.baseFilterOutWALEntries =
    Collections.unmodifiableList(new ArrayList<>(baseFilterOutWALEntries));
}

/**
* Instantiation method used by region servers
* @param conf configuration to use
Expand Down Expand Up @@ -194,26 +232,30 @@ private void decorateConf() {
}

@Override
public void enqueueLog(Path log) {
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
public void enqueueLog(Path wal) {
if (!this.filterInWALs.test(wal)) {
LOG.trace("NOT replicating {}", wal);
return;
}
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
queues.put(logPrefix, queue);
if (this.isSourceActive() && this.walEntryFilter != null) {
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread
// notice: it's possible that wal enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
tryStartNewShipper(logPrefix, queue);
}
}
queue.put(log);
queue.put(wal);
if (LOG.isTraceEnabled()) {
LOG.trace("{} Added log file {} to queue of source {}.", logPeerId(), logPrefix,
LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), logPrefix,
this.replicationQueueInfo.getQueueId());
}
this.metrics.incrSizeOfLogQueue();
// This will log a warning for each new log that gets created above the warn threshold
// This will log a warning for each new wal that gets created above the warn threshold
int queueSize = queue.size();
if (queueSize > this.logQueueWarnThreshold) {
LOG.warn("{} WAL group {} queue size: {} exceeds value of "
Expand Down Expand Up @@ -302,8 +344,7 @@ private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndp

private void initializeWALEntryFilter(UUID peerClusterId) {
// get the WALEntryFilter from ReplicationEndpoint and add it to default filters
ArrayList<WALEntryFilter> filters =
Lists.<WALEntryFilter> newArrayList(new SystemTableWALEntryFilter());
List<WALEntryFilter> filters = new ArrayList<>(this.baseFilterOutWALEntries);
WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
if (filterFromEndpoint != null) {
filters.add(filterFromEndpoint);
Expand Down Expand Up @@ -392,6 +433,16 @@ private ReplicationSourceWALReader createNewWALReader(String walGroupId,
: new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
}

/**
 * Test accessor for the WAL Entry Filter chain. Only call this once
 * {@link #initializeWALEntryFilter(UUID)} has run; before that the result is null.
 * @return The WAL Entry Filter Chain this ReplicationSource will use on WAL files filtering
 *   out WALEntry edits.
 */
@VisibleForTesting
WALEntryFilter getWalEntryFilter() {
  return this.walEntryFilter;
}

protected final void uncaughtException(Thread t, Throwable e) {
RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in {} currentPath={}",
Expand Down Expand Up @@ -623,7 +674,10 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool
}
}
if (clearMetrics) {
this.metrics.clear();
// Can be null in test context.
if (this.metrics != null) {
this.metrics.clear();
}
}
}

Expand Down Expand Up @@ -653,10 +707,6 @@ public boolean isSourceActive() {
return !this.server.isStopped() && this.sourceRunning;
}

/**
 * @return the value currently held in this source's {@code clusterId} field
 */
public UUID getPeerClusterUUID() {
  return clusterId;
}

/**
* Comparator used to compare logs together based on their start time
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -45,7 +45,6 @@
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

Expand Down Expand Up @@ -308,7 +307,6 @@ public static String getWALDirectoryName(final String serverName) {
* Construct the directory name for all old WALs on a given server. The default old WALs dir looks
* like: <code>hbase/oldWALs</code>. If you set hbase.separate.oldlogdir.by.regionserver to
* true, it looks like <code>hbase/oldWALs/kalashnikov.att.net,61634,1486865297088</code>.
* @param conf
* @param serverName Server name formatted as described in {@link ServerName}
* @return the relative WAL directory name
*/
Expand Down Expand Up @@ -414,11 +412,11 @@ public static boolean isMetaFile(Path p) {
return isMetaFile(p.getName());
}

/**
* Tests whether a name carries the meta WAL provider suffix.
* @param p name to test (the Path overload passes the file name); may be null
* @return True if String is non-null and ends in {@link #META_WAL_PROVIDER_ID}; false otherwise,
* including when the String is null
*/
public static boolean isMetaFile(String p) {
if (p != null && p.endsWith(META_WAL_PROVIDER_ID)) {
return true;
}
return false;
// Single-expression form of the same null-safe suffix check.
return p != null && p.endsWith(META_WAL_PROVIDER_ID);
}

public static boolean isArchivedLogFile(Path p) {
Expand Down Expand Up @@ -461,12 +459,9 @@ public static Path getArchivedLogPath(Path path, Configuration conf) throws IOEx
* @param path path to WAL file
* @param conf configuration
* @return WAL Reader instance
* @throws IOException
*/
public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf)
throws IOException

{
throws IOException {
long retryInterval = 2000; // 2 sec
int maxAttempts = 30;
int attempt = 0;
Expand Down
Loading