diff --git a/quarkus/defaults/src/main/resources/application.properties b/quarkus/defaults/src/main/resources/application.properties index d9b82239e3..0482f3de03 100644 --- a/quarkus/defaults/src/main/resources/application.properties +++ b/quarkus/defaults/src/main/resources/application.properties @@ -124,6 +124,8 @@ polaris.secrets-manager.type=in-memory polaris.file-io.type=default +polaris.event-listener.type=no-op + polaris.log.request-id-header-name=Polaris-Request-Id # polaris.log.mdc.aid=polaris # polaris.log.mdc.sid=polaris-service diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java index 5d9524f4c6..2389537f17 100644 --- a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java @@ -36,6 +36,8 @@ import org.apache.polaris.service.context.DefaultRealmContextResolver; import org.apache.polaris.service.context.RealmContextResolver; import org.apache.polaris.service.context.TestRealmContextResolver; +import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.events.TestPolarisEventListener; import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory; import org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationConfiguration; import org.eclipse.microprofile.config.Config; @@ -161,6 +163,16 @@ public ProductionReadinessCheck checkRealmResolver(Config config, RealmContextRe return ProductionReadinessCheck.OK; } + @Produces + public ProductionReadinessCheck checkPolarisEventListener( + PolarisEventListener polarisEventListener) { + if (polarisEventListener instanceof TestPolarisEventListener) { + return ProductionReadinessCheck.of( + Error.of("TestPolarisEventListener is intended for tests only.", "polaris.events.type")); + } + return ProductionReadinessCheck.OK; + } + private static String authRealmSegment(String realm) { return realm.equals(QuarkusAuthenticationConfiguration.DEFAULT_REALM_KEY) ? "" : realm + "."; } diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java index 17672ce2bc..0510bcc5d9 100644 --- a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java @@ -61,11 +61,13 @@ import org.apache.polaris.service.context.RealmContextConfiguration; import org.apache.polaris.service.context.RealmContextFilter; import org.apache.polaris.service.context.RealmContextResolver; +import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationConfiguration; import org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationRealmConfiguration; import org.apache.polaris.service.quarkus.auth.external.tenant.OidcTenantResolver; import org.apache.polaris.service.quarkus.catalog.io.QuarkusFileIOConfiguration; import org.apache.polaris.service.quarkus.context.QuarkusRealmContextConfiguration; +import org.apache.polaris.service.quarkus.events.QuarkusPolarisEventListenerConfiguration; import org.apache.polaris.service.quarkus.persistence.QuarkusPersistenceConfiguration; import org.apache.polaris.service.quarkus.ratelimiter.QuarkusRateLimiterFilterConfiguration; import org.apache.polaris.service.quarkus.ratelimiter.QuarkusTokenBucketConfiguration; @@ -151,6 +153,13 @@ public FileIOFactory fileIOFactory( return fileIOFactories.select(Identifier.Literal.of(config.type())).get(); } + @Produces + public PolarisEventListener polarisEventListener( + QuarkusPolarisEventListenerConfiguration config, + @Any Instance polarisEventListeners) { + return polarisEventListeners.select(Identifier.Literal.of(config.type())).get(); + } + @Produces public MetaStoreManagerFactory metaStoreManagerFactory( QuarkusPersistenceConfiguration config, diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisEventListenerConfiguration.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisEventListenerConfiguration.java new file mode 100644 index 0000000000..8921c726c6 --- /dev/null +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisEventListenerConfiguration.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.quarkus.events; + +import io.quarkus.runtime.annotations.StaticInitSafe; +import io.smallrye.config.ConfigMapping; + +@StaticInitSafe +@ConfigMapping(prefix = "polaris.event-listener") +public interface QuarkusPolarisEventListenerConfiguration { + /** + * The type of the event listener to use. Must be a registered {@link + * org.apache.polaris.service.events.PolarisEventListener} identifier. + */ + String type(); +} diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java index 8f38de648e..3e16edb5a6 100644 --- a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java @@ -29,6 +29,7 @@ import java.util.concurrent.ExecutorService; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.quarkus.tracing.QuarkusTracingFilter; import org.apache.polaris.service.task.TaskExecutorImpl; import org.apache.polaris.service.task.TaskFileIOSupplier; @@ -39,7 +40,7 @@ public class QuarkusTaskExecutorImpl extends TaskExecutorImpl { private final Tracer tracer; public QuarkusTaskExecutorImpl() { - this(null, null, null, null); + this(null, null, null, null, null); } @Inject @@ -47,8 +48,9 @@ public QuarkusTaskExecutorImpl( @Identifier("task-executor") ExecutorService executorService, MetaStoreManagerFactory metaStoreManagerFactory, TaskFileIOSupplier fileIOSupplier, - Tracer tracer) { - super(executorService, metaStoreManagerFactory, fileIOSupplier); + Tracer tracer, + PolarisEventListener polarisEventListener) { + super(executorService, metaStoreManagerFactory, fileIOSupplier, polarisEventListener); this.tracer = tracer; } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java index 6f3498d319..5761812f4c 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java @@ -88,6 +88,7 @@ import org.apache.polaris.service.config.RealmEntityManagerFactory; import org.apache.polaris.service.context.CallContextCatalogFactory; import org.apache.polaris.service.context.PolarisCallContextCatalogFactory; +import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.PolicyIdentifier; @@ -188,6 +189,7 @@ public Map getConfigOverrides() { @Inject protected PolarisDiagnostics diagServices; @Inject protected Clock clock; @Inject protected FileIOFactory fileIOFactory; + @Inject protected PolarisEventListener polarisEventListener; protected IcebergCatalog baseCatalog; protected GenericTableCatalog genericTableCatalog; @@ -469,7 +471,8 @@ private void initBaseCatalog() { passthroughView, securityContext, Mockito.mock(), - fileIOFactory); + fileIOFactory, + polarisEventListener); this.baseCatalog.initialize( CATALOG_NAME, ImmutableMap.of( @@ -485,7 +488,7 @@ public static class TestPolarisCallContextCatalogFactory extends PolarisCallContextCatalogFactory { public TestPolarisCallContextCatalogFactory() { - super(null, null, null, null, null); + super(null, null, null, null, null, null); } @Inject @@ -494,13 +497,15 @@ public TestPolarisCallContextCatalogFactory( MetaStoreManagerFactory metaStoreManagerFactory, UserSecretsManagerFactory userSecretsManagerFactory, TaskExecutor taskExecutor, - FileIOFactory fileIOFactory) { + FileIOFactory fileIOFactory, + PolarisEventListener polarisEventListener) { super( entityManagerFactory, metaStoreManagerFactory, userSecretsManagerFactory, taskExecutor, - fileIOFactory); + fileIOFactory, + polarisEventListener); } @Override diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/GenericTableCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/GenericTableCatalogTest.java index 96b1010b43..0b6af6e8f4 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/GenericTableCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/GenericTableCatalogTest.java @@ -81,6 +81,7 @@ import org.apache.polaris.service.catalog.io.DefaultFileIOFactory; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.config.RealmEntityManagerFactory; +import org.apache.polaris.service.events.NoOpPolarisEventListener; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; import org.apache.polaris.service.task.TaskExecutor; import org.assertj.core.api.Assertions; @@ -264,7 +265,8 @@ public void before(TestInfo testInfo) { passthroughView, securityContext, taskExecutor, - fileIOFactory); + fileIOFactory, + new NoOpPolarisEventListener()); this.icebergCatalog.initialize( CATALOG_NAME, ImmutableMap.of( diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java index 11a938f316..e74b7d6412 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java @@ -1808,7 +1808,8 @@ public PolarisEntityManager getOrCreateEntityManager(RealmContext realmContext) userSecretsManagerFactory, Mockito.mock(), new DefaultFileIOFactory( - realmEntityManagerFactory, managerFactory, new PolarisConfigurationStore() {})) { + realmEntityManagerFactory, managerFactory, new PolarisConfigurationStore() {}), + polarisEventListener) { @Override public Catalog createCallContextCatalog( CallContext context, diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java index 5a83330a5b..4f77a90dac 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java @@ -19,7 +19,6 @@ package org.apache.polaris.service.quarkus.catalog; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.iceberg.types.Types.NestedField.required; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.doReturn; @@ -116,8 +115,15 @@ import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory; import org.apache.polaris.service.config.RealmEntityManagerFactory; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.events.TestPolarisEventListener; import org.apache.polaris.service.exception.FakeAzureHttpResponse; import org.apache.polaris.service.exception.IcebergExceptionMapper; +import org.apache.polaris.service.quarkus.test.TestData; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; import org.apache.polaris.service.task.TableCleanupTaskHandler; import org.apache.polaris.service.task.TaskExecutor; @@ -164,17 +170,14 @@ public Map getConfigOverrides() { "polaris.features.defaults.\"INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST\"", "true", "polaris.features.defaults.\"SUPPORTED_CATALOG_STORAGE_TYPES\"", - "[\"FILE\"]"); + "[\"FILE\"]", + "polaris.event-listener.type", + "test"); } } - protected static final Namespace NS = Namespace.of("newdb"); - protected static final TableIdentifier TABLE = TableIdentifier.of(NS, "table"); - protected static final Schema SCHEMA = - new Schema( - required(3, "id", Types.IntegerType.get(), "unique ID 🤪"), - required(4, "data", Types.StringType.get())); private static final String VIEW_QUERY = "select * from ns1.layer1_table"; + public static final String CATALOG_NAME = "polaris-catalog"; public static final String TEST_ACCESS_KEY = "test_access_key"; public static final String SECRET_ACCESS_KEY = "secret_access_key"; @@ -198,6 +201,7 @@ public Map getConfigOverrides() { @Inject PolarisStorageIntegrationProvider storageIntegrationProvider; @Inject UserSecretsManagerFactory userSecretsManagerFactory; @Inject PolarisDiagnostics diagServices; + @Inject PolarisEventListener polarisEventListener; private IcebergCatalog catalog; private CallContext callContext; @@ -210,6 +214,7 @@ public Map getConfigOverrides() { private FileIOFactory fileIOFactory; private PolarisEntity catalogEntity; private SecurityContext securityContext; + private TestPolarisEventListener testPolarisEventListener; @BeforeAll public static void setUpMocks() { @@ -319,6 +324,7 @@ public void before(TestInfo testInfo) { .thenReturn((PolarisStorageIntegration) storageIntegration); this.catalog = initCatalog("my-catalog", ImmutableMap.of()); + testPolarisEventListener = (TestPolarisEventListener) polarisEventListener; } @AfterEach @@ -354,7 +360,8 @@ protected IcebergCatalog initCatalog( passthroughView, securityContext, taskExecutor, - fileIOFactory); + fileIOFactory, + polarisEventListener); ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() .put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO") @@ -649,7 +656,8 @@ public void testValidateNotificationFailToCreateFileIO() { passthroughView, securityContext, Mockito.mock(TaskExecutor.class), - fileIOFactory); + fileIOFactory, + polarisEventListener); catalog.initialize( CATALOG_NAME, ImmutableMap.of( @@ -975,7 +983,8 @@ public void testUpdateNotificationCreateTableWithLocalFilePrefix() { passthroughView, securityContext, taskExecutor, - fileIOFactory); + fileIOFactory, + polarisEventListener); catalog.initialize( catalogWithoutStorage, ImmutableMap.of( @@ -1041,7 +1050,8 @@ public void testUpdateNotificationCreateTableWithHttpPrefix() { passthroughView, securityContext, taskExecutor, - fileIOFactory); + fileIOFactory, + polarisEventListener); catalog.initialize( catalogName, ImmutableMap.of( @@ -1587,7 +1597,8 @@ public void testDropTableWithPurgeDisabled() { passthroughView, securityContext, Mockito.mock(), - fileIOFactory); + fileIOFactory, + polarisEventListener); noPurgeCatalog.initialize( noPurgeCatalogName, ImmutableMap.of( @@ -1695,7 +1706,8 @@ public void testFileIOWrapper() { passthroughView, securityContext, Mockito.mock(), - measured); + measured, + polarisEventListener); catalog.initialize( CATALOG_NAME, ImmutableMap.of( @@ -1705,8 +1717,8 @@ public void testFileIOWrapper() { .as("Nothing was created yet") .isEqualTo(0); - catalog.createNamespace(NS); - Table table = catalog.buildTable(TABLE, SCHEMA).create(); + catalog.createNamespace(TestData.NAMESPACE); + Table table = catalog.buildTable(TestData.TABLE, TestData.SCHEMA).create(); // Asserting greaterThan 0 is sufficient for validating that the wrapper works without making // assumptions about the @@ -1718,7 +1730,9 @@ public void testFileIOWrapper() { .as("A table was read and written, but a trip to storage was made") .isEqualTo(0); - Assertions.assertThat(catalog.dropTable(TABLE)).as("Table deletion should succeed").isTrue(); + Assertions.assertThat(catalog.dropTable(TestData.TABLE)) + .as("Table deletion should succeed") + .isTrue(); TaskEntity taskEntity = TaskEntity.of( metaStoreManager @@ -1792,7 +1806,8 @@ public void testConcurrencyConflictCreateTableUpdatedDuringFinalTransaction() { passthroughView, securityContext, Mockito.mock(TaskExecutor.class), - fileIOFactory); + fileIOFactory, + polarisEventListener); catalog.initialize( CATALOG_NAME, ImmutableMap.of( @@ -1840,7 +1855,8 @@ public void testConcurrencyConflictUpdateTableDuringFinalTransaction() { passthroughView, securityContext, Mockito.mock(TaskExecutor.class), - fileIOFactory); + fileIOFactory, + polarisEventListener); catalog.initialize( CATALOG_NAME, ImmutableMap.of( @@ -1915,6 +1931,35 @@ public void testTableOperationsDoesNotRefreshAfterCommit(boolean updateMetadataO } } + @Test + public void testEventsAreEmitted() { + IcebergCatalog catalog = catalog(); + catalog.createNamespace(TestData.NAMESPACE); + Table table = catalog.buildTable(TestData.TABLE, TestData.SCHEMA).create(); + + String key = "foo"; + String valOld = "bar1"; + String valNew = "bar2"; + table.updateProperties().set(key, valOld).commit(); + table.updateProperties().set(key, valNew).commit(); + + var beforeRefreshEvent = testPolarisEventListener.getLatest(BeforeTableRefreshedEvent.class); + Assertions.assertThat(beforeRefreshEvent.tableIdentifier()).isEqualTo(TestData.TABLE); + + var afterRefreshEvent = testPolarisEventListener.getLatest(AfterTableRefreshedEvent.class); + Assertions.assertThat(afterRefreshEvent.tableIdentifier()).isEqualTo(TestData.TABLE); + + var beforeTableEvent = testPolarisEventListener.getLatest(BeforeTableCommitedEvent.class); + Assertions.assertThat(beforeTableEvent.identifier()).isEqualTo(TestData.TABLE); + Assertions.assertThat(beforeTableEvent.base().properties().get(key)).isEqualTo(valOld); + Assertions.assertThat(beforeTableEvent.metadata().properties().get(key)).isEqualTo(valNew); + + var afterTableEvent = testPolarisEventListener.getLatest(AfterTableCommitedEvent.class); + Assertions.assertThat(afterTableEvent.identifier()).isEqualTo(TestData.TABLE); + Assertions.assertThat(afterTableEvent.base().properties().get(key)).isEqualTo(valOld); + Assertions.assertThat(afterTableEvent.metadata().properties().get(key)).isEqualTo(valNew); + } + private static InMemoryFileIO getInMemoryIo(IcebergCatalog catalog) { return (InMemoryFileIO) ((ExceptionMappingFileIO) catalog.getIo()).getInnerIo(); } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java index 25089d5c4f..c66e88b371 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java @@ -37,6 +37,7 @@ import java.util.Set; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewCatalogTests; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDiagnostics; @@ -67,12 +68,21 @@ import org.apache.polaris.service.catalog.io.DefaultFileIOFactory; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.config.RealmEntityManagerFactory; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; +import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.events.TestPolarisEventListener; +import org.apache.polaris.service.quarkus.test.TestData; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; +import org.assertj.core.api.Assertions; import org.assertj.core.api.Assumptions; import org.assertj.core.configuration.PreferredAssumptionException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.io.TempDir; import org.mockito.Mockito; @@ -98,7 +108,9 @@ public Map getConfigOverrides() { "polaris.features.defaults.\"INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST\"", "true", "polaris.features.defaults.\"SUPPORTED_CATALOG_STORAGE_TYPES\"", - "[\"FILE\"]"); + "[\"FILE\"]", + "polaris.event-listener.type", + "test"); } } @@ -116,6 +128,7 @@ public Map getConfigOverrides() { @Inject UserSecretsManagerFactory userSecretsManagerFactory; @Inject PolarisConfigurationStore configurationStore; @Inject PolarisDiagnostics diagServices; + @Inject PolarisEventListener polarisEventListener; private IcebergCatalog catalog; @@ -124,6 +137,8 @@ public Map getConfigOverrides() { private UserSecretsManager userSecretsManager; private PolarisCallContext polarisContext; + private TestPolarisEventListener testPolarisEventListener; + @BeforeAll public static void setUpMocks() { PolarisStorageIntegrationProviderImpl mock = @@ -212,6 +227,8 @@ public void before(TestInfo testInfo) { FileIOFactory fileIOFactory = new DefaultFileIOFactory( new RealmEntityManagerFactory(managerFactory), managerFactory, configurationStore); + + testPolarisEventListener = (TestPolarisEventListener) polarisEventListener; this.catalog = new IcebergCatalog( entityManager, @@ -220,7 +237,8 @@ public void before(TestInfo testInfo) { passthroughView, securityContext, Mockito.mock(), - fileIOFactory); + fileIOFactory, + polarisEventListener); Map properties = ImmutableMap.builder() .put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO") @@ -249,4 +267,39 @@ protected Catalog tableCatalog() { protected boolean requiresNamespaceCreate() { return true; } + + @Test + public void testEventsAreEmitted() { + IcebergCatalog catalog = catalog(); + catalog.createNamespace(TestData.NAMESPACE); + View view = + catalog + .buildView(TestData.TABLE) + .withDefaultNamespace(TestData.NAMESPACE) + .withSchema(TestData.SCHEMA) + .withQuery("a", "b") + .create(); + + String key = "foo"; + String valOld = "bar1"; + String valNew = "bar2"; + view.updateProperties().set(key, valOld).commit(); + view.updateProperties().set(key, valNew).commit(); + + var beforeRefreshEvent = testPolarisEventListener.getLatest(BeforeViewRefreshedEvent.class); + Assertions.assertThat(beforeRefreshEvent.viewIdentifier()).isEqualTo(TestData.TABLE); + + var afterRefreshEvent = testPolarisEventListener.getLatest(AfterViewRefreshedEvent.class); + Assertions.assertThat(afterRefreshEvent.viewIdentifier()).isEqualTo(TestData.TABLE); + + var beforeCommitEvent = testPolarisEventListener.getLatest(BeforeViewCommitedEvent.class); + Assertions.assertThat(beforeCommitEvent.identifier()).isEqualTo(TestData.TABLE); + Assertions.assertThat(beforeCommitEvent.base().properties().get(key)).isEqualTo(valOld); + Assertions.assertThat(beforeCommitEvent.metadata().properties().get(key)).isEqualTo(valNew); + + var afterCommitEvent = testPolarisEventListener.getLatest(AfterViewCommitedEvent.class); + Assertions.assertThat(afterCommitEvent.identifier()).isEqualTo(TestData.TABLE); + Assertions.assertThat(afterCommitEvent.base().properties().get(key)).isEqualTo(valOld); + Assertions.assertThat(afterCommitEvent.metadata().properties().get(key)).isEqualTo(valNew); + } } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java index a5bf98ba99..68e2c35e08 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java @@ -94,6 +94,7 @@ import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.policy.PolicyCatalog; import org.apache.polaris.service.config.RealmEntityManagerFactory; +import org.apache.polaris.service.events.NoOpPolarisEventListener; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.ApplicablePolicy; @@ -287,7 +288,8 @@ public void before(TestInfo testInfo) { passthroughView, securityContext, taskExecutor, - fileIOFactory); + fileIOFactory, + new NoOpPolarisEventListener()); this.icebergCatalog.initialize( CATALOG_NAME, ImmutableMap.of( diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/ratelimiter/RateLimiterFilterTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/ratelimiter/RateLimiterFilterTest.java index a5f66ec5be..48aa450019 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/ratelimiter/RateLimiterFilterTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/ratelimiter/RateLimiterFilterTest.java @@ -31,6 +31,9 @@ import java.util.Map; import java.util.Set; import java.util.function.Consumer; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.events.TestPolarisEventListener; import org.apache.polaris.service.quarkus.ratelimiter.RateLimiterFilterTest.Profile; import org.apache.polaris.service.quarkus.test.PolarisIntegrationTestFixture; import org.apache.polaris.service.quarkus.test.PolarisIntegrationTestHelper; @@ -81,7 +84,9 @@ public Map getConfigOverrides() { "polaris.authentication.token-broker.type", "symmetric-key", "polaris.authentication.token-broker.symmetric-key.secret", - "secret"); + "secret", + "polaris.event-listener.type", + "test"); } } @@ -90,6 +95,7 @@ public Map getConfigOverrides() { @Inject PolarisIntegrationTestHelper helper; @Inject MeterRegistry meterRegistry; + @Inject PolarisEventListener polarisEventListener; private TestEnvironment testEnv; private PolarisIntegrationTestFixture fixture; @@ -145,6 +151,11 @@ public void testMetricsAreEmittedWhenRateLimiting() { } requestAsserter.accept(Status.TOO_MANY_REQUESTS); + BeforeRequestRateLimitedEvent event = + ((TestPolarisEventListener) polarisEventListener) + .getLatest(BeforeRequestRateLimitedEvent.class); + assertThat(event.method()).isEqualTo("GET"); + // Examples of expected metrics: // http_server_requests_seconds_count{application="Polaris",environment="prod",method="GET",outcome="CLIENT_ERROR",realm_id="org_apache_polaris_service_ratelimiter_RateLimiterFilterTest",status="429",uri="/api/management/v1/principal-roles"} 1.0 // polaris_principal_roles_listPrincipalRoles_seconds_count{application="Polaris",class="org.apache.polaris.service.admin.api.PolarisPrincipalRolesApi",environment="prod",exception="none",method="listPrincipalRoles"} 50.0 diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/TestData.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/TestData.java new file mode 100644 index 0000000000..8c061f2016 --- /dev/null +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/TestData.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.quarkus.test; + +import static org.apache.iceberg.types.Types.NestedField.required; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.types.Types; + +/** Contains test data that can be reused across tests */ +public class TestData { + public static final Namespace NAMESPACE = Namespace.of("newdb"); + public static final TableIdentifier TABLE = TableIdentifier.of(NAMESPACE, "table"); + public static final Schema SCHEMA = + new Schema( + required(3, "id", Types.IntegerType.get(), "unique ID 🤪"), + required(4, "data", Types.StringType.get())); +} diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index 27ef12c3d7..501a4b4ab0 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -122,6 +122,15 @@ import org.apache.polaris.service.catalog.SupportsNotifications; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.io.FileIOUtil; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; +import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.NotificationRequest; import org.apache.polaris.service.types.NotificationType; @@ -170,6 +179,8 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog private final CatalogEntity catalogEntity; private final TaskExecutor taskExecutor; private final SecurityContext securityContext; + private final PolarisEventListener polarisEventListener; + private String ioImplClassName; private FileIO catalogFileIO; private final String catalogName; @@ -196,7 +207,8 @@ public IcebergCatalog( PolarisResolutionManifestCatalogView resolvedEntityView, SecurityContext securityContext, TaskExecutor taskExecutor, - FileIOFactory fileIOFactory) { + FileIOFactory fileIOFactory, + PolarisEventListener polarisEventListener) { this.entityManager = entityManager; this.callContext = callContext; this.resolvedEntityView = resolvedEntityView; @@ -208,6 +220,7 @@ public IcebergCatalog( this.catalogName = catalogEntity.getName(); this.fileIOFactory = fileIOFactory; this.metaStoreManager = metaStoreManager; + this.polarisEventListener = polarisEventListener; } @Override @@ -1318,6 +1331,7 @@ public void doRefresh() { if (latestLocation == null) { disableRefresh(); } else { + polarisEventListener.onBeforeTableRefreshed(new BeforeTableRefreshedEvent(tableIdentifier)); refreshFromMetadataLocation( latestLocation, SHOULD_RETRY_REFRESH_PREDICATE, @@ -1337,10 +1351,14 @@ public void doRefresh() { Set.of(PolarisStorageActions.READ)); return TableMetadataParser.read(fileIO, metadataLocation); }); + polarisEventListener.onAfterTableRefreshed(new AfterTableRefreshedEvent(tableIdentifier)); } } public void doCommit(TableMetadata base, TableMetadata metadata) { + polarisEventListener.onBeforeTableCommited( + new BeforeTableCommitedEvent(tableIdentifier, base, metadata)); + LOGGER.debug( "doCommit for table {} with base {}, metadata {}", tableIdentifier, base, metadata); // TODO: Maybe avoid writing metadata if there's definitely a transaction conflict @@ -1493,6 +1511,9 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { } else { updateTableLike(tableIdentifier, entity); } + + polarisEventListener.onAfterTableCommited( + new AfterTableCommitedEvent(tableIdentifier, base, metadata)); } @Override @@ -1683,6 +1704,7 @@ public void doRefresh() { if (latestLocation == null) { disableRefresh(); } else { + polarisEventListener.onBeforeViewRefreshed(new BeforeViewRefreshedEvent(identifier)); refreshFromMetadataLocation( latestLocation, SHOULD_RETRY_REFRESH_PREDICATE, @@ -1704,10 +1726,14 @@ public void doRefresh() { return ViewMetadataParser.read(fileIO.newInputFile(metadataLocation)); }); + polarisEventListener.onAfterViewRefreshed(new AfterViewRefreshedEvent(identifier)); } } public void doCommit(ViewMetadata base, ViewMetadata metadata) { + polarisEventListener.onBeforeViewCommited( + new BeforeViewCommitedEvent(identifier, base, metadata)); + // TODO: Maybe avoid writing metadata if there's definitely a transaction conflict LOGGER.debug("doCommit for view {} with base {}, metadata {}", identifier, base, metadata); if (null == base && !namespaceExists(identifier.namespace())) { @@ -1801,6 +1827,9 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { } else { updateTableLike(identifier, entity); } + + polarisEventListener.onAfterViewCommited( + new AfterViewCommitedEvent(identifier, base, metadata)); } protected String writeNewMetadataIfRequired(ViewMetadata metadata) { diff --git a/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java b/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java index 94fe197609..b1d28ce89c 100644 --- a/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java +++ b/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java @@ -38,6 +38,7 @@ import org.apache.polaris.service.catalog.iceberg.IcebergCatalog; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.config.RealmEntityManagerFactory; +import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.task.TaskExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +56,7 @@ public class PolarisCallContextCatalogFactory implements CallContextCatalogFacto private final FileIOFactory fileIOFactory; private final MetaStoreManagerFactory metaStoreManagerFactory; private final UserSecretsManagerFactory userSecretsManagerFactory; + private final PolarisEventListener polarisEventListener; @Inject public PolarisCallContextCatalogFactory( @@ -62,12 +64,14 @@ public PolarisCallContextCatalogFactory( MetaStoreManagerFactory metaStoreManagerFactory, UserSecretsManagerFactory userSecretsManagerFactory, TaskExecutor taskExecutor, - FileIOFactory fileIOFactory) { + FileIOFactory fileIOFactory, + PolarisEventListener polarisEventListener) { this.entityManagerFactory = entityManagerFactory; this.metaStoreManagerFactory = metaStoreManagerFactory; this.userSecretsManagerFactory = userSecretsManagerFactory; this.taskExecutor = taskExecutor; this.fileIOFactory = fileIOFactory; + this.polarisEventListener = polarisEventListener; } @Override @@ -95,7 +99,8 @@ public Catalog createCallContextCatalog( resolvedManifest, securityContext, taskExecutor, - fileIOFactory); + fileIOFactory, + polarisEventListener); context.contextVariables().put(CallContext.REQUEST_PATH_CATALOG_INSTANCE_KEY, catalogInstance); diff --git a/service/common/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java new file mode 100644 index 0000000000..c952997df1 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.TableIdentifier; + +/** + * Emitted after Polaris performs a commit to a table. This is not emitted if there's an exception + * while committing. + * + * @param identifier The identifier. + * @param base The old metadata. + * @param metadata The new metadata. + */ +public record AfterTableCommitedEvent( + TableIdentifier identifier, TableMetadata base, TableMetadata metadata) + implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java new file mode 100644 index 0000000000..be38a8baaa --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.iceberg.catalog.TableIdentifier; + +/** + * Emitted after Polaris refreshes its known version of a table's metadata by fetching the latest. + * + * @param tableIdentifier The identifier of the table that was refreshed. + */ +public record AfterTableRefreshedEvent(TableIdentifier tableIdentifier) implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java new file mode 100644 index 0000000000..638ba84fbc --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.polaris.core.context.CallContext; + +/** + * Emitted after an attempt of an async task, such as manifest file cleanup, finishes. + * + * @param taskEntityId The ID of the TaskEntity. + * @param callContext The CallContext the task is being executed under. + * @param attempt The attempt number. Each retry of the task will have its own attempt number. The + * initial (non-retried) attempt starts counting from 1. + * @param success Whether or not the attempt succeeded. + */ +public record AfterTaskAttemptedEvent( + long taskEntityId, CallContext callContext, int attempt, boolean success) + implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java new file mode 100644 index 0000000000..eb2ca24149 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.view.ViewMetadata; + +/** + * Emitted after Polaris performs a commit to a view. This is not emitted if there's an exception + * while committing. + * + * @param identifier The identifier. + * @param base The old metadata. + * @param metadata The new metadata. + */ +public record AfterViewCommitedEvent( + TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata) implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java new file mode 100644 index 0000000000..249220ddd7 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.iceberg.catalog.TableIdentifier; + +/** + * Emitted after Polaris refreshes its known version of a view's metadata by fetching the latest. + * + * @param viewIdentifier The identifier of the view that was refreshed. + */ +public record AfterViewRefreshedEvent(TableIdentifier viewIdentifier) implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java new file mode 100644 index 0000000000..1d9780ebe7 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +/** + * Emitted before the RateLimiterFilter rejects a request due to exceeding the rate limit. + * + * @param method The request's HTTP method + * @param absolutePath The request's absolute path + */ +public record BeforeRequestRateLimitedEvent(String method, String absolutePath) + implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java new file mode 100644 index 0000000000..2bcc49ab67 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.TableIdentifier; + +/** + * Emitted when Polaris intends to perform a commit to a table. There is no guarantee on the order + * of this event relative to the validation checks we've performed, which means the commit may still + * fail Polaris-side validation checks. + * + * @param identifier The identifier. + * @param base The old metadata. + * @param metadata The new metadata. + */ +public record BeforeTableCommitedEvent( + TableIdentifier identifier, TableMetadata base, TableMetadata metadata) + implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java new file mode 100644 index 0000000000..f319298f57 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.iceberg.catalog.TableIdentifier; + +/** + * Emitted when Polaris intends to refresh its known version of a table's metadata by fetching the + * latest. + * + * @param tableIdentifier The identifier of the table being refreshed. + */ +public record BeforeTableRefreshedEvent(TableIdentifier tableIdentifier) implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java new file mode 100644 index 0000000000..a7fa7231e7 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.polaris.core.context.CallContext; + +/** + * Emitted before an attempt of an async task, such as manifest file cleanup, begins. + * + * @param taskEntityId The ID of the TaskEntity + * @param callContext The CallContext the task is being executed under. + * @param attempt The attempt number. Each retry of the task will have its own attempt number. The + * initial (non-retried) attempt starts counting from 1. + */ +public record BeforeTaskAttemptedEvent(long taskEntityId, CallContext callContext, int attempt) + implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java new file mode 100644 index 0000000000..16e460d806 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.view.ViewMetadata; + +/** + * Emitted when Polaris intends to perform a commit to a view. There is no guarantee on the order of + * this event relative to the validation checks we've performed, which means the commit may still + * fail Polaris-side validation checks. + * + * @param identifier The identifier. + * @param base The old metadata. + * @param metadata The new metadata. + */ +public record BeforeViewCommitedEvent( + TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata) implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java new file mode 100644 index 0000000000..6f58d2ca22 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.iceberg.catalog.TableIdentifier; + +/** + * Emitted when Polaris intends to refresh its known version of a view's metadata by fetching the + * latest. + * + * @param viewIdentifier The identifier of the view being refreshed. + */ +public record BeforeViewRefreshedEvent(TableIdentifier viewIdentifier) implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/NoOpPolarisEventListener.java b/service/common/src/main/java/org/apache/polaris/service/events/NoOpPolarisEventListener.java new file mode 100644 index 0000000000..f31fbcef51 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/NoOpPolarisEventListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; + +/** Event listener that does nothing. */ +@ApplicationScoped +@Identifier("no-op") +public class NoOpPolarisEventListener extends PolarisEventListener {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/PolarisEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEvent.java new file mode 100644 index 0000000000..4922c02f4c --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEvent.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +/** + * Represents an event emitted by Polaris. Currently there's no common data across events so this is + * just a marker interface. * + */ +public interface PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java new file mode 100644 index 0000000000..485766bb24 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +/** + * Represents an event listener that can respond to notable moments during Polaris's execution. + * Event details are documented under the event objects themselves. + */ +public abstract class PolarisEventListener { + /** {@link BeforeRequestRateLimitedEvent} */ + public void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) {} + + /** {@link BeforeTableCommitedEvent} */ + public void onBeforeTableCommited(BeforeTableCommitedEvent event) {} + + /** {@link AfterTableCommitedEvent} */ + public void onAfterTableCommited(AfterTableCommitedEvent event) {} + + /** {@link BeforeViewCommitedEvent} */ + public void onBeforeViewCommited(BeforeViewCommitedEvent event) {} + + /** {@link AfterViewCommitedEvent} */ + public void onAfterViewCommited(AfterViewCommitedEvent event) {} + + /** {@link BeforeTableRefreshedEvent} */ + public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {} + + /** {@link AfterTableRefreshedEvent} */ + public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {} + + /** {@link BeforeViewRefreshedEvent} */ + public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {} + + /** {@link AfterViewRefreshedEvent} */ + public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {} + + /** {@link BeforeTaskAttemptedEvent} */ + public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {} + + /** {@link AfterTaskAttemptedEvent} */ + public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {} +} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java b/service/common/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java new file mode 100644 index 0000000000..668edc7fa0 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import com.google.common.collect.Streams; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import java.util.ArrayList; +import java.util.List; + +/** Event listener that stores all emitted events forever. Not recommended for use in production. */ +@ApplicationScoped +@Identifier("test") +public class TestPolarisEventListener extends PolarisEventListener { + private final List history = new ArrayList<>(); + + public T getLatest(Class type) { + return (T) Streams.findLast(history.stream().filter(type::isInstance)).orElseThrow(); + } + + @Override + public void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) { + history.add(event); + } + + @Override + public void onBeforeTableCommited(BeforeTableCommitedEvent event) { + history.add(event); + } + + @Override + public void onAfterTableCommited(AfterTableCommitedEvent event) { + history.add(event); + } + + @Override + public void onBeforeViewCommited(BeforeViewCommitedEvent event) { + history.add(event); + } + + @Override + public void onAfterViewCommited(AfterViewCommitedEvent event) { + history.add(event); + } + + @Override + public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) { + history.add(event); + } + + @Override + public void onAfterTableRefreshed(AfterTableRefreshedEvent event) { + history.add(event); + } + + @Override + public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) { + history.add(event); + } + + @Override + public void onAfterViewRefreshed(AfterViewRefreshedEvent event) { + history.add(event); + } + + @Override + public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) { + history.add(event); + } + + @Override + public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) { + history.add(event); + } +} diff --git a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java index 28bb60589d..7b0fda93fc 100644 --- a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java +++ b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java @@ -28,6 +28,8 @@ import jakarta.ws.rs.ext.Provider; import java.io.IOException; import org.apache.polaris.service.config.PolarisFilterPriorities; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.PolarisEventListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,16 +42,21 @@ public class RateLimiterFilter implements ContainerRequestFilter { private static final Logger LOGGER = LoggerFactory.getLogger(RateLimiterFilter.class); private final RateLimiter rateLimiter; + private final PolarisEventListener polarisEventListener; @Inject - public RateLimiterFilter(RateLimiter rateLimiter) { + public RateLimiterFilter(RateLimiter rateLimiter, PolarisEventListener polarisEventListener) { this.rateLimiter = rateLimiter; + this.polarisEventListener = polarisEventListener; } /** Returns a 429 if the rate limiter says so. Otherwise, forwards the request along. */ @Override public void filter(ContainerRequestContext ctx) throws IOException { if (!rateLimiter.canProceed()) { + polarisEventListener.onBeforeRequestRateLimited( + new BeforeRequestRateLimitedEvent( + ctx.getMethod(), ctx.getUriInfo().getAbsolutePath().toString())); ctx.abortWith(Response.status(Response.Status.TOO_MANY_REQUESTS).build()); LOGGER.atDebug().log("Rate limiting request"); } diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java b/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java index 6b42c3fc89..1409310318 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java @@ -33,6 +33,9 @@ import org.apache.polaris.core.entity.TaskEntity; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.PolarisEventListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,14 +51,17 @@ public class TaskExecutorImpl implements TaskExecutor { private final MetaStoreManagerFactory metaStoreManagerFactory; private final TaskFileIOSupplier fileIOSupplier; private final List taskHandlers = new CopyOnWriteArrayList<>(); + private final PolarisEventListener polarisEventListener; public TaskExecutorImpl( Executor executor, MetaStoreManagerFactory metaStoreManagerFactory, - TaskFileIOSupplier fileIOSupplier) { + TaskFileIOSupplier fileIOSupplier, + PolarisEventListener polarisEventListener) { this.executor = executor; this.metaStoreManagerFactory = metaStoreManagerFactory; this.fileIOSupplier = fileIOSupplier; + this.polarisEventListener = polarisEventListener; } public void init() { @@ -111,45 +117,54 @@ public void addTaskHandlerContext(long taskEntityId, CallContext callContext) { } protected void handleTask(long taskEntityId, CallContext ctx, int attempt) { - // set the call context INSIDE the async task - CallContext.setCurrentContext(ctx); - LOGGER.info("Handling task entity id {}", taskEntityId); - PolarisMetaStoreManager metaStoreManager = - metaStoreManagerFactory.getOrCreateMetaStoreManager(ctx.getRealmContext()); - PolarisBaseEntity taskEntity = - metaStoreManager - .loadEntity(ctx.getPolarisCallContext(), 0L, taskEntityId, PolarisEntityType.TASK) - .getEntity(); - if (!PolarisEntityType.TASK.equals(taskEntity.getType())) { - throw new IllegalArgumentException("Provided taskId must be a task entity type"); - } - TaskEntity task = TaskEntity.of(taskEntity); - Optional handlerOpt = - taskHandlers.stream().filter(th -> th.canHandleTask(task)).findFirst(); - if (handlerOpt.isEmpty()) { - LOGGER - .atWarn() - .addKeyValue("taskEntityId", taskEntityId) - .addKeyValue("taskType", task.getTaskType()) - .log("Unable to find handler for task type"); - return; - } - TaskHandler handler = handlerOpt.get(); - boolean success = handler.handleTask(task, ctx); - if (success) { - LOGGER - .atInfo() - .addKeyValue("taskEntityId", taskEntityId) - .addKeyValue("handlerClass", handler.getClass()) - .log("Task successfully handled"); - metaStoreManager.dropEntityIfExists( - ctx.getPolarisCallContext(), null, taskEntity, Map.of(), false); - } else { - LOGGER - .atWarn() - .addKeyValue("taskEntityId", taskEntityId) - .addKeyValue("taskEntityName", taskEntity.getName()) - .log("Unable to execute async task"); + polarisEventListener.onBeforeTaskAttempted( + new BeforeTaskAttemptedEvent(taskEntityId, ctx, attempt)); + + boolean success = false; + try { + // set the call context INSIDE the async task + CallContext.setCurrentContext(ctx); + LOGGER.info("Handling task entity id {}", taskEntityId); + PolarisMetaStoreManager metaStoreManager = + metaStoreManagerFactory.getOrCreateMetaStoreManager(ctx.getRealmContext()); + PolarisBaseEntity taskEntity = + metaStoreManager + .loadEntity(ctx.getPolarisCallContext(), 0L, taskEntityId, PolarisEntityType.TASK) + .getEntity(); + if (!PolarisEntityType.TASK.equals(taskEntity.getType())) { + throw new IllegalArgumentException("Provided taskId must be a task entity type"); + } + TaskEntity task = TaskEntity.of(taskEntity); + Optional handlerOpt = + taskHandlers.stream().filter(th -> th.canHandleTask(task)).findFirst(); + if (handlerOpt.isEmpty()) { + LOGGER + .atWarn() + .addKeyValue("taskEntityId", taskEntityId) + .addKeyValue("taskType", task.getTaskType()) + .log("Unable to find handler for task type"); + return; + } + TaskHandler handler = handlerOpt.get(); + success = handler.handleTask(task, ctx); + if (success) { + LOGGER + .atInfo() + .addKeyValue("taskEntityId", taskEntityId) + .addKeyValue("handlerClass", handler.getClass()) + .log("Task successfully handled"); + metaStoreManager.dropEntityIfExists( + ctx.getPolarisCallContext(), null, taskEntity, Map.of(), false); + } else { + LOGGER + .atWarn() + .addKeyValue("taskEntityId", taskEntityId) + .addKeyValue("taskEntityName", taskEntity.getName()) + .log("Unable to execute async task"); + } + } finally { + polarisEventListener.onAfterTaskAttempted( + new AfterTaskAttemptedEvent(taskEntityId, ctx, attempt, success)); } } } diff --git a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java index 9a070273b8..a6854703d7 100644 --- a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java +++ b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java @@ -244,7 +244,8 @@ IcebergCatalog createCatalog(TestServices services) { passthroughView, services.securityContext(), services.taskExecutor(), - services.fileIOFactory()); + services.fileIOFactory(), + services.polarisEventListener()); polarisCatalog.initialize( CATALOG_NAME, ImmutableMap.of( diff --git a/service/common/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java b/service/common/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java new file mode 100644 index 0000000000..1cc88fc100 --- /dev/null +++ b/service/common/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.task; + +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.TaskEntity; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.apache.polaris.service.TestServices; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.TestPolarisEventListener; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** Unit tests for TaskExecutorImpl */ +public class TaskExecutorImplTest { + @Test + void testEventsAreEmitted() { + String realm = "myrealm"; + RealmContext realmContext = () -> realm; + + TestServices testServices = TestServices.builder().realmContext(realmContext).build(); + + TestPolarisEventListener testPolarisEventListener = + (TestPolarisEventListener) testServices.polarisEventListener(); + + MetaStoreManagerFactory metaStoreManagerFactory = testServices.metaStoreManagerFactory(); + PolarisMetaStoreManager metaStoreManager = + metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext); + BasePersistence bp = metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(); + + PolarisCallContext polarisCallCtx = + new PolarisCallContext(bp, testServices.polarisDiagnostics()); + CallContext callContext = CallContext.of(realmContext, polarisCallCtx); + + // This task doesn't have a type so it won't be handle-able by a real handler. We register a + // test TaskHandler below that can handle any task. + TaskEntity taskEntity = + new TaskEntity.Builder() + .setName("mytask") + .setId(metaStoreManager.generateNewEntityId(polarisCallCtx).getId()) + .build(); + metaStoreManager.createEntityIfNotExists(polarisCallCtx, null, taskEntity); + + int attempt = 1; + + TaskExecutorImpl executor = + new TaskExecutorImpl( + Runnable::run, + testServices.metaStoreManagerFactory(), + new TaskFileIOSupplier(testServices.fileIOFactory()), + testServices.polarisEventListener()); + executor.addTaskHandler( + new TaskHandler() { + @Override + public boolean canHandleTask(TaskEntity task) { + return true; + } + + @Override + public boolean handleTask(TaskEntity task, CallContext callContext) { + var beforeTaskAttemptedEvent = + testPolarisEventListener.getLatest(BeforeTaskAttemptedEvent.class); + Assertions.assertEquals(taskEntity.getId(), beforeTaskAttemptedEvent.taskEntityId()); + Assertions.assertEquals(callContext, beforeTaskAttemptedEvent.callContext()); + Assertions.assertEquals(attempt, beforeTaskAttemptedEvent.attempt()); + return true; + } + }); + + executor.handleTask(taskEntity.getId(), callContext, attempt); + + var afterAttemptTaskEvent = testPolarisEventListener.getLatest(AfterTaskAttemptedEvent.class); + Assertions.assertEquals(taskEntity.getId(), afterAttemptTaskEvent.taskEntityId()); + Assertions.assertEquals(callContext, afterAttemptTaskEvent.callContext()); + Assertions.assertEquals(attempt, afterAttemptTaskEvent.attempt()); + Assertions.assertTrue(afterAttemptTaskEvent.success()); + } +} diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 88406b8954..cb48ad0a18 100644 --- a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -56,6 +56,8 @@ import org.apache.polaris.service.config.RealmEntityManagerFactory; import org.apache.polaris.service.context.CallContextCatalogFactory; import org.apache.polaris.service.context.PolarisCallContextCatalogFactory; +import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.events.TestPolarisEventListener; import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory; import org.apache.polaris.service.secrets.UnsafeInMemorySecretsManagerFactory; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; @@ -75,7 +77,8 @@ public record TestServices( RealmContext realmContext, SecurityContext securityContext, FileIOFactory fileIOFactory, - TaskExecutor taskExecutor) { + TaskExecutor taskExecutor, + PolarisEventListener polarisEventListener) { private static final RealmContext TEST_REALM = () -> "test-realm"; private static final String GCP_ACCESS_TOKEN = "abc"; @@ -175,13 +178,15 @@ public Map contextVariables() { TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class); + PolarisEventListener polarisEventListener = new TestPolarisEventListener(); CallContextCatalogFactory callContextFactory = new PolarisCallContextCatalogFactory( realmEntityManagerFactory, metaStoreManagerFactory, userSecretsManagerFactory, taskExecutor, - fileIOFactory); + fileIOFactory, + polarisEventListener); IcebergCatalogAdapter service = new IcebergCatalogAdapter( @@ -252,7 +257,8 @@ public String getAuthenticationScheme() { realmContext, securityContext, fileIOFactory, - taskExecutor); + taskExecutor, + polarisEventListener); } } }