Skip to content

Commit 5739cbb

Browse files
committed
Add Async withHandler() support for any response body handler
1 parent baf191b commit 5739cbb

File tree

4 files changed

+113
-4
lines changed

4 files changed

+113
-4
lines changed

client/src/main/java/io/avaje/http/client/DHttpAsync.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,17 @@ class DHttpAsync implements HttpAsyncResponse {
1313
}
1414

1515
@Override
16-
public CompletableFuture<HttpResponse<Void>> asDiscarding() {
16+
public <E> CompletableFuture<HttpResponse<E>> withHandler(HttpResponse.BodyHandler<E> handler) {
1717
return request
18-
.performSendAsync(false, HttpResponse.BodyHandlers.discarding())
18+
.performSendAsync(false, handler)
1919
.thenApply(request::afterAsync);
2020
}
2121

22+
@Override
23+
public CompletableFuture<HttpResponse<Void>> asDiscarding() {
24+
return withHandler(HttpResponse.BodyHandlers.discarding());
25+
}
26+
2227
@Override
2328
public CompletableFuture<HttpResponse<String>> asString() {
2429
return request

client/src/main/java/io/avaje/http/client/DHttpClientRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,8 @@ public String responseBody() {
553553
if (encodedResponseBody != null) {
554554
return context.maxResponseBody(new String(encodedResponseBody.content(), StandardCharsets.UTF_8));
555555
} else if (httpResponse != null && loggableResponseBody) {
556-
return context.maxResponseBody(httpResponse.body().toString());
556+
final Object body = httpResponse.body();
557+
return (body == null) ? null : context.maxResponseBody(body.toString());
557558
}
558559
return null;
559560
}

client/src/main/java/io/avaje/http/client/HttpAsyncResponse.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import java.util.concurrent.CompletableFuture;
66

77
/**
8-
* Async responses as CompletableFuture.
8+
* Async processing of the request with responses as CompletableFuture.
99
*/
1010
public interface HttpAsyncResponse {
1111

@@ -29,6 +29,8 @@ public interface HttpAsyncResponse {
2929
* });
3030
*
3131
* }</pre>
32+
*
33+
* @return The CompletableFuture of the response
3234
*/
3335
CompletableFuture<HttpResponse<Void>> asDiscarding();
3436

@@ -53,9 +55,53 @@ public interface HttpAsyncResponse {
5355
* });
5456
*
5557
* }</pre>
58+
*
59+
* @return The CompletableFuture of the response
5660
*/
5761
CompletableFuture<HttpResponse<String>> asString();
5862

63+
/**
64+
* Process with any given {@code HttpResponse.BodyHandler}.
65+
*
66+
* <h3>Example: line subscriber</h3>
67+
* <p>
68+
* Subscribe line by line to the response.
69+
* </p>
70+
* <pre>{@code
71+
*
72+
* CompletableFuture<HttpResponse<Void>> future = clientContext.request()
73+
* .path("hello/lineStream")
74+
* .GET().async()
75+
* .withHandler(HttpResponse.BodyHandlers.fromLineSubscriber(new Flow.Subscriber<>() {
76+
*
77+
* @Override
78+
* public void onSubscribe(Flow.Subscription subscription) {
79+
* subscription.request(Long.MAX_VALUE);
80+
* }
81+
* @Override
82+
* public void onNext(String item) {
83+
* ...
84+
* }
85+
* @Override
86+
* public void onError(Throwable throwable) {
87+
* ...
88+
* }
89+
* @Override
90+
* public void onComplete() {
91+
* ...
92+
* }
93+
* }))
94+
* .whenComplete((hres, throwable) -> {
95+
* int statusCode = hres.statusCode();
96+
* ...
97+
* });
98+
* }</pre>
99+
*
100+
* @param bodyHandlers The body handler to use to process the response
101+
* @return The CompletableFuture of the response
102+
*/
103+
<E> CompletableFuture<HttpResponse<E>> withHandler(HttpResponse.BodyHandler<E> bodyHandlers);
104+
59105
/**
60106
* Process expecting a bean response body (typically from json content).
61107
* <p>
@@ -84,6 +130,9 @@ public interface HttpAsyncResponse {
84130
* }
85131
* });
86132
* }</pre>
133+
*
134+
* @param type The bean type to convert the content to
135+
* @return The CompletableFuture of the response
87136
*/
88137
<E> CompletableFuture<E> bean(Class<E> type);
89138

@@ -112,6 +161,9 @@ public interface HttpAsyncResponse {
112161
* }
113162
* });
114163
* }</pre>
164+
*
165+
* @param type The bean type to convert the content to
166+
* @return The CompletableFuture of the response
115167
*/
116168
<E> CompletableFuture<List<E>> list(Class<E> type);
117169

client/src/test/java/io/avaje/http/client/HelloControllerTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
import org.junit.jupiter.api.Test;
66

77
import java.net.http.HttpResponse;
8+
import java.util.ArrayList;
89
import java.util.List;
910
import java.util.Map;
1011
import java.util.concurrent.CompletableFuture;
1112
import java.util.concurrent.CompletionException;
1213
import java.util.concurrent.ExecutionException;
14+
import java.util.concurrent.Flow;
1315
import java.util.concurrent.atomic.AtomicInteger;
1416
import java.util.concurrent.atomic.AtomicReference;
1517
import java.util.stream.Collectors;
@@ -38,6 +40,55 @@ void get_stream() {
3840
assertThat(first.name).isEqualTo("one");
3941
}
4042

43+
@Test
44+
void async_stream() throws ExecutionException, InterruptedException {
45+
46+
AtomicReference<HttpResponse<Void>> hresRef = new AtomicReference<>();
47+
AtomicReference<Throwable> errRef = new AtomicReference<>();
48+
AtomicReference<Boolean> completeRef = new AtomicReference<>();
49+
AtomicReference<Boolean> onSubscribeRef = new AtomicReference<>();
50+
51+
final List<String> lines = new ArrayList<>();
52+
53+
final CompletableFuture<HttpResponse<Void>> future = clientContext.request()
54+
.path("hello/stream")
55+
.GET()
56+
.async().withHandler(HttpResponse.BodyHandlers.fromLineSubscriber(new Flow.Subscriber<>() {
57+
@Override
58+
public void onSubscribe(Flow.Subscription subscription) {
59+
subscription.request(Long.MAX_VALUE);
60+
onSubscribeRef.set(true);
61+
}
62+
@Override
63+
public void onNext(String item) {
64+
lines.add(item);
65+
}
66+
@Override
67+
public void onError(Throwable throwable) {
68+
errRef.set(throwable);
69+
}
70+
@Override
71+
public void onComplete() {
72+
completeRef.set(true);
73+
}
74+
})).whenComplete((hres, throwable) -> {
75+
hresRef.set(hres);
76+
assertThat(hres.statusCode()).isEqualTo(200);
77+
assertThat(throwable).isNull();
78+
});
79+
80+
// just wait
81+
assertThat(future.get()).isSameAs(hresRef.get());
82+
83+
assertThat(onSubscribeRef.get()).isTrue();
84+
assertThat(completeRef.get()).isTrue();
85+
assertThat(errRef.get()).isNull();
86+
assertThat(lines).hasSize(4);
87+
88+
final String first = lines.get(0);
89+
assertThat(first).isEqualTo("{\"id\":1, \"name\":\"one\"}");
90+
}
91+
4192
@Test
4293
void get_helloMessage() {
4394

0 commit comments

Comments
 (0)