Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ private void uncaughtException(Thread t, Throwable e, ReplicationSourceManager m
t.getName());
manager.refreshSources(peerId);
break;
} catch (IOException e1) {
} catch (IOException | ReplicationException e1) {
LOG.error("Replication sources refresh failed.", e1);
sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,38 +403,44 @@ public void drainSources(String peerId) throws IOException, ReplicationException
// TODO: use empty initial offsets for now, revisit when adding support for sync replication
ReplicationSourceInterface src =
createSource(new ReplicationQueueData(queueId, ImmutableMap.of()), peer);
// synchronized here to avoid race with preLogRoll where we add new log to source and also
// synchronized here to avoid race with postLogRoll where we add new log to source and also
// walsById.
ReplicationSourceInterface toRemove;
Map<String, NavigableSet<String>> wals = new HashMap<>();
ReplicationQueueData queueData;
synchronized (latestPaths) {
// Here we make a copy of all the remaining wal files and then delete them from the
// replication queue storage after releasing the lock. It is not safe to just remove the old
// map from walsById since later we may fail to update the replication queue storage, and when
// we retry next time, we can not know the wal files that needs to be set to the replication
// queue storage
ImmutableMap.Builder<String, ReplicationGroupOffset> builder = ImmutableMap.builder();
synchronized (walsById) {
walsById.get(queueId).forEach((group, wals) -> {
if (!wals.isEmpty()) {
builder.put(group, new ReplicationGroupOffset(wals.last(), -1));
}
});
}
queueData = new ReplicationQueueData(queueId, builder.build());
src = createSource(queueData, peer);
toRemove = sources.put(peerId, src);
if (toRemove != null) {
LOG.info("Terminate replication source for " + toRemove.getPeerId());
toRemove.terminate(terminateMessage);
toRemove.getSourceMetrics().clear();
}
// Here we make a copy of all the remaining wal files and then delete them from the
// replication queue storage after releasing the lock. It is not safe to just remove the old
// map from walsById since later we may fail to delete them from the replication queue
// storage, and when we retry next time, we can not know the wal files that need to be deleted
// from the replication queue storage.
walsById.get(queueId).forEach((k, v) -> wals.put(k, new TreeSet<>(v)));
}
for (Map.Entry<String, ReplicationGroupOffset> entry : queueData.getOffsets().entrySet()) {
queueStorage.setOffset(queueId, entry.getKey(), entry.getValue(), Collections.emptyMap());
}
LOG.info("Startup replication source for " + src.getPeerId());
src.startup();
for (NavigableSet<String> walsByGroup : wals.values()) {
// TODO: just need to reset the replication offset
// for (String wal : walsByGroup) {
// queueStorage.removeWAL(server.getServerName(), peerId, wal);
// }
}
synchronized (walsById) {
Map<String, NavigableSet<String>> oldWals = walsById.get(queueId);
wals.forEach((k, v) -> {
NavigableSet<String> walsByGroup = oldWals.get(k);
Map<String, NavigableSet<String>> wals = walsById.get(queueId);
queueData.getOffsets().forEach((group, offset) -> {
NavigableSet<String> walsByGroup = wals.get(group);
if (walsByGroup != null) {
walsByGroup.removeAll(v);
walsByGroup.headSet(offset.getWal(), true).clear();
}
});
}
Expand All @@ -457,13 +463,8 @@ public void drainSources(String peerId) throws IOException, ReplicationException
}

private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queueId,
ReplicationPeer peer) throws IOException {
Map<String, ReplicationGroupOffset> offsets;
try {
offsets = queueStorage.getOffsets(queueId);
} catch (ReplicationException e) {
throw new IOException(e);
}
ReplicationPeer peer) throws IOException, ReplicationException {
Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId);
return createSource(new ReplicationQueueData(queueId, ImmutableMap.copyOf(offsets)), peer);
}

Expand All @@ -473,7 +474,7 @@ private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queu
* replication queue storage and only to enqueue all logs to the new replication source
* @param peerId the id of the replication peer
*/
public void refreshSources(String peerId) throws IOException {
public void refreshSources(String peerId) throws ReplicationException, IOException {
String terminateMessage = "Peer " + peerId
+ " state or config changed. Will close the previous replication source and open a new one";
ReplicationPeer peer = replicationPeers.getPeer(peerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,9 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

// TODO: revisit later
@Ignore
@Category({ ReplicationTests.class, MediumTests.class })
public class TestDrainReplicationQueuesForStandBy extends SyncReplicationTestBase {

Expand Down