diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java
index d3577a30f3..ea905e19dc 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java
@@ -59,4 +59,14 @@ protected BehaviorChangeConfiguration(
           .description("Whether or not to use soft values in the entity cache")
           .defaultValue(false)
           .buildBehaviorChangeConfiguration();
+
+  public static final BehaviorChangeConfiguration<Boolean> TABLE_OPERATIONS_COMMIT_UPDATE_METADATA =
+      PolarisConfiguration.<Boolean>builder()
+          .key("TABLE_OPERATIONS_COMMIT_UPDATE_METADATA")
+          .description(
+              "If true, BasePolarisTableOperations should update the metadata that is passed into"
+                  + " `commit`, and re-use it to skip a trip to object storage to re-construct"
+                  + " the committed metadata again.")
+          .defaultValue(true)
+          .buildBehaviorChangeConfiguration();
 }
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
index 4a5506fa22..b8ffced5ef 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
@@ -133,6 +133,8 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import software.amazon.awssdk.core.exception.NonRetryableException;
 import software.amazon.awssdk.core.exception.RetryableException;
@@ -1645,8 +1647,8 @@ public void testFileIOWrapper() {
 
     table.updateProperties().set("foo", "bar").commit();
     Assertions.assertThat(measured.getInputBytes())
-        .as("A table was read and written")
-        .isGreaterThan(0);
+        .as("A table was read and written, but no trip to storage was made")
+        .isEqualTo(0);
 
     Assertions.assertThat(catalog.dropTable(TABLE)).as("Table deletion should succeed").isTrue();
     TaskEntity taskEntity =
@@ -1795,6 +1797,58 @@ public void testConcurrencyConflictUpdateTableDuringFinalTransaction() {
         .hasMessageContaining("conflict_table");
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testTableOperationsDoesNotRefreshAfterCommit(boolean updateMetadataOnCommit) {
+    if (this.requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    catalog.buildTable(TABLE, SCHEMA).create();
+
+    IcebergCatalog.BasePolarisTableOperations realOps =
+        (IcebergCatalog.BasePolarisTableOperations)
+            catalog.newTableOps(TABLE, updateMetadataOnCommit);
+    IcebergCatalog.BasePolarisTableOperations ops = Mockito.spy(realOps);
+
+    try (MockedStatic<TableMetadataParser> mocked =
+        Mockito.mockStatic(TableMetadataParser.class, Mockito.CALLS_REAL_METHODS)) {
+      TableMetadata base1 = ops.current();
+      mocked.verify(
+          () -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1));
+
+      TableMetadata base2 = ops.refresh();
+      mocked.verify(
+          () -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1));
+
+      Assertions.assertThat(base1.metadataFileLocation()).isEqualTo(base2.metadataFileLocation());
+      Assertions.assertThat(base1).isEqualTo(base2);
+
+      Schema newSchema =
+          new Schema(Types.NestedField.optional(100, "new_col", Types.LongType.get()));
+      TableMetadata newMetadata =
+          TableMetadata.buildFrom(base1).setCurrentSchema(newSchema, 100).build();
+      ops.commit(base2, newMetadata);
+      mocked.verify(
+          () -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1));
+
+      ops.current();
+      int expectedReads = 2;
+      if (updateMetadataOnCommit) {
+        expectedReads = 1;
+      }
+      mocked.verify(
+          () -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()),
+          Mockito.times(expectedReads));
+      ops.refresh();
+      mocked.verify(
+          () -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()),
+          Mockito.times(expectedReads));
+    } finally {
+      catalog.dropTable(TABLE, true);
+    }
+  }
+
   private static InMemoryFileIO getInMemoryIo(IcebergCatalog catalog) {
     return (InMemoryFileIO) ((ExceptionMappingFileIO) catalog.getIo()).getInnerIo();
   }
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java
index 9604ffff52..9302daee89 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java
@@ -20,6 +20,7 @@
 
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import com.azure.core.exception.AzureException;
@@ -117,6 +118,30 @@ private static void requestCreateTable() {
     res.close();
   }
 
+  private static void requestDropTable() {
+    Response res =
+        services
+            .restApi()
+            .dropTable(
+                catalog, "ns1", "t1", false, services.realmContext(), services.securityContext());
+    res.close();
+  }
+
+  private static void requestLoadTable() {
+    Response res =
+        services
+            .restApi()
+            .loadTable(
+                catalog,
+                "ns1",
+                "t1",
+                null,
+                "ALL",
+                services.realmContext(),
+                services.securityContext());
+    res.close();
+  }
+
   static Stream<RuntimeException> exceptions() {
     return Stream.of(
         new AzureException("Forbidden"),
@@ -135,7 +160,9 @@ void testLoadFileIOExceptionPropagation(RuntimeException ex) {
   @MethodSource("exceptions")
   void testNewInputFileExceptionPropagation(RuntimeException ex) {
     ioFactory.newInputFileExceptionSupplier = Optional.of(() -> ex);
-    assertThatThrownBy(FileIOExceptionsTest::requestCreateTable).isSameAs(ex);
+    assertThatCode(FileIOExceptionsTest::requestCreateTable).doesNotThrowAnyException();
+    assertThatThrownBy(FileIOExceptionsTest::requestLoadTable).isSameAs(ex);
+    assertThatCode(FileIOExceptionsTest::requestDropTable).doesNotThrowAnyException();
   }
 
   @ParameterizedTest
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index 3f9722f793..2e8fadab75 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -349,9 +349,21 @@ public ViewBuilder buildView(TableIdentifier identifier) {
     return new PolarisIcebergCatalogViewBuilder(identifier);
   }
 
+  @VisibleForTesting
+  public TableOperations newTableOps(
+      TableIdentifier tableIdentifier, boolean updateMetadataOnCommit) {
+    return new BasePolarisTableOperations(catalogFileIO, tableIdentifier, updateMetadataOnCommit);
+  }
+
   @Override
   protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
-    return new BasePolarisTableOperations(catalogFileIO, tableIdentifier);
+    boolean updateMetadataOnCommit =
+        getCurrentPolarisContext()
+            .getConfigurationStore()
+            .getConfiguration(
+                getCurrentPolarisContext(),
+                BehaviorChangeConfiguration.TABLE_OPERATIONS_COMMIT_UPDATE_METADATA);
+    return newTableOps(tableIdentifier, updateMetadataOnCommit);
   }
 
   @Override
@@ -1184,16 +1196,19 @@ public ViewBuilder withLocation(String newLocation) {
     }
   }
 
-  private class BasePolarisTableOperations extends BaseMetastoreTableOperations {
+  public class BasePolarisTableOperations extends BaseMetastoreTableOperations {
     private final TableIdentifier tableIdentifier;
     private final String fullTableName;
+    private final boolean updateMetadataOnCommit;
     private FileIO tableFileIO;
 
-    BasePolarisTableOperations(FileIO defaultFileIO, TableIdentifier tableIdentifier) {
+    BasePolarisTableOperations(
+        FileIO defaultFileIO, TableIdentifier tableIdentifier, boolean updateMetadataOnCommit) {
       LOGGER.debug("new BasePolarisTableOperations for {}", tableIdentifier);
       this.tableIdentifier = tableIdentifier;
       this.fullTableName = fullTableName(catalogName, tableIdentifier);
       this.tableFileIO = defaultFileIO;
+      this.updateMetadataOnCommit = updateMetadataOnCommit;
     }
 
     @Override
@@ -1382,6 +1397,16 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
               + "because it has been concurrently modified to %s",
           tableIdentifier, oldLocation, newLocation, existingLocation);
       }
+      // TODO: we should not need to do this hack, but there's no other way to modify
+      // currentMetadata / currentMetadataLocation
+      // NOTE(review): this runs before the createTableLike / else branch below, so if persisting
+      // the entity throws, this instance keeps the uncommitted metadata cached. Confirm callers
+      // always refresh or discard the operations object after a failed commit.
+      if (updateMetadataOnCommit) {
+        TableOperationsReflectionUtil reflectionUtil = TableOperationsReflectionUtil.getInstance();
+        reflectionUtil.setMetadataFileLocation(metadata, newLocation);
+        reflectionUtil.setCurrentMetadata(this, metadata, newLocation);
+      }
       if (null == existingLocation) {
         createTableLike(tableIdentifier, entity);
       } else {
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/TableOperationsReflectionUtil.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/TableOperationsReflectionUtil.java
new file mode 100644
index 0000000000..e47ec73bb8
--- /dev/null
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/TableOperationsReflectionUtil.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.catalog.iceberg;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.TableMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.misc.Unsafe;
+
+/**
+ * Reflection-based helper that overwrites otherwise inaccessible state on {@link TableMetadata}
+ * and {@link BaseMetastoreTableOperations}.
+ *
+ * <p>TODO: remove this class if the Iceberg library allows us to do this without reflection. For
+ * more details see #1378.
+ */
+public final class TableOperationsReflectionUtil {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TableOperationsReflectionUtil.class);
+
+  /** Holds the shared instance; it is created when this nested class is first initialized. */
+  private static final class Holder {
+    private static final TableOperationsReflectionUtil INSTANCE =
+        new TableOperationsReflectionUtil();
+  }
+
+  private final Unsafe unsafe;
+  private final Field metadataFileLocationField;
+  private final long changesFieldOffset;
+  private final Field currentMetadataLocationField;
+  private final Field currentMetadataField;
+
+  /**
+   * Resolves every reflective handle once, up front.
+   *
+   * @throws IllegalStateException if a field cannot be found or made accessible
+   */
+  private TableOperationsReflectionUtil() {
+    try {
+      metadataFileLocationField = TableMetadata.class.getDeclaredField("metadataFileLocation");
+      metadataFileLocationField.setAccessible(true);
+
+      Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
+      unsafeField.setAccessible(true);
+      unsafe = (Unsafe) unsafeField.get(null);
+      Field changesField = TableMetadata.class.getDeclaredField("changes");
+      changesField.setAccessible(true);
+      changesFieldOffset = unsafe.objectFieldOffset(changesField);
+
+      currentMetadataLocationField =
+          BaseMetastoreTableOperations.class.getDeclaredField("currentMetadataLocation");
+      currentMetadataLocationField.setAccessible(true);
+
+      currentMetadataField = BaseMetastoreTableOperations.class.getDeclaredField("currentMetadata");
+      currentMetadataField.setAccessible(true);
+    } catch (IllegalAccessException | NoSuchFieldException e) {
+      LOGGER.error(
+          "Encountered an unexpected error while attempting to access private fields in"
+              + " TableMetadata or BaseMetastoreTableOperations",
+          e);
+      throw new IllegalStateException(
+          "Unable to set up reflective access to Iceberg table metadata internals", e);
+    }
+  }
+
+  /** Returns the shared instance. */
+  static TableOperationsReflectionUtil getInstance() {
+    return Holder.INSTANCE;
+  }
+
+  /**
+   * Overwrites the {@code metadataFileLocation} field of {@code tableMetadata} and replaces its
+   * {@code changes} field with a new, empty list.
+   *
+   * @param tableMetadata the metadata object to modify in place
+   * @param metadataFileLocation the value to store in {@code metadataFileLocation}
+   * @throws IllegalStateException if the reflective write is rejected
+   */
+  public void setMetadataFileLocation(TableMetadata tableMetadata, String metadataFileLocation) {
+    try {
+      metadataFileLocationField.set(tableMetadata, metadataFileLocation);
+      unsafe.putObject(tableMetadata, changesFieldOffset, new ArrayList<MetadataUpdate>());
+    } catch (IllegalAccessException e) {
+      // Fail loudly: logging and continuing would let the caller cache half-updated metadata.
+      throw new IllegalStateException(
+          "Encountered an unexpected error while attempting to access private fields"
+              + " in TableMetadata",
+          e);
+    }
+  }
+
+  /**
+   * Overwrites the {@code currentMetadataLocation} and {@code currentMetadata} fields of {@code
+   * tableOperations}.
+   *
+   * @param tableOperations the operations object to modify in place
+   * @param tableMetadata the value to store in {@code currentMetadata}
+   * @param metadataFileLocation the value to store in {@code currentMetadataLocation}
+   * @throws IllegalStateException if the reflective write is rejected
+   */
+  public void setCurrentMetadata(
+      BaseMetastoreTableOperations tableOperations,
+      TableMetadata tableMetadata,
+      String metadataFileLocation) {
+    try {
+      currentMetadataLocationField.set(tableOperations, metadataFileLocation);
+      currentMetadataField.set(tableOperations, tableMetadata);
+    } catch (IllegalAccessException e) {
+      // Fail loudly: a location/metadata pair that is only half written must not be reused.
+      throw new IllegalStateException(
+          "Encountered an unexpected error while attempting to access private fields"
+              + " in BaseMetastoreTableOperations",
+          e);
+    }
+  }
+}