@@ -30,7 +30,6 @@
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -56,7 +55,6 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
@@ -69,7 +67,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
static final int PROCESSOR_RETRY_LIMIT = 16;
static final int DEFAULT_CONCURRENT_PROCESSORS = 1;
static final long DEFAULT_MAX_TRANSLOG_BYTES = Long.MAX_VALUE;
private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500);
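// Fixed delay between successive polls of the leader shard's global checkpoint.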
private static final TimeValue CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL = TimeValue.timeValueSeconds(3);

private final Client client;
private final ThreadPool threadPool;
@@ -130,58 +128,20 @@ protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask param
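// Resolves the leader shard's current global checkpoint and starts a ChunksCoordinator that
// replicates the operations between the follower's and the leader's checkpoints, then keeps
// following new ones.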
void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params,
long followGlobalCheckPoint,
IndexMetadataVersionChecker imdVersionChecker) {
if (task.isRunning() == false) {
// TODO: need better cancellation control
return;
}

final ShardId leaderShard = params.getLeaderShardId();
final ShardId followerShard = params.getFollowShardId();
fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> {
// TODO: check if both indices have the same history uuid
if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
logger.debug("{} no write operations to fetch", followerShard);
retry(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker);
} else {
assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint +
"] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
logger.debug("{} fetching write operations, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", followerShard,
leaderGlobalCheckPoint, followGlobalCheckPoint);
Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME);
Consumer<Exception> handler = e -> {
if (e == null) {
task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
prepare(leaderClient, followerClient, task, params, leaderGlobalCheckPoint, imdVersionChecker);
} else {
task.markAsFailed(e);
}
};
ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, ccrExecutor, imdVersionChecker,
params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard,
followerShard, handler);
coordinator.createChunks(followGlobalCheckPoint, leaderGlobalCheckPoint);
coordinator.start();
}
logger.debug("{} fetching write operations, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", followerShard,
leaderGlobalCheckPoint, followGlobalCheckPoint);
ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, threadPool, imdVersionChecker,
params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard,
followerShard, task::markAsFailed, task::isRunning, task::updateProcessedGlobalCheckpoint);
coordinator.start(followGlobalCheckPoint, leaderGlobalCheckPoint);
}, task::markAsFailed);
}

private void retry(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params,
long followGlobalCheckPoint,
IndexMetadataVersionChecker imdVersionChecker) {
threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
task.markAsFailed(e);
}

@Override
protected void doRun() throws Exception {
prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker);
}
});
}

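// Derives the shard's global checkpoint from an indices-stats call; the folded-out code
// below is assumed to select the matching shard copy and read the checkpoint from its
// SeqNoStats.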
private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
private static void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
IndexStats indexStats = r.getIndex(shardId.getIndexName());
Optional<ShardStats> filteredShardStats = Arrays.stream(indexStats.getShards())
@@ -201,37 +161,53 @@ private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer
static class ChunksCoordinator {

private static final Logger LOGGER = Loggers.getLogger(ChunksCoordinator.class);

private final Client followerClient;
private final Client leaderClient;
private final ThreadPool threadPool;
private final Executor ccrExecutor;
private final IndexMetadataVersionChecker imdVersionChecker;

private final long batchSize;
private final int concurrentProcessors;
private final int maxConcurrentWorker;
private final long processorMaxTranslogBytes;
private final ShardId leaderShard;
private final ShardId followerShard;
private final Consumer<Exception> handler;

private final CountDown countDown;
private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
private final AtomicReference<Exception> failureHolder = new AtomicReference<>();

ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, IndexMetadataVersionChecker imdVersionChecker,
long batchSize, int concurrentProcessors, long processorMaxTranslogBytes, ShardId leaderShard,
ShardId followerShard, Consumer<Exception> handler) {
private final Consumer<Exception> failureHandler;
private final Supplier<Boolean> stateSupplier;
private final LongConsumer processedGlobalCheckpointUpdater;

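// activeWorkers lets initiateChunkWorkers() top the worker pool back up to maxConcurrentWorker;
// lastProcessedGlobalCheckpoint keeps checkpoint updates monotonic even though chunk workers
// may complete out of order.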
private final AtomicInteger activeWorkers;
private final AtomicLong lastProcessedGlobalCheckpoint;
private final Queue<long[]> chunkWorkerQueue = new ConcurrentLinkedQueue<>();

ChunksCoordinator(Client followerClient,
Client leaderClient,
ThreadPool threadPool,
IndexMetadataVersionChecker imdVersionChecker,
long batchSize,
int maxConcurrentWorker,
long processorMaxTranslogBytes,
ShardId leaderShard,
ShardId followerShard,
Consumer<Exception> failureHandler,
Supplier<Boolean> runningSupplier,
LongConsumer processedGlobalCheckpointUpdater) {
this.followerClient = followerClient;
this.leaderClient = leaderClient;
this.ccrExecutor = ccrExecutor;
this.threadPool = threadPool;
this.imdVersionChecker = imdVersionChecker;
this.ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME);
this.batchSize = batchSize;
this.concurrentProcessors = concurrentProcessors;
this.maxConcurrentWorker = maxConcurrentWorker;
this.processorMaxTranslogBytes = processorMaxTranslogBytes;
this.leaderShard = leaderShard;
this.followerShard = followerShard;
this.handler = handler;
this.countDown = new CountDown(concurrentProcessors);
this.failureHandler = failureHandler;
this.stateSupplier = runningSupplier;
this.processedGlobalCheckpointUpdater = processedGlobalCheckpointUpdater;
this.activeWorkers = new AtomicInteger();
this.lastProcessedGlobalCheckpoint = new AtomicLong();
}

/**
@@ -244,69 +220,113 @@ void createChunks(final long from, final long to) {
LOGGER.debug("{} Creating chunks for operation range [{}] to [{}]", leaderShard, from, to);
for (long i = from; i < to; i += batchSize) {
long v2 = i + batchSize <= to ? i + batchSize - 1 : to;
chunks.add(new long[]{i, v2});
chunkWorkerQueue.add(new long[]{i, v2});
}
}
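// For example, from=0, to=1000 and batchSize=256 queue the inclusive ranges
// [0,255], [256,511], [512,767] and a final short chunk [768,1000].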

void start() {
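// Polls the leader's global checkpoint every CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL.
// If the checkpoint advanced, new chunks are queued, idle workers are restarted and the poll
// re-schedules itself with the new checkpoint; once the task stops running, the queue is
// drained and polling ends.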
void updateChunksQueue(long previousGlobalCheckpoint) {
schedule(CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL, () -> {
if (stateSupplier.get() == false) {
chunkWorkerQueue.clear();
return;
}

fetchGlobalCheckpoint(leaderClient, leaderShard, currentGlobalCheckPoint -> {
if (currentGlobalCheckPoint != previousGlobalCheckpoint) {
assert previousGlobalCheckpoint < currentGlobalCheckPoint : "previousGlobalCheckpoint [" + previousGlobalCheckpoint +
"] is not below currentGlobalCheckPoint [" + currentGlobalCheckPoint + "]";
createChunks(previousGlobalCheckpoint, currentGlobalCheckPoint);
initiateChunkWorkers();
updateChunksQueue(currentGlobalCheckPoint);
} else {
LOGGER.debug("{} no write operations to fetch", followerShard);
updateChunksQueue(previousGlobalCheckpoint);
}
}, failureHandler);
});
}

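// Bootstraps the coordinator: queues the initial backlog of chunks, spins up the workers and
// begins polling the leader for further checkpoint advances.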
void start(long followerGlobalCheckpoint, long leaderGlobalCheckPoint) {
createChunks(followerGlobalCheckpoint, leaderGlobalCheckPoint);
LOGGER.debug("{} Start coordination of [{}] chunks with [{}] concurrent workers",
leaderShard, chunks.size(), concurrentProcessors);
for (int i = 0; i < concurrentProcessors; i++) {
leaderShard, chunkWorkerQueue.size(), maxConcurrentWorker);
initiateChunkWorkers();
updateChunksQueue(leaderGlobalCheckPoint);
}

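// Starts as many new chunk workers as needed to reach maxConcurrentWorker; each worker
// drains chunkWorkerQueue until it is empty and then exits.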
void initiateChunkWorkers() {
int workersToStart = maxConcurrentWorker - activeWorkers.get();
if (workersToStart == 0) {
LOGGER.debug("{} No new chunk workers were started", followerShard);
return;
}

LOGGER.debug("{} Starting [{}] new chunk workers", followerShard, workersToStart);
for (int i = 0; i < workersToStart; i++) {
ccrExecutor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
assert e != null;
LOGGER.error(() -> new ParameterizedMessage("{} Failure starting processor", leaderShard), e);
postProcessChunk(e);
LOGGER.error(() -> new ParameterizedMessage("{} Failure starting chunk worker", followerShard), e);
failureHandler.accept(e);
}

@Override
protected void doRun() throws Exception {
processNextChunk();
}
});
activeWorkers.incrementAndGet();
}
}

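// Worker loop: take the next chunk off the shared queue; an empty queue means the worker
// exits (decrementing the active-worker count), otherwise the chunk is processed and the
// handler below recurses on success.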
void processNextChunk() {
long[] chunk = chunks.poll();
long[] chunk = chunkWorkerQueue.poll();
if (chunk == null) {
postProcessChunk(null);
int activeWorkers = this.activeWorkers.decrementAndGet();
LOGGER.debug("{} No more chunks to process, active workers [{}]", leaderShard, activeWorkers);
return;
}
LOGGER.debug("{} Processing chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
Consumer<Exception> processorHandler = e -> {
if (e == null) {
LOGGER.debug("{} Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
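// Only report the checkpoint when this chunk's upper bound is the new maximum;
// workers finish out of order, so smaller bounds must not move the checkpoint backwards.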
if (lastProcessedGlobalCheckpoint.updateAndGet(x -> x < chunk[1] ? chunk[1] : x) == chunk[1]) {
processedGlobalCheckpointUpdater.accept(chunk[1]);
}
processNextChunk();
} else {
LOGGER.error(() -> new ParameterizedMessage("{} Failure processing chunk [{}/{}]",
leaderShard, chunk[0], chunk[1]), e);
postProcessChunk(e);
failureHandler.accept(e);
}
};
ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, imdVersionChecker,
ChunkWorker worker = new ChunkWorker(leaderClient, followerClient, chunkWorkerQueue, ccrExecutor, imdVersionChecker,
leaderShard, followerShard, processorHandler);
processor.start(chunk[0], chunk[1], processorMaxTranslogBytes);
worker.start(chunk[0], chunk[1], processorMaxTranslogBytes);
}

void postProcessChunk(Exception e) {
if (failureHolder.compareAndSet(null, e) == false) {
Exception firstFailure = failureHolder.get();
firstFailure.addSuppressed(e);
}
if (countDown.countDown()) {
handler.accept(failureHolder.get());
}
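// Wraps threadPool.schedule so that scheduling and execution failures are routed to the
// coordinator's failure handler instead of being swallowed.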
void schedule(TimeValue delay, Runnable runnable) {
threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
failureHandler.accept(e);
}

@Override
protected void doRun() throws Exception {
runnable.run();
}
});
}

Queue<long[]> getChunks() {
return chunks;
Queue<long[]> getChunkWorkerQueue() {
return chunkWorkerQueue;
}

}

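/**
 * A single worker that copies one inclusive range of operations from the leader shard to the
 * follower shard, sharing the chunk queue with its sibling workers. The fetch/apply body is
 * folded out of this diff; given retryCounter and PROCESSOR_RETRY_LIMIT, transient failures
 * are presumably retried a bounded number of times.
 */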
static class ChunkProcessor {
static class ChunkWorker {

private final Client leaderClient;
private final Client followerClient;
@@ -319,9 +339,9 @@ static class ChunkProcessor {
private final Consumer<Exception> handler;
final AtomicInteger retryCounter = new AtomicInteger(0);

ChunkProcessor(Client leaderClient, Client followerClient, Queue<long[]> chunks, Executor ccrExecutor,
BiConsumer<Long, Consumer<Exception>> indexVersionChecker,
ShardId leaderShard, ShardId followerShard, Consumer<Exception> handler) {
ChunkWorker(Client leaderClient, Client followerClient, Queue<long[]> chunks, Executor ccrExecutor,
BiConsumer<Long, Consumer<Exception>> indexVersionChecker, ShardId leaderShard, ShardId followerShard,
Consumer<Exception> handler) {
this.leaderClient = leaderClient;
this.followerClient = followerClient;
this.chunks = chunks;