7676import org .elasticsearch .test .DummyShardLock ;
7777import org .elasticsearch .test .ESTestCase ;
7878import org .elasticsearch .test .IndexSettingsModule ;
79- import org .mockito .ArgumentCaptor ;
8079
8180import java .io .IOException ;
8281import java .io .OutputStream ;
108107import static org .mockito .Matchers .anyString ;
109108import static org .mockito .Mockito .doAnswer ;
110109import static org .mockito .Mockito .mock ;
111- import static org .mockito .Mockito .verify ;
112110import static org .mockito .Mockito .when ;
113111
114112public class RecoverySourceHandlerTests extends ESTestCase {
@@ -205,9 +203,6 @@ public void testSendSnapshotSendsOps() throws IOException {
205203 final StartRecoveryRequest request = getStartRecoveryRequest ();
206204 final IndexShard shard = mock (IndexShard .class );
207205 when (shard .state ()).thenReturn (IndexShardState .STARTED );
208- final RecoveryTargetHandler recoveryTarget = mock (RecoveryTargetHandler .class );
209- final RecoverySourceHandler handler =
210- new RecoverySourceHandler (shard , recoveryTarget , request , fileChunkSizeInBytes , between (1 , 10 ));
211206 final List <Translog .Operation > operations = new ArrayList <>();
212207 final int initialNumberOfDocs = randomIntBetween (16 , 64 );
213208 for (int i = 0 ; i < initialNumberOfDocs ; i ++) {
@@ -219,38 +214,23 @@ public void testSendSnapshotSendsOps() throws IOException {
219214 final Engine .Index index = getIndex (Integer .toString (i ));
220215 operations .add (new Translog .Index (index , new Engine .IndexResult (1 , 1 , i - initialNumberOfDocs , true )));
221216 }
222- operations .add (null );
223217 final long startingSeqNo = randomIntBetween (0 , numberOfDocsWithValidSequenceNumbers - 1 );
224218 final long requiredStartingSeqNo = randomIntBetween ((int ) startingSeqNo , numberOfDocsWithValidSequenceNumbers - 1 );
225219 final long endingSeqNo = randomIntBetween ((int ) requiredStartingSeqNo - 1 , numberOfDocsWithValidSequenceNumbers - 1 );
226- RecoverySourceHandler .SendSnapshotResult result = handler .phase2 (startingSeqNo , requiredStartingSeqNo ,
227- endingSeqNo , new Translog .Snapshot () {
228- @ Override
229- public void close () {
230-
231- }
232-
233- private int counter = 0 ;
234220
235- @ Override
236- public int totalOperations () {
237- return operations .size () - 1 ;
238- }
239-
240- @ Override
241- public Translog .Operation next () throws IOException {
242- return operations .get (counter ++);
243- }
244- }, randomNonNegativeLong (), randomNonNegativeLong ());
221+ final List <Translog .Operation > shippedOps = new ArrayList <>();
222+ RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler () {
223+ @ Override
224+ public long indexTranslogOperations (List <Translog .Operation > operations , int totalTranslogOps , long timestamp , long msu ) {
225+ shippedOps .addAll (operations );
226+ return SequenceNumbers .NO_OPS_PERFORMED ;
227+ }
228+ };
229+ RecoverySourceHandler handler = new RecoverySourceHandler (shard , recoveryTarget , request , fileChunkSizeInBytes , between (1 , 10 ));
230+ RecoverySourceHandler .SendSnapshotResult result = handler .phase2 (startingSeqNo , requiredStartingSeqNo ,
231+ endingSeqNo , newTranslogSnapshot (operations , Collections .emptyList ()), randomNonNegativeLong (), randomNonNegativeLong ());
245232 final int expectedOps = (int ) (endingSeqNo - startingSeqNo + 1 );
246233 assertThat (result .totalOperations , equalTo (expectedOps ));
247- final ArgumentCaptor <List > shippedOpsCaptor = ArgumentCaptor .forClass (List .class );
248- verify (recoveryTarget ).indexTranslogOperations (shippedOpsCaptor .capture (), ArgumentCaptor .forClass (Integer .class ).capture (),
249- ArgumentCaptor .forClass (Long .class ).capture (), ArgumentCaptor .forClass (Long .class ).capture ());
250- List <Translog .Operation > shippedOps = new ArrayList <>();
251- for (List list : shippedOpsCaptor .getAllValues ()) {
252- shippedOps .addAll (list );
253- }
254234 shippedOps .sort (Comparator .comparing (Translog .Operation ::seqNo ));
255235 assertThat (shippedOps .size (), equalTo (expectedOps ));
256236 for (int i = 0 ; i < shippedOps .size (); i ++) {
@@ -261,30 +241,8 @@ public Translog.Operation next() throws IOException {
261241 List <Translog .Operation > requiredOps = operations .subList (0 , operations .size () - 1 ).stream () // remove last null marker
262242 .filter (o -> o .seqNo () >= requiredStartingSeqNo && o .seqNo () <= endingSeqNo ).collect (Collectors .toList ());
263243 List <Translog .Operation > opsToSkip = randomSubsetOf (randomIntBetween (1 , requiredOps .size ()), requiredOps );
264- expectThrows (IllegalStateException .class , () ->
265- handler .phase2 (startingSeqNo , requiredStartingSeqNo ,
266- endingSeqNo , new Translog .Snapshot () {
267- @ Override
268- public void close () {
269-
270- }
271-
272- private int counter = 0 ;
273-
274- @ Override
275- public int totalOperations () {
276- return operations .size () - 1 - opsToSkip .size ();
277- }
278-
279- @ Override
280- public Translog .Operation next () throws IOException {
281- Translog .Operation op ;
282- do {
283- op = operations .get (counter ++);
284- } while (op != null && opsToSkip .contains (op ));
285- return op ;
286- }
287- }, randomNonNegativeLong (), randomNonNegativeLong ()));
244+ expectThrows (IllegalStateException .class , () -> handler .phase2 (startingSeqNo , requiredStartingSeqNo ,
245+ endingSeqNo , newTranslogSnapshot (operations , opsToSkip ), randomNonNegativeLong (), randomNonNegativeLong ()));
288246 }
289247 }
290248
@@ -716,4 +674,39 @@ public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesR
716674 int totalTranslogOps , ActionListener <Void > listener ) {
717675 }
718676 }
677+
678+ private Translog .Snapshot newTranslogSnapshot (List <Translog .Operation > operations , List <Translog .Operation > operationsToSkip ) {
679+ return new Translog .Snapshot () {
680+ int index = 0 ;
681+ int skippedCount = 0 ;
682+
683+ @ Override
684+ public int totalOperations () {
685+ return operations .size ();
686+ }
687+
688+ @ Override
689+ public int skippedOperations () {
690+ return skippedCount ;
691+ }
692+
693+ @ Override
694+ public Translog .Operation next () {
695+ while (index < operations .size ()) {
696+ Translog .Operation op = operations .get (index ++);
697+ if (operationsToSkip .contains (op )) {
698+ skippedCount ++;
699+ } else {
700+ return op ;
701+ }
702+ }
703+ return null ;
704+ }
705+
706+ @ Override
707+ public void close () {
708+
709+ }
710+ };
711+ }
719712}
0 commit comments