5151import org .elasticsearch .cluster .routing .AllocationId ;
5252import org .elasticsearch .common .CheckedFunction ;
5353import org .elasticsearch .common .Nullable ;
54+ import org .elasticsearch .common .Randomness ;
5455import org .elasticsearch .common .Strings ;
5556import org .elasticsearch .common .bytes .BytesArray ;
5657import org .elasticsearch .common .bytes .BytesReference ;
@@ -704,6 +705,32 @@ public static List<Engine.Operation> generateSingleDocHistory(boolean forReplica
704705 return ops ;
705706 }
706707
708+ public List <Engine .Operation > generateReplicaHistory (int numOps , boolean allowGapInSeqNo ) {
709+ long seqNo = 0 ;
710+ List <Engine .Operation > operations = new ArrayList <>(numOps );
711+ for (int i = 0 ; i < numOps ; i ++) {
712+ String id = Integer .toString (between (1 , 100 ));
713+ final ParsedDocument doc = EngineTestCase .createParsedDoc (id , null );
714+ if (randomBoolean ()) {
715+ operations .add (new Engine .Index (EngineTestCase .newUid (doc ), doc , seqNo , primaryTerm .get (),
716+ i , null , Engine .Operation .Origin .REPLICA , threadPool .relativeTimeInMillis (),
717+ -1 , true ));
718+ } else if (randomBoolean ()) {
719+ operations .add (new Engine .Delete (doc .type (), doc .id (), EngineTestCase .newUid (doc ), seqNo , primaryTerm .get (),
720+ i , null , Engine .Operation .Origin .REPLICA , threadPool .relativeTimeInMillis ()));
721+ } else {
722+ operations .add (new Engine .NoOp (seqNo , primaryTerm .get (), Engine .Operation .Origin .REPLICA ,
723+ threadPool .relativeTimeInMillis (), "test-" + i ));
724+ }
725+ seqNo ++;
726+ if (allowGapInSeqNo && rarely ()) {
727+ seqNo ++;
728+ }
729+ }
730+ Randomness .shuffle (operations );
731+ return operations ;
732+ }
733+
707734 public static void assertOpsOnReplica (
708735 final List <Engine .Operation > ops ,
709736 final InternalEngine replicaEngine ,
@@ -788,14 +815,7 @@ public static void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngi
788815 int docOffset ;
789816 while ((docOffset = offset .incrementAndGet ()) < ops .size ()) {
790817 try {
791- final Engine .Operation op = ops .get (docOffset );
792- if (op instanceof Engine .Index ) {
793- engine .index ((Engine .Index ) op );
794- } else if (op instanceof Engine .Delete ){
795- engine .delete ((Engine .Delete ) op );
796- } else {
797- engine .noOp ((Engine .NoOp ) op );
798- }
818+ applyOperation (engine , ops .get (docOffset ));
799819 if ((docOffset + 1 ) % 4 == 0 ) {
800820 engine .refresh ("test" );
801821 }
@@ -814,6 +834,36 @@ public static void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngi
814834 }
815835 }
816836
837+ public static void applyOperations (Engine engine , List <Engine .Operation > operations ) throws IOException {
838+ for (Engine .Operation operation : operations ) {
839+ applyOperation (engine , operation );
840+ if (randomInt (100 ) < 10 ) {
841+ engine .refresh ("test" );
842+ }
843+ if (rarely ()) {
844+ engine .flush ();
845+ }
846+ }
847+ }
848+
849+ public static Engine .Result applyOperation (Engine engine , Engine .Operation operation ) throws IOException {
850+ final Engine .Result result ;
851+ switch (operation .operationType ()) {
852+ case INDEX :
853+ result = engine .index ((Engine .Index ) operation );
854+ break ;
855+ case DELETE :
856+ result = engine .delete ((Engine .Delete ) operation );
857+ break ;
858+ case NO_OP :
859+ result = engine .noOp ((Engine .NoOp ) operation );
860+ break ;
861+ default :
862+ throw new IllegalStateException ("No operation defined for [" + operation + "]" );
863+ }
864+ return result ;
865+ }
866+
817867 /**
818868 * Gets a collection of tuples of docId, sequence number, and primary term of all live documents in the provided engine.
819869 */
0 commit comments