Skip to content

Commit ef14d76

Browse files
committed
Merge limits on input in codecs
2 parents ce0b012 + 5abf24e commit ef14d76

26 files changed

+1168
-208
lines changed

spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,39 @@
4848
@SuppressWarnings("deprecation")
4949
public abstract class AbstractDataBufferDecoder<T> extends AbstractDecoder<T> {
5050

51+
private int maxInMemorySize = 256 * 1024;
52+
5153

5254
protected AbstractDataBufferDecoder(MimeType... supportedMimeTypes) {
5355
super(supportedMimeTypes);
5456
}
5557

5658

59+
/**
60+
* Configure a limit on the number of bytes that can be buffered whenever
61+
* the input stream needs to be aggregated. This can be a result of
62+
* decoding to a single {@code DataBuffer},
63+
* {@link java.nio.ByteBuffer ByteBuffer}, {@code byte[]},
64+
* {@link org.springframework.core.io.Resource Resource}, {@code String}, etc.
65+
* It can also occur when splitting the input stream, e.g. delimited text,
66+
* in which case the limit applies to data buffered between delimiters.
67+
* <p>By default this is set to 256K.
68+
* @param byteCount the max number of bytes to buffer, or -1 for unlimited
69+
* @since 5.1.11
70+
*/
71+
public void setMaxInMemorySize(int byteCount) {
72+
this.maxInMemorySize = byteCount;
73+
}
74+
75+
/**
76+
* Return the {@link #setMaxInMemorySize configured} byte count limit.
77+
* @since 5.1.11
78+
*/
79+
public int getMaxInMemorySize() {
80+
return this.maxInMemorySize;
81+
}
82+
83+
5784
@Override
5885
public Flux<T> decode(Publisher<DataBuffer> input, ResolvableType elementType,
5986
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
@@ -65,7 +92,7 @@ public Flux<T> decode(Publisher<DataBuffer> input, ResolvableType elementType,
6592
public Mono<T> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
6693
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
6794

68-
return DataBufferUtils.join(input)
95+
return DataBufferUtils.join(input, this.maxInMemorySize)
6996
.map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
7097
}
7198

spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,18 @@
2525
import java.util.Map;
2626
import java.util.concurrent.ConcurrentHashMap;
2727
import java.util.concurrent.ConcurrentMap;
28+
import java.util.function.Consumer;
2829

2930
import org.reactivestreams.Publisher;
3031
import reactor.core.publisher.Flux;
3132

3233
import org.springframework.core.ResolvableType;
3334
import org.springframework.core.io.buffer.DataBuffer;
35+
import org.springframework.core.io.buffer.DataBufferLimitException;
3436
import org.springframework.core.io.buffer.DataBufferUtils;
3537
import org.springframework.core.io.buffer.DataBufferWrapper;
3638
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
39+
import org.springframework.core.io.buffer.LimitedDataBufferList;
3740
import org.springframework.core.io.buffer.PooledDataBuffer;
3841
import org.springframework.core.log.LogFormatUtils;
3942
import org.springframework.lang.Nullable;
@@ -91,12 +94,18 @@ public Flux<String> decode(Publisher<DataBuffer> input, ResolvableType elementTy
9194

9295
byte[][] delimiterBytes = getDelimiterBytes(mimeType);
9396

97+
// TODO: Drop Consumer and use bufferUntil with Supplier<LimistedDataBufferList> (reactor-core#1925)
98+
// TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) (reactor-core#1924)
99+
LimitedDataBufferConsumer limiter = new LimitedDataBufferConsumer(getMaxInMemorySize());
100+
94101
Flux<DataBuffer> inputFlux = Flux.defer(() -> {
95102
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes);
96103
return Flux.from(input)
97104
.concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher))
105+
.doOnNext(limiter)
98106
.bufferUntil(buffer -> buffer instanceof EndFrameBuffer)
99107
.map(buffers -> joinAndStrip(buffers, this.stripDelimiter))
108+
.doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear)
100109
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
101110

102111
});
@@ -279,4 +288,34 @@ public byte[] delimiter() {
279288
}
280289

281290

291+
/**
292+
* Temporary measure for reactor-core#1925.
293+
* Consumer that adds to a {@link LimitedDataBufferList} to enforce limits.
294+
*/
295+
private static class LimitedDataBufferConsumer implements Consumer<DataBuffer> {
296+
297+
private final LimitedDataBufferList bufferList;
298+
299+
300+
public LimitedDataBufferConsumer(int maxInMemorySize) {
301+
this.bufferList = new LimitedDataBufferList(maxInMemorySize);
302+
}
303+
304+
305+
@Override
306+
public void accept(DataBuffer buffer) {
307+
if (buffer instanceof EndFrameBuffer) {
308+
this.bufferList.clear();
309+
}
310+
else {
311+
try {
312+
this.bufferList.add(buffer);
313+
}
314+
catch (DataBufferLimitException ex) {
315+
DataBufferUtils.release(buffer);
316+
throw ex;
317+
}
318+
}
319+
}
320+
}
282321
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.core.io.buffer;
17+
18+
/**
19+
* Exception that indicates the cumulative number of bytes consumed from a
20+
* stream of {@link DataBuffer DataBuffer}'s exceeded some pre-configured limit.
21+
* This can be raised when data buffers are cached and aggregated, e.g.
22+
* {@link DataBufferUtils#join}. Or it could also be raised when data buffers
23+
* have been released but a parsed representation is being aggregated, e.g. async
24+
* parsing with Jackson.
25+
*
26+
* @author Rossen Stoyanchev
27+
* @since 5.1.11
28+
*/
29+
@SuppressWarnings("serial")
30+
public class DataBufferLimitException extends IllegalStateException {
31+
32+
33+
public DataBufferLimitException(String message) {
34+
super(message);
35+
}
36+
37+
}

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -525,16 +525,35 @@ public static Consumer<DataBuffer> releaseConsumer() {
525525
*/
526526
@SuppressWarnings("unchecked")
527527
public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers) {
528-
Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
528+
return join(dataBuffers, -1);
529+
}
530+
531+
/**
532+
* Variant of {@link #join(Publisher)} that behaves the same way up until
533+
* the specified max number of bytes to buffer. Once the limit is exceeded,
534+
* {@link DataBufferLimitException} is raised.
535+
* @param buffers the data buffers that are to be composed
536+
* @param maxByteCount the max number of bytes to buffer, or -1 for unlimited
537+
* @return a buffer with the aggregated content, possibly an empty Mono if
538+
* the max number of bytes to buffer is exceeded.
539+
* @throws DataBufferLimitException if maxByteCount is exceeded
540+
* @since 5.1.11
541+
*/
542+
@SuppressWarnings("unchecked")
543+
public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> buffers, int maxByteCount) {
544+
Assert.notNull(buffers, "'dataBuffers' must not be null");
529545

530-
if (dataBuffers instanceof Mono) {
531-
return (Mono<DataBuffer>) dataBuffers;
546+
if (buffers instanceof Mono) {
547+
return (Mono<DataBuffer>) buffers;
532548
}
533549

534-
return Flux.from(dataBuffers)
535-
.collectList()
550+
// TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) (reactor-core#1924)
551+
552+
return Flux.from(buffers)
553+
.collect(() -> new LimitedDataBufferList(maxByteCount), LimitedDataBufferList::add)
536554
.filter(list -> !list.isEmpty())
537555
.map(list -> list.get(0).factory().join(list))
556+
.doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear)
538557
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
539558
}
540559

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.core.io.buffer;
17+
18+
import java.util.ArrayList;
19+
import java.util.Collection;
20+
import java.util.List;
21+
import java.util.function.Predicate;
22+
23+
import reactor.core.publisher.Flux;
24+
25+
/**
26+
* Custom {@link List} to collect data buffers with and enforce a
27+
* limit on the total number of bytes buffered. For use with "collect" or
28+
* other buffering operators in declarative APIs, e.g. {@link Flux}.
29+
*
30+
* <p>Adding elements increases the byte count and if the limit is exceeded,
31+
* {@link DataBufferLimitException} is raised. {@link #clear()} resets the
32+
* count. Remove and set are not supported.
33+
*
34+
* <p><strong>Note:</strong> This class does not automatically release the
35+
* buffers it contains. It is usually preferable to use hooks such as
36+
* {@link Flux#doOnDiscard} that also take care of cancel and error signals,
37+
* or otherwise {@link #releaseAndClear()} can be used.
38+
*
39+
* @author Rossen Stoyanchev
40+
* @since 5.1.11
41+
*/
42+
@SuppressWarnings("serial")
43+
public class LimitedDataBufferList extends ArrayList<DataBuffer> {
44+
45+
private final int maxByteCount;
46+
47+
private int byteCount;
48+
49+
50+
public LimitedDataBufferList(int maxByteCount) {
51+
this.maxByteCount = maxByteCount;
52+
}
53+
54+
55+
@Override
56+
public boolean add(DataBuffer buffer) {
57+
boolean result = super.add(buffer);
58+
if (result) {
59+
updateCount(buffer.readableByteCount());
60+
}
61+
return result;
62+
}
63+
64+
@Override
65+
public void add(int index, DataBuffer buffer) {
66+
super.add(index, buffer);
67+
updateCount(buffer.readableByteCount());
68+
}
69+
70+
@Override
71+
public boolean addAll(Collection<? extends DataBuffer> collection) {
72+
boolean result = super.addAll(collection);
73+
collection.forEach(buffer -> updateCount(buffer.readableByteCount()));
74+
return result;
75+
}
76+
77+
@Override
78+
public boolean addAll(int index, Collection<? extends DataBuffer> collection) {
79+
boolean result = super.addAll(index, collection);
80+
collection.forEach(buffer -> updateCount(buffer.readableByteCount()));
81+
return result;
82+
}
83+
84+
private void updateCount(int bytesToAdd) {
85+
if (this.maxByteCount < 0) {
86+
return;
87+
}
88+
if (bytesToAdd > Integer.MAX_VALUE - this.byteCount) {
89+
raiseLimitException();
90+
}
91+
else {
92+
this.byteCount += bytesToAdd;
93+
if (this.byteCount > this.maxByteCount) {
94+
raiseLimitException();
95+
}
96+
}
97+
}
98+
99+
private void raiseLimitException() {
100+
// Do not release here, it's likely down via doOnDiscard..
101+
throw new DataBufferLimitException(
102+
"Exceeded limit on max bytes to buffer : " + this.maxByteCount);
103+
}
104+
105+
@Override
106+
public DataBuffer remove(int index) {
107+
throw new UnsupportedOperationException();
108+
}
109+
110+
@Override
111+
public boolean remove(Object o) {
112+
throw new UnsupportedOperationException();
113+
}
114+
115+
@Override
116+
protected void removeRange(int fromIndex, int toIndex) {
117+
throw new UnsupportedOperationException();
118+
}
119+
120+
@Override
121+
public boolean removeAll(Collection<?> c) {
122+
throw new UnsupportedOperationException();
123+
}
124+
125+
@Override
126+
public boolean removeIf(Predicate<? super DataBuffer> filter) {
127+
throw new UnsupportedOperationException();
128+
}
129+
130+
@Override
131+
public DataBuffer set(int index, DataBuffer element) {
132+
throw new UnsupportedOperationException();
133+
}
134+
135+
@Override
136+
public void clear() {
137+
this.byteCount = 0;
138+
super.clear();
139+
}
140+
141+
/**
142+
* Shortcut to {@link DataBufferUtils#release release} all data buffers and
143+
* then {@link #clear()}.
144+
*/
145+
public void releaseAndClear() {
146+
forEach(buf -> {
147+
try {
148+
DataBufferUtils.release(buf);
149+
}
150+
catch (Throwable ex) {
151+
// Keep going..
152+
}
153+
});
154+
clear();
155+
}
156+
157+
}

spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.util.Collections;
2020
import java.util.function.Consumer;
2121

22-
import org.junit.jupiter.api.AfterEach;
2322
import org.junit.jupiter.api.Test;
2423
import org.reactivestreams.Subscription;
2524
import reactor.core.publisher.BaseSubscriber;
@@ -30,9 +29,9 @@
3029
import org.springframework.core.ResolvableType;
3130
import org.springframework.core.io.ClassPathResource;
3231
import org.springframework.core.io.Resource;
32+
import org.springframework.core.io.buffer.AbstractLeakCheckingTests;
3333
import org.springframework.core.io.buffer.DataBuffer;
3434
import org.springframework.core.io.buffer.DataBufferUtils;
35-
import org.springframework.core.io.buffer.LeakAwareDataBufferFactory;
3635
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
3736
import org.springframework.core.io.support.ResourceRegion;
3837
import org.springframework.util.MimeType;
@@ -45,18 +44,10 @@
4544
* Test cases for {@link ResourceRegionEncoder} class.
4645
* @author Brian Clozel
4746
*/
48-
class ResourceRegionEncoderTests {
47+
class ResourceRegionEncoderTests extends AbstractLeakCheckingTests {
4948

5049
private ResourceRegionEncoder encoder = new ResourceRegionEncoder();
5150

52-
private LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();
53-
54-
55-
@AfterEach
56-
void tearDown() throws Exception {
57-
this.bufferFactory.checkForLeaks();
58-
}
59-
6051
@Test
6152
void canEncode() {
6253
ResolvableType resourceRegion = ResolvableType.forClass(ResourceRegion.class);

0 commit comments

Comments
 (0)