2323import org .elasticsearch .action .admin .cluster .node .tasks .get .GetTaskResponse ;
2424import org .elasticsearch .action .admin .cluster .node .tasks .list .ListTasksResponse ;
2525import org .elasticsearch .client .Client ;
26+ import org .elasticsearch .index .IndexModule ;
27+ import org .elasticsearch .index .engine .Engine ;
2628import org .elasticsearch .index .query .QueryBuilders ;
29+ import org .elasticsearch .index .reindex .AbstractBulkByScrollRequestBuilder ;
2730import org .elasticsearch .index .reindex .BulkByScrollResponse ;
2831import org .elasticsearch .index .reindex .BulkByScrollTask ;
32+ import org .elasticsearch .index .reindex .CancelTests ;
2933import org .elasticsearch .index .reindex .DeleteByQueryAction ;
3034import org .elasticsearch .index .reindex .DeleteByQueryRequestBuilder ;
3135import org .elasticsearch .index .reindex .ReindexAction ;
36+ import org .elasticsearch .index .reindex .ReindexPlugin ;
3237import org .elasticsearch .index .reindex .ReindexRequestBuilder ;
3338import org .elasticsearch .index .reindex .RethrottleAction ;
3439import org .elasticsearch .index .reindex .RethrottleRequestBuilder ;
3540import org .elasticsearch .index .reindex .UpdateByQueryAction ;
3641import org .elasticsearch .index .reindex .UpdateByQueryRequestBuilder ;
42+ import org .elasticsearch .index .shard .IndexingOperationListener ;
43+ import org .elasticsearch .index .shard .ShardId ;
44+ import org .elasticsearch .plugins .Plugin ;
3745import org .elasticsearch .script .Script ;
3846import org .elasticsearch .script .ScriptType ;
3947import org .elasticsearch .search .sort .SortOrder ;
4048import org .elasticsearch .tasks .TaskId ;
4149import org .elasticsearch .tasks .TaskInfo ;
4250import org .elasticsearch .test .ESIntegTestCase ;
51+ import org .hamcrest .Matcher ;
52+ import org .junit .Before ;
4353
54+ import java .util .Arrays ;
55+ import java .util .Collection ;
4456import java .util .Collections ;
57+ import java .util .concurrent .Semaphore ;
58+ import java .util .concurrent .TimeUnit ;
59+ import java .util .stream .Collectors ;
60+ import java .util .stream .IntStream ;
61+
62+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertHitCount ;
63+ import static org .hamcrest .Matchers .equalTo ;
4564
4665public class ReindexDocumentationIT extends ESIntegTestCase {
4766
67+ // Semaphore used to allow & block indexing operations during the test
68+ private static final Semaphore ALLOWED_OPERATIONS = new Semaphore (0 );
69+ private static final String INDEX_NAME = "source_index" ;
70+
71+ @ Override
72+ protected boolean ignoreExternalCluster () {
73+ return true ;
74+ }
75+
76+ @ Override
77+ protected Collection <Class <? extends Plugin >> nodePlugins () {
78+ return Arrays .asList (ReindexPlugin .class , ReindexCancellationPlugin .class );
79+ }
80+
81+ @ Override
82+ protected Collection <Class <? extends Plugin >> transportClientPlugins () {
83+ return Collections .singletonList (ReindexPlugin .class );
84+ }
85+
86+ @ Before
87+ public void setup () {
88+ client ().admin ().indices ().prepareCreate (INDEX_NAME ).get ();
89+ }
90+
4891 @ SuppressWarnings ("unused" )
49- public void reindex () {
92+ public void testReindex () {
5093 Client client = client ();
5194 // tag::reindex1
52- BulkByScrollResponse response = new ReindexRequestBuilder (client , ReindexAction .INSTANCE )
95+ BulkByScrollResponse response =
96+ new ReindexRequestBuilder (client , ReindexAction .INSTANCE )
97+ .source ("source_index" )
5398 .destination ("target_index" )
5499 .filter (QueryBuilders .matchQuery ("category" , "xzy" )) // <1>
55100 .get ();
56101 // end::reindex1
57102 }
58103
59104 @ SuppressWarnings ("unused" )
60- public void updateByQuery () {
105+ public void testUpdateByQuery () {
61106 Client client = client ();
107+ client .admin ().indices ().prepareCreate ("foo" ).get ();
108+ client .admin ().indices ().prepareCreate ("bar" ).get ();
109+ client .admin ().indices ().preparePutMapping (INDEX_NAME ).setType ("_doc" ).setSource ("cat" , "type=keyword" ).get ();
62110 {
63111 // tag::update-by-query
64- UpdateByQueryRequestBuilder updateByQuery = new UpdateByQueryRequestBuilder (client , UpdateByQueryAction .INSTANCE );
112+ UpdateByQueryRequestBuilder updateByQuery =
113+ new UpdateByQueryRequestBuilder (client , UpdateByQueryAction .INSTANCE );
65114 updateByQuery .source ("source_index" ).abortOnVersionConflict (false );
66115 BulkByScrollResponse response = updateByQuery .get ();
67116 // end::update-by-query
68117 }
69118 {
70119 // tag::update-by-query-filter
71- UpdateByQueryRequestBuilder updateByQuery = new UpdateByQueryRequestBuilder (client , UpdateByQueryAction .INSTANCE );
120+ UpdateByQueryRequestBuilder updateByQuery =
121+ new UpdateByQueryRequestBuilder (client , UpdateByQueryAction .INSTANCE );
72122 updateByQuery .source ("source_index" )
73123 .filter (QueryBuilders .termQuery ("level" , "awesome" ))
74124 .size (1000 )
75- .script (new Script (ScriptType .INLINE , "ctx._source.awesome = 'absolutely'" , "painless" , Collections .emptyMap ()));
125+ .script (new Script (ScriptType .INLINE ,
126+ "ctx._source.awesome = 'absolutely'" ,
127+ "painless" ,
128+ Collections .emptyMap ()));
76129 BulkByScrollResponse response = updateByQuery .get ();
77130 // end::update-by-query-filter
78131 }
79132 {
80133 // tag::update-by-query-size
81- UpdateByQueryRequestBuilder updateByQuery = new UpdateByQueryRequestBuilder (client , UpdateByQueryAction .INSTANCE );
134+ UpdateByQueryRequestBuilder updateByQuery =
135+ new UpdateByQueryRequestBuilder (client , UpdateByQueryAction .INSTANCE );
82136 updateByQuery .source ("source_index" )
83- .source ().setSize (500 );
137+ .source ()
138+ .setSize (500 );
84139 BulkByScrollResponse response = updateByQuery .get ();
85140 // end::update-by-query-size
86141 }
87142 {
88143 // tag::update-by-query-sort
89- UpdateByQueryRequestBuilder updateByQuery = new UpdateByQueryRequestBuilder (client , UpdateByQueryAction .INSTANCE );
90- updateByQuery .source ("source_index" ).size (100 )
91- .source ().addSort ("cat" , SortOrder .DESC );
144+ UpdateByQueryRequestBuilder updateByQuery =
145+ new UpdateByQueryRequestBuilder (client , UpdateByQueryAction .INSTANCE );
146+ updateByQuery .source ("source_index" )
147+ .size (100 )
148+ .source ()
149+ .addSort ("cat" , SortOrder .DESC );
92150 BulkByScrollResponse response = updateByQuery .get ();
93151 // end::update-by-query-sort
94152 }
95153 {
96154 // tag::update-by-query-script
97- UpdateByQueryRequestBuilder updateByQuery = new UpdateByQueryRequestBuilder (client , UpdateByQueryAction .INSTANCE );
155+ UpdateByQueryRequestBuilder updateByQuery =
156+ new UpdateByQueryRequestBuilder (client , UpdateByQueryAction .INSTANCE );
98157 updateByQuery .source ("source_index" )
99158 .script (new Script (
100159 ScriptType .INLINE ,
@@ -111,67 +170,87 @@ public void updateByQuery() {
111170 }
112171 {
113172 // tag::update-by-query-multi-index
114- UpdateByQueryRequestBuilder updateByQuery = new UpdateByQueryRequestBuilder (client , UpdateByQueryAction .INSTANCE );
173+ UpdateByQueryRequestBuilder updateByQuery =
174+ new UpdateByQueryRequestBuilder (client , UpdateByQueryAction .INSTANCE );
115175 updateByQuery .source ("foo" , "bar" ).source ().setTypes ("a" , "b" );
116176 BulkByScrollResponse response = updateByQuery .get ();
117177 // end::update-by-query-multi-index
118178 }
119179 {
120180 // tag::update-by-query-routing
121- UpdateByQueryRequestBuilder updateByQuery = new UpdateByQueryRequestBuilder (client , UpdateByQueryAction .INSTANCE );
181+ UpdateByQueryRequestBuilder updateByQuery =
182+ new UpdateByQueryRequestBuilder (client , UpdateByQueryAction .INSTANCE );
122183 updateByQuery .source ().setRouting ("cat" );
123184 BulkByScrollResponse response = updateByQuery .get ();
124185 // end::update-by-query-routing
125186 }
126187 {
127188 // tag::update-by-query-pipeline
128- UpdateByQueryRequestBuilder updateByQuery = new UpdateByQueryRequestBuilder (client , UpdateByQueryAction .INSTANCE );
189+ UpdateByQueryRequestBuilder updateByQuery =
190+ new UpdateByQueryRequestBuilder (client , UpdateByQueryAction .INSTANCE );
129191 updateByQuery .setPipeline ("hurray" );
130192 BulkByScrollResponse response = updateByQuery .get ();
131193 // end::update-by-query-pipeline
132194 }
195+ }
196+
197+ public void testTasks () throws InterruptedException {
198+ final Client client = client ();
199+ final ReindexRequestBuilder builder = reindexAndPartiallyBlock ();
200+
133201 {
134202 // tag::update-by-query-list-tasks
135203 ListTasksResponse tasksList = client .admin ().cluster ().prepareListTasks ()
136204 .setActions (UpdateByQueryAction .NAME ).setDetailed (true ).get ();
137205 for (TaskInfo info : tasksList .getTasks ()) {
138206 TaskId taskId = info .getTaskId ();
139- BulkByScrollTask .Status status = (BulkByScrollTask .Status ) info .getStatus ();
207+ BulkByScrollTask .Status status =
208+ (BulkByScrollTask .Status ) info .getStatus ();
140209 // do stuff
141210 }
142211 // end::update-by-query-list-tasks
143212 }
213+
214+ TaskInfo mainTask = CancelTests .findTaskToCancel (ReindexAction .NAME , builder .request ().getSlices ());
215+ BulkByScrollTask .Status status = (BulkByScrollTask .Status ) mainTask .getStatus ();
216+ assertNull (status .getReasonCancelled ());
217+ TaskId taskId = mainTask .getTaskId ();
144218 {
145- TaskId taskId = null ;
146219 // tag::update-by-query-get-task
147220 GetTaskResponse get = client .admin ().cluster ().prepareGetTask (taskId ).get ();
148221 // end::update-by-query-get-task
149222 }
150223 {
151- TaskId taskId = null ;
152224 // tag::update-by-query-cancel-task
153225 // Cancel all update-by-query requests
154- client .admin ().cluster ().prepareCancelTasks ().setActions (UpdateByQueryAction .NAME ).get ().getTasks ();
226+ client .admin ().cluster ().prepareCancelTasks ()
227+ .setActions (UpdateByQueryAction .NAME ).get ().getTasks ();
155228 // Cancel a specific update-by-query request
156- client .admin ().cluster ().prepareCancelTasks ().setTaskId (taskId ).get ().getTasks ();
229+ client .admin ().cluster ().prepareCancelTasks ()
230+ .setTaskId (taskId ).get ().getTasks ();
157231 // end::update-by-query-cancel-task
158232 }
159233 {
160- TaskId taskId = null ;
161234 // tag::update-by-query-rethrottle
162235 new RethrottleRequestBuilder (client , RethrottleAction .INSTANCE )
163236 .setTaskId (taskId )
164237 .setRequestsPerSecond (2.0f )
165238 .get ();
166239 // end::update-by-query-rethrottle
167240 }
241+
242+ // unblocking the blocked update
243+ ALLOWED_OPERATIONS .release (builder .request ().getSlices ());
168244 }
169245
170246 @ SuppressWarnings ("unused" )
171- public void deleteByQuery () {
247+ public void testDeleteByQuery () {
172248 Client client = client ();
249+ client .admin ().indices ().prepareCreate ("persons" ).get ();
250+
173251 // tag::delete-by-query-sync
174- BulkByScrollResponse response = new DeleteByQueryRequestBuilder (client , DeleteByQueryAction .INSTANCE )
252+ BulkByScrollResponse response =
253+ new DeleteByQueryRequestBuilder (client , DeleteByQueryAction .INSTANCE )
175254 .filter (QueryBuilders .matchQuery ("gender" , "male" )) // <1>
176255 .source ("persons" ) // <2>
177256 .get (); // <3>
@@ -195,4 +274,76 @@ public void onFailure(Exception e) {
195274 // end::delete-by-query-async
196275 }
197276
277+ /**
278+ * Similar to what CancelTests does: blocks some operations to be able to catch some tasks in running state
279+ * @see CancelTests#testCancel(String, AbstractBulkByScrollRequestBuilder, CancelTests.CancelAssertion, Matcher)
280+ */
281+ private ReindexRequestBuilder reindexAndPartiallyBlock () throws InterruptedException {
282+ final Client client = client ();
283+ final int numDocs = randomIntBetween (10 , 100 );
284+ ALLOWED_OPERATIONS .release (numDocs );
285+
286+ indexRandom (true , false , true , IntStream .range (0 , numDocs )
287+ .mapToObj (i -> client ().prepareIndex (INDEX_NAME , "_doc" , Integer .toString (i )).setSource ("n" , Integer .toString (i )))
288+ .collect (Collectors .toList ()));
289+
290+ // Checks that the all documents have been indexed and correctly counted
291+ assertHitCount (client ().prepareSearch (INDEX_NAME ).setSize (0 ).get (), numDocs );
292+ assertThat (ALLOWED_OPERATIONS .drainPermits (), equalTo (0 ));
293+
294+ ReindexRequestBuilder builder = new ReindexRequestBuilder (client , ReindexAction .INSTANCE ).source (INDEX_NAME )
295+ .destination ("target_index" , "_doc" );
296+ // Scroll by 1 so that cancellation is easier to control
297+ builder .source ().setSize (1 );
298+
299+ int numModifiedDocs = randomIntBetween (builder .request ().getSlices () * 2 , numDocs );
300+ // chose to modify some of docs - rest is still blocked
301+ ALLOWED_OPERATIONS .release (numModifiedDocs - builder .request ().getSlices ());
302+
303+ // Now execute the reindex action...
304+ builder .execute ();
305+
306+ // 10 seconds is usually fine but on heavily loaded machines this can take a while
307+ assertTrue ("updates blocked" , awaitBusy (
308+ () -> ALLOWED_OPERATIONS .hasQueuedThreads () && ALLOWED_OPERATIONS .availablePermits () == 0 ,
309+ 1 , TimeUnit .MINUTES ));
310+ return builder ;
311+ }
312+
313+ public static class ReindexCancellationPlugin extends Plugin {
314+
315+ @ Override
316+ public void onIndexModule (IndexModule indexModule ) {
317+ indexModule .addIndexOperationListener (new BlockingOperationListener ());
318+ }
319+ }
320+
321+ public static class BlockingOperationListener implements IndexingOperationListener {
322+
323+ @ Override
324+ public Engine .Index preIndex (ShardId shardId , Engine .Index index ) {
325+ return preCheck (index , index .type ());
326+ }
327+
328+ @ Override
329+ public Engine .Delete preDelete (ShardId shardId , Engine .Delete delete ) {
330+ return preCheck (delete , delete .type ());
331+ }
332+
333+ private <T extends Engine .Operation > T preCheck (T operation , String type ) {
334+ if (("_doc" .equals (type ) == false ) || (operation .origin () != Engine .Operation .Origin .PRIMARY )) {
335+ return operation ;
336+ }
337+
338+ try {
339+ if (ALLOWED_OPERATIONS .tryAcquire (30 , TimeUnit .SECONDS )) {
340+ return operation ;
341+ }
342+ } catch (InterruptedException e ) {
343+ throw new RuntimeException (e );
344+ }
345+ throw new IllegalStateException ("Something went wrong" );
346+ }
347+ }
348+
198349}
0 commit comments