133133import java .util .Collections ;
134134import java .util .Comparator ;
135135import java .util .HashSet ;
136+ import java .util .Iterator ;
136137import java .util .LinkedHashMap ;
137138import java .util .List ;
138139import java .util .Map ;
@@ -1385,18 +1386,13 @@ public void testVersioningCreateExistsException() throws IOException {
13851386 }
13861387
13871388 protected List <Engine .Operation > generateSingleDocHistory (boolean forReplica , VersionType versionType ,
1388- boolean partialOldPrimary , long primaryTerm ,
1389- int minOpCount , int maxOpCount ) {
1389+ long primaryTerm ,
1390+ int minOpCount , int maxOpCount , String docId ) {
13901391 final int numOfOps = randomIntBetween (minOpCount , maxOpCount );
13911392 final List <Engine .Operation > ops = new ArrayList <>();
1392- final Term id = newUid ("1" );
1393- final int startWithSeqNo ;
1394- if (partialOldPrimary ) {
1395- startWithSeqNo = randomBoolean () ? numOfOps - 1 : randomIntBetween (0 , numOfOps - 1 );
1396- } else {
1397- startWithSeqNo = 0 ;
1398- }
1399- final String valuePrefix = forReplica ? "r_" : "p_" ;
1393+ final Term id = newUid (docId );
1394+ final int startWithSeqNo = 0 ;
1395+ final String valuePrefix = (forReplica ? "r_" : "p_" ) + docId + "_" ;
14001396 final boolean incrementTermWhenIntroducingSeqNo = randomBoolean ();
14011397 for (int i = 0 ; i < numOfOps ; i ++) {
14021398 final Engine .Operation op ;
@@ -1418,7 +1414,7 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, Ve
14181414 throw new UnsupportedOperationException ("unknown version type: " + versionType );
14191415 }
14201416 if (randomBoolean ()) {
1421- op = new Engine .Index (id , testParsedDocument ("1" , null , testDocumentWithTextField (valuePrefix + i ), B_1 , null ),
1417+ op = new Engine .Index (id , testParsedDocument (docId , null , testDocumentWithTextField (valuePrefix + i ), B_1 , null ),
14221418 forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers .UNASSIGNED_SEQ_NO ,
14231419 forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm ,
14241420 version ,
@@ -1427,7 +1423,7 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, Ve
14271423 System .currentTimeMillis (), -1 , false
14281424 );
14291425 } else {
1430- op = new Engine .Delete ("test" , "1" , id ,
1426+ op = new Engine .Delete ("test" , docId , id ,
14311427 forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers .UNASSIGNED_SEQ_NO ,
14321428 forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm ,
14331429 version ,
@@ -1442,7 +1438,7 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, Ve
14421438
14431439 public void testOutOfOrderDocsOnReplica () throws IOException {
14441440 final List <Engine .Operation > ops = generateSingleDocHistory (true ,
1445- randomFrom (VersionType .INTERNAL , VersionType .EXTERNAL , VersionType .EXTERNAL_GTE , VersionType .FORCE ), false , 2 , 2 , 20 );
1441+ randomFrom (VersionType .INTERNAL , VersionType .EXTERNAL , VersionType .EXTERNAL_GTE , VersionType .FORCE ), 2 , 2 , 20 , "1" );
14461442 assertOpsOnReplica (ops , replicaEngine , true );
14471443 }
14481444
@@ -1511,28 +1507,83 @@ private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine repli
15111507 }
15121508 }
15131509
1514- public void testConcurrentOutOfDocsOnReplica () throws IOException , InterruptedException {
1515- final List <Engine .Operation > ops = generateSingleDocHistory (true , randomFrom (VersionType .INTERNAL , VersionType .EXTERNAL ), false , 2 , 100 , 300 );
1516- final Engine .Operation lastOp = ops .get (ops .size () - 1 );
1517- final String lastFieldValue ;
1518- if (lastOp instanceof Engine .Index ) {
1519- Engine .Index index = (Engine .Index ) lastOp ;
1520- lastFieldValue = index .docs ().get (0 ).get ("value" );
1510+ public void testConcurrentOutOfOrderDocsOnReplica () throws IOException , InterruptedException {
1511+ final List <Engine .Operation > opsDoc1 =
1512+ generateSingleDocHistory (true , randomFrom (VersionType .INTERNAL , VersionType .EXTERNAL ), 2 , 100 , 300 , "1" );
1513+ final Engine .Operation lastOpDoc1 = opsDoc1 .get (opsDoc1 .size () - 1 );
1514+ final String lastFieldValueDoc1 ;
1515+ if (lastOpDoc1 instanceof Engine .Index ) {
1516+ Engine .Index index = (Engine .Index ) lastOpDoc1 ;
1517+ lastFieldValueDoc1 = index .docs ().get (0 ).get ("value" );
15211518 } else {
15221519 // delete
1523- lastFieldValue = null ;
1520+ lastFieldValueDoc1 = null ;
1521+ }
1522+ final List <Engine .Operation > opsDoc2 =
1523+ generateSingleDocHistory (true , randomFrom (VersionType .INTERNAL , VersionType .EXTERNAL ), 2 , 100 , 300 , "2" );
1524+ final Engine .Operation lastOpDoc2 = opsDoc2 .get (opsDoc2 .size () - 1 );
1525+ final String lastFieldValueDoc2 ;
1526+ if (lastOpDoc2 instanceof Engine .Index ) {
1527+ Engine .Index index = (Engine .Index ) lastOpDoc2 ;
1528+ lastFieldValueDoc2 = index .docs ().get (0 ).get ("value" );
1529+ } else {
1530+ // delete
1531+ lastFieldValueDoc2 = null ;
15241532 }
1525- shuffle (ops , random ());
1526- concurrentlyApplyOps (ops , engine );
1533+ // randomly interleave
1534+ final AtomicLong seqNoGenerator = new AtomicLong ();
1535+ Function <Engine .Operation , Engine .Operation > seqNoUpdater = operation -> {
1536+ final long newSeqNo = seqNoGenerator .getAndIncrement ();
1537+ if (operation instanceof Engine .Index ) {
1538+ Engine .Index index = (Engine .Index ) operation ;
1539+ return new Engine .Index (index .uid (), index .parsedDoc (), newSeqNo , index .primaryTerm (), index .version (),
1540+ index .versionType (), index .origin (), index .startTime (), index .getAutoGeneratedIdTimestamp (), index .isRetry ());
1541+ } else {
1542+ Engine .Delete delete = (Engine .Delete ) operation ;
1543+ return new Engine .Delete (delete .type (), delete .id (), delete .uid (), newSeqNo , delete .primaryTerm (),
1544+ delete .version (), delete .versionType (), delete .origin (), delete .startTime ());
1545+ }
1546+ };
1547+ final List <Engine .Operation > allOps = new ArrayList <>();
1548+ Iterator <Engine .Operation > iter1 = opsDoc1 .iterator ();
1549+ Iterator <Engine .Operation > iter2 = opsDoc2 .iterator ();
1550+ while (iter1 .hasNext () && iter2 .hasNext ()) {
1551+ final Engine .Operation next = randomBoolean () ? iter1 .next () : iter2 .next ();
1552+ allOps .add (seqNoUpdater .apply (next ));
1553+ }
1554+ iter1 .forEachRemaining (o -> allOps .add (seqNoUpdater .apply (o )));
1555+ iter2 .forEachRemaining (o -> allOps .add (seqNoUpdater .apply (o )));
1556+ // insert some duplicates
1557+ allOps .addAll (randomSubsetOf (allOps ));
15271558
1528- assertVisibleCount (engine , lastFieldValue == null ? 0 : 1 );
1529- if (lastFieldValue != null ) {
1559+ shuffle (allOps , random ());
1560+ concurrentlyApplyOps (allOps , engine );
1561+
1562+ engine .refresh ("test" );
1563+
1564+ if (lastFieldValueDoc1 != null ) {
15301565 try (Searcher searcher = engine .acquireSearcher ("test" )) {
15311566 final TotalHitCountCollector collector = new TotalHitCountCollector ();
1532- searcher .searcher ().search (new TermQuery (new Term ("value" , lastFieldValue )), collector );
1567+ searcher .searcher ().search (new TermQuery (new Term ("value" , lastFieldValueDoc1 )), collector );
15331568 assertThat (collector .getTotalHits (), equalTo (1 ));
15341569 }
15351570 }
1571+ if (lastFieldValueDoc2 != null ) {
1572+ try (Searcher searcher = engine .acquireSearcher ("test" )) {
1573+ final TotalHitCountCollector collector = new TotalHitCountCollector ();
1574+ searcher .searcher ().search (new TermQuery (new Term ("value" , lastFieldValueDoc2 )), collector );
1575+ assertThat (collector .getTotalHits (), equalTo (1 ));
1576+ }
1577+ }
1578+
1579+ int totalExpectedOps = 0 ;
1580+ if (lastFieldValueDoc1 != null ) {
1581+ totalExpectedOps ++;
1582+ }
1583+ if (lastFieldValueDoc2 != null ) {
1584+ totalExpectedOps ++;
1585+ }
1586+ assertVisibleCount (engine , totalExpectedOps );
15361587 }
15371588
15381589 private void concurrentlyApplyOps (List <Engine .Operation > ops , InternalEngine engine ) throws InterruptedException {
@@ -1572,12 +1623,12 @@ private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine eng
15721623 }
15731624
15741625 public void testInternalVersioningOnPrimary () throws IOException {
1575- final List <Engine .Operation > ops = generateSingleDocHistory (false , VersionType .INTERNAL , false , 2 , 2 , 20 );
1626+ final List <Engine .Operation > ops = generateSingleDocHistory (false , VersionType .INTERNAL , 2 , 2 , 20 , "1" );
15761627 assertOpsOnPrimary (ops , Versions .NOT_FOUND , true , engine );
15771628 }
15781629
15791630 public void testVersionOnPrimaryWithConcurrentRefresh () throws Exception {
1580- List <Engine .Operation > ops = generateSingleDocHistory (false , VersionType .INTERNAL , false , 2 , 10 , 100 );
1631+ List <Engine .Operation > ops = generateSingleDocHistory (false , VersionType .INTERNAL , 2 , 10 , 100 , "1" );
15811632 CountDownLatch latch = new CountDownLatch (1 );
15821633 AtomicBoolean running = new AtomicBoolean (true );
15831634 Thread refreshThread = new Thread (() -> {
@@ -1697,7 +1748,7 @@ public void testNonInternalVersioningOnPrimary() throws IOException {
16971748 final Set <VersionType > nonInternalVersioning = new HashSet <>(Arrays .asList (VersionType .values ()));
16981749 nonInternalVersioning .remove (VersionType .INTERNAL );
16991750 final VersionType versionType = randomFrom (nonInternalVersioning );
1700- final List <Engine .Operation > ops = generateSingleDocHistory (false , versionType , false , 2 , 2 , 20 );
1751+ final List <Engine .Operation > ops = generateSingleDocHistory (false , versionType , 2 , 2 , 20 , "1" );
17011752 final Engine .Operation lastOp = ops .get (ops .size () - 1 );
17021753 final String lastFieldValue ;
17031754 if (lastOp instanceof Engine .Index ) {
@@ -1775,8 +1826,8 @@ public void testNonInternalVersioningOnPrimary() throws IOException {
17751826 }
17761827
17771828 public void testVersioningPromotedReplica () throws IOException {
1778- final List <Engine .Operation > replicaOps = generateSingleDocHistory (true , VersionType .INTERNAL , false , 1 , 2 , 20 );
1779- List <Engine .Operation > primaryOps = generateSingleDocHistory (false , VersionType .INTERNAL , false , 2 , 2 , 20 );
1829+ final List <Engine .Operation > replicaOps = generateSingleDocHistory (true , VersionType .INTERNAL , 1 , 2 , 20 , "1" );
1830+ List <Engine .Operation > primaryOps = generateSingleDocHistory (false , VersionType .INTERNAL , 2 , 2 , 20 , "1" );
17801831 Engine .Operation lastReplicaOp = replicaOps .get (replicaOps .size () - 1 );
17811832 final boolean deletedOnReplica = lastReplicaOp instanceof Engine .Delete ;
17821833 final long finalReplicaVersion = lastReplicaOp .version ();
@@ -1796,7 +1847,7 @@ public void testVersioningPromotedReplica() throws IOException {
17961847 }
17971848
17981849 public void testConcurrentExternalVersioningOnPrimary () throws IOException , InterruptedException {
1799- final List <Engine .Operation > ops = generateSingleDocHistory (false , VersionType .EXTERNAL , false , 2 , 100 , 300 );
1850+ final List <Engine .Operation > ops = generateSingleDocHistory (false , VersionType .EXTERNAL , 2 , 100 , 300 , "1" );
18001851 final Engine .Operation lastOp = ops .get (ops .size () - 1 );
18011852 final String lastFieldValue ;
18021853 if (lastOp instanceof Engine .Index ) {
0 commit comments