diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/ExamplePolarisEventListener.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/ExamplePolarisEventListener.java new file mode 100644 index 0000000000..dc1581c729 --- /dev/null +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/ExamplePolarisEventListener.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.quarkus; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import org.apache.polaris.service.events.BeforeTableCommitEvent; + +@ApplicationScoped +public class ExamplePolarisEventListener { + void onBeforeTableCommit(@Observes BeforeTableCommitEvent task) { + System.out.println("Observed BeforeTableCommitEvent"); + } +} diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java index 06bbc84d6c..a039c469b7 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import jakarta.annotation.Nonnull; +import jakarta.enterprise.event.Event; import jakarta.ws.rs.core.SecurityContext; import java.io.Closeable; import java.io.IOException; @@ -104,6 +105,8 @@ import org.apache.polaris.core.storage.PolarisStorageIntegration; import org.apache.polaris.core.storage.StorageLocation; import org.apache.polaris.service.catalog.io.FileIOFactory; +import org.apache.polaris.service.events.BeforeTableCommitEvent; +import org.apache.polaris.service.events.PolarisEventEmitter; import org.apache.polaris.service.exception.IcebergExceptionMapper; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.NotificationRequest; @@ -163,6 +166,7 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog private final FileIOFactory fileIOFactory; private final long catalogId; private final String catalogName; + private final PolarisEventEmitter eventEmitter; private String ioImplClassName; private FileIO catalogFileIO; @@ -180,16 +184,17 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog * @param taskExecutor Executor we use to register cleanup task handlers */ public BasePolarisCatalog( - RealmId realmId, - PolarisEntityManager entityManager, - PolarisMetaStoreManager metaStoreManager, - PolarisMetaStoreSession metaStoreSession, - PolarisConfigurationStore configurationStore, - PolarisDiagnostics diagnostics, - PolarisResolutionManifestCatalogView resolvedEntityView, - SecurityContext securityContext, - TaskExecutor taskExecutor, - FileIOFactory fileIOFactory) { + RealmId realmId, + PolarisEntityManager entityManager, + PolarisMetaStoreManager metaStoreManager, + PolarisMetaStoreSession metaStoreSession, + PolarisConfigurationStore configurationStore, + PolarisDiagnostics diagnostics, + PolarisResolutionManifestCatalogView resolvedEntityView, + SecurityContext securityContext, + TaskExecutor taskExecutor, + FileIOFactory fileIOFactory, + PolarisEventEmitter eventEmitter) { this.realmId = realmId; this.entityManager = entityManager; this.metaStoreManager = metaStoreManager; @@ -206,6 +211,7 @@ public BasePolarisCatalog( (AuthenticatedPolarisPrincipal) securityContext.getUserPrincipal(); this.catalogId = catalogEntity.getId(); this.catalogName = catalogEntity.getName(); + this.eventEmitter = eventEmitter; } @Override @@ -1256,6 +1262,8 @@ public void doRefresh() { @Override public void doCommit(TableMetadata base, TableMetadata metadata) { + eventEmitter.fire(new BeforeTableCommitEvent(base, metadata)); + LOGGER.debug( "doCommit for table {} with base {}, metadata {}", tableIdentifier, base, metadata); // TODO: Maybe avoid writing metadata if there's definitely a transaction conflict diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java b/service/common/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java index a123019cd2..e3edea3e37 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import jakarta.enterprise.context.RequestScoped; +import jakarta.enterprise.event.Event; import jakarta.inject.Inject; import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.Response.Status; @@ -69,6 +70,8 @@ import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService; import org.apache.polaris.service.catalog.api.IcebergRestConfigurationApiService; import org.apache.polaris.service.catalog.io.FileIOFactory; +import org.apache.polaris.service.events.BeforeTableCommitEvent; +import org.apache.polaris.service.events.PolarisEventEmitter; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.CommitTableRequest; import org.apache.polaris.service.types.CommitViewRequest; @@ -128,18 +131,20 @@ public class IcebergCatalogAdapter private final PolarisAuthorizer polarisAuthorizer; private final TaskExecutor taskExecutor; private final FileIOFactory fileIOFactory; + private final PolarisEventEmitter eventEmitter; @Inject public IcebergCatalogAdapter( - RealmId realmId, - PolarisEntityManager entityManager, - PolarisMetaStoreManager metaStoreManager, - PolarisMetaStoreSession session, - PolarisConfigurationStore configurationStore, - PolarisDiagnostics diagnostics, - PolarisAuthorizer polarisAuthorizer, - TaskExecutor taskExecutor, - FileIOFactory fileIOFactory) { + RealmId realmId, + PolarisEntityManager entityManager, + PolarisMetaStoreManager metaStoreManager, + PolarisMetaStoreSession session, + PolarisConfigurationStore configurationStore, + PolarisDiagnostics diagnostics, + PolarisAuthorizer polarisAuthorizer, + TaskExecutor taskExecutor, + FileIOFactory fileIOFactory, + PolarisEventEmitter eventEmitter) { this.realmId = realmId; this.entityManager = entityManager; this.metaStoreManager = metaStoreManager; @@ -149,6 +154,7 @@ public IcebergCatalogAdapter( this.polarisAuthorizer = polarisAuthorizer; this.taskExecutor = taskExecutor; this.fileIOFactory = fileIOFactory; + this.eventEmitter = eventEmitter; } /** @@ -189,7 +195,8 @@ private PolarisCatalogHandlerWrapper newHandlerWrapper( catalogName, polarisAuthorizer, taskExecutor, - fileIOFactory); + fileIOFactory, + eventEmitter); } @Override diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java b/service/common/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java index ea014f2cce..7ad260f3c2 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import jakarta.annotation.Nonnull; +import jakarta.enterprise.event.Event; import jakarta.ws.rs.core.SecurityContext; import java.io.Closeable; import java.io.IOException; @@ -97,6 +98,8 @@ import org.apache.polaris.core.persistence.resolver.ResolverStatus; import org.apache.polaris.core.storage.PolarisStorageActions; import org.apache.polaris.service.catalog.io.FileIOFactory; +import org.apache.polaris.service.events.BeforeTableCommitEvent; +import org.apache.polaris.service.events.PolarisEventEmitter; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.NotificationRequest; import org.slf4j.Logger; @@ -132,6 +135,7 @@ public class PolarisCatalogHandlerWrapper implements AutoCloseable { private final PolarisAuthorizer authorizer; private final TaskExecutor taskExecutor; private final FileIOFactory fileIOFactory; + private final PolarisEventEmitter eventEmitter; // Initialized in the authorize methods. private PolarisResolutionManifest resolutionManifest = null; @@ -143,17 +147,18 @@ public class PolarisCatalogHandlerWrapper implements AutoCloseable { private ViewCatalog viewCatalog = null; public PolarisCatalogHandlerWrapper( - RealmId realmId, - PolarisMetaStoreSession session, - PolarisConfigurationStore configurationStore, - PolarisDiagnostics diagnostics, - PolarisEntityManager entityManager, - PolarisMetaStoreManager metaStoreManager, - SecurityContext securityContext, - String catalogName, - PolarisAuthorizer authorizer, - TaskExecutor taskExecutor, - FileIOFactory fileIOFactory) { + RealmId realmId, + PolarisMetaStoreSession session, + PolarisConfigurationStore configurationStore, + PolarisDiagnostics diagnostics, + PolarisEntityManager entityManager, + PolarisMetaStoreManager metaStoreManager, + SecurityContext securityContext, + String catalogName, + PolarisAuthorizer authorizer, + TaskExecutor taskExecutor, + FileIOFactory fileIOFactory, + PolarisEventEmitter eventEmitter) { this.realmId = realmId; this.session = session; this.entityManager = entityManager; @@ -173,6 +178,7 @@ public PolarisCatalogHandlerWrapper( this.authorizer = authorizer; this.taskExecutor = taskExecutor; this.fileIOFactory = fileIOFactory; + this.eventEmitter = eventEmitter; } /** @@ -244,7 +250,8 @@ protected Catalog createBasePolarisCatalog(Map catalogProperties resolutionManifest, securityContext, taskExecutor, - fileIOFactory); + fileIOFactory, + eventEmitter); // TODO: The initialize properties might need to take more from the CatalogEntity. catalogInstance.initialize(catalogName, catalogProperties); diff --git a/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitEvent.java new file mode 100644 index 0000000000..20310d2654 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitEvent.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events; + +import org.apache.iceberg.TableMetadata; + +public record BeforeTableCommitEvent(TableMetadata base, TableMetadata metadata) { +} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/PolarisEventEmitter.java b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEventEmitter.java new file mode 100644 index 0000000000..ba15189388 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEventEmitter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Event; +import jakarta.inject.Inject; + +@ApplicationScoped +public class PolarisEventEmitter { + @Inject private Event beforeTableCommitEvent; + // TODO: more events + + public void fire(BeforeTableCommitEvent event) { + beforeTableCommitEvent.fire(event); + } + + // TODO: more fire() methods +}