Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,11 @@ public <T> T runInTransaction(

return result;
} catch (Exception e) {
tr.rollback();
// For some transaction conflict errors, the transaction will already no longer be active;
// if it's still active, explicitly rollback.
if (tr.isActive()) {
tr.rollback();
}
LOGGER.debug("transaction rolled back", e);

if (e instanceof OptimisticLockException
Expand Down Expand Up @@ -205,7 +209,11 @@ public void runActionInTransaction(
}
} catch (Exception e) {
LOGGER.debug("Rolling back transaction due to an error", e);
tr.rollback();
// For some transaction conflict errors, the transaction will already no longer be active;
// if it's still active, explicitly rollback.
if (tr.isActive()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I guess that might be the reason for 500 errors in my old test with Serializable isolation.

tr.rollback();
}

if (e instanceof OptimisticLockException
|| e.getCause() instanceof OptimisticLockException) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.extension.persistence.impl.eclipselink;

import org.eclipse.persistence.sessions.DatabaseLogin;
import org.eclipse.persistence.sessions.Session;
import org.eclipse.persistence.sessions.SessionCustomizer;

/**
* This pattern of injecting a SessionCustomizer is taken from the EclipseLink guide documentation:
*
* <p>https://eclipse.dev/eclipselink/documentation/2.6/dbws/creating_dbws_services002.htm
*/
public class PolarisEclipseLinkSessionCustomizer implements SessionCustomizer {
@Override
public void customize(Session session) throws Exception {
DatabaseLogin databaseLogin = (DatabaseLogin) session.getDatasourceLogin();
databaseLogin.setTransactionIsolation(DatabaseLogin.TRANSACTION_SERIALIZABLE);
}
}
4 changes: 3 additions & 1 deletion getting-started/assets/eclipselink/persistence.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
<property name="jakarta.persistence.jdbc.password" value="postgres"/>
<property name="jakarta.persistence.schema-generation.database.action" value="create"/>
<property name="eclipselink.persistence-context.flush-mode" value="auto"/>
<property name="eclipselink.session.customizer" value="org.apache.polaris.extension.persistence.impl.eclipselink.PolarisEclipseLinkSessionCustomizer"/>
<property name="eclipselink.transaction.join-existing" value="true"/>
</properties>
</persistence-unit>
</persistence>
</persistence>
9 changes: 9 additions & 0 deletions getting-started/eclipselink/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ services:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: POLARIS
POSTGRES_INITDB_ARGS: "--encoding UTF8 --data-checksums"
volumes:
# Bind local conf file to a convenient location in the container
- type: bind
source: ./postgresql.conf
target: /etc/postgresql/postgresql.conf
command:
- "postgres"
- "-c"
- "config_file=/etc/postgresql/postgresql.conf"
healthcheck:
test: "pg_isready -U postgres"
interval: 5s
Expand Down
46 changes: 46 additions & 0 deletions getting-started/eclipselink/postgresql.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# This contains the postgres config settings provided to the startup command
# as "postgres -c config_file=<this file>".
#
# See https://github.com/postgres/postgres/blob/master/src/backend/utils/misc/postgresql.conf.sample
# for more config options.

# Required standard settings normally specified in default config
listen_addresses = '*'
max_connections = 100

shared_buffers = 128MB
dynamic_shared_memory_type = posix

max_wal_size = 1GB
min_wal_size = 80MB
Comment on lines +31 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these just defaults, or where did these come from?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah they were in the postgres docker image. Originally I copy pasted the whole file but it was too cluttered so I just literally filtered to keep only the non-commented-out default settings.


log_timezone = 'Etc/UTC'
datestyle = 'iso, mdy'
timezone = 'Etc/UTC'


# Custom settings below

# NOTE: It's best practice to explicitly set the isolation level from the
# application layer where possible, but in some cases this requires careful
# configuration to inject settings into JPA frameworks. This is provided here
# for defense-in-depth and for illustrative purposes if database customization
# is desired.
default_transaction_isolation = 'serializable'
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -57,12 +58,14 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.BadRequestException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.inmemory.InMemoryFileIO;
Expand Down Expand Up @@ -1659,4 +1662,104 @@ public void testRegisterTableWithSlashlessMetadataLocation() {
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Invalid metadata file location");
}

@Test
public void testConcurrencyConflictCreateTableUpdatedDuringFinalTransaction() {
Assumptions.assumeTrue(
requiresNamespaceCreate(),
"Only applicable if namespaces must be created before adding children");
Assumptions.assumeTrue(
supportsNestedNamespaces(), "Only applicable if nested namespaces are supported");

final String tableLocation = "s3://externally-owned-bucket/table/";
final String tableMetadataLocation = tableLocation + "metadata/v1.metadata.json";

// Use a spy so that non-transactional pre-requisites succeed normally, but we inject
// a concurrency failure at final commit.
PolarisMetaStoreManager spyMetaStore = spy(metaStoreManager);
PolarisPassthroughResolutionView passthroughView =
new PolarisPassthroughResolutionView(
callContext, entityManager, securityContext, CATALOG_NAME);
final BasePolarisCatalog catalog =
new BasePolarisCatalog(
entityManager,
spyMetaStore,
callContext,
passthroughView,
securityContext,
Mockito.mock(TaskExecutor.class),
fileIOFactory);
catalog.initialize(
CATALOG_NAME,
ImmutableMap.of(
CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"));

Namespace namespace = Namespace.of("parent", "child1");

createNonExistingNamespaces(namespace);

final TableIdentifier table = TableIdentifier.of(namespace, "conflict_table");

doReturn(
new PolarisMetaStoreManager.EntityResult(
BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS,
PolarisEntitySubType.TABLE.getCode()))
.when(spyMetaStore)
.createEntityIfNotExists(any(), any(), any());
Assertions.assertThatThrownBy(() -> catalog.createTable(table, SCHEMA))
.isInstanceOf(AlreadyExistsException.class)
.hasMessageContaining("conflict_table");
}

@Test
public void testConcurrencyConflictUpdateTableDuringFinalTransaction() {
Assumptions.assumeTrue(
requiresNamespaceCreate(),
"Only applicable if namespaces must be created before adding children");
Assumptions.assumeTrue(
supportsNestedNamespaces(), "Only applicable if nested namespaces are supported");

final String tableLocation = "s3://externally-owned-bucket/table/";
final String tableMetadataLocation = tableLocation + "metadata/v1.metadata.json";

// Use a spy so that non-transactional pre-requisites succeed normally, but we inject
// a concurrency failure at final commit.
PolarisMetaStoreManager spyMetaStore = spy(metaStoreManager);
PolarisPassthroughResolutionView passthroughView =
new PolarisPassthroughResolutionView(
callContext, entityManager, securityContext, CATALOG_NAME);
final BasePolarisCatalog catalog =
new BasePolarisCatalog(
entityManager,
spyMetaStore,
callContext,
passthroughView,
securityContext,
Mockito.mock(TaskExecutor.class),
fileIOFactory);
catalog.initialize(
CATALOG_NAME,
ImmutableMap.of(
CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"));
Namespace namespace = Namespace.of("parent", "child1");

createNonExistingNamespaces(namespace);

final TableIdentifier tableId = TableIdentifier.of(namespace, "conflict_table");

Table table = catalog.buildTable(tableId, SCHEMA).create();

doReturn(
new PolarisMetaStoreManager.EntityResult(
BaseResult.ReturnStatus.TARGET_ENTITY_CONCURRENTLY_MODIFIED, null))
.when(spyMetaStore)
.updateEntityPropertiesIfNotChanged(any(), any(), any());

UpdateSchema update = table.updateSchema().addColumn("new_col", Types.LongType.get());
Schema expected = update.apply();

Assertions.assertThatThrownBy(() -> update.commit())
.isInstanceOf(CommitFailedException.class)
.hasMessageContaining("conflict_table");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1798,15 +1798,27 @@ private void createTableLike(
entity =
new PolarisEntity.Builder(entity).setCreateTimestamp(System.currentTimeMillis()).build();

PolarisEntity returnedEntity =
PolarisEntity.of(
getMetaStoreManager()
.createEntityIfNotExists(
getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), entity));
LOGGER.debug("Created TableLike entity {} with TableIdentifier {}", entity, identifier);
if (returnedEntity == null) {
// TODO: Error or retry?
PolarisMetaStoreManager.EntityResult res =
getMetaStoreManager()
.createEntityIfNotExists(
getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), entity);
if (!res.isSuccess()) {
switch (res.getReturnStatus()) {
case BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED:
throw new NotFoundException("Parent path does not exist for %s", identifier);

case BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS:
throw new AlreadyExistsException("Table or View already exists: %s", identifier);

default:
throw new IllegalStateException(
String.format(
"Unknown error status for identifier %s: %s with extraInfo: %s",
identifier, res.getReturnStatus(), res.getExtraInformation()));
}
}
PolarisEntity resultEntity = PolarisEntity.of(res);
LOGGER.debug("Created TableLike entity {} with TableIdentifier {}", resultEntity, identifier);
}

private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) {
Expand All @@ -1823,17 +1835,28 @@ private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) {
validateLocationForTableLike(identifier, metadataLocation, resolvedEntities);

List<PolarisEntity> catalogPath = resolvedEntities.getRawParentPath();
PolarisEntity returnedEntity =
Optional.ofNullable(
getMetaStoreManager()
.updateEntityPropertiesIfNotChanged(
getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), entity)
.getEntity())
.map(PolarisEntity::new)
.orElse(null);
if (returnedEntity == null) {
// TODO: Error or retry?
PolarisMetaStoreManager.EntityResult res =
getMetaStoreManager()
.updateEntityPropertiesIfNotChanged(
getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), entity);
if (!res.isSuccess()) {
switch (res.getReturnStatus()) {
case BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED:
throw new NotFoundException("Parent path does not exist for %s", identifier);

case BaseResult.ReturnStatus.TARGET_ENTITY_CONCURRENTLY_MODIFIED:
throw new CommitFailedException(
"Failed to commit Table or View %s because it was concurrently modified", identifier);

default:
throw new IllegalStateException(
String.format(
"Unknown error status for identifier %s: %s with extraInfo: %s",
identifier, res.getReturnStatus(), res.getExtraInformation()));
}
}
PolarisEntity resultEntity = PolarisEntity.of(res);
LOGGER.debug("Updated TableLike entity {} with TableIdentifier {}", resultEntity, identifier);
}

@SuppressWarnings("FormatStringAnnotation")
Expand Down