diff --git a/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java b/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java index 66c1889e332..bc4760d0068 100644 --- a/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java +++ b/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ package org.springframework.integration.webflux.outbound; import java.net.URI; -import java.nio.charset.StandardCharsets; import java.util.Map; import org.reactivestreams.Publisher; @@ -25,7 +24,6 @@ import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.io.Resource; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.expression.Expression; import org.springframework.expression.common.LiteralExpression; import org.springframework.http.HttpEntity; @@ -41,7 +39,6 @@ import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.util.Assert; -import org.springframework.util.MimeType; import org.springframework.util.MultiValueMap; import org.springframework.web.reactive.function.BodyExtractor; import org.springframework.web.reactive.function.BodyExtractors; @@ -49,7 +46,6 @@ import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.reactive.function.client.WebClientResponseException; import org.springframework.web.util.DefaultUriBuilderFactory; import reactor.core.publisher.Flux; @@ -62,6 +58,7 @@ * @author Shiliang Li * @author Artem Bilan * @author Gary Russell + * @author David Graff * * @since 5.0 * @@ -299,32 +296,7 @@ private WebClient.RequestBodySpec createRequestBodySpec(Object uri, HttpMethod h private Mono exchangeForResponseMono(WebClient.RequestBodySpec requestSpec) { return requestSpec.retrieve() - .onStatus(HttpStatus::isError, response -> { - HttpStatus httpStatus = response.statusCode(); - return response.body(BodyExtractors.toDataBuffers()) - .reduce(DataBuffer::write) - .map(dataBuffer -> { - byte[] bytes = new byte[dataBuffer.readableByteCount()]; - dataBuffer.read(bytes); - DataBufferUtils.release(dataBuffer); - return bytes; - }) - .defaultIfEmpty(new byte[0]) - .map(bodyBytes -> { - throw new WebClientResponseException( - "ClientResponse has erroneous status code: " - + httpStatus.value() + " " - + httpStatus.getReasonPhrase(), - httpStatus.value(), - httpStatus.getReasonPhrase(), - response.headers().asHttpHeaders(), - bodyBytes, - response.headers().contentType() - .map(MimeType::getCharset) - .orElse(StandardCharsets.ISO_8859_1)); - } - ); - }) + .onStatus(HttpStatus::isError, ClientResponse::createException) .toEntityList(DataBuffer.class) .map((entity) -> ClientResponse.create(entity.getStatusCode()) diff --git a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandlerTests.java b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandlerTests.java index 6d5ccab8f75..b247b395933 100644 --- a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandlerTests.java +++ b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.nio.charset.StandardCharsets; import java.time.Duration; import org.junit.jupiter.api.Test; @@ -47,6 +48,7 @@ /** * @author Shiliang Li * @author Artem Bilan + * @author David Graff * * @since 5.0 */ @@ -108,6 +110,8 @@ void testReactiveErrorOneWay() { assertThat(errorMessage).isNotNull(); assertThat(errorMessage).isInstanceOf(ErrorMessage.class); Throwable throwable = (Throwable) errorMessage.getPayload(); + assertThat(throwable).isInstanceOf(MessageHandlingException.class); + assertThat(throwable.getCause()).isInstanceOf(WebClientResponseException.Unauthorized.class); assertThat(throwable.getMessage()).contains("401 Unauthorized"); } @@ -173,7 +177,8 @@ void testServiceUnavailableWithoutBody() { assertThat(payload).isInstanceOf(MessageHandlingException.class); Exception exception = (Exception) payload; - assertThat(exception.getCause()).isInstanceOf(WebClientResponseException.class); + assertThat(exception).isInstanceOf(MessageHandlingException.class); + assertThat(exception.getCause()).isInstanceOf(WebClientResponseException.ServiceUnavailable.class); assertThat(exception.getMessage()).contains("503 Service Unavailable"); Message replyMessage = errorChannel.receive(10); @@ -273,4 +278,57 @@ void testClientHttpResponseAsReply() { .verifyComplete(); } + + @Test + void testClientHttpResponseErrorAsReply() { + ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> { + response.setStatusCode(HttpStatus.NOT_FOUND); + response.getHeaders().setContentType(MediaType.APPLICATION_JSON); + + DataBufferFactory bufferFactory = response.bufferFactory(); + + Flux data = + Flux.just( + bufferFactory.wrap("{".getBytes(StandardCharsets.UTF_8)), + bufferFactory.wrap(" \"error\": \"Not Found\",".getBytes(StandardCharsets.UTF_8)), + bufferFactory.wrap(" \"message\": \"404 NOT_FOUND\",".getBytes(StandardCharsets.UTF_8)), + bufferFactory.wrap(" \"path\": \"/spring-integration\",".getBytes(StandardCharsets.UTF_8)), + bufferFactory.wrap(" \"status\": 404,".getBytes(StandardCharsets.UTF_8)), + bufferFactory.wrap(" \"timestamp\": \"1970-01-01T00:00:00.000+00:00\",".getBytes(StandardCharsets.UTF_8)), + bufferFactory.wrap(" \"trace\": \"some really\nlong\ntrace\",".getBytes(StandardCharsets.UTF_8)), + bufferFactory.wrap("}".getBytes(StandardCharsets.UTF_8)) + ); + + return response.writeWith(data) + .then(Mono.defer(response::setComplete)); + }); + + WebClient webClient = WebClient.builder() + .clientConnector(httpConnector) + .build(); + + String destinationUri = "https://www.springsource.org/spring-integration"; + WebFluxRequestExecutingMessageHandler reactiveHandler = + new WebFluxRequestExecutingMessageHandler(destinationUri, webClient); + + QueueChannel replyChannel = new QueueChannel(); + QueueChannel errorChannel = new QueueChannel(); + reactiveHandler.setOutputChannel(replyChannel); + reactiveHandler.setBodyExtractor(new ClientHttpResponseBodyExtractor()); + + final Message message = + MessageBuilder.withPayload("hello, world") + .setErrorChannel(errorChannel) + .build(); + reactiveHandler.handleMessage(message); + + Message errorMessage = errorChannel.receive(10_000); + + assertThat(errorMessage).isNotNull(); + assertThat(errorMessage).isInstanceOf(ErrorMessage.class); + final Throwable throwable = (Throwable) errorMessage.getPayload(); + assertThat(throwable).isInstanceOf(MessageHandlingException.class); + assertThat(throwable.getCause()).isInstanceOf(WebClientResponseException.NotFound.class); + assertThat(throwable.getMessage()).contains("404 Not Found"); + } }