Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,15 +17,13 @@
package org.springframework.integration.webflux.outbound;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.reactivestreams.Publisher;

import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.http.HttpEntity;
Expand All @@ -41,15 +39,13 @@
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyExtractor;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import org.springframework.web.util.DefaultUriBuilderFactory;

import reactor.core.publisher.Flux;
Expand All @@ -62,6 +58,7 @@
* @author Shiliang Li
* @author Artem Bilan
* @author Gary Russell
* @author David Graff
*
* @since 5.0
*
Expand Down Expand Up @@ -299,32 +296,7 @@ private WebClient.RequestBodySpec createRequestBodySpec(Object uri, HttpMethod h

private Mono<ClientResponse> exchangeForResponseMono(WebClient.RequestBodySpec requestSpec) {
return requestSpec.retrieve()
.onStatus(HttpStatus::isError, response -> {
HttpStatus httpStatus = response.statusCode();
return response.body(BodyExtractors.toDataBuffers())
.reduce(DataBuffer::write)
.map(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
return bytes;
})
.defaultIfEmpty(new byte[0])
.map(bodyBytes -> {
throw new WebClientResponseException(
"ClientResponse has erroneous status code: "
+ httpStatus.value() + " "
+ httpStatus.getReasonPhrase(),
httpStatus.value(),
httpStatus.getReasonPhrase(),
response.headers().asHttpHeaders(),
bodyBytes,
response.headers().contentType()
.map(MimeType::getCharset)
.orElse(StandardCharsets.ISO_8859_1));
}
);
})
.onStatus(HttpStatus::isError, ClientResponse::createException)
.toEntityList(DataBuffer.class)
.map((entity) ->
ClientResponse.create(entity.getStatusCode())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.nio.charset.StandardCharsets;
import java.time.Duration;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -47,6 +48,7 @@
/**
* @author Shiliang Li
* @author Artem Bilan
* @author David Graff
*
* @since 5.0
*/
Expand Down Expand Up @@ -108,6 +110,8 @@ void testReactiveErrorOneWay() {
assertThat(errorMessage).isNotNull();
assertThat(errorMessage).isInstanceOf(ErrorMessage.class);
Throwable throwable = (Throwable) errorMessage.getPayload();
assertThat(throwable).isInstanceOf(MessageHandlingException.class);
assertThat(throwable.getCause()).isInstanceOf(WebClientResponseException.Unauthorized.class);
assertThat(throwable.getMessage()).contains("401 Unauthorized");
}

Expand Down Expand Up @@ -173,7 +177,8 @@ void testServiceUnavailableWithoutBody() {
assertThat(payload).isInstanceOf(MessageHandlingException.class);

Exception exception = (Exception) payload;
assertThat(exception.getCause()).isInstanceOf(WebClientResponseException.class);
assertThat(exception).isInstanceOf(MessageHandlingException.class);
assertThat(exception.getCause()).isInstanceOf(WebClientResponseException.ServiceUnavailable.class);
assertThat(exception.getMessage()).contains("503 Service Unavailable");

Message<?> replyMessage = errorChannel.receive(10);
Expand Down Expand Up @@ -273,4 +278,57 @@ void testClientHttpResponseAsReply() {
.verifyComplete();
}


@Test
void testClientHttpResponseErrorAsReply() {
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.NOT_FOUND);
response.getHeaders().setContentType(MediaType.APPLICATION_JSON);

DataBufferFactory bufferFactory = response.bufferFactory();

Flux<DataBuffer> data =
Flux.just(
bufferFactory.wrap("{".getBytes(StandardCharsets.UTF_8)),
bufferFactory.wrap(" \"error\": \"Not Found\",".getBytes(StandardCharsets.UTF_8)),
bufferFactory.wrap(" \"message\": \"404 NOT_FOUND\",".getBytes(StandardCharsets.UTF_8)),
bufferFactory.wrap(" \"path\": \"/spring-integration\",".getBytes(StandardCharsets.UTF_8)),
bufferFactory.wrap(" \"status\": 404,".getBytes(StandardCharsets.UTF_8)),
bufferFactory.wrap(" \"timestamp\": \"1970-01-01T00:00:00.000+00:00\",".getBytes(StandardCharsets.UTF_8)),
bufferFactory.wrap(" \"trace\": \"some really\nlong\ntrace\",".getBytes(StandardCharsets.UTF_8)),
bufferFactory.wrap("}".getBytes(StandardCharsets.UTF_8))
);

return response.writeWith(data)
.then(Mono.defer(response::setComplete));
});

WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
.build();

String destinationUri = "https://www.springsource.org/spring-integration";
WebFluxRequestExecutingMessageHandler reactiveHandler =
new WebFluxRequestExecutingMessageHandler(destinationUri, webClient);

QueueChannel replyChannel = new QueueChannel();
QueueChannel errorChannel = new QueueChannel();
reactiveHandler.setOutputChannel(replyChannel);
reactiveHandler.setBodyExtractor(new ClientHttpResponseBodyExtractor());

final Message<?> message =
MessageBuilder.withPayload("hello, world")
.setErrorChannel(errorChannel)
.build();
reactiveHandler.handleMessage(message);

Message<?> errorMessage = errorChannel.receive(10_000);

assertThat(errorMessage).isNotNull();
assertThat(errorMessage).isInstanceOf(ErrorMessage.class);
final Throwable throwable = (Throwable) errorMessage.getPayload();
assertThat(throwable).isInstanceOf(MessageHandlingException.class);
assertThat(throwable.getCause()).isInstanceOf(WebClientResponseException.NotFound.class);
assertThat(throwable.getMessage()).contains("404 Not Found");
}
}