diff --git a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java index 4954de2ce9..c433ca25d8 100644 --- a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java +++ b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java @@ -163,7 +163,11 @@ public T runInTransaction( return result; } catch (Exception e) { - tr.rollback(); + // For some transaction conflict errors, the transaction will already no longer be active; + // if it's still active, explicitly rollback. + if (tr.isActive()) { + tr.rollback(); + } LOGGER.debug("transaction rolled back", e); if (e instanceof OptimisticLockException @@ -205,7 +209,11 @@ public void runActionInTransaction( } } catch (Exception e) { LOGGER.debug("Rolling back transaction due to an error", e); - tr.rollback(); + // For some transaction conflict errors, the transaction will already no longer be active; + // if it's still active, explicitly rollback. 
+ if (tr.isActive()) { + tr.rollback(); + } if (e instanceof OptimisticLockException || e.getCause() instanceof OptimisticLockException) { diff --git a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkSessionCustomizer.java b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkSessionCustomizer.java new file mode 100644 index 0000000000..faf149961a --- /dev/null +++ b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkSessionCustomizer.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.impl.eclipselink; + +import org.eclipse.persistence.sessions.DatabaseLogin; +import org.eclipse.persistence.sessions.Session; +import org.eclipse.persistence.sessions.SessionCustomizer; + +/** + * This pattern of injecting a SessionCustomizer is taken from the EclipseLink guide documentation: + * + *

https://eclipse.dev/eclipselink/documentation/2.6/dbws/creating_dbws_services002.htm + */ +public class PolarisEclipseLinkSessionCustomizer implements SessionCustomizer { + @Override + public void customize(Session session) throws Exception { + DatabaseLogin databaseLogin = (DatabaseLogin) session.getDatasourceLogin(); + databaseLogin.setTransactionIsolation(DatabaseLogin.TRANSACTION_SERIALIZABLE); + } +} diff --git a/getting-started/assets/eclipselink/persistence.xml b/getting-started/assets/eclipselink/persistence.xml index 38c3676ded..e569a91832 100644 --- a/getting-started/assets/eclipselink/persistence.xml +++ b/getting-started/assets/eclipselink/persistence.xml @@ -38,6 +38,8 @@ + + - \ No newline at end of file + diff --git a/getting-started/eclipselink/docker-compose.yml b/getting-started/eclipselink/docker-compose.yml index 69370692fa..252ea53d64 100644 --- a/getting-started/eclipselink/docker-compose.yml +++ b/getting-started/eclipselink/docker-compose.yml @@ -85,6 +85,15 @@ services: POSTGRES_PASSWORD: postgres POSTGRES_DB: POLARIS POSTGRES_INITDB_ARGS: "--encoding UTF8 --data-checksums" + volumes: + # Bind local conf file to a convenient location in the container + - type: bind + source: ./postgresql.conf + target: /etc/postgresql/postgresql.conf + command: + - "postgres" + - "-c" + - "config_file=/etc/postgresql/postgresql.conf" healthcheck: test: "pg_isready -U postgres" interval: 5s diff --git a/getting-started/eclipselink/postgresql.conf b/getting-started/eclipselink/postgresql.conf new file mode 100644 index 0000000000..5594b72c16 --- /dev/null +++ b/getting-started/eclipselink/postgresql.conf @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# This contains the postgres config settings provided to the startup command +# as "postgres -c config_file=<path to this file>". +# +# See https://github.com/postgres/postgres/blob/master/src/backend/utils/misc/postgresql.conf.sample +# for more config options. + +# Required standard settings normally specified in default config +listen_addresses = '*' +max_connections = 100 + +shared_buffers = 128MB +dynamic_shared_memory_type = posix + +max_wal_size = 1GB +min_wal_size = 80MB + +log_timezone = 'Etc/UTC' +datestyle = 'iso, mdy' +timezone = 'Etc/UTC' + + +# Custom settings below + +# NOTE: It's best practice to explicitly set the isolation level from the +# application layer where possible, but in some cases this requires careful +# configuration to inject settings into JPA frameworks. This is provided here +# for defense-in-depth and for illustrative purposes if database customization +# is desired. 
+default_transaction_isolation = 'serializable' diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java index 5b351fc7f8..8b9a538721 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java @@ -22,6 +22,7 @@ import static org.apache.iceberg.types.Types.NestedField.required; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -57,12 +58,14 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.catalog.CatalogTests; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.BadRequestException; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.inmemory.InMemoryFileIO; @@ -1659,4 +1662,104 @@ public void testRegisterTableWithSlashlessMetadataLocation() { .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("Invalid metadata file location"); } + + @Test + public void testConcurrencyConflictCreateTableUpdatedDuringFinalTransaction() { + Assumptions.assumeTrue( + requiresNamespaceCreate(), + "Only applicable if namespaces must be created before adding 
children"); + Assumptions.assumeTrue( + supportsNestedNamespaces(), "Only applicable if nested namespaces are supported"); + + final String tableLocation = "s3://externally-owned-bucket/table/"; + final String tableMetadataLocation = tableLocation + "metadata/v1.metadata.json"; + + // Use a spy so that non-transactional pre-requisites succeed normally, but we inject + // a concurrency failure at final commit. + PolarisMetaStoreManager spyMetaStore = spy(metaStoreManager); + PolarisPassthroughResolutionView passthroughView = + new PolarisPassthroughResolutionView( + callContext, entityManager, securityContext, CATALOG_NAME); + final BasePolarisCatalog catalog = + new BasePolarisCatalog( + entityManager, + spyMetaStore, + callContext, + passthroughView, + securityContext, + Mockito.mock(TaskExecutor.class), + fileIOFactory); + catalog.initialize( + CATALOG_NAME, + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); + + Namespace namespace = Namespace.of("parent", "child1"); + + createNonExistingNamespaces(namespace); + + final TableIdentifier table = TableIdentifier.of(namespace, "conflict_table"); + + doReturn( + new PolarisMetaStoreManager.EntityResult( + BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS, + PolarisEntitySubType.TABLE.getCode())) + .when(spyMetaStore) + .createEntityIfNotExists(any(), any(), any()); + Assertions.assertThatThrownBy(() -> catalog.createTable(table, SCHEMA)) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageContaining("conflict_table"); + } + + @Test + public void testConcurrencyConflictUpdateTableDuringFinalTransaction() { + Assumptions.assumeTrue( + requiresNamespaceCreate(), + "Only applicable if namespaces must be created before adding children"); + Assumptions.assumeTrue( + supportsNestedNamespaces(), "Only applicable if nested namespaces are supported"); + + final String tableLocation = "s3://externally-owned-bucket/table/"; + final String tableMetadataLocation = tableLocation + 
"metadata/v1.metadata.json"; + + // Use a spy so that non-transactional pre-requisites succeed normally, but we inject + // a concurrency failure at final commit. + PolarisMetaStoreManager spyMetaStore = spy(metaStoreManager); + PolarisPassthroughResolutionView passthroughView = + new PolarisPassthroughResolutionView( + callContext, entityManager, securityContext, CATALOG_NAME); + final BasePolarisCatalog catalog = + new BasePolarisCatalog( + entityManager, + spyMetaStore, + callContext, + passthroughView, + securityContext, + Mockito.mock(TaskExecutor.class), + fileIOFactory); + catalog.initialize( + CATALOG_NAME, + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); + Namespace namespace = Namespace.of("parent", "child1"); + + createNonExistingNamespaces(namespace); + + final TableIdentifier tableId = TableIdentifier.of(namespace, "conflict_table"); + + Table table = catalog.buildTable(tableId, SCHEMA).create(); + + doReturn( + new PolarisMetaStoreManager.EntityResult( + BaseResult.ReturnStatus.TARGET_ENTITY_CONCURRENTLY_MODIFIED, null)) + .when(spyMetaStore) + .updateEntityPropertiesIfNotChanged(any(), any(), any()); + + UpdateSchema update = table.updateSchema().addColumn("new_col", Types.LongType.get()); + Schema expected = update.apply(); + + Assertions.assertThatThrownBy(() -> update.commit()) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining("conflict_table"); + } } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java index f06850d7f0..731d3b5d9f 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java @@ -1798,15 +1798,27 @@ private void createTableLike( entity = new 
PolarisEntity.Builder(entity).setCreateTimestamp(System.currentTimeMillis()).build(); - PolarisEntity returnedEntity = - PolarisEntity.of( - getMetaStoreManager() - .createEntityIfNotExists( - getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), entity)); - LOGGER.debug("Created TableLike entity {} with TableIdentifier {}", entity, identifier); - if (returnedEntity == null) { - // TODO: Error or retry? + PolarisMetaStoreManager.EntityResult res = + getMetaStoreManager() + .createEntityIfNotExists( + getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), entity); + if (!res.isSuccess()) { + switch (res.getReturnStatus()) { + case BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED: + throw new NotFoundException("Parent path does not exist for %s", identifier); + + case BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS: + throw new AlreadyExistsException("Table or View already exists: %s", identifier); + + default: + throw new IllegalStateException( + String.format( + "Unknown error status for identifier %s: %s with extraInfo: %s", + identifier, res.getReturnStatus(), res.getExtraInformation())); + } } + PolarisEntity resultEntity = PolarisEntity.of(res); + LOGGER.debug("Created TableLike entity {} with TableIdentifier {}", resultEntity, identifier); } private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) { @@ -1823,17 +1835,28 @@ private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) { validateLocationForTableLike(identifier, metadataLocation, resolvedEntities); List catalogPath = resolvedEntities.getRawParentPath(); - PolarisEntity returnedEntity = - Optional.ofNullable( - getMetaStoreManager() - .updateEntityPropertiesIfNotChanged( - getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), entity) - .getEntity()) - .map(PolarisEntity::new) - .orElse(null); - if (returnedEntity == null) { - // TODO: Error or retry? 
+ PolarisMetaStoreManager.EntityResult res = + getMetaStoreManager() + .updateEntityPropertiesIfNotChanged( + getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), entity); + if (!res.isSuccess()) { + switch (res.getReturnStatus()) { + case BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED: + throw new NotFoundException("Parent path does not exist for %s", identifier); + + case BaseResult.ReturnStatus.TARGET_ENTITY_CONCURRENTLY_MODIFIED: + throw new CommitFailedException( + "Failed to commit Table or View %s because it was concurrently modified", identifier); + + default: + throw new IllegalStateException( + String.format( + "Unknown error status for identifier %s: %s with extraInfo: %s", + identifier, res.getReturnStatus(), res.getExtraInformation())); + } } + PolarisEntity resultEntity = PolarisEntity.of(res); + LOGGER.debug("Updated TableLike entity {} with TableIdentifier {}", resultEntity, identifier); } @SuppressWarnings("FormatStringAnnotation")