@@ -49,7 +49,6 @@
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -360,7 +359,7 @@ public QueryCachingPolicy getQueryCachingPolicy() {
@Override
public void updateShardState(final ShardRouting newRouting,
final long newPrimaryTerm,
final CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer,
final BiConsumer<IndexShard, ActionListener<ResyncTask>> primaryReplicaSyncer,
final long applyingClusterStateVersion,
final Set<String> inSyncAllocationIds,
final IndexShardRoutingTable routingTable,
@@ -78,8 +78,30 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
this.chunkSize = chunkSize;
}

public void resync(IndexShard indexShard, ActionListener<ResyncTask> listener) throws IOException {
try (Translog.View view = indexShard.acquireTranslogView()) {
public void resync(IndexShard indexShard, ActionListener<ResyncTask> listener) {
final Translog.View view = indexShard.acquireTranslogView();
ActionListener<ResyncTask> wrappedListener = new ActionListener<ResyncTask>() {
@Override
public void onResponse(ResyncTask resyncTask) {
try {
view.close();
} catch (IOException e) {
onFailure(e);
}
listener.onResponse(resyncTask);
}

@Override
public void onFailure(Exception e) {
try {
view.close();
} catch (IOException inner) {
e.addSuppressed(inner);
}
listener.onFailure(e);
}
};
try {
final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
Translog.Snapshot snapshot = view.snapshot(startingSeqNo);
ShardId shardId = indexShard.shardId();
@@ -96,16 +118,20 @@ public synchronized int totalOperations() {

@Override
public synchronized Translog.Operation next() throws IOException {
if (indexShard.state() != IndexShardState.STARTED) {
assert indexShard.state() != IndexShardState.RELOCATED : "resync should never happen on a relocated shard";
throw new IndexShardNotStartedException(shardId, indexShard.state());
IndexShardState state = indexShard.state();
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
} else {
assert state == IndexShardState.STARTED : "resync should only happen on a started shard, but state was: " + state;
}
return snapshot.next();
}
};

resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot,
startingSeqNo, listener);
startingSeqNo, wrappedListener);
} catch (Exception e) {
wrappedListener.onFailure(e);
}
}

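With the checked IOException gone from resync, a caller has nothing to try/catch at the call site; there is no checked exception to handle, and failures delivered to the listener (including a failed translog-view close) surface only through it. A minimal call-site sketch under that assumption; the class and method names are illustrative, and the PlainActionFuture usage mirrors the test added further down in this change:

import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;

class ResyncCallSiteSketch {
    // resync(IndexShard, ActionListener<ResyncTask>) no longer declares IOException,
    // so failures are handed to the listener; actionGet() rethrows any such failure
    // for a blocking caller.
    static ResyncTask blockingResync(PrimaryReplicaSyncer syncer, IndexShard shard) {
        PlainActionFuture<ResyncTask> future = new PlainActionFuture<>();
        syncer.resync(shard, future);
        return future.actionGet();
    }
}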
@@ -41,7 +41,6 @@
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@@ -87,6 +86,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -746,7 +746,7 @@ public interface Shard {
*/
void updateShardState(ShardRouting shardRouting,
long primaryTerm,
CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer,
BiConsumer<IndexShard, ActionListener<ResyncTask>> primaryReplicaSyncer,
long applyingClusterStateVersion,
Set<String> inSyncAllocationIds,
IndexShardRoutingTable routingTable,
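Dropping the checked IOException is what lets the primaryReplicaSyncer parameter above be a plain BiConsumer. A minimal wiring sketch under that assumption; the class and method names are illustrative and not a call site taken from this change:

import java.util.function.BiConsumer;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;

class ResyncWiringSketch {
    // A method reference to the now-unchecked resync satisfies the BiConsumer-typed
    // parameter directly; with the old CheckedBiConsumer signature every caller had
    // to handle or re-declare the IOException.
    static BiConsumer<IndexShard, ActionListener<ResyncTask>> wire(PrimaryReplicaSyncer syncer) {
        return syncer::resync;
    }
}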
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.index.shard;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.resync.ResyncReplicationResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@@ -33,6 +34,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.containsString;
@@ -90,6 +92,49 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
closeShards(shard);
}

public void testSyncerOnClosingShard() throws Exception {
IndexShard shard = newStartedShard(true);
AtomicBoolean syncActionCalled = new AtomicBoolean();
CountDownLatch syncCalledLatch = new CountDownLatch(1);
PrimaryReplicaSyncer.SyncAction syncAction =
(request, parentTask, allocationId, primaryTerm, listener) -> {
logger.info("Sending off {} operations", request.getOperations().size());
syncActionCalled.set(true);
syncCalledLatch.countDown();
threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse()));
};
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, new TaskManager(Settings.EMPTY), syncAction);
syncer.setChunkSize(new ByteSizeValue(1)); // every document is sent off separately

int numDocs = 10;
for (int i = 0; i < numDocs; i++) {
indexDoc(shard, "test", Integer.toString(i));
}

String allocationId = shard.routingEntry().allocationId().getId();
shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId),
new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), Collections.emptySet());

PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
threadPool.generic().execute(() -> {
try {
syncer.resync(shard, fut);
} catch (AlreadyClosedException ace) {
fut.onFailure(ace);
}
});
if (randomBoolean()) {
syncCalledLatch.await();
}
closeShards(shard);
try {
fut.actionGet();
assertTrue("Sync action was not called", syncActionCalled.get());
} catch (AlreadyClosedException | IndexShardClosedException ignored) {
// ignore
}
}

public void testStatusSerialization() throws IOException {
PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status(randomAlphaOfLength(10),
randomIntBetween(0, 1000), randomIntBetween(0, 1000), randomIntBetween(0, 1000));
@@ -25,7 +25,6 @@
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@@ -54,8 +53,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
@@ -345,7 +344,7 @@ public RecoveryState recoveryState() {
@Override
public void updateShardState(ShardRouting shardRouting,
long newPrimaryTerm,
CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer,
BiConsumer<IndexShard, ActionListener<ResyncTask>> primaryReplicaSyncer,
long applyingClusterStateVersion,
Set<String> inSyncAllocationIds,
IndexShardRoutingTable routingTable,