1515import java .util .concurrent .atomic .AtomicReference ;
1616import java .util .function .BiConsumer ;
1717
18+ import org .elasticsearch .ElasticsearchStatusException ;
1819import org .elasticsearch .Version ;
1920import org .elasticsearch .action .Action ;
2021import org .elasticsearch .action .ActionListener ;
2627import org .elasticsearch .cluster .ClusterChangedEvent ;
2728import org .elasticsearch .cluster .ClusterName ;
2829import org .elasticsearch .cluster .ClusterState ;
30+ import org .elasticsearch .cluster .block .ClusterBlocks ;
2931import org .elasticsearch .cluster .health .ClusterHealthStatus ;
3032import org .elasticsearch .cluster .metadata .IndexMetaData ;
3133import org .elasticsearch .cluster .metadata .IndexTemplateMetaData ;
4042import org .elasticsearch .cluster .service .ClusterService ;
4143import org .elasticsearch .common .UUIDs ;
4244import org .elasticsearch .common .settings .Settings ;
45+ import org .elasticsearch .common .util .concurrent .EsExecutors ;
4346import org .elasticsearch .common .util .concurrent .ThreadContext ;
4447import org .elasticsearch .common .xcontent .XContentType ;
48+ import org .elasticsearch .gateway .GatewayService ;
4549import org .elasticsearch .index .Index ;
4650import org .elasticsearch .index .shard .ShardId ;
51+ import org .elasticsearch .rest .RestStatus ;
4752import org .elasticsearch .test .ESTestCase ;
4853import org .elasticsearch .threadpool .ThreadPool ;
4954import org .elasticsearch .xpack .security .test .SecurityTestUtils ;
5560import static org .elasticsearch .xpack .security .support .SecurityIndexManager .SECURITY_TEMPLATE_NAME ;
5661import static org .elasticsearch .xpack .security .support .SecurityIndexManager .TEMPLATE_VERSION_PATTERN ;
5762import static org .hamcrest .Matchers .equalTo ;
63+ import static org .hamcrest .Matchers .instanceOf ;
64+ import static org .hamcrest .Matchers .is ;
65+ import static org .hamcrest .Matchers .notNullValue ;
66+ import static org .hamcrest .Matchers .nullValue ;
5867import static org .mockito .Mockito .mock ;
5968import static org .mockito .Mockito .when ;
6069
@@ -72,6 +81,7 @@ public void setUpManager() {
7281 final Client mockClient = mock (Client .class );
7382 final ThreadPool threadPool = mock (ThreadPool .class );
7483 when (threadPool .getThreadContext ()).thenReturn (new ThreadContext (Settings .EMPTY ));
84+ when (threadPool .generic ()).thenReturn (EsExecutors .newDirectExecutorService ());
7585 when (mockClient .threadPool ()).thenReturn (threadPool );
7686 when (mockClient .settings ()).thenReturn (Settings .EMPTY );
7787 final ClusterService clusterService = mock (ClusterService .class );
@@ -192,6 +202,67 @@ public void testIndexHealthChangeListeners() throws Exception {
192202 assertEquals (ClusterHealthStatus .GREEN , currentState .get ().indexStatus );
193203 }
194204
205+ public void testWriteBeforeStateNotRecovered () throws Exception {
206+ final AtomicBoolean prepareRunnableCalled = new AtomicBoolean (false );
207+ final AtomicReference <Exception > prepareException = new AtomicReference <>(null );
208+ manager .prepareIndexIfNeededThenExecute (ex -> {
209+ prepareException .set (ex );
210+ }, () -> {
211+ prepareRunnableCalled .set (true );
212+ });
213+ assertThat (prepareException .get (), is (notNullValue ()));
214+ assertThat (prepareException .get (), instanceOf (ElasticsearchStatusException .class ));
215+ assertThat (((ElasticsearchStatusException )prepareException .get ()).status (), is (RestStatus .SERVICE_UNAVAILABLE ));
216+ assertThat (prepareRunnableCalled .get (), is (false ));
217+ prepareException .set (null );
218+ prepareRunnableCalled .set (false );
219+ // state not recovered
220+ final ClusterBlocks .Builder blocks = ClusterBlocks .builder ().addGlobalBlock (GatewayService .STATE_NOT_RECOVERED_BLOCK );
221+ manager .clusterChanged (event (new ClusterState .Builder (CLUSTER_NAME ).blocks (blocks )));
222+ manager .prepareIndexIfNeededThenExecute (ex -> {
223+ prepareException .set (ex );
224+ }, () -> {
225+ prepareRunnableCalled .set (true );
226+ });
227+ assertThat (prepareException .get (), is (notNullValue ()));
228+ assertThat (prepareException .get (), instanceOf (ElasticsearchStatusException .class ));
229+ assertThat (((ElasticsearchStatusException )prepareException .get ()).status (), is (RestStatus .SERVICE_UNAVAILABLE ));
230+ assertThat (prepareRunnableCalled .get (), is (false ));
231+ prepareException .set (null );
232+ prepareRunnableCalled .set (false );
233+ // state recovered with index
234+ ClusterState .Builder clusterStateBuilder = createClusterState (INDEX_NAME , TEMPLATE_NAME ,
235+ SecurityIndexManager .INTERNAL_INDEX_FORMAT );
236+ markShardsAvailable (clusterStateBuilder );
237+ manager .clusterChanged (event (clusterStateBuilder ));
238+ manager .prepareIndexIfNeededThenExecute (ex -> {
239+ prepareException .set (ex );
240+ }, () -> {
241+ prepareRunnableCalled .set (true );
242+ });
243+ assertThat (prepareException .get (), is (nullValue ()));
244+ assertThat (prepareRunnableCalled .get (), is (true ));
245+ }
246+
247+ public void testListeneredNotCalledBeforeStateNotRecovered () throws Exception {
248+ final AtomicBoolean listenerCalled = new AtomicBoolean (false );
249+ manager .addIndexStateListener ((prev , current ) -> {
250+ listenerCalled .set (true );
251+ });
252+ final ClusterBlocks .Builder blocks = ClusterBlocks .builder ().addGlobalBlock (GatewayService .STATE_NOT_RECOVERED_BLOCK );
253+ // state not recovered
254+ manager .clusterChanged (event (new ClusterState .Builder (CLUSTER_NAME ).blocks (blocks )));
255+ assertThat (manager .isStateRecovered (), is (false ));
256+ assertThat (listenerCalled .get (), is (false ));
257+ // state recovered with index
258+ ClusterState .Builder clusterStateBuilder = createClusterState (INDEX_NAME , TEMPLATE_NAME ,
259+ SecurityIndexManager .INTERNAL_INDEX_FORMAT );
260+ markShardsAvailable (clusterStateBuilder );
261+ manager .clusterChanged (event (clusterStateBuilder ));
262+ assertThat (manager .isStateRecovered (), is (true ));
263+ assertThat (listenerCalled .get (), is (true ));
264+ }
265+
195266 public void testIndexOutOfDateListeners () throws Exception {
196267 final AtomicBoolean listenerCalled = new AtomicBoolean (false );
197268 manager .clusterChanged (event (new ClusterState .Builder (CLUSTER_NAME )));
@@ -236,12 +307,14 @@ private void assertInitialState() {
236307 assertThat (manager .indexExists (), Matchers .equalTo (false ));
237308 assertThat (manager .isAvailable (), Matchers .equalTo (false ));
238309 assertThat (manager .isMappingUpToDate (), Matchers .equalTo (false ));
310+ assertThat (manager .isStateRecovered (), Matchers .equalTo (false ));
239311 }
240312
241313 private void assertIndexUpToDateButNotAvailable () {
242314 assertThat (manager .indexExists (), Matchers .equalTo (true ));
243315 assertThat (manager .isAvailable (), Matchers .equalTo (false ));
244316 assertThat (manager .isMappingUpToDate (), Matchers .equalTo (true ));
317+ assertThat (manager .isStateRecovered (), Matchers .equalTo (true ));
245318 }
246319
247320 public static ClusterState .Builder createClusterState (String indexName , String templateName ) throws IOException {
0 commit comments