Skip to content

Commit 3ca885e

Browse files
authored
[Close Index API] Add TransportShardCloseAction for pre-closing verifications (#36249)
This pull request adds the TransportShardCloseAction which is a transport replication action that acquires all index shard permits for its execution. This action will be used in the future by the MetaDataIndexStateService in a new index closing process, where we need to execute some sanity checks before closing an index. The action executes the following verifications on the primary and replicas: * there is no other on going operation active on the shard * the data node holding the shard knows that the index is blocked for writes * the shard's max sequence number is equal to the global checkpoint When the verifications are done and successful, the shard is flushed. Relates #33888
1 parent 97259f0 commit 3ca885e

File tree

2 files changed

+248
-0
lines changed

2 files changed

+248
-0
lines changed
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.action.admin.indices.close;
20+
21+
import org.elasticsearch.action.ActionListener;
22+
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
23+
import org.elasticsearch.action.support.ActionFilters;
24+
import org.elasticsearch.action.support.replication.ReplicationRequest;
25+
import org.elasticsearch.action.support.replication.ReplicationResponse;
26+
import org.elasticsearch.action.support.replication.TransportReplicationAction;
27+
import org.elasticsearch.cluster.action.shard.ShardStateAction;
28+
import org.elasticsearch.cluster.block.ClusterBlock;
29+
import org.elasticsearch.cluster.block.ClusterBlocks;
30+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
31+
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
32+
import org.elasticsearch.cluster.service.ClusterService;
33+
import org.elasticsearch.common.inject.Inject;
34+
import org.elasticsearch.common.lease.Releasable;
35+
import org.elasticsearch.common.settings.Settings;
36+
import org.elasticsearch.index.shard.IndexShard;
37+
import org.elasticsearch.index.shard.ShardId;
38+
import org.elasticsearch.indices.IndicesService;
39+
import org.elasticsearch.threadpool.ThreadPool;
40+
import org.elasticsearch.transport.TransportService;
41+
42+
public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction<
43+
TransportVerifyShardBeforeCloseAction.ShardCloseRequest, TransportVerifyShardBeforeCloseAction.ShardCloseRequest, ReplicationResponse> {
44+
45+
public static final String NAME = CloseIndexAction.NAME + "[s]";
46+
private static final ClusterBlock EXPECTED_BLOCK = MetaDataIndexStateService.INDEX_CLOSED_BLOCK;
47+
48+
@Inject
49+
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
50+
final ClusterService clusterService, final IndicesService indicesService,
51+
final ThreadPool threadPool, final ShardStateAction stateAction,
52+
final ActionFilters actionFilters, final IndexNameExpressionResolver resolver) {
53+
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters, resolver,
54+
ShardCloseRequest::new, ShardCloseRequest::new, ThreadPool.Names.MANAGEMENT);
55+
}
56+
57+
@Override
58+
protected ReplicationResponse newResponseInstance() {
59+
return new ReplicationResponse();
60+
}
61+
62+
@Override
63+
protected void acquirePrimaryOperationPermit(final IndexShard primary,
64+
final ShardCloseRequest request,
65+
final ActionListener<Releasable> onAcquired) {
66+
primary.acquireAllPrimaryOperationsPermits(onAcquired, request.timeout());
67+
}
68+
69+
@Override
70+
protected void acquireReplicaOperationPermit(final IndexShard replica,
71+
final ShardCloseRequest request,
72+
final ActionListener<Releasable> onAcquired,
73+
final long primaryTerm,
74+
final long globalCheckpoint,
75+
final long maxSeqNoOfUpdateOrDeletes) {
76+
replica.acquireAllReplicaOperationsPermits(primaryTerm, globalCheckpoint, maxSeqNoOfUpdateOrDeletes, onAcquired, request.timeout());
77+
}
78+
79+
@Override
80+
protected PrimaryResult<ShardCloseRequest, ReplicationResponse> shardOperationOnPrimary(final ShardCloseRequest shardRequest,
81+
final IndexShard primary) throws Exception {
82+
executeShardOperation(primary);
83+
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
84+
}
85+
86+
@Override
87+
protected ReplicaResult shardOperationOnReplica(final ShardCloseRequest shardRequest, final IndexShard replica) throws Exception {
88+
executeShardOperation(replica);
89+
return new ReplicaResult();
90+
}
91+
92+
private void executeShardOperation(final IndexShard indexShard) {
93+
final ShardId shardId = indexShard.shardId();
94+
if (indexShard.getActiveOperationsCount() != 0) {
95+
throw new IllegalStateException("On-going operations in progress while checking index shard " + shardId + " before closing");
96+
}
97+
98+
final ClusterBlocks clusterBlocks = clusterService.state().blocks();
99+
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), EXPECTED_BLOCK) == false) {
100+
throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + EXPECTED_BLOCK + " before closing");
101+
}
102+
103+
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
104+
if (indexShard.getGlobalCheckpoint() != maxSeqNo) {
105+
throw new IllegalStateException("Global checkpoint [" + indexShard.getGlobalCheckpoint()
106+
+ "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId);
107+
}
108+
indexShard.flush(new FlushRequest());
109+
logger.debug("{} shard is ready for closing", shardId);
110+
}
111+
112+
public static class ShardCloseRequest extends ReplicationRequest<ShardCloseRequest> {
113+
114+
ShardCloseRequest(){
115+
}
116+
117+
public ShardCloseRequest(final ShardId shardId) {
118+
super(shardId);
119+
}
120+
121+
@Override
122+
public String toString() {
123+
return "close shard {" + shardId + "}";
124+
}
125+
}
126+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.action.admin.indices.close;
20+
21+
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
22+
import org.elasticsearch.action.support.ActionFilters;
23+
import org.elasticsearch.cluster.ClusterName;
24+
import org.elasticsearch.cluster.ClusterState;
25+
import org.elasticsearch.cluster.action.shard.ShardStateAction;
26+
import org.elasticsearch.cluster.block.ClusterBlocks;
27+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
28+
import org.elasticsearch.cluster.service.ClusterService;
29+
import org.elasticsearch.common.settings.Settings;
30+
import org.elasticsearch.index.seqno.SeqNoStats;
31+
import org.elasticsearch.index.seqno.SequenceNumbers;
32+
import org.elasticsearch.index.shard.IndexShard;
33+
import org.elasticsearch.index.shard.ShardId;
34+
import org.elasticsearch.indices.IndicesService;
35+
import org.elasticsearch.test.ESTestCase;
36+
import org.elasticsearch.threadpool.ThreadPool;
37+
import org.elasticsearch.transport.TransportService;
38+
import org.junit.Before;
39+
40+
import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK;
41+
import static org.hamcrest.Matchers.equalTo;
42+
import static org.mockito.Matchers.any;
43+
import static org.mockito.Mockito.mock;
44+
import static org.mockito.Mockito.times;
45+
import static org.mockito.Mockito.verify;
46+
import static org.mockito.Mockito.when;
47+
48+
public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
49+
50+
private IndexShard indexShard;
51+
private TransportVerifyShardBeforeCloseAction action;
52+
private ClusterService clusterService;
53+
54+
@Override
55+
@Before
56+
public void setUp() throws Exception {
57+
super.setUp();
58+
59+
indexShard = mock(IndexShard.class);
60+
when(indexShard.getActiveOperationsCount()).thenReturn(0);
61+
when(indexShard.getGlobalCheckpoint()).thenReturn(0L);
62+
when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(0L, 0L, 0L));
63+
64+
final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3));
65+
when(indexShard.shardId()).thenReturn(shardId);
66+
67+
clusterService = mock(ClusterService.class);
68+
when(clusterService.state()).thenReturn(new ClusterState.Builder(new ClusterName("test"))
69+
.blocks(ClusterBlocks.builder().addIndexBlock("index", INDEX_CLOSED_BLOCK).build()).build());
70+
71+
action = new TransportVerifyShardBeforeCloseAction(Settings.EMPTY, mock(TransportService.class), clusterService,
72+
mock(IndicesService.class), mock(ThreadPool.class), mock(ShardStateAction.class), mock(ActionFilters.class),
73+
mock(IndexNameExpressionResolver.class));
74+
}
75+
76+
private void executeOnPrimaryOrReplica() throws Exception {
77+
final TransportVerifyShardBeforeCloseAction.ShardCloseRequest request =
78+
new TransportVerifyShardBeforeCloseAction.ShardCloseRequest(indexShard.shardId());
79+
if (randomBoolean()) {
80+
assertNotNull(action.shardOperationOnPrimary(request, indexShard));
81+
} else {
82+
assertNotNull(action.shardOperationOnPrimary(request, indexShard));
83+
}
84+
}
85+
86+
public void testOperationSuccessful() throws Exception {
87+
executeOnPrimaryOrReplica();
88+
verify(indexShard, times(1)).flush(any(FlushRequest.class));
89+
}
90+
91+
public void testOperationFailsWithOnGoingOps() {
92+
when(indexShard.getActiveOperationsCount()).thenReturn(randomIntBetween(1, 10));
93+
94+
IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
95+
assertThat(exception.getMessage(),
96+
equalTo("On-going operations in progress while checking index shard " + indexShard.shardId() + " before closing"));
97+
verify(indexShard, times(0)).flush(any(FlushRequest.class));
98+
}
99+
100+
public void testOperationFailsWithNoBlock() {
101+
when(clusterService.state()).thenReturn(new ClusterState.Builder(new ClusterName("test")).build());
102+
103+
IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
104+
assertThat(exception.getMessage(),
105+
equalTo("Index shard " + indexShard.shardId() + " must be blocked by " + INDEX_CLOSED_BLOCK + " before closing"));
106+
verify(indexShard, times(0)).flush(any(FlushRequest.class));
107+
}
108+
109+
public void testOperationFailsWithGlobalCheckpointNotCaughtUp() {
110+
final long maxSeqNo = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, Long.MAX_VALUE);
111+
final long localCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, maxSeqNo);
112+
final long globalCheckpoint = randomValueOtherThan(maxSeqNo,
113+
() -> randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, localCheckpoint));
114+
when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint));
115+
when(indexShard.getGlobalCheckpoint()).thenReturn(globalCheckpoint);
116+
117+
IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
118+
assertThat(exception.getMessage(), equalTo("Global checkpoint [" + globalCheckpoint + "] mismatches maximum sequence number ["
119+
+ maxSeqNo + "] on index shard " + indexShard.shardId()));
120+
verify(indexShard, times(0)).flush(any(FlushRequest.class));
121+
}
122+
}

0 commit comments

Comments
 (0)