Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
import org.apache.polaris.service.context.RealmContextConfiguration;
import org.apache.polaris.service.context.RealmContextFilter;
import org.apache.polaris.service.context.RealmContextResolver;
import org.apache.polaris.service.events.PolarisEventListener;
import org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationConfiguration;
import org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationRealmConfiguration;
import org.apache.polaris.service.quarkus.auth.external.tenant.OidcTenantResolver;
import org.apache.polaris.service.quarkus.catalog.io.QuarkusFileIOConfiguration;
import org.apache.polaris.service.quarkus.context.QuarkusRealmContextConfiguration;
import org.apache.polaris.service.quarkus.context.RealmContextFilter;
import org.apache.polaris.service.quarkus.events.QuarkusPolarisEventListenerConfiguration;
import org.apache.polaris.service.quarkus.persistence.QuarkusPersistenceConfiguration;
import org.apache.polaris.service.quarkus.ratelimiter.QuarkusRateLimiterFilterConfiguration;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.quarkus.context;

import io.smallrye.common.vertx.ContextLocals;
import io.smallrye.mutiny.Uni;
import jakarta.inject.Inject;
import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.polaris.service.config.PolarisFilterPriorities;
import org.apache.polaris.service.context.RealmContextResolver;
import org.jboss.resteasy.reactive.server.ServerRequestFilter;

public class RealmContextFilter {

public static final String REALM_CONTEXT_KEY = "realmContext";

@Inject RealmContextResolver realmContextResolver;

@ServerRequestFilter(preMatching = true, priority = PolarisFilterPriorities.REALM_CONTEXT_FILTER)
public Uni<Response> resolveRealmContext(ContainerRequestContext rc) {
return Uni.createFrom()
.completionStage(
() ->
realmContextResolver.resolveRealmContext(
rc.getUriInfo().getRequestUri().toString(),
rc.getMethod(),
rc.getUriInfo().getPath(),
rc.getHeaders()::getFirst))
.onItem()
.invoke(realmContext -> rc.setProperty(REALM_CONTEXT_KEY, realmContext))
.invoke(realmContext -> ContextLocals.put(REALM_CONTEXT_KEY, realmContext))
.onItemOrFailure()
.transform((realmContext, error) -> error == null ? null : errorResponse(error));
}

private static Response errorResponse(Throwable error) {
return Response.status(Response.Status.NOT_FOUND)
.type(MediaType.APPLICATION_JSON_TYPE)
.entity(
ErrorResponse.builder()
.responseCode(Response.Status.NOT_FOUND.getStatusCode())
.withMessage(
error.getMessage() != null ? error.getMessage() : "Missing or invalid realm")
.withType("MissingOrInvalidRealm")
.build())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.polaris.service.quarkus.logging;

import static org.apache.polaris.service.context.RealmContextFilter.REALM_CONTEXT_KEY;
import static org.apache.polaris.service.quarkus.context.RealmContextFilter.REALM_CONTEXT_KEY;

import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,44 @@
public class RealmIdTagContributor implements HttpServerMetricsTagsContributor {

public static final String TAG_REALM = "realm_id";
public static final String TAG_REALM_RESOLUTION_FAILURE = "realm_resolution_failure";

private static final Tags UNFINISHED_RESOLUTION_TAGS = Tags.of(TAG_REALM, "???");
private static final Tags FAILED_RESOLUTION_TAGS = Tags.of(TAG_REALM, "!!!");

@Inject RealmContextResolver realmContextResolver;

@Override
public Tags contribute(Context context) {
// FIXME request scope does not work here, so we have to resolve the realm context manually
// FIXME retrieve the realm context from context.requestContextLocalData() when this PR is in:
// https://github.com/quarkusio/quarkus/pull/47887
HttpServerRequest request = context.request();
try {
RealmContext realmContext = resolveRealmContext(request);
return Tags.of(TAG_REALM, realmContext.getRealmIdentifier());
} catch (Exception ignored) {
// ignore, the RealmContextFilter will handle the error
return Tags.empty();
return realmContextResolver
.resolveRealmContext(
request.absoluteURI(),
request.method().name(),
request.path(),
request.headers()::get)
.thenApply(this::successfulResolutionTags)
.exceptionally(this::failedResolutionTags)
.toCompletableFuture()
// get the result of the CompletableFuture if it's already completed,
// otherwise return UNFINISHED_RESOLUTION_TAGS as this code is executed on
// an event loop thread, and we don't want to block it.
.getNow(UNFINISHED_RESOLUTION_TAGS);
} catch (Exception e) {
return failedResolutionTags(e);
}
}

private RealmContext resolveRealmContext(HttpServerRequest request) {
return realmContextResolver.resolveRealmContext(
request.absoluteURI(), request.method().name(), request.path(), request.headers()::get);
private Tags successfulResolutionTags(RealmContext realmContext) {
return Tags.of(TAG_REALM, realmContext.getRealmIdentifier());
}

private Tags failedResolutionTags(Throwable error) {
return FAILED_RESOLUTION_TAGS.and(
TAG_REALM_RESOLUTION_FAILURE,
error.getMessage() == null ? error.getClass().getSimpleName() : error.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import jakarta.ws.rs.container.PreMatching;
import jakarta.ws.rs.ext.Provider;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.service.context.RealmContextFilter;
import org.apache.polaris.service.quarkus.config.QuarkusFilterPriorities;
import org.apache.polaris.service.quarkus.context.RealmContextFilter;
import org.apache.polaris.service.quarkus.logging.QuarkusLoggingMDCFilter;
import org.eclipse.microprofile.config.inject.ConfigProperty;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testInvalidRealmHeaderValue() {
.extracting(ErrorResponse::code, ErrorResponse::type, ErrorResponse::message)
.containsExactly(
Response.Status.NOT_FOUND.getStatusCode(),
"UnresolvableRealmContextException",
"MissingOrInvalidRealm",
"Unknown realm: INVALID");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,11 @@ private PolarisPrincipalSecrets fetchAdminSecrets() {
}

RealmContext realmContext =
helper.realmContextResolver.resolveRealmContext(
baseUri.toString(), "GET", "/", Map.of(REALM_PROPERTY_KEY, realm));
helper
.realmContextResolver
.resolveRealmContext(baseUri.toString(), "GET", "/", Map.of(REALM_PROPERTY_KEY, realm))
.toCompletableFuture()
.join();

BasePersistence metaStoreSession =
helper.metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.apache.polaris.core.context.RealmContext;

Expand All @@ -36,21 +38,25 @@ public DefaultRealmContextResolver(RealmContextConfiguration configuration) {
}

@Override
public RealmContext resolveRealmContext(
public CompletionStage<RealmContext> resolveRealmContext(
String requestURL, String method, String path, Function<String, String> headers) {
String realm = resolveRealmIdentifier(headers);
return () -> realm;
try {
String realm = resolveRealmIdentifier(headers);
return CompletableFuture.completedFuture(() -> realm);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}

private String resolveRealmIdentifier(Function<String, String> headers) {
String realm = headers.apply(configuration.headerName());
if (realm != null) {
if (!configuration.realms().contains(realm)) {
throw new UnresolvableRealmContextException("Unknown realm: " + realm);
throw new IllegalArgumentException("Unknown realm: " + realm);
}
} else {
if (configuration.requireHeader()) {
throw new UnresolvableRealmContextException(
throw new IllegalArgumentException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this changes HTTP responses for unknown realms from 404 to 400 🤔 As for me, 404 seems to be more applicable. Did you have any reason in mind for changing the status code in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, the HTTP response for failed realm resolution is now crafted by the RealmContextFilter which is mapping all errors to 404 as before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I missed that 🤦

"Missing required realm header: " + configuration.headerName());
}
realm = configuration.defaultRealm();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,45 @@
package org.apache.polaris.service.context;

import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.polaris.core.context.RealmContext;

/**
* An interface for resolving the realm context for a given request.
*
* <p>General implementation guidance:
*
* <p>Methods in this class should not block the calling thread. If the realm resolution process is
* blocking, it should be done in a separate thread.
*
* <p>In the case of an error during realm resolution, the {@link CompletionStage} should complete
* exceptionally; methods should not throw exceptions directly.
Comment on lines +35 to +36
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[doubt] can you please elaborate on, who throws the exception ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@singhpk234 as usual for asynchronous code, invoking resolveRealmContext() should never fail. If a specific implementation is unable to resolve the realm for whatever reason, it should return a failed future.

*
* <p>The realm resolution takes place in the very early stages of the request processing pipeline,
* and before any authentication or authorization checks are performed. Implementations should be
* careful with the use of any blocking operations, as they may lead to performance issues or
* deadlocks, especially in public environments.
*/
public interface RealmContextResolver {

/**
* Resolves the realm context for the given request.
* Resolves the realm context for the given request, and returns a {@link CompletionStage} that
* completes with the resolved realm context.
*
* @return the resolved realm context
* @throws UnresolvableRealmContextException if the realm context cannot be resolved
* @return a {@link CompletionStage} that completes with the resolved realm context
*/
RealmContext resolveRealmContext(
CompletionStage<RealmContext> resolveRealmContext(
String requestURL, String method, String path, Function<String, String> headers);

default RealmContext resolveRealmContext(
/**
* Resolves the realm context for the given request, and returns a {@link CompletionStage} that
* completes with the resolved realm context.
*
* @return a {@link CompletionStage} that completes with the resolved realm context
*/
default CompletionStage<RealmContext> resolveRealmContext(
String requestURL, String method, String path, Map<String, String> headers) {
CaseInsensitiveMap caseInsensitiveMap = new CaseInsensitiveMap(headers);
return resolveRealmContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import jakarta.inject.Inject;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.apache.polaris.core.context.RealmContext;
import org.slf4j.Logger;
Expand All @@ -50,7 +52,7 @@ public TestRealmContextResolver(RealmContextConfiguration configuration) {
}

@Override
public RealmContext resolveRealmContext(
public CompletionStage<RealmContext> resolveRealmContext(
String requestURL, String method, String path, Function<String, String> headers) {
// Since this default resolver is strictly for use in test/dev environments, we'll consider
// it safe to log all contents. Any "real" resolver used in a prod environment should make
Expand All @@ -72,7 +74,7 @@ public RealmContext resolveRealmContext(
parsedProperties.put(REALM_PROPERTY_KEY, configuration.defaultRealm());
}
String realmId = parsedProperties.get(REALM_PROPERTY_KEY);
return () -> realmId;
return CompletableFuture.completedFuture(() -> realmId);
}

/**
Expand Down

This file was deleted.

Loading