Skip to content

Commit a87764f

Browse files
committed
Add support for Jetty Reactive Streams HTTP client
Leverage https://github.com/jetty-project/jetty-reactive-httpclient to add support for Jetty in WebClient via JettyClientHttpConnector. Implemented with buffer copy instead of optimized buffer wrapping because the latter hangs since Callback#succeeded doesn't allow releasing the buffer and requesting more data at different times (required for Mono<DataBuffer> for example). See jetty/jetty.project#2429. Issue: SPR-15092
1 parent 3c9049d commit a87764f

File tree

8 files changed

+465
-3
lines changed

8 files changed

+465
-3
lines changed

spring-web/spring-web.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ dependencies {
6262
optional("org.codehaus.groovy:groovy-all:${groovyVersion}")
6363
optional("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
6464
optional("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}")
65+
optional("org.eclipse.jetty:jetty-reactive-httpclient:1.0.0")
6566
testCompile("io.projectreactor:reactor-test")
6667
testCompile("org.apache.taglibs:taglibs-standard-jstlel:1.2.5") {
6768
exclude group: "org.apache.taglibs", module: "taglibs-standard-spec"
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright 2002-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.http.client.reactive;
17+
18+
import java.net.URI;
19+
import java.util.function.Function;
20+
21+
import org.eclipse.jetty.client.HttpClient;
22+
import org.eclipse.jetty.util.Callback;
23+
import reactor.core.publisher.Flux;
24+
import reactor.core.publisher.Mono;
25+
26+
import org.springframework.context.SmartLifecycle;
27+
import org.springframework.core.io.buffer.DataBuffer;
28+
import org.springframework.core.io.buffer.DataBufferFactory;
29+
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
30+
import org.springframework.http.HttpMethod;
31+
32+
/**
33+
* Jetty ReactiveStreams HttpClient implementation of {@link ClientHttpConnector}.
34+
*
35+
* Implemented with buffer copy instead of optimized buffer wrapping because the latter
36+
* hangs since {@link Callback#succeeded()} doesn't allow releasing the buffer and
37+
* requesting more data at different times (required for {@code Mono<DataBuffer>} for example).
38+
* See https://github.com/eclipse/jetty.project/issues/2429 for more details.
39+
*
40+
* @author Sebastien Deleuze
41+
* @since 5.1
42+
* @see <a href="https://github.com/jetty-project/jetty-reactive-httpclient">Jetty ReactiveStreams HttpClient</a>
43+
*/
44+
public class JettyClientHttpConnector implements ClientHttpConnector, SmartLifecycle {
45+
46+
private final HttpClient httpClient;
47+
48+
private DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
49+
50+
51+
/**
52+
* Create a Jetty {@link ClientHttpConnector} with the default {@link HttpClient}.
53+
*/
54+
public JettyClientHttpConnector() {
55+
this(new HttpClient());
56+
}
57+
58+
/**
59+
* Create a Jetty {@link ClientHttpConnector} with the given {@link HttpClient}.
60+
*/
61+
public JettyClientHttpConnector(HttpClient httpClient) {
62+
this.httpClient = httpClient;
63+
}
64+
65+
66+
public void setBufferFactory(DataBufferFactory bufferFactory) {
67+
this.bufferFactory = bufferFactory;
68+
}
69+
70+
@Override
71+
public int getPhase() {
72+
return Integer.MAX_VALUE;
73+
}
74+
75+
@Override
76+
public boolean isAutoStartup() {
77+
return true;
78+
}
79+
80+
@Override
81+
public boolean isRunning() {
82+
return this.httpClient.isRunning();
83+
}
84+
85+
@Override
86+
public void start() {
87+
try {
88+
// HttpClient is internally synchronized and protected with state checks
89+
this.httpClient.start();
90+
}
91+
catch (Exception ex) {
92+
throw new RuntimeException(ex);
93+
}
94+
}
95+
96+
@Override
97+
public void stop() {
98+
try {
99+
this.httpClient.stop();
100+
}
101+
catch (Exception ex) {
102+
throw new RuntimeException(ex);
103+
}
104+
}
105+
106+
@Override
107+
public void stop(Runnable callback) {
108+
stop();
109+
callback.run();
110+
}
111+
112+
@Override
113+
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
114+
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
115+
116+
if (!uri.isAbsolute()) {
117+
return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri));
118+
}
119+
120+
if (!this.httpClient.isStarted()) {
121+
try {
122+
this.httpClient.start();
123+
}
124+
catch (Exception ex) {
125+
return Mono.error(ex);
126+
}
127+
}
128+
129+
JettyClientHttpRequest clientHttpRequest = new JettyClientHttpRequest(
130+
this.httpClient.newRequest(uri).method(method.toString()), this.bufferFactory);
131+
return requestCallback.apply(clientHttpRequest).then(Mono.from(
132+
clientHttpRequest.getReactiveRequest().response((reactiveResponse, contentChunks) -> {
133+
Flux<DataBuffer> content = Flux.from(contentChunks).map(chunk -> {
134+
DataBuffer buffer = this.bufferFactory.allocateBuffer(chunk.buffer.capacity());
135+
buffer.write(chunk.buffer);
136+
chunk.callback.succeeded();
137+
return buffer;
138+
});
139+
return Mono.just(new JettyClientHttpResponse(reactiveResponse, content));
140+
})));
141+
}
142+
143+
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright 2002-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.http.client.reactive;
17+
18+
import java.net.HttpCookie;
19+
import java.net.URI;
20+
import java.util.Collection;
21+
import java.util.function.Function;
22+
23+
import org.eclipse.jetty.client.api.Request;
24+
import org.eclipse.jetty.http.HttpHeader;
25+
import org.eclipse.jetty.reactive.client.ContentChunk;
26+
import org.eclipse.jetty.reactive.client.ReactiveRequest;
27+
import org.eclipse.jetty.util.Callback;
28+
import org.reactivestreams.Publisher;
29+
import reactor.core.Exceptions;
30+
import reactor.core.publisher.Flux;
31+
import reactor.core.publisher.Mono;
32+
33+
import org.springframework.core.io.buffer.DataBuffer;
34+
import org.springframework.core.io.buffer.DataBufferFactory;
35+
import org.springframework.core.io.buffer.DataBufferUtils;
36+
import org.springframework.http.HttpHeaders;
37+
import org.springframework.http.HttpMethod;
38+
import org.springframework.http.MediaType;
39+
import org.springframework.lang.Nullable;
40+
import org.springframework.util.Assert;
41+
42+
/**
43+
* {@link ClientHttpRequest} implementation for the Jetty ReactiveStreams HTTP client.
44+
*
45+
* @author Sebastien Deleuze
46+
* @since 5.1
47+
* @see <a href="https://github.com/jetty-project/jetty-reactive-httpclient">Jetty ReactiveStreams HttpClient</a>
48+
*/
49+
class JettyClientHttpRequest extends AbstractClientHttpRequest {
50+
51+
private final Request jettyRequest;
52+
53+
private final DataBufferFactory bufferFactory;
54+
55+
@Nullable
56+
private ReactiveRequest reactiveRequest;
57+
58+
59+
public JettyClientHttpRequest(Request jettyRequest, DataBufferFactory bufferFactory) {
60+
this.jettyRequest = jettyRequest;
61+
this.bufferFactory = bufferFactory;
62+
}
63+
64+
65+
@Override
66+
public HttpMethod getMethod() {
67+
HttpMethod method = HttpMethod.resolve(this.jettyRequest.getMethod());
68+
Assert.state(method != null, "Method must not be null");
69+
return method;
70+
}
71+
72+
@Override
73+
public URI getURI() {
74+
return this.jettyRequest.getURI();
75+
}
76+
77+
@Override
78+
public Mono<Void> setComplete() {
79+
return doCommit(this::completes);
80+
}
81+
82+
@Override
83+
public DataBufferFactory bufferFactory() {
84+
return this.bufferFactory;
85+
}
86+
87+
@Override
88+
public Mono<Void> writeWith(Publisher<? extends DataBuffer> publisher) {
89+
Flux<ContentChunk> chunks = Flux.from(publisher).map(this::toContentChunk);
90+
MediaType contentType = getHeaders().getContentType();
91+
ReactiveRequest.Content requestContent = ReactiveRequest.Content.fromPublisher(chunks,
92+
(contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE));
93+
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(requestContent).build();
94+
return doCommit(this::completes);
95+
}
96+
97+
@Override
98+
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
99+
String contentType = this.jettyRequest.getHeaders().getField(HttpHeader.CONTENT_TYPE).getValue();
100+
Flux<ContentChunk> chunks = Flux.from(body).flatMap(Function.identity()).map(this::toContentChunk);
101+
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, contentType);
102+
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
103+
return doCommit(this::completes);
104+
}
105+
106+
private Mono<Void> completes() {
107+
return Mono.empty();
108+
}
109+
110+
private ContentChunk toContentChunk(DataBuffer buffer) {
111+
return new ContentChunk(buffer.asByteBuffer(), new Callback() {
112+
@Override
113+
public void succeeded() {
114+
DataBufferUtils.release(buffer);
115+
}
116+
117+
@Override
118+
public void failed(Throwable x) {
119+
DataBufferUtils.release(buffer);
120+
throw Exceptions.propagate(x);
121+
}
122+
});
123+
}
124+
125+
126+
@Override
127+
protected void applyCookies() {
128+
getCookies().values().stream().flatMap(Collection::stream)
129+
.map(cookie -> new HttpCookie(cookie.getName(), cookie.getValue()))
130+
.forEach(this.jettyRequest::cookie);
131+
}
132+
133+
@Override
134+
protected void applyHeaders() {
135+
HttpHeaders headers = getHeaders();
136+
headers.forEach((key, value) -> value.forEach(v -> this.jettyRequest.header(key, v)));
137+
if (!headers.containsKey(HttpHeaders.ACCEPT)) {
138+
this.jettyRequest.header(HttpHeaders.ACCEPT, "*/*");
139+
}
140+
}
141+
142+
ReactiveRequest getReactiveRequest() {
143+
if (this.reactiveRequest == null) {
144+
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).build();
145+
}
146+
return this.reactiveRequest;
147+
}
148+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2002-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.http.client.reactive;
17+
18+
import java.net.HttpCookie;
19+
20+
import org.eclipse.jetty.reactive.client.ReactiveResponse;
21+
import org.reactivestreams.Publisher;
22+
import reactor.core.publisher.Flux;
23+
24+
import org.springframework.core.io.buffer.DataBuffer;
25+
import org.springframework.http.HttpHeaders;
26+
import org.springframework.http.HttpStatus;
27+
import org.springframework.http.ResponseCookie;
28+
import org.springframework.util.Assert;
29+
import org.springframework.util.CollectionUtils;
30+
import org.springframework.util.LinkedMultiValueMap;
31+
import org.springframework.util.MultiValueMap;
32+
33+
/**
34+
* {@link ClientHttpResponse} implementation for the Jetty ReactiveStreams HTTP client.
35+
*
36+
* @author Sebastien Deleuze
37+
* @since 5.1
38+
* @see <a href="https://github.com/jetty-project/jetty-reactive-httpclient">Jetty ReactiveStreams HttpClient</a>
39+
*/
40+
class JettyClientHttpResponse implements ClientHttpResponse {
41+
42+
private final ReactiveResponse reactiveResponse;
43+
44+
private final Flux<DataBuffer> content;
45+
46+
47+
public JettyClientHttpResponse(ReactiveResponse reactiveResponse, Publisher<DataBuffer> content) {
48+
Assert.notNull(reactiveResponse, "reactiveResponse should not be null");
49+
Assert.notNull(content, "content should not be null");
50+
this.reactiveResponse = reactiveResponse;
51+
this.content = Flux.from(content);
52+
}
53+
54+
55+
@Override
56+
public HttpStatus getStatusCode() {
57+
return HttpStatus.valueOf(getRawStatusCode());
58+
}
59+
60+
@Override
61+
public int getRawStatusCode() {
62+
return this.reactiveResponse.getStatus();
63+
}
64+
65+
@Override
66+
public MultiValueMap<String, ResponseCookie> getCookies() {
67+
MultiValueMap<String, ResponseCookie> result = new LinkedMultiValueMap<>();
68+
getHeaders().get(HttpHeaders.SET_COOKIE).forEach(header -> {
69+
HttpCookie.parse(header).forEach(cookie -> result.add(cookie.getName(), ResponseCookie.from(cookie.getName(), cookie.getValue())
70+
.domain(cookie.getDomain())
71+
.path(cookie.getPath())
72+
.maxAge(cookie.getMaxAge())
73+
.secure(cookie.getSecure())
74+
.httpOnly(cookie.isHttpOnly())
75+
.build()));
76+
77+
});
78+
return CollectionUtils.unmodifiableMultiValueMap(result);
79+
}
80+
81+
@Override
82+
public Flux<DataBuffer> getBody() {
83+
return this.content;
84+
}
85+
86+
@Override
87+
public HttpHeaders getHeaders() {
88+
HttpHeaders headers = new HttpHeaders();
89+
this.reactiveResponse.getHeaders().stream()
90+
.forEach(e -> headers.add(e.getName(), e.getValue()));
91+
return headers;
92+
}
93+
94+
}

0 commit comments

Comments
 (0)