diff --git a/core/src/main/java/io/confluent/rest/RestConfig.java b/core/src/main/java/io/confluent/rest/RestConfig.java index b4ca7d02f2..307a373931 100644 --- a/core/src/main/java/io/confluent/rest/RestConfig.java +++ b/core/src/main/java/io/confluent/rest/RestConfig.java @@ -616,6 +616,16 @@ public class RestConfig extends AbstractConfig { protected static final boolean SUPPRESS_STACK_TRACE_IN_RESPONSE_DEFAULT = true; + public static final String RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_CONFIG = + "return.429.instead.of.500.for.jetty.response.errors"; + protected static final String RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_DOC = + "If true, return 429 Too Many Requests instead of 500 Internal Server Error " + + "for errors coming from Jetty response handlers, the particular error being " + + "'Response does not exist (likely recycled)'. " + + "Default is false."; + protected static final boolean RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_DEFAULT = + false; + static final List SUPPORTED_URI_SCHEMES = unmodifiableList(Arrays.asList("http", "https")); @@ -1236,6 +1246,12 @@ private static ConfigDef incompleteBaseConfigDef() { NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_VALIDATOR, Importance.LOW, NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_DOC + ).define( + RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_CONFIG, + Type.BOOLEAN, + RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_DEFAULT, + Importance.LOW, + RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_DOC ); } @@ -1383,6 +1399,10 @@ public final boolean getSuppressStackTraceInResponse() { return getBoolean(SUPPRESS_STACK_TRACE_IN_RESPONSE); } + public final boolean getReturn429InsteadOf500ForJettyResponseErrors() { + return getBoolean(RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_CONFIG); + } + public final List getListeners() { return parseListeners( getList(RestConfig.LISTENERS_CONFIG), diff --git a/core/src/main/java/io/confluent/rest/exceptions/GenericExceptionMapper.java b/core/src/main/java/io/confluent/rest/exceptions/GenericExceptionMapper.java index da97284653..07931f8068 100644 --- a/core/src/main/java/io/confluent/rest/exceptions/GenericExceptionMapper.java +++ b/core/src/main/java/io/confluent/rest/exceptions/GenericExceptionMapper.java @@ -37,6 +37,21 @@ public GenericExceptionMapper(RestConfig restConfig) { public Response toResponse(Throwable exc) { log.error("Unhandled exception resulting in internal server error response", exc); + try { + if (restConfig.getReturn429InsteadOf500ForJettyResponseErrors()) { + // #inc-3209, instead of returning a 500 error, we return a 429 error temporarily + if (exc instanceof IllegalStateException && exc.getMessage() != null + && exc.getMessage().contains("Response does not exist (likely recycled)")) { + return createResponse(exc, Response.Status.TOO_MANY_REQUESTS.getStatusCode(), + Response.Status.TOO_MANY_REQUESTS, + Response.Status.TOO_MANY_REQUESTS.getReasonPhrase()).build(); + } + } + } catch (Exception e) { + // If we fail to get the rest config correctly, we log the error + log.error("Failed to create response for exception", e); + } + // There's no more specific information about the exception that can be passed back to the user, // so we can only use the generic message. Debug mode will append the exception info. return createResponse(exc, Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), diff --git a/core/src/test/java/io/confluent/rest/KafkaExceptionMapperTest.java b/core/src/test/java/io/confluent/rest/KafkaExceptionMapperTest.java index 6906bc4aae..f7f1f96145 100644 --- a/core/src/test/java/io/confluent/rest/KafkaExceptionMapperTest.java +++ b/core/src/test/java/io/confluent/rest/KafkaExceptionMapperTest.java @@ -53,6 +53,9 @@ import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.Response.Status; + +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -193,6 +196,28 @@ public void testWrappedExceptions() { verifyMapperResponse(new ExecutionException(unknownServerException), Status.BAD_REQUEST, KAFKA_BAD_REQUEST_ERROR_CODE); } + @Test + public void testGenericExceptionMapper_temp_throw429InsteadOf500() { + Map props = new HashMap<>(); + props.put(RestConfig.RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_CONFIG, false); + RestConfig configFalse = new RestConfig(RestConfig.baseConfigDef(), props); + KafkaExceptionMapper exceptionMapperWithConfigFalse = new KafkaExceptionMapper(configFalse); + + Response responseWithConfigFalse = + exceptionMapperWithConfigFalse.toResponse(new IllegalStateException("Response does not exist (likely recycled)")); + assertNotNull(responseWithConfigFalse); + assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), responseWithConfigFalse.getStatus()); + + props.put(RestConfig.RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_CONFIG, true); + RestConfig configTrue = new RestConfig(RestConfig.baseConfigDef(), props); + KafkaExceptionMapper exceptionMapperWithConfigTrue = new KafkaExceptionMapper(configTrue); + Response responseWithConfigTrue = + exceptionMapperWithConfigTrue.toResponse(new IllegalStateException("Response does not exist (likely recycled)")); + + assertNotNull(responseWithConfigTrue); + assertEquals(Status.TOO_MANY_REQUESTS.getStatusCode(), responseWithConfigTrue.getStatus()); + } + private void verifyMapperResponse(Throwable throwable, Status status, int errorCode) { Response response = exceptionMapper.toResponse(throwable); assertNotNull(response);