Skip to content

Commit 24b3614

Browse files
committed
Add a 8 Kb flush threshold to RxNettyServerHttpResponse
Issue: SPR-14991
1 parent 856cb3b commit 24b3614

File tree

2 files changed

+71
-20
lines changed

2 files changed

+71
-20
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import reactor.core.publisher.Mono;
2929
import rx.Observable;
3030
import rx.RxReactiveStreams;
31+
import rx.functions.Func1;
3132

3233
import org.springframework.core.io.buffer.DataBuffer;
3334
import org.springframework.core.io.buffer.NettyDataBufferFactory;
@@ -40,6 +41,7 @@
4041
*
4142
* @author Rossen Stoyanchev
4243
* @author Stephane Maldini
44+
* @author Sebastien Deleuze
4345
* @since 5.0
4446
*/
4547
public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
@@ -48,6 +50,9 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
4850

4951
private static final ByteBuf FLUSH_SIGNAL = Unpooled.buffer(0, 0);
5052

53+
// 8 Kb flush threshold to avoid blocking RxNetty when the send buffer has reached the high watermark
54+
private static final long FLUSH_THRESHOLD = 8192;
55+
5156
public RxNettyServerHttpResponse(HttpServerResponse<ByteBuf> response,
5257
NettyDataBufferFactory dataBufferFactory) {
5358
super(dataBufferFactory);
@@ -74,7 +79,7 @@ protected void applyStatusCode() {
7479
protected Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> body) {
7580
Observable<ByteBuf> content = RxReactiveStreams.toObservable(body)
7681
.map(NettyDataBufferFactory::toByteBuf);
77-
return Flux.from(RxReactiveStreams.toPublisher(this.response.write(content)))
82+
return Flux.from(RxReactiveStreams.toPublisher(this.response.write(content, new FlushSelector(FLUSH_THRESHOLD))))
7883
.then();
7984
}
8085

@@ -116,6 +121,26 @@ protected void applyCookies() {
116121
}
117122
}
118123

124+
private class FlushSelector implements Func1<ByteBuf, Boolean> {
125+
126+
private final long flushEvery;
127+
private long count;
128+
129+
public FlushSelector(long flushEvery) {
130+
this.flushEvery = flushEvery;
131+
}
132+
133+
@Override
134+
public Boolean call(ByteBuf byteBuf) {
135+
this.count += byteBuf.readableBytes();
136+
if (this.count >= this.flushEvery) {
137+
this.count = 0;
138+
return true;
139+
}
140+
return false;
141+
}
142+
}
143+
119144

120145
/*
121146
While the underlying implementation of {@link ZeroCopyHttpOutputMessage} seems to

spring-web/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.http.server.reactive;
1818

19+
import java.nio.charset.StandardCharsets;
1920
import java.time.Duration;
2021

2122
import org.junit.Before;
@@ -26,8 +27,10 @@
2627
import reactor.test.StepVerifier;
2728

2829
import org.springframework.core.io.buffer.DataBuffer;
30+
import org.springframework.core.io.buffer.DataBufferFactory;
2931
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
3032
import org.springframework.http.codec.BodyExtractors;
33+
import org.springframework.util.Assert;
3134
import org.springframework.web.client.reactive.ClientRequest;
3235
import org.springframework.web.client.reactive.WebClient;
3336

@@ -45,11 +48,8 @@ public void setup() throws Exception {
4548
}
4649

4750
@Test
48-
public void testFlushing() throws Exception {
49-
50-
ClientRequest<Void> request = ClientRequest.GET("http://localhost:" + port).build();
51-
52-
51+
public void writeAndFlushWith() throws Exception {
52+
ClientRequest<Void> request = ClientRequest.GET("http://localhost:" + port + "/write-and-flush").build();
5353
Mono<String> result = this.webClient
5454
.exchange(request)
5555
.flatMap(response -> response.body(BodyExtractors.toFlux(String.class)))
@@ -62,6 +62,20 @@ public void testFlushing() throws Exception {
6262
.verify(Duration.ofSeconds(5L));
6363
}
6464

65+
@Test // SPR-14991
66+
public void writeAndAutoFlushOnComplete() {
67+
ClientRequest<Void> request = ClientRequest.GET("http://localhost:" + port + "/write-and-complete").build();
68+
Mono<String> result = this.webClient
69+
.exchange(request)
70+
.flatMap(response -> response.bodyToFlux(String.class))
71+
.reduce((s1, s2) -> s1 + s2);
72+
73+
StepVerifier.create(result)
74+
.consumeNextWith(value -> Assert.isTrue(value.length() == 200000))
75+
.expectComplete()
76+
.verify();
77+
}
78+
6579
@Override
6680
protected HttpHandler createHttpHandler() {
6781
return new FlushingHandler();
@@ -71,21 +85,33 @@ private static class FlushingHandler implements HttpHandler {
7185

7286
@Override
7387
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
74-
Flux<Publisher<DataBuffer>> responseBody = Flux
75-
.intervalMillis(50)
76-
.map(l -> {
77-
byte[] data = ("data" + l).getBytes();
78-
DataBuffer buffer = response.bufferFactory().allocateBuffer(data.length);
79-
buffer.write(data);
80-
return buffer;
81-
})
82-
.take(2)
83-
.map(Flux::just);
84-
85-
responseBody = responseBody.concatWith(Flux.never());
86-
87-
return response.writeAndFlushWith(responseBody);
88+
String path = request.getURI().getPath();
89+
if (path.endsWith("write-and-flush")) {
90+
Flux<Publisher<DataBuffer>> responseBody = Flux
91+
.intervalMillis(50)
92+
.map(l -> toDataBuffer("data" + l, response.bufferFactory()))
93+
.take(2)
94+
.map(Flux::just);
95+
responseBody = responseBody.concatWith(Flux.never());
96+
return response.writeAndFlushWith(responseBody);
97+
}
98+
else if (path.endsWith("write-and-complete")){
99+
Flux<DataBuffer> responseBody = Flux
100+
.just("0123456789")
101+
.repeat(20000)
102+
.map(value -> toDataBuffer(value, response.bufferFactory()));
103+
return response.writeWith(responseBody);
104+
}
105+
return response.writeWith(Flux.empty());
88106
}
107+
108+
private DataBuffer toDataBuffer(String value, DataBufferFactory factory) {
109+
byte[] data = (value).getBytes(StandardCharsets.UTF_8);
110+
DataBuffer buffer = factory.allocateBuffer(data.length);
111+
buffer.write(data);
112+
return buffer;
113+
}
114+
89115
}
90116

91117
}

0 commit comments

Comments
 (0)