1515import java .util .concurrent .atomic .AtomicReference ;
1616import java .util .function .BiConsumer ;
1717
18+ import org .elasticsearch .ElasticsearchStatusException ;
1819import org .elasticsearch .Version ;
1920import org .elasticsearch .action .Action ;
2021import org .elasticsearch .action .ActionListener ;
2728import org .elasticsearch .cluster .ClusterChangedEvent ;
2829import org .elasticsearch .cluster .ClusterName ;
2930import org .elasticsearch .cluster .ClusterState ;
31+ import org .elasticsearch .cluster .block .ClusterBlocks ;
3032import org .elasticsearch .cluster .health .ClusterHealthStatus ;
3133import org .elasticsearch .cluster .metadata .IndexMetaData ;
3234import org .elasticsearch .cluster .metadata .IndexTemplateMetaData ;
4143import org .elasticsearch .cluster .service .ClusterService ;
4244import org .elasticsearch .common .UUIDs ;
4345import org .elasticsearch .common .settings .Settings ;
46+ import org .elasticsearch .common .util .concurrent .EsExecutors ;
4447import org .elasticsearch .common .util .concurrent .ThreadContext ;
4548import org .elasticsearch .common .xcontent .XContentType ;
49+ import org .elasticsearch .gateway .GatewayService ;
4650import org .elasticsearch .index .Index ;
4751import org .elasticsearch .index .shard .ShardId ;
52+ import org .elasticsearch .rest .RestStatus ;
4853import org .elasticsearch .test .ESTestCase ;
4954import org .elasticsearch .threadpool .ThreadPool ;
5055import org .elasticsearch .xpack .security .test .SecurityTestUtils ;
5661import static org .elasticsearch .xpack .security .support .SecurityIndexManager .SECURITY_TEMPLATE_NAME ;
5762import static org .elasticsearch .xpack .security .support .SecurityIndexManager .TEMPLATE_VERSION_PATTERN ;
5863import static org .hamcrest .Matchers .equalTo ;
64+ import static org .hamcrest .Matchers .instanceOf ;
65+ import static org .hamcrest .Matchers .is ;
66+ import static org .hamcrest .Matchers .notNullValue ;
67+ import static org .hamcrest .Matchers .nullValue ;
5968import static org .mockito .Mockito .mock ;
6069import static org .mockito .Mockito .when ;
6170
@@ -73,6 +82,7 @@ public void setUpManager() {
7382 final Client mockClient = mock (Client .class );
7483 final ThreadPool threadPool = mock (ThreadPool .class );
7584 when (threadPool .getThreadContext ()).thenReturn (new ThreadContext (Settings .EMPTY ));
85+ when (threadPool .generic ()).thenReturn (EsExecutors .newDirectExecutorService ());
7686 when (mockClient .threadPool ()).thenReturn (threadPool );
7787 when (mockClient .settings ()).thenReturn (Settings .EMPTY );
7888 final ClusterService clusterService = mock (ClusterService .class );
@@ -196,6 +206,67 @@ public void testIndexHealthChangeListeners() throws Exception {
196206 assertEquals (ClusterHealthStatus .GREEN , currentState .get ().indexStatus );
197207 }
198208
209+ public void testWriteBeforeStateNotRecovered () throws Exception {
210+ final AtomicBoolean prepareRunnableCalled = new AtomicBoolean (false );
211+ final AtomicReference <Exception > prepareException = new AtomicReference <>(null );
212+ manager .prepareIndexIfNeededThenExecute (ex -> {
213+ prepareException .set (ex );
214+ }, () -> {
215+ prepareRunnableCalled .set (true );
216+ });
217+ assertThat (prepareException .get (), is (notNullValue ()));
218+ assertThat (prepareException .get (), instanceOf (ElasticsearchStatusException .class ));
219+ assertThat (((ElasticsearchStatusException )prepareException .get ()).status (), is (RestStatus .SERVICE_UNAVAILABLE ));
220+ assertThat (prepareRunnableCalled .get (), is (false ));
221+ prepareException .set (null );
222+ prepareRunnableCalled .set (false );
223+ // state not recovered
224+ final ClusterBlocks .Builder blocks = ClusterBlocks .builder ().addGlobalBlock (GatewayService .STATE_NOT_RECOVERED_BLOCK );
225+ manager .clusterChanged (event (new ClusterState .Builder (CLUSTER_NAME ).blocks (blocks )));
226+ manager .prepareIndexIfNeededThenExecute (ex -> {
227+ prepareException .set (ex );
228+ }, () -> {
229+ prepareRunnableCalled .set (true );
230+ });
231+ assertThat (prepareException .get (), is (notNullValue ()));
232+ assertThat (prepareException .get (), instanceOf (ElasticsearchStatusException .class ));
233+ assertThat (((ElasticsearchStatusException )prepareException .get ()).status (), is (RestStatus .SERVICE_UNAVAILABLE ));
234+ assertThat (prepareRunnableCalled .get (), is (false ));
235+ prepareException .set (null );
236+ prepareRunnableCalled .set (false );
237+ // state recovered with index
238+ ClusterState .Builder clusterStateBuilder = createClusterState (INDEX_NAME , TEMPLATE_NAME ,
239+ SecurityIndexManager .INTERNAL_INDEX_FORMAT );
240+ markShardsAvailable (clusterStateBuilder );
241+ manager .clusterChanged (event (clusterStateBuilder ));
242+ manager .prepareIndexIfNeededThenExecute (ex -> {
243+ prepareException .set (ex );
244+ }, () -> {
245+ prepareRunnableCalled .set (true );
246+ });
247+ assertThat (prepareException .get (), is (nullValue ()));
248+ assertThat (prepareRunnableCalled .get (), is (true ));
249+ }
250+
251+ public void testListeneredNotCalledBeforeStateNotRecovered () throws Exception {
252+ final AtomicBoolean listenerCalled = new AtomicBoolean (false );
253+ manager .addIndexStateListener ((prev , current ) -> {
254+ listenerCalled .set (true );
255+ });
256+ final ClusterBlocks .Builder blocks = ClusterBlocks .builder ().addGlobalBlock (GatewayService .STATE_NOT_RECOVERED_BLOCK );
257+ // state not recovered
258+ manager .clusterChanged (event (new ClusterState .Builder (CLUSTER_NAME ).blocks (blocks )));
259+ assertThat (manager .isStateRecovered (), is (false ));
260+ assertThat (listenerCalled .get (), is (false ));
261+ // state recovered with index
262+ ClusterState .Builder clusterStateBuilder = createClusterState (INDEX_NAME , TEMPLATE_NAME ,
263+ SecurityIndexManager .INTERNAL_INDEX_FORMAT );
264+ markShardsAvailable (clusterStateBuilder );
265+ manager .clusterChanged (event (clusterStateBuilder ));
266+ assertThat (manager .isStateRecovered (), is (true ));
267+ assertThat (listenerCalled .get (), is (true ));
268+ }
269+
199270 public void testIndexOutOfDateListeners () throws Exception {
200271 final AtomicBoolean listenerCalled = new AtomicBoolean (false );
201272 manager .clusterChanged (event (new ClusterState .Builder (CLUSTER_NAME )));
@@ -240,12 +311,14 @@ private void assertInitialState() {
240311 assertThat (manager .indexExists (), Matchers .equalTo (false ));
241312 assertThat (manager .isAvailable (), Matchers .equalTo (false ));
242313 assertThat (manager .isMappingUpToDate (), Matchers .equalTo (false ));
314+ assertThat (manager .isStateRecovered (), Matchers .equalTo (false ));
243315 }
244316
245317 private void assertIndexUpToDateButNotAvailable () {
246318 assertThat (manager .indexExists (), Matchers .equalTo (true ));
247319 assertThat (manager .isAvailable (), Matchers .equalTo (false ));
248320 assertThat (manager .isMappingUpToDate (), Matchers .equalTo (true ));
321+ assertThat (manager .isStateRecovered (), Matchers .equalTo (true ));
249322 }
250323
251324 public static ClusterState .Builder createClusterState (String indexName , String templateName ) throws IOException {
0 commit comments