2121import org .apache .logging .log4j .message .ParameterizedMessage ;
2222import org .apache .logging .log4j .util .Supplier ;
2323import org .elasticsearch .ElasticsearchException ;
24- import org .elasticsearch .Version ;
2524import org .elasticsearch .action .ActionListener ;
2625import org .elasticsearch .action .admin .indices .flush .FlushRequest ;
2726import org .elasticsearch .action .admin .indices .flush .SyncedFlushResponse ;
4544import org .elasticsearch .index .Index ;
4645import org .elasticsearch .index .IndexNotFoundException ;
4746import org .elasticsearch .index .IndexService ;
48- import org .elasticsearch .index .engine .CommitStats ;
4947import org .elasticsearch .index .engine .Engine ;
5048import org .elasticsearch .index .shard .IndexEventListener ;
5149import org .elasticsearch .index .shard .IndexShard ;
@@ -201,10 +199,10 @@ private void innerAttemptSyncedFlush(final ShardId shardId, final ClusterState s
201199 return ;
202200 }
203201
204- final ActionListener <Map <String , PreSyncedFlushResponse >> presyncListener = new ActionListener <Map <String , PreSyncedFlushResponse >>() {
202+ final ActionListener <Map <String , Engine . CommitId >> commitIdsListener = new ActionListener <Map <String , Engine . CommitId >>() {
205203 @ Override
206- public void onResponse (final Map <String , PreSyncedFlushResponse > presyncResponses ) {
207- if (presyncResponses .isEmpty ()) {
204+ public void onResponse (final Map <String , Engine . CommitId > commitIds ) {
205+ if (commitIds .isEmpty ()) {
208206 actionListener .onResponse (new ShardsSyncedFlushResult (shardId , totalShards , "all shards failed to commit on pre-sync" ));
209207 return ;
210208 }
@@ -218,7 +216,7 @@ public void onResponse(InFlightOpsResponse response) {
218216 } else {
219217 // 3. now send the sync request to all the shards
220218 String syncId = UUIDs .randomBase64UUID ();
221- sendSyncRequests (syncId , activeShards , state , presyncResponses , shardId , totalShards , actionListener );
219+ sendSyncRequests (syncId , activeShards , state , commitIds , shardId , totalShards , actionListener );
222220 }
223221 }
224222
@@ -238,7 +236,7 @@ public void onFailure(Exception e) {
238236 };
239237
240238 // 1. send pre-sync flushes to all replicas
241- sendPreSyncRequests (activeShards , state , shardId , presyncListener );
239+ sendPreSyncRequests (activeShards , state , shardId , commitIdsListener );
242240 } catch (Exception e ) {
243241 actionListener .onFailure (e );
244242 }
@@ -301,49 +299,28 @@ public String executor() {
301299 }
302300 }
303301
304- private int numDocsOnPrimary (List <ShardRouting > shards , Map <String , PreSyncedFlushResponse > preSyncResponses ) {
305- for (ShardRouting shard : shards ) {
306- if (shard .primary ()) {
307- final PreSyncedFlushResponse resp = preSyncResponses .get (shard .currentNodeId ());
308- if (resp != null ) {
309- return resp .numDocs ;
310- }
311- }
312- }
313- return PreSyncedFlushResponse .UNKNOWN_NUM_DOCS ;
314- }
315302
316- void sendSyncRequests (final String syncId , final List <ShardRouting > shards , ClusterState state , Map <String , PreSyncedFlushResponse > preSyncResponses ,
303+ void sendSyncRequests (final String syncId , final List <ShardRouting > shards , ClusterState state , Map <String , Engine . CommitId > expectedCommitIds ,
317304 final ShardId shardId , final int totalShards , final ActionListener <ShardsSyncedFlushResult > listener ) {
318305 final CountDown countDown = new CountDown (shards .size ());
319306 final Map <ShardRouting , ShardSyncedFlushResponse > results = ConcurrentCollections .newConcurrentMap ();
320- final int numDocsOnPrimary = numDocsOnPrimary (shards , preSyncResponses );
321307 for (final ShardRouting shard : shards ) {
322308 final DiscoveryNode node = state .nodes ().get (shard .currentNodeId ());
323309 if (node == null ) {
324310 logger .trace ("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}" , shardId , syncId , shard );
325311 results .put (shard , new ShardSyncedFlushResponse ("unknown node" ));
326- countDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
312+ contDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
327313 continue ;
328314 }
329- final PreSyncedFlushResponse preSyncedResponse = preSyncResponses .get (shard .currentNodeId ());
330- if (preSyncedResponse == null ) {
315+ final Engine . CommitId expectedCommitId = expectedCommitIds .get (shard .currentNodeId ());
316+ if (expectedCommitId == null ) {
331317 logger .trace ("{} can't resolve expected commit id for current node, skipping for sync id [{}]. shard routing {}" , shardId , syncId , shard );
332318 results .put (shard , new ShardSyncedFlushResponse ("no commit id from pre-sync flush" ));
333- countDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
334- continue ;
335- }
336- if (preSyncedResponse .numDocs != numDocsOnPrimary
337- && preSyncedResponse .numDocs != PreSyncedFlushResponse .UNKNOWN_NUM_DOCS && numDocsOnPrimary != PreSyncedFlushResponse .UNKNOWN_NUM_DOCS ) {
338- logger .warn ("{} can't to issue sync id [{}] for out of sync replica [{}] with num docs [{}]; num docs on primary [{}]" ,
339- shardId , syncId , shard , preSyncedResponse .numDocs , numDocsOnPrimary );
340- results .put (shard , new ShardSyncedFlushResponse ("out of sync replica; " +
341- "num docs on replica [" + preSyncedResponse .numDocs + "]; num docs on primary [" + numDocsOnPrimary + "]" ));
342- countDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
319+ contDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
343320 continue ;
344321 }
345322 logger .trace ("{} sending synced flush request to {}. sync id [{}]." , shardId , shard , syncId );
346- transportService .sendRequest (node , SYNCED_FLUSH_ACTION_NAME , new ShardSyncedFlushRequest (shard .shardId (), syncId , preSyncedResponse . commitId ),
323+ transportService .sendRequest (node , SYNCED_FLUSH_ACTION_NAME , new ShardSyncedFlushRequest (shard .shardId (), syncId , expectedCommitId ),
347324 new TransportResponseHandler <ShardSyncedFlushResponse >() {
348325 @ Override
349326 public ShardSyncedFlushResponse newInstance () {
@@ -355,14 +332,14 @@ public void handleResponse(ShardSyncedFlushResponse response) {
355332 ShardSyncedFlushResponse existing = results .put (shard , response );
356333 assert existing == null : "got two answers for node [" + node + "]" ;
357334 // count after the assert so we won't decrement twice in handleException
358- countDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
335+ contDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
359336 }
360337
361338 @ Override
362339 public void handleException (TransportException exp ) {
363340 logger .trace ((Supplier <?>) () -> new ParameterizedMessage ("{} error while performing synced flush on [{}], skipping" , shardId , shard ), exp );
364341 results .put (shard , new ShardSyncedFlushResponse (exp .getMessage ()));
365- countDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
342+ contDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
366343 }
367344
368345 @ Override
@@ -374,8 +351,8 @@ public String executor() {
374351
375352 }
376353
377- private void countDownAndSendResponseIfDone (String syncId , List <ShardRouting > shards , ShardId shardId , int totalShards ,
378- ActionListener <ShardsSyncedFlushResult > listener , CountDown countDown , Map <ShardRouting , ShardSyncedFlushResponse > results ) {
354+ private void contDownAndSendResponseIfDone (String syncId , List <ShardRouting > shards , ShardId shardId , int totalShards ,
355+ ActionListener <ShardsSyncedFlushResult > listener , CountDown countDown , Map <ShardRouting , ShardSyncedFlushResponse > results ) {
379356 if (countDown .countDown ()) {
380357 assert results .size () == shards .size ();
381358 listener .onResponse (new ShardsSyncedFlushResult (shardId , syncId , totalShards , results ));
@@ -385,16 +362,16 @@ private void countDownAndSendResponseIfDone(String syncId, List<ShardRouting> sh
385362 /**
386363 * send presync requests to all started copies of the given shard
387364 */
388- void sendPreSyncRequests (final List <ShardRouting > shards , final ClusterState state , final ShardId shardId , final ActionListener <Map <String , PreSyncedFlushResponse >> listener ) {
365+ void sendPreSyncRequests (final List <ShardRouting > shards , final ClusterState state , final ShardId shardId , final ActionListener <Map <String , Engine . CommitId >> listener ) {
389366 final CountDown countDown = new CountDown (shards .size ());
390- final ConcurrentMap <String , PreSyncedFlushResponse > presyncResponses = ConcurrentCollections .newConcurrentMap ();
367+ final ConcurrentMap <String , Engine . CommitId > commitIds = ConcurrentCollections .newConcurrentMap ();
391368 for (final ShardRouting shard : shards ) {
392369 logger .trace ("{} sending pre-synced flush request to {}" , shardId , shard );
393370 final DiscoveryNode node = state .nodes ().get (shard .currentNodeId ());
394371 if (node == null ) {
395372 logger .trace ("{} shard routing {} refers to an unknown node. skipping." , shardId , shard );
396373 if (countDown .countDown ()) {
397- listener .onResponse (presyncResponses );
374+ listener .onResponse (commitIds );
398375 }
399376 continue ;
400377 }
@@ -406,19 +383,19 @@ public PreSyncedFlushResponse newInstance() {
406383
407384 @ Override
408385 public void handleResponse (PreSyncedFlushResponse response ) {
409- PreSyncedFlushResponse existing = presyncResponses .putIfAbsent (node .getId (), response );
386+ Engine . CommitId existing = commitIds .putIfAbsent (node .getId (), response . commitId () );
410387 assert existing == null : "got two answers for node [" + node + "]" ;
411388 // count after the assert so we won't decrement twice in handleException
412389 if (countDown .countDown ()) {
413- listener .onResponse (presyncResponses );
390+ listener .onResponse (commitIds );
414391 }
415392 }
416393
417394 @ Override
418395 public void handleException (TransportException exp ) {
419396 logger .trace ((Supplier <?>) () -> new ParameterizedMessage ("{} error while performing pre synced flush on [{}], skipping" , shardId , shard ), exp );
420397 if (countDown .countDown ()) {
421- listener .onResponse (presyncResponses );
398+ listener .onResponse (commitIds );
422399 }
423400 }
424401
@@ -434,11 +411,9 @@ private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest
434411 IndexShard indexShard = indicesService .indexServiceSafe (request .shardId ().getIndex ()).getShard (request .shardId ().id ());
435412 FlushRequest flushRequest = new FlushRequest ().force (false ).waitIfOngoing (true );
436413 logger .trace ("{} performing pre sync flush" , request .shardId ());
437- indexShard .flush (flushRequest );
438- final CommitStats commitStats = indexShard .commitStats ();
439- final Engine .CommitId commitId = commitStats .getRawCommitId ();
440- logger .trace ("{} pre sync flush done. commit id {}, num docs {}" , request .shardId (), commitId , commitStats .getNumDocs ());
441- return new PreSyncedFlushResponse (commitId , commitStats .getNumDocs ());
414+ Engine .CommitId commitId = indexShard .flush (flushRequest );
415+ logger .trace ("{} pre sync flush done. commit id {}" , request .shardId (), commitId );
416+ return new PreSyncedFlushResponse (commitId );
442417 }
443418
444419 private ShardSyncedFlushResponse performSyncedFlush (ShardSyncedFlushRequest request ) {
@@ -508,45 +483,30 @@ public ShardId shardId() {
508483 * Response for first step of synced flush (flush) for one shard copy
509484 */
510485 static final class PreSyncedFlushResponse extends TransportResponse {
511- static final int UNKNOWN_NUM_DOCS = -1 ;
512486
513487 Engine .CommitId commitId ;
514- int numDocs ;
515488
516489 PreSyncedFlushResponse () {
517490 }
518491
519- PreSyncedFlushResponse (Engine .CommitId commitId , int numDocs ) {
492+ PreSyncedFlushResponse (Engine .CommitId commitId ) {
520493 this .commitId = commitId ;
521- this .numDocs = numDocs ;
522494 }
523495
524- Engine .CommitId commitId () {
496+ public Engine .CommitId commitId () {
525497 return commitId ;
526498 }
527499
528- int numDocs () {
529- return numDocs ;
530- }
531-
532500 @ Override
533501 public void readFrom (StreamInput in ) throws IOException {
534502 super .readFrom (in );
535503 commitId = new Engine .CommitId (in );
536- if (in .getVersion ().onOrAfter (Version .V_5_6_8 )) {
537- numDocs = in .readInt ();
538- } else {
539- numDocs = UNKNOWN_NUM_DOCS ;
540- }
541504 }
542505
543506 @ Override
544507 public void writeTo (StreamOutput out ) throws IOException {
545508 super .writeTo (out );
546509 commitId .writeTo (out );
547- if (out .getVersion ().onOrAfter (Version .V_5_6_8 )) {
548- out .writeInt (numDocs );
549- }
550510 }
551511 }
552512
0 commit comments