2727import org .elasticsearch .snapshots .SnapshotState ;
2828import org .elasticsearch .snapshots .mockstore .MockRepository ;
2929import org .elasticsearch .test .ESIntegTestCase ;
30- import org .elasticsearch .test .junit .annotations .TestLogging ;
3130import org .elasticsearch .xpack .core .LocalStateCompositeXPackPlugin ;
3231import org .elasticsearch .xpack .core .XPackSettings ;
3332import org .elasticsearch .xpack .core .slm .SnapshotLifecyclePolicy ;
4443import java .util .Collection ;
4544import java .util .Collections ;
4645import java .util .HashMap ;
46+ import java .util .List ;
4747import java .util .Map ;
4848import java .util .concurrent .TimeUnit ;
4949import java .util .concurrent .atomic .AtomicReference ;
5858/**
5959 * Tests for Snapshot Lifecycle Management that require a slow or blocked snapshot repo (using {@link MockRepository})
6060 */
61- @ TestLogging (value = "org.elasticsearch.snapshots.mockstore:DEBUG" ,
62- reason = "https://github.com/elastic/elasticsearch/issues/46508" )
6361@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST , numDataNodes = 0 )
6462public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
6563
6664 private static final String REPO = "repo-id" ;
65+ List <String > dataNodeNames = null ;
6766
6867 @ Before
6968 public void ensureClusterNodes () {
7069 logger .info ("--> starting enough nodes to ensure we have enough to safely stop for tests" );
7170 internalCluster ().startMasterOnlyNodes (2 );
72- internalCluster ().startDataOnlyNodes (2 );
71+ dataNodeNames = internalCluster ().startDataOnlyNodes (2 );
7372 ensureGreen ();
7473 }
7574
@@ -163,7 +162,7 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
163162 final String policyId = "slm-policy" ;
164163 int docCount = 20 ;
165164 for (int i = 0 ; i < docCount ; i ++) {
166- index (indexName , "_doc" , i + "" , Collections .singletonMap ("foo" , "bar" ));
165+ index (indexName , "_doc" , null , Collections .singletonMap ("foo" , "bar" ));
167166 }
168167
169168 initializeRepo (REPO );
@@ -196,15 +195,26 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
196195 assertTrue ("cluster state was not ready for deletion " + state , SnapshotRetentionTask .okayToDeleteSnapshots (state ));
197196 });
198197
199- // Take another snapshot, but before doing that, block it from completing
200- logger .info ("--> blocking nodes from completing snapshot" );
198+ logger .info ("--> indexing more docs to force new segment files" );
199+ for (int i = 0 ; i < docCount ; i ++) {
200+ index (indexName , "_doc" , null , Collections .singletonMap ("foo" , "bar" ));
201+ }
202+ refresh (indexName );
203+
201204 try {
205+ // Take another snapshot, but before doing that, block it from completing
206+ logger .info ("--> blocking data nodes from completing snapshot" );
202207 blockAllDataNodes (REPO );
203- blockMasterFromFinalizingSnapshotOnIndexFile ( REPO );
208+ logger . info ( "--> blocked data nodes, executing policy" );
204209 final String secondSnapName = executePolicy (policyId );
210+ logger .info ("--> executed policy, got snapname [{}]" , secondSnapName );
211+
205212
206213 // Check that the executed snapshot shows up in the SLM output as in_progress
207214 assertBusy (() -> {
215+ logger .info ("--> Waiting for at least one data node to hit the block" );
216+ assertTrue (dataNodeNames .stream ().anyMatch (node -> checkBlocked (node , REPO )));
217+ logger .info ("--> at least one data node has hit the block" );
208218 GetSnapshotLifecycleAction .Response getResp =
209219 client ().execute (GetSnapshotLifecycleAction .INSTANCE , new GetSnapshotLifecycleAction .Request (policyId )).get ();
210220 logger .info ("--> checking for in progress snapshot..." );
@@ -218,7 +228,7 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
218228 assertThat (inProgress .getState (), anyOf (equalTo (SnapshotsInProgress .State .INIT ),
219229 equalTo (SnapshotsInProgress .State .STARTED )));
220230 assertNull (inProgress .getFailure ());
221- });
231+ }, 60 , TimeUnit . SECONDS );
222232
223233 // Run retention
224234 logger .info ("--> triggering retention" );
@@ -243,7 +253,7 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
243253 }
244254 });
245255
246- // Cancel the ongoing snapshot to cancel it
256+ // Cancel the ongoing snapshot (or just delete it if it finished)
247257 assertBusy (() -> {
248258 try {
249259 logger .info ("--> cancelling snapshot {}" , secondSnapName );
@@ -508,4 +518,10 @@ public void waitForBlock(String node, String repository, TimeValue timeout) thro
508518 }
509519 fail ("Timeout waiting for node [" + node + "] to be blocked" );
510520 }
521+
522+ public boolean checkBlocked (String node , String repository ) {
523+ RepositoriesService repositoriesService = internalCluster ().getInstance (RepositoriesService .class , node );
524+ MockRepository mockRepository = (MockRepository ) repositoriesService .repository (repository );
525+ return mockRepository .blocked ();
526+ }
511527}
0 commit comments