Skip to content

Commit efa3657

Browse files
committed
Replace queryForStream with query to prevent connection leaks
Signed-off-by: Hyunsang Han <[email protected]>
1 parent e74e928 commit efa3657

File tree

4 files changed

+32
-23
lines changed

4 files changed

+32
-23
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcExecutionContextDao.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@
2727
import java.util.Collection;
2828
import java.util.HashMap;
2929
import java.util.Iterator;
30+
import java.util.List;
3031
import java.util.Map;
3132
import java.util.Map.Entry;
3233
import java.util.concurrent.locks.Lock;
3334
import java.util.concurrent.locks.ReentrantLock;
34-
import java.util.stream.Stream;
3535

3636
import org.springframework.batch.core.job.JobExecution;
3737
import org.springframework.batch.core.step.StepExecution;
@@ -59,6 +59,7 @@
5959
* @author David Turanski
6060
* @author Mahmoud Ben Hassine
6161
* @author Yanming Zhou
62+
* @author Hyunsang Han
6263
*/
6364
public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implements ExecutionContextDao {
6465

@@ -154,21 +155,19 @@ public ExecutionContext getExecutionContext(JobExecution jobExecution) {
154155
Long executionId = jobExecution.getId();
155156
Assert.notNull(executionId, "ExecutionId must not be null.");
156157

157-
try (Stream<ExecutionContext> stream = getJdbcTemplate().queryForStream(getQuery(FIND_JOB_EXECUTION_CONTEXT),
158-
new ExecutionContextRowMapper(), executionId)) {
159-
return stream.findFirst().orElseGet(ExecutionContext::new);
160-
}
158+
List<ExecutionContext> results = getJdbcTemplate().query(getQuery(FIND_JOB_EXECUTION_CONTEXT),
159+
new ExecutionContextRowMapper(), executionId);
160+
return !results.isEmpty() ? results.get(0) : new ExecutionContext();
161161
}
162162

163163
@Override
164164
public ExecutionContext getExecutionContext(StepExecution stepExecution) {
165165
Long executionId = stepExecution.getId();
166166
Assert.notNull(executionId, "ExecutionId must not be null.");
167167

168-
try (Stream<ExecutionContext> stream = getJdbcTemplate().queryForStream(getQuery(FIND_STEP_EXECUTION_CONTEXT),
169-
new ExecutionContextRowMapper(), executionId)) {
170-
return stream.findFirst().orElseGet(ExecutionContext::new);
171-
}
168+
List<ExecutionContext> results = getJdbcTemplate().query(getQuery(FIND_STEP_EXECUTION_CONTEXT),
169+
new ExecutionContextRowMapper(), executionId);
170+
return !results.isEmpty() ? results.get(0) : new ExecutionContext();
172171
}
173172

174173
@Override

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcJobExecutionDao.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.Set;
2929
import java.util.concurrent.locks.Lock;
3030
import java.util.concurrent.locks.ReentrantLock;
31-
import java.util.stream.Stream;
3231

3332
import org.apache.commons.logging.Log;
3433
import org.apache.commons.logging.LogFactory;
@@ -79,6 +78,7 @@
7978
* @author Philippe Marschall
8079
* @author Jinwoo Bae
8180
* @author Yanming Zhou
81+
* @author Hyunsang Han
8282
*/
8383
public class JdbcJobExecutionDao extends AbstractJdbcBatchMetadataDao implements JobExecutionDao, InitializingBean {
8484

@@ -337,10 +337,12 @@ public JobExecution getLastJobExecution(JobInstance jobInstance) {
337337

338338
Long id = jobInstance.getId();
339339

340-
try (Stream<JobExecution> stream = getJdbcTemplate().queryForStream(getQuery(GET_LAST_EXECUTION),
341-
new JobExecutionRowMapper(jobInstance), id, id)) {
342-
return stream.findFirst().orElse(null);
343-
}
340+
List<JobExecution> executions = getJdbcTemplate().query(getQuery(GET_LAST_EXECUTION),
341+
new JobExecutionRowMapper(jobInstance), id, id);
342+
343+
Assert.state(executions.size() <= 1, "There must be at most one latest job execution");
344+
345+
return executions.isEmpty() ? null : executions.get(0);
344346
}
345347

346348
@Override

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcJobInstanceDao.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.sql.Types;
2222
import java.util.ArrayList;
2323
import java.util.List;
24-
import java.util.stream.Stream;
2524

2625
import org.springframework.batch.core.job.DefaultJobKeyGenerator;
2726
import org.springframework.batch.core.job.JobExecution;
@@ -58,6 +57,7 @@
5857
* @author Mahmoud Ben Hassine
5958
* @author Parikshit Dutta
6059
* @author Yanming Zhou
60+
* @author Hyunsang Han
6161
*/
6262
public class JdbcJobInstanceDao extends AbstractJdbcBatchMetadataDao implements JobInstanceDao, InitializingBean {
6363

@@ -176,10 +176,16 @@ public JobInstance getJobInstance(final String jobName, final JobParameters jobP
176176

177177
RowMapper<JobInstance> rowMapper = new JobInstanceRowMapper();
178178

179-
try (Stream<JobInstance> stream = getJdbcTemplate().queryForStream(
179+
List<JobInstance> instances = getJdbcTemplate().query(
180180
getQuery(StringUtils.hasLength(jobKey) ? FIND_JOBS_WITH_KEY : FIND_JOBS_WITH_EMPTY_KEY), rowMapper,
181-
jobName, jobKey)) {
182-
return stream.findFirst().orElse(null);
181+
jobName, jobKey);
182+
183+
if (instances.isEmpty()) {
184+
return null;
185+
}
186+
else {
187+
Assert.state(instances.size() == 1, "instance count must be 1 but was " + instances.size());
188+
return instances.get(0);
183189
}
184190

185191
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcStepExecutionDao.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.List;
3030
import java.util.concurrent.locks.Lock;
3131
import java.util.concurrent.locks.ReentrantLock;
32-
import java.util.stream.Stream;
3332

3433
import org.apache.commons.logging.Log;
3534
import org.apache.commons.logging.LogFactory;
@@ -70,6 +69,7 @@
7069
* @author Baris Cubukcuoglu
7170
* @author Minsoo Kim
7271
* @author Yanming Zhou
72+
* @author Hyunsang Han
7373
* @see StepExecutionDao
7474
*/
7575
public class JdbcStepExecutionDao extends AbstractJdbcBatchMetadataDao implements StepExecutionDao, InitializingBean {
@@ -327,10 +327,12 @@ private String truncateExitDescription(String description) {
327327
@Override
328328
@Nullable
329329
public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId) {
330-
try (Stream<StepExecution> stream = getJdbcTemplate().queryForStream(getQuery(GET_STEP_EXECUTION),
331-
new StepExecutionRowMapper(jobExecution), stepExecutionId)) {
332-
return stream.findFirst().orElse(null);
333-
}
330+
List<StepExecution> executions = getJdbcTemplate().query(getQuery(GET_STEP_EXECUTION),
331+
new StepExecutionRowMapper(jobExecution), stepExecutionId);
332+
333+
Assert.state(executions.size() <= 1,
334+
"There can be at most one step execution with given name for single job execution");
335+
return executions.isEmpty() ? null : executions.get(0);
334336
}
335337

336338
@Override

0 commit comments

Comments
 (0)