55 */
66package org .elasticsearch .xpack .ccr .action ;
77
8+ import org .apache .logging .log4j .message .ParameterizedMessage ;
89import org .elasticsearch .action .Action ;
10+ import org .elasticsearch .action .ActionListener ;
911import org .elasticsearch .action .ActionRequestValidationException ;
1012import org .elasticsearch .action .ActionResponse ;
1113import org .elasticsearch .action .support .ActionFilters ;
2123import org .elasticsearch .common .io .stream .StreamInput ;
2224import org .elasticsearch .common .io .stream .StreamOutput ;
2325import org .elasticsearch .common .settings .Settings ;
26+ import org .elasticsearch .common .unit .TimeValue ;
2427import org .elasticsearch .index .IndexService ;
2528import org .elasticsearch .index .seqno .SeqNoStats ;
2629import org .elasticsearch .index .shard .IndexShard ;
3841import java .util .Arrays ;
3942import java .util .List ;
4043import java .util .Objects ;
44+ import java .util .concurrent .TimeoutException ;
4145
4246import static org .elasticsearch .action .ValidateActions .addValidationError ;
47+ import static org .elasticsearch .index .seqno .SequenceNumbers .UNASSIGNED_SEQ_NO ;
4348
4449public class ShardChangesAction extends Action <ShardChangesAction .Request , ShardChangesAction .Response , ShardChangesAction .RequestBuilder > {
4550
@@ -66,6 +71,7 @@ public static class Request extends SingleShardRequest<Request> {
6671 private int maxOperationCount ;
6772 private ShardId shardId ;
6873 private String expectedHistoryUUID ;
74+ private TimeValue pollTimeout = FollowIndexAction .DEFAULT_POLL_TIMEOUT ;
6975 private long maxOperationSizeInBytes = FollowIndexAction .DEFAULT_MAX_BATCH_SIZE_IN_BYTES ;
7076
7177 public Request (ShardId shardId , String expectedHistoryUUID ) {
@@ -109,6 +115,14 @@ public String getExpectedHistoryUUID() {
109115 return expectedHistoryUUID ;
110116 }
111117
118+ public TimeValue getPollTimeout () {
119+ return pollTimeout ;
120+ }
121+
122+ public void setPollTimeout (final TimeValue pollTimeout ) {
123+ this .pollTimeout = Objects .requireNonNull (pollTimeout , "pollTimeout" );
124+ }
125+
112126 @ Override
113127 public ActionRequestValidationException validate () {
114128 ActionRequestValidationException validationException = null ;
@@ -133,6 +147,7 @@ public void readFrom(StreamInput in) throws IOException {
133147 maxOperationCount = in .readVInt ();
134148 shardId = ShardId .readShardId (in );
135149 expectedHistoryUUID = in .readString ();
150+ pollTimeout = in .readTimeValue ();
136151 maxOperationSizeInBytes = in .readVLong ();
137152 }
138153
@@ -143,6 +158,7 @@ public void writeTo(StreamOutput out) throws IOException {
143158 out .writeVInt (maxOperationCount );
144159 shardId .writeTo (out );
145160 out .writeString (expectedHistoryUUID );
161+ out .writeTimeValue (pollTimeout );
146162 out .writeVLong (maxOperationSizeInBytes );
147163 }
148164
@@ -156,12 +172,13 @@ public boolean equals(final Object o) {
156172 maxOperationCount == request .maxOperationCount &&
157173 Objects .equals (shardId , request .shardId ) &&
158174 Objects .equals (expectedHistoryUUID , request .expectedHistoryUUID ) &&
175+ Objects .equals (pollTimeout , request .pollTimeout ) &&
159176 maxOperationSizeInBytes == request .maxOperationSizeInBytes ;
160177 }
161178
162179 @ Override
163180 public int hashCode () {
164- return Objects .hash (fromSeqNo , maxOperationCount , shardId , expectedHistoryUUID , maxOperationSizeInBytes );
181+ return Objects .hash (fromSeqNo , maxOperationCount , shardId , expectedHistoryUUID , pollTimeout , maxOperationSizeInBytes );
165182 }
166183
167184 @ Override
@@ -171,6 +188,7 @@ public String toString() {
171188 ", maxOperationCount=" + maxOperationCount +
172189 ", shardId=" + shardId +
173190 ", expectedHistoryUUID=" + expectedHistoryUUID +
191+ ", pollTimeout=" + pollTimeout +
174192 ", maxOperationSizeInBytes=" + maxOperationSizeInBytes +
175193 '}' ;
176194 }
@@ -279,19 +297,90 @@ public TransportAction(Settings settings,
279297
280298 @ Override
281299 protected Response shardOperation (Request request , ShardId shardId ) throws IOException {
282- IndexService indexService = indicesService .indexServiceSafe (request .getShard ().getIndex ());
283- IndexShard indexShard = indexService .getShard (request .getShard ().id ());
284- final SeqNoStats seqNoStats = indexShard .seqNoStats ();
300+ final IndexService indexService = indicesService .indexServiceSafe (request .getShard ().getIndex ());
301+ final IndexShard indexShard = indexService .getShard (request .getShard ().id ());
302+ final SeqNoStats seqNoStats = indexShard .seqNoStats ();
285303 final long mappingVersion = clusterService .state ().metaData ().index (shardId .getIndex ()).getMappingVersion ();
286304
287305 final Translog .Operation [] operations = getOperations (
288306 indexShard ,
289307 seqNoStats .getGlobalCheckpoint (),
290- request .fromSeqNo ,
291- request .maxOperationCount ,
292- request .expectedHistoryUUID ,
293- request .maxOperationSizeInBytes );
294- return new Response (mappingVersion , seqNoStats .getGlobalCheckpoint (), seqNoStats .getMaxSeqNo (), operations );
308+ request .getFromSeqNo (),
309+ request .getMaxOperationCount (),
310+ request .getExpectedHistoryUUID (),
311+ request .getMaxOperationSizeInBytes ());
312+ return getResponse (mappingVersion , seqNoStats , operations );
313+ }
314+
315+ @ Override
316+ protected void asyncShardOperation (
317+ final Request request ,
318+ final ShardId shardId ,
319+ final ActionListener <Response > listener ) throws IOException {
320+ final IndexService indexService = indicesService .indexServiceSafe (request .getShard ().getIndex ());
321+ final IndexShard indexShard = indexService .getShard (request .getShard ().id ());
322+ final SeqNoStats seqNoStats = indexShard .seqNoStats ();
323+
324+ if (request .getFromSeqNo () > seqNoStats .getGlobalCheckpoint ()) {
325+ logger .trace (
326+ "{} waiting for global checkpoint advancement from [{}] to [{}]" ,
327+ shardId ,
328+ seqNoStats .getGlobalCheckpoint (),
329+ request .getFromSeqNo ());
330+ indexShard .addGlobalCheckpointListener (
331+ request .getFromSeqNo (),
332+ (g , e ) -> {
333+ if (g != UNASSIGNED_SEQ_NO ) {
334+ assert request .getFromSeqNo () <= g
335+ : shardId + " only advanced to [" + g + "] while waiting for [" + request .getFromSeqNo () + "]" ;
336+ globalCheckpointAdvanced (shardId , g , request , listener );
337+ } else {
338+ assert e != null ;
339+ globalCheckpointAdvancementFailure (shardId , e , request , listener , indexShard );
340+ }
341+ },
342+ request .getPollTimeout ());
343+ } else {
344+ super .asyncShardOperation (request , shardId , listener );
345+ }
346+ }
347+
348+ private void globalCheckpointAdvanced (
349+ final ShardId shardId ,
350+ final long globalCheckpoint ,
351+ final Request request ,
352+ final ActionListener <Response > listener ) {
353+ logger .trace ("{} global checkpoint advanced to [{}] after waiting for [{}]" , shardId , globalCheckpoint , request .getFromSeqNo ());
354+ try {
355+ super .asyncShardOperation (request , shardId , listener );
356+ } catch (final IOException caught ) {
357+ listener .onFailure (caught );
358+ }
359+ }
360+
361+ private void globalCheckpointAdvancementFailure (
362+ final ShardId shardId ,
363+ final Exception e ,
364+ final Request request ,
365+ final ActionListener <Response > listener ,
366+ final IndexShard indexShard ) {
367+ logger .trace (
368+ () -> new ParameterizedMessage (
369+ "{} exception waiting for global checkpoint advancement to [{}]" , shardId , request .getFromSeqNo ()),
370+ e );
371+ if (e instanceof TimeoutException ) {
372+ try {
373+ final long mappingVersion =
374+ clusterService .state ().metaData ().index (shardId .getIndex ()).getMappingVersion ();
375+ final SeqNoStats latestSeqNoStats = indexShard .seqNoStats ();
376+ listener .onResponse (getResponse (mappingVersion , latestSeqNoStats , EMPTY_OPERATIONS_ARRAY ));
377+ } catch (final Exception caught ) {
378+ caught .addSuppressed (e );
379+ listener .onFailure (caught );
380+ }
381+ } else {
382+ listener .onFailure (e );
383+ }
295384 }
296385
297386 @ Override
@@ -314,7 +403,7 @@ protected Response newResponse() {
314403
315404 }
316405
317- private static final Translog .Operation [] EMPTY_OPERATIONS_ARRAY = new Translog .Operation [0 ];
406+ static final Translog .Operation [] EMPTY_OPERATIONS_ARRAY = new Translog .Operation [0 ];
318407
319408 /**
320409 * Returns at most maxOperationCount operations from the specified from sequence number.
@@ -338,7 +427,8 @@ static Translog.Operation[] getOperations(IndexShard indexShard,
338427 historyUUID + "]" );
339428 }
340429 if (fromSeqNo > globalCheckpoint ) {
341- return EMPTY_OPERATIONS_ARRAY ;
430+ throw new IllegalStateException (
431+ "not exposing operations from [" + fromSeqNo + "] greater than the global checkpoint [" + globalCheckpoint + "]" );
342432 }
343433 int seenBytes = 0 ;
344434 // - 1 is needed, because toSeqNo is inclusive
@@ -358,4 +448,8 @@ static Translog.Operation[] getOperations(IndexShard indexShard,
358448 return operations .toArray (EMPTY_OPERATIONS_ARRAY );
359449 }
360450
451+ static Response getResponse (final long mappingVersion , final SeqNoStats seqNoStats , final Translog .Operation [] operations ) {
452+ return new Response (mappingVersion , seqNoStats .getGlobalCheckpoint (), seqNoStats .getMaxSeqNo (), operations );
453+ }
454+
361455}
0 commit comments