Skip to content

Commit cb51efb

Browse files
committed
Use isolated request contexts for task execution
* Avoid propagating request contexts into threads running background tasks. * Use fresh request context for each task instead. * Manually propagate realm context into a task from the request that submits the task. * Fix tests to set thread-local data to match CDI context data. Next step: Simplify DefaultConfigurationStore API to remove contextual arguments and use CDI context instead.
1 parent 3185adf commit cb51efb

File tree

7 files changed

+84
-20
lines changed

7 files changed

+84
-20
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.polaris.service.quarkus.config;
21+
22+
import jakarta.enterprise.context.RequestScoped;
23+
import jakarta.ws.rs.container.ContainerRequestContext;
24+
import java.util.concurrent.atomic.AtomicReference;
25+
import org.apache.polaris.core.context.RealmContext;
26+
27+
/**
28+
* A container for request-scoped information discovered during request execution.
29+
*
30+
* <p>This is an equivalent for {@link ContainerRequestContext}, but for use in non-HTTP requests.
31+
*/
32+
@RequestScoped
33+
public class PolarisRequestContext {
34+
private final AtomicReference<RealmContext> realmCtx = new AtomicReference<>();
35+
36+
/**
37+
* Records the {@link RealmContext} that applies to current request. The realm context may be
38+
* determined from REST API header or by passing explicit realm ID values from one CDI context to
39+
* another.
40+
*
41+
* <p>During the execution of a particular request, this method should be called before {@link
42+
* #realmContext()}.
43+
*/
44+
public void setRealmContext(RealmContext rc) {
45+
realmCtx.set(rc);
46+
}
47+
48+
/**
49+
* Returns the realm context for this request previously set via {@link
50+
* #setRealmContext(RealmContext)}.
51+
*/
52+
public RealmContext realmContext() {
53+
return realmCtx.get();
54+
}
55+
}

quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.polaris.service.quarkus.config;
2020

21+
import com.google.common.base.Preconditions;
2122
import io.smallrye.common.annotation.Identifier;
2223
import io.smallrye.context.SmallRyeManagedExecutor;
2324
import jakarta.enterprise.context.ApplicationScoped;
@@ -29,8 +30,6 @@
2930
import jakarta.enterprise.inject.Instance;
3031
import jakarta.enterprise.inject.Produces;
3132
import jakarta.inject.Singleton;
32-
import jakarta.ws.rs.container.ContainerRequestContext;
33-
import jakarta.ws.rs.core.Context;
3433
import java.time.Clock;
3534
import java.util.stream.Collectors;
3635
import org.apache.polaris.core.PolarisCallContext;
@@ -67,7 +66,6 @@
6766
import org.apache.polaris.service.quarkus.auth.external.tenant.OidcTenantResolver;
6867
import org.apache.polaris.service.quarkus.catalog.io.QuarkusFileIOConfiguration;
6968
import org.apache.polaris.service.quarkus.context.QuarkusRealmContextConfiguration;
70-
import org.apache.polaris.service.quarkus.context.RealmContextFilter;
7169
import org.apache.polaris.service.quarkus.events.QuarkusPolarisEventListenerConfiguration;
7270
import org.apache.polaris.service.quarkus.persistence.QuarkusPersistenceConfiguration;
7371
import org.apache.polaris.service.quarkus.ratelimiter.QuarkusRateLimiterFilterConfiguration;
@@ -115,8 +113,10 @@ public PolarisDiagnostics polarisDiagnostics() {
115113

116114
@Produces
117115
@RequestScoped
118-
public RealmContext realmContext(@Context ContainerRequestContext request) {
119-
return (RealmContext) request.getProperty(RealmContextFilter.REALM_CONTEXT_KEY);
116+
public RealmContext realmContext(PolarisRequestContext context) {
117+
RealmContext realmContext = context.realmContext();
118+
Preconditions.checkState(realmContext != null, "RealmContext was not property configured");
119+
return realmContext;
120120
}
121121

122122
@Produces
@@ -297,7 +297,7 @@ public TokenBroker tokenBroker(
297297
public ManagedExecutor taskExecutor(TaskHandlerConfiguration config) {
298298
return SmallRyeManagedExecutor.builder()
299299
.injectionPointName("task-executor")
300-
.propagated(ThreadContext.ALL_REMAINING)
300+
.propagated(ThreadContext.NONE)
301301
.maxAsync(config.maxConcurrentTasks())
302302
.maxQueued(config.maxQueuedTasks())
303303
.build();

quarkus/service/src/main/java/org/apache/polaris/service/quarkus/context/RealmContextFilter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@
2727
import org.apache.iceberg.rest.responses.ErrorResponse;
2828
import org.apache.polaris.service.config.PolarisFilterPriorities;
2929
import org.apache.polaris.service.context.RealmContextResolver;
30+
import org.apache.polaris.service.quarkus.config.PolarisRequestContext;
3031
import org.jboss.resteasy.reactive.server.ServerRequestFilter;
3132

3233
public class RealmContextFilter {
3334

3435
public static final String REALM_CONTEXT_KEY = "realmContext";
3536

3637
@Inject RealmContextResolver realmContextResolver;
38+
@Inject PolarisRequestContext polarisRequestContext;
3739

3840
@ServerRequestFilter(preMatching = true, priority = PolarisFilterPriorities.REALM_CONTEXT_FILTER)
3941
public Uni<Response> resolveRealmContext(ContainerRequestContext rc) {
@@ -46,7 +48,7 @@ public Uni<Response> resolveRealmContext(ContainerRequestContext rc) {
4648
rc.getUriInfo().getPath(),
4749
rc.getHeaders()::getFirst))
4850
.onItem()
49-
.invoke(realmContext -> rc.setProperty(REALM_CONTEXT_KEY, realmContext))
51+
.invoke(realmContext -> polarisRequestContext.setRealmContext(realmContext))
5052
.invoke(realmContext -> ContextLocals.put(REALM_CONTEXT_KEY, realmContext))
5153
.onItemOrFailure()
5254
.transform((realmContext, error) -> error == null ? null : errorResponse(error));

quarkus/service/src/main/java/org/apache/polaris/service/quarkus/logging/QuarkusLoggingMDCFilter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package org.apache.polaris.service.quarkus.logging;
2020

21-
import static org.apache.polaris.service.quarkus.context.RealmContextFilter.REALM_CONTEXT_KEY;
22-
2321
import jakarta.annotation.Priority;
2422
import jakarta.enterprise.context.ApplicationScoped;
2523
import jakarta.inject.Inject;
@@ -28,6 +26,7 @@
2826
import jakarta.ws.rs.container.PreMatching;
2927
import jakarta.ws.rs.ext.Provider;
3028
import org.apache.polaris.core.context.RealmContext;
29+
import org.apache.polaris.service.quarkus.config.PolarisRequestContext;
3130
import org.apache.polaris.service.quarkus.config.QuarkusFilterPriorities;
3231
import org.slf4j.MDC;
3332

@@ -41,6 +40,7 @@ public class QuarkusLoggingMDCFilter implements ContainerRequestFilter {
4140
public static final String REQUEST_ID_KEY = "requestId";
4241

4342
@Inject QuarkusLoggingConfiguration loggingConfiguration;
43+
@Inject PolarisRequestContext polarisRequestContext;
4444

4545
@Override
4646
public void filter(ContainerRequestContext rc) {
@@ -54,7 +54,7 @@ public void filter(ContainerRequestContext rc) {
5454
MDC.put(REQUEST_ID_KEY, requestId);
5555
rc.setProperty(REQUEST_ID_KEY, requestId);
5656
}
57-
RealmContext realmContext = (RealmContext) rc.getProperty(REALM_CONTEXT_KEY);
57+
RealmContext realmContext = polarisRequestContext.realmContext();
5858
MDC.put(REALM_ID_KEY, realmContext.getRealmIdentifier());
5959
}
6060
}

quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
import io.quarkus.runtime.Startup;
2626
import io.smallrye.common.annotation.Identifier;
2727
import jakarta.enterprise.context.ApplicationScoped;
28+
import jakarta.enterprise.context.control.ActivateRequestContext;
2829
import jakarta.inject.Inject;
2930
import java.util.concurrent.ExecutorService;
3031
import org.apache.polaris.core.context.CallContext;
3132
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
3233
import org.apache.polaris.service.events.PolarisEventListener;
34+
import org.apache.polaris.service.quarkus.config.PolarisRequestContext;
3335
import org.apache.polaris.service.quarkus.tracing.QuarkusTracingFilter;
3436
import org.apache.polaris.service.task.TaskExecutorImpl;
3537
import org.apache.polaris.service.task.TaskFileIOSupplier;
@@ -38,9 +40,10 @@
3840
public class QuarkusTaskExecutorImpl extends TaskExecutorImpl {
3941

4042
private final Tracer tracer;
43+
private final PolarisRequestContext polarisRequestContext;
4144

4245
public QuarkusTaskExecutorImpl() {
43-
this(null, null, null, null, null);
46+
this(null, null, null, null, null, null);
4447
}
4548

4649
@Inject
@@ -49,9 +52,11 @@ public QuarkusTaskExecutorImpl(
4952
MetaStoreManagerFactory metaStoreManagerFactory,
5053
TaskFileIOSupplier fileIOSupplier,
5154
Tracer tracer,
52-
PolarisEventListener polarisEventListener) {
55+
PolarisEventListener polarisEventListener,
56+
PolarisRequestContext polarisRequestContext) {
5357
super(executorService, metaStoreManagerFactory, fileIOSupplier, polarisEventListener);
5458
this.tracer = tracer;
59+
this.polarisRequestContext = polarisRequestContext;
5560
}
5661

5762
@Startup
@@ -61,6 +66,7 @@ public void init() {
6166
}
6267

6368
@Override
69+
@ActivateRequestContext
6470
protected void handleTask(long taskEntityId, CallContext callContext, int attempt) {
6571
Span span =
6672
tracer
@@ -73,6 +79,7 @@ protected void handleTask(long taskEntityId, CallContext callContext, int attemp
7379
.setAttribute("polaris.task.attempt", attempt)
7480
.startSpan();
7581
try (Scope ignored = span.makeCurrent()) {
82+
polarisRequestContext.setRealmContext(callContext.getRealmContext());
7683
super.handleTask(taskEntityId, callContext, attempt);
7784
} finally {
7885
span.end();

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -230,10 +230,6 @@ public static void setUpMocks() {
230230
public void before(TestInfo testInfo) {
231231
RealmContext realmContext = testInfo::getDisplayName;
232232
QuarkusMock.installMockForType(realmContext, RealmContext.class);
233-
metaStoreManager = managerFactory.getOrCreateMetaStoreManager(realmContext);
234-
userSecretsManager = userSecretsManagerFactory.getOrCreateUserSecretsManager(realmContext);
235-
236-
polarisAuthorizer = new PolarisAuthorizerImpl(configurationStore);
237233

238234
polarisContext =
239235
new PolarisCallContext(
@@ -242,11 +238,16 @@ public void before(TestInfo testInfo) {
242238
diagServices,
243239
configurationStore,
244240
clock);
245-
this.entityManager = realmEntityManagerFactory.getOrCreateEntityManager(realmContext);
246-
247241
callContext = polarisContext;
248242
CallContext.setCurrentContext(callContext);
249243

244+
metaStoreManager = managerFactory.getOrCreateMetaStoreManager(realmContext);
245+
userSecretsManager = userSecretsManagerFactory.getOrCreateUserSecretsManager(realmContext);
246+
247+
polarisAuthorizer = new PolarisAuthorizerImpl(configurationStore);
248+
249+
this.entityManager = realmEntityManagerFactory.getOrCreateEntityManager(realmContext);
250+
250251
PrincipalEntity rootEntity =
251252
new PrincipalEntity(
252253
PolarisEntity.of(

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,15 +175,14 @@ public void before(TestInfo testInfo) {
175175
diagServices,
176176
configurationStore,
177177
Clock.systemDefaultZone());
178+
CallContext.setCurrentContext(polarisContext);
178179

179180
PolarisEntityManager entityManager =
180181
new PolarisEntityManager(
181182
metaStoreManager,
182183
new StorageCredentialCache(),
183184
new InMemoryEntityCache(metaStoreManager));
184185

185-
CallContext.setCurrentContext(polarisContext);
186-
187186
PrincipalEntity rootEntity =
188187
new PrincipalEntity(
189188
PolarisEntity.of(

0 commit comments

Comments
 (0)