Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,8 @@
*/
package org.apache.polaris.core.context;

import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Stores elements associated with an individual REST request such as RealmContext, caller
Expand All @@ -37,15 +29,9 @@
* principal/role entities may be defined within a Realm-specific persistence layer, and the
* underlying nature of the persistence layer may differ between different realms.
*/
public interface CallContext extends AutoCloseable {
public interface CallContext {
InheritableThreadLocal<CallContext> CURRENT_CONTEXT = new InheritableThreadLocal<>();

// For requests that make use of a Catalog instance, this holds the instance that was
// created, scoped to the current call context.
String REQUEST_PATH_CATALOG_INSTANCE_KEY = "REQUEST_PATH_CATALOG_INSTANCE";

String CLOSEABLES = "closeables";

static CallContext setCurrentContext(CallContext context) {
CURRENT_CONTEXT.set(context);
return context;
Expand All @@ -65,7 +51,6 @@ static void unsetCurrentContext() {

static CallContext of(
final RealmContext realmContext, final PolarisCallContext polarisCallContext) {
Map<String, Object> map = new HashMap<>();
return new CallContext() {
@Override
public RealmContext getRealmContext() {
Expand All @@ -76,28 +61,14 @@ public RealmContext getRealmContext() {
public PolarisCallContext getPolarisCallContext() {
return polarisCallContext;
}

@Override
public Map<String, Object> contextVariables() {
return map;
}
};
}

/**
* Copy the {@link CallContext}. {@link #contextVariables()} will be copied except for {@link
* #closeables()}. The original {@link #contextVariables()} map is untouched and {@link
* #closeables()} in the original {@link CallContext} should be closed along with the {@link
* CallContext}.
*/
/** Copy the {@link CallContext}. */
static CallContext copyOf(CallContext base) {
String realmId = base.getRealmContext().getRealmIdentifier();
RealmContext realmContext = () -> realmId;
PolarisCallContext polarisCallContext = PolarisCallContext.copyOf(base.getPolarisCallContext());
Map<String, Object> contextVariables =
base.contextVariables().entrySet().stream()
.filter(e -> !e.getKey().equals(CLOSEABLES))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return new CallContext() {
@Override
public RealmContext getRealmContext() {
Expand All @@ -108,11 +79,6 @@ public RealmContext getRealmContext() {
public PolarisCallContext getPolarisCallContext() {
return polarisCallContext;
}

@Override
public Map<String, Object> contextVariables() {
return contextVariables;
}
};
}

Expand All @@ -122,28 +88,4 @@ public Map<String, Object> contextVariables() {
* @return the inner context used for delegating services
*/
PolarisCallContext getPolarisCallContext();

Map<String, Object> contextVariables();

default @Nonnull CloseableGroup closeables() {
return (CloseableGroup)
contextVariables().computeIfAbsent(CLOSEABLES, key -> new CloseableGroup());
}

@Override
default void close() {
if (CURRENT_CONTEXT.get() == this) {
unsetCurrentContext();
CloseableGroup closeables = closeables();
try {
closeables.close();
} catch (IOException e) {
Logger logger = LoggerFactory.getLogger(CallContext.class);
logger
.atWarn()
.addKeyValue("closeableGroup", closeables)
.log("Unable to close closeable group", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,43 +105,41 @@ public void testValidateAccessToLocationsWithWildcard() {
}
},
Clock.systemUTC());
try (CallContext ignored =
CallContext.setCurrentContext(CallContext.of(() -> "realm", polarisCallContext))) {
Map<String, Map<PolarisStorageActions, PolarisStorageIntegration.ValidationResult>> result =
storage.validateAccessToLocations(
new FileStorageConfigurationInfo(List.of("file://", "*")),
Set.of(PolarisStorageActions.READ),
Set.of(
"s3://bucket/path/to/warehouse/namespace/table",
"file:///etc/passwd",
"a/relative/subdirectory"));
Assertions.assertThat(result)
.hasSize(3)
.hasEntrySatisfying(
"s3://bucket/path/to/warehouse/namespace/table",
val ->
Assertions.assertThat(val)
.hasSize(1)
.containsKey(PolarisStorageActions.READ)
.extractingByKey(PolarisStorageActions.READ)
.returns(true, PolarisStorageIntegration.ValidationResult::isSuccess))
.hasEntrySatisfying(
"file:///etc/passwd",
val ->
Assertions.assertThat(val)
.hasSize(1)
.containsKey(PolarisStorageActions.READ)
.extractingByKey(PolarisStorageActions.READ)
.returns(true, PolarisStorageIntegration.ValidationResult::isSuccess))
.hasEntrySatisfying(
"a/relative/subdirectory",
val ->
Assertions.assertThat(val)
.hasSize(1)
.containsKey(PolarisStorageActions.READ)
.extractingByKey(PolarisStorageActions.READ)
.returns(true, PolarisStorageIntegration.ValidationResult::isSuccess));
}
CallContext.setCurrentContext(CallContext.of(() -> "realm", polarisCallContext));
Map<String, Map<PolarisStorageActions, PolarisStorageIntegration.ValidationResult>> result =
storage.validateAccessToLocations(
new FileStorageConfigurationInfo(List.of("file://", "*")),
Set.of(PolarisStorageActions.READ),
Set.of(
"s3://bucket/path/to/warehouse/namespace/table",
"file:///etc/passwd",
"a/relative/subdirectory"));
Assertions.assertThat(result)
.hasSize(3)
.hasEntrySatisfying(
"s3://bucket/path/to/warehouse/namespace/table",
val ->
Assertions.assertThat(val)
.hasSize(1)
.containsKey(PolarisStorageActions.READ)
.extractingByKey(PolarisStorageActions.READ)
.returns(true, PolarisStorageIntegration.ValidationResult::isSuccess))
.hasEntrySatisfying(
"file:///etc/passwd",
val ->
Assertions.assertThat(val)
.hasSize(1)
.containsKey(PolarisStorageActions.READ)
.extractingByKey(PolarisStorageActions.READ)
.returns(true, PolarisStorageIntegration.ValidationResult::isSuccess))
.hasEntrySatisfying(
"a/relative/subdirectory",
val ->
Assertions.assertThat(val)
.hasSize(1)
.containsKey(PolarisStorageActions.READ)
.extractingByKey(PolarisStorageActions.READ)
.returns(true, PolarisStorageIntegration.ValidationResult::isSuccess));
}

@Test
Expand Down
Loading
Loading