diff --git a/persistence/relational-jdbc/build.gradle.kts b/persistence/relational-jdbc/build.gradle.kts index f1ca24b0bd..39fe5614f4 100644 --- a/persistence/relational-jdbc/build.gradle.kts +++ b/persistence/relational-jdbc/build.gradle.kts @@ -36,6 +36,9 @@ dependencies { implementation(libs.smallrye.common.annotation) // @Identifier implementation(libs.postgresql) + compileOnly(project(":polaris-immutables")) + annotationProcessor(project(":polaris-immutables", configuration = "processor")) + testImplementation(libs.mockito.junit.jupiter) testImplementation(libs.h2) testImplementation(testFixtures(project(":polaris-core"))) diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatabaseType.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatabaseType.java index f731eb5508..c617e505e6 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatabaseType.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatabaseType.java @@ -62,9 +62,10 @@ public InputStream openInitScriptResource(@Nonnull SchemaOptions schemaOptions) } else { final String schemaSuffix; switch (schemaOptions.schemaVersion()) { - case null -> schemaSuffix = "schema-v2.sql"; + case null -> schemaSuffix = "schema-v3.sql"; case 1 -> schemaSuffix = "schema-v1.sql"; case 2 -> schemaSuffix = "schema-v2.sql"; + case 3 -> schemaSuffix = "schema-v3.sql"; default -> throw new IllegalArgumentException( "Unknown schema version " + schemaOptions.schemaVersion()); diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java index 346cd3f1c7..d391c36a77 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java @@ -32,11 +32,13 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Objects; import java.util.Random; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -196,6 +198,63 @@ public int executeUpdate(QueryGenerator.PreparedQuery preparedQuery) throws SQLE }); } + /** + * Executes the INSERT/UPDATE Queries in batches. Requires that all SQL queries have the same + * parameterized form. + * + * @param preparedQueries : queries to be executed + * @return : Number of rows modified / inserted. + * @throws SQLException : Exception during Query Execution. + */ + public int executeBatchUpdate(QueryGenerator.PreparedBatchQuery preparedQueries) + throws SQLException { + if (preparedQueries.parametersList().isEmpty() || preparedQueries.sql().isEmpty()) { + return 0; + } + int batchSize = 100; + AtomicInteger successCount = new AtomicInteger(); + return withRetries( + () -> { + try (Connection connection = borrowConnection(); + PreparedStatement statement = connection.prepareStatement(preparedQueries.sql())) { + boolean autoCommit = connection.getAutoCommit(); + boolean success = false; + connection.setAutoCommit(false); + + try { + for (int i = 1; i <= preparedQueries.parametersList().size(); i++) { + List params = preparedQueries.parametersList().get(i - 1); + for (int j = 0; j < params.size(); j++) { + statement.setObject(j + 1, params.get(j)); + } + + statement.addBatch(); // Add to batch + + if (i % batchSize == 0) { + successCount.addAndGet(Arrays.stream(statement.executeBatch()).sum()); + } + } + + // Execute remaining queries in the batch + successCount.addAndGet(Arrays.stream(statement.executeBatch()).sum()); + success = true; + } finally { + try { + if (success) { + connection.commit(); + } else { + connection.rollback(); + successCount.set(0); + } + } finally { + connection.setAutoCommit(autoCommit); + } + } + } + return successCount.get(); + }); + } + /** * Transaction callback to be executed. * diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java index 9cbb6057d6..5a0427b381 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java @@ -47,6 +47,7 @@ import org.apache.polaris.core.entity.PolarisEntityId; import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.entity.PolarisEvent; import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.entity.PolarisPrincipalSecrets; import org.apache.polaris.core.persistence.BaseMetaStoreManager; @@ -67,6 +68,7 @@ import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; import org.apache.polaris.core.storage.StorageLocation; import org.apache.polaris.persistence.relational.jdbc.models.ModelEntity; +import org.apache.polaris.persistence.relational.jdbc.models.ModelEvent; import org.apache.polaris.persistence.relational.jdbc.models.ModelGrantRecord; import org.apache.polaris.persistence.relational.jdbc.models.ModelPolicyMappingRecord; import org.apache.polaris.persistence.relational.jdbc.models.ModelPrincipalAuthenticationData; @@ -248,6 +250,63 @@ public void writeToGrantRecords( } } + @Override + public void writeEvents(@Nonnull List events) { + if (events.isEmpty()) { + return; // or throw if empty list is invalid + } + + try { + // Generate the SQL using the first event as the reference + PreparedQuery firstPreparedQuery = + QueryGenerator.generateInsertQuery( + ModelEvent.ALL_COLUMNS, + ModelEvent.TABLE_NAME, + ModelEvent.fromEvent(events.getFirst()) + .toMap(datasourceOperations.getDatabaseType()) + .values() + .stream() + .toList(), + realmId); + String expectedSql = firstPreparedQuery.sql(); + + List> parametersList = new ArrayList<>(); + parametersList.add(firstPreparedQuery.parameters()); + + // Process remaining events and verify SQL consistency + for (int i = 1; i < events.size(); i++) { + PolarisEvent event = events.get(i); + PreparedQuery pq = + QueryGenerator.generateInsertQuery( + ModelEvent.ALL_COLUMNS, + ModelEvent.TABLE_NAME, + ModelEvent.fromEvent(event) + .toMap(datasourceOperations.getDatabaseType()) + .values() + .stream() + .toList(), + realmId); + + if (!expectedSql.equals(pq.sql())) { + throw new RuntimeException("All events did not generate the same SQL"); + } + + parametersList.add(pq.parameters()); + } + + int totalUpdated = + datasourceOperations.executeBatchUpdate( + new QueryGenerator.PreparedBatchQuery(expectedSql, parametersList)); + + if (totalUpdated == 0) { + throw new SQLException("No events were inserted."); + } + } catch (SQLException e) { + throw new RuntimeException( + String.format("Failed to write events due to %s", e.getMessage()), e); + } + } + @Override public void deleteEntity(@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) { ModelEntity modelEntity = ModelEntity.fromEntity(entity); diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java index a06bf283a0..fac862ccd2 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java @@ -44,6 +44,9 @@ public class QueryGenerator { /** A container for the SQL string and the ordered parameter values. */ public record PreparedQuery(String sql, List parameters) {} + /** A container for the SQL string and a list of the ordered parameter values. */ + public record PreparedBatchQuery(String sql, List> parametersList) {} + /** A container for the query fragment SQL string and the ordered parameter values. */ record QueryFragment(String sql, List parameters) {} diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEvent.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEvent.java new file mode 100644 index 0000000000..8700cff8bf --- /dev/null +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEvent.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.persistence.relational.jdbc.models; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.immutables.PolarisImmutable; +import org.apache.polaris.persistence.relational.jdbc.DatabaseType; + +@PolarisImmutable +public interface ModelEvent extends Converter { + String TABLE_NAME = "EVENTS"; + + List ALL_COLUMNS = + List.of( + "catalog_id", + "event_id", + "request_id", + "event_type", + "timestamp_ms", + "principal_name", + "resource_type", + "resource_identifier", + "additional_properties"); + + // catalog id + String getCatalogId(); + + // event id + String getEventId(); + + // id of the request that generated this event + String getRequestId(); + + // event type that was created + String getEventType(); + + // timestamp in epoch milliseconds of when this event was emitted + long getTimestampMs(); + + // polaris principal who took this action + String getPrincipalName(); + + // Enum that states the type of resource was being operated on + PolarisEvent.ResourceType getResourceType(); + + // Which resource was operated on + String getResourceIdentifier(); + + // Additional parameters that were not earlier recorded + String getAdditionalProperties(); + + @Override + default PolarisEvent fromResultSet(ResultSet rs) throws SQLException { + var modelEvent = + ImmutableModelEvent.builder() + .catalogId(rs.getString("catalog_id")) + .eventId(rs.getString("event_id")) + .requestId(rs.getString("request_id")) + .eventType(rs.getString("event_type")) + .timestampMs(rs.getLong("timestamp_ms")) + .principalName(rs.getString("actor")) + .resourceType(PolarisEvent.ResourceType.valueOf(rs.getString("resource_type"))) + .resourceIdentifier(rs.getString("resource_identifier")) + .additionalProperties(rs.getString("additional_properties")) + .build(); + return toEvent(modelEvent); + } + + @Override + default Map toMap(DatabaseType databaseType) { + Map map = new LinkedHashMap<>(); + map.put("catalog_id", getCatalogId()); + map.put("event_id", getEventId()); + map.put("request_id", getRequestId()); + map.put("event_type", getEventType()); + map.put("timestamp_ms", getTimestampMs()); + map.put("principal_name", getPrincipalName()); + map.put("resource_type", getResourceType().toString()); + map.put("resource_identifier", getResourceIdentifier()); + if (databaseType.equals(DatabaseType.POSTGRES)) { + map.put("additional_properties", toJsonbPGobject(getAdditionalProperties())); + } else { + map.put("additional_properties", getAdditionalProperties()); + } + return map; + } + + static ModelEvent fromEvent(PolarisEvent event) { + if (event == null) return null; + + return ImmutableModelEvent.builder() + .catalogId(event.getCatalogId()) + .eventId(event.getId()) + .requestId(event.getRequestId()) + .eventType(event.getEventType()) + .timestampMs(event.getTimestampMs()) + .principalName(event.getPrincipalName()) + .resourceType(event.getResourceType()) + .resourceIdentifier(event.getResourceIdentifier()) + .additionalProperties(event.getAdditionalProperties()) + .build(); + } + + static PolarisEvent toEvent(ModelEvent model) { + if (model == null) return null; + + PolarisEvent polarisEvent = + new PolarisEvent( + model.getCatalogId(), + model.getEventId(), + model.getRequestId(), + model.getEventType(), + model.getTimestampMs(), + model.getPrincipalName(), + model.getResourceType(), + model.getResourceIdentifier()); + polarisEvent.setAdditionalProperties(model.getAdditionalProperties()); + return polarisEvent; + } +} diff --git a/persistence/relational-jdbc/src/main/resources/h2/schema-v3.sql b/persistence/relational-jdbc/src/main/resources/h2/schema-v3.sql new file mode 100644 index 0000000000..947799246a --- /dev/null +++ b/persistence/relational-jdbc/src/main/resources/h2/schema-v3.sql @@ -0,0 +1,140 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file-- +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"). You may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. +-- + +-- Changes from v2: +-- * Added `events` table + +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET SCHEMA POLARIS_SCHEMA; + +CREATE TABLE IF NOT EXISTS version ( + version_key VARCHAR PRIMARY KEY, + version_value INTEGER NOT NULL +); + +MERGE INTO version (version_key, version_value) + KEY (version_key) + VALUES ('version', 2); + +-- H2 supports COMMENT, but some modes may ignore it +COMMENT ON TABLE version IS 'the version of the JDBC schema in use'; + +DROP TABLE IF EXISTS entities; +CREATE TABLE IF NOT EXISTS entities ( + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + id BIGINT NOT NULL, + parent_id BIGINT NOT NULL, + name TEXT NOT NULL, + entity_version INT NOT NULL, + type_code INT NOT NULL, + sub_type_code INT NOT NULL, + create_timestamp BIGINT NOT NULL, + drop_timestamp BIGINT NOT NULL, + purge_timestamp BIGINT NOT NULL, + to_purge_timestamp BIGINT NOT NULL, + last_update_timestamp BIGINT NOT NULL, + properties TEXT NOT NULL DEFAULT '{}', + internal_properties TEXT NOT NULL DEFAULT '{}', + grant_records_version INT NOT NULL, + location_without_scheme TEXT, + PRIMARY KEY (realm_id, id), + CONSTRAINT constraint_name UNIQUE (realm_id, catalog_id, parent_id, type_code, name) +); + +CREATE INDEX IF NOT EXISTS idx_locations ON entities(realm_id, catalog_id, location_without_scheme); + +-- TODO: create indexes based on all query pattern. +CREATE INDEX IF NOT EXISTS idx_entities ON entities (realm_id, catalog_id, id); + +COMMENT ON TABLE entities IS 'all the entities'; + +COMMENT ON COLUMN entities.catalog_id IS 'catalog id'; +COMMENT ON COLUMN entities.id IS 'entity id'; +COMMENT ON COLUMN entities.parent_id IS 'entity id of parent'; +COMMENT ON COLUMN entities.name IS 'entity name'; +COMMENT ON COLUMN entities.entity_version IS 'version of the entity'; +COMMENT ON COLUMN entities.type_code IS 'type code'; +COMMENT ON COLUMN entities.sub_type_code IS 'sub type of entity'; +COMMENT ON COLUMN entities.create_timestamp IS 'creation time of entity'; +COMMENT ON COLUMN entities.drop_timestamp IS 'time of drop of entity'; +COMMENT ON COLUMN entities.purge_timestamp IS 'time to start purging entity'; +COMMENT ON COLUMN entities.last_update_timestamp IS 'last time the entity is touched'; +COMMENT ON COLUMN entities.properties IS 'entities properties json'; +COMMENT ON COLUMN entities.internal_properties IS 'entities internal properties json'; +COMMENT ON COLUMN entities.grant_records_version IS 'the version of grant records change on the entity'; + +DROP TABLE IF EXISTS grant_records; +CREATE TABLE IF NOT EXISTS grant_records ( + realm_id TEXT NOT NULL, + securable_catalog_id BIGINT NOT NULL, + securable_id BIGINT NOT NULL, + grantee_catalog_id BIGINT NOT NULL, + grantee_id BIGINT NOT NULL, + privilege_code INTEGER, + PRIMARY KEY (realm_id, securable_catalog_id, securable_id, grantee_catalog_id, grantee_id, privilege_code) +); + +COMMENT ON TABLE grant_records IS 'grant records for entities'; +COMMENT ON COLUMN grant_records.securable_catalog_id IS 'catalog id of the securable'; +COMMENT ON COLUMN grant_records.securable_id IS 'entity id of the securable'; +COMMENT ON COLUMN grant_records.grantee_catalog_id IS 'catalog id of the grantee'; +COMMENT ON COLUMN grant_records.grantee_id IS 'id of the grantee'; +COMMENT ON COLUMN grant_records.privilege_code IS 'privilege code'; + +DROP TABLE IF EXISTS principal_authentication_data; +CREATE TABLE IF NOT EXISTS principal_authentication_data ( + realm_id TEXT NOT NULL, + principal_id BIGINT NOT NULL, + principal_client_id VARCHAR(255) NOT NULL, + main_secret_hash VARCHAR(255) NOT NULL, + secondary_secret_hash VARCHAR(255) NOT NULL, + secret_salt VARCHAR(255) NOT NULL, + PRIMARY KEY (realm_id, principal_client_id) +); + +COMMENT ON TABLE principal_authentication_data IS 'authentication data for client'; + +DROP TABLE IF EXISTS policy_mapping_record; +CREATE TABLE IF NOT EXISTS policy_mapping_record ( + realm_id TEXT NOT NULL, + target_catalog_id BIGINT NOT NULL, + target_id BIGINT NOT NULL, + policy_type_code INTEGER NOT NULL, + policy_catalog_id BIGINT NOT NULL, + policy_id BIGINT NOT NULL, + parameters TEXT NOT NULL DEFAULT '{}', + PRIMARY KEY (realm_id, target_catalog_id, target_id, policy_type_code, policy_catalog_id, policy_id) +); + +CREATE INDEX IF NOT EXISTS idx_policy_mapping_record ON policy_mapping_record (realm_id, policy_type_code, policy_catalog_id, policy_id, target_catalog_id, target_id); + +CREATE TABLE IF NOT EXISTS events ( + realm_id TEXT NOT NULL, + catalog_id TEXT NOT NULL, + event_id TEXT NOT NULL, + request_id TEXT NOT NULL, + event_type TEXT NOT NULL, + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + resource_type TEXT NOT NULL, + resource_identifier TEXT NOT NULL, + additional_properties JSONB NOT NULL DEFAULT '{}'::JSONB, + PRIMARY KEY (event_id) +); \ No newline at end of file diff --git a/persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql b/persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql new file mode 100644 index 0000000000..38a1ee4927 --- /dev/null +++ b/persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql @@ -0,0 +1,137 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file-- +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"). You may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Changes from v2: +-- * Added `events` table + +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET search_path TO POLARIS_SCHEMA; + +CREATE TABLE IF NOT EXISTS version ( + version_key TEXT PRIMARY KEY, + version_value INTEGER NOT NULL +); +INSERT INTO version (version_key, version_value) +VALUES ('version', 3) +ON CONFLICT (version_key) DO UPDATE +SET version_value = EXCLUDED.version_value; +COMMENT ON TABLE version IS 'the version of the JDBC schema in use'; + +CREATE TABLE IF NOT EXISTS entities ( + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + id BIGINT NOT NULL, + parent_id BIGINT NOT NULL, + name TEXT NOT NULL, + entity_version INT NOT NULL, + type_code INT NOT NULL, + sub_type_code INT NOT NULL, + create_timestamp BIGINT NOT NULL, + drop_timestamp BIGINT NOT NULL, + purge_timestamp BIGINT NOT NULL, + to_purge_timestamp BIGINT NOT NULL, + last_update_timestamp BIGINT NOT NULL, + properties JSONB not null default '{}'::JSONB, + internal_properties JSONB not null default '{}'::JSONB, + grant_records_version INT NOT NULL, + location_without_scheme TEXT, + PRIMARY KEY (realm_id, id), + CONSTRAINT constraint_name UNIQUE (realm_id, catalog_id, parent_id, type_code, name) +); + +-- TODO: create indexes based on all query pattern. +CREATE INDEX IF NOT EXISTS idx_entities ON entities (realm_id, catalog_id, id); +CREATE INDEX IF NOT EXISTS idx_locations + ON entities USING btree (realm_id, parent_id, location_without_scheme) + WHERE location_without_scheme IS NOT NULL; + +COMMENT ON TABLE entities IS 'all the entities'; + +COMMENT ON COLUMN entities.realm_id IS 'realm_id used for multi-tenancy'; +COMMENT ON COLUMN entities.catalog_id IS 'catalog id'; +COMMENT ON COLUMN entities.id IS 'entity id'; +COMMENT ON COLUMN entities.parent_id IS 'entity id of parent'; +COMMENT ON COLUMN entities.name IS 'entity name'; +COMMENT ON COLUMN entities.entity_version IS 'version of the entity'; +COMMENT ON COLUMN entities.type_code IS 'type code'; +COMMENT ON COLUMN entities.sub_type_code IS 'sub type of entity'; +COMMENT ON COLUMN entities.create_timestamp IS 'creation time of entity'; +COMMENT ON COLUMN entities.drop_timestamp IS 'time of drop of entity'; +COMMENT ON COLUMN entities.purge_timestamp IS 'time to start purging entity'; +COMMENT ON COLUMN entities.last_update_timestamp IS 'last time the entity is touched'; +COMMENT ON COLUMN entities.properties IS 'entities properties json'; +COMMENT ON COLUMN entities.internal_properties IS 'entities internal properties json'; +COMMENT ON COLUMN entities.grant_records_version IS 'the version of grant records change on the entity'; + +CREATE TABLE IF NOT EXISTS grant_records ( + realm_id TEXT NOT NULL, + securable_catalog_id BIGINT NOT NULL, + securable_id BIGINT NOT NULL, + grantee_catalog_id BIGINT NOT NULL, + grantee_id BIGINT NOT NULL, + privilege_code INTEGER, + PRIMARY KEY (realm_id, securable_catalog_id, securable_id, grantee_catalog_id, grantee_id, privilege_code) +); + +COMMENT ON TABLE grant_records IS 'grant records for entities'; + +COMMENT ON COLUMN grant_records.securable_catalog_id IS 'catalog id of the securable'; +COMMENT ON COLUMN grant_records.securable_id IS 'entity id of the securable'; +COMMENT ON COLUMN grant_records.grantee_catalog_id IS 'catalog id of the grantee'; +COMMENT ON COLUMN grant_records.grantee_id IS 'id of the grantee'; +COMMENT ON COLUMN grant_records.privilege_code IS 'privilege code'; + +CREATE TABLE IF NOT EXISTS principal_authentication_data ( + realm_id TEXT NOT NULL, + principal_id BIGINT NOT NULL, + principal_client_id VARCHAR(255) NOT NULL, + main_secret_hash VARCHAR(255) NOT NULL, + secondary_secret_hash VARCHAR(255) NOT NULL, + secret_salt VARCHAR(255) NOT NULL, + PRIMARY KEY (realm_id, principal_client_id) +); + +COMMENT ON TABLE principal_authentication_data IS 'authentication data for client'; + +DROP TABLE IF EXISTS policy_mapping_record; +CREATE TABLE IF NOT EXISTS policy_mapping_record ( + realm_id TEXT NOT NULL, + target_catalog_id BIGINT NOT NULL, + target_id BIGINT NOT NULL, + policy_type_code INTEGER NOT NULL, + policy_catalog_id BIGINT NOT NULL, + policy_id BIGINT NOT NULL, + parameters JSONB NOT NULL DEFAULT '{}'::JSONB, + PRIMARY KEY (realm_id, target_catalog_id, target_id, policy_type_code, policy_catalog_id, policy_id) +); + +CREATE INDEX IF NOT EXISTS idx_policy_mapping_record ON policy_mapping_record (realm_id, policy_type_code, policy_catalog_id, policy_id, target_catalog_id, target_id); + +CREATE TABLE IF NOT EXISTS events ( + realm_id TEXT NOT NULL, + catalog_id TEXT NOT NULL, + event_id TEXT NOT NULL, + request_id TEXT NOT NULL, + event_type TEXT NOT NULL, + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + resource_type TEXT NOT NULL, + resource_identifier TEXT NOT NULL, + additional_properties JSONB NOT NULL DEFAULT '{}'::JSONB, + PRIMARY KEY (event_id) +); diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperationsTest.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperationsTest.java index 26f452a087..f9d5c69428 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperationsTest.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperationsTest.java @@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.reset; @@ -33,11 +34,15 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import javax.sql.DataSource; +import org.apache.polaris.core.entity.PolarisEvent; import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations.Operation; +import org.apache.polaris.persistence.relational.jdbc.models.ImmutableModelEvent; import org.apache.polaris.persistence.relational.jdbc.models.ModelEntity; +import org.apache.polaris.persistence.relational.jdbc.models.ModelEvent; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -92,6 +97,41 @@ void testExecuteUpdate_failure() throws Exception { assertThrows(SQLException.class, () -> datasourceOperations.executeUpdate(query)); } + @Test + void executeBatchUpdate_success() throws Exception { + // There is no way to track how many statements are in a batch, so we are testing how many times + // `executeBatch` is being called + when(mockDataSource.getConnection()).thenReturn(mockConnection); + String sql = + "INSERT INTO POLARIS_SCHEMA.EVENTS (catalog_id, event_id, request_id, event_type, timestamp_ms, principal_name, resource_type, resource_identifier, additional_properties, realm_id) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"; + List> queryParams = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + ModelEvent modelEvent = + ImmutableModelEvent.builder() + .resourceType(PolarisEvent.ResourceType.CATALOG) + .resourceIdentifier("catalog_" + i) + .catalogId("catalog_" + i) + .eventId("event_" + i) + .requestId("request_" + i) + .eventType("event_type1") + .timestampMs(1234) + .principalName("principal_" + i) + .additionalProperties("") + .build(); + queryParams.add( + modelEvent.toMap(datasourceOperations.getDatabaseType()).values().stream().toList()); + } + when(mockConnection.prepareStatement(any())).thenReturn(mockPreparedStatement); + when(mockPreparedStatement.executeBatch()).thenReturn(new int[] {100}); + + int result = + datasourceOperations.executeBatchUpdate( + new QueryGenerator.PreparedBatchQuery(sql, queryParams)); + assertEquals( + queryParams.size() + 100, + result); // ExecuteBatch will be called once more than actual batches + } + @Test void testExecuteSelect_exception() throws Exception { when(mockDataSource.getConnection()).thenReturn(mockConnection); diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEventTest.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEventTest.java new file mode 100644 index 0000000000..fa78de0885 --- /dev/null +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEventTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.persistence.relational.jdbc.models; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.persistence.relational.jdbc.DatabaseType; +import org.junit.jupiter.api.Test; +import org.postgresql.util.PGobject; + +public class ModelEventTest { + // Column names + private static final String CATALOG_ID = "catalog_id"; + private static final String EVENT_ID = "event_id"; + private static final String REQUEST_ID = "request_id"; + private static final String EVENT_TYPE = "event_type"; + private static final String TIMESTAMP_MS = "timestamp_ms"; + private static final String ACTOR = "actor"; + private static final String PRINCIPAL_NAME = "principal_name"; + private static final String RESOURCE_TYPE = "resource_type"; + private static final String RESOURCE_IDENTIFIER = "resource_identifier"; + private static final String ADDITIONAL_PROPERTIES = "additional_properties"; + + // Test data values + private static final String TEST_CATALOG_ID = "test-catalog"; + private static final String TEST_EVENT_ID = "event-123"; + private static final String TEST_REQUEST_ID = "req-456"; + private static final String TEST_EVENT_TYPE = "CREATE"; + private static final long TEST_TIMESTAMP_MS = 1234567890L; + private static final String TEST_USER = "test-user"; + private static final PolarisEvent.ResourceType TEST_RESOURCE_TYPE = + PolarisEvent.ResourceType.TABLE; + private static final String TEST_RESOURCE_TYPE_STRING = "TABLE"; + private static final String TEST_RESOURCE_IDENTIFIER = "test-table"; + private static final String EMPTY_JSON = "{}"; + private static final String TEST_JSON = "{\"key\":\"value\"}"; + + // Dummy values for test initialization + private static final String DUMMY = "dummy"; + private static final long DUMMY_TIMESTAMP = 0L; + private static final PolarisEvent.ResourceType DUMMY_RESOURCE_TYPE = + PolarisEvent.ResourceType.CATALOG; + + @Test + public void testFromResultSet() throws SQLException { + // Arrange + ResultSet mockResultSet = mock(ResultSet.class); + when(mockResultSet.getString(CATALOG_ID)).thenReturn(TEST_CATALOG_ID); + when(mockResultSet.getString(EVENT_ID)).thenReturn(TEST_EVENT_ID); + when(mockResultSet.getString(REQUEST_ID)).thenReturn(TEST_REQUEST_ID); + when(mockResultSet.getString(EVENT_TYPE)).thenReturn(TEST_EVENT_TYPE); + when(mockResultSet.getLong(TIMESTAMP_MS)).thenReturn(TEST_TIMESTAMP_MS); + when(mockResultSet.getString(ACTOR)).thenReturn(TEST_USER); + when(mockResultSet.getString(RESOURCE_TYPE)).thenReturn(TEST_RESOURCE_TYPE_STRING); + when(mockResultSet.getString(RESOURCE_IDENTIFIER)).thenReturn(TEST_RESOURCE_IDENTIFIER); + when(mockResultSet.getString(ADDITIONAL_PROPERTIES)).thenReturn(EMPTY_JSON); + + // Create a concrete implementation of ModelEvent for testing + ModelEvent modelEvent = + ImmutableModelEvent.builder() + .catalogId(DUMMY) + .eventId(DUMMY) + .requestId(DUMMY) + .eventType(DUMMY) + .timestampMs(DUMMY_TIMESTAMP) + .principalName(DUMMY) + .resourceType(DUMMY_RESOURCE_TYPE) + .resourceIdentifier(DUMMY) + .additionalProperties(EMPTY_JSON) + .build(); + + // Act + PolarisEvent result = modelEvent.fromResultSet(mockResultSet); + + // Assert + assertEquals(TEST_CATALOG_ID, result.getCatalogId()); + assertEquals(TEST_EVENT_ID, result.getId()); + assertEquals(TEST_REQUEST_ID, result.getRequestId()); + assertEquals(TEST_EVENT_TYPE, result.getEventType()); + assertEquals(TEST_TIMESTAMP_MS, result.getTimestampMs()); + assertEquals(TEST_USER, result.getPrincipalName()); + assertEquals(TEST_RESOURCE_TYPE, result.getResourceType()); + assertEquals(TEST_RESOURCE_IDENTIFIER, result.getResourceIdentifier()); + assertEquals(EMPTY_JSON, result.getAdditionalProperties()); + } + + @Test + public void testToMapWithH2DatabaseType() { + // Arrange + ModelEvent modelEvent = + ImmutableModelEvent.builder() + .catalogId(TEST_CATALOG_ID) + .eventId(TEST_EVENT_ID) + .requestId(TEST_REQUEST_ID) + .eventType(TEST_EVENT_TYPE) + .timestampMs(TEST_TIMESTAMP_MS) + .principalName(TEST_USER) + .resourceType(TEST_RESOURCE_TYPE) + .resourceIdentifier(TEST_RESOURCE_IDENTIFIER) + .additionalProperties(TEST_JSON) + .build(); + + // Act + Map resultMap = modelEvent.toMap(DatabaseType.H2); + + // Assert + assertEquals(TEST_CATALOG_ID, resultMap.get(CATALOG_ID)); + assertEquals(TEST_EVENT_ID, resultMap.get(EVENT_ID)); + assertEquals(TEST_REQUEST_ID, resultMap.get(REQUEST_ID)); + assertEquals(TEST_EVENT_TYPE, resultMap.get(EVENT_TYPE)); + assertEquals(TEST_TIMESTAMP_MS, resultMap.get(TIMESTAMP_MS)); + assertEquals(TEST_USER, resultMap.get(PRINCIPAL_NAME)); + assertEquals(TEST_RESOURCE_TYPE_STRING, resultMap.get(RESOURCE_TYPE)); + assertEquals(TEST_RESOURCE_IDENTIFIER, resultMap.get(RESOURCE_IDENTIFIER)); + assertEquals(TEST_JSON, resultMap.get(ADDITIONAL_PROPERTIES)); + } + + @Test + public void testToMapWithPostgresType() { + // Arrange + ModelEvent modelEvent = + ImmutableModelEvent.builder() + .catalogId(TEST_CATALOG_ID) + .eventId(TEST_EVENT_ID) + .requestId(TEST_REQUEST_ID) + .eventType(TEST_EVENT_TYPE) + .timestampMs(TEST_TIMESTAMP_MS) + .principalName(TEST_USER) + .resourceType(TEST_RESOURCE_TYPE) + .resourceIdentifier(TEST_RESOURCE_IDENTIFIER) + .additionalProperties(TEST_JSON) + .build(); + + // Act + Map resultMap = modelEvent.toMap(DatabaseType.POSTGRES); + + // Assert + assertEquals(TEST_CATALOG_ID, resultMap.get(CATALOG_ID)); + assertEquals(TEST_EVENT_ID, resultMap.get(EVENT_ID)); + assertEquals(TEST_REQUEST_ID, resultMap.get(REQUEST_ID)); + assertEquals(TEST_EVENT_TYPE, resultMap.get(EVENT_TYPE)); + assertEquals(TEST_TIMESTAMP_MS, resultMap.get(TIMESTAMP_MS)); + assertEquals(TEST_USER, resultMap.get(PRINCIPAL_NAME)); + assertEquals(TEST_RESOURCE_TYPE_STRING, resultMap.get(RESOURCE_TYPE)); + assertEquals(TEST_RESOURCE_IDENTIFIER, resultMap.get(RESOURCE_IDENTIFIER)); + + // For PostgreSQL, the additional properties should be a PGobject of type "jsonb" + PGobject pgObject = (PGobject) resultMap.get(ADDITIONAL_PROPERTIES); + assertEquals("jsonb", pgObject.getType()); + assertEquals(TEST_JSON, pgObject.getValue()); + } + + @Test + public void testFromEventWithNullInput() { + // Act + ModelEvent result = ModelEvent.fromEvent(null); + + // Assert + assertNull(result); + } + + @Test + public void testFromEvent() { + // Arrange + PolarisEvent polarisEvent = + new PolarisEvent( + TEST_CATALOG_ID, + TEST_EVENT_ID, + TEST_REQUEST_ID, + TEST_EVENT_TYPE, + TEST_TIMESTAMP_MS, + TEST_USER, + TEST_RESOURCE_TYPE, + TEST_RESOURCE_IDENTIFIER); + polarisEvent.setAdditionalProperties(TEST_JSON); + + // Act + ModelEvent result = ModelEvent.fromEvent(polarisEvent); + + // Assert + assertEquals(TEST_CATALOG_ID, result.getCatalogId()); + assertEquals(TEST_EVENT_ID, result.getEventId()); + assertEquals(TEST_REQUEST_ID, result.getRequestId()); + assertEquals(TEST_EVENT_TYPE, result.getEventType()); + assertEquals(TEST_TIMESTAMP_MS, result.getTimestampMs()); + assertEquals(TEST_USER, result.getPrincipalName()); + assertEquals(TEST_RESOURCE_TYPE, result.getResourceType()); + assertEquals(TEST_RESOURCE_IDENTIFIER, result.getResourceIdentifier()); + assertEquals(TEST_JSON, result.getAdditionalProperties()); + } + + @Test + public void testToEventWithNullInput() { + // Act + PolarisEvent result = ModelEvent.toEvent(null); + + // Assert + assertNull(result); + } + + @Test + public void testToEvent() { + // Arrange + ModelEvent modelEvent = + ImmutableModelEvent.builder() + .catalogId(TEST_CATALOG_ID) + .eventId(TEST_EVENT_ID) + .requestId(TEST_REQUEST_ID) + .eventType(TEST_EVENT_TYPE) + .timestampMs(TEST_TIMESTAMP_MS) + .principalName(TEST_USER) + .resourceType(TEST_RESOURCE_TYPE) + .resourceIdentifier(TEST_RESOURCE_IDENTIFIER) + .additionalProperties(TEST_JSON) + .build(); + + // Act + PolarisEvent result = ModelEvent.toEvent(modelEvent); + + // Assert + assertEquals(TEST_CATALOG_ID, result.getCatalogId()); + assertEquals(TEST_EVENT_ID, result.getId()); + assertEquals(TEST_REQUEST_ID, result.getRequestId()); + assertEquals(TEST_EVENT_TYPE, result.getEventType()); + assertEquals(TEST_TIMESTAMP_MS, result.getTimestampMs()); + assertEquals(TEST_USER, result.getPrincipalName()); + assertEquals(TEST_RESOURCE_TYPE, result.getResourceType()); + assertEquals(TEST_RESOURCE_IDENTIFIER, result.getResourceIdentifier()); + assertEquals(TEST_JSON, result.getAdditionalProperties()); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java new file mode 100644 index 0000000000..010108193a --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.entity; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Map; + +public class PolarisEvent { + // TODO: Look into using the CDI-managed `ObjectMapper` object + public static final String EMPTY_MAP_STRING = "{}"; + + // to serialize/deserialize properties + private static final ObjectMapper MAPPER = new ObjectMapper(); + + // catalog id + private String catalogId; + + // event id + private String id; + + // id of the request that generated this event + private String requestId; + + // event type that was fired + private String eventType; + + // timestamp in epoch milliseconds of when this event was emitted + private long timestampMs; + + // polaris principal who took this action + private String principalName; + + // Enum that states the type of resource was being operated on + private ResourceType resourceType; + + // Which resource was operated on + private String resourceIdentifier; + + // Additional parameters that were not earlier recorded + private String additionalProperties; + + public String getCatalogId() { + return catalogId; + } + + public String getId() { + return id; + } + + public String getRequestId() { + return requestId; + } + + public String getEventType() { + return eventType; + } + + public long getTimestampMs() { + return timestampMs; + } + + public String getPrincipalName() { + return principalName; + } + + public ResourceType getResourceType() { + return resourceType; + } + + public String getResourceIdentifier() { + return resourceIdentifier; + } + + public String getAdditionalProperties() { + return additionalProperties != null ? additionalProperties : EMPTY_MAP_STRING; + } + + public PolarisEvent( + String catalogId, + String id, + String requestId, + String eventType, + long timestampMs, + String actor, + ResourceType resourceType, + String resourceIdentifier) { + this.catalogId = catalogId; + this.id = id; + this.requestId = requestId; + this.eventType = eventType; + this.timestampMs = timestampMs; + this.principalName = actor; + this.resourceType = resourceType; + this.resourceIdentifier = resourceIdentifier; + } + + @JsonIgnore + public void setAdditionalProperties(Map properties) { + try { + this.additionalProperties = properties == null ? null : MAPPER.writeValueAsString(properties); + } catch (JsonProcessingException ex) { + throw new IllegalStateException( + String.format("Failed to serialize json. properties %s", properties), ex); + } + } + + public void setAdditionalProperties(String additionalProperties) { + this.additionalProperties = additionalProperties; + } + + public enum ResourceType { + CATALOG, + NAMESPACE, + TABLE, + VIEW + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEventManager.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEventManager.java new file mode 100644 index 0000000000..b4b7891338 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEventManager.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.entity; + +import jakarta.annotation.Nonnull; +import java.util.List; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.persistence.BasePersistence; + +public interface PolarisEventManager { + @Nonnull + default void writeEvents( + @Nonnull PolarisCallContext callCtx, @Nonnull List polarisEvents) { + BasePersistence ms = callCtx.getMetaStore(); + ms.writeEvents(polarisEvents); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java index 6134e5af6e..4fa60e8c5d 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java @@ -34,6 +34,7 @@ import org.apache.polaris.core.entity.PolarisEntityId; import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.entity.PolarisEvent; import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; @@ -136,6 +137,13 @@ void writeEntities( void writeToGrantRecords( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisGrantRecord grantRec); + /** + * Write all events to the events table. This is an append-only operation. + * + * @param events events to persist + */ + void writeEvents(@Nonnull List events); + /** * Delete this entity from the meta store. * diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java index ffe3ca2a26..9f13faf6dc 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java @@ -34,6 +34,7 @@ import org.apache.polaris.core.entity.PolarisEntityId; import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.entity.PolarisEventManager; import org.apache.polaris.core.entity.PrincipalEntity; import org.apache.polaris.core.entity.PrincipalRoleEntity; import org.apache.polaris.core.persistence.dao.entity.BaseResult; @@ -60,7 +61,8 @@ public interface PolarisMetaStoreManager extends PolarisSecretsManager, PolarisGrantManager, PolarisCredentialVendor, - PolarisPolicyMappingManager { + PolarisPolicyMappingManager, + PolarisEventManager { /** * Bootstrap the Polaris service, creating the root catalog, root principal, and associated diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java index 7f414ef9dd..55a005d398 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java @@ -35,6 +35,7 @@ import org.apache.polaris.core.entity.PolarisEntityId; import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.entity.PolarisEvent; import org.apache.polaris.core.entity.PolarisPrivilege; import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.ChangeTrackingResult; @@ -436,4 +437,11 @@ Optional> hasOverlappingSiblings( diagnostics.fail("illegal_method_in_transaction_workspace", "loadPoliciesOnEntityByType"); return null; } + + @Nonnull + @Override + public void writeEvents( + @Nonnull PolarisCallContext callCtx, @Nonnull List polarisEvents) { + diagnostics.fail("illegal_method_in_transaction_workspace", "writeEvents"); + } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java index d5b323fabe..848a8421e5 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java @@ -34,6 +34,7 @@ import org.apache.polaris.core.entity.PolarisEntityId; import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.entity.PolarisEvent; import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.entity.PolarisPrincipalSecrets; import org.apache.polaris.core.persistence.EntityAlreadyExistsException; @@ -273,6 +274,11 @@ public void writeToGrantRecords( runActionInTransaction(callCtx, () -> this.writeToGrantRecordsInCurrentTxn(callCtx, grantRec)); } + @Override + public void writeEvents(@Nonnull List events) { + throw new UnsupportedOperationException("Not implemented for transactional persistence."); + } + /** {@inheritDoc} */ @Override public void deleteEntity(@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) { diff --git a/runtime/admin/src/main/java/org/apache/polaris/admintool/BootstrapCommand.java b/runtime/admin/src/main/java/org/apache/polaris/admintool/BootstrapCommand.java index 2d75b4c17d..c1a772409b 100644 --- a/runtime/admin/src/main/java/org/apache/polaris/admintool/BootstrapCommand.java +++ b/runtime/admin/src/main/java/org/apache/polaris/admintool/BootstrapCommand.java @@ -83,7 +83,7 @@ static class SchemaInputOptions { @CommandLine.Option( names = {"-v", "--schema-version"}, paramLabel = "", - description = "The version of the schema to load in [1, 2, LATEST].") + description = "The version of the schema to load in [1, 2, 3, LATEST].") Integer schemaVersion; @CommandLine.Option( diff --git a/runtime/defaults/src/main/resources/application.properties b/runtime/defaults/src/main/resources/application.properties index 794ea13375..a521e8277f 100644 --- a/runtime/defaults/src/main/resources/application.properties +++ b/runtime/defaults/src/main/resources/application.properties @@ -115,7 +115,6 @@ polaris.features."ENFORCE_PRINCIPAL_CREDENTIAL_ROTATION_REQUIRED_CHECKING"=false polaris.features."SUPPORTED_CATALOG_STORAGE_TYPES"=["S3","GCS","AZURE"] # polaris.features."ENABLE_CATALOG_FEDERATION"=true polaris.features."SUPPORTED_CATALOG_CONNECTION_TYPES"=["ICEBERG_REST"] -polaris.features."SUPPORTED_EXTERNAL_CATALOG_AUTHENTICATION_TYPES"=["OAUTH", "BEARER"] # realm overrides # polaris.features.realm-overrides."my-realm"."SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION"=true @@ -123,12 +122,16 @@ polaris.features."SUPPORTED_EXTERNAL_CATALOG_AUTHENTICATION_TYPES"=["OAUTH", "BE # polaris.persistence.type=eclipse-link # polaris.persistence.type=in-memory-atomic polaris.persistence.type=in-memory +# polaris.persistence.type=relational-jdbc polaris.secrets-manager.type=in-memory polaris.file-io.type=default polaris.event-listener.type=no-op +# polaris.event-listener.type=persistence-in-memory-buffer +# polaris.event-listener.persistence-in-memory-buffer.buffer-time=5000ms +# polaris.event-listener.persistence-in-memory-buffer.max-buffer-size=5 polaris.log.request-id-header-name=Polaris-Request-Id # polaris.log.mdc.aid=polaris diff --git a/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java b/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java index 48846ac145..31bbd6cdd6 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java @@ -86,6 +86,8 @@ import org.apache.polaris.service.admin.api.PolarisPrincipalRolesApiService; import org.apache.polaris.service.admin.api.PolarisPrincipalsApiService; import org.apache.polaris.service.config.ReservedProperties; +import org.apache.polaris.service.events.AfterCatalogCreatedEvent; +import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.types.PolicyIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,6 +107,7 @@ public class PolarisServiceImpl private final CallContext callContext; private final RealmConfig realmConfig; private final ReservedProperties reservedProperties; + private final PolarisEventListener polarisEventListener; @Inject public PolarisServiceImpl( @@ -114,7 +117,8 @@ public PolarisServiceImpl( UserSecretsManagerFactory userSecretsManagerFactory, PolarisAuthorizer polarisAuthorizer, CallContext callContext, - ReservedProperties reservedProperties) { + ReservedProperties reservedProperties, + PolarisEventListener polarisEventListener) { this.diagnostics = diagnostics; this.resolutionManifestFactory = resolutionManifestFactory; this.metaStoreManagerFactory = metaStoreManagerFactory; @@ -123,6 +127,7 @@ public PolarisServiceImpl( this.callContext = callContext; this.realmConfig = callContext.getRealmConfig(); this.reservedProperties = reservedProperties; + this.polarisEventListener = polarisEventListener; } private PolarisAdminService newAdminService( @@ -173,6 +178,7 @@ public Response createCatalog( validateExternalCatalog(catalog); Catalog newCatalog = CatalogEntity.of(adminService.createCatalog(request)).asCatalog(); LOGGER.info("Created new catalog {}", newCatalog); + polarisEventListener.onAfterCatalogCreated(new AfterCatalogCreatedEvent(newCatalog.getName())); return Response.status(Response.Status.CREATED).build(); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index b59adcc1f6..7402de80d5 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -139,7 +139,7 @@ import org.apache.polaris.service.events.BeforeTableRefreshedEvent; import org.apache.polaris.service.events.BeforeViewCommitedEvent; import org.apache.polaris.service.events.BeforeViewRefreshedEvent; -import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.NotificationRequest; import org.apache.polaris.service.types.NotificationType; @@ -1361,7 +1361,8 @@ public void doRefresh() { if (latestLocation == null) { disableRefresh(); } else { - polarisEventListener.onBeforeTableRefreshed(new BeforeTableRefreshedEvent(tableIdentifier)); + polarisEventListener.onBeforeTableRefreshed( + new BeforeTableRefreshedEvent(catalogName, tableIdentifier)); refreshFromMetadataLocation( latestLocation, SHOULD_RETRY_REFRESH_PREDICATE, @@ -1381,7 +1382,8 @@ public void doRefresh() { Set.of(PolarisStorageActions.READ, PolarisStorageActions.LIST)); return TableMetadataParser.read(fileIO, metadataLocation); }); - polarisEventListener.onAfterTableRefreshed(new AfterTableRefreshedEvent(tableIdentifier)); + polarisEventListener.onAfterTableRefreshed( + new AfterTableRefreshedEvent(catalogName, tableIdentifier)); } } @@ -1390,7 +1392,10 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { new BeforeTableCommitedEvent(tableIdentifier, base, metadata)); LOGGER.debug( - "doCommit for table {} with base {}, metadata {}", tableIdentifier, base, metadata); + "doCommit for table {} with metadataBefore {}, metadataAfter {}", + tableIdentifier, + base, + metadata); // TODO: Maybe avoid writing metadata if there's definitely a transaction conflict if (null == base && !namespaceExists(tableIdentifier.namespace())) { throw new NoSuchNamespaceException( @@ -1529,7 +1534,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { } polarisEventListener.onAfterTableCommited( - new AfterTableCommitedEvent(tableIdentifier, base, metadata)); + new AfterTableCommitedEvent(catalogName, tableIdentifier, base, metadata)); } @Override @@ -1720,7 +1725,8 @@ public void doRefresh() { if (latestLocation == null) { disableRefresh(); } else { - polarisEventListener.onBeforeViewRefreshed(new BeforeViewRefreshedEvent(identifier)); + polarisEventListener.onBeforeViewRefreshed( + new BeforeViewRefreshedEvent(catalogName, identifier)); refreshFromMetadataLocation( latestLocation, SHOULD_RETRY_REFRESH_PREDICATE, @@ -1742,16 +1748,21 @@ public void doRefresh() { return ViewMetadataParser.read(fileIO.newInputFile(metadataLocation)); }); - polarisEventListener.onAfterViewRefreshed(new AfterViewRefreshedEvent(identifier)); + polarisEventListener.onAfterViewRefreshed( + new AfterViewRefreshedEvent(catalogName, identifier)); } } public void doCommit(ViewMetadata base, ViewMetadata metadata) { polarisEventListener.onBeforeViewCommited( - new BeforeViewCommitedEvent(identifier, base, metadata)); + new BeforeViewCommitedEvent(catalogName, identifier, base, metadata)); // TODO: Maybe avoid writing metadata if there's definitely a transaction conflict - LOGGER.debug("doCommit for view {} with base {}, metadata {}", identifier, base, metadata); + LOGGER.debug( + "doCommit for view {} with metadataBefore {}, metadataAfter {}", + identifier, + base, + metadata); if (null == base && !namespaceExists(identifier.namespace())) { throw new NoSuchNamespaceException( "Cannot create view '%s'. Namespace does not exist: '%s'", @@ -1845,7 +1856,7 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { } polarisEventListener.onAfterViewCommited( - new AfterViewCommitedEvent(identifier, base, metadata)); + new AfterViewCommitedEvent(catalogName, identifier, base, metadata)); } protected String writeNewMetadataIfRequired(ViewMetadata metadata) { diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index 1b0ea9b02c..7ed3e2d298 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -85,6 +85,7 @@ import org.apache.polaris.service.catalog.common.CatalogAdapter; import org.apache.polaris.service.config.ReservedProperties; import org.apache.polaris.service.context.catalog.CallContextCatalogFactory; +import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.http.IcebergHttpUtil; import org.apache.polaris.service.http.IfNoneMatch; import org.apache.polaris.service.types.CommitTableRequest; @@ -153,6 +154,7 @@ public class IcebergCatalogAdapter private final ReservedProperties reservedProperties; private final CatalogHandlerUtils catalogHandlerUtils; private final Instance externalCatalogFactories; + private final PolarisEventListener polarisEventListener; @Inject public IcebergCatalogAdapter( @@ -168,7 +170,8 @@ public IcebergCatalogAdapter( CatalogPrefixParser prefixParser, ReservedProperties reservedProperties, CatalogHandlerUtils catalogHandlerUtils, - @Any Instance externalCatalogFactories) { + @Any Instance externalCatalogFactories, + PolarisEventListener polarisEventListener) { this.diagnostics = diagnostics; this.realmContext = realmContext; this.callContext = callContext; @@ -183,6 +186,7 @@ public IcebergCatalogAdapter( this.reservedProperties = reservedProperties; this.catalogHandlerUtils = catalogHandlerUtils; this.externalCatalogFactories = externalCatalogFactories; + this.polarisEventListener = polarisEventListener; } /** @@ -221,7 +225,8 @@ IcebergCatalogHandler newHandlerWrapper(SecurityContext securityContext, String polarisAuthorizer, reservedProperties, catalogHandlerUtils, - externalCatalogFactories); + externalCatalogFactories, + polarisEventListener); } @Override diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index beb5c0f649..863e1aef52 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -102,6 +102,8 @@ import org.apache.polaris.service.catalog.common.CatalogHandler; import org.apache.polaris.service.config.ReservedProperties; import org.apache.polaris.service.context.catalog.CallContextCatalogFactory; +import org.apache.polaris.service.events.AfterTableCreatedEvent; +import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.http.IcebergHttpUtil; import org.apache.polaris.service.http.IfNoneMatch; import org.apache.polaris.service.types.NotificationRequest; @@ -130,6 +132,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab private final CallContextCatalogFactory catalogFactory; private final ReservedProperties reservedProperties; private final CatalogHandlerUtils catalogHandlerUtils; + private final PolarisEventListener polarisEventListener; // Catalog instance will be initialized after authorizing resolver successfully resolves // the catalog entity. @@ -152,7 +155,8 @@ public IcebergCatalogHandler( PolarisAuthorizer authorizer, ReservedProperties reservedProperties, CatalogHandlerUtils catalogHandlerUtils, - Instance externalCatalogFactories) { + Instance externalCatalogFactories, + PolarisEventListener polarisEventListener) { super( diagnostics, callContext, @@ -166,6 +170,7 @@ public IcebergCatalogHandler( this.catalogFactory = catalogFactory; this.reservedProperties = reservedProperties; this.catalogHandlerUtils = catalogHandlerUtils; + this.polarisEventListener = polarisEventListener; } private CatalogEntity getResolvedCatalogEntity() { @@ -387,8 +392,11 @@ public LoadTableResponse createTableDirect(Namespace namespace, CreateTableReque .withWriteOrder(request.writeOrder()) .setProperties(reservedProperties.removeReservedProperties(request.properties())) .build(); - return catalogHandlerUtils.createTable( - baseCatalog, namespace, requestWithoutReservedProperties); + LoadTableResponse resp = + catalogHandlerUtils.createTable(baseCatalog, namespace, requestWithoutReservedProperties); + polarisEventListener.onAfterTableCreated( + new AfterTableCreatedEvent(catalogName, identifier, resp.tableMetadata())); + return resp; } /** diff --git a/runtime/service/src/main/java/org/apache/polaris/service/config/ProductionReadinessChecks.java b/runtime/service/src/main/java/org/apache/polaris/service/config/ProductionReadinessChecks.java index ca1373ea5f..50d0746910 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/config/ProductionReadinessChecks.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/config/ProductionReadinessChecks.java @@ -42,8 +42,8 @@ import org.apache.polaris.service.context.DefaultRealmContextResolver; import org.apache.polaris.service.context.RealmContextResolver; import org.apache.polaris.service.context.TestRealmContextResolver; -import org.apache.polaris.service.events.PolarisEventListener; -import org.apache.polaris.service.events.TestPolarisEventListener; +import org.apache.polaris.service.events.listeners.PolarisEventListener; +import org.apache.polaris.service.events.listeners.TestPolarisEventListener; import org.apache.polaris.service.metrics.MetricsConfiguration; import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory; import org.eclipse.microprofile.config.Config; @@ -225,7 +225,9 @@ public ProductionReadinessCheck checkPolarisEventListener( PolarisEventListener polarisEventListener) { if (polarisEventListener instanceof TestPolarisEventListener) { return ProductionReadinessCheck.of( - Error.of("TestPolarisEventListener is intended for tests only.", "polaris.events.type")); + Error.of( + "TestPolarisEventListener is intended for tests only.", + "polaris.event-listener.type")); } return ProductionReadinessCheck.OK; } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java index 4d954258f8..ff822e293c 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java @@ -70,8 +70,8 @@ import org.apache.polaris.service.context.RealmContextConfiguration; import org.apache.polaris.service.context.RealmContextFilter; import org.apache.polaris.service.context.RealmContextResolver; -import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.events.PolarisEventListenerConfiguration; +import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.persistence.PersistenceConfiguration; import org.apache.polaris.service.ratelimiter.RateLimiter; import org.apache.polaris.service.ratelimiter.RateLimiterFilterConfiguration; diff --git a/runtime/service/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java index 9cd8fd4c33..2efe9a65b1 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java @@ -36,7 +36,7 @@ import org.apache.polaris.core.storage.cache.StorageCredentialCache; import org.apache.polaris.service.catalog.iceberg.IcebergCatalog; import org.apache.polaris.service.catalog.io.FileIOFactory; -import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.task.TaskExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterCatalogCreatedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterCatalogCreatedEvent.java new file mode 100644 index 0000000000..0ed7cef48b --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterCatalogCreatedEvent.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events; + +/** Emitted Polaris creates a catalog. */ +public record AfterCatalogCreatedEvent(String catalogName) implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java index c952997df1..a69d304483 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java @@ -25,10 +25,11 @@ * Emitted after Polaris performs a commit to a table. This is not emitted if there's an exception * while committing. * + * @param catalogName The catalog name where this table exists. * @param identifier The identifier. * @param base The old metadata. * @param metadata The new metadata. */ public record AfterTableCommitedEvent( - TableIdentifier identifier, TableMetadata base, TableMetadata metadata) + String catalogName, TableIdentifier identifier, TableMetadata base, TableMetadata metadata) implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCreatedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCreatedEvent.java new file mode 100644 index 0000000000..dd8939a358 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCreatedEvent.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events; + +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.TableIdentifier; + +/** Emitted when Polaris creates a table. */ +public record AfterTableCreatedEvent( + String catalogName, TableIdentifier identifier, TableMetadata metadata) + implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java index be38a8baaa..caef8e3c53 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java @@ -23,6 +23,8 @@ /** * Emitted after Polaris refreshes its known version of a table's metadata by fetching the latest. * + * @param catalogName The name of the catalog where the table is located * @param tableIdentifier The identifier of the table that was refreshed. */ -public record AfterTableRefreshedEvent(TableIdentifier tableIdentifier) implements PolarisEvent {} +public record AfterTableRefreshedEvent(String catalogName, TableIdentifier tableIdentifier) + implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java index 638ba84fbc..d1ccfb1960 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java @@ -18,17 +18,13 @@ */ package org.apache.polaris.service.events; -import org.apache.polaris.core.context.CallContext; - /** * Emitted after an attempt of an async task, such as manifest file cleanup, finishes. * * @param taskEntityId The ID of the TaskEntity. - * @param callContext The CallContext the task is being executed under. * @param attempt The attempt number. Each retry of the task will have its own attempt number. The * initial (non-retried) attempt starts counting from 1. - * @param success Whether or not the attempt succeeded. + * @param success Whether the attempt succeeded. */ -public record AfterTaskAttemptedEvent( - long taskEntityId, CallContext callContext, int attempt, boolean success) +public record AfterTaskAttemptedEvent(long taskEntityId, int attempt, boolean success) implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java index eb2ca24149..a7d01ecc29 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java @@ -25,9 +25,11 @@ * Emitted after Polaris performs a commit to a view. This is not emitted if there's an exception * while committing. * + * @param catalogName The catalog name where the view is located * @param identifier The identifier. * @param base The old metadata. * @param metadata The new metadata. */ public record AfterViewCommitedEvent( - TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata) implements PolarisEvent {} + String catalogName, TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata) + implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java index 249220ddd7..b44fa2a510 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java @@ -23,6 +23,8 @@ /** * Emitted after Polaris refreshes its known version of a view's metadata by fetching the latest. * + * @param catalogName * @param viewIdentifier The identifier of the view that was refreshed. */ -public record AfterViewRefreshedEvent(TableIdentifier viewIdentifier) implements PolarisEvent {} +public record AfterViewRefreshedEvent(String catalogName, TableIdentifier viewIdentifier) + implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java index 2bcc49ab67..511f6f0976 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java @@ -26,10 +26,10 @@ * of this event relative to the validation checks we've performed, which means the commit may still * fail Polaris-side validation checks. * - * @param identifier The identifier. - * @param base The old metadata. - * @param metadata The new metadata. + * @param tableIdentifier The identifier. + * @param metadataBefore The old metadata. + * @param metadataAfter The new metadata. */ public record BeforeTableCommitedEvent( - TableIdentifier identifier, TableMetadata base, TableMetadata metadata) + TableIdentifier tableIdentifier, TableMetadata metadataBefore, TableMetadata metadataAfter) implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java index f319298f57..f5daade51d 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java @@ -24,6 +24,8 @@ * Emitted when Polaris intends to refresh its known version of a table's metadata by fetching the * latest. * + * @param catalogName The name of the catalog where the view is located. * @param tableIdentifier The identifier of the table being refreshed. */ -public record BeforeTableRefreshedEvent(TableIdentifier tableIdentifier) implements PolarisEvent {} +public record BeforeTableRefreshedEvent(String catalogName, TableIdentifier tableIdentifier) + implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java index a7fa7231e7..fcfa4400af 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java @@ -18,15 +18,11 @@ */ package org.apache.polaris.service.events; -import org.apache.polaris.core.context.CallContext; - /** * Emitted before an attempt of an async task, such as manifest file cleanup, begins. * * @param taskEntityId The ID of the TaskEntity - * @param callContext The CallContext the task is being executed under. * @param attempt The attempt number. Each retry of the task will have its own attempt number. The * initial (non-retried) attempt starts counting from 1. */ -public record BeforeTaskAttemptedEvent(long taskEntityId, CallContext callContext, int attempt) - implements PolarisEvent {} +public record BeforeTaskAttemptedEvent(long taskEntityId, int attempt) implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java index 16e460d806..c9303de71f 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java @@ -26,9 +26,11 @@ * this event relative to the validation checks we've performed, which means the commit may still * fail Polaris-side validation checks. * + * @param catalogName The name of the catalog where the view is located. * @param identifier The identifier. * @param base The old metadata. * @param metadata The new metadata. */ public record BeforeViewCommitedEvent( - TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata) implements PolarisEvent {} + String catalogName, TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata) + implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java index 6f58d2ca22..32b3250c72 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java @@ -24,6 +24,8 @@ * Emitted when Polaris intends to refresh its known version of a view's metadata by fetching the * latest. * + * @param catalogName The name of the catalog where the view is located. * @param viewIdentifier The identifier of the view being refreshed. */ -public record BeforeViewRefreshedEvent(TableIdentifier viewIdentifier) implements PolarisEvent {} +public record BeforeViewRefreshedEvent(String catalogName, TableIdentifier viewIdentifier) + implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEvent.java index 4922c02f4c..b5a84c4937 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEvent.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEvent.java @@ -18,8 +18,14 @@ */ package org.apache.polaris.service.events; +import java.util.UUID; + /** - * Represents an event emitted by Polaris. Currently there's no common data across events so this is - * just a marker interface. * + * Represents an event emitted by Polaris. Currently, there's no common data across events so this + * is just a marker interface. */ -public interface PolarisEvent {} +public interface PolarisEvent { + static String createEventId() { + return UUID.randomUUID().toString(); + } +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListenerConfiguration.java b/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListenerConfiguration.java index 9b165ee3a5..56c12eac03 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListenerConfiguration.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListenerConfiguration.java @@ -20,13 +20,14 @@ import io.quarkus.runtime.annotations.StaticInitSafe; import io.smallrye.config.ConfigMapping; +import org.apache.polaris.service.events.listeners.PolarisEventListener; @StaticInitSafe @ConfigMapping(prefix = "polaris.event-listener") public interface PolarisEventListenerConfiguration { /** - * The type of the event listener to use. Must be a registered {@link - * org.apache.polaris.service.events.PolarisEventListener} identifier. + * The type of the event listener to use. Must be a registered {@link PolarisEventListener} + * identifier. */ String type(); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/ConcurrentLinkedQueueWithApproximateSize.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/ConcurrentLinkedQueueWithApproximateSize.java new file mode 100644 index 0000000000..3be25e1ad2 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/ConcurrentLinkedQueueWithApproximateSize.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +class ConcurrentLinkedQueueWithApproximateSize { + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + private final AtomicInteger size = new AtomicInteger(); + + public void add(T event) { + queue.add(event); + size.getAndIncrement(); + } + + public boolean isEmpty() { + return queue.isEmpty(); + } + + public T peek() { + return queue.peek(); + } + + public int size() { + return size.get(); + } + + public Stream stream() { + return queue.stream(); + } +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferEventListenerConfiguration.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferEventListenerConfiguration.java new file mode 100644 index 0000000000..277676f10d --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferEventListenerConfiguration.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners; + +import io.quarkus.runtime.annotations.StaticInitSafe; +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithDefault; +import io.smallrye.config.WithName; +import java.time.Duration; + +@StaticInitSafe +@ConfigMapping(prefix = "polaris.event-listener.persistence-in-memory-buffer") +public interface InMemoryBufferEventListenerConfiguration { + /** + * @return the buffer time in milliseconds + */ + @WithName("buffer-time") + @WithDefault("5000ms") + Duration bufferTime(); + + /** + * @return the maximum number of cached entries + */ + @WithName("max-buffer-size") + @WithDefault("5") + int maxBufferSize(); +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java new file mode 100644 index 0000000000..bf8d56088e --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.SecurityContext; +import java.time.Clock; +import java.time.Duration; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. */ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + + private final ConcurrentHashMap> + buffer = new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor; + private final ConcurrentHashMap> futures = new ConcurrentHashMap<>(); + private ScheduledFuture scheduledFuture; + private final Duration timeToFlush; + private final int maxBufferSize; + + @Inject CallContext callContext; + @Inject Clock clock; + @Context SecurityContext securityContext; + @Context ContainerRequestContext containerRequestContext; + + @Inject + public InMemoryBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + Clock clock, + InMemoryBufferEventListenerConfiguration eventListenerConfiguration) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.clock = clock; + this.timeToFlush = eventListenerConfiguration.bufferTime(); + this.maxBufferSize = eventListenerConfiguration.maxBufferSize(); + + executor = Executors.newSingleThreadScheduledExecutor(); + } + + @PostConstruct + void start() { + scheduledFuture = + executor.scheduleAtFixedRate( + this::runCleanup, 0, timeToFlush.toMillis(), TimeUnit.MILLISECONDS); + } + + void runCleanup() { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId, false); + } catch (Exception e) { + LOGGER.debug("Buffer checking task failed for realm ({}): {}", realmId, e); + } + } + } + + @PreDestroy + void shutdown() { + scheduledFuture.cancel(false); + futures.forEach((key, future) -> future.cancel(false)); + executor.shutdown(); + + try { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + executor.shutdownNow(); + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + LOGGER.warn("Executor did not shut down cleanly"); + } + } + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } finally { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId, true); + } catch (Exception e) { + LOGGER.debug("Buffer flushing task failed for realm ({}): ", realmId, e); + } + } + } + } + + @Override + String getRequestId() { + if (containerRequestContext != null && containerRequestContext.hasProperty(REQUEST_ID_KEY)) { + return (String) containerRequestContext.getProperty(REQUEST_ID_KEY); + } + return null; + } + + @Override + void processEvent(PolarisEvent polarisEvent) { + String realmId = callContext.getRealmContext().getRealmIdentifier(); + + ConcurrentLinkedQueueWithApproximateSize realmQueue = + buffer.computeIfAbsent(realmId, k -> new ConcurrentLinkedQueueWithApproximateSize<>()); + realmQueue.add(polarisEvent); + if (realmQueue.size() >= maxBufferSize) { + futures.compute( + realmId, + (k, v) -> { + if (v == null || v.isDone()) { + return executor.submit(() -> checkAndFlushBufferIfNecessary(realmId, true)); + } + return v; + }); + } + } + + @VisibleForTesting + void checkAndFlushBufferIfNecessary(String realmId, boolean forceFlush) { + ConcurrentLinkedQueueWithApproximateSize queue = buffer.get(realmId); + if (queue == null || queue.isEmpty()) { + return; + } + + PolarisEvent head = queue.peek(); + if (head == null) { + return; + } + + Duration elapsed = Duration.ofMillis(clock.millis() - head.getTimestampMs()); + + if (elapsed.compareTo(timeToFlush) > 0 || queue.size() >= maxBufferSize || forceFlush) { + // Atomically replace old queue with new queue + boolean replaced = + buffer.replace(realmId, queue, new ConcurrentLinkedQueueWithApproximateSize<>()); + if (!replaced) { + // Another thread concurrently modified the buffer, so do not continue + return; + } + + RealmContext realmContext = () -> realmId; + PolarisMetaStoreManager metaStoreManager = + metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext); + BasePersistence basePersistence = metaStoreManagerFactory.getOrCreateSession(realmContext); + metaStoreManager.writeEvents( + new PolarisCallContext(realmContext, basePersistence), queue.stream().toList()); + + if (buffer.get(realmId).size() >= maxBufferSize) { + // Ensure that all events will be flushed, even if the race condition is triggered where + // new events are added between replacing the buffer above and the finishing of this method. + futures.put(realmId, executor.submit(() -> checkAndFlushBufferIfNecessary(realmId, true))); + } + } + } + + @Override + ContextSpecificInformation getContextSpecificInformation() { + return new ContextSpecificInformation( + clock.millis(), + securityContext.getUserPrincipal() == null + ? null + : securityContext.getUserPrincipal().getName()); + } +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/NoOpPolarisEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/NoOpPolarisEventListener.java similarity index 95% rename from runtime/service/src/main/java/org/apache/polaris/service/events/NoOpPolarisEventListener.java rename to runtime/service/src/main/java/org/apache/polaris/service/events/listeners/NoOpPolarisEventListener.java index f31fbcef51..c02dfe4811 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/NoOpPolarisEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/NoOpPolarisEventListener.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.polaris.service.events; +package org.apache.polaris.service.events.listeners; import io.smallrye.common.annotation.Identifier; import jakarta.enterprise.context.ApplicationScoped; diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java similarity index 66% rename from runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java rename to runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java index 485766bb24..2d13194dcf 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java @@ -16,13 +16,28 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.polaris.service.events; +package org.apache.polaris.service.events.listeners; + +import org.apache.polaris.service.events.AfterCatalogCreatedEvent; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableCreatedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; /** * Represents an event listener that can respond to notable moments during Polaris's execution. * Event details are documented under the event objects themselves. */ public abstract class PolarisEventListener { + /** {@link BeforeRequestRateLimitedEvent} */ public void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) {} @@ -55,4 +70,10 @@ public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {} /** {@link AfterTaskAttemptedEvent} */ public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {} + + /** {@link AfterTableCreatedEvent} */ + public void onAfterTableCreated(AfterTableCreatedEvent event) {} + + /** {@link AfterCatalogCreatedEvent} */ + public void onAfterCatalogCreated(AfterCatalogCreatedEvent event) {} } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java new file mode 100644 index 0000000000..d31beb45e6 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners; + +import java.util.Map; +import org.apache.iceberg.TableMetadataParser; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.service.events.AfterCatalogCreatedEvent; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableCreatedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; + +public abstract class PolarisPersistenceEventListener extends PolarisEventListener { + + // TODO: Ensure all events (except RateLimiter ones) call `addToBuffer` + @Override + public final void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) {} + + @Override + public void onBeforeTableCommited(BeforeTableCommitedEvent event) {} + + @Override + public void onAfterTableCommited(AfterTableCommitedEvent event) {} + + @Override + public void onBeforeViewCommited(BeforeViewCommitedEvent event) {} + + @Override + public void onAfterViewCommited(AfterViewCommitedEvent event) {} + + @Override + public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {} + + @Override + public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {} + + @Override + public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {} + + @Override + public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {} + + @Override + public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {} + + @Override + public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {} + + @Override + public void onAfterTableCreated(AfterTableCreatedEvent event) { + ContextSpecificInformation contextSpecificInformation = getContextSpecificInformation(); + PolarisEvent polarisEvent = + new PolarisEvent( + event.catalogName(), + org.apache.polaris.service.events.PolarisEvent.createEventId(), + getRequestId(), + event.getClass().getSimpleName(), + contextSpecificInformation.timestamp(), + contextSpecificInformation.principalName(), + PolarisEvent.ResourceType.TABLE, + event.identifier().toString()); + Map additionalParameters = + Map.of( + "table-uuid", + event.metadata().uuid(), + "metadata", + TableMetadataParser.toJson(event.metadata())); + polarisEvent.setAdditionalProperties(additionalParameters); + processEvent(polarisEvent); + } + + @Override + public void onAfterCatalogCreated(AfterCatalogCreatedEvent event) { + ContextSpecificInformation contextSpecificInformation = getContextSpecificInformation(); + PolarisEvent polarisEvent = + new PolarisEvent( + event.catalogName(), + org.apache.polaris.service.events.PolarisEvent.createEventId(), + getRequestId(), + event.getClass().getSimpleName(), + contextSpecificInformation.timestamp(), + contextSpecificInformation.principalName(), + PolarisEvent.ResourceType.CATALOG, + event.catalogName()); + processEvent(polarisEvent); + } + + protected record ContextSpecificInformation(long timestamp, String principalName) {} + + abstract ContextSpecificInformation getContextSpecificInformation(); + + abstract String getRequestId(); + + abstract void processEvent(PolarisEvent event); +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java similarity index 75% rename from runtime/service/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java rename to runtime/service/src/main/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java index 2e2538e890..cf95452de3 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java @@ -16,13 +16,25 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.polaris.service.events; +package org.apache.polaris.service.events.listeners; import com.google.common.collect.Streams; import io.smallrye.common.annotation.Identifier; import jakarta.enterprise.context.ApplicationScoped; import java.util.ArrayList; import java.util.List; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; +import org.apache.polaris.service.events.PolarisEvent; /** Event listener that stores all emitted events forever. Not recommended for use in production. */ @ApplicationScoped diff --git a/runtime/service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java b/runtime/service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java index 5b08e89bc2..61d27a0407 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java @@ -29,7 +29,7 @@ import java.io.IOException; import org.apache.polaris.service.config.FilterPriorities; import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; -import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java index 6ee681ead7..d425c16754 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java @@ -45,7 +45,7 @@ import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.service.events.AfterTaskAttemptedEvent; import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; -import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.tracing.TracingFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -143,8 +143,7 @@ public void addTaskHandlerContext(long taskEntityId, CallContext callContext) { } protected void handleTask(long taskEntityId, CallContext ctx, int attempt) { - polarisEventListener.onBeforeTaskAttempted( - new BeforeTaskAttemptedEvent(taskEntityId, ctx, attempt)); + polarisEventListener.onBeforeTaskAttempted(new BeforeTaskAttemptedEvent(taskEntityId, attempt)); boolean success = false; try { @@ -188,7 +187,7 @@ protected void handleTask(long taskEntityId, CallContext ctx, int attempt) { } } finally { polarisEventListener.onAfterTaskAttempted( - new AfterTaskAttemptedEvent(taskEntityId, ctx, attempt, success)); + new AfterTaskAttemptedEvent(taskEntityId, attempt, success)); } } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisAuthzTestBase.java b/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisAuthzTestBase.java index fbdf066618..52c1051138 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisAuthzTestBase.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisAuthzTestBase.java @@ -28,6 +28,7 @@ import jakarta.enterprise.context.RequestScoped; import jakarta.enterprise.inject.Alternative; import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; import jakarta.ws.rs.core.SecurityContext; import java.io.IOException; import java.util.Date; @@ -87,7 +88,7 @@ import org.apache.polaris.service.config.ReservedProperties; import org.apache.polaris.service.context.catalog.CallContextCatalogFactory; import org.apache.polaris.service.context.catalog.PolarisCallContextCatalogFactory; -import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.PolicyIdentifier; @@ -225,6 +226,10 @@ public void before(TestInfo testInfo) { RealmContext realmContext = testInfo::getDisplayName; QuarkusMock.installMockForType(realmContext, RealmContext.class); + ContainerRequestContext containerRequestContext = Mockito.mock(ContainerRequestContext.class); + Mockito.when(containerRequestContext.getProperty(Mockito.anyString())) + .thenReturn("request-id-1"); + QuarkusMock.installMockForType(containerRequestContext, ContainerRequestContext.class); metaStoreManager = managerFactory.getOrCreateMetaStoreManager(realmContext); userSecretsManager = userSecretsManagerFactory.getOrCreateUserSecretsManager(realmContext); diff --git a/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisServiceImplTest.java b/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisServiceImplTest.java index 09929bac23..1d22c48282 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisServiceImplTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/admin/PolarisServiceImplTest.java @@ -42,6 +42,8 @@ import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory; import org.apache.polaris.core.secrets.UserSecretsManagerFactory; import org.apache.polaris.service.config.ReservedProperties; +import org.apache.polaris.service.events.listeners.NoOpPolarisEventListener; +import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -56,6 +58,7 @@ public class PolarisServiceImplTest { private CallContext callContext; private ReservedProperties reservedProperties; private RealmConfig realmConfig; + private PolarisEventListener polarisEventListener; private PolarisServiceImpl polarisService; @@ -68,6 +71,7 @@ void setUp() { callContext = Mockito.mock(CallContext.class); reservedProperties = Mockito.mock(ReservedProperties.class); realmConfig = Mockito.mock(RealmConfig.class); + polarisEventListener = new NoOpPolarisEventListener(); when(callContext.getRealmConfig()).thenReturn(realmConfig); when(realmConfig.getConfig(FeatureConfiguration.SUPPORTED_CATALOG_CONNECTION_TYPES)) @@ -84,7 +88,8 @@ void setUp() { userSecretsManagerFactory, polarisAuthorizer, callContext, - reservedProperties); + reservedProperties, + polarisEventListener); } @Test diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java index de4e569921..98a30f0d98 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java @@ -69,7 +69,7 @@ import org.apache.polaris.service.catalog.io.DefaultFileIOFactory; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.config.ReservedProperties; -import org.apache.polaris.service.events.NoOpPolarisEventListener; +import org.apache.polaris.service.events.listeners.NoOpPolarisEventListener; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; import org.apache.polaris.service.task.TaskExecutor; import org.assertj.core.api.Assertions; diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java index 38811c39c8..9b81249f0b 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java @@ -141,8 +141,8 @@ import org.apache.polaris.service.events.AfterTableRefreshedEvent; import org.apache.polaris.service.events.BeforeTableCommitedEvent; import org.apache.polaris.service.events.BeforeTableRefreshedEvent; -import org.apache.polaris.service.events.PolarisEventListener; -import org.apache.polaris.service.events.TestPolarisEventListener; +import org.apache.polaris.service.events.listeners.PolarisEventListener; +import org.apache.polaris.service.events.listeners.TestPolarisEventListener; import org.apache.polaris.service.exception.FakeAzureHttpResponse; import org.apache.polaris.service.exception.IcebergExceptionMapper; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; @@ -2235,9 +2235,10 @@ public void testEventsAreEmitted() { Assertions.assertThat(afterRefreshEvent.tableIdentifier()).isEqualTo(TestData.TABLE); var beforeTableEvent = testPolarisEventListener.getLatest(BeforeTableCommitedEvent.class); - Assertions.assertThat(beforeTableEvent.identifier()).isEqualTo(TestData.TABLE); - Assertions.assertThat(beforeTableEvent.base().properties().get(key)).isEqualTo(valOld); - Assertions.assertThat(beforeTableEvent.metadata().properties().get(key)).isEqualTo(valNew); + Assertions.assertThat(beforeTableEvent.tableIdentifier()).isEqualTo(TestData.TABLE); + Assertions.assertThat(beforeTableEvent.metadataBefore().properties().get(key)) + .isEqualTo(valOld); + Assertions.assertThat(beforeTableEvent.metadataAfter().properties().get(key)).isEqualTo(valNew); var afterTableEvent = testPolarisEventListener.getLatest(AfterTableCommitedEvent.class); Assertions.assertThat(afterTableEvent.identifier()).isEqualTo(TestData.TABLE); diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java index d4edde2c45..f0e54ea3d0 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java @@ -66,8 +66,8 @@ import org.apache.polaris.service.events.AfterViewRefreshedEvent; import org.apache.polaris.service.events.BeforeViewCommitedEvent; import org.apache.polaris.service.events.BeforeViewRefreshedEvent; -import org.apache.polaris.service.events.PolarisEventListener; -import org.apache.polaris.service.events.TestPolarisEventListener; +import org.apache.polaris.service.events.listeners.PolarisEventListener; +import org.apache.polaris.service.events.listeners.TestPolarisEventListener; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; import org.apache.polaris.service.test.TestData; import org.assertj.core.api.Assertions; diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerAuthzTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerAuthzTest.java index 5211a514a7..d359a95444 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerAuthzTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerAuthzTest.java @@ -112,7 +112,8 @@ private IcebergCatalogHandler newWrapper( polarisAuthorizer, reservedProperties, catalogHandlerUtils, - emptyExternalCatalogFactory()); + emptyExternalCatalogFactory(), + polarisEventListener); } /** @@ -252,7 +253,8 @@ public void testInsufficientPermissionsPriorToSecretRotation() { polarisAuthorizer, reservedProperties, catalogHandlerUtils, - emptyExternalCatalogFactory()); + emptyExternalCatalogFactory(), + polarisEventListener); // a variety of actions are all disallowed because the principal's credentials must be rotated doTestInsufficientPrivileges( @@ -289,7 +291,8 @@ public void testInsufficientPermissionsPriorToSecretRotation() { polarisAuthorizer, reservedProperties, catalogHandlerUtils, - emptyExternalCatalogFactory()); + emptyExternalCatalogFactory(), + polarisEventListener); doTestSufficientPrivilegeSets( List.of(Set.of(PolarisPrivilege.NAMESPACE_LIST)), diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java index e627f3cf59..6a603e3772 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java @@ -81,7 +81,7 @@ import org.apache.polaris.service.catalog.io.DefaultFileIOFactory; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.config.ReservedProperties; -import org.apache.polaris.service.events.NoOpPolarisEventListener; +import org.apache.polaris.service.events.listeners.NoOpPolarisEventListener; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.ApplicablePolicy; diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java new file mode 100644 index 0000000000..026e741d40 --- /dev/null +++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import jakarta.ws.rs.container.ContainerRequestContext; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.threeten.extra.MutableClock; + +public class InMemoryBufferPolarisPersistenceEventListenerTest { + private InMemoryBufferPolarisPersistenceEventListener eventListener; + private PolarisMetaStoreManager polarisMetaStoreManager; + private MutableClock clock; + private CallContext callContext; + + private static final int CONFIG_MAX_BUFFER_SIZE = 5; + private static final Duration CONFIG_TIME_TO_FLUSH_IN_MS = Duration.ofMillis(500); + + @BeforeEach + public void setUp() { + callContext = Mockito.mock(CallContext.class); + PolarisCallContext polarisCallContext = Mockito.mock(PolarisCallContext.class); + when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext); + + MetaStoreManagerFactory metaStoreManagerFactory = Mockito.mock(MetaStoreManagerFactory.class); + polarisMetaStoreManager = Mockito.mock(PolarisMetaStoreManager.class); + when(metaStoreManagerFactory.getOrCreateMetaStoreManager(any())) + .thenReturn(polarisMetaStoreManager); + + InMemoryBufferEventListenerConfiguration eventListenerConfiguration = + Mockito.mock(InMemoryBufferEventListenerConfiguration.class); + when(eventListenerConfiguration.maxBufferSize()).thenReturn(CONFIG_MAX_BUFFER_SIZE); + when(eventListenerConfiguration.bufferTime()).thenReturn(CONFIG_TIME_TO_FLUSH_IN_MS); + + clock = + MutableClock.of( + Instant.ofEpochSecond(0), ZoneOffset.UTC); // Use 0 Epoch Time to make it easier to test + + eventListener = + new InMemoryBufferPolarisPersistenceEventListener( + metaStoreManagerFactory, clock, eventListenerConfiguration); + + eventListener.callContext = callContext; + } + + @Test + public void testProcessEventFlushesAfterConfiguredTime() { + String realmId = "realm1"; + List eventsAddedToBuffer = addEventsWithoutTriggeringFlush(realmId); + + // Push clock forwards to flush the buffer + clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2)); + eventListener.checkAndFlushBufferIfNecessary(realmId, false); + verify(polarisMetaStoreManager, times(1)).writeEvents(any(), eq(eventsAddedToBuffer)); + } + + @Test + public void testProcessEventFlushesAfterMaxEvents() { + String realm1 = "realm1"; + List eventsAddedToBuffer = addEventsWithoutTriggeringFlush(realm1); + List eventsAddedToBufferRealm2 = addEventsWithoutTriggeringFlush("realm2"); + + // Add the last event for realm1 and verify that it did trigger the flush + PolarisEvent triggeringEvent = createSampleEvent(); + RealmContext realmContext = () -> realm1; + when(callContext.getRealmContext()).thenReturn(realmContext); + eventListener.processEvent(triggeringEvent); + eventsAddedToBuffer.add(triggeringEvent); + + // Calling checkAndFlushBufferIfNecessary manually to replicate the behavior of the executor + // service + eventListener.checkAndFlushBufferIfNecessary(realm1, false); + verify(polarisMetaStoreManager, times(1)).writeEvents(any(), eq(eventsAddedToBuffer)); + verify(polarisMetaStoreManager, times(0)).writeEvents(any(), eq(eventsAddedToBufferRealm2)); + } + + @Test + public void testCheckAndFlushBufferIfNecessaryIsThreadSafe() throws Exception { + String realmId = "realm1"; + int threadCount = 10; + List threads = new ArrayList<>(); + ConcurrentLinkedQueue exceptions = new ConcurrentLinkedQueue<>(); + + // Pre-populate the buffer with events + List events = addEventsWithoutTriggeringFlush(realmId); + + // Push clock forwards to flush the buffer + clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2)); + + // Each thread will call checkAndFlushBufferIfNecessary concurrently + for (int i = 0; i < threadCount; i++) { + Thread t = + new Thread( + () -> { + try { + eventListener.checkAndFlushBufferIfNecessary(realmId, false); + } catch (Exception e) { + exceptions.add(e); + } + }); + threads.add(t); + } + // Start all threads + threads.forEach(Thread::start); + // Wait for all threads to finish + for (Thread t : threads) { + t.join(); + } + // There should be no exceptions + if (!exceptions.isEmpty()) { + throw new AssertionError( + "Exceptions occurred in concurrent checkAndFlushBufferIfNecessary: ", exceptions.peek()); + } + // Only one flush should occur + verify(polarisMetaStoreManager, times(1)).writeEvents(any(), eq(events)); + } + + @Execution(ExecutionMode.SAME_THREAD) + @Test + public void testProcessEventIsThreadSafe() throws Exception { + String realmId = "realm1"; + when(callContext.getRealmContext()).thenReturn(() -> realmId); + int threadCount = 10; + List threads = new ArrayList<>(); + ConcurrentLinkedQueue exceptions = new ConcurrentLinkedQueue<>(); + ConcurrentLinkedQueue allEvents = new ConcurrentLinkedQueue<>(); + eventListener.start(); + + for (int i = 0; i < threadCount; i++) { + Thread t = + new Thread( + () -> { + try { + for (int j = 0; j < 10; j++) { + PolarisEvent event = createSampleEvent(); + allEvents.add(event); + eventListener.processEvent(event); + } + } catch (Exception e) { + exceptions.add(e); + } + }); + threads.add(t); + } + + // Start all threads + threads.forEach(Thread::start); + // Wait for all threads to finish + for (Thread t : threads) { + t.join(); + } + // There should be no exceptions + if (!exceptions.isEmpty()) { + throw new AssertionError( + "Exceptions occurred in concurrent processEvent: ", exceptions.peek()); + } + + Awaitility.await("expected amount of records should be processed") + .atMost(Duration.ofSeconds(30)) + .pollDelay(Duration.ofMillis(500)) + .pollInterval(Duration.ofMillis(500)) + .untilAsserted( + () -> { + clock.add(500, ChronoUnit.MILLIS); + ArgumentCaptor> eventsCaptor = ArgumentCaptor.captor(); + verify(polarisMetaStoreManager, atLeastOnce()) + .writeEvents(any(), eventsCaptor.capture()); + List eventsProcessed = + eventsCaptor.getAllValues().stream().flatMap(List::stream).toList(); + if (eventsProcessed.size() > 100) { + eventsProcessed = new ArrayList<>(); + } + assertThat(eventsProcessed.size()).isGreaterThanOrEqualTo(allEvents.size()); + }); + ArgumentCaptor> eventsCaptor = ArgumentCaptor.captor(); + verify(polarisMetaStoreManager, atLeastOnce()).writeEvents(any(), eventsCaptor.capture()); + List seenEvents = + eventsCaptor.getAllValues().stream().flatMap(List::stream).toList(); + assertThat(seenEvents.size()).isEqualTo(allEvents.size()); + assertThat(seenEvents).hasSameElementsAs(allEvents); + } + + @Test + public void testRequestIdFunctionalityWithContainerRequestContext() { + // Test when containerRequestContext has requestId property + ContainerRequestContext mockContainerRequestContext = + Mockito.mock(ContainerRequestContext.class); + String expectedRequestId = "custom-request-id-123"; + + when(mockContainerRequestContext.hasProperty("requestId")).thenReturn(true); + when(mockContainerRequestContext.getProperty("requestId")).thenReturn(expectedRequestId); + + eventListener.containerRequestContext = mockContainerRequestContext; + + String actualRequestId = eventListener.getRequestId(); + assertThat(actualRequestId) + .as("Expected requestId '" + expectedRequestId + "' but got '" + actualRequestId + "'") + .isEqualTo(expectedRequestId); + } + + @Test + public void testRequestIdFunctionalityWithoutContainerRequestContext() { + // Test when containerRequestContext is null + try { + java.lang.reflect.Field field = + InMemoryBufferPolarisPersistenceEventListener.class.getDeclaredField( + "containerRequestContext"); + field.setAccessible(true); + field.set(eventListener, null); + } catch (Exception e) { + throw new RuntimeException("Failed to set containerRequestContext field", e); + } + + String requestId1 = eventListener.getRequestId(); + String requestId2 = eventListener.getRequestId(); + + assertThat(requestId1 == null).isTrue(); + assertThat(requestId2 == null).isTrue(); + } + + @Test + public void testRequestIdFunctionalityWithContainerRequestContextButNoProperty() { + // Test when containerRequestContext exists but doesn't have requestId property + ContainerRequestContext mockContainerRequestContext = + Mockito.mock(ContainerRequestContext.class); + when(mockContainerRequestContext.hasProperty("requestId")).thenReturn(false); + eventListener.containerRequestContext = mockContainerRequestContext; + + String requestId = eventListener.getRequestId(); + + assertThat(requestId == null).isTrue(); + + // Verify that getProperty was never called since hasProperty returned false + verify(mockContainerRequestContext, times(0)).getProperty("requestId"); + } + + private List addEventsWithoutTriggeringFlush(String realmId) { + List realmEvents = new ArrayList<>(); + for (int i = 0; i < CONFIG_MAX_BUFFER_SIZE - 1; i++) { + realmEvents.add(createSampleEvent()); + } + RealmContext realmContext = () -> realmId; + when(callContext.getRealmContext()).thenReturn(realmContext); + for (PolarisEvent realmEvent : realmEvents) { + eventListener.processEvent(realmEvent); + } + verify(polarisMetaStoreManager, times(0)).writeEvents(any(), any()); + return realmEvents; + } + + private PolarisEvent createSampleEvent() { + String catalogId = "test-catalog"; + String id = UUID.randomUUID().toString(); + String requestId = "test-request-id"; + String eventType = "TEST_EVENT"; + long timestampMs = 0; + String principalName = "test-user"; + PolarisEvent.ResourceType resourceType = PolarisEvent.ResourceType.TABLE; + String resourceIdentifier = "test-table"; + + PolarisEvent event = + new PolarisEvent( + catalogId, + id, + requestId, + eventType, + timestampMs, + principalName, + resourceType, + resourceIdentifier); + + Map additionalParams = new HashMap<>(); + additionalParams.put("testKey", "testValue"); + event.setAdditionalProperties(additionalParams); + + return event; + } +} diff --git a/runtime/service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java b/runtime/service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java index ff66a47255..d0c2a34f19 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java @@ -33,8 +33,8 @@ import java.util.Set; import java.util.function.Consumer; import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; -import org.apache.polaris.service.events.PolarisEventListener; -import org.apache.polaris.service.events.TestPolarisEventListener; +import org.apache.polaris.service.events.listeners.PolarisEventListener; +import org.apache.polaris.service.events.listeners.TestPolarisEventListener; import org.apache.polaris.service.ratelimiter.RateLimiterFilterTest.Profile; import org.apache.polaris.service.test.PolarisIntegrationTestFixture; import org.apache.polaris.service.test.PolarisIntegrationTestHelper; diff --git a/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java b/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java index 03f9c88a3d..9ac16e75e9 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java @@ -27,7 +27,7 @@ import org.apache.polaris.service.TestServices; import org.apache.polaris.service.events.AfterTaskAttemptedEvent; import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; -import org.apache.polaris.service.events.TestPolarisEventListener; +import org.apache.polaris.service.events.listeners.TestPolarisEventListener; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -82,7 +82,6 @@ public boolean handleTask(TaskEntity task, CallContext callContext) { var beforeTaskAttemptedEvent = testPolarisEventListener.getLatest(BeforeTaskAttemptedEvent.class); Assertions.assertEquals(taskEntity.getId(), beforeTaskAttemptedEvent.taskEntityId()); - Assertions.assertEquals(callContext, beforeTaskAttemptedEvent.callContext()); Assertions.assertEquals(attempt, beforeTaskAttemptedEvent.attempt()); return true; } @@ -91,8 +90,8 @@ public boolean handleTask(TaskEntity task, CallContext callContext) { executor.handleTask(taskEntity.getId(), polarisCallCtx, attempt); var afterAttemptTaskEvent = testPolarisEventListener.getLatest(AfterTaskAttemptedEvent.class); + Assertions.assertEquals(taskEntity.getId(), afterAttemptTaskEvent.taskEntityId()); - Assertions.assertEquals(polarisCallCtx, afterAttemptTaskEvent.callContext()); Assertions.assertEquals(attempt, afterAttemptTaskEvent.attempt()); Assertions.assertTrue(afterAttemptTaskEvent.success()); } diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 93bf2c5384..11ee88b0d0 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -68,8 +68,8 @@ import org.apache.polaris.service.config.ReservedProperties; import org.apache.polaris.service.context.catalog.CallContextCatalogFactory; import org.apache.polaris.service.context.catalog.PolarisCallContextCatalogFactory; -import org.apache.polaris.service.events.PolarisEventListener; -import org.apache.polaris.service.events.TestPolarisEventListener; +import org.apache.polaris.service.events.listeners.PolarisEventListener; +import org.apache.polaris.service.events.listeners.TestPolarisEventListener; import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory; import org.apache.polaris.service.secrets.UnsafeInMemorySecretsManagerFactory; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; @@ -237,7 +237,8 @@ public TestServices build() { new DefaultCatalogPrefixParser(), reservedProperties, catalogHandlerUtils, - externalCatalogFactory); + externalCatalogFactory, + polarisEventListener); IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(catalogService); IcebergRestConfigurationApi restConfigurationApi = @@ -286,7 +287,8 @@ public String getAuthenticationScheme() { userSecretsManagerFactory, authorizer, callContext, - reservedProperties)); + reservedProperties, + polarisEventListener)); return new TestServices( clock,