4040import org .elasticsearch .transport .TransportService ;
4141import org .elasticsearch .xpack .CcrIntegTestCase ;
4242import org .elasticsearch .xpack .ccr .action .repositories .GetCcrRestoreFileChunkAction ;
43+ import org .elasticsearch .xpack .ccr .action .repositories .PutCcrRestoreSessionAction ;
4344import org .elasticsearch .xpack .ccr .repository .CcrRepository ;
4445import org .elasticsearch .xpack .ccr .repository .CcrRestoreSourceService ;
4546
4849import java .util .List ;
4950import java .util .Locale ;
5051import java .util .Map ;
52+ import java .util .concurrent .CountDownLatch ;
5153import java .util .concurrent .TimeUnit ;
5254import java .util .concurrent .atomic .AtomicBoolean ;
5355
5961import static org .hamcrest .Matchers .instanceOf ;
6062import static org .hamcrest .Matchers .lessThan ;
6163
62- // TODO: Fold this integration test into a more expansive integration test as more bootstrap from remote work
63- // TODO: is completed.
6464public class CcrRepositoryIT extends CcrIntegTestCase {
6565
6666 private final IndicesOptions indicesOptions = IndicesOptions .strictSingleIndexNoExpandForbidClosed ();
@@ -197,36 +197,6 @@ public void testDocsAreRecovered() throws Exception {
197197
198198 leaderClient ().admin ().indices ().prepareFlush (leaderIndex ).setForce (true ).setWaitIfOngoing (true ).get ();
199199
200- AtomicBoolean isRunning = new AtomicBoolean (true );
201-
202- // Concurrently index new docs with mapping changes
203- Thread thread = new Thread (() -> {
204- char [] chars = "abcdeghijklmnopqrstuvwxyz" .toCharArray ();
205- for (char c : chars ) {
206- if (isRunning .get () == false ) {
207- break ;
208- }
209- final String source ;
210- long l = randomLongBetween (0 , 50000 );
211- if (randomBoolean ()) {
212- source = String .format (Locale .ROOT , "{\" %c\" :%d}" , c , l );
213- } else {
214- source = String .format (Locale .ROOT , "{\" %c\" :\" %d\" }" , c , l );
215- }
216- for (int i = 64 ; i < 150 ; i ++) {
217- if (isRunning .get () == false ) {
218- break ;
219- }
220- leaderClient ().prepareIndex ("index1" , "doc" , Long .toString (i )).setSource (source , XContentType .JSON ).get ();
221- if (rarely ()) {
222- leaderClient ().admin ().indices ().prepareFlush (leaderIndex ).setForce (true ).get ();
223- }
224- }
225- leaderClient ().admin ().indices ().prepareFlush (leaderIndex ).setForce (true ).setWaitIfOngoing (true ).get ();
226- }
227- });
228- thread .start ();
229-
230200 Settings .Builder settingsBuilder = Settings .builder ()
231201 .put (IndexMetaData .SETTING_INDEX_PROVIDED_NAME , followerIndex )
232202 .put (CcrSettings .CCR_FOLLOWING_INDEX_SETTING .getKey (), true );
@@ -245,9 +215,6 @@ public void testDocsAreRecovered() throws Exception {
245215 assertExpectedDocument (followerIndex , i );
246216 }
247217
248- isRunning .set (false );
249- thread .join ();
250-
251218 settingsRequest = new ClusterUpdateSettingsRequest ();
252219 ByteSizeValue defaultValue = CcrSettings .RECOVERY_CHUNK_SIZE .getDefault (Settings .EMPTY );
253220 settingsRequest .persistentSettings (Settings .builder ().put (CcrSettings .RECOVERY_CHUNK_SIZE .getKey (), defaultValue ));
@@ -421,23 +388,60 @@ public void testFollowerMappingIsUpdated() throws IOException {
421388 .renameReplacement (followerIndex ).masterNodeTimeout (new TimeValue (1L , TimeUnit .HOURS ))
422389 .indexSettings (settingsBuilder );
423390
424- final String source = String .format (Locale .ROOT , "{\" k\" :%d}" , 1 );
425- leaderClient ().prepareIndex ("index1" , "doc" , Long .toString (1 )).setSource (source , XContentType .JSON ).get ();
426391
427- PlainActionFuture <RestoreInfo > future = PlainActionFuture .newFuture ();
428- restoreService .restoreSnapshot (restoreRequest , waitForRestore (clusterService , future ));
429- RestoreInfo restoreInfo = future .actionGet ();
392+ List <MockTransportService > transportServices = new ArrayList <>();
393+ CountDownLatch latch = new CountDownLatch (1 );
394+ AtomicBoolean updateSent = new AtomicBoolean (false );
395+ Runnable updateMappings = () -> {
396+ if (updateSent .compareAndSet (false , true )) {
397+ leaderClient ()
398+ .admin ()
399+ .indices ()
400+ .preparePutMapping (leaderIndex )
401+ .setType ("doc" )
402+ .setSource ("{\" properties\" :{\" k\" :{\" type\" :\" long\" }}}" , XContentType .JSON )
403+ .execute (ActionListener .wrap (latch ::countDown ));
404+ }
405+ try {
406+ latch .await ();
407+ } catch (InterruptedException e ) {
408+ throw ExceptionsHelper .convertToRuntime (e );
409+ }
410+ };
430411
431- assertEquals (restoreInfo .totalShards (), restoreInfo .successfulShards ());
432- assertEquals (0 , restoreInfo .failedShards ());
412+ for (TransportService transportService : getFollowerCluster ().getDataOrMasterNodeInstances (TransportService .class )) {
413+ MockTransportService mockTransportService = (MockTransportService ) transportService ;
414+ transportServices .add (mockTransportService );
415+ mockTransportService .addSendBehavior ((connection , requestId , action , request , options ) -> {
416+ if (action .equals (PutCcrRestoreSessionAction .NAME )) {
417+ updateMappings .run ();
418+ connection .sendRequest (requestId , action , request , options );
419+ } else {
420+ connection .sendRequest (requestId , action , request , options );
421+ }
422+ });
423+ }
424+
425+ try {
426+ PlainActionFuture <RestoreInfo > future = PlainActionFuture .newFuture ();
427+ restoreService .restoreSnapshot (restoreRequest , waitForRestore (clusterService , future ));
428+ RestoreInfo restoreInfo = future .actionGet ();
433429
434- ClusterStateRequest clusterStateRequest = new ClusterStateRequest ();
435- clusterStateRequest .clear ();
436- clusterStateRequest .metaData (true );
437- clusterStateRequest .indices (followerIndex );
438- MappingMetaData mappingMetaData = followerClient ().admin ().indices ().prepareGetMappings ("index2" ).get ().getMappings ()
439- .get ("index2" ).get ("doc" );
440- assertThat (XContentMapValues .extractValue ("properties.k.type" , mappingMetaData .sourceAsMap ()), equalTo ("long" ));
430+ assertEquals (restoreInfo .totalShards (), restoreInfo .successfulShards ());
431+ assertEquals (0 , restoreInfo .failedShards ());
432+
433+ ClusterStateRequest clusterStateRequest = new ClusterStateRequest ();
434+ clusterStateRequest .clear ();
435+ clusterStateRequest .metaData (true );
436+ clusterStateRequest .indices (followerIndex );
437+ MappingMetaData mappingMetaData = followerClient ().admin ().indices ().prepareGetMappings ("index2" ).get ().getMappings ()
438+ .get ("index2" ).get ("doc" );
439+ assertThat (XContentMapValues .extractValue ("properties.k.type" , mappingMetaData .sourceAsMap ()), equalTo ("long" ));
440+ } finally {
441+ for (MockTransportService transportService : transportServices ) {
442+ transportService .clearAllRules ();
443+ }
444+ }
441445 }
442446
443447 private void assertExpectedDocument (String followerIndex , final int value ) {
0 commit comments