Skip to content

Commit ca38c97

Browse files
authored
Fix concurrency errors that could cause corruption or dropped updates (#1092)
* Explicitly set transaction isolation level to SERIALIZABLE and set eclipselink.transaction.join-existing Set the Isolation level in both a new EclipseLink SessionCustomizer as well as providing default postgresql.conf and injecting it in the getting-started docker-compose. Set eclipselink.transaction.join-existing in persistence.xml to force reads to go through the same write connection per EntityManager session, otherwise reads are not consistent with writes in a transaction. Only call rollback() if the transaction is still active, otherwise we get 500 server errors. * Fix handling of non-SUCCESS ReturnStatus in BasePolarisCatalog for createTableLike/updateTableLike Without this fix, even if the underlying database layer does the right thing in resolving concurrency conflicts, the API call will incorrectly return a 200 success. This isn't too common of a race condition under normal operation since BasePolarisCatalog does redundant state-checking right before the transaction (which maybe should be removed in the future for performance), but in theory this bug indeed could've caused dropped writes under high concurrency. * Instead of including the entire sample postgres.conf file, just include a note that the sample conf file can be found at the postgres github mirror, and then only specify the settings we immediately care about. * Add a link to the EclipseLink tutorial for injecting a SessionCustomizer for attribution of the PolarisEclipseLinkSessionCustomizer.java code. * Fix format * Add some other required default postgres.conf settings so that it can start successfully.
1 parent 1970319 commit ca38c97

File tree

7 files changed

+248
-21
lines changed

7 files changed

+248
-21
lines changed

extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,11 @@ public <T> T runInTransaction(
163163

164164
return result;
165165
} catch (Exception e) {
166-
tr.rollback();
166+
// For some transaction conflict errors, the transaction will already no longer be active;
167+
// if it's still active, explicitly rollback.
168+
if (tr.isActive()) {
169+
tr.rollback();
170+
}
167171
LOGGER.debug("transaction rolled back", e);
168172

169173
if (e instanceof OptimisticLockException
@@ -205,7 +209,11 @@ public void runActionInTransaction(
205209
}
206210
} catch (Exception e) {
207211
LOGGER.debug("Rolling back transaction due to an error", e);
208-
tr.rollback();
212+
// For some transaction conflict errors, the transaction will already no longer be active;
213+
// if it's still active, explicitly rollback.
214+
if (tr.isActive()) {
215+
tr.rollback();
216+
}
209217

210218
if (e instanceof OptimisticLockException
211219
|| e.getCause() instanceof OptimisticLockException) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.extension.persistence.impl.eclipselink;
20+
21+
import org.eclipse.persistence.sessions.DatabaseLogin;
22+
import org.eclipse.persistence.sessions.Session;
23+
import org.eclipse.persistence.sessions.SessionCustomizer;
24+
25+
/**
26+
* This pattern of injecting a SessionCustomizer is taken from the EclipseLink guide documentation:
27+
*
28+
* <p>https://eclipse.dev/eclipselink/documentation/2.6/dbws/creating_dbws_services002.htm
29+
*/
30+
public class PolarisEclipseLinkSessionCustomizer implements SessionCustomizer {
31+
@Override
32+
public void customize(Session session) throws Exception {
33+
DatabaseLogin databaseLogin = (DatabaseLogin) session.getDatasourceLogin();
34+
databaseLogin.setTransactionIsolation(DatabaseLogin.TRANSACTION_SERIALIZABLE);
35+
}
36+
}

getting-started/assets/eclipselink/persistence.xml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
<property name="jakarta.persistence.jdbc.password" value="postgres"/>
3939
<property name="jakarta.persistence.schema-generation.database.action" value="create"/>
4040
<property name="eclipselink.persistence-context.flush-mode" value="auto"/>
41+
<property name="eclipselink.session.customizer" value="org.apache.polaris.extension.persistence.impl.eclipselink.PolarisEclipseLinkSessionCustomizer"/>
42+
<property name="eclipselink.transaction.join-existing" value="true"/>
4143
</properties>
4244
</persistence-unit>
43-
</persistence>
45+
</persistence>

getting-started/eclipselink/docker-compose.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,15 @@ services:
8585
POSTGRES_PASSWORD: postgres
8686
POSTGRES_DB: POLARIS
8787
POSTGRES_INITDB_ARGS: "--encoding UTF8 --data-checksums"
88+
volumes:
89+
# Bind local conf file to a convenient location in the container
90+
- type: bind
91+
source: ./postgresql.conf
92+
target: /etc/postgresql/postgresql.conf
93+
command:
94+
- "postgres"
95+
- "-c"
96+
- "config_file=/etc/postgresql/postgresql.conf"
8897
healthcheck:
8998
test: "pg_isready -U postgres"
9099
interval: 5s
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# This contains the postgres config settings provided to the startup command
19+
# as "postgres -c config_file=<this file>".
20+
#
21+
# See https://github.com/postgres/postgres/blob/master/src/backend/utils/misc/postgresql.conf.sample
22+
# for more config options.
23+
24+
# Required standard settings normally specified in default config
25+
listen_addresses = '*'
26+
max_connections = 100
27+
28+
shared_buffers = 128MB
29+
dynamic_shared_memory_type = posix
30+
31+
max_wal_size = 1GB
32+
min_wal_size = 80MB
33+
34+
log_timezone = 'Etc/UTC'
35+
datestyle = 'iso, mdy'
36+
timezone = 'Etc/UTC'
37+
38+
39+
# Custom settings below
40+
41+
# NOTE: It's best practice to explicitly set the isolation level from the
42+
# application layer where possible, but in some cases this requires careful
43+
# configuration to inject settings into JPA frameworks. This is provided here
44+
# for defense-in-depth and for illustrative purposes if database customization
45+
# is desired.
46+
default_transaction_isolation = 'serializable'

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.apache.iceberg.types.Types.NestedField.required;
2323
import static org.mockito.ArgumentMatchers.any;
2424
import static org.mockito.ArgumentMatchers.isA;
25+
import static org.mockito.Mockito.doReturn;
2526
import static org.mockito.Mockito.doThrow;
2627
import static org.mockito.Mockito.spy;
2728
import static org.mockito.Mockito.when;
@@ -57,12 +58,14 @@
5758
import org.apache.iceberg.Table;
5859
import org.apache.iceberg.TableMetadata;
5960
import org.apache.iceberg.TableMetadataParser;
61+
import org.apache.iceberg.UpdateSchema;
6062
import org.apache.iceberg.catalog.CatalogTests;
6163
import org.apache.iceberg.catalog.Namespace;
6264
import org.apache.iceberg.catalog.SupportsNamespaces;
6365
import org.apache.iceberg.catalog.TableIdentifier;
6466
import org.apache.iceberg.exceptions.AlreadyExistsException;
6567
import org.apache.iceberg.exceptions.BadRequestException;
68+
import org.apache.iceberg.exceptions.CommitFailedException;
6669
import org.apache.iceberg.exceptions.ForbiddenException;
6770
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
6871
import org.apache.iceberg.inmemory.InMemoryFileIO;
@@ -1659,4 +1662,104 @@ public void testRegisterTableWithSlashlessMetadataLocation() {
16591662
.isInstanceOf(IllegalArgumentException.class)
16601663
.hasMessageContaining("Invalid metadata file location");
16611664
}
1665+
1666+
@Test
1667+
public void testConcurrencyConflictCreateTableUpdatedDuringFinalTransaction() {
1668+
Assumptions.assumeTrue(
1669+
requiresNamespaceCreate(),
1670+
"Only applicable if namespaces must be created before adding children");
1671+
Assumptions.assumeTrue(
1672+
supportsNestedNamespaces(), "Only applicable if nested namespaces are supported");
1673+
1674+
final String tableLocation = "s3://externally-owned-bucket/table/";
1675+
final String tableMetadataLocation = tableLocation + "metadata/v1.metadata.json";
1676+
1677+
// Use a spy so that non-transactional pre-requisites succeed normally, but we inject
1678+
// a concurrency failure at final commit.
1679+
PolarisMetaStoreManager spyMetaStore = spy(metaStoreManager);
1680+
PolarisPassthroughResolutionView passthroughView =
1681+
new PolarisPassthroughResolutionView(
1682+
callContext, entityManager, securityContext, CATALOG_NAME);
1683+
final BasePolarisCatalog catalog =
1684+
new BasePolarisCatalog(
1685+
entityManager,
1686+
spyMetaStore,
1687+
callContext,
1688+
passthroughView,
1689+
securityContext,
1690+
Mockito.mock(TaskExecutor.class),
1691+
fileIOFactory);
1692+
catalog.initialize(
1693+
CATALOG_NAME,
1694+
ImmutableMap.of(
1695+
CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"));
1696+
1697+
Namespace namespace = Namespace.of("parent", "child1");
1698+
1699+
createNonExistingNamespaces(namespace);
1700+
1701+
final TableIdentifier table = TableIdentifier.of(namespace, "conflict_table");
1702+
1703+
doReturn(
1704+
new PolarisMetaStoreManager.EntityResult(
1705+
BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS,
1706+
PolarisEntitySubType.TABLE.getCode()))
1707+
.when(spyMetaStore)
1708+
.createEntityIfNotExists(any(), any(), any());
1709+
Assertions.assertThatThrownBy(() -> catalog.createTable(table, SCHEMA))
1710+
.isInstanceOf(AlreadyExistsException.class)
1711+
.hasMessageContaining("conflict_table");
1712+
}
1713+
1714+
@Test
1715+
public void testConcurrencyConflictUpdateTableDuringFinalTransaction() {
1716+
Assumptions.assumeTrue(
1717+
requiresNamespaceCreate(),
1718+
"Only applicable if namespaces must be created before adding children");
1719+
Assumptions.assumeTrue(
1720+
supportsNestedNamespaces(), "Only applicable if nested namespaces are supported");
1721+
1722+
final String tableLocation = "s3://externally-owned-bucket/table/";
1723+
final String tableMetadataLocation = tableLocation + "metadata/v1.metadata.json";
1724+
1725+
// Use a spy so that non-transactional pre-requisites succeed normally, but we inject
1726+
// a concurrency failure at final commit.
1727+
PolarisMetaStoreManager spyMetaStore = spy(metaStoreManager);
1728+
PolarisPassthroughResolutionView passthroughView =
1729+
new PolarisPassthroughResolutionView(
1730+
callContext, entityManager, securityContext, CATALOG_NAME);
1731+
final BasePolarisCatalog catalog =
1732+
new BasePolarisCatalog(
1733+
entityManager,
1734+
spyMetaStore,
1735+
callContext,
1736+
passthroughView,
1737+
securityContext,
1738+
Mockito.mock(TaskExecutor.class),
1739+
fileIOFactory);
1740+
catalog.initialize(
1741+
CATALOG_NAME,
1742+
ImmutableMap.of(
1743+
CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"));
1744+
Namespace namespace = Namespace.of("parent", "child1");
1745+
1746+
createNonExistingNamespaces(namespace);
1747+
1748+
final TableIdentifier tableId = TableIdentifier.of(namespace, "conflict_table");
1749+
1750+
Table table = catalog.buildTable(tableId, SCHEMA).create();
1751+
1752+
doReturn(
1753+
new PolarisMetaStoreManager.EntityResult(
1754+
BaseResult.ReturnStatus.TARGET_ENTITY_CONCURRENTLY_MODIFIED, null))
1755+
.when(spyMetaStore)
1756+
.updateEntityPropertiesIfNotChanged(any(), any(), any());
1757+
1758+
UpdateSchema update = table.updateSchema().addColumn("new_col", Types.LongType.get());
1759+
Schema expected = update.apply();
1760+
1761+
Assertions.assertThatThrownBy(() -> update.commit())
1762+
.isInstanceOf(CommitFailedException.class)
1763+
.hasMessageContaining("conflict_table");
1764+
}
16621765
}

service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1798,15 +1798,27 @@ private void createTableLike(
17981798
entity =
17991799
new PolarisEntity.Builder(entity).setCreateTimestamp(System.currentTimeMillis()).build();
18001800

1801-
PolarisEntity returnedEntity =
1802-
PolarisEntity.of(
1803-
getMetaStoreManager()
1804-
.createEntityIfNotExists(
1805-
getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), entity));
1806-
LOGGER.debug("Created TableLike entity {} with TableIdentifier {}", entity, identifier);
1807-
if (returnedEntity == null) {
1808-
// TODO: Error or retry?
1801+
PolarisMetaStoreManager.EntityResult res =
1802+
getMetaStoreManager()
1803+
.createEntityIfNotExists(
1804+
getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), entity);
1805+
if (!res.isSuccess()) {
1806+
switch (res.getReturnStatus()) {
1807+
case BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED:
1808+
throw new NotFoundException("Parent path does not exist for %s", identifier);
1809+
1810+
case BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS:
1811+
throw new AlreadyExistsException("Table or View already exists: %s", identifier);
1812+
1813+
default:
1814+
throw new IllegalStateException(
1815+
String.format(
1816+
"Unknown error status for identifier %s: %s with extraInfo: %s",
1817+
identifier, res.getReturnStatus(), res.getExtraInformation()));
1818+
}
18091819
}
1820+
PolarisEntity resultEntity = PolarisEntity.of(res);
1821+
LOGGER.debug("Created TableLike entity {} with TableIdentifier {}", resultEntity, identifier);
18101822
}
18111823

18121824
private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) {
@@ -1823,17 +1835,28 @@ private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) {
18231835
validateLocationForTableLike(identifier, metadataLocation, resolvedEntities);
18241836

18251837
List<PolarisEntity> catalogPath = resolvedEntities.getRawParentPath();
1826-
PolarisEntity returnedEntity =
1827-
Optional.ofNullable(
1828-
getMetaStoreManager()
1829-
.updateEntityPropertiesIfNotChanged(
1830-
getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), entity)
1831-
.getEntity())
1832-
.map(PolarisEntity::new)
1833-
.orElse(null);
1834-
if (returnedEntity == null) {
1835-
// TODO: Error or retry?
1838+
PolarisMetaStoreManager.EntityResult res =
1839+
getMetaStoreManager()
1840+
.updateEntityPropertiesIfNotChanged(
1841+
getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), entity);
1842+
if (!res.isSuccess()) {
1843+
switch (res.getReturnStatus()) {
1844+
case BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED:
1845+
throw new NotFoundException("Parent path does not exist for %s", identifier);
1846+
1847+
case BaseResult.ReturnStatus.TARGET_ENTITY_CONCURRENTLY_MODIFIED:
1848+
throw new CommitFailedException(
1849+
"Failed to commit Table or View %s because it was concurrently modified", identifier);
1850+
1851+
default:
1852+
throw new IllegalStateException(
1853+
String.format(
1854+
"Unknown error status for identifier %s: %s with extraInfo: %s",
1855+
identifier, res.getReturnStatus(), res.getExtraInformation()));
1856+
}
18361857
}
1858+
PolarisEntity resultEntity = PolarisEntity.of(res);
1859+
LOGGER.debug("Updated TableLike entity {} with TableIdentifier {}", resultEntity, identifier);
18371860
}
18381861

18391862
@SuppressWarnings("FormatStringAnnotation")

0 commit comments

Comments
 (0)