From b227988833e816aa90910f990a1dd4cf30482cac Mon Sep 17 00:00:00 2001 From: Hritwik Singhai Date: Fri, 4 Jul 2025 16:21:01 +0100 Subject: [PATCH 1/5] temp: for inc-3209, we return a 429 instead of 500 --- .../confluent/rest/exceptions/GenericExceptionMapper.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/src/main/java/io/confluent/rest/exceptions/GenericExceptionMapper.java b/core/src/main/java/io/confluent/rest/exceptions/GenericExceptionMapper.java index da97284653..498e639daf 100644 --- a/core/src/main/java/io/confluent/rest/exceptions/GenericExceptionMapper.java +++ b/core/src/main/java/io/confluent/rest/exceptions/GenericExceptionMapper.java @@ -37,6 +37,13 @@ public GenericExceptionMapper(RestConfig restConfig) { public Response toResponse(Throwable exc) { log.error("Unhandled exception resulting in internal server error response", exc); + // #inc-3209, instead of returning a 500 error, we return a 429 error temporarily + if (exc instanceof IllegalStateException && exc.getMessage() != null + && exc.getMessage().contains("Response does not exist (likely recycled)")) { + return createResponse(exc, Response.Status.TOO_MANY_REQUESTS.getStatusCode(), + Response.Status.TOO_MANY_REQUESTS, + Response.Status.TOO_MANY_REQUESTS.getReasonPhrase()).build(); + } // There's no more specific information about the exception that can be passed back to the user, // so we can only use the generic message. Debug mode will append the exception info. return createResponse(exc, Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), From 9691e060e3fcfcc59a3c643f18a494adbf8f660d Mon Sep 17 00:00:00 2001 From: Hritwik Singhai Date: Mon, 7 Jul 2025 13:12:48 +0100 Subject: [PATCH 2/5] added behind a RestConfig --- .../main/java/io/confluent/rest/RestConfig.java | 17 +++++++++++++++++ .../rest/exceptions/GenericExceptionMapper.java | 15 +++++++++------ 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/io/confluent/rest/RestConfig.java b/core/src/main/java/io/confluent/rest/RestConfig.java index b4ca7d02f2..0267878c46 100644 --- a/core/src/main/java/io/confluent/rest/RestConfig.java +++ b/core/src/main/java/io/confluent/rest/RestConfig.java @@ -616,6 +616,13 @@ public class RestConfig extends AbstractConfig { protected static final boolean SUPPRESS_STACK_TRACE_IN_RESPONSE_DEFAULT = true; + public static final String RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_CONFIG = "sni.host.check.enabled"; + protected static final String RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_DOC = + "If true, return 429 Too Many Requests instead of 500 Internal Server Error for errors coming from Jetty " + + "response handlers, the particular error being 'Response does not exist (likely recycled)'. " + + "Default is false."; + protected static final boolean RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_DEFAULT = false; + static final List SUPPORTED_URI_SCHEMES = unmodifiableList(Arrays.asList("http", "https")); @@ -1236,6 +1243,12 @@ private static ConfigDef incompleteBaseConfigDef() { NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_VALIDATOR, Importance.LOW, NETWORK_TRAFFIC_RATE_LIMIT_BYTES_PER_SEC_DOC + ).define( + RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_CONFIG, + Type.BOOLEAN, + RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_DEFAULT, + Importance.LOW, + RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_DOC ); } @@ -1383,6 +1396,10 @@ public final boolean getSuppressStackTraceInResponse() { return getBoolean(SUPPRESS_STACK_TRACE_IN_RESPONSE); } + public final boolean getReturn429InsteadOf500ForJettyResponseErrors() { + return getBoolean(RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_CONFIG); + } + public final List getListeners() { return parseListeners( getList(RestConfig.LISTENERS_CONFIG), diff --git a/core/src/main/java/io/confluent/rest/exceptions/GenericExceptionMapper.java b/core/src/main/java/io/confluent/rest/exceptions/GenericExceptionMapper.java index 498e639daf..f97095be47 100644 --- a/core/src/main/java/io/confluent/rest/exceptions/GenericExceptionMapper.java +++ b/core/src/main/java/io/confluent/rest/exceptions/GenericExceptionMapper.java @@ -37,13 +37,16 @@ public GenericExceptionMapper(RestConfig restConfig) { public Response toResponse(Throwable exc) { log.error("Unhandled exception resulting in internal server error response", exc); - // #inc-3209, instead of returning a 500 error, we return a 429 error temporarily - if (exc instanceof IllegalStateException && exc.getMessage() != null - && exc.getMessage().contains("Response does not exist (likely recycled)")) { - return createResponse(exc, Response.Status.TOO_MANY_REQUESTS.getStatusCode(), - Response.Status.TOO_MANY_REQUESTS, - Response.Status.TOO_MANY_REQUESTS.getReasonPhrase()).build(); + if (restConfig.getReturn429InsteadOf500ForJettyResponseErrors()) { + // #inc-3209, instead of returning a 500 error, we return a 429 error temporarily + if (exc instanceof IllegalStateException && exc.getMessage() != null + && exc.getMessage().contains("Response does not exist (likely recycled)")) { + return createResponse(exc, Response.Status.TOO_MANY_REQUESTS.getStatusCode(), + Response.Status.TOO_MANY_REQUESTS, + Response.Status.TOO_MANY_REQUESTS.getReasonPhrase()).build(); + } } + // There's no more specific information about the exception that can be passed back to the user, // so we can only use the generic message. Debug mode will append the exception info. return createResponse(exc, Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), From b59226a49efdf9bef05d0e057fad009ae22d1f87 Mon Sep 17 00:00:00 2001 From: Hritwik Singhai Date: Mon, 7 Jul 2025 13:35:19 +0100 Subject: [PATCH 3/5] name the config correctly --- .../src/main/java/io/confluent/rest/RestConfig.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/io/confluent/rest/RestConfig.java b/core/src/main/java/io/confluent/rest/RestConfig.java index 0267878c46..307a373931 100644 --- a/core/src/main/java/io/confluent/rest/RestConfig.java +++ b/core/src/main/java/io/confluent/rest/RestConfig.java @@ -616,12 +616,15 @@ public class RestConfig extends AbstractConfig { protected static final boolean SUPPRESS_STACK_TRACE_IN_RESPONSE_DEFAULT = true; - public static final String RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_CONFIG = "sni.host.check.enabled"; + public static final String RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_CONFIG = + "return.429.instead.of.500.for.jetty.response.errors"; protected static final String RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_DOC = - "If true, return 429 Too Many Requests instead of 500 Internal Server Error for errors coming from Jetty " + - "response handlers, the particular error being 'Response does not exist (likely recycled)'. " + - "Default is false."; - protected static final boolean RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_DEFAULT = false; + "If true, return 429 Too Many Requests instead of 500 Internal Server Error " + + "for errors coming from Jetty response handlers, the particular error being " + + "'Response does not exist (likely recycled)'. " + + "Default is false."; + protected static final boolean RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_DEFAULT = + false; static final List SUPPORTED_URI_SCHEMES = unmodifiableList(Arrays.asList("http", "https")); From 76bb723cdcfc0d76471cc5d23b46e799ba4b63c4 Mon Sep 17 00:00:00 2001 From: Hritwik Singhai Date: Mon, 7 Jul 2025 17:28:00 +0100 Subject: [PATCH 4/5] fix build issues --- .../exceptions/GenericExceptionMapper.java | 19 ++++++++++++------- .../rest/KafkaExceptionMapperTest.java | 9 ++++++++- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/io/confluent/rest/exceptions/GenericExceptionMapper.java b/core/src/main/java/io/confluent/rest/exceptions/GenericExceptionMapper.java index f97095be47..07931f8068 100644 --- a/core/src/main/java/io/confluent/rest/exceptions/GenericExceptionMapper.java +++ b/core/src/main/java/io/confluent/rest/exceptions/GenericExceptionMapper.java @@ -37,14 +37,19 @@ public GenericExceptionMapper(RestConfig restConfig) { public Response toResponse(Throwable exc) { log.error("Unhandled exception resulting in internal server error response", exc); - if (restConfig.getReturn429InsteadOf500ForJettyResponseErrors()) { - // #inc-3209, instead of returning a 500 error, we return a 429 error temporarily - if (exc instanceof IllegalStateException && exc.getMessage() != null - && exc.getMessage().contains("Response does not exist (likely recycled)")) { - return createResponse(exc, Response.Status.TOO_MANY_REQUESTS.getStatusCode(), - Response.Status.TOO_MANY_REQUESTS, - Response.Status.TOO_MANY_REQUESTS.getReasonPhrase()).build(); + try { + if (restConfig.getReturn429InsteadOf500ForJettyResponseErrors()) { + // #inc-3209, instead of returning a 500 error, we return a 429 error temporarily + if (exc instanceof IllegalStateException && exc.getMessage() != null + && exc.getMessage().contains("Response does not exist (likely recycled)")) { + return createResponse(exc, Response.Status.TOO_MANY_REQUESTS.getStatusCode(), + Response.Status.TOO_MANY_REQUESTS, + Response.Status.TOO_MANY_REQUESTS.getReasonPhrase()).build(); + } } + } catch (Exception e) { + // If we fail to get the rest config correctly, we log the error + log.error("Failed to create response for exception", e); } // There's no more specific information about the exception that can be passed back to the user, diff --git a/core/src/test/java/io/confluent/rest/KafkaExceptionMapperTest.java b/core/src/test/java/io/confluent/rest/KafkaExceptionMapperTest.java index 6906bc4aae..751de44a45 100644 --- a/core/src/test/java/io/confluent/rest/KafkaExceptionMapperTest.java +++ b/core/src/test/java/io/confluent/rest/KafkaExceptionMapperTest.java @@ -19,6 +19,7 @@ import io.confluent.rest.entities.ErrorMessage; import io.confluent.rest.exceptions.KafkaExceptionMapper; import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.BrokerNotAvailableException; @@ -53,6 +54,9 @@ import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.Response.Status; + +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -74,7 +78,10 @@ public class KafkaExceptionMapperTest { @BeforeEach public void setUp() { - exceptionMapper = new KafkaExceptionMapper(null); + Map props = new HashMap<>(); + props.put(RestConfig.RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_CONFIG, "false"); + RestConfig config = new RestConfig(RestConfig.baseConfigDef(), props); + exceptionMapper = new KafkaExceptionMapper(config); } @Test From fe264d8b33f32e1d704584e7d363fb7b374ccac7 Mon Sep 17 00:00:00 2001 From: Hritwik Singhai Date: Tue, 8 Jul 2025 18:29:23 +0100 Subject: [PATCH 5/5] adding a UT for the temporary exception Mapper logic --- .../rest/KafkaExceptionMapperTest.java | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/core/src/test/java/io/confluent/rest/KafkaExceptionMapperTest.java b/core/src/test/java/io/confluent/rest/KafkaExceptionMapperTest.java index 751de44a45..f7f1f96145 100644 --- a/core/src/test/java/io/confluent/rest/KafkaExceptionMapperTest.java +++ b/core/src/test/java/io/confluent/rest/KafkaExceptionMapperTest.java @@ -19,7 +19,6 @@ import io.confluent.rest.entities.ErrorMessage; import io.confluent.rest.exceptions.KafkaExceptionMapper; import org.apache.kafka.clients.consumer.CommitFailedException; -import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.BrokerNotAvailableException; @@ -78,10 +77,7 @@ public class KafkaExceptionMapperTest { @BeforeEach public void setUp() { - Map props = new HashMap<>(); - props.put(RestConfig.RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_CONFIG, "false"); - RestConfig config = new RestConfig(RestConfig.baseConfigDef(), props); - exceptionMapper = new KafkaExceptionMapper(config); + exceptionMapper = new KafkaExceptionMapper(null); } @Test @@ -200,6 +196,28 @@ public void testWrappedExceptions() { verifyMapperResponse(new ExecutionException(unknownServerException), Status.BAD_REQUEST, KAFKA_BAD_REQUEST_ERROR_CODE); } + @Test + public void testGenericExceptionMapper_temp_throw429InsteadOf500() { + Map props = new HashMap<>(); + props.put(RestConfig.RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_CONFIG, false); + RestConfig configFalse = new RestConfig(RestConfig.baseConfigDef(), props); + KafkaExceptionMapper exceptionMapperWithConfigFalse = new KafkaExceptionMapper(configFalse); + + Response responseWithConfigFalse = + exceptionMapperWithConfigFalse.toResponse(new IllegalStateException("Response does not exist (likely recycled)")); + assertNotNull(responseWithConfigFalse); + assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), responseWithConfigFalse.getStatus()); + + props.put(RestConfig.RETURN_429_INSTEAD_OF_500_FOR_JETTY_RESPONSE_ERRORS_CONFIG, true); + RestConfig configTrue = new RestConfig(RestConfig.baseConfigDef(), props); + KafkaExceptionMapper exceptionMapperWithConfigTrue = new KafkaExceptionMapper(configTrue); + Response responseWithConfigTrue = + exceptionMapperWithConfigTrue.toResponse(new IllegalStateException("Response does not exist (likely recycled)")); + + assertNotNull(responseWithConfigTrue); + assertEquals(Status.TOO_MANY_REQUESTS.getStatusCode(), responseWithConfigTrue.getStatus()); + } + private void verifyMapperResponse(Throwable throwable, Status status, int errorCode) { Response response = exceptionMapper.toResponse(throwable); assertNotNull(response);