Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,14 @@ protected BehaviorChangeConfiguration(
.description("Whether or not to use soft values in the entity cache")
.defaultValue(false)
.buildBehaviorChangeConfiguration();

/**
 * Behavior-change flag, enabled by default, that controls whether {@code
 * BasePolarisTableOperations} updates the metadata object handed to {@code commit} and re-uses it
 * afterwards instead of re-reading the committed metadata from object storage.
 */
public static final BehaviorChangeConfiguration<Boolean> TABLE_OPERATIONS_COMMIT_UPDATE_METADATA =
    PolarisConfiguration.<Boolean>builder()
        .key("TABLE_OPERATIONS_COMMIT_UPDATE_METADATA")
        .description(
            "If true, BasePolarisTableOperations should update the metadata that is passed "
                + "into `commit`, and re-use it to skip a trip to object storage to "
                + "re-construct the committed metadata again.")
        .defaultValue(true)
        .buildBehaviorChangeConfiguration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import software.amazon.awssdk.core.exception.NonRetryableException;
import software.amazon.awssdk.core.exception.RetryableException;
Expand Down Expand Up @@ -1645,8 +1647,8 @@ public void testFileIOWrapper() {

table.updateProperties().set("foo", "bar").commit();
Assertions.assertThat(measured.getInputBytes())
.as("A table was read and written")
.isGreaterThan(0);
.as("A table was read and written, but no trip to storage was made")
.isEqualTo(0);

Assertions.assertThat(catalog.dropTable(TABLE)).as("Table deletion should succeed").isTrue();
TaskEntity taskEntity =
Expand Down Expand Up @@ -1795,6 +1797,58 @@ public void testConcurrencyConflictUpdateTableDuringFinalTransaction() {
.hasMessageContaining("conflict_table");
}

/**
 * Counts calls to {@code TableMetadataParser.read} around a commit, for both values of the
 * update-metadata-on-commit flag.
 *
 * <p>Exactly one read is expected up to and including the commit. The first {@code current()}
 * after the commit is expected to trigger a second read only when the flag is off, and a
 * following {@code refresh()} must not add another read in either mode.
 */
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testTableOperationsDoesNotRefreshAfterCommit(boolean updateMetadataOnCommit) {
  if (this.requiresNamespaceCreate()) {
    catalog.createNamespace(NS);
  }

  catalog.buildTable(TABLE, SCHEMA).create();

  // Spy on the concrete operations object created with the flag under test.
  IcebergCatalog.BasePolarisTableOperations ops =
      Mockito.spy(
          (IcebergCatalog.BasePolarisTableOperations)
              catalog.newTableOps(TABLE, updateMetadataOnCommit));

  try (MockedStatic<TableMetadataParser> parser =
      Mockito.mockStatic(TableMetadataParser.class, Mockito.CALLS_REAL_METHODS)) {
    // The single static call whose invocation count is tracked throughout the test.
    MockedStatic.Verification metadataRead =
        () -> TableMetadataParser.read(Mockito.any(), Mockito.anyString());

    // The first access loads the metadata once.
    TableMetadata loaded = ops.current();
    parser.verify(metadataRead, Mockito.times(1));

    // Refreshing without any change must not read again.
    TableMetadata refreshed = ops.refresh();
    parser.verify(metadataRead, Mockito.times(1));

    Assertions.assertThat(loaded.metadataFileLocation())
        .isEqualTo(refreshed.metadataFileLocation());
    Assertions.assertThat(loaded).isEqualTo(refreshed);

    // Commit a schema change; the commit itself must not read metadata.
    Schema updatedSchema =
        new Schema(Types.NestedField.optional(100, "new_col", Types.LongType.get()));
    TableMetadata updatedMetadata =
        TableMetadata.buildFrom(loaded).setCurrentSchema(updatedSchema, 100).build();
    ops.commit(refreshed, updatedMetadata);
    parser.verify(metadataRead, Mockito.times(1));

    // Only the mode that does not re-use the committed metadata pays for a second read.
    int expectedReads = updateMetadataOnCommit ? 1 : 2;

    ops.current();
    parser.verify(metadataRead, Mockito.times(expectedReads));

    ops.refresh();
    parser.verify(metadataRead, Mockito.times(expectedReads));
  } finally {
    catalog.dropTable(TABLE, true);
  }
}

/**
 * Unwraps the catalog's FileIO: casts it to {@link ExceptionMappingFileIO} and returns its inner
 * IO cast to {@link InMemoryFileIO}.
 */
private static InMemoryFileIO getInMemoryIo(IcebergCatalog catalog) {
  ExceptionMappingFileIO mappingIo = (ExceptionMappingFileIO) catalog.getIo();
  return (InMemoryFileIO) mappingIo.getInnerIo();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.azure.core.exception.AzureException;
Expand Down Expand Up @@ -117,6 +118,30 @@ private static void requestCreateTable() {
res.close();
}

/**
 * Issues a drop-table request for table {@code t1} in namespace {@code ns1} through the REST API
 * and closes the response.
 */
private static void requestDropTable() {
  services
      .restApi()
      .dropTable(catalog, "ns1", "t1", false, services.realmContext(), services.securityContext())
      .close();
}

/**
 * Issues a load-table request for table {@code t1} in namespace {@code ns1} through the REST API
 * (passing {@code null} and {@code "ALL"} for the fourth and fifth arguments) and closes the
 * response.
 */
private static void requestLoadTable() {
  services
      .restApi()
      .loadTable(
          catalog, "ns1", "t1", null, "ALL", services.realmContext(), services.securityContext())
      .close();
}

static Stream<RuntimeException> exceptions() {
return Stream.of(
new AzureException("Forbidden"),
Expand All @@ -135,7 +160,9 @@ void testLoadFileIOExceptionPropagation(RuntimeException ex) {
@MethodSource("exceptions")
void testNewInputFileExceptionPropagation(RuntimeException ex) {
  // Install a supplier for the exception under test on the IO factory's newInputFile hook.
  ioFactory.newInputFileExceptionSupplier = Optional.of(() -> ex);

  // The earlier expectation that createTable fails with `ex` contradicted the assertion below
  // and has been removed: creating the table must succeed with the supplier installed.
  assertThatCode(FileIOExceptionsTest::requestCreateTable).doesNotThrowAnyException();

  // Loading the table must surface the very same exception instance.
  assertThatThrownBy(FileIOExceptionsTest::requestLoadTable).isSameAs(ex);

  // Dropping the table must succeed, so the exception is not propagated on this path.
  assertThatCode(FileIOExceptionsTest::requestDropTable).doesNotThrowAnyException();
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,21 @@ public ViewBuilder buildView(TableIdentifier identifier) {
return new PolarisIcebergCatalogViewBuilder(identifier);
}

/**
 * Creates table operations for {@code tableIdentifier} backed by the catalog's FileIO, with an
 * explicit choice of whether committed metadata is updated on commit.
 *
 * @param tableIdentifier identifier of the table the operations act on
 * @param updateMetadataOnCommit value passed through to the operations' constructor
 * @return a new {@code BasePolarisTableOperations}
 */
@VisibleForTesting
public TableOperations newTableOps(
    TableIdentifier tableIdentifier, boolean updateMetadataOnCommit) {
  TableOperations operations =
      new BasePolarisTableOperations(catalogFileIO, tableIdentifier, updateMetadataOnCommit);
  return operations;
}

/**
 * Creates table operations for {@code tableIdentifier}.
 *
 * <p>Looks up {@code BehaviorChangeConfiguration.TABLE_OPERATIONS_COMMIT_UPDATE_METADATA} in the
 * current Polaris context's configuration store and delegates to {@link
 * #newTableOps(TableIdentifier, boolean)} with the resolved value.
 *
 * @param tableIdentifier identifier of the table the operations act on
 * @return table operations configured according to the behavior-change flag
 */
@Override
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
  // A leftover early `return new BasePolarisTableOperations(catalogFileIO, tableIdentifier)`
  // used to precede this lookup; it made the lookup unreachable and has been removed.
  boolean updateMetadataOnCommit =
      getCurrentPolarisContext()
          .getConfigurationStore()
          .getConfiguration(
              getCurrentPolarisContext(),
              BehaviorChangeConfiguration.TABLE_OPERATIONS_COMMIT_UPDATE_METADATA);
  return newTableOps(tableIdentifier, updateMetadataOnCommit);
}

@Override
Expand Down Expand Up @@ -1184,16 +1196,19 @@ public ViewBuilder withLocation(String newLocation) {
}
}

private class BasePolarisTableOperations extends BaseMetastoreTableOperations {
public class BasePolarisTableOperations extends BaseMetastoreTableOperations {
private final TableIdentifier tableIdentifier;
private final String fullTableName;
private final boolean updateMetadataOnCommit;
private FileIO tableFileIO;

/**
 * Creates table operations for a single table.
 *
 * @param defaultFileIO initial value of {@code tableFileIO}
 * @param tableIdentifier identifier of the table these operations act on
 * @param updateMetadataOnCommit stored in the {@code updateMetadataOnCommit} field
 */
BasePolarisTableOperations(
    FileIO defaultFileIO, TableIdentifier tableIdentifier, boolean updateMetadataOnCommit) {
  LOGGER.debug("new BasePolarisTableOperations for {}", tableIdentifier);
  this.tableIdentifier = tableIdentifier;
  this.fullTableName = fullTableName(catalogName, tableIdentifier);
  this.tableFileIO = defaultFileIO;
  // Only table operations take this flag. Per the review discussion on this change, views are
  // deliberately left out: they tend to carry much less metadata and are updated less often,
  // so the extra round trip to object storage is considered acceptable there. The intent is to
  // contribute the capability upstream to Iceberg so the reflection workaround can go away.
  this.updateMetadataOnCommit = updateMetadataOnCommit;
}

@Override
Expand Down Expand Up @@ -1382,6 +1397,13 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
+ "because it has been concurrently modified to %s",
tableIdentifier, oldLocation, newLocation, existingLocation);
}
// TODO: we should not need to do this hack, but there's no other way to modify
// currentMetadata / currentMetadataLocation
if (updateMetadataOnCommit) {
TableOperationsReflectionUtil reflectionUtil = TableOperationsReflectionUtil.getInstance();
reflectionUtil.setMetadataFileLocation(metadata, newLocation);
reflectionUtil.setCurrentMetadata(this, metadata, newLocation);
}
if (null == existingLocation) {
createTableLike(tableIdentifier, entity);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.catalog.iceberg;

import java.lang.reflect.Field;
import java.util.ArrayList;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.TableMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Unsafe;

/**
* Used to handle reflection-related fields for TableOperations and TableMetadata TODO remove the
* following if the Iceberg library allows us to do this without reflection For more details see <a
* href="https://github.com/apache/polaris/pull/1378">#1378</a>
*/
/**
 * Reflection-based helper that overwrites private state of {@link TableMetadata} and {@link
 * BaseMetastoreTableOperations}.
 *
 * <p>TODO: remove this class if the Iceberg library allows us to do this without reflection. For
 * more details see <a href="https://github.com/apache/polaris/pull/1378">#1378</a>.
 *
 * <p>All writes go through {@link Field#set(Object, Object)} on fields made accessible once in
 * the constructor. This assumes the targeted fields are non-static instance fields, which
 * {@code Field.set} can write even when they are declared {@code final}; no {@code
 * sun.misc.Unsafe} access is needed.
 */
public final class TableOperationsReflectionUtil {
  private static final Logger LOGGER =
      LoggerFactory.getLogger(TableOperationsReflectionUtil.class);

  /** Holder class so the singleton is created on the first call to {@link #getInstance()}. */
  private static final class Instance {
    private static final TableOperationsReflectionUtil INSTANCE =
        new TableOperationsReflectionUtil();
  }

  /** {@code TableMetadata.metadataFileLocation}. */
  private final Field metadataFileLocationField;

  /** {@code TableMetadata.changes}. */
  private final Field changesField;

  /** {@code BaseMetastoreTableOperations.currentMetadataLocation}. */
  private final Field currentMetadataLocationField;

  /** {@code BaseMetastoreTableOperations.currentMetadata}. */
  private final Field currentMetadataField;

  /**
   * Resolves the four private fields and makes them accessible.
   *
   * @throws IllegalStateException if any of the fields does not exist in the Iceberg classes on
   *     the classpath
   */
  private TableOperationsReflectionUtil() {
    try {
      metadataFileLocationField = accessibleField(TableMetadata.class, "metadataFileLocation");
      changesField = accessibleField(TableMetadata.class, "changes");
      currentMetadataLocationField =
          accessibleField(BaseMetastoreTableOperations.class, "currentMetadataLocation");
      currentMetadataField =
          accessibleField(BaseMetastoreTableOperations.class, "currentMetadata");
    } catch (NoSuchFieldException e) {
      LOGGER.error(
          "Encountered an unexpected error while attempting to access private fields"
              + " in TableMetadata or BaseMetastoreTableOperations",
          e);
      throw new IllegalStateException(e);
    }
  }

  /** Returns the shared instance. */
  static TableOperationsReflectionUtil getInstance() {
    return Instance.INSTANCE;
  }

  /**
   * Overwrites the metadata file location recorded in {@code tableMetadata} and replaces its
   * {@code changes} list with a new empty list.
   *
   * <p>An {@link IllegalAccessException} is logged and not rethrown.
   *
   * @param tableMetadata metadata object to modify in place
   * @param metadataFileLocation location to store in the metadata object
   */
  public void setMetadataFileLocation(TableMetadata tableMetadata, String metadataFileLocation) {
    try {
      metadataFileLocationField.set(tableMetadata, metadataFileLocation);
      // NOTE(review): presumably cleared so the object looks like freshly loaded metadata with
      // no pending updates - confirm against TableMetadata.
      changesField.set(tableMetadata, new ArrayList<MetadataUpdate>());
    } catch (IllegalAccessException e) {
      LOGGER.error(
          "Encountered an unexpected error while attempting to access private fields"
              + " in TableMetadata",
          e);
    }
  }

  /**
   * Overwrites the current metadata location and the current metadata held by {@code
   * tableOperations}.
   *
   * <p>An {@link IllegalAccessException} is logged and not rethrown.
   *
   * @param tableOperations operations object to modify in place
   * @param tableMetadata metadata to install as the current metadata
   * @param metadataFileLocation location to install as the current metadata location
   */
  public void setCurrentMetadata(
      BaseMetastoreTableOperations tableOperations,
      TableMetadata tableMetadata,
      String metadataFileLocation) {
    try {
      currentMetadataLocationField.set(tableOperations, metadataFileLocation);
      currentMetadataField.set(tableOperations, tableMetadata);
    } catch (IllegalAccessException e) {
      LOGGER.error(
          "Encountered an unexpected error while attempting to access private fields"
              + " in BaseMetastoreTableOperations",
          e);
    }
  }

  /** Looks up the declared field {@code name} on {@code owner} and makes it accessible. */
  private static Field accessibleField(Class<?> owner, String name)
      throws NoSuchFieldException {
    Field field = owner.getDeclaredField(name);
    field.setAccessible(true);
    return field;
  }
}