Skip to content

Commit cf1f34b

Browse files
hpoettkerfmbenhassine
authored andcommitted
Use more SynchronizedItemReader in tests
1 parent 1d097c2 commit cf1f34b

File tree

2 files changed

+24
-56
lines changed

2 files changed

+24
-56
lines changed

spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantStepFactoryBeanRollbackTests.java

Lines changed: 11 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717

1818
import java.util.ArrayList;
1919
import java.util.Arrays;
20-
import java.util.Collection;
2120
import java.util.Collections;
22-
import java.util.HashMap;
2321
import java.util.List;
2422
import java.util.Map;
2523

@@ -43,6 +41,7 @@
4341
import org.springframework.batch.core.step.factory.FaultTolerantStepFactoryBean;
4442
import org.springframework.batch.item.ItemReader;
4543
import org.springframework.batch.item.support.ListItemReader;
44+
import org.springframework.batch.item.support.SynchronizedItemReader;
4645
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
4746
import org.springframework.batch.support.transaction.TransactionAwareProxyFactory;
4847
import org.springframework.core.task.SimpleAsyncTaskExecutor;
@@ -79,7 +78,6 @@ class FaultTolerantStepFactoryBeanRollbackTests {
7978

8079
private JobRepository repository;
8180

82-
@SuppressWarnings("unchecked")
8381
@BeforeEach
8482
void setUp() throws Exception {
8583
reader = new SkipReaderStub<>();
@@ -103,7 +101,7 @@ void setUp() throws Exception {
103101

104102
factory.setSkipLimit(2);
105103

106-
factory.setSkippableExceptionClasses(getExceptionMap(Exception.class));
104+
factory.setSkippableExceptionClasses(Map.of(Exception.class, true));
107105

108106
EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder()
109107
.addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
@@ -177,17 +175,16 @@ void testReaderDefaultNoRollbackOnCheckedException() throws Exception {
177175
/**
178176
* Scenario: Exception in reader that should not cause rollback
179177
*/
180-
@SuppressWarnings("unchecked")
181178
@Test
182179
void testReaderAttributesOverrideSkippableNoRollback() throws Exception {
183180
reader.setFailures("2", "3");
184181
reader.setItems("1", "2", "3", "4");
185182
reader.setExceptionType(SkippableException.class);
186183

187184
// No skips by default
188-
factory.setSkippableExceptionClasses(getExceptionMap(RuntimeException.class));
185+
factory.setSkippableExceptionClasses(Map.of(RuntimeException.class, true));
189186
// But this one is explicit in the tx-attrs so it should be skipped
190-
factory.setNoRollbackExceptionClasses(getExceptionList(SkippableException.class));
187+
factory.setNoRollbackExceptionClasses(List.of(SkippableException.class));
191188

192189
Step step = factory.getObject();
193190

@@ -249,11 +246,8 @@ void testNoRollbackInProcessorWhenSkipExceeded() throws Throwable {
249246
processor.clear();
250247
factory.setItemProcessor(processor);
251248

252-
List<Class<? extends Throwable>> exceptions = Arrays.asList(Exception.class);
253-
factory.setNoRollbackExceptionClasses(exceptions);
254-
@SuppressWarnings("unchecked")
255-
Map<Class<? extends Throwable>, Boolean> skippable = getExceptionMap(Exception.class);
256-
factory.setSkippableExceptionClasses(skippable);
249+
factory.setNoRollbackExceptionClasses(List.of(Exception.class));
250+
factory.setSkippableExceptionClasses(Map.of(Exception.class, true));
257251

258252
processor.setFailures("2");
259253

@@ -279,7 +273,7 @@ void testProcessSkipWithNoRollbackForCheckedException() throws Exception {
279273
processor.setFailures("4");
280274
processor.setExceptionType(SkippableException.class);
281275

282-
factory.setNoRollbackExceptionClasses(getExceptionList(SkippableException.class));
276+
factory.setNoRollbackExceptionClasses(List.of(SkippableException.class));
283277

284278
Step step = factory.getObject();
285279

@@ -359,7 +353,7 @@ void testWriterNoRollbackOnRuntimeException() throws Exception {
359353
writer.setFailures("2", "3");
360354
writer.setExceptionType(SkippableRuntimeException.class);
361355

362-
factory.setNoRollbackExceptionClasses(getExceptionList(SkippableRuntimeException.class));
356+
factory.setNoRollbackExceptionClasses(List.of(SkippableRuntimeException.class));
363357

364358
Step step = factory.getObject();
365359

@@ -380,7 +374,7 @@ void testWriterNoRollbackOnCheckedException() throws Exception {
380374
writer.setFailures("2", "3");
381375
writer.setExceptionType(SkippableException.class);
382376

383-
factory.setNoRollbackExceptionClasses(getExceptionList(SkippableException.class));
377+
factory.setNoRollbackExceptionClasses(List.of(SkippableException.class));
384378

385379
Step step = factory.getObject();
386380

@@ -517,12 +511,7 @@ void testSkipInWriterTransactionalReader() throws Exception {
517511

518512
@Test
519513
void testMultithreadedSkipInWriter() throws Exception {
520-
factory.setItemReader(new ItemReader<>() {
521-
@Override
522-
public synchronized String read() throws Exception {
523-
return reader.read();
524-
}
525-
});
514+
factory.setItemReader(new SynchronizedItemReader<>(reader));
526515
writer.setFailures("1", "2", "3", "4", "5");
527516
factory.setCommitInterval(3);
528517
factory.setSkipLimit(10);
@@ -575,23 +564,9 @@ void testMultipleSkipsInWriterNonTransactionalProcessor() throws Exception {
575564
assertEquals("[1, 2, 3, 4, 5]", processor.getProcessed().toString());
576565
}
577566

578-
@SuppressWarnings("unchecked")
579-
private Collection<Class<? extends Throwable>> getExceptionList(Class<? extends Throwable> arg) {
580-
return Arrays.<Class<? extends Throwable>>asList(arg);
581-
}
582-
583-
@SuppressWarnings("unchecked")
584-
private Map<Class<? extends Throwable>, Boolean> getExceptionMap(Class<? extends Throwable>... args) {
585-
Map<Class<? extends Throwable>, Boolean> map = new HashMap<>();
586-
for (Class<? extends Throwable> arg : args) {
587-
map.put(arg, true);
588-
}
589-
return map;
590-
}
591-
592567
static class ExceptionThrowingChunkListener implements ChunkListener {
593568

594-
private int phase = -1;
569+
private final int phase;
595570

596571
public ExceptionThrowingChunkListener(int throwPhase) {
597572
this.phase = throwPhase;

spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/AsyncTaskletStepTests.java

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,16 @@
3535
import org.springframework.batch.core.step.JobRepositorySupport;
3636
import org.springframework.batch.item.ExecutionContext;
3737
import org.springframework.batch.item.ItemProcessor;
38-
import org.springframework.batch.item.ItemStreamSupport;
38+
import org.springframework.batch.item.ItemStream;
3939
import org.springframework.batch.item.ItemWriter;
4040
import org.springframework.batch.item.support.ListItemReader;
4141
import org.springframework.batch.item.support.PassThroughItemProcessor;
42+
import org.springframework.batch.item.support.SynchronizedItemReader;
4243
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
4344
import org.springframework.batch.repeat.support.RepeatTemplate;
4445
import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
4546
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
4647
import org.springframework.core.task.SimpleAsyncTaskExecutor;
47-
import org.springframework.lang.Nullable;
4848
import org.springframework.util.StringUtils;
4949

5050
class AsyncTaskletStepTests {
@@ -83,8 +83,8 @@ private void setUp() {
8383

8484
RepeatTemplate chunkTemplate = new RepeatTemplate();
8585
chunkTemplate.setCompletionPolicy(new SimpleCompletionPolicy(2));
86-
step.setTasklet(new TestingChunkOrientedTasklet<>(new ListItemReader<>(items), itemProcessor, itemWriter,
87-
chunkTemplate));
86+
step.setTasklet(new TestingChunkOrientedTasklet<>(new SynchronizedItemReader<>(new ListItemReader<>(items)),
87+
itemProcessor, itemWriter, chunkTemplate));
8888

8989
jobRepository = new JobRepositorySupport();
9090
step.setJobRepository(jobRepository);
@@ -96,12 +96,11 @@ private void setUp() {
9696
template.setTaskExecutor(taskExecutor);
9797
step.setStepOperations(template);
9898

99-
step.registerStream(new ItemStreamSupport() {
99+
step.registerStream(new ItemStream() {
100100
private int count = 0;
101101

102102
@Override
103103
public void update(ExecutionContext executionContext) {
104-
super.update(executionContext);
105104
executionContext.putInt("counter", count++);
106105
}
107106
});
@@ -125,10 +124,8 @@ void testStepExecutionUpdates() throws Exception {
125124
step.execute(stepExecution);
126125

127126
assertEquals(BatchStatus.COMPLETED, stepExecution.getStatus());
128-
// assertEquals(25, stepExecution.getReadCount());
129-
// assertEquals(25, processed.size());
130-
assertTrue(stepExecution.getReadCount() >= 25);
131-
assertTrue(processed.size() >= 25);
127+
assertEquals(25, stepExecution.getReadCount());
128+
assertEquals(25, processed.size());
132129

133130
// Check commit count didn't spin out of control waiting for other
134131
// threads to finish...
@@ -170,17 +167,13 @@ void testStepExecutionFailsWithProcessor() throws Exception {
170167
throttleLimit = 1;
171168
concurrencyLimit = 1;
172169
items = Arrays.asList("one", "barf", "three", "four");
173-
itemProcessor = new ItemProcessor<>() {
174-
@Nullable
175-
@Override
176-
public String process(String item) throws Exception {
177-
logger.info("Item: " + item);
178-
processed.add(item);
179-
if (item.equals("barf")) {
180-
throw new RuntimeException("Planned processor error");
181-
}
182-
return item;
170+
itemProcessor = item -> {
171+
logger.info("Item: " + item);
172+
processed.add(item);
173+
if (item.equals("barf")) {
174+
throw new RuntimeException("Planned processor error");
183175
}
176+
return item;
184177
};
185178
setUp();
186179

0 commit comments

Comments
 (0)