3939import org .elasticsearch .common .Randomness ;
4040import org .elasticsearch .common .bytes .BytesArray ;
4141import org .elasticsearch .common .bytes .BytesReference ;
42+ import org .elasticsearch .common .collect .Tuple ;
4243import org .elasticsearch .common .io .FileSystemUtils ;
4344import org .elasticsearch .common .io .stream .BytesStreamOutput ;
4445import org .elasticsearch .common .io .stream .StreamInput ;
@@ -2185,32 +2186,38 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException
21852186 public void testMinGenerationForSeqNo () throws IOException {
21862187 final long initialGeneration = translog .getGeneration ().translogFileGeneration ;
21872188 final int operations = randomIntBetween (1 , 4096 );
2188- final List <Long > seqNos = LongStream .range (0 , operations ).boxed ().collect (Collectors .toList ());
2189- Randomness .shuffle (seqNos );
2190-
2191- for (int i = 0 ; i < operations ; i ++) {
2192- final Long seqNo = seqNos .get (i );
2193- translog .add (new Translog .NoOp (seqNo , 0 , "test" ));
2189+ final List <Long > shuffledSeqNos = LongStream .range (0 , operations ).boxed ().collect (Collectors .toList ());
2190+ Randomness .shuffle (shuffledSeqNos );
2191+ final List <Tuple <Long , Long >> seqNos = new ArrayList <>();
2192+ final Map <Long , Long > terms = new HashMap <>();
2193+ for (final Long seqNo : shuffledSeqNos ) {
2194+ seqNos .add (Tuple .tuple (seqNo , terms .computeIfAbsent (seqNo , k -> 0L )));
2195+ Long repeatingTermSeqNo = randomFrom (seqNos .stream ().map (Tuple ::v1 ).collect (Collectors .toList ()));
2196+ seqNos .add (Tuple .tuple (repeatingTermSeqNo , terms .computeIfPresent (repeatingTermSeqNo , (s , t ) -> t + 1 )));
2197+ }
2198+
2199+ for (final Tuple <Long , Long > tuple : seqNos ) {
2200+ translog .add (new Translog .NoOp (tuple .v1 (), tuple .v2 (), "test" ));
21942201 if (rarely ()) {
21952202 translog .rollGeneration ();
21962203 }
21972204 }
21982205
2199- Map <Long , Set <Long >> generations = new HashMap <>();
2206+ Map <Long , Set <Tuple < Long , Long > >> generations = new HashMap <>();
22002207
22012208 translog .commit (initialGeneration );
22022209 for (long seqNo = 0 ; seqNo < operations ; seqNo ++) {
2203- final Set <Long > seenSeqNos = new HashSet <>();
2210+ final Set <Tuple < Long , Long > > seenSeqNos = new HashSet <>();
22042211 final long generation = translog .getMinGenerationForSeqNo (seqNo ).translogFileGeneration ;
22052212 for (long g = generation ; g < translog .currentFileGeneration (); g ++) {
22062213 if (!generations .containsKey (g )) {
2207- final Set <Long > generationSeenSeqNos = new HashSet <>();
2214+ final Set <Tuple < Long , Long > > generationSeenSeqNos = new HashSet <>();
22082215 final Checkpoint checkpoint = Checkpoint .read (translog .location ().resolve (Translog .getCommitCheckpointFileName (g )));
22092216 try (TranslogReader reader = translog .openReader (translog .location ().resolve (Translog .getFilename (g )), checkpoint )) {
22102217 Translog .Snapshot snapshot = reader .newSnapshot ();
22112218 Translog .Operation operation ;
22122219 while ((operation = snapshot .next ()) != null ) {
2213- generationSeenSeqNos .add (operation .seqNo ());
2220+ generationSeenSeqNos .add (Tuple . tuple ( operation .seqNo (), operation . primaryTerm () ));
22142221 }
22152222 }
22162223 generations .put (g , generationSeenSeqNos );
@@ -2219,7 +2226,8 @@ public void testMinGenerationForSeqNo() throws IOException {
22192226 seenSeqNos .addAll (generations .get (g ));
22202227 }
22212228
2222- final Set <Long > expected = LongStream .range (seqNo , operations ).boxed ().collect (Collectors .toSet ());
2229+ final long seqNoLowerBound = seqNo ;
2230+ final Set <Tuple <Long , Long >> expected = seqNos .stream ().filter (t -> t .v1 () >= seqNoLowerBound ).collect (Collectors .toSet ());
22232231 seenSeqNos .retainAll (expected );
22242232 assertThat (seenSeqNos , equalTo (expected ));
22252233 }
0 commit comments