Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
/* Used to track the age of the oldest WAL, in milliseconds since its creation time. */
String OLDEST_WAL_AGE = "source.oldestWalAge";

void setLastShippedAge(long age);
void incrSizeOfLogQueue(int size);
Expand Down Expand Up @@ -79,4 +81,6 @@ public interface MetricsReplicationSourceSource extends BaseSource {
long getWALEditsRead();
long getShippedOps();
long getEditsFiltered();
/**
 * Sets the age of the oldest WAL for this source.
 * @param age the age to record; presumably in milliseconds since the WAL's creation time, as
 *   described on OLDEST_WAL_AGE -- TODO confirm against callers
 */
void setOldestWalAge(long age);
/**
 * @return the age of the oldest WAL as held by this source; presumably in milliseconds, as
 *   described on OLDEST_WAL_AGE -- TODO confirm
 */
long getOldestWalAge();
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,18 @@ public void incrCompletedRecoveryQueue() {
public void incrFailedRecoveryQueue() {
// Increment failedRecoveryQueue by one.
failedRecoveryQueue.incr(1L);
}

@Override
public void setOldestWalAge(long age) {
// Not implemented: the supplied age is ignored by this implementation.
}

@Override
public long getOldestWalAge() {
// Not implemented: this implementation always returns 0.
return 0;
}

@Override
public void init() {
rms.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String logReadInBytesKey;
private final String shippedHFilesKey;
private final String sizeOfHFileRefsQueueKey;
private final String oldestWalAgeKey;

private final MutableHistogram ageOfLastShippedOpHist;
private final MutableGaugeLong sizeOfLogQueueGauge;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final MutableFastCounter repeatedFileBytes;
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
private final MutableGaugeLong oldestWalAge;

public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
this.rms = rms;
Expand Down Expand Up @@ -130,6 +132,9 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri

completedRecoveryKey = this.keyPrefix + "completedRecoverQueues";
completedRecoveryQueue = rms.getMetricsRegistry().getCounter(completedRecoveryKey, 0L);

oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);
}

@Override public void setLastShippedAge(long age) {
Expand Down Expand Up @@ -195,6 +200,7 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri
rms.removeMetric(repeatedBytesKey);
rms.removeMetric(completedLogsKey);
rms.removeMetric(completedRecoveryKey);
rms.removeMetric(oldestWalAgeKey);
}

@Override
Expand Down Expand Up @@ -260,6 +266,14 @@ public void incrCompletedRecoveryQueue() {
// This implementation does nothing when a failed recovery queue is reported.
@Override
public void incrFailedRecoveryQueue() {/*no op*/}

/**
 * Stores the given age in the oldestWalAge gauge.
 * @param age the value to set on the gauge
 */
@Override
public void setOldestWalAge(long age) {
  this.oldestWalAge.set(age);
}

/**
 * @return the current value of the oldestWalAge gauge
 */
@Override
public long getOldestWalAge() {
  return this.oldestWalAge.value();
}

@Override
public void init() {
rms.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,17 @@ public void incrFailedRecoveryQueue() {
globalSourceSource.incrFailedRecoveryQueue();
}

/**
 * Sets the age of the oldest log file on the single (per-source) metrics source only.
 * @param age the age of the oldest log file to record
 */
public void setOldestWalAge(long age) {
singleSourceSource.setOldestWalAge(age);
}

/**
 * @return the oldest WAL age as reported by singleSourceSource
 */
public long getOldestWalAge() {
return singleSourceSource.getOldestWalAge();
}

@Override
public void init() {
singleSourceSource.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
}

@Override
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
PriorityBlockingQueue<Path> queue) {
return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId) {
return new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, this, queueStorage);
}

public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
public void locateRecoveredPaths(String walGroupId) throws IOException {
boolean hasPathChanged = false;
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
PriorityBlockingQueue<Path> newPaths = new PriorityBlockingQueue<Path>(queueSizePerGroup,
new AbstractFSWALProvider.WALStartTimeComparator());
pathsLoop: for (Path path : queue) {
Expand Down Expand Up @@ -117,9 +117,9 @@ public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOExc
// put the correct locations in the queue
// since this is a recovered queue with no new incoming logs,
// there shouldn't be any concurrency issues
queue.clear();
logQueue.clear(walGroupId);
for (Path path : newPaths) {
queue.add(path);
logQueue.enqueueLog(path, walGroupId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Threads;
Expand All @@ -40,9 +38,9 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
private final ReplicationQueueStorage replicationQueues;

public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
ReplicationSourceLogQueue logQueue, RecoveredReplicationSource source,
ReplicationQueueStorage queueStorage) {
super(conf, walGroupId, queue, source);
super(conf, walGroupId, logQueue, source);
this.source = source;
this.replicationQueues = queueStorage;
}
Expand All @@ -65,7 +63,7 @@ public long getStartPosition() {
int numRetries = 0;
while (numRetries <= maxRetriesMultiplier) {
try {
source.locateRecoveredPaths(queue);
source.locateRecoveredPaths(walGroupId);
break;
} catch (IOException e) {
LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
Expand All @@ -82,9 +80,9 @@ private long getRecoveredQueueStartPos() {
String peerClusterZNode = source.getQueueId();
try {
startPosition = this.replicationQueues.getWALPosition(source.getServer().getServerName(),
peerClusterZNode, this.queue.peek().getName());
LOG.trace("Recovered queue started with log {} at position {}", this.queue.peek(),
startPosition);
peerClusterZNode, this.logQueue.getQueue(walGroupId).peek().getName());
LOG.trace("Recovered queue started with log {} at position {}",
this.logQueue.getQueue(walGroupId).peek(), startPosition);
} catch (ReplicationException e) {
terminate("Couldn't get the position of this recovered queue " + peerClusterZNode, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -86,11 +84,9 @@
public class ReplicationSource implements ReplicationSourceInterface {

private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
// Queues of logs to process, entry in format of walGroupId->queue,
// each presents a queue for one wal group
private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
// per group queue size, keep no more than this number of logs in each wal group
protected int queueSizePerGroup;
protected ReplicationSourceLogQueue logQueue;
protected ReplicationQueueStorage queueStorage;
protected ReplicationPeer replicationPeer;

Expand Down Expand Up @@ -118,8 +114,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
volatile boolean sourceRunning = false;
// Metrics for this source
private MetricsSource metrics;
// WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold;
// ReplicationEndpoint which will handle the actual replication
private volatile ReplicationEndpoint replicationEndpoint;

Expand Down Expand Up @@ -213,6 +207,7 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this);
this.queueStorage = queueStorage;
this.replicationPeer = replicationPeer;
this.manager = manager;
Expand All @@ -224,7 +219,6 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);

// A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
Expand Down Expand Up @@ -255,35 +249,20 @@ public void enqueueLog(Path wal) {
}
// Use WAL prefix as the WALGroupId for this peer.
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
PriorityBlockingQueue<Path> queue = queues.get(walPrefix);
if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup,
new AbstractFSWALProvider.WALStartTimeComparator());
// make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
// the shipper may quit immediately
queue.put(wal);
queues.put(walPrefix, queue);
boolean queueExists = logQueue.enqueueLog(wal, walPrefix);

if (!queueExists) {
if (this.isSourceActive() && this.walEntryFilter != null) {
// A new wal group was observed after source startup; start a new worker thread to track it.
// Note: a wal may be enqueued after this.running is set but before the worker thread has
// been launched, so workerThreads must be checked before starting the worker.
tryStartNewShipper(walPrefix, queue);
tryStartNewShipper(walPrefix);
}
} else {
queue.put(wal);
}
if (LOG.isTraceEnabled()) {
LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
this.replicationQueueInfo.getQueueId());
}
this.metrics.incrSizeOfLogQueue();
// This will log a warning for each new wal that gets created above the warn threshold
int queueSize = queue.size();
if (queueSize > this.logQueueWarnThreshold) {
LOG.warn("{} WAL group {} queue size: {} exceeds value of "
+ "replication.source.log.queue.warn: {}", logPeerId(),
walPrefix, queueSize, logQueueWarnThreshold);
}
}

@Override
Expand Down Expand Up @@ -375,16 +354,16 @@ private void initializeWALEntryFilter(UUID peerClusterId) {
this.walEntryFilter = new ChainWALEntryFilter(filters);
}

private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
private void tryStartNewShipper(String walGroupId) {
workerThreads.compute(walGroupId, (key, value) -> {
if (value != null) {
LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
return value;
} else {
LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
ReplicationSourceShipper worker = createNewShipper(walGroupId);
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, queue, worker.getStartPosition());
createNewWALReader(walGroupId, worker.getStartPosition());
Threads.setDaemonThreadRunning(
walReader, Thread.currentThread().getName()
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId,
Expand All @@ -404,7 +383,7 @@ public Map<String, ReplicationStatus> getWalGroupStatus() {
String walGroupId = walGroupShipper.getKey();
ReplicationSourceShipper shipper = walGroupShipper.getValue();
ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
int queueSize = queues.get(walGroupId).size();
int queueSize = logQueue.getQueueSize(walGroupId);
replicationDelay = metrics.getReplicationDelay();
Path currentPath = shipper.getCurrentPath();
fileSize = -1;
Expand Down Expand Up @@ -443,16 +422,16 @@ private long getFileSize(Path currentPath) throws IOException {
return fileSize;
}

protected ReplicationSourceShipper createNewShipper(String walGroupId,
PriorityBlockingQueue<Path> queue) {
return new ReplicationSourceShipper(conf, walGroupId, queue, this);
protected ReplicationSourceShipper createNewShipper(String walGroupId) {
return new ReplicationSourceShipper(conf, walGroupId, logQueue, this);
}

private ReplicationSourceWALReader createNewWALReader(String walGroupId,
PriorityBlockingQueue<Path> queue, long startPosition) {
private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
return replicationPeer.getPeerConfig().isSerial()
? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
: new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
this, walGroupId)
: new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
this, walGroupId);
}

/**
Expand Down Expand Up @@ -621,14 +600,12 @@ private void initialize() {
throw new IllegalStateException("Source should be active.");
}
LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}",
logPeerId(), this.replicationQueueInfo.getQueueId(), this.queues.size(), clusterId,
logPeerId(), this.replicationQueueInfo.getQueueId(), logQueue.getNumQueues(), clusterId,
peerClusterId);
initializeWALEntryFilter(peerClusterId);
// Start workers
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue();
tryStartNewShipper(walGroupId, queue);
for (String walGroupId: logQueue.getQueues().keySet()) {
tryStartNewShipper(walGroupId);
}
this.startupOngoing.set(false);
}
Expand Down Expand Up @@ -857,7 +834,7 @@ public ReplicationQueueStorage getReplicationQueueStorage() {
/**
* @return String to use as a log prefix that contains current peerId.
*/
private String logPeerId(){
public String logPeerId(){
return "peerId=" + this.getPeerId() + ",";
}
}
Loading