From 1eef525ee16bc0c9e668cbdef5cc8656535a8e40 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 16 Apr 2025 13:52:15 -0700 Subject: [PATCH 01/14] rebase --- .../config/BehaviorChangeConfiguration.java | 9 ++++ .../quarkus/catalog/IcebergCatalogTest.java | 46 +++++++++++++++++++ .../catalog/iceberg/IcebergCatalog.java | 40 ++++++++++++++-- 3 files changed, 92 insertions(+), 3 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java index d3577a30f3..372a472638 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java @@ -59,4 +59,13 @@ protected BehaviorChangeConfiguration( .description("Whether or not to use soft values in the entity cache") .defaultValue(false) .buildBehaviorChangeConfiguration(); + + public static final BehaviorChangeConfiguration TABLE_OPERATIONS_COMMIT_UPDATE_METADATA = + PolarisConfiguration.builder() + .key("TABLE_OPERATIONS_COMMIT_UPDATE_METADATA") + .description( + "If true, BasePolarisTableOperations should cache metadata that has been committed" + + " which can reduce File IO") + .defaultValue(true) + .buildBehaviorChangeConfiguration(); } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java index 4a5506fa22..7acd6436a3 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java @@ -133,6 +133,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.MockedStatic; import org.mockito.Mockito; import software.amazon.awssdk.core.exception.NonRetryableException; import software.amazon.awssdk.core.exception.RetryableException; @@ -1795,6 +1797,50 @@ public void testConcurrencyConflictUpdateTableDuringFinalTransaction() { .hasMessageContaining("conflict_table"); } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testTableOperationsDoesNotRefreshAfterCommit(boolean updateMetadataOnCommit) { + if (this.requiresNamespaceCreate()) { + catalog.createNamespace(NS); + } + + catalog.buildTable(TABLE, SCHEMA).create(); + + IcebergCatalog.BasePolarisTableOperations realOps = + (IcebergCatalog.BasePolarisTableOperations) catalog.newTableOps(TABLE, updateMetadataOnCommit); + IcebergCatalog.BasePolarisTableOperations ops = Mockito.spy(realOps); + + try (MockedStatic mocked = + Mockito.mockStatic(TableMetadataParser.class, Mockito.CALLS_REAL_METHODS)) { + TableMetadata base1 = ops.current(); + mocked.verify(() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1)); + + TableMetadata base2 = ops.refresh(); + mocked.verify(() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1)); + + Assertions.assertThat(base1.metadataFileLocation()).isEqualTo(base2.metadataFileLocation()); + Assertions.assertThat(base1).isEqualTo(base2); + + Schema newSchema = new Schema(Types.NestedField.optional(100, "new_col", Types.LongType.get())); + TableMetadata newMetadata = + TableMetadata.buildFrom(base1).setCurrentSchema(newSchema, 100).build(); + ops.commit(base2, newMetadata); + mocked.verify(() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1)); + + ops.current(); + int expectedReads = 2; + if (updateMetadataOnCommit) { + expectedReads = 1; + } + mocked.verify(() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(expectedReads)); + ops.refresh(); + mocked.verify(() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(expectedReads)); + } finally { + catalog.dropTable(TABLE, true); + } + } + private static InMemoryFileIO getInMemoryIo(IcebergCatalog catalog) { return (InMemoryFileIO) ((ExceptionMappingFileIO) catalog.getIo()).getInnerIo(); } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index 3f9722f793..988d70428e 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -30,6 +30,7 @@ import jakarta.ws.rs.core.SecurityContext; import java.io.Closeable; import java.io.IOException; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -349,9 +350,20 @@ public ViewBuilder buildView(TableIdentifier identifier) { return new PolarisIcebergCatalogViewBuilder(identifier); } + @VisibleForTesting + public TableOperations newTableOps(TableIdentifier tableIdentifier, boolean updateMetadataOnCommit) { + return new BasePolarisTableOperations(catalogFileIO, tableIdentifier, updateMetadataOnCommit); + } + @Override protected TableOperations newTableOps(TableIdentifier tableIdentifier) { - return new BasePolarisTableOperations(catalogFileIO, tableIdentifier); + boolean updateMetadataOnCommit = + getCurrentPolarisContext() + .getConfigurationStore() + .getConfiguration( + getCurrentPolarisContext(), + BehaviorChangeConfiguration.TABLE_OPERATIONS_COMMIT_UPDATE_METADATA); + return newTableOps(tableIdentifier, updateMetadataOnCommit); } @Override @@ -1184,16 +1196,19 @@ public ViewBuilder withLocation(String newLocation) { } } - private class BasePolarisTableOperations extends BaseMetastoreTableOperations { + public class BasePolarisTableOperations extends BaseMetastoreTableOperations { private final TableIdentifier tableIdentifier; private final String fullTableName; + private final boolean updateMetadataOnCommit; private FileIO tableFileIO; - BasePolarisTableOperations(FileIO defaultFileIO, TableIdentifier tableIdentifier) { + BasePolarisTableOperations( + FileIO defaultFileIO, TableIdentifier tableIdentifier, boolean updateMetadataOnCommit) { LOGGER.debug("new BasePolarisTableOperations for {}", tableIdentifier); this.tableIdentifier = tableIdentifier; this.fullTableName = fullTableName(catalogName, tableIdentifier); this.tableFileIO = defaultFileIO; + this.updateMetadataOnCommit = updateMetadataOnCommit; } @Override @@ -1328,6 +1343,25 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { String newLocation = writeNewMetadataIfRequired(base == null, metadata); String oldLocation = base == null ? null : base.metadataFileLocation(); + // TODO: we should not need to do this hack, but there's no other way to modify + // metadataFileLocation / currentMetadataLocation + if (updateMetadataOnCommit) { + try { + Field tableMetadataField = TableMetadata.class.getDeclaredField("metadataFileLocation"); + tableMetadataField.setAccessible(true); + tableMetadataField.set(metadata, newLocation); + + Field currentMetadataField = + BaseMetastoreTableOperations.class.getDeclaredField("currentMetadataLocation"); + currentMetadataField.setAccessible(true); + currentMetadataField.set(this, newLocation); + } catch (IllegalAccessException | NoSuchFieldException e) { + LOGGER.error( + "Encountered an unexpected error while attempting to modify TableMetadata.metadataFileLocation", + e); + } + } + // TODO: Consider using the entity from doRefresh() directly to do the conflict detection // instead of a two-layer CAS (checking metadataLocation to detect concurrent modification // between doRefresh() and doCommit(), and then updateEntityPropertiesIfNotChanged to detect From 08e78e7349a8b5e771e20076e53834e3bad37079 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 16 Apr 2025 13:52:18 -0700 Subject: [PATCH 02/14] autolint --- .../quarkus/catalog/IcebergCatalogTest.java | 26 ++++++++++++------- .../catalog/iceberg/IcebergCatalog.java | 3 ++- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java index 7acd6436a3..286eb290c3 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java @@ -1797,7 +1797,6 @@ public void testConcurrencyConflictUpdateTableDuringFinalTransaction() { .hasMessageContaining("conflict_table"); } - @ParameterizedTest @ValueSource(booleans = {false, true}) public void testTableOperationsDoesNotRefreshAfterCommit(boolean updateMetadataOnCommit) { @@ -1808,34 +1807,43 @@ public void testTableOperationsDoesNotRefreshAfterCommit(boolean updateMetadataO catalog.buildTable(TABLE, SCHEMA).create(); IcebergCatalog.BasePolarisTableOperations realOps = - (IcebergCatalog.BasePolarisTableOperations) catalog.newTableOps(TABLE, updateMetadataOnCommit); + (IcebergCatalog.BasePolarisTableOperations) + catalog.newTableOps(TABLE, updateMetadataOnCommit); IcebergCatalog.BasePolarisTableOperations ops = Mockito.spy(realOps); try (MockedStatic mocked = - Mockito.mockStatic(TableMetadataParser.class, Mockito.CALLS_REAL_METHODS)) { + Mockito.mockStatic(TableMetadataParser.class, Mockito.CALLS_REAL_METHODS)) { TableMetadata base1 = ops.current(); - mocked.verify(() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1)); + mocked.verify( + () -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1)); TableMetadata base2 = ops.refresh(); - mocked.verify(() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1)); + mocked.verify( + () -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1)); Assertions.assertThat(base1.metadataFileLocation()).isEqualTo(base2.metadataFileLocation()); Assertions.assertThat(base1).isEqualTo(base2); - Schema newSchema = new Schema(Types.NestedField.optional(100, "new_col", Types.LongType.get())); + Schema newSchema = + new Schema(Types.NestedField.optional(100, "new_col", Types.LongType.get())); TableMetadata newMetadata = TableMetadata.buildFrom(base1).setCurrentSchema(newSchema, 100).build(); ops.commit(base2, newMetadata); - mocked.verify(() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1)); + mocked.verify( + () -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1)); ops.current(); int expectedReads = 2; if (updateMetadataOnCommit) { expectedReads = 1; } - mocked.verify(() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(expectedReads)); + mocked.verify( + () -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), + Mockito.times(expectedReads)); ops.refresh(); - mocked.verify(() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(expectedReads)); + mocked.verify( + () -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), + Mockito.times(expectedReads)); } finally { catalog.dropTable(TABLE, true); } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index 988d70428e..a5cf4ead83 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -351,7 +351,8 @@ public ViewBuilder buildView(TableIdentifier identifier) { } @VisibleForTesting - public TableOperations newTableOps(TableIdentifier tableIdentifier, boolean updateMetadataOnCommit) { + public TableOperations newTableOps( + TableIdentifier tableIdentifier, boolean updateMetadataOnCommit) { return new BasePolarisTableOperations(catalogFileIO, tableIdentifier, updateMetadataOnCommit); } From 37c22c43a0ce1c1d1fe270af463990e624ca6e3d Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 16 Apr 2025 14:58:34 -0700 Subject: [PATCH 03/14] autolint --- .../service/quarkus/catalog/IcebergCatalogTest.java | 4 ++-- .../service/catalog/iceberg/IcebergCatalog.java | 11 ++++++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java index 286eb290c3..b8ffced5ef 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java @@ -1647,8 +1647,8 @@ public void testFileIOWrapper() { table.updateProperties().set("foo", "bar").commit(); Assertions.assertThat(measured.getInputBytes()) - .as("A table was read and written") - .isGreaterThan(0); + .as("A table was read and written, but no trip to storage was made") + .isEqualTo(0); Assertions.assertThat(catalog.dropTable(TABLE)).as("Table deletion should succeed").isTrue(); TaskEntity taskEntity = diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index a5cf4ead83..24a5191dde 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -1345,17 +1345,22 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { String oldLocation = base == null ? null : base.metadataFileLocation(); // TODO: we should not need to do this hack, but there's no other way to modify - // metadataFileLocation / currentMetadataLocation + // currentMetadata / currentMetadataLocation if (updateMetadataOnCommit) { try { Field tableMetadataField = TableMetadata.class.getDeclaredField("metadataFileLocation"); tableMetadataField.setAccessible(true); tableMetadataField.set(metadata, newLocation); - Field currentMetadataField = + Field currentMetadataLocationField = BaseMetastoreTableOperations.class.getDeclaredField("currentMetadataLocation"); + currentMetadataLocationField.setAccessible(true); + currentMetadataLocationField.set(this, newLocation); + + Field currentMetadataField = + BaseMetastoreTableOperations.class.getDeclaredField("currentMetadata"); currentMetadataField.setAccessible(true); - currentMetadataField.set(this, newLocation); + currentMetadataField.set(this, metadata); } catch (IllegalAccessException | NoSuchFieldException e) { LOGGER.error( "Encountered an unexpected error while attempting to modify TableMetadata.metadataFileLocation", From 7ebf9b01c17e2cdf447dc5234b74e8fbfd7902d9 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 16 Apr 2025 15:36:12 -0700 Subject: [PATCH 04/14] fix other tests --- .../service/catalog/iceberg/IcebergCatalog.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index 24a5191dde..ddd20639c0 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -31,6 +31,7 @@ import java.io.Closeable; import java.io.IOException; import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -47,6 +48,7 @@ import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.MetadataUpdate; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; @@ -1352,6 +1354,15 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { tableMetadataField.setAccessible(true); tableMetadataField.set(metadata, newLocation); + Field tableMetadataChanges = TableMetadata.class.getDeclaredField("changes"); + + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(tableMetadataChanges, tableMetadataChanges.getModifiers() & ~java.lang.reflect.Modifier.FINAL); + + tableMetadataField.setAccessible(true); + tableMetadataField.set(metadata, new ArrayList()); + Field currentMetadataLocationField = BaseMetastoreTableOperations.class.getDeclaredField("currentMetadataLocation"); currentMetadataLocationField.setAccessible(true); From 455e2318e56024502385af9cc7e6b07cfdbcc823 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 16 Apr 2025 15:36:14 -0700 Subject: [PATCH 05/14] autolint --- .../polaris/service/catalog/iceberg/IcebergCatalog.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index ddd20639c0..a3baef516b 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -1358,7 +1358,9 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { Field modifiersField = Field.class.getDeclaredField("modifiers"); modifiersField.setAccessible(true); - modifiersField.setInt(tableMetadataChanges, tableMetadataChanges.getModifiers() & ~java.lang.reflect.Modifier.FINAL); + modifiersField.setInt( + tableMetadataChanges, + tableMetadataChanges.getModifiers() & ~java.lang.reflect.Modifier.FINAL); tableMetadataField.setAccessible(true); tableMetadataField.set(metadata, new ArrayList()); From 453f10853124f36955d4001295f6ad02785727c8 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 16 Apr 2025 15:51:46 -0700 Subject: [PATCH 06/14] semistable --- .../catalog/iceberg/IcebergCatalog.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index a3baef516b..6c60c9960b 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -119,6 +119,7 @@ import org.apache.polaris.service.types.NotificationType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import sun.misc.Unsafe; /** Defines the relationship between PolarisEntities and Iceberg's business logic. */ public class IcebergCatalog extends BaseMetastoreViewCatalog @@ -1354,16 +1355,13 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { tableMetadataField.setAccessible(true); tableMetadataField.set(metadata, newLocation); - Field tableMetadataChanges = TableMetadata.class.getDeclaredField("changes"); - - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt( - tableMetadataChanges, - tableMetadataChanges.getModifiers() & ~java.lang.reflect.Modifier.FINAL); - - tableMetadataField.setAccessible(true); - tableMetadataField.set(metadata, new ArrayList()); + Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + Unsafe unsafe = (Unsafe) unsafeField.get(null); + Field changesField = TableMetadata.class.getDeclaredField("changes"); + changesField.setAccessible(true); + long offset = unsafe.objectFieldOffset(changesField); + unsafe.putObject(metadata, offset, new ArrayList()); Field currentMetadataLocationField = BaseMetastoreTableOperations.class.getDeclaredField("currentMetadataLocation"); From a6c342d48f85bd0faba18cf6b5fbc5664a6c80a5 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 16 Apr 2025 16:07:23 -0700 Subject: [PATCH 07/14] touch up --- .../polaris/core/config/BehaviorChangeConfiguration.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java index 372a472638..121fc3d31d 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java @@ -64,8 +64,9 @@ protected BehaviorChangeConfiguration( PolarisConfiguration.builder() .key("TABLE_OPERATIONS_COMMIT_UPDATE_METADATA") .description( - "If true, BasePolarisTableOperations should cache metadata that has been committed" - + " which can reduce File IO") + "If true, BasePolarisTableOperations should update the metadata that is passed into" + + " `commit`, which means that future calls to `refresh` may be able to skip a trip to" + + " object storage") .defaultValue(true) .buildBehaviorChangeConfiguration(); } From c3185213c503a074ee1fad711fef47abe0f49d0c Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 16 Apr 2025 16:07:26 -0700 Subject: [PATCH 08/14] autolint --- .../polaris/core/config/BehaviorChangeConfiguration.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java index 121fc3d31d..3039c06c1d 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java @@ -64,9 +64,9 @@ protected BehaviorChangeConfiguration( PolarisConfiguration.builder() .key("TABLE_OPERATIONS_COMMIT_UPDATE_METADATA") .description( - "If true, BasePolarisTableOperations should update the metadata that is passed into" + - " `commit`, which means that future calls to `refresh` may be able to skip a trip to" + - " object storage") + "If true, BasePolarisTableOperations should update the metadata that is passed into" + + " `commit`, which means that future calls to `refresh` may be able to skip a trip to" + + " object storage") .defaultValue(true) .buildBehaviorChangeConfiguration(); } From b06c6eac24f2d4b129b5eac364a5642cf227cae5 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 16 Apr 2025 16:19:55 -0700 Subject: [PATCH 09/14] superstable --- .../catalog/io/FileIOExceptionsTest.java | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java index 9604ffff52..1a1f3daa70 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java @@ -20,6 +20,7 @@ import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.azure.core.exception.AzureException; @@ -40,6 +41,7 @@ import org.apache.polaris.core.admin.model.StorageConfigInfo; import org.apache.polaris.service.TestServices; import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; @@ -117,6 +119,24 @@ private static void requestCreateTable() { res.close(); } + private static void requestDropTable() { + Response res = + services + .restApi() + .dropTable( + catalog, "ns1", "t1", false, services.realmContext(), services.securityContext()); + res.close(); + } + + private static void requestLoadTable() { + Response res = + services + .restApi() + .loadTable( + catalog, "ns1", "t1", null, "ALL", services.realmContext(), services.securityContext()); + res.close(); + } + static Stream exceptions() { return Stream.of( new AzureException("Forbidden"), @@ -135,7 +155,9 @@ void testLoadFileIOExceptionPropagation(RuntimeException ex) { @MethodSource("exceptions") void testNewInputFileExceptionPropagation(RuntimeException ex) { ioFactory.newInputFileExceptionSupplier = Optional.of(() -> ex); - assertThatThrownBy(FileIOExceptionsTest::requestCreateTable).isSameAs(ex); + assertThatCode(FileIOExceptionsTest::requestCreateTable).doesNotThrowAnyException(); + assertThatThrownBy(FileIOExceptionsTest::requestLoadTable).isSameAs(ex); + assertThatCode(FileIOExceptionsTest::requestDropTable).doesNotThrowAnyException(); } @ParameterizedTest From 08783ac9c3594c9450f19ab9a57103435f22d300 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 16 Apr 2025 16:19:57 -0700 Subject: [PATCH 10/14] autolint --- .../service/quarkus/catalog/io/FileIOExceptionsTest.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java index 1a1f3daa70..9302daee89 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java @@ -41,7 +41,6 @@ import org.apache.polaris.core.admin.model.StorageConfigInfo; import org.apache.polaris.service.TestServices; import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; @@ -133,7 +132,13 @@ private static void requestLoadTable() { services .restApi() .loadTable( - catalog, "ns1", "t1", null, "ALL", services.realmContext(), services.securityContext()); + catalog, + "ns1", + "t1", + null, + "ALL", + services.realmContext(), + services.securityContext()); res.close(); } From 34b6e282307a85f9ca407597a67028c1a5dca686 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Wed, 16 Apr 2025 17:45:07 -0700 Subject: [PATCH 11/14] static block per review --- .../catalog/iceberg/IcebergCatalog.java | 59 ++++++++++++------- 1 file changed, 39 insertions(+), 20 deletions(-) diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index 6c60c9960b..ce10d694b5 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -1206,6 +1206,42 @@ public class BasePolarisTableOperations extends BaseMetastoreTableOperations { private final boolean updateMetadataOnCommit; private FileIO tableFileIO; + // TODO remove the following if the Iceberg library allows us to do this without reflection + // For more details see: + // https://github.com/apache/polaris/pull/1378 + private static final Unsafe unsafe; + private static final Field tableMetadataField; + private static final long changesFieldOffset; + private static final Field currentMetadataLocationField; + private static final Field currentMetadataField; + + static { + try { + tableMetadataField = TableMetadata.class.getDeclaredField("metadataFileLocation"); + tableMetadataField.setAccessible(true); + + Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + unsafe = (Unsafe) unsafeField.get(null); + Field changesField = TableMetadata.class.getDeclaredField("changes"); + changesField.setAccessible(true); + changesFieldOffset = unsafe.objectFieldOffset(changesField); + + currentMetadataLocationField = + BaseMetastoreTableOperations.class.getDeclaredField("currentMetadataLocation"); + currentMetadataLocationField.setAccessible(true); + + currentMetadataField = + BaseMetastoreTableOperations.class.getDeclaredField("currentMetadata"); + currentMetadataField.setAccessible(true); + } catch (IllegalAccessException | NoSuchFieldException e) { + LOGGER.error( + "Encountered an unexpected error while attempting to access private fields in TableMetadata", + e); + throw new RuntimeException(e); + } + } + BasePolarisTableOperations( FileIO defaultFileIO, TableIdentifier tableIdentifier, boolean updateMetadataOnCommit) { LOGGER.debug("new BasePolarisTableOperations for {}", tableIdentifier); @@ -1351,30 +1387,13 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { // currentMetadata / currentMetadataLocation if (updateMetadataOnCommit) { try { - Field tableMetadataField = TableMetadata.class.getDeclaredField("metadataFileLocation"); - tableMetadataField.setAccessible(true); tableMetadataField.set(metadata, newLocation); - - Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe"); - unsafeField.setAccessible(true); - Unsafe unsafe = (Unsafe) unsafeField.get(null); - Field changesField = TableMetadata.class.getDeclaredField("changes"); - changesField.setAccessible(true); - long offset = unsafe.objectFieldOffset(changesField); - unsafe.putObject(metadata, offset, new ArrayList()); - - Field currentMetadataLocationField = - BaseMetastoreTableOperations.class.getDeclaredField("currentMetadataLocation"); - currentMetadataLocationField.setAccessible(true); + unsafe.putObject(metadata, changesFieldOffset, new ArrayList()); currentMetadataLocationField.set(this, newLocation); - - Field currentMetadataField = - BaseMetastoreTableOperations.class.getDeclaredField("currentMetadata"); - currentMetadataField.setAccessible(true); currentMetadataField.set(this, metadata); - } catch (IllegalAccessException | NoSuchFieldException e) { + } catch (IllegalAccessException e) { LOGGER.error( - "Encountered an unexpected error while attempting to modify TableMetadata.metadataFileLocation", + "Encountered an unexpected error while attempting to access private fields in TableMetadata", e); } } From f62da004dd0e53fe843ab1e8d4dd45f9d8bd0483 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Thu, 17 Apr 2025 10:57:21 -0700 Subject: [PATCH 12/14] doc change --- .../polaris/core/config/BehaviorChangeConfiguration.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java index 3039c06c1d..ea905e19dc 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java @@ -65,8 +65,8 @@ protected BehaviorChangeConfiguration( .key("TABLE_OPERATIONS_COMMIT_UPDATE_METADATA") .description( "If true, BasePolarisTableOperations should update the metadata that is passed into" - + " `commit`, which means that future calls to `refresh` may be able to skip a trip to" - + " object storage") + + " `commit`, and re-use it to skip a trip to object storage to re-construct" + + " the committed metadata again.") .defaultValue(true) .buildBehaviorChangeConfiguration(); } From f7e11c32fcad489355e6da93450a5ac4421165d5 Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Fri, 18 Apr 2025 23:01:37 -0700 Subject: [PATCH 13/14] changes per review --- .../catalog/iceberg/IcebergCatalog.java | 53 +-------- .../TableOperationsReflectionUtil.java | 104 ++++++++++++++++++ 2 files changed, 107 insertions(+), 50 deletions(-) create mode 100644 service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/TableOperationsReflectionUtil.java diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index ce10d694b5..e218cd4bbe 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -30,8 +30,6 @@ import jakarta.ws.rs.core.SecurityContext; import java.io.Closeable; import java.io.IOException; -import java.lang.reflect.Field; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -48,7 +46,6 @@ import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.MetadataUpdate; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; @@ -119,7 +116,6 @@ import org.apache.polaris.service.types.NotificationType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import sun.misc.Unsafe; /** Defines the relationship between PolarisEntities and Iceberg's business logic. */ public class IcebergCatalog extends BaseMetastoreViewCatalog @@ -1206,42 +1202,6 @@ public class BasePolarisTableOperations extends BaseMetastoreTableOperations { private final boolean updateMetadataOnCommit; private FileIO tableFileIO; - // TODO remove the following if the Iceberg library allows us to do this without reflection - // For more details see: - // https://github.com/apache/polaris/pull/1378 - private static final Unsafe unsafe; - private static final Field tableMetadataField; - private static final long changesFieldOffset; - private static final Field currentMetadataLocationField; - private static final Field currentMetadataField; - - static { - try { - tableMetadataField = TableMetadata.class.getDeclaredField("metadataFileLocation"); - tableMetadataField.setAccessible(true); - - Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe"); - unsafeField.setAccessible(true); - unsafe = (Unsafe) unsafeField.get(null); - Field changesField = TableMetadata.class.getDeclaredField("changes"); - changesField.setAccessible(true); - changesFieldOffset = unsafe.objectFieldOffset(changesField); - - currentMetadataLocationField = - BaseMetastoreTableOperations.class.getDeclaredField("currentMetadataLocation"); - currentMetadataLocationField.setAccessible(true); - - currentMetadataField = - BaseMetastoreTableOperations.class.getDeclaredField("currentMetadata"); - currentMetadataField.setAccessible(true); - } catch (IllegalAccessException | NoSuchFieldException e) { - LOGGER.error( - "Encountered an unexpected error while attempting to access private fields in TableMetadata", - e); - throw new RuntimeException(e); - } - } - BasePolarisTableOperations( FileIO defaultFileIO, TableIdentifier tableIdentifier, boolean updateMetadataOnCommit) { LOGGER.debug("new BasePolarisTableOperations for {}", tableIdentifier); @@ -1386,16 +1346,9 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { // TODO: we should not need to do this hack, but there's no other way to modify // currentMetadata / currentMetadataLocation if (updateMetadataOnCommit) { - try { - tableMetadataField.set(metadata, newLocation); - unsafe.putObject(metadata, changesFieldOffset, new ArrayList()); - currentMetadataLocationField.set(this, newLocation); - currentMetadataField.set(this, metadata); - } catch (IllegalAccessException e) { - LOGGER.error( - "Encountered an unexpected error while attempting to access private fields in TableMetadata", - e); - } + TableOperationsReflectionUtil reflectionUtil = TableOperationsReflectionUtil.getInstance(); + reflectionUtil.setMetadataFileLocation(metadata, newLocation); + reflectionUtil.setCurrentMetadata(this, metadata, newLocation); } // TODO: Consider using the entity from doRefresh() directly to do the conflict detection diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/TableOperationsReflectionUtil.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/TableOperationsReflectionUtil.java new file mode 100644 index 0000000000..e47ec73bb8 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/TableOperationsReflectionUtil.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.catalog.iceberg; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.TableMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.misc.Unsafe; + +/** + * Used to handle reflection-related fields for TableOperations and TableMetadata TODO remove the + * following if the Iceberg library allows us to do this without reflection For more details see #1378 + */ +public final class TableOperationsReflectionUtil { + private static Logger LOGGER = LoggerFactory.getLogger(TableOperationsReflectionUtil.class); + + private static class Instance { + private static TableOperationsReflectionUtil instance = new TableOperationsReflectionUtil(); + } + + static TableOperationsReflectionUtil getInstance() { + return Instance.instance; + } + + public void setMetadataFileLocation(TableMetadata tableMetadata, String metadataFileLocation) { + try { + tableMetadataField.set(tableMetadata, metadataFileLocation); + unsafe.putObject(tableMetadata, changesFieldOffset, new ArrayList()); + } catch (IllegalAccessException e) { + LOGGER.error( + "Encountered an unexpected error while attempting to access private fields" + + " in TableMetadata", + e); + } + } + + public void setCurrentMetadata( + BaseMetastoreTableOperations tableOperations, + TableMetadata tableMetadata, + String metadataFileLocation) { + try { + currentMetadataLocationField.set(tableOperations, metadataFileLocation); + currentMetadataField.set(tableOperations, tableMetadata); + } catch (IllegalAccessException e) { + LOGGER.error( + "Encountered an unexpected error while attempting to access private fields" + + " in BaseMetastoreTableOperations", + e); + } + } + + private final Unsafe unsafe; + private final Field tableMetadataField; + private final long changesFieldOffset; + private final Field currentMetadataLocationField; + private final Field currentMetadataField; + + private TableOperationsReflectionUtil() { + try { + tableMetadataField = TableMetadata.class.getDeclaredField("metadataFileLocation"); + tableMetadataField.setAccessible(true); + + Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + unsafe = (Unsafe) unsafeField.get(null); + Field changesField = TableMetadata.class.getDeclaredField("changes"); + changesField.setAccessible(true); + changesFieldOffset = unsafe.objectFieldOffset(changesField); + + currentMetadataLocationField = + BaseMetastoreTableOperations.class.getDeclaredField("currentMetadataLocation"); + currentMetadataLocationField.setAccessible(true); + + currentMetadataField = BaseMetastoreTableOperations.class.getDeclaredField("currentMetadata"); + currentMetadataField.setAccessible(true); + } catch (IllegalAccessException | NoSuchFieldException e) { + LOGGER.error( + "Encountered an unexpected error while attempting to access private fields in TableMetadata", + e); + throw new RuntimeException(e); + } + } +} From 5aeb9618c850c442aa966e100881d4e810e910ea Mon Sep 17 00:00:00 2001 From: Eric Maynard Date: Fri, 18 Apr 2025 23:09:55 -0700 Subject: [PATCH 14/14] move --- .../service/catalog/iceberg/IcebergCatalog.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index e218cd4bbe..2e8fadab75 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -1343,14 +1343,6 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { String newLocation = writeNewMetadataIfRequired(base == null, metadata); String oldLocation = base == null ? null : base.metadataFileLocation(); - // TODO: we should not need to do this hack, but there's no other way to modify - // currentMetadata / currentMetadataLocation - if (updateMetadataOnCommit) { - TableOperationsReflectionUtil reflectionUtil = TableOperationsReflectionUtil.getInstance(); - reflectionUtil.setMetadataFileLocation(metadata, newLocation); - reflectionUtil.setCurrentMetadata(this, metadata, newLocation); - } - // TODO: Consider using the entity from doRefresh() directly to do the conflict detection // instead of a two-layer CAS (checking metadataLocation to detect concurrent modification // between doRefresh() and doCommit(), and then updateEntityPropertiesIfNotChanged to detect @@ -1405,6 +1397,13 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { + "because it has been concurrently modified to %s", tableIdentifier, oldLocation, newLocation, existingLocation); } + // TODO: we should not need to do this hack, but there's no other way to modify + // currentMetadata / currentMetadataLocation + if (updateMetadataOnCommit) { + TableOperationsReflectionUtil reflectionUtil = TableOperationsReflectionUtil.getInstance(); + reflectionUtil.setMetadataFileLocation(metadata, newLocation); + reflectionUtil.setCurrentMetadata(this, metadata, newLocation); + } if (null == existingLocation) { createTableLike(tableIdentifier, entity); } else {