2323import org .elasticsearch .action .admin .cluster .node .tasks .get .GetTaskResponse ;
2424import org .elasticsearch .action .admin .cluster .node .tasks .list .ListTasksResponse ;
2525import org .elasticsearch .client .Client ;
26+ import org .elasticsearch .index .IndexModule ;
27+ import org .elasticsearch .index .engine .Engine ;
2628import org .elasticsearch .index .query .QueryBuilders ;
29+ import org .elasticsearch .index .reindex .AbstractBulkByScrollRequestBuilder ;
2730import org .elasticsearch .index .reindex .BulkByScrollResponse ;
2831import org .elasticsearch .index .reindex .BulkByScrollTask ;
32+ import org .elasticsearch .index .reindex .CancelTests ;
2933import org .elasticsearch .index .reindex .DeleteByQueryAction ;
3034import org .elasticsearch .index .reindex .ReindexAction ;
35+ import org .elasticsearch .index .reindex .ReindexPlugin ;
36+ import org .elasticsearch .index .reindex .ReindexRequestBuilder ;
3137import org .elasticsearch .index .reindex .RethrottleAction ;
3238import org .elasticsearch .index .reindex .UpdateByQueryAction ;
3339import org .elasticsearch .index .reindex .UpdateByQueryRequestBuilder ;
40+ import org .elasticsearch .index .shard .IndexingOperationListener ;
41+ import org .elasticsearch .index .shard .ShardId ;
42+ import org .elasticsearch .plugins .Plugin ;
3443import org .elasticsearch .script .Script ;
3544import org .elasticsearch .script .ScriptType ;
3645import org .elasticsearch .search .sort .SortOrder ;
3746import org .elasticsearch .tasks .TaskId ;
3847import org .elasticsearch .tasks .TaskInfo ;
3948import org .elasticsearch .test .ESIntegTestCase ;
49+ import org .hamcrest .Matcher ;
50+ import org .junit .Before ;
4051
52+ import java .util .Arrays ;
53+ import java .util .Collection ;
4154import java .util .Collections ;
55+ import java .util .concurrent .Semaphore ;
56+ import java .util .concurrent .TimeUnit ;
57+ import java .util .stream .Collectors ;
58+ import java .util .stream .IntStream ;
59+
60+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertHitCount ;
61+ import static org .hamcrest .Matchers .equalTo ;
4262
4363public class ReindexDocumentationIT extends ESIntegTestCase {
4464
45- public void reindex () {
65+ // Semaphore used to allow & block indexing operations during the test
66+ private static final Semaphore ALLOWED_OPERATIONS = new Semaphore (0 );
67+ private static final String INDEX_NAME = "source_index" ;
68+
69+ @ Override
70+ protected boolean ignoreExternalCluster () {
71+ return true ;
72+ }
73+
74+ @ Override
75+ protected Collection <Class <? extends Plugin >> nodePlugins () {
76+ return Arrays .asList (ReindexPlugin .class , ReindexCancellationPlugin .class );
77+ }
78+
79+ @ Override
80+ protected Collection <Class <? extends Plugin >> transportClientPlugins () {
81+ return Collections .singletonList (ReindexPlugin .class );
82+ }
83+
84+ @ Before
85+ public void setup () {
86+ client ().admin ().indices ().prepareCreate (INDEX_NAME ).get ();
87+ }
88+
89+ public void testReindex () {
4690 Client client = client ();
4791 // tag::reindex1
48- BulkByScrollResponse response = ReindexAction .INSTANCE .newRequestBuilder (client )
92+ BulkByScrollResponse response =
93+ ReindexAction .INSTANCE .newRequestBuilder (client )
94+ .source ("source_index" )
4995 .destination ("target_index" )
5096 .filter (QueryBuilders .matchQuery ("category" , "xzy" )) // <1>
5197 .get ();
5298 // end::reindex1
5399 }
54100
55- public void updateByQuery () {
101+ public void testUpdateByQuery () {
56102 Client client = client ();
103+ client .admin ().indices ().prepareCreate ("foo" ).get ();
104+ client .admin ().indices ().prepareCreate ("bar" ).get ();
105+ client .admin ().indices ().preparePutMapping (INDEX_NAME ).setType ("_doc" ).setSource ("cat" , "type=keyword" ).get ();
57106 {
58107 // tag::update-by-query
59- UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction .INSTANCE .newRequestBuilder (client );
108+ UpdateByQueryRequestBuilder updateByQuery =
109+ UpdateByQueryAction .INSTANCE .newRequestBuilder (client );
60110 updateByQuery .source ("source_index" ).abortOnVersionConflict (false );
61111 BulkByScrollResponse response = updateByQuery .get ();
62112 // end::update-by-query
63113 }
64114 {
65115 // tag::update-by-query-filter
66- UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction .INSTANCE .newRequestBuilder (client );
116+ UpdateByQueryRequestBuilder updateByQuery =
117+ UpdateByQueryAction .INSTANCE .newRequestBuilder (client );
67118 updateByQuery .source ("source_index" )
68119 .filter (QueryBuilders .termQuery ("level" , "awesome" ))
69120 .size (1000 )
70- .script (new Script (ScriptType .INLINE , "ctx._source.awesome = 'absolutely'" , "painless" , Collections .emptyMap ()));
121+ .script (new Script (ScriptType .INLINE ,
122+ "ctx._source.awesome = 'absolutely'" ,
123+ "painless" ,
124+ Collections .emptyMap ()));
71125 BulkByScrollResponse response = updateByQuery .get ();
72126 // end::update-by-query-filter
73127 }
74128 {
75129 // tag::update-by-query-size
76- UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction .INSTANCE .newRequestBuilder (client );
130+ UpdateByQueryRequestBuilder updateByQuery =
131+ UpdateByQueryAction .INSTANCE .newRequestBuilder (client );
77132 updateByQuery .source ("source_index" )
78- .source ().setSize (500 );
133+ .source ()
134+ .setSize (500 );
79135 BulkByScrollResponse response = updateByQuery .get ();
80136 // end::update-by-query-size
81137 }
82138 {
83139 // tag::update-by-query-sort
84- UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction .INSTANCE .newRequestBuilder (client );
85- updateByQuery .source ("source_index" ).size (100 )
86- .source ().addSort ("cat" , SortOrder .DESC );
140+ UpdateByQueryRequestBuilder updateByQuery =
141+ UpdateByQueryAction .INSTANCE .newRequestBuilder (client );
142+ updateByQuery .source ("source_index" )
143+ .size (100 )
144+ .source ()
145+ .addSort ("cat" , SortOrder .DESC );
87146 BulkByScrollResponse response = updateByQuery .get ();
88147 // end::update-by-query-sort
89148 }
90149 {
91150 // tag::update-by-query-script
92- UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction .INSTANCE .newRequestBuilder (client );
151+ UpdateByQueryRequestBuilder updateByQuery =
152+ UpdateByQueryAction .INSTANCE .newRequestBuilder (client );
93153 updateByQuery .source ("source_index" )
94154 .script (new Script (
95155 ScriptType .INLINE ,
@@ -106,66 +166,86 @@ public void updateByQuery() {
106166 }
107167 {
108168 // tag::update-by-query-multi-index
109- UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction .INSTANCE .newRequestBuilder (client );
169+ UpdateByQueryRequestBuilder updateByQuery =
170+ UpdateByQueryAction .INSTANCE .newRequestBuilder (client );
110171 updateByQuery .source ("foo" , "bar" ).source ().setTypes ("a" , "b" );
111172 BulkByScrollResponse response = updateByQuery .get ();
112173 // end::update-by-query-multi-index
113174 }
114175 {
115176 // tag::update-by-query-routing
116- UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction .INSTANCE .newRequestBuilder (client );
177+ UpdateByQueryRequestBuilder updateByQuery =
178+ UpdateByQueryAction .INSTANCE .newRequestBuilder (client );
117179 updateByQuery .source ().setRouting ("cat" );
118180 BulkByScrollResponse response = updateByQuery .get ();
119181 // end::update-by-query-routing
120182 }
121183 {
122184 // tag::update-by-query-pipeline
123- UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction .INSTANCE .newRequestBuilder (client );
185+ UpdateByQueryRequestBuilder updateByQuery =
186+ UpdateByQueryAction .INSTANCE .newRequestBuilder (client );
124187 updateByQuery .setPipeline ("hurray" );
125188 BulkByScrollResponse response = updateByQuery .get ();
126189 // end::update-by-query-pipeline
127190 }
191+ }
192+
193+ public void testTasks () throws Exception {
194+ final Client client = client ();
195+ final ReindexRequestBuilder builder = reindexAndPartiallyBlock ();
196+
128197 {
129198 // tag::update-by-query-list-tasks
130199 ListTasksResponse tasksList = client .admin ().cluster ().prepareListTasks ()
131200 .setActions (UpdateByQueryAction .NAME ).setDetailed (true ).get ();
132201 for (TaskInfo info : tasksList .getTasks ()) {
133202 TaskId taskId = info .getTaskId ();
134- BulkByScrollTask .Status status = (BulkByScrollTask .Status ) info .getStatus ();
203+ BulkByScrollTask .Status status =
204+ (BulkByScrollTask .Status ) info .getStatus ();
135205 // do stuff
136206 }
137207 // end::update-by-query-list-tasks
138208 }
209+
210+ TaskInfo mainTask = CancelTests .findTaskToCancel (ReindexAction .NAME , builder .request ().getSlices ());
211+ BulkByScrollTask .Status status = (BulkByScrollTask .Status ) mainTask .getStatus ();
212+ assertNull (status .getReasonCancelled ());
213+ TaskId taskId = mainTask .getTaskId ();
139214 {
140- TaskId taskId = null ;
141215 // tag::update-by-query-get-task
142216 GetTaskResponse get = client .admin ().cluster ().prepareGetTask (taskId ).get ();
143217 // end::update-by-query-get-task
144218 }
145219 {
146- TaskId taskId = null ;
147220 // tag::update-by-query-cancel-task
148221 // Cancel all update-by-query requests
149- client .admin ().cluster ().prepareCancelTasks ().setActions (UpdateByQueryAction .NAME ).get ().getTasks ();
222+ client .admin ().cluster ().prepareCancelTasks ()
223+ .setActions (UpdateByQueryAction .NAME ).get ().getTasks ();
150224 // Cancel a specific update-by-query request
151- client .admin ().cluster ().prepareCancelTasks ().setTaskId (taskId ).get ().getTasks ();
225+ client .admin ().cluster ().prepareCancelTasks ()
226+ .setTaskId (taskId ).get ().getTasks ();
152227 // end::update-by-query-cancel-task
153228 }
154229 {
155- TaskId taskId = null ;
156230 // tag::update-by-query-rethrottle
157231 RethrottleAction .INSTANCE .newRequestBuilder (client )
158232 .setTaskId (taskId )
159233 .setRequestsPerSecond (2.0f )
160234 .get ();
161235 // end::update-by-query-rethrottle
162236 }
237+
238+ // unblocking the blocked update
239+ ALLOWED_OPERATIONS .release (builder .request ().getSlices ());
163240 }
164241
165- public void deleteByQuery () {
242+ public void testDeleteByQuery () {
166243 Client client = client ();
244+ client .admin ().indices ().prepareCreate ("persons" ).get ();
245+
167246 // tag::delete-by-query-sync
168- BulkByScrollResponse response = DeleteByQueryAction .INSTANCE .newRequestBuilder (client )
247+ BulkByScrollResponse response =
248+ DeleteByQueryAction .INSTANCE .newRequestBuilder (client )
169249 .filter (QueryBuilders .matchQuery ("gender" , "male" )) // <1>
170250 .source ("persons" ) // <2>
171251 .get (); // <3>
@@ -189,4 +269,76 @@ public void onFailure(Exception e) {
189269 // end::delete-by-query-async
190270 }
191271
272+ /**
273+ * Similar to what CancelTests does: blocks some operations to be able to catch some tasks in running state
274+ * @see CancelTests#testCancel(String, AbstractBulkByScrollRequestBuilder, CancelTests.CancelAssertion, Matcher)
275+ */
276+ private ReindexRequestBuilder reindexAndPartiallyBlock () throws Exception {
277+ final Client client = client ();
278+ final int numDocs = randomIntBetween (10 , 100 );
279+ ALLOWED_OPERATIONS .release (numDocs );
280+
281+ indexRandom (true , false , true , IntStream .range (0 , numDocs )
282+ .mapToObj (i -> client ().prepareIndex (INDEX_NAME , "_doc" , Integer .toString (i )).setSource ("n" , Integer .toString (i )))
283+ .collect (Collectors .toList ()));
284+
285+ // Checks that the all documents have been indexed and correctly counted
286+ assertHitCount (client ().prepareSearch (INDEX_NAME ).setSize (0 ).get (), numDocs );
287+ assertThat (ALLOWED_OPERATIONS .drainPermits (), equalTo (0 ));
288+
289+ ReindexRequestBuilder builder = new ReindexRequestBuilder (client , ReindexAction .INSTANCE ).source (INDEX_NAME )
290+ .destination ("target_index" , "_doc" );
291+ // Scroll by 1 so that cancellation is easier to control
292+ builder .source ().setSize (1 );
293+
294+ int numModifiedDocs = randomIntBetween (builder .request ().getSlices () * 2 , numDocs );
295+ // chose to modify some of docs - rest is still blocked
296+ ALLOWED_OPERATIONS .release (numModifiedDocs - builder .request ().getSlices ());
297+
298+ // Now execute the reindex action...
299+ builder .execute ();
300+
301+ // 10 seconds is usually fine but on heavily loaded machines this can take a while
302+ assertTrue ("updates blocked" , awaitBusy (
303+ () -> ALLOWED_OPERATIONS .hasQueuedThreads () && ALLOWED_OPERATIONS .availablePermits () == 0 ,
304+ 1 , TimeUnit .MINUTES ));
305+ return builder ;
306+ }
307+
308+ public static class ReindexCancellationPlugin extends Plugin {
309+
310+ @ Override
311+ public void onIndexModule (IndexModule indexModule ) {
312+ indexModule .addIndexOperationListener (new BlockingOperationListener ());
313+ }
314+ }
315+
316+ public static class BlockingOperationListener implements IndexingOperationListener {
317+
318+ @ Override
319+ public Engine .Index preIndex (ShardId shardId , Engine .Index index ) {
320+ return preCheck (index , index .type ());
321+ }
322+
323+ @ Override
324+ public Engine .Delete preDelete (ShardId shardId , Engine .Delete delete ) {
325+ return preCheck (delete , delete .type ());
326+ }
327+
328+ private <T extends Engine .Operation > T preCheck (T operation , String type ) {
329+ if (("_doc" .equals (type ) == false ) || (operation .origin () != Engine .Operation .Origin .PRIMARY )) {
330+ return operation ;
331+ }
332+
333+ try {
334+ if (ALLOWED_OPERATIONS .tryAcquire (30 , TimeUnit .SECONDS )) {
335+ return operation ;
336+ }
337+ } catch (InterruptedException e ) {
338+ throw new RuntimeException (e );
339+ }
340+ throw new IllegalStateException ("Something went wrong" );
341+ }
342+ }
343+
192344}
0 commit comments