Skip to content

Commit 38bd362

Browse files
committed
backout sync id
1 parent 7bdece5 commit 38bd362

File tree

12 files changed

+59
-187
lines changed

12 files changed

+59
-187
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
import org.apache.logging.log4j.LogManager;
2222
import org.apache.logging.log4j.Logger;
23-
import org.elasticsearch.Version;
2423
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
2525
import org.elasticsearch.action.support.ActionFilters;
2626
import org.elasticsearch.action.support.replication.ReplicationOperation;
2727
import org.elasticsearch.action.support.replication.ReplicationRequest;
@@ -85,23 +85,21 @@ protected void acquireReplicaOperationPermit(final IndexShard replica,
8585
}
8686

8787
@Override
88-
protected void shardOperationOnPrimary(final ShardRequest primaryRequest, final IndexShard primary,
88+
protected void shardOperationOnPrimary(final ShardRequest shardRequest, final IndexShard primary,
8989
ActionListener<PrimaryResult<ShardRequest, ReplicationResponse>> listener) {
9090
ActionListener.completeWith(listener, () -> {
91-
final String syncId = executeShardOperation(primaryRequest, primary);
92-
final ShardRequest replicaRequest = new ShardRequest(
93-
primaryRequest.shardId(), primaryRequest.clusterBlock, syncId, primaryRequest.getParentTask());
94-
return new PrimaryResult<>(replicaRequest, new ReplicationResponse());
91+
executeShardOperation(shardRequest, primary);
92+
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
9593
});
9694
}
9795

9896
@Override
99-
protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws IOException {
97+
protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) {
10098
executeShardOperation(shardRequest, replica);
10199
return new ReplicaResult();
102100
}
103101

104-
private String executeShardOperation(final ShardRequest request, final IndexShard indexShard) throws IOException {
102+
private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) {
105103
final ShardId shardId = indexShard.shardId();
106104
if (indexShard.getActiveOperationsCount() != IndexShard.OPERATIONS_BLOCKED) {
107105
throw new IllegalStateException("Index shard " + shardId + " is not blocking all operations during closing");
@@ -111,9 +109,9 @@ private String executeShardOperation(final ShardRequest request, final IndexShar
111109
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) {
112110
throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing");
113111
}
114-
final String syncId = indexShard.prepareShardBeforeIndexClosing(request.syncId);
112+
indexShard.verifyShardBeforeIndexClosing();
113+
indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
115114
logger.trace("{} shard is ready for closing", shardId);
116-
return syncId;
117115
}
118116

119117
@Override
@@ -137,28 +135,21 @@ public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String all
137135
public static class ShardRequest extends ReplicationRequest<ShardRequest> {
138136

139137
private final ClusterBlock clusterBlock;
140-
private final String syncId;
141138

142139
ShardRequest(StreamInput in) throws IOException {
143140
super(in);
144141
clusterBlock = new ClusterBlock(in);
145-
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
146-
syncId = in.readOptionalString();
147-
} else {
148-
syncId = null;
149-
}
150142
}
151143

152-
public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final String syncId, final TaskId parentTaskId) {
144+
public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) {
153145
super(shardId);
154146
this.clusterBlock = Objects.requireNonNull(clusterBlock);
155-
this.syncId = syncId;
156147
setParentTask(parentTaskId);
157148
}
158149

159150
@Override
160151
public String toString() {
161-
return "verify shard " + shardId + " before close with block " + clusterBlock + " sync_id " + syncId;
152+
return "verify shard " + shardId + " before close with block " + clusterBlock;
162153
}
163154

164155
@Override
@@ -170,9 +161,6 @@ public void readFrom(final StreamInput in) {
170161
public void writeTo(final StreamOutput out) throws IOException {
171162
super.writeTo(out);
172163
clusterBlock.writeTo(out);
173-
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
174-
out.writeOptionalString(syncId);
175-
}
176164
}
177165

178166
public ClusterBlock clusterBlock() {

server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -384,9 +384,8 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar
384384
return;
385385
}
386386
final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), request.taskId());
387-
final String syncId = UUIDs.randomBase64UUID();
388387
final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest =
389-
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, closingBlock, syncId, parentTaskId);
388+
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, closingBlock, parentTaskId);
390389
if (request.ackTimeout() != null) {
391390
shardRequest.timeout(request.ackTimeout());
392391
}

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -265,14 +265,18 @@ protected final DocsStats docsStats(IndexReader indexReader) {
265265
}
266266

267267
/**
268-
* Performs the pre-closing action on the {@link Engine}.
268+
* Performs the pre-closing checks on the {@link Engine}.
269269
*
270-
* @param syncId a syncId that an engine can use to seal its index commit. If there was no indexing activity since the last seal,
271-
* the engine can choose to skip synced-flush and returns the existing syncId instead of the provided syncId.
272-
* @return either the provided syncId or the existing syncId
273270
* @throws IllegalStateException if the sanity checks failed
274271
*/
275-
public abstract String prepareEngineBeforeIndexClosing(String syncId) throws IOException;
272+
public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
273+
final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong();
274+
final long maxSeqNo = getSeqNoStats(globalCheckpoint).getMaxSeqNo();
275+
if (globalCheckpoint != maxSeqNo) {
276+
throw new IllegalStateException("Global checkpoint [" + globalCheckpoint
277+
+ "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId);
278+
}
279+
}
276280

277281
/**
278282
* A throttling class that can be activated, causing the

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2756,31 +2756,4 @@ private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOExcept
27562756
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUUID);
27572757
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, engineConfig.getIndexSettings().getIndexVersionCreated());
27582758
}
2759-
2760-
protected void verifyEngineBeforeIndexClosing() {
2761-
final long globalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
2762-
final long maxSeqNo = localCheckpointTracker.getMaxSeqNo();
2763-
if (globalCheckpoint != maxSeqNo) {
2764-
throw new IllegalStateException("Global checkpoint [" + globalCheckpoint
2765-
+ "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId);
2766-
}
2767-
}
2768-
2769-
@Override
2770-
public String prepareEngineBeforeIndexClosing(String syncId) throws IOException {
2771-
try (ReleasableLock ignored = writeLock.acquire()) {
2772-
ensureOpen();
2773-
syncTranslog(); // make sure that we persist the global checkpoint to translog checkpoint
2774-
verifyEngineBeforeIndexClosing();
2775-
// we can reuse the existing syncId if there was no indexing activity since the last synced-flush.
2776-
if (indexWriter.hasUncommittedChanges() == false && lastCommittedSegmentInfos.userData.containsKey(Engine.SYNC_COMMIT_ID)) {
2777-
syncId = lastCommittedSegmentInfos.userData.get(Engine.SYNC_COMMIT_ID);
2778-
}
2779-
final CommitId commitId = flush(true, true);
2780-
if (syncId != null) {
2781-
syncFlush(syncId, commitId);
2782-
}
2783-
return syncId;
2784-
}
2785-
}
27862759
}

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,18 +146,17 @@ protected boolean assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, fi
146146
}
147147

148148
@Override
149-
public String prepareEngineBeforeIndexClosing(String syncId) {
149+
public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
150150
// the value of the global checkpoint is verified when the read-only engine is opened,
151151
// and it is not expected to change during the lifecycle of the engine. We could also
152152
// check this value before closing the read-only engine but if something went wrong
153153
// and the global checkpoint is not in-sync with the max. sequence number anymore,
154154
// checking the value here again would prevent the read-only engine to be closed and
155155
// reopened as an internal engine, which would be the path to fix the issue.
156-
return lastCommittedSegmentInfos.userData.get(Engine.SYNC_COMMIT_ID);
157156
}
158157

159158
protected final DirectoryReader wrapReader(DirectoryReader reader,
160-
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) throws IOException {
159+
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) throws IOException {
161160
reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId());
162161
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
163162
reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3179,14 +3179,12 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
31793179
}
31803180

31813181
/**
3182-
* Performs the pre-closing action on the {@link IndexShard}.
3182+
* Performs the pre-closing checks on the {@link IndexShard}.
31833183
*
31843184
* @throws IllegalStateException if the sanity checks failed
31853185
*/
3186-
public String prepareShardBeforeIndexClosing(String syncId) throws IOException {
3187-
// don't issue synced-flush for recovering shards
3188-
final boolean canSyncedFlush = state == IndexShardState.STARTED || state == IndexShardState.POST_RECOVERY;
3189-
return getEngine().prepareEngineBeforeIndexClosing(canSyncedFlush ? syncId : null);
3186+
public void verifyShardBeforeIndexClosing() throws IllegalStateException {
3187+
getEngine().verifyEngineBeforeIndexClosing();
31903188
}
31913189

31923190
RetentionLeaseSyncer getRetentionLeaseSyncer() {

server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1346,14 +1346,9 @@ private void verifyNodeDecisions(XContentParser parser, Map<String, AllocationDe
13461346
assertTrue("store info should not be present", reuseStore);
13471347
assertEquals(Token.START_OBJECT, parser.nextToken());
13481348
parser.nextToken();
1349-
if ("matching_sync_id".equals(parser.currentName())) {
1350-
parser.nextToken();
1351-
assertTrue(parser.booleanValue());
1352-
} else {
1353-
assertEquals("matching_size_in_bytes", parser.currentName());
1354-
parser.nextToken();
1355-
assertThat(parser.longValue(), greaterThan(0L));
1356-
}
1349+
assertEquals("matching_size_in_bytes", parser.currentName());
1350+
parser.nextToken();
1351+
assertThat(parser.longValue(), greaterThan(0L));
13571352
assertEquals(Token.END_OBJECT, parser.nextToken());
13581353
parser.nextToken();
13591354
}

server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.lucene.util.SetOnce;
2222
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
2324
import org.elasticsearch.action.support.ActionFilters;
2425
import org.elasticsearch.action.support.PlainActionFuture;
2526
import org.elasticsearch.action.support.replication.ReplicationOperation;
@@ -38,8 +39,8 @@
3839
import org.elasticsearch.cluster.routing.ShardRouting;
3940
import org.elasticsearch.cluster.routing.ShardRoutingState;
4041
import org.elasticsearch.cluster.service.ClusterService;
41-
import org.elasticsearch.common.UUIDs;
4242
import org.elasticsearch.common.settings.Settings;
43+
import org.elasticsearch.index.engine.Engine;
4344
import org.elasticsearch.index.shard.IndexShard;
4445
import org.elasticsearch.index.shard.ReplicationGroup;
4546
import org.elasticsearch.index.shard.ShardId;
@@ -55,6 +56,7 @@
5556
import org.junit.AfterClass;
5657
import org.junit.Before;
5758
import org.junit.BeforeClass;
59+
import org.mockito.ArgumentCaptor;
5860

5961
import java.util.Collections;
6062
import java.util.List;
@@ -69,7 +71,8 @@
6971
import static org.hamcrest.Matchers.equalTo;
7072
import static org.hamcrest.Matchers.greaterThan;
7173
import static org.hamcrest.Matchers.instanceOf;
72-
import static org.mockito.Matchers.anyString;
74+
import static org.hamcrest.Matchers.is;
75+
import static org.mockito.Matchers.any;
7376
import static org.mockito.Mockito.doThrow;
7477
import static org.mockito.Mockito.mock;
7578
import static org.mockito.Mockito.times;
@@ -135,7 +138,7 @@ public static void afterClass() {
135138
private void executeOnPrimaryOrReplica() throws Throwable {
136139
final TaskId taskId = new TaskId("_node_id", randomNonNegativeLong());
137140
final TransportVerifyShardBeforeCloseAction.ShardRequest request =
138-
new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), clusterBlock, UUIDs.randomBase64UUID(), taskId);
141+
new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), clusterBlock, taskId);
139142
final PlainActionFuture<Void> res = PlainActionFuture.newFuture();
140143
action.shardOperationOnPrimary(request, indexShard, ActionListener.wrap(
141144
r -> {
@@ -153,12 +156,22 @@ private void executeOnPrimaryOrReplica() throws Throwable {
153156
}
154157
}
155158

159+
public void testShardIsFlushed() throws Throwable {
160+
final ArgumentCaptor<FlushRequest> flushRequest = ArgumentCaptor.forClass(FlushRequest.class);
161+
when(indexShard.flush(flushRequest.capture())).thenReturn(new Engine.CommitId(new byte[0]));
162+
163+
executeOnPrimaryOrReplica();
164+
verify(indexShard, times(1)).flush(any(FlushRequest.class));
165+
assertThat(flushRequest.getValue().force(), is(true));
166+
}
167+
156168
public void testOperationFailsWhenNotBlocked() {
157169
when(indexShard.getActiveOperationsCount()).thenReturn(randomIntBetween(0, 10));
158170

159171
IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
160172
assertThat(exception.getMessage(),
161173
equalTo("Index shard " + indexShard.shardId() + " is not blocking all operations during closing"));
174+
verify(indexShard, times(0)).flush(any(FlushRequest.class));
162175
}
163176

164177
public void testOperationFailsWithNoBlock() {
@@ -167,17 +180,20 @@ public void testOperationFailsWithNoBlock() {
167180
IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
168181
assertThat(exception.getMessage(),
169182
equalTo("Index shard " + indexShard.shardId() + " must be blocked by " + clusterBlock + " before closing"));
183+
verify(indexShard, times(0)).flush(any(FlushRequest.class));
170184
}
171185

172186
public void testVerifyShardBeforeIndexClosing() throws Throwable {
173187
executeOnPrimaryOrReplica();
174-
verify(indexShard, times(1)).prepareShardBeforeIndexClosing(anyString());
188+
verify(indexShard, times(1)).verifyShardBeforeIndexClosing();
189+
verify(indexShard, times(1)).flush(any(FlushRequest.class));
175190
}
176191

177-
public void testVerifyShardBeforeIndexClosingFailed() throws Exception {
178-
doThrow(new IllegalStateException("test")).when(indexShard).prepareShardBeforeIndexClosing(anyString());
192+
public void testVerifyShardBeforeIndexClosingFailed() {
193+
doThrow(new IllegalStateException("test")).when(indexShard).verifyShardBeforeIndexClosing();
179194
expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
180-
verify(indexShard, times(1)).prepareShardBeforeIndexClosing(anyString());
195+
verify(indexShard, times(1)).verifyShardBeforeIndexClosing();
196+
verify(indexShard, times(0)).flush(any(FlushRequest.class));
181197
}
182198

183199
public void testUnavailableShardsMarkedAsStale() throws Exception {
@@ -211,7 +227,7 @@ public void testUnavailableShardsMarkedAsStale() throws Exception {
211227
final PlainActionFuture<PrimaryResult> listener = new PlainActionFuture<>();
212228
TaskId taskId = new TaskId(clusterService.localNode().getId(), 0L);
213229
TransportVerifyShardBeforeCloseAction.ShardRequest request =
214-
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, clusterBlock, UUIDs.randomBase64UUID(), taskId);
230+
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, clusterBlock, taskId);
215231
ReplicationOperation.Replicas<TransportVerifyShardBeforeCloseAction.ShardRequest> proxy = action.newReplicasProxy();
216232
ReplicationOperation<TransportVerifyShardBeforeCloseAction.ShardRequest,
217233
TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult> operation = new ReplicationOperation<>(

server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.lucene.util.LuceneTestCase;
2222
import org.elasticsearch.Version;
23-
import org.elasticsearch.common.UUIDs;
2423
import org.elasticsearch.common.bytes.BytesArray;
2524
import org.elasticsearch.core.internal.io.IOUtils;
2625
import org.elasticsearch.index.mapper.ParsedDocument;
@@ -184,10 +183,10 @@ public void testReadOnly() throws IOException {
184183
}
185184

186185
/**
187-
* Test that {@link ReadOnlyEngine#prepareEngineBeforeIndexClosing(String)} never fails
186+
* Test that {@link ReadOnlyEngine#verifyEngineBeforeIndexClosing()} never fails
188187
* whatever the value of the global checkpoint to check is.
189188
*/
190-
public void testPrepareShardBeforeIndexClosingIsNoOp() throws IOException {
189+
public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException {
191190
IOUtils.close(engine, store);
192191
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
193192
try (Store store = createStore()) {
@@ -196,7 +195,7 @@ public void testPrepareShardBeforeIndexClosingIsNoOp() throws IOException {
196195
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
197196
globalCheckpoint.set(randomNonNegativeLong());
198197
try {
199-
readOnlyEngine.prepareEngineBeforeIndexClosing(UUIDs.randomBase64UUID());
198+
readOnlyEngine.verifyEngineBeforeIndexClosing();
200199
} catch (final IllegalStateException e) {
201200
fail("Read-only engine pre-closing verifications failed");
202201
}

0 commit comments

Comments
 (0)