55 */
66package org .elasticsearch .xpack .ccr .action ;
77
8+ import org .apache .logging .log4j .message .ParameterizedMessage ;
89import org .elasticsearch .action .Action ;
10+ import org .elasticsearch .action .ActionListener ;
911import org .elasticsearch .action .ActionRequestValidationException ;
1012import org .elasticsearch .action .ActionResponse ;
1113import org .elasticsearch .action .support .ActionFilters ;
1921import org .elasticsearch .common .io .stream .StreamInput ;
2022import org .elasticsearch .common .io .stream .StreamOutput ;
2123import org .elasticsearch .common .settings .Settings ;
24+ import org .elasticsearch .common .unit .TimeValue ;
2225import org .elasticsearch .index .IndexService ;
2326import org .elasticsearch .index .seqno .SeqNoStats ;
2427import org .elasticsearch .index .shard .IndexShard ;
3639import java .util .Arrays ;
3740import java .util .List ;
3841import java .util .Objects ;
42+ import java .util .concurrent .TimeoutException ;
3943
4044import static org .elasticsearch .action .ValidateActions .addValidationError ;
45+ import static org .elasticsearch .index .seqno .SequenceNumbers .UNASSIGNED_SEQ_NO ;
4146
4247public class ShardChangesAction extends Action <ShardChangesAction .Response > {
4348
@@ -59,6 +64,7 @@ public static class Request extends SingleShardRequest<Request> {
5964 private int maxOperationCount ;
6065 private ShardId shardId ;
6166 private String expectedHistoryUUID ;
67+ private TimeValue pollTimeout = FollowIndexAction .DEFAULT_POLL_TIMEOUT ;
6268 private long maxOperationSizeInBytes = FollowIndexAction .DEFAULT_MAX_BATCH_SIZE_IN_BYTES ;
6369
6470 public Request (ShardId shardId , String expectedHistoryUUID ) {
@@ -102,6 +108,14 @@ public String getExpectedHistoryUUID() {
102108 return expectedHistoryUUID ;
103109 }
104110
111+ public TimeValue getPollTimeout () {
112+ return pollTimeout ;
113+ }
114+
115+ public void setPollTimeout (final TimeValue pollTimeout ) {
116+ this .pollTimeout = Objects .requireNonNull (pollTimeout , "pollTimeout" );
117+ }
118+
105119 @ Override
106120 public ActionRequestValidationException validate () {
107121 ActionRequestValidationException validationException = null ;
@@ -126,6 +140,7 @@ public void readFrom(StreamInput in) throws IOException {
126140 maxOperationCount = in .readVInt ();
127141 shardId = ShardId .readShardId (in );
128142 expectedHistoryUUID = in .readString ();
143+ pollTimeout = in .readTimeValue ();
129144 maxOperationSizeInBytes = in .readVLong ();
130145 }
131146
@@ -136,6 +151,7 @@ public void writeTo(StreamOutput out) throws IOException {
136151 out .writeVInt (maxOperationCount );
137152 shardId .writeTo (out );
138153 out .writeString (expectedHistoryUUID );
154+ out .writeTimeValue (pollTimeout );
139155 out .writeVLong (maxOperationSizeInBytes );
140156 }
141157
@@ -149,12 +165,13 @@ public boolean equals(final Object o) {
149165 maxOperationCount == request .maxOperationCount &&
150166 Objects .equals (shardId , request .shardId ) &&
151167 Objects .equals (expectedHistoryUUID , request .expectedHistoryUUID ) &&
168+ Objects .equals (pollTimeout , request .pollTimeout ) &&
152169 maxOperationSizeInBytes == request .maxOperationSizeInBytes ;
153170 }
154171
155172 @ Override
156173 public int hashCode () {
157- return Objects .hash (fromSeqNo , maxOperationCount , shardId , expectedHistoryUUID , maxOperationSizeInBytes );
174+ return Objects .hash (fromSeqNo , maxOperationCount , shardId , expectedHistoryUUID , pollTimeout , maxOperationSizeInBytes );
158175 }
159176
160177 @ Override
@@ -164,6 +181,7 @@ public String toString() {
164181 ", maxOperationCount=" + maxOperationCount +
165182 ", shardId=" + shardId +
166183 ", expectedHistoryUUID=" + expectedHistoryUUID +
184+ ", pollTimeout=" + pollTimeout +
167185 ", maxOperationSizeInBytes=" + maxOperationSizeInBytes +
168186 '}' ;
169187 }
@@ -265,19 +283,90 @@ public TransportAction(Settings settings,
265283
266284 @ Override
267285 protected Response shardOperation (Request request , ShardId shardId ) throws IOException {
268- IndexService indexService = indicesService .indexServiceSafe (request .getShard ().getIndex ());
269- IndexShard indexShard = indexService .getShard (request .getShard ().id ());
270- final SeqNoStats seqNoStats = indexShard .seqNoStats ();
286+ final IndexService indexService = indicesService .indexServiceSafe (request .getShard ().getIndex ());
287+ final IndexShard indexShard = indexService .getShard (request .getShard ().id ());
288+ final SeqNoStats seqNoStats = indexShard .seqNoStats ();
271289 final long mappingVersion = clusterService .state ().metaData ().index (shardId .getIndex ()).getMappingVersion ();
272290
273291 final Translog .Operation [] operations = getOperations (
274292 indexShard ,
275293 seqNoStats .getGlobalCheckpoint (),
276- request .fromSeqNo ,
277- request .maxOperationCount ,
278- request .expectedHistoryUUID ,
279- request .maxOperationSizeInBytes );
280- return new Response (mappingVersion , seqNoStats .getGlobalCheckpoint (), seqNoStats .getMaxSeqNo (), operations );
294+ request .getFromSeqNo (),
295+ request .getMaxOperationCount (),
296+ request .getExpectedHistoryUUID (),
297+ request .getMaxOperationSizeInBytes ());
298+ return getResponse (mappingVersion , seqNoStats , operations );
299+ }
300+
301+ @ Override
302+ protected void asyncShardOperation (
303+ final Request request ,
304+ final ShardId shardId ,
305+ final ActionListener <Response > listener ) throws IOException {
306+ final IndexService indexService = indicesService .indexServiceSafe (request .getShard ().getIndex ());
307+ final IndexShard indexShard = indexService .getShard (request .getShard ().id ());
308+ final SeqNoStats seqNoStats = indexShard .seqNoStats ();
309+
310+ if (request .getFromSeqNo () > seqNoStats .getGlobalCheckpoint ()) {
311+ logger .trace (
312+ "{} waiting for global checkpoint advancement from [{}] to [{}]" ,
313+ shardId ,
314+ seqNoStats .getGlobalCheckpoint (),
315+ request .getFromSeqNo ());
316+ indexShard .addGlobalCheckpointListener (
317+ request .getFromSeqNo (),
318+ (g , e ) -> {
319+ if (g != UNASSIGNED_SEQ_NO ) {
320+ assert request .getFromSeqNo () <= g
321+ : shardId + " only advanced to [" + g + "] while waiting for [" + request .getFromSeqNo () + "]" ;
322+ globalCheckpointAdvanced (shardId , g , request , listener );
323+ } else {
324+ assert e != null ;
325+ globalCheckpointAdvancementFailure (shardId , e , request , listener , indexShard );
326+ }
327+ },
328+ request .getPollTimeout ());
329+ } else {
330+ super .asyncShardOperation (request , shardId , listener );
331+ }
332+ }
333+
334+ private void globalCheckpointAdvanced (
335+ final ShardId shardId ,
336+ final long globalCheckpoint ,
337+ final Request request ,
338+ final ActionListener <Response > listener ) {
339+ logger .trace ("{} global checkpoint advanced to [{}] after waiting for [{}]" , shardId , globalCheckpoint , request .getFromSeqNo ());
340+ try {
341+ super .asyncShardOperation (request , shardId , listener );
342+ } catch (final IOException caught ) {
343+ listener .onFailure (caught );
344+ }
345+ }
346+
347+ private void globalCheckpointAdvancementFailure (
348+ final ShardId shardId ,
349+ final Exception e ,
350+ final Request request ,
351+ final ActionListener <Response > listener ,
352+ final IndexShard indexShard ) {
353+ logger .trace (
354+ () -> new ParameterizedMessage (
355+ "{} exception waiting for global checkpoint advancement to [{}]" , shardId , request .getFromSeqNo ()),
356+ e );
357+ if (e instanceof TimeoutException ) {
358+ try {
359+ final long mappingVersion =
360+ clusterService .state ().metaData ().index (shardId .getIndex ()).getMappingVersion ();
361+ final SeqNoStats latestSeqNoStats = indexShard .seqNoStats ();
362+ listener .onResponse (getResponse (mappingVersion , latestSeqNoStats , EMPTY_OPERATIONS_ARRAY ));
363+ } catch (final Exception caught ) {
364+ caught .addSuppressed (e );
365+ listener .onFailure (caught );
366+ }
367+ } else {
368+ listener .onFailure (e );
369+ }
281370 }
282371
283372 @ Override
@@ -300,7 +389,7 @@ protected Response newResponse() {
300389
301390 }
302391
303- private static final Translog .Operation [] EMPTY_OPERATIONS_ARRAY = new Translog .Operation [0 ];
392+ static final Translog .Operation [] EMPTY_OPERATIONS_ARRAY = new Translog .Operation [0 ];
304393
305394 /**
306395 * Returns at most maxOperationCount operations from the specified from sequence number.
@@ -324,7 +413,8 @@ static Translog.Operation[] getOperations(IndexShard indexShard,
324413 historyUUID + "]" );
325414 }
326415 if (fromSeqNo > globalCheckpoint ) {
327- return EMPTY_OPERATIONS_ARRAY ;
416+ throw new IllegalStateException (
417+ "not exposing operations from [" + fromSeqNo + "] greater than the global checkpoint [" + globalCheckpoint + "]" );
328418 }
329419 int seenBytes = 0 ;
330420 // - 1 is needed, because toSeqNo is inclusive
@@ -344,4 +434,8 @@ static Translog.Operation[] getOperations(IndexShard indexShard,
344434 return operations .toArray (EMPTY_OPERATIONS_ARRAY );
345435 }
346436
437+ static Response getResponse (final long mappingVersion , final SeqNoStats seqNoStats , final Translog .Operation [] operations ) {
438+ return new Response (mappingVersion , seqNoStats .getGlobalCheckpoint (), seqNoStats .getMaxSeqNo (), operations );
439+ }
440+
347441}
0 commit comments