From 6779c520e3b9a895d8f5895c4cac615a06f492de Mon Sep 17 00:00:00 2001 From: adnanhemani Date: Wed, 27 Aug 2025 00:04:28 -0700 Subject: [PATCH 1/8] Add Events for Iceberg REST APIs --- .../iceberg/IcebergCatalogAdapter.java | 4 +- ...ebergRestCatalogEventServiceDelegator.java | 201 ++++++++++++++---- ...estConfigurationEventServiceDelegator.java | 9 +- .../events/IcebergRestCatalogEvents.java | 104 +++++++++ .../service/events/PolarisEventListener.java | 150 +++++++++++++ 5 files changed, 420 insertions(+), 48 deletions(-) create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index 860476cf81..e8bbd597dd 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -777,8 +777,8 @@ public Response sendNotification( securityContext, prefix, catalog -> { - catalog.sendNotification(tableIdentifier, notificationRequest); - return Response.status(Response.Status.NO_CONTENT).build(); + boolean notificationSent = catalog.sendNotification(tableIdentifier, notificationRequest); + return Response.status(Response.Status.NO_CONTENT).entity(notificationSent).build(); }); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java index 43d1c1f009..b64dfdd03a 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java @@ -33,8 +33,60 @@ import org.apache.iceberg.rest.requests.RenameTableRequest; import org.apache.iceberg.rest.requests.ReportMetricsRequest; import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; +import org.apache.iceberg.rest.responses.CreateNamespaceResponse; +import org.apache.iceberg.rest.responses.GetNamespaceResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.rest.responses.LoadViewResponse; +import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCheckNamespaceExistsEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCheckTableExistsEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCheckViewExistsEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCreateNamespaceEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCreateTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCreateViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterDropNamespaceEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterDropTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterDropViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterListNamespacesEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterListTablesEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterListViewsEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterLoadCredentialsEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterLoadNamespaceMetadataEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterLoadTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterLoadViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterRegisterTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterRenameTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterRenameViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterReplaceViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterSendNotificationEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterUpdateNamespacePropertiesEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterUpdateTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCheckNamespaceExistsEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCheckTableExistsEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCheckViewExistsEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCreateNamespaceEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCreateTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCreateViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeDropNamespaceEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeDropTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeDropViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeListNamespacesEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeListTablesEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeListViewsEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeLoadCredentialsEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeLoadNamespaceMetadataEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeLoadTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeLoadViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeRegisterTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeRenameTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeRenameViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeReplaceViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeSendNotificationEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeUpdateNamespacePropertiesEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeUpdateTableEvent; +import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.types.CommitTableRequest; import org.apache.polaris.service.types.CommitViewRequest; import org.apache.polaris.service.types.NotificationRequest; @@ -44,6 +96,7 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatalogApiService { @Inject @Delegate IcebergCatalogAdapter delegate; + @Inject PolarisEventListener polarisEventListener; @Override public Response createNamespace( @@ -51,7 +104,11 @@ public Response createNamespace( CreateNamespaceRequest createNamespaceRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.createNamespace(prefix, createNamespaceRequest, realmContext, securityContext); + polarisEventListener.onBeforeCreateNamespace(new BeforeCreateNamespaceEvent(prefix, createNamespaceRequest)); + Response resp = delegate.createNamespace(prefix, createNamespaceRequest, realmContext, securityContext); + CreateNamespaceResponse createNamespaceResponse = resp.readEntity(CreateNamespaceResponse.class); + polarisEventListener.onAfterCreateNamespace(new AfterCreateNamespaceEvent(prefix, createNamespaceResponse.namespace(), createNamespaceResponse.properties())); + return resp; } @Override @@ -62,26 +119,38 @@ public Response listNamespaces( String parent, RealmContext realmContext, SecurityContext securityContext) { - return delegate.listNamespaces( - prefix, pageToken, pageSize, parent, realmContext, securityContext); + polarisEventListener.onBeforeListNamespaces(new BeforeListNamespacesEvent(prefix, parent)); + Response resp = delegate.listNamespaces(prefix, pageToken, pageSize, parent, realmContext, securityContext); + polarisEventListener.onAfterListNamespaces(new AfterListNamespacesEvent(prefix, parent)); + return resp; } @Override public Response loadNamespaceMetadata( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { - return delegate.loadNamespaceMetadata(prefix, namespace, realmContext, securityContext); + polarisEventListener.onBeforeLoadNamespaceMetadata(new BeforeLoadNamespaceMetadataEvent(prefix, namespace)); + Response resp = delegate.loadNamespaceMetadata(prefix, namespace, realmContext, securityContext); + GetNamespaceResponse getNamespaceResponse = resp.readEntity(GetNamespaceResponse.class); + polarisEventListener.onAfterLoadNamespaceMetadata(new AfterLoadNamespaceMetadataEvent(prefix, getNamespaceResponse.namespace(), getNamespaceResponse.properties())); + return resp; } @Override public Response namespaceExists( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { - return delegate.namespaceExists(prefix, namespace, realmContext, securityContext); + polarisEventListener.onBeforeCheckNamespaceExists(new BeforeCheckNamespaceExistsEvent(prefix, namespace)); + Response resp = delegate.namespaceExists(prefix, namespace, realmContext, securityContext); + polarisEventListener.onAfterCheckNamespaceExists(new AfterCheckNamespaceExistsEvent(prefix, namespace)); + return resp; } @Override public Response dropNamespace( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { - return delegate.dropNamespace(prefix, namespace, realmContext, securityContext); + polarisEventListener.onBeforeDropNamespace(new BeforeDropNamespaceEvent(prefix, namespace)); + Response resp = delegate.dropNamespace(prefix, namespace, realmContext, securityContext); + polarisEventListener.onAfterDropNamespace(new AfterDropNamespaceEvent(prefix, namespace)); + return resp; } @Override @@ -91,8 +160,10 @@ public Response updateProperties( UpdateNamespacePropertiesRequest updateNamespacePropertiesRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.updateProperties( - prefix, namespace, updateNamespacePropertiesRequest, realmContext, securityContext); + polarisEventListener.onBeforeUpdateNamespaceProperties(new BeforeUpdateNamespacePropertiesEvent(prefix, namespace, updateNamespacePropertiesRequest)); + Response resp = delegate.updateProperties(prefix, namespace, updateNamespacePropertiesRequest, realmContext, securityContext); + polarisEventListener.onAfterUpdateNamespaceProperties(new AfterUpdateNamespacePropertiesEvent(prefix, namespace, resp.readEntity(UpdateNamespacePropertiesResponse.class))); + return resp; } @Override @@ -103,8 +174,10 @@ public Response createTable( String accessDelegationMode, RealmContext realmContext, SecurityContext securityContext) { - return delegate.createTable( - prefix, namespace, createTableRequest, accessDelegationMode, realmContext, securityContext); + polarisEventListener.onBeforeCreateTable(new BeforeCreateTableEvent(prefix, namespace, createTableRequest, accessDelegationMode)); + Response resp = delegate.createTable(prefix, namespace, createTableRequest, accessDelegationMode, realmContext, securityContext); + polarisEventListener.onAfterCreateTable(new AfterCreateTableEvent(prefix, namespace, resp.readEntity(LoadTableResponse.class))); + return resp; } @Override @@ -115,8 +188,10 @@ public Response listTables( Integer pageSize, RealmContext realmContext, SecurityContext securityContext) { - return delegate.listTables( - prefix, namespace, pageToken, pageSize, realmContext, securityContext); + polarisEventListener.onBeforeListTables(new BeforeListTablesEvent(prefix, namespace)); + Response resp = delegate.listTables(prefix, namespace, pageToken, pageSize, realmContext, securityContext); + polarisEventListener.onAfterListTables(new AfterListTablesEvent(prefix, namespace)); + return resp; } @Override @@ -129,15 +204,10 @@ public Response loadTable( String snapshots, RealmContext realmContext, SecurityContext securityContext) { - return delegate.loadTable( - prefix, - namespace, - table, - accessDelegationMode, - ifNoneMatchString, - snapshots, - realmContext, - securityContext); + polarisEventListener.onBeforeLoadTable(new BeforeLoadTableEvent(prefix, namespace, table, accessDelegationMode, ifNoneMatchString, snapshots)); + Response resp = delegate.loadTable(prefix, namespace, table, accessDelegationMode, ifNoneMatchString, snapshots, realmContext, securityContext); + polarisEventListener.onAfterLoadTable(new AfterLoadTableEvent(prefix, namespace, resp.readEntity(LoadTableResponse.class))); + return resp; } @Override @@ -147,7 +217,10 @@ public Response tableExists( String table, RealmContext realmContext, SecurityContext securityContext) { - return delegate.tableExists(prefix, namespace, table, realmContext, securityContext); + polarisEventListener.onBeforeCheckTableExists(new BeforeCheckTableExistsEvent(prefix, namespace, table)); + Response resp = delegate.tableExists(prefix, namespace, table, realmContext, securityContext); + polarisEventListener.onAfterCheckTableExists(new AfterCheckTableExistsEvent(prefix, namespace, table)); + return resp; } @Override @@ -158,8 +231,10 @@ public Response dropTable( Boolean purgeRequested, RealmContext realmContext, SecurityContext securityContext) { - return delegate.dropTable( - prefix, namespace, table, purgeRequested, realmContext, securityContext); + polarisEventListener.onBeforeDropTable(new BeforeDropTableEvent(prefix, namespace, table, purgeRequested)); + Response resp = delegate.dropTable(prefix, namespace, table, purgeRequested, realmContext, securityContext); + polarisEventListener.onAfterDropTable(new AfterDropTableEvent(prefix, namespace, table, purgeRequested)); + return resp; } @Override @@ -169,8 +244,10 @@ public Response registerTable( RegisterTableRequest registerTableRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.registerTable( - prefix, namespace, registerTableRequest, realmContext, securityContext); + polarisEventListener.onBeforeRegisterTable(new BeforeRegisterTableEvent(prefix, namespace, registerTableRequest)); + Response resp = delegate.registerTable(prefix, namespace, registerTableRequest, realmContext, securityContext); + polarisEventListener.onAfterRegisterTable(new AfterRegisterTableEvent(prefix, namespace, resp.readEntity(LoadTableResponse.class))); + return resp; } @Override @@ -179,7 +256,10 @@ public Response renameTable( RenameTableRequest renameTableRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.renameTable(prefix, renameTableRequest, realmContext, securityContext); + polarisEventListener.onBeforeRenameTable(new BeforeRenameTableEvent(prefix, renameTableRequest)); + Response resp = delegate.renameTable(prefix, renameTableRequest, realmContext, securityContext); + polarisEventListener.onAfterRenameTable(new AfterRenameTableEvent(prefix, renameTableRequest)); + return resp; } @Override @@ -190,8 +270,10 @@ public Response updateTable( CommitTableRequest commitTableRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.updateTable( - prefix, namespace, table, commitTableRequest, realmContext, securityContext); + polarisEventListener.onBeforeUpdateTable(new BeforeUpdateTableEvent(prefix, namespace, table, commitTableRequest)); + Response resp = delegate.updateTable(prefix, namespace, table, commitTableRequest, realmContext, securityContext); + polarisEventListener.onAfterUpdateTable(new AfterUpdateTableEvent(prefix, namespace, table, resp.readEntity(LoadTableResponse.class))); + return resp; } @Override @@ -201,7 +283,10 @@ public Response createView( CreateViewRequest createViewRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.createView(prefix, namespace, createViewRequest, realmContext, securityContext); + polarisEventListener.onBeforeCreateView(new BeforeCreateViewEvent(prefix, namespace, createViewRequest)); + Response resp = delegate.createView(prefix, namespace, createViewRequest, realmContext, securityContext); + polarisEventListener.onAfterCreateView(new AfterCreateViewEvent(prefix, namespace, resp.readEntity(LoadViewResponse.class))); + return resp; } @Override @@ -212,8 +297,10 @@ public Response listViews( Integer pageSize, RealmContext realmContext, SecurityContext securityContext) { - return delegate.listViews( - prefix, namespace, pageToken, pageSize, realmContext, securityContext); + polarisEventListener.onBeforeListViews(new BeforeListViewsEvent(prefix, namespace)); + Response resp = delegate.listViews(prefix, namespace, pageToken, pageSize, realmContext, securityContext); + polarisEventListener.onAfterListViews(new AfterListViewsEvent(prefix, namespace)); + return resp; } @Override @@ -223,7 +310,10 @@ public Response loadCredentials( String table, RealmContext realmContext, SecurityContext securityContext) { - return delegate.loadCredentials(prefix, namespace, table, realmContext, securityContext); + polarisEventListener.onBeforeLoadCredentials(new BeforeLoadCredentialsEvent(prefix, namespace, table)); + Response resp = delegate.loadCredentials(prefix, namespace, table, realmContext, securityContext); + polarisEventListener.onAfterLoadCredentials(new AfterLoadCredentialsEvent(prefix, namespace, table)); + return resp; } @Override @@ -233,7 +323,10 @@ public Response loadView( String view, RealmContext realmContext, SecurityContext securityContext) { - return delegate.loadView(prefix, namespace, view, realmContext, securityContext); + polarisEventListener.onBeforeLoadView(new BeforeLoadViewEvent(prefix, namespace, view)); + Response resp = delegate.loadView(prefix, namespace, view, realmContext, securityContext); + polarisEventListener.onAfterLoadView(new AfterLoadViewEvent(prefix, namespace, resp.readEntity(LoadViewResponse.class))); + return resp; } @Override @@ -243,7 +336,10 @@ public Response viewExists( String view, RealmContext realmContext, SecurityContext securityContext) { - return delegate.viewExists(prefix, namespace, view, realmContext, securityContext); + polarisEventListener.onBeforeCheckViewExists(new BeforeCheckViewExistsEvent(prefix, namespace, view)); + Response resp = delegate.viewExists(prefix, namespace, view, realmContext, securityContext); + polarisEventListener.onAfterCheckViewExists(new AfterCheckViewExistsEvent(prefix, namespace, view)); + return resp; } @Override @@ -253,7 +349,10 @@ public Response dropView( String view, RealmContext realmContext, SecurityContext securityContext) { - return delegate.dropView(prefix, namespace, view, realmContext, securityContext); + polarisEventListener.onBeforeDropView(new BeforeDropViewEvent(prefix, namespace, view)); + Response resp = delegate.dropView(prefix, namespace, view, realmContext, securityContext); + polarisEventListener.onAfterDropView(new AfterDropViewEvent(prefix, namespace, view)); + return resp; } @Override @@ -262,7 +361,10 @@ public Response renameView( RenameTableRequest renameTableRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.renameView(prefix, renameTableRequest, realmContext, securityContext); + polarisEventListener.onBeforeRenameView(new BeforeRenameViewEvent(prefix, renameTableRequest)); + Response resp = delegate.renameView(prefix, renameTableRequest, realmContext, securityContext); + polarisEventListener.onAfterRenameView(new AfterRenameViewEvent(prefix, renameTableRequest)); + return resp; } @Override @@ -273,20 +375,27 @@ public Response replaceView( CommitViewRequest commitViewRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.replaceView( - prefix, namespace, view, commitViewRequest, realmContext, securityContext); + polarisEventListener.onBeforeReplaceView(new BeforeReplaceViewEvent(prefix, namespace, view, commitViewRequest)); + Response resp = delegate.replaceView(prefix, namespace, view, commitViewRequest, realmContext, securityContext); + polarisEventListener.onAfterReplaceView(new AfterReplaceViewEvent(prefix, namespace, view, resp.readEntity(LoadViewResponse.class))); + return resp; } + /** + * Table Committed Events are already instrumented at a more granular level than the API itself. + */ @Override public Response commitTransaction( String prefix, CommitTransactionRequest commitTransactionRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.commitTransaction( - prefix, commitTransactionRequest, realmContext, securityContext); + return delegate.commitTransaction(prefix, commitTransactionRequest, realmContext, securityContext); } + /** + * This API is currently a no-op in Polaris. + */ @Override public Response reportMetrics( String prefix, @@ -295,8 +404,8 @@ public Response reportMetrics( ReportMetricsRequest reportMetricsRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.reportMetrics( - prefix, namespace, table, reportMetricsRequest, realmContext, securityContext); + Response resp = delegate.reportMetrics(prefix, namespace, table, reportMetricsRequest, realmContext, securityContext); + return resp; } @Override @@ -307,7 +416,9 @@ public Response sendNotification( NotificationRequest notificationRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.sendNotification( - prefix, namespace, table, notificationRequest, realmContext, securityContext); + polarisEventListener.onBeforeSendNotification(new BeforeSendNotificationEvent(prefix, namespace, table, notificationRequest)); + Response resp = delegate.sendNotification(prefix, namespace, table, notificationRequest, realmContext, securityContext); + polarisEventListener.onAfterSendNotification(new AfterSendNotificationEvent(prefix, namespace, table, resp.readEntity(Boolean.class))); + return resp; } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestConfigurationEventServiceDelegator.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestConfigurationEventServiceDelegator.java index 6e018c9c22..a3bbc47f60 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestConfigurationEventServiceDelegator.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestConfigurationEventServiceDelegator.java @@ -25,8 +25,11 @@ import jakarta.inject.Inject; import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.SecurityContext; +import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.service.catalog.api.IcebergRestConfigurationApiService; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; +import org.apache.polaris.service.events.PolarisEventListener; @Decorator @Priority(1000) @@ -34,10 +37,14 @@ public class IcebergRestConfigurationEventServiceDelegator implements IcebergRestConfigurationApiService { @Inject @Delegate IcebergCatalogAdapter delegate; + @Inject PolarisEventListener polarisEventListener; @Override public Response getConfig( String warehouse, RealmContext realmContext, SecurityContext securityContext) { - return delegate.getConfig(warehouse, realmContext, securityContext); + polarisEventListener.onBeforeGetConfig(new IcebergRestCatalogEvents.BeforeGetConfigEvent(warehouse)); + Response resp = delegate.getConfig(warehouse, realmContext, securityContext); + polarisEventListener.onAfterGetConfig(new IcebergRestCatalogEvents.AfterGetConfigEvent(resp.readEntity(ConfigResponse.class))); + return resp; } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java b/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java new file mode 100644 index 0000000000..6b6cf388b4 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events; + +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.rest.requests.CreateNamespaceRequest; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.CreateViewRequest; +import org.apache.iceberg.rest.requests.RegisterTableRequest; +import org.apache.iceberg.rest.requests.RenameTableRequest; +import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; +import org.apache.iceberg.rest.responses.ConfigResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.rest.responses.LoadViewResponse; +import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; +import org.apache.polaris.service.types.CommitTableRequest; +import org.apache.polaris.service.types.CommitViewRequest; +import org.apache.polaris.service.types.NotificationRequest; + +import java.util.Map; + +/** + * Event records for Iceberg REST Catalog operations. + * Each operation has corresponding "Before" and "After" event records. + */ +public class IcebergRestCatalogEvents { + + // Namespace Events + public record BeforeCreateNamespaceEvent(String prefix, CreateNamespaceRequest createNamespaceRequest) {} + public record AfterCreateNamespaceEvent(String prefix, Namespace namespace, Map namespaceProperties) {} + public record BeforeListNamespacesEvent(String prefix, String parent) {} + public record AfterListNamespacesEvent(String prefix, String parent) {} + public record BeforeLoadNamespaceMetadataEvent(String prefix, String namespace) {} + public record AfterLoadNamespaceMetadataEvent(String prefix, Namespace namespace, Map namespaceProperties) {} + public record BeforeCheckNamespaceExistsEvent(String prefix, String namespace) {} + public record AfterCheckNamespaceExistsEvent(String prefix, String namespace) {} + public record BeforeDropNamespaceEvent(String prefix, String namespace) {} + public record AfterDropNamespaceEvent(String prefix, String namespace) {} + public record BeforeUpdateNamespacePropertiesEvent(String prefix, String namespace, UpdateNamespacePropertiesRequest updateNamespacePropertiesRequest) {} + public record AfterUpdateNamespacePropertiesEvent(String prefix, String namespace, UpdateNamespacePropertiesResponse updateNamespacePropertiesResponse) {} + + // Table Events + public record BeforeCreateTableEvent(String prefix, String namespace, CreateTableRequest createTableRequest, String accessDelegationMode) {} + public record AfterCreateTableEvent(String prefix, String namespace, LoadTableResponse loadTableResponse) {} + public record BeforeListTablesEvent(String prefix, String namespace) {} + public record AfterListTablesEvent(String prefix, String namespace) {} + public record BeforeLoadTableEvent(String prefix, String namespace, String table, String accessDelegationMode, String ifNoneMatchString, String snapshots) {} + public record AfterLoadTableEvent(String prefix, String namespace, LoadTableResponse loadTableResponse) {} + public record BeforeCheckTableExistsEvent(String prefix, String namespace, String table) {} + public record AfterCheckTableExistsEvent(String prefix, String namespace, String table) {} + public record BeforeDropTableEvent(String prefix, String namespace, String table, Boolean purgeRequested) {} + public record AfterDropTableEvent(String prefix, String namespace, String table, Boolean purgeRequested) {} + public record BeforeRegisterTableEvent(String prefix, String namespace, RegisterTableRequest registerTableRequest) {} + public record AfterRegisterTableEvent(String prefix, String namespace, LoadTableResponse loadTableResponse) {} + public record BeforeRenameTableEvent(String prefix, RenameTableRequest renameTableRequest) {} + public record AfterRenameTableEvent(String prefix, RenameTableRequest renameTableRequest) {} + public record BeforeUpdateTableEvent(String prefix, String namespace, String sourceTable, CommitTableRequest commitTableRequest) {} + public record AfterUpdateTableEvent(String prefix, String namespace, String sourceTable, LoadTableResponse loadTableResponse) {} + + // View Events + public record BeforeCreateViewEvent(String prefix, String namespace, CreateViewRequest createViewRequest) {} + public record AfterCreateViewEvent(String prefix, String namespace, LoadViewResponse loadViewResponse) {} + public record BeforeListViewsEvent(String prefix, String namespace) {} + public record AfterListViewsEvent(String prefix, String namespace) {} + public record BeforeLoadViewEvent(String prefix, String namespace, String view) {} + public record AfterLoadViewEvent(String prefix, String namespace, LoadViewResponse loadViewResponse) {} + public record BeforeCheckViewExistsEvent(String prefix, String namespace, String view) {} + public record AfterCheckViewExistsEvent(String prefix, String namespace, String view) {} + public record BeforeDropViewEvent(String prefix, String namespace, String view) {} + public record AfterDropViewEvent(String prefix, String namespace, String view) {} + public record BeforeRenameViewEvent(String prefix, RenameTableRequest renameTableRequest) {} + public record AfterRenameViewEvent(String prefix, RenameTableRequest renameTableRequest) {} + public record BeforeReplaceViewEvent(String prefix, String namespace, String sourceView, CommitViewRequest commitViewRequest) {} + public record AfterReplaceViewEvent(String prefix, String namespace, String sourceView, LoadViewResponse loadViewResponse) {} + + // Credential Events + public record BeforeLoadCredentialsEvent(String prefix, String namespace, String table) {} + public record AfterLoadCredentialsEvent(String prefix, String namespace, String table) {} + + // Notification Events + public record BeforeSendNotificationEvent(String prefix, String namespace, String table, NotificationRequest notificationRequest) {} + public record AfterSendNotificationEvent(String prefix, String namespace, String table, boolean result) {} + + // Configuration Events + public record BeforeGetConfigEvent(String warehouse) {} + public record AfterGetConfigEvent(ConfigResponse configResponse) {} +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java index 485766bb24..c7d31eb4e9 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java @@ -55,4 +55,154 @@ public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {} /** {@link AfterTaskAttemptedEvent} */ public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {} + + // Iceberg REST Catalog Namespace Events + /** {@link IcebergRestCatalogEvents.BeforeCreateNamespaceEvent} */ + public void onBeforeCreateNamespace(IcebergRestCatalogEvents.BeforeCreateNamespaceEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterCreateNamespaceEvent} */ + public void onAfterCreateNamespace(IcebergRestCatalogEvents.AfterCreateNamespaceEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeListNamespacesEvent} */ + public void onBeforeListNamespaces(IcebergRestCatalogEvents.BeforeListNamespacesEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterListNamespacesEvent} */ + public void onAfterListNamespaces(IcebergRestCatalogEvents.AfterListNamespacesEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeLoadNamespaceMetadataEvent} */ + public void onBeforeLoadNamespaceMetadata(IcebergRestCatalogEvents.BeforeLoadNamespaceMetadataEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterLoadNamespaceMetadataEvent} */ + public void onAfterLoadNamespaceMetadata(IcebergRestCatalogEvents.AfterLoadNamespaceMetadataEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeCheckNamespaceExistsEvent} */ + public void onBeforeCheckNamespaceExists(IcebergRestCatalogEvents.BeforeCheckNamespaceExistsEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterCheckNamespaceExistsEvent} */ + public void onAfterCheckNamespaceExists(IcebergRestCatalogEvents.AfterCheckNamespaceExistsEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeDropNamespaceEvent} */ + public void onBeforeDropNamespace(IcebergRestCatalogEvents.BeforeDropNamespaceEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterDropNamespaceEvent} */ + public void onAfterDropNamespace(IcebergRestCatalogEvents.AfterDropNamespaceEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeUpdateNamespacePropertiesEvent} */ + public void onBeforeUpdateNamespaceProperties(IcebergRestCatalogEvents.BeforeUpdateNamespacePropertiesEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterUpdateNamespacePropertiesEvent} */ + public void onAfterUpdateNamespaceProperties(IcebergRestCatalogEvents.AfterUpdateNamespacePropertiesEvent event) {} + + // Iceberg REST Catalog Table Events + /** {@link IcebergRestCatalogEvents.BeforeCreateTableEvent} */ + public void onBeforeCreateTable(IcebergRestCatalogEvents.BeforeCreateTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterCreateTableEvent} */ + public void onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeListTablesEvent} */ + public void onBeforeListTables(IcebergRestCatalogEvents.BeforeListTablesEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterListTablesEvent} */ + public void onAfterListTables(IcebergRestCatalogEvents.AfterListTablesEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeLoadTableEvent} */ + public void onBeforeLoadTable(IcebergRestCatalogEvents.BeforeLoadTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterLoadTableEvent} */ + public void onAfterLoadTable(IcebergRestCatalogEvents.AfterLoadTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeCheckTableExistsEvent} */ + public void onBeforeCheckTableExists(IcebergRestCatalogEvents.BeforeCheckTableExistsEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterCheckTableExistsEvent} */ + public void onAfterCheckTableExists(IcebergRestCatalogEvents.AfterCheckTableExistsEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeDropTableEvent} */ + public void onBeforeDropTable(IcebergRestCatalogEvents.BeforeDropTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterDropTableEvent} */ + public void onAfterDropTable(IcebergRestCatalogEvents.AfterDropTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeRegisterTableEvent} */ + public void onBeforeRegisterTable(IcebergRestCatalogEvents.BeforeRegisterTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterRegisterTableEvent} */ + public void onAfterRegisterTable(IcebergRestCatalogEvents.AfterRegisterTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeRenameTableEvent} */ + public void onBeforeRenameTable(IcebergRestCatalogEvents.BeforeRenameTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterRenameTableEvent} */ + public void onAfterRenameTable(IcebergRestCatalogEvents.AfterRenameTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeUpdateTableEvent} */ + public void onBeforeUpdateTable(IcebergRestCatalogEvents.BeforeUpdateTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterUpdateTableEvent} */ + public void onAfterUpdateTable(IcebergRestCatalogEvents.AfterUpdateTableEvent event) {} + + // Iceberg REST Catalog View Events + /** {@link IcebergRestCatalogEvents.BeforeCreateViewEvent} */ + public void onBeforeCreateView(IcebergRestCatalogEvents.BeforeCreateViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterCreateViewEvent} */ + public void onAfterCreateView(IcebergRestCatalogEvents.AfterCreateViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeListViewsEvent} */ + public void onBeforeListViews(IcebergRestCatalogEvents.BeforeListViewsEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterListViewsEvent} */ + public void onAfterListViews(IcebergRestCatalogEvents.AfterListViewsEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeLoadViewEvent} */ + public void onBeforeLoadView(IcebergRestCatalogEvents.BeforeLoadViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterLoadViewEvent} */ + public void onAfterLoadView(IcebergRestCatalogEvents.AfterLoadViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeCheckViewExistsEvent} */ + public void onBeforeCheckViewExists(IcebergRestCatalogEvents.BeforeCheckViewExistsEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterCheckViewExistsEvent} */ + public void onAfterCheckViewExists(IcebergRestCatalogEvents.AfterCheckViewExistsEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeDropViewEvent} */ + public void onBeforeDropView(IcebergRestCatalogEvents.BeforeDropViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterDropViewEvent} */ + public void onAfterDropView(IcebergRestCatalogEvents.AfterDropViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeRenameViewEvent} */ + public void onBeforeRenameView(IcebergRestCatalogEvents.BeforeRenameViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterRenameViewEvent} */ + public void onAfterRenameView(IcebergRestCatalogEvents.AfterRenameViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeReplaceViewEvent} */ + public void onBeforeReplaceView(IcebergRestCatalogEvents.BeforeReplaceViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterReplaceViewEvent} */ + public void onAfterReplaceView(IcebergRestCatalogEvents.AfterReplaceViewEvent event) {} + + // Iceberg REST Catalog Credential Events + /** {@link IcebergRestCatalogEvents.BeforeLoadCredentialsEvent} */ + public void onBeforeLoadCredentials(IcebergRestCatalogEvents.BeforeLoadCredentialsEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterLoadCredentialsEvent} */ + public void onAfterLoadCredentials(IcebergRestCatalogEvents.AfterLoadCredentialsEvent event) {} + + // Iceberg REST Catalog Notification Events + /** {@link IcebergRestCatalogEvents.BeforeSendNotificationEvent} */ + public void onBeforeSendNotification(IcebergRestCatalogEvents.BeforeSendNotificationEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterSendNotificationEvent} */ + public void onAfterSendNotification(IcebergRestCatalogEvents.AfterSendNotificationEvent event) {} + + // Iceberg REST Catalog Configuration Events + /** {@link IcebergRestCatalogEvents.BeforeGetConfigEvent} */ + public void onBeforeGetConfig(IcebergRestCatalogEvents.BeforeGetConfigEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterGetConfigEvent} */ + public void onAfterGetConfig(IcebergRestCatalogEvents.AfterGetConfigEvent event) {} } From 93d22ff85c4ea17eeabcec26ab3544312be349d8 Mon Sep 17 00:00:00 2001 From: adnanhemani Date: Sat, 30 Aug 2025 23:12:23 -0700 Subject: [PATCH 2/8] spotlessapply --- ...ebergRestCatalogEventServiceDelegator.java | 185 +++++++++++++----- ...estConfigurationEventServiceDelegator.java | 6 +- .../events/IcebergRestCatalogEvents.java | 127 +++++++++--- .../service/events/PolarisEventListener.java | 24 ++- 4 files changed, 253 insertions(+), 89 deletions(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java index b64dfdd03a..95d1df7a55 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java @@ -104,10 +104,15 @@ public Response createNamespace( CreateNamespaceRequest createNamespaceRequest, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeCreateNamespace(new BeforeCreateNamespaceEvent(prefix, createNamespaceRequest)); - Response resp = delegate.createNamespace(prefix, createNamespaceRequest, realmContext, securityContext); - CreateNamespaceResponse createNamespaceResponse = resp.readEntity(CreateNamespaceResponse.class); - polarisEventListener.onAfterCreateNamespace(new AfterCreateNamespaceEvent(prefix, createNamespaceResponse.namespace(), createNamespaceResponse.properties())); + polarisEventListener.onBeforeCreateNamespace( + new BeforeCreateNamespaceEvent(prefix, createNamespaceRequest)); + Response resp = + delegate.createNamespace(prefix, createNamespaceRequest, realmContext, securityContext); + CreateNamespaceResponse createNamespaceResponse = + resp.readEntity(CreateNamespaceResponse.class); + polarisEventListener.onAfterCreateNamespace( + new AfterCreateNamespaceEvent( + prefix, createNamespaceResponse.namespace(), createNamespaceResponse.properties())); return resp; } @@ -120,7 +125,8 @@ public Response listNamespaces( RealmContext realmContext, SecurityContext securityContext) { polarisEventListener.onBeforeListNamespaces(new BeforeListNamespacesEvent(prefix, parent)); - Response resp = delegate.listNamespaces(prefix, pageToken, pageSize, parent, realmContext, securityContext); + Response resp = + delegate.listNamespaces(prefix, pageToken, pageSize, parent, realmContext, securityContext); polarisEventListener.onAfterListNamespaces(new AfterListNamespacesEvent(prefix, parent)); return resp; } @@ -128,19 +134,25 @@ public Response listNamespaces( @Override public Response loadNamespaceMetadata( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeLoadNamespaceMetadata(new BeforeLoadNamespaceMetadataEvent(prefix, namespace)); - Response resp = delegate.loadNamespaceMetadata(prefix, namespace, realmContext, securityContext); + polarisEventListener.onBeforeLoadNamespaceMetadata( + new BeforeLoadNamespaceMetadataEvent(prefix, namespace)); + Response resp = + delegate.loadNamespaceMetadata(prefix, namespace, realmContext, securityContext); GetNamespaceResponse getNamespaceResponse = resp.readEntity(GetNamespaceResponse.class); - polarisEventListener.onAfterLoadNamespaceMetadata(new AfterLoadNamespaceMetadataEvent(prefix, getNamespaceResponse.namespace(), getNamespaceResponse.properties())); + polarisEventListener.onAfterLoadNamespaceMetadata( + new AfterLoadNamespaceMetadataEvent( + prefix, getNamespaceResponse.namespace(), getNamespaceResponse.properties())); return resp; } @Override public Response namespaceExists( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeCheckNamespaceExists(new BeforeCheckNamespaceExistsEvent(prefix, namespace)); + polarisEventListener.onBeforeCheckNamespaceExists( + new BeforeCheckNamespaceExistsEvent(prefix, namespace)); Response resp = delegate.namespaceExists(prefix, namespace, realmContext, securityContext); - polarisEventListener.onAfterCheckNamespaceExists(new AfterCheckNamespaceExistsEvent(prefix, namespace)); + polarisEventListener.onAfterCheckNamespaceExists( + new AfterCheckNamespaceExistsEvent(prefix, namespace)); return resp; } @@ -160,9 +172,15 @@ public Response updateProperties( UpdateNamespacePropertiesRequest updateNamespacePropertiesRequest, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeUpdateNamespaceProperties(new BeforeUpdateNamespacePropertiesEvent(prefix, namespace, updateNamespacePropertiesRequest)); - Response resp = delegate.updateProperties(prefix, namespace, updateNamespacePropertiesRequest, realmContext, securityContext); - polarisEventListener.onAfterUpdateNamespaceProperties(new AfterUpdateNamespacePropertiesEvent(prefix, namespace, resp.readEntity(UpdateNamespacePropertiesResponse.class))); + polarisEventListener.onBeforeUpdateNamespaceProperties( + new BeforeUpdateNamespacePropertiesEvent( + prefix, namespace, updateNamespacePropertiesRequest)); + Response resp = + delegate.updateProperties( + prefix, namespace, updateNamespacePropertiesRequest, realmContext, securityContext); + polarisEventListener.onAfterUpdateNamespaceProperties( + new AfterUpdateNamespacePropertiesEvent( + prefix, namespace, resp.readEntity(UpdateNamespacePropertiesResponse.class))); return resp; } @@ -174,9 +192,22 @@ public Response createTable( String accessDelegationMode, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeCreateTable(new BeforeCreateTableEvent(prefix, namespace, createTableRequest, accessDelegationMode)); - Response resp = delegate.createTable(prefix, namespace, createTableRequest, accessDelegationMode, realmContext, securityContext); - polarisEventListener.onAfterCreateTable(new AfterCreateTableEvent(prefix, namespace, resp.readEntity(LoadTableResponse.class))); + if (!createTableRequest.stageCreate()) { + polarisEventListener.onBeforeCreateTable( + new BeforeCreateTableEvent(prefix, namespace, createTableRequest, accessDelegationMode)); + } + Response resp = + delegate.createTable( + prefix, + namespace, + createTableRequest, + accessDelegationMode, + realmContext, + securityContext); + if (!createTableRequest.stageCreate()) { + polarisEventListener.onAfterCreateTable( + new AfterCreateTableEvent(prefix, namespace, resp.readEntity(LoadTableResponse.class))); + } return resp; } @@ -189,7 +220,8 @@ public Response listTables( RealmContext realmContext, SecurityContext securityContext) { polarisEventListener.onBeforeListTables(new BeforeListTablesEvent(prefix, namespace)); - Response resp = delegate.listTables(prefix, namespace, pageToken, pageSize, realmContext, securityContext); + Response resp = + delegate.listTables(prefix, namespace, pageToken, pageSize, realmContext, securityContext); polarisEventListener.onAfterListTables(new AfterListTablesEvent(prefix, namespace)); return resp; } @@ -204,9 +236,21 @@ public Response loadTable( String snapshots, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeLoadTable(new BeforeLoadTableEvent(prefix, namespace, table, accessDelegationMode, ifNoneMatchString, snapshots)); - Response resp = delegate.loadTable(prefix, namespace, table, accessDelegationMode, ifNoneMatchString, snapshots, realmContext, securityContext); - polarisEventListener.onAfterLoadTable(new AfterLoadTableEvent(prefix, namespace, resp.readEntity(LoadTableResponse.class))); + polarisEventListener.onBeforeLoadTable( + new BeforeLoadTableEvent( + prefix, namespace, table, accessDelegationMode, ifNoneMatchString, snapshots)); + Response resp = + delegate.loadTable( + prefix, + namespace, + table, + accessDelegationMode, + ifNoneMatchString, + snapshots, + realmContext, + securityContext); + polarisEventListener.onAfterLoadTable( + new AfterLoadTableEvent(prefix, namespace, resp.readEntity(LoadTableResponse.class))); return resp; } @@ -217,9 +261,11 @@ public Response tableExists( String table, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeCheckTableExists(new BeforeCheckTableExistsEvent(prefix, namespace, table)); + polarisEventListener.onBeforeCheckTableExists( + new BeforeCheckTableExistsEvent(prefix, namespace, table)); Response resp = delegate.tableExists(prefix, namespace, table, realmContext, securityContext); - polarisEventListener.onAfterCheckTableExists(new AfterCheckTableExistsEvent(prefix, namespace, table)); + polarisEventListener.onAfterCheckTableExists( + new AfterCheckTableExistsEvent(prefix, namespace, table)); return resp; } @@ -231,9 +277,12 @@ public Response dropTable( Boolean purgeRequested, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeDropTable(new BeforeDropTableEvent(prefix, namespace, table, purgeRequested)); - Response resp = delegate.dropTable(prefix, namespace, table, purgeRequested, realmContext, securityContext); - polarisEventListener.onAfterDropTable(new AfterDropTableEvent(prefix, namespace, table, purgeRequested)); + polarisEventListener.onBeforeDropTable( + new BeforeDropTableEvent(prefix, namespace, table, purgeRequested)); + Response resp = + delegate.dropTable(prefix, namespace, table, purgeRequested, realmContext, securityContext); + polarisEventListener.onAfterDropTable( + new AfterDropTableEvent(prefix, namespace, table, purgeRequested)); return resp; } @@ -244,9 +293,13 @@ public Response registerTable( RegisterTableRequest registerTableRequest, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeRegisterTable(new BeforeRegisterTableEvent(prefix, namespace, registerTableRequest)); - Response resp = delegate.registerTable(prefix, namespace, registerTableRequest, realmContext, securityContext); - polarisEventListener.onAfterRegisterTable(new AfterRegisterTableEvent(prefix, namespace, resp.readEntity(LoadTableResponse.class))); + polarisEventListener.onBeforeRegisterTable( + new BeforeRegisterTableEvent(prefix, namespace, registerTableRequest)); + Response resp = + delegate.registerTable( + prefix, namespace, registerTableRequest, realmContext, securityContext); + polarisEventListener.onAfterRegisterTable( + new AfterRegisterTableEvent(prefix, namespace, resp.readEntity(LoadTableResponse.class))); return resp; } @@ -256,7 +309,8 @@ public Response renameTable( RenameTableRequest renameTableRequest, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeRenameTable(new BeforeRenameTableEvent(prefix, renameTableRequest)); + polarisEventListener.onBeforeRenameTable( + new BeforeRenameTableEvent(prefix, renameTableRequest)); Response resp = delegate.renameTable(prefix, renameTableRequest, realmContext, securityContext); polarisEventListener.onAfterRenameTable(new AfterRenameTableEvent(prefix, renameTableRequest)); return resp; @@ -270,9 +324,14 @@ public Response updateTable( CommitTableRequest commitTableRequest, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeUpdateTable(new BeforeUpdateTableEvent(prefix, namespace, table, commitTableRequest)); - Response resp = delegate.updateTable(prefix, namespace, table, commitTableRequest, realmContext, securityContext); - polarisEventListener.onAfterUpdateTable(new AfterUpdateTableEvent(prefix, namespace, table, resp.readEntity(LoadTableResponse.class))); + polarisEventListener.onBeforeUpdateTable( + new BeforeUpdateTableEvent(prefix, namespace, table, commitTableRequest)); + Response resp = + delegate.updateTable( + prefix, namespace, table, commitTableRequest, realmContext, securityContext); + polarisEventListener.onAfterUpdateTable( + new AfterUpdateTableEvent( + prefix, namespace, table, resp.readEntity(LoadTableResponse.class))); return resp; } @@ -283,9 +342,12 @@ public Response createView( CreateViewRequest createViewRequest, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeCreateView(new BeforeCreateViewEvent(prefix, namespace, createViewRequest)); - Response resp = delegate.createView(prefix, namespace, createViewRequest, realmContext, securityContext); - polarisEventListener.onAfterCreateView(new AfterCreateViewEvent(prefix, namespace, resp.readEntity(LoadViewResponse.class))); + polarisEventListener.onBeforeCreateView( + new BeforeCreateViewEvent(prefix, namespace, createViewRequest)); + Response resp = + delegate.createView(prefix, namespace, createViewRequest, realmContext, securityContext); + polarisEventListener.onAfterCreateView( + new AfterCreateViewEvent(prefix, namespace, resp.readEntity(LoadViewResponse.class))); return resp; } @@ -298,7 +360,8 @@ public Response listViews( RealmContext realmContext, SecurityContext securityContext) { polarisEventListener.onBeforeListViews(new BeforeListViewsEvent(prefix, namespace)); - Response resp = delegate.listViews(prefix, namespace, pageToken, pageSize, realmContext, securityContext); + Response resp = + delegate.listViews(prefix, namespace, pageToken, pageSize, realmContext, securityContext); polarisEventListener.onAfterListViews(new AfterListViewsEvent(prefix, namespace)); return resp; } @@ -310,9 +373,12 @@ public Response loadCredentials( String table, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeLoadCredentials(new BeforeLoadCredentialsEvent(prefix, namespace, table)); - Response resp = delegate.loadCredentials(prefix, namespace, table, realmContext, securityContext); - polarisEventListener.onAfterLoadCredentials(new AfterLoadCredentialsEvent(prefix, namespace, table)); + polarisEventListener.onBeforeLoadCredentials( + new BeforeLoadCredentialsEvent(prefix, namespace, table)); + Response resp = + delegate.loadCredentials(prefix, namespace, table, realmContext, securityContext); + polarisEventListener.onAfterLoadCredentials( + new AfterLoadCredentialsEvent(prefix, namespace, table)); return resp; } @@ -325,7 +391,8 @@ public Response loadView( SecurityContext securityContext) { polarisEventListener.onBeforeLoadView(new BeforeLoadViewEvent(prefix, namespace, view)); Response resp = delegate.loadView(prefix, namespace, view, realmContext, securityContext); - polarisEventListener.onAfterLoadView(new AfterLoadViewEvent(prefix, namespace, resp.readEntity(LoadViewResponse.class))); + polarisEventListener.onAfterLoadView( + new AfterLoadViewEvent(prefix, namespace, resp.readEntity(LoadViewResponse.class))); return resp; } @@ -336,9 +403,11 @@ public Response viewExists( String view, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeCheckViewExists(new BeforeCheckViewExistsEvent(prefix, namespace, view)); + polarisEventListener.onBeforeCheckViewExists( + new BeforeCheckViewExistsEvent(prefix, namespace, view)); Response resp = delegate.viewExists(prefix, namespace, view, realmContext, securityContext); - polarisEventListener.onAfterCheckViewExists(new AfterCheckViewExistsEvent(prefix, namespace, view)); + polarisEventListener.onAfterCheckViewExists( + new AfterCheckViewExistsEvent(prefix, namespace, view)); return resp; } @@ -375,9 +444,14 @@ public Response replaceView( CommitViewRequest commitViewRequest, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeReplaceView(new BeforeReplaceViewEvent(prefix, namespace, view, commitViewRequest)); - Response resp = delegate.replaceView(prefix, namespace, view, commitViewRequest, realmContext, securityContext); - polarisEventListener.onAfterReplaceView(new AfterReplaceViewEvent(prefix, namespace, view, resp.readEntity(LoadViewResponse.class))); + polarisEventListener.onBeforeReplaceView( + new BeforeReplaceViewEvent(prefix, namespace, view, commitViewRequest)); + Response resp = + delegate.replaceView( + prefix, namespace, view, commitViewRequest, realmContext, securityContext); + polarisEventListener.onAfterReplaceView( + new AfterReplaceViewEvent( + prefix, namespace, view, resp.readEntity(LoadViewResponse.class))); return resp; } @@ -390,12 +464,11 @@ public Response commitTransaction( CommitTransactionRequest commitTransactionRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.commitTransaction(prefix, commitTransactionRequest, realmContext, securityContext); + return delegate.commitTransaction( + prefix, commitTransactionRequest, realmContext, securityContext); } - /** - * This API is currently a no-op in Polaris. - */ + /** This API is currently a no-op in Polaris. */ @Override public Response reportMetrics( String prefix, @@ -404,7 +477,9 @@ public Response reportMetrics( ReportMetricsRequest reportMetricsRequest, RealmContext realmContext, SecurityContext securityContext) { - Response resp = delegate.reportMetrics(prefix, namespace, table, reportMetricsRequest, realmContext, securityContext); + Response resp = + delegate.reportMetrics( + prefix, namespace, table, reportMetricsRequest, realmContext, securityContext); return resp; } @@ -416,9 +491,13 @@ public Response sendNotification( NotificationRequest notificationRequest, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeSendNotification(new BeforeSendNotificationEvent(prefix, namespace, table, notificationRequest)); - Response resp = delegate.sendNotification(prefix, namespace, table, notificationRequest, realmContext, securityContext); - polarisEventListener.onAfterSendNotification(new AfterSendNotificationEvent(prefix, namespace, table, resp.readEntity(Boolean.class))); + polarisEventListener.onBeforeSendNotification( + new BeforeSendNotificationEvent(prefix, namespace, table, notificationRequest)); + Response resp = + delegate.sendNotification( + prefix, namespace, table, notificationRequest, realmContext, securityContext); + polarisEventListener.onAfterSendNotification( + new AfterSendNotificationEvent(prefix, namespace, table, resp.readEntity(Boolean.class))); return resp; } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestConfigurationEventServiceDelegator.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestConfigurationEventServiceDelegator.java index a3bbc47f60..0f17fe6a19 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestConfigurationEventServiceDelegator.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestConfigurationEventServiceDelegator.java @@ -42,9 +42,11 @@ public class IcebergRestConfigurationEventServiceDelegator @Override public Response getConfig( String warehouse, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeGetConfig(new IcebergRestCatalogEvents.BeforeGetConfigEvent(warehouse)); + polarisEventListener.onBeforeGetConfig( + new IcebergRestCatalogEvents.BeforeGetConfigEvent(warehouse)); Response resp = delegate.getConfig(warehouse, realmContext, securityContext); - polarisEventListener.onAfterGetConfig(new IcebergRestCatalogEvents.AfterGetConfigEvent(resp.readEntity(ConfigResponse.class))); + polarisEventListener.onAfterGetConfig( + new IcebergRestCatalogEvents.AfterGetConfigEvent(resp.readEntity(ConfigResponse.class))); return resp; } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java b/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java index 6b6cf388b4..cd775cf50d 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java @@ -19,6 +19,7 @@ package org.apache.polaris.service.events; +import java.util.Map; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; @@ -34,71 +35,145 @@ import org.apache.polaris.service.types.CommitViewRequest; import org.apache.polaris.service.types.NotificationRequest; -import java.util.Map; - /** - * Event records for Iceberg REST Catalog operations. - * Each operation has corresponding "Before" and "After" event records. + * Event records for Iceberg REST Catalog operations. Each operation has corresponding "Before" and + * "After" event records. */ public class IcebergRestCatalogEvents { // Namespace Events - public record BeforeCreateNamespaceEvent(String prefix, CreateNamespaceRequest createNamespaceRequest) {} - public record AfterCreateNamespaceEvent(String prefix, Namespace namespace, Map namespaceProperties) {} + public record BeforeCreateNamespaceEvent( + String prefix, CreateNamespaceRequest createNamespaceRequest) {} + + public record AfterCreateNamespaceEvent( + String prefix, Namespace namespace, Map namespaceProperties) {} + public record BeforeListNamespacesEvent(String prefix, String parent) {} + public record AfterListNamespacesEvent(String prefix, String parent) {} + public record BeforeLoadNamespaceMetadataEvent(String prefix, String namespace) {} - public record AfterLoadNamespaceMetadataEvent(String prefix, Namespace namespace, Map namespaceProperties) {} + + public record AfterLoadNamespaceMetadataEvent( + String prefix, Namespace namespace, Map namespaceProperties) {} + public record BeforeCheckNamespaceExistsEvent(String prefix, String namespace) {} + public record AfterCheckNamespaceExistsEvent(String prefix, String namespace) {} + public record BeforeDropNamespaceEvent(String prefix, String namespace) {} + public record AfterDropNamespaceEvent(String prefix, String namespace) {} - public record BeforeUpdateNamespacePropertiesEvent(String prefix, String namespace, UpdateNamespacePropertiesRequest updateNamespacePropertiesRequest) {} - public record AfterUpdateNamespacePropertiesEvent(String prefix, String namespace, UpdateNamespacePropertiesResponse updateNamespacePropertiesResponse) {} + + public record BeforeUpdateNamespacePropertiesEvent( + String prefix, + String namespace, + UpdateNamespacePropertiesRequest updateNamespacePropertiesRequest) {} + + public record AfterUpdateNamespacePropertiesEvent( + String prefix, + String namespace, + UpdateNamespacePropertiesResponse updateNamespacePropertiesResponse) {} // Table Events - public record BeforeCreateTableEvent(String prefix, String namespace, CreateTableRequest createTableRequest, String accessDelegationMode) {} - public record AfterCreateTableEvent(String prefix, String namespace, LoadTableResponse loadTableResponse) {} + public record BeforeCreateTableEvent( + String prefix, + String namespace, + CreateTableRequest createTableRequest, + String accessDelegationMode) {} + + public record AfterCreateTableEvent( + String prefix, String namespace, LoadTableResponse loadTableResponse) {} + public record BeforeListTablesEvent(String prefix, String namespace) {} + public record AfterListTablesEvent(String prefix, String namespace) {} - public record BeforeLoadTableEvent(String prefix, String namespace, String table, String accessDelegationMode, String ifNoneMatchString, String snapshots) {} - public record AfterLoadTableEvent(String prefix, String namespace, LoadTableResponse loadTableResponse) {} + + public record BeforeLoadTableEvent( + String prefix, + String namespace, + String table, + String accessDelegationMode, + String ifNoneMatchString, + String snapshots) {} + + public record AfterLoadTableEvent( + String prefix, String namespace, LoadTableResponse loadTableResponse) {} + public record BeforeCheckTableExistsEvent(String prefix, String namespace, String table) {} + public record AfterCheckTableExistsEvent(String prefix, String namespace, String table) {} - public record BeforeDropTableEvent(String prefix, String namespace, String table, Boolean purgeRequested) {} - public record AfterDropTableEvent(String prefix, String namespace, String table, Boolean purgeRequested) {} - public record BeforeRegisterTableEvent(String prefix, String namespace, RegisterTableRequest registerTableRequest) {} - public record AfterRegisterTableEvent(String prefix, String namespace, LoadTableResponse loadTableResponse) {} + + public record BeforeDropTableEvent( + String prefix, String namespace, String table, Boolean purgeRequested) {} + + public record AfterDropTableEvent( + String prefix, String namespace, String table, Boolean purgeRequested) {} + + public record BeforeRegisterTableEvent( + String prefix, String namespace, RegisterTableRequest registerTableRequest) {} + + public record AfterRegisterTableEvent( + String prefix, String namespace, LoadTableResponse loadTableResponse) {} + public record BeforeRenameTableEvent(String prefix, RenameTableRequest renameTableRequest) {} + public record AfterRenameTableEvent(String prefix, RenameTableRequest renameTableRequest) {} - public record BeforeUpdateTableEvent(String prefix, String namespace, String sourceTable, CommitTableRequest commitTableRequest) {} - public record AfterUpdateTableEvent(String prefix, String namespace, String sourceTable, LoadTableResponse loadTableResponse) {} + + public record BeforeUpdateTableEvent( + String prefix, String namespace, String sourceTable, CommitTableRequest commitTableRequest) {} + + public record AfterUpdateTableEvent( + String prefix, String namespace, String sourceTable, LoadTableResponse loadTableResponse) {} // View Events - public record BeforeCreateViewEvent(String prefix, String namespace, CreateViewRequest createViewRequest) {} - public record AfterCreateViewEvent(String prefix, String namespace, LoadViewResponse loadViewResponse) {} + public record BeforeCreateViewEvent( + String prefix, String namespace, CreateViewRequest createViewRequest) {} + + public record AfterCreateViewEvent( + String prefix, String namespace, LoadViewResponse loadViewResponse) {} + public record BeforeListViewsEvent(String prefix, String namespace) {} + public record AfterListViewsEvent(String prefix, String namespace) {} + public record BeforeLoadViewEvent(String prefix, String namespace, String view) {} - public record AfterLoadViewEvent(String prefix, String namespace, LoadViewResponse loadViewResponse) {} + + public record AfterLoadViewEvent( + String prefix, String namespace, LoadViewResponse loadViewResponse) {} + public record BeforeCheckViewExistsEvent(String prefix, String namespace, String view) {} + public record AfterCheckViewExistsEvent(String prefix, String namespace, String view) {} + public record BeforeDropViewEvent(String prefix, String namespace, String view) {} + public record AfterDropViewEvent(String prefix, String namespace, String view) {} + public record BeforeRenameViewEvent(String prefix, RenameTableRequest renameTableRequest) {} + public record AfterRenameViewEvent(String prefix, RenameTableRequest renameTableRequest) {} - public record BeforeReplaceViewEvent(String prefix, String namespace, String sourceView, CommitViewRequest commitViewRequest) {} - public record AfterReplaceViewEvent(String prefix, String namespace, String sourceView, LoadViewResponse loadViewResponse) {} + + public record BeforeReplaceViewEvent( + String prefix, String namespace, String sourceView, CommitViewRequest commitViewRequest) {} + + public record AfterReplaceViewEvent( + String prefix, String namespace, String sourceView, LoadViewResponse loadViewResponse) {} // Credential Events public record BeforeLoadCredentialsEvent(String prefix, String namespace, String table) {} + public record AfterLoadCredentialsEvent(String prefix, String namespace, String table) {} // Notification Events - public record BeforeSendNotificationEvent(String prefix, String namespace, String table, NotificationRequest notificationRequest) {} - public record AfterSendNotificationEvent(String prefix, String namespace, String table, boolean result) {} + public record BeforeSendNotificationEvent( + String prefix, String namespace, String table, NotificationRequest notificationRequest) {} + + public record AfterSendNotificationEvent( + String prefix, String namespace, String table, boolean result) {} // Configuration Events public record BeforeGetConfigEvent(String warehouse) {} + public record AfterGetConfigEvent(ConfigResponse configResponse) {} } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java index c7d31eb4e9..14306712b0 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java @@ -70,16 +70,20 @@ public void onBeforeListNamespaces(IcebergRestCatalogEvents.BeforeListNamespaces public void onAfterListNamespaces(IcebergRestCatalogEvents.AfterListNamespacesEvent event) {} /** {@link IcebergRestCatalogEvents.BeforeLoadNamespaceMetadataEvent} */ - public void onBeforeLoadNamespaceMetadata(IcebergRestCatalogEvents.BeforeLoadNamespaceMetadataEvent event) {} + public void onBeforeLoadNamespaceMetadata( + IcebergRestCatalogEvents.BeforeLoadNamespaceMetadataEvent event) {} /** {@link IcebergRestCatalogEvents.AfterLoadNamespaceMetadataEvent} */ - public void onAfterLoadNamespaceMetadata(IcebergRestCatalogEvents.AfterLoadNamespaceMetadataEvent event) {} + public void onAfterLoadNamespaceMetadata( + IcebergRestCatalogEvents.AfterLoadNamespaceMetadataEvent event) {} /** {@link IcebergRestCatalogEvents.BeforeCheckNamespaceExistsEvent} */ - public void onBeforeCheckNamespaceExists(IcebergRestCatalogEvents.BeforeCheckNamespaceExistsEvent event) {} + public void onBeforeCheckNamespaceExists( + IcebergRestCatalogEvents.BeforeCheckNamespaceExistsEvent event) {} /** {@link IcebergRestCatalogEvents.AfterCheckNamespaceExistsEvent} */ - public void onAfterCheckNamespaceExists(IcebergRestCatalogEvents.AfterCheckNamespaceExistsEvent event) {} + public void onAfterCheckNamespaceExists( + IcebergRestCatalogEvents.AfterCheckNamespaceExistsEvent event) {} /** {@link IcebergRestCatalogEvents.BeforeDropNamespaceEvent} */ public void onBeforeDropNamespace(IcebergRestCatalogEvents.BeforeDropNamespaceEvent event) {} @@ -88,10 +92,12 @@ public void onBeforeDropNamespace(IcebergRestCatalogEvents.BeforeDropNamespaceEv public void onAfterDropNamespace(IcebergRestCatalogEvents.AfterDropNamespaceEvent event) {} /** {@link IcebergRestCatalogEvents.BeforeUpdateNamespacePropertiesEvent} */ - public void onBeforeUpdateNamespaceProperties(IcebergRestCatalogEvents.BeforeUpdateNamespacePropertiesEvent event) {} + public void onBeforeUpdateNamespaceProperties( + IcebergRestCatalogEvents.BeforeUpdateNamespacePropertiesEvent event) {} /** {@link IcebergRestCatalogEvents.AfterUpdateNamespacePropertiesEvent} */ - public void onAfterUpdateNamespaceProperties(IcebergRestCatalogEvents.AfterUpdateNamespacePropertiesEvent event) {} + public void onAfterUpdateNamespaceProperties( + IcebergRestCatalogEvents.AfterUpdateNamespacePropertiesEvent event) {} // Iceberg REST Catalog Table Events /** {@link IcebergRestCatalogEvents.BeforeCreateTableEvent} */ @@ -113,7 +119,8 @@ public void onBeforeLoadTable(IcebergRestCatalogEvents.BeforeLoadTableEvent even public void onAfterLoadTable(IcebergRestCatalogEvents.AfterLoadTableEvent event) {} /** {@link IcebergRestCatalogEvents.BeforeCheckTableExistsEvent} */ - public void onBeforeCheckTableExists(IcebergRestCatalogEvents.BeforeCheckTableExistsEvent event) {} + public void onBeforeCheckTableExists( + IcebergRestCatalogEvents.BeforeCheckTableExistsEvent event) {} /** {@link IcebergRestCatalogEvents.AfterCheckTableExistsEvent} */ public void onAfterCheckTableExists(IcebergRestCatalogEvents.AfterCheckTableExistsEvent event) {} @@ -194,7 +201,8 @@ public void onAfterLoadCredentials(IcebergRestCatalogEvents.AfterLoadCredentials // Iceberg REST Catalog Notification Events /** {@link IcebergRestCatalogEvents.BeforeSendNotificationEvent} */ - public void onBeforeSendNotification(IcebergRestCatalogEvents.BeforeSendNotificationEvent event) {} + public void onBeforeSendNotification( + IcebergRestCatalogEvents.BeforeSendNotificationEvent event) {} /** {@link IcebergRestCatalogEvents.AfterSendNotificationEvent} */ public void onAfterSendNotification(IcebergRestCatalogEvents.AfterSendNotificationEvent event) {} From 83f35ee3761e40cdd287d834a2380607f777e443 Mon Sep 17 00:00:00 2001 From: adnanhemani Date: Tue, 2 Sep 2025 22:30:33 -0700 Subject: [PATCH 3/8] spotlessapply --- .../events/AfterTableCommitedEvent.java | 34 ------------------ .../events/AfterTableRefreshedEvent.java | 28 --------------- .../events/AfterTaskAttemptedEvent.java | 34 ------------------ .../events/AfterViewCommitedEvent.java | 33 ----------------- .../events/AfterViewRefreshedEvent.java | 28 --------------- .../events/BeforeRequestRateLimitedEvent.java | 28 --------------- .../events/BeforeTableCommitedEvent.java | 35 ------------------- .../events/BeforeTableRefreshedEvent.java | 29 --------------- .../events/BeforeTaskAttemptedEvent.java | 32 ----------------- .../events/BeforeViewCommitedEvent.java | 34 ------------------ .../events/BeforeViewRefreshedEvent.java | 29 --------------- 11 files changed, 344 deletions(-) delete mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java delete mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java delete mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java delete mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java delete mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java delete mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java delete mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java delete mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java delete mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java delete mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java delete mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java deleted file mode 100644 index c952997df1..0000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.catalog.TableIdentifier; - -/** - * Emitted after Polaris performs a commit to a table. This is not emitted if there's an exception - * while committing. - * - * @param identifier The identifier. - * @param base The old metadata. - * @param metadata The new metadata. - */ -public record AfterTableCommitedEvent( - TableIdentifier identifier, TableMetadata base, TableMetadata metadata) - implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java deleted file mode 100644 index be38a8baaa..0000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.iceberg.catalog.TableIdentifier; - -/** - * Emitted after Polaris refreshes its known version of a table's metadata by fetching the latest. - * - * @param tableIdentifier The identifier of the table that was refreshed. - */ -public record AfterTableRefreshedEvent(TableIdentifier tableIdentifier) implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java deleted file mode 100644 index 638ba84fbc..0000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.polaris.core.context.CallContext; - -/** - * Emitted after an attempt of an async task, such as manifest file cleanup, finishes. - * - * @param taskEntityId The ID of the TaskEntity. - * @param callContext The CallContext the task is being executed under. - * @param attempt The attempt number. Each retry of the task will have its own attempt number. The - * initial (non-retried) attempt starts counting from 1. - * @param success Whether or not the attempt succeeded. - */ -public record AfterTaskAttemptedEvent( - long taskEntityId, CallContext callContext, int attempt, boolean success) - implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java deleted file mode 100644 index eb2ca24149..0000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.view.ViewMetadata; - -/** - * Emitted after Polaris performs a commit to a view. This is not emitted if there's an exception - * while committing. - * - * @param identifier The identifier. - * @param base The old metadata. - * @param metadata The new metadata. - */ -public record AfterViewCommitedEvent( - TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata) implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java deleted file mode 100644 index 249220ddd7..0000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.iceberg.catalog.TableIdentifier; - -/** - * Emitted after Polaris refreshes its known version of a view's metadata by fetching the latest. - * - * @param viewIdentifier The identifier of the view that was refreshed. - */ -public record AfterViewRefreshedEvent(TableIdentifier viewIdentifier) implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java deleted file mode 100644 index 1d9780ebe7..0000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -/** - * Emitted before the RateLimiterFilter rejects a request due to exceeding the rate limit. - * - * @param method The request's HTTP method - * @param absolutePath The request's absolute path - */ -public record BeforeRequestRateLimitedEvent(String method, String absolutePath) - implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java deleted file mode 100644 index 2bcc49ab67..0000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.catalog.TableIdentifier; - -/** - * Emitted when Polaris intends to perform a commit to a table. There is no guarantee on the order - * of this event relative to the validation checks we've performed, which means the commit may still - * fail Polaris-side validation checks. - * - * @param identifier The identifier. - * @param base The old metadata. - * @param metadata The new metadata. - */ -public record BeforeTableCommitedEvent( - TableIdentifier identifier, TableMetadata base, TableMetadata metadata) - implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java deleted file mode 100644 index f319298f57..0000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.iceberg.catalog.TableIdentifier; - -/** - * Emitted when Polaris intends to refresh its known version of a table's metadata by fetching the - * latest. - * - * @param tableIdentifier The identifier of the table being refreshed. - */ -public record BeforeTableRefreshedEvent(TableIdentifier tableIdentifier) implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java deleted file mode 100644 index a7fa7231e7..0000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.polaris.core.context.CallContext; - -/** - * Emitted before an attempt of an async task, such as manifest file cleanup, begins. - * - * @param taskEntityId The ID of the TaskEntity - * @param callContext The CallContext the task is being executed under. - * @param attempt The attempt number. Each retry of the task will have its own attempt number. The - * initial (non-retried) attempt starts counting from 1. - */ -public record BeforeTaskAttemptedEvent(long taskEntityId, CallContext callContext, int attempt) - implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java deleted file mode 100644 index 16e460d806..0000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.view.ViewMetadata; - -/** - * Emitted when Polaris intends to perform a commit to a view. There is no guarantee on the order of - * this event relative to the validation checks we've performed, which means the commit may still - * fail Polaris-side validation checks. - * - * @param identifier The identifier. - * @param base The old metadata. - * @param metadata The new metadata. - */ -public record BeforeViewCommitedEvent( - TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata) implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java deleted file mode 100644 index 6f58d2ca22..0000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.iceberg.catalog.TableIdentifier; - -/** - * Emitted when Polaris intends to refresh its known version of a view's metadata by fetching the - * latest. - * - * @param viewIdentifier The identifier of the view being refreshed. - */ -public record BeforeViewRefreshedEvent(TableIdentifier viewIdentifier) implements PolarisEvent {} From 5b19f6fa5cd4cd92506d085816d394152aab0409 Mon Sep 17 00:00:00 2001 From: adnanhemani Date: Tue, 2 Sep 2025 22:31:17 -0700 Subject: [PATCH 4/8] Revision based on review from @adutra --- .../catalog/iceberg/IcebergCatalog.java | 41 ++--- ...ebergRestCatalogEventServiceDelegator.java | 139 +++++++++------- .../service/events/AfterAttemptTaskEvent.java | 34 ++++ .../events/BeforeAttemptTaskEvent.java | 32 ++++ .../events/BeforeRequestRateLimitEvent.java | 28 ++++ .../events/IcebergRestCatalogEvents.java | 148 ++++++++++++------ .../service/events/PolarisEventListener.java | 44 +++--- .../events/TestPolarisEventListener.java | 22 +-- .../ratelimiter/RateLimiterFilter.java | 6 +- .../service/task/TaskExecutorImpl.java | 12 +- .../iceberg/AbstractIcebergCatalogTest.java | 26 +-- .../AbstractIcebergCatalogViewTest.java | 28 ++-- .../ratelimiter/RateLimiterFilterTest.java | 6 +- .../service/task/TaskExecutorImplTest.java | 8 +- 14 files changed, 382 insertions(+), 192 deletions(-) create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/AfterAttemptTaskEvent.java create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/BeforeAttemptTaskEvent.java create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitEvent.java diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index 5da1eeb123..70fd73236b 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -132,14 +132,7 @@ import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.io.FileIOUtil; import org.apache.polaris.service.catalog.validation.IcebergPropertiesValidation; -import org.apache.polaris.service.events.AfterTableCommitedEvent; -import org.apache.polaris.service.events.AfterTableRefreshedEvent; -import org.apache.polaris.service.events.AfterViewCommitedEvent; -import org.apache.polaris.service.events.AfterViewRefreshedEvent; -import org.apache.polaris.service.events.BeforeTableCommitedEvent; -import org.apache.polaris.service.events.BeforeTableRefreshedEvent; -import org.apache.polaris.service.events.BeforeViewCommitedEvent; -import org.apache.polaris.service.events.BeforeViewRefreshedEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.NotificationRequest; @@ -1384,7 +1377,8 @@ public void doRefresh() { if (latestLocation == null) { disableRefresh(); } else { - polarisEventListener.onBeforeTableRefreshed(new BeforeTableRefreshedEvent(tableIdentifier)); + polarisEventListener.onBeforeRefreshTable( + new IcebergRestCatalogEvents.BeforeRefreshTableEvent(catalogName, tableIdentifier)); refreshFromMetadataLocation( latestLocation, SHOULD_RETRY_REFRESH_PREDICATE, @@ -1404,13 +1398,15 @@ public void doRefresh() { Set.of(PolarisStorageActions.READ, PolarisStorageActions.LIST)); return TableMetadataParser.read(fileIO, metadataLocation); }); - polarisEventListener.onAfterTableRefreshed(new AfterTableRefreshedEvent(tableIdentifier)); + polarisEventListener.onAfterRefreshTable( + new IcebergRestCatalogEvents.AfterRefreshTableEvent(catalogName, tableIdentifier)); } } public void doCommit(TableMetadata base, TableMetadata metadata) { - polarisEventListener.onBeforeTableCommited( - new BeforeTableCommitedEvent(tableIdentifier, base, metadata)); + polarisEventListener.onBeforeCommitTable( + new IcebergRestCatalogEvents.BeforeCommitTableEvent( + catalogName, tableIdentifier, base, metadata)); LOGGER.debug( "doCommit for table {} with base {}, metadata {}", tableIdentifier, base, metadata); @@ -1551,8 +1547,9 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { updateTableLike(tableIdentifier, entity); } - polarisEventListener.onAfterTableCommited( - new AfterTableCommitedEvent(tableIdentifier, base, metadata)); + polarisEventListener.onAfterCommitTable( + new IcebergRestCatalogEvents.AfterCommitTableEvent( + catalogName, tableIdentifier, base, metadata)); } @Override @@ -1743,7 +1740,8 @@ public void doRefresh() { if (latestLocation == null) { disableRefresh(); } else { - polarisEventListener.onBeforeViewRefreshed(new BeforeViewRefreshedEvent(identifier)); + polarisEventListener.onBeforeRefreshView( + new IcebergRestCatalogEvents.BeforeRefreshViewEvent(catalogName, identifier)); refreshFromMetadataLocation( latestLocation, SHOULD_RETRY_REFRESH_PREDICATE, @@ -1765,13 +1763,15 @@ public void doRefresh() { return ViewMetadataParser.read(fileIO.newInputFile(metadataLocation)); }); - polarisEventListener.onAfterViewRefreshed(new AfterViewRefreshedEvent(identifier)); + polarisEventListener.onAfterRefreshView( + new IcebergRestCatalogEvents.AfterRefreshViewEvent(catalogName, identifier)); } } public void doCommit(ViewMetadata base, ViewMetadata metadata) { - polarisEventListener.onBeforeViewCommited( - new BeforeViewCommitedEvent(identifier, base, metadata)); + polarisEventListener.onBeforeCommitView( + new IcebergRestCatalogEvents.BeforeCommitViewEvent( + catalogName, identifier, base, metadata)); // TODO: Maybe avoid writing metadata if there's definitely a transaction conflict LOGGER.debug("doCommit for view {} with base {}, metadata {}", identifier, base, metadata); @@ -1867,8 +1867,9 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { updateTableLike(identifier, entity); } - polarisEventListener.onAfterViewCommited( - new AfterViewCommitedEvent(identifier, base, metadata)); + polarisEventListener.onAfterCommitView( + new IcebergRestCatalogEvents.AfterCommitViewEvent( + catalogName, identifier, base, metadata)); } protected String writeNewMetadataIfRequired(ViewMetadata metadata) { diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java index 95d1df7a55..884eb21596 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java @@ -39,6 +39,7 @@ import org.apache.iceberg.rest.responses.LoadViewResponse; import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.service.catalog.CatalogPrefixParser; import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService; import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCheckNamespaceExistsEvent; import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCheckTableExistsEvent; @@ -97,6 +98,7 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal @Inject @Delegate IcebergCatalogAdapter delegate; @Inject PolarisEventListener polarisEventListener; + @Inject CatalogPrefixParser prefixParser; @Override public Response createNamespace( @@ -104,15 +106,17 @@ public Response createNamespace( CreateNamespaceRequest createNamespaceRequest, RealmContext realmContext, SecurityContext securityContext) { + String catalogName = getCatalogFromPrefix(prefix, realmContext); polarisEventListener.onBeforeCreateNamespace( - new BeforeCreateNamespaceEvent(prefix, createNamespaceRequest)); + new BeforeCreateNamespaceEvent(catalogName, createNamespaceRequest)); Response resp = delegate.createNamespace(prefix, createNamespaceRequest, realmContext, securityContext); - CreateNamespaceResponse createNamespaceResponse = - resp.readEntity(CreateNamespaceResponse.class); + CreateNamespaceResponse createNamespaceResponse = (CreateNamespaceResponse) resp.getEntity(); polarisEventListener.onAfterCreateNamespace( new AfterCreateNamespaceEvent( - prefix, createNamespaceResponse.namespace(), createNamespaceResponse.properties())); + catalogName, + createNamespaceResponse.namespace(), + createNamespaceResponse.properties())); return resp; } @@ -124,44 +128,49 @@ public Response listNamespaces( String parent, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeListNamespaces(new BeforeListNamespacesEvent(prefix, parent)); + String catalogName = getCatalogFromPrefix(prefix, realmContext); + polarisEventListener.onBeforeListNamespaces(new BeforeListNamespacesEvent(catalogName, parent)); Response resp = delegate.listNamespaces(prefix, pageToken, pageSize, parent, realmContext, securityContext); - polarisEventListener.onAfterListNamespaces(new AfterListNamespacesEvent(prefix, parent)); + polarisEventListener.onAfterListNamespaces(new AfterListNamespacesEvent(catalogName, parent)); return resp; } @Override public Response loadNamespaceMetadata( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { + String catalogName = getCatalogFromPrefix(prefix, realmContext); polarisEventListener.onBeforeLoadNamespaceMetadata( - new BeforeLoadNamespaceMetadataEvent(prefix, namespace)); + new BeforeLoadNamespaceMetadataEvent(catalogName, namespace)); Response resp = delegate.loadNamespaceMetadata(prefix, namespace, realmContext, securityContext); - GetNamespaceResponse getNamespaceResponse = resp.readEntity(GetNamespaceResponse.class); + GetNamespaceResponse getNamespaceResponse = (GetNamespaceResponse) resp.getEntity(); polarisEventListener.onAfterLoadNamespaceMetadata( new AfterLoadNamespaceMetadataEvent( - prefix, getNamespaceResponse.namespace(), getNamespaceResponse.properties())); + catalogName, getNamespaceResponse.namespace(), getNamespaceResponse.properties())); return resp; } @Override public Response namespaceExists( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { + String catalogName = getCatalogFromPrefix(prefix, realmContext); polarisEventListener.onBeforeCheckNamespaceExists( - new BeforeCheckNamespaceExistsEvent(prefix, namespace)); + new BeforeCheckNamespaceExistsEvent(catalogName, namespace)); Response resp = delegate.namespaceExists(prefix, namespace, realmContext, securityContext); polarisEventListener.onAfterCheckNamespaceExists( - new AfterCheckNamespaceExistsEvent(prefix, namespace)); + new AfterCheckNamespaceExistsEvent(catalogName, namespace)); return resp; } @Override public Response dropNamespace( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeDropNamespace(new BeforeDropNamespaceEvent(prefix, namespace)); + String catalogName = getCatalogFromPrefix(prefix, realmContext); + polarisEventListener.onBeforeDropNamespace( + new BeforeDropNamespaceEvent(catalogName, namespace)); Response resp = delegate.dropNamespace(prefix, namespace, realmContext, securityContext); - polarisEventListener.onAfterDropNamespace(new AfterDropNamespaceEvent(prefix, namespace)); + polarisEventListener.onAfterDropNamespace(new AfterDropNamespaceEvent(catalogName, namespace)); return resp; } @@ -172,15 +181,16 @@ public Response updateProperties( UpdateNamespacePropertiesRequest updateNamespacePropertiesRequest, RealmContext realmContext, SecurityContext securityContext) { + String catalogName = getCatalogFromPrefix(prefix, realmContext); polarisEventListener.onBeforeUpdateNamespaceProperties( new BeforeUpdateNamespacePropertiesEvent( - prefix, namespace, updateNamespacePropertiesRequest)); + catalogName, namespace, updateNamespacePropertiesRequest)); Response resp = delegate.updateProperties( prefix, namespace, updateNamespacePropertiesRequest, realmContext, securityContext); polarisEventListener.onAfterUpdateNamespaceProperties( new AfterUpdateNamespacePropertiesEvent( - prefix, namespace, resp.readEntity(UpdateNamespacePropertiesResponse.class))); + catalogName, namespace, (UpdateNamespacePropertiesResponse) resp.getEntity())); return resp; } @@ -192,9 +202,11 @@ public Response createTable( String accessDelegationMode, RealmContext realmContext, SecurityContext securityContext) { + String catalogName = getCatalogFromPrefix(prefix, realmContext); if (!createTableRequest.stageCreate()) { polarisEventListener.onBeforeCreateTable( - new BeforeCreateTableEvent(prefix, namespace, createTableRequest, accessDelegationMode)); + new BeforeCreateTableEvent( + catalogName, namespace, createTableRequest, accessDelegationMode)); } Response resp = delegate.createTable( @@ -206,7 +218,7 @@ public Response createTable( securityContext); if (!createTableRequest.stageCreate()) { polarisEventListener.onAfterCreateTable( - new AfterCreateTableEvent(prefix, namespace, resp.readEntity(LoadTableResponse.class))); + new AfterCreateTableEvent(catalogName, namespace, (LoadTableResponse) resp.getEntity())); } return resp; } @@ -219,10 +231,11 @@ public Response listTables( Integer pageSize, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeListTables(new BeforeListTablesEvent(prefix, namespace)); + String catalogName = getCatalogFromPrefix(prefix, realmContext); + polarisEventListener.onBeforeListTables(new BeforeListTablesEvent(catalogName, namespace)); Response resp = delegate.listTables(prefix, namespace, pageToken, pageSize, realmContext, securityContext); - polarisEventListener.onAfterListTables(new AfterListTablesEvent(prefix, namespace)); + polarisEventListener.onAfterListTables(new AfterListTablesEvent(catalogName, namespace)); return resp; } @@ -236,9 +249,10 @@ public Response loadTable( String snapshots, RealmContext realmContext, SecurityContext securityContext) { + String catalogName = getCatalogFromPrefix(prefix, realmContext); polarisEventListener.onBeforeLoadTable( new BeforeLoadTableEvent( - prefix, namespace, table, accessDelegationMode, ifNoneMatchString, snapshots)); + catalogName, namespace, table, accessDelegationMode, ifNoneMatchString, snapshots)); Response resp = delegate.loadTable( prefix, @@ -250,7 +264,7 @@ public Response loadTable( realmContext, securityContext); polarisEventListener.onAfterLoadTable( - new AfterLoadTableEvent(prefix, namespace, resp.readEntity(LoadTableResponse.class))); + new AfterLoadTableEvent(catalogName, namespace, (LoadTableResponse) resp.getEntity())); return resp; } @@ -261,11 +275,12 @@ public Response tableExists( String table, RealmContext realmContext, SecurityContext securityContext) { + String catalogName = getCatalogFromPrefix(prefix, realmContext); polarisEventListener.onBeforeCheckTableExists( - new BeforeCheckTableExistsEvent(prefix, namespace, table)); + new BeforeCheckTableExistsEvent(catalogName, namespace, table)); Response resp = delegate.tableExists(prefix, namespace, table, realmContext, securityContext); polarisEventListener.onAfterCheckTableExists( - new AfterCheckTableExistsEvent(prefix, namespace, table)); + new AfterCheckTableExistsEvent(catalogName, namespace, table)); return resp; } @@ -277,12 +292,13 @@ public Response dropTable( Boolean purgeRequested, RealmContext realmContext, SecurityContext securityContext) { + String catalogName = getCatalogFromPrefix(prefix, realmContext); polarisEventListener.onBeforeDropTable( - new BeforeDropTableEvent(prefix, namespace, table, purgeRequested)); + new BeforeDropTableEvent(catalogName, namespace, table, purgeRequested)); Response resp = delegate.dropTable(prefix, namespace, table, purgeRequested, realmContext, securityContext); polarisEventListener.onAfterDropTable( - new AfterDropTableEvent(prefix, namespace, table, purgeRequested)); + new AfterDropTableEvent(catalogName, namespace, table, purgeRequested)); return resp; } @@ -293,13 +309,14 @@ public Response registerTable( RegisterTableRequest registerTableRequest, RealmContext realmContext, SecurityContext securityContext) { + String catalogName = getCatalogFromPrefix(prefix, realmContext); polarisEventListener.onBeforeRegisterTable( - new BeforeRegisterTableEvent(prefix, namespace, registerTableRequest)); + new BeforeRegisterTableEvent(catalogName, namespace, registerTableRequest)); Response resp = delegate.registerTable( prefix, namespace, registerTableRequest, realmContext, securityContext); polarisEventListener.onAfterRegisterTable( - new AfterRegisterTableEvent(prefix, namespace, resp.readEntity(LoadTableResponse.class))); + new AfterRegisterTableEvent(catalogName, namespace, (LoadTableResponse) resp.getEntity())); return resp; } @@ -309,10 +326,12 @@ public Response renameTable( RenameTableRequest renameTableRequest, RealmContext realmContext, SecurityContext securityContext) { + String catalogName = getCatalogFromPrefix(prefix, realmContext); polarisEventListener.onBeforeRenameTable( - new BeforeRenameTableEvent(prefix, renameTableRequest)); + new BeforeRenameTableEvent(catalogName, renameTableRequest)); Response resp = delegate.renameTable(prefix, renameTableRequest, realmContext, securityContext); - polarisEventListener.onAfterRenameTable(new AfterRenameTableEvent(prefix, renameTableRequest)); + polarisEventListener.onAfterRenameTable( + new AfterRenameTableEvent(catalogName, renameTableRequest)); return resp; } @@ -324,14 +343,15 @@ public Response updateTable( CommitTableRequest commitTableRequest, RealmContext realmContext, SecurityContext securityContext) { + String catalogName = getCatalogFromPrefix(prefix, realmContext); polarisEventListener.onBeforeUpdateTable( - new BeforeUpdateTableEvent(prefix, namespace, table, commitTableRequest)); + new BeforeUpdateTableEvent(catalogName, namespace, table, commitTableRequest)); Response resp = delegate.updateTable( prefix, namespace, table, commitTableRequest, realmContext, securityContext); polarisEventListener.onAfterUpdateTable( new AfterUpdateTableEvent( - prefix, namespace, table, resp.readEntity(LoadTableResponse.class))); + catalogName, namespace, table, (LoadTableResponse) resp.getEntity())); return resp; } @@ -342,12 +362,13 @@ public Response createView( CreateViewRequest createViewRequest, RealmContext realmContext, SecurityContext securityContext) { + String catalogName = getCatalogFromPrefix(prefix, realmContext); polarisEventListener.onBeforeCreateView( - new BeforeCreateViewEvent(prefix, namespace, createViewRequest)); + new BeforeCreateViewEvent(catalogName, namespace, createViewRequest)); Response resp = delegate.createView(prefix, namespace, createViewRequest, realmContext, securityContext); polarisEventListener.onAfterCreateView( - new AfterCreateViewEvent(prefix, namespace, resp.readEntity(LoadViewResponse.class))); + new AfterCreateViewEvent(catalogName, namespace, (LoadViewResponse) resp.getEntity())); return resp; } @@ -359,10 +380,11 @@ public Response listViews( Integer pageSize, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeListViews(new BeforeListViewsEvent(prefix, namespace)); + String catalogName = getCatalogFromPrefix(prefix, realmContext); + polarisEventListener.onBeforeListViews(new BeforeListViewsEvent(catalogName, namespace)); Response resp = delegate.listViews(prefix, namespace, pageToken, pageSize, realmContext, securityContext); - polarisEventListener.onAfterListViews(new AfterListViewsEvent(prefix, namespace)); + polarisEventListener.onAfterListViews(new AfterListViewsEvent(catalogName, namespace)); return resp; } @@ -373,12 +395,13 @@ public Response loadCredentials( String table, RealmContext realmContext, SecurityContext securityContext) { + String catalogName = getCatalogFromPrefix(prefix, realmContext); polarisEventListener.onBeforeLoadCredentials( - new BeforeLoadCredentialsEvent(prefix, namespace, table)); + new BeforeLoadCredentialsEvent(catalogName, namespace, table)); Response resp = delegate.loadCredentials(prefix, namespace, table, realmContext, securityContext); polarisEventListener.onAfterLoadCredentials( - new AfterLoadCredentialsEvent(prefix, namespace, table)); + new AfterLoadCredentialsEvent(catalogName, namespace, table)); return resp; } @@ -389,10 +412,11 @@ public Response loadView( String view, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeLoadView(new BeforeLoadViewEvent(prefix, namespace, view)); + String catalogName = getCatalogFromPrefix(prefix, realmContext); + polarisEventListener.onBeforeLoadView(new BeforeLoadViewEvent(catalogName, namespace, view)); Response resp = delegate.loadView(prefix, namespace, view, realmContext, securityContext); polarisEventListener.onAfterLoadView( - new AfterLoadViewEvent(prefix, namespace, resp.readEntity(LoadViewResponse.class))); + new AfterLoadViewEvent(catalogName, namespace, (LoadViewResponse) resp.getEntity())); return resp; } @@ -403,11 +427,12 @@ public Response viewExists( String view, RealmContext realmContext, SecurityContext securityContext) { + String catalogName = getCatalogFromPrefix(prefix, realmContext); polarisEventListener.onBeforeCheckViewExists( - new BeforeCheckViewExistsEvent(prefix, namespace, view)); + new BeforeCheckViewExistsEvent(catalogName, namespace, view)); Response resp = delegate.viewExists(prefix, namespace, view, realmContext, securityContext); polarisEventListener.onAfterCheckViewExists( - new AfterCheckViewExistsEvent(prefix, namespace, view)); + new AfterCheckViewExistsEvent(catalogName, namespace, view)); return resp; } @@ -418,9 +443,10 @@ public Response dropView( String view, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeDropView(new BeforeDropViewEvent(prefix, namespace, view)); + String catalogName = getCatalogFromPrefix(prefix, realmContext); + polarisEventListener.onBeforeDropView(new BeforeDropViewEvent(catalogName, namespace, view)); Response resp = delegate.dropView(prefix, namespace, view, realmContext, securityContext); - polarisEventListener.onAfterDropView(new AfterDropViewEvent(prefix, namespace, view)); + polarisEventListener.onAfterDropView(new AfterDropViewEvent(catalogName, namespace, view)); return resp; } @@ -430,9 +456,12 @@ public Response renameView( RenameTableRequest renameTableRequest, RealmContext realmContext, SecurityContext securityContext) { - polarisEventListener.onBeforeRenameView(new BeforeRenameViewEvent(prefix, renameTableRequest)); + String catalogName = getCatalogFromPrefix(prefix, realmContext); + polarisEventListener.onBeforeRenameView( + new BeforeRenameViewEvent(catalogName, renameTableRequest)); Response resp = delegate.renameView(prefix, renameTableRequest, realmContext, securityContext); - polarisEventListener.onAfterRenameView(new AfterRenameViewEvent(prefix, renameTableRequest)); + polarisEventListener.onAfterRenameView( + new AfterRenameViewEvent(catalogName, renameTableRequest)); return resp; } @@ -444,14 +473,15 @@ public Response replaceView( CommitViewRequest commitViewRequest, RealmContext realmContext, SecurityContext securityContext) { + String catalogName = getCatalogFromPrefix(prefix, realmContext); polarisEventListener.onBeforeReplaceView( - new BeforeReplaceViewEvent(prefix, namespace, view, commitViewRequest)); + new BeforeReplaceViewEvent(catalogName, namespace, view, commitViewRequest)); Response resp = delegate.replaceView( prefix, namespace, view, commitViewRequest, realmContext, securityContext); polarisEventListener.onAfterReplaceView( new AfterReplaceViewEvent( - prefix, namespace, view, resp.readEntity(LoadViewResponse.class))); + catalogName, namespace, view, (LoadViewResponse) resp.getEntity())); return resp; } @@ -477,10 +507,8 @@ public Response reportMetrics( ReportMetricsRequest reportMetricsRequest, RealmContext realmContext, SecurityContext securityContext) { - Response resp = - delegate.reportMetrics( - prefix, namespace, table, reportMetricsRequest, realmContext, securityContext); - return resp; + return delegate.reportMetrics( + prefix, namespace, table, reportMetricsRequest, realmContext, securityContext); } @Override @@ -491,13 +519,18 @@ public Response sendNotification( NotificationRequest notificationRequest, RealmContext realmContext, SecurityContext securityContext) { + String catalogName = getCatalogFromPrefix(prefix, realmContext); polarisEventListener.onBeforeSendNotification( - new BeforeSendNotificationEvent(prefix, namespace, table, notificationRequest)); + new BeforeSendNotificationEvent(catalogName, namespace, table, notificationRequest)); Response resp = delegate.sendNotification( prefix, namespace, table, notificationRequest, realmContext, securityContext); polarisEventListener.onAfterSendNotification( - new AfterSendNotificationEvent(prefix, namespace, table, resp.readEntity(Boolean.class))); + new AfterSendNotificationEvent(catalogName, namespace, table, (boolean) resp.getEntity())); return resp; } + + private String getCatalogFromPrefix(String prefix, RealmContext realmContext) { + return prefixParser.prefixToCatalogName(realmContext, prefix); + } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterAttemptTaskEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterAttemptTaskEvent.java new file mode 100644 index 0000000000..4d557a9f16 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterAttemptTaskEvent.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.polaris.core.context.CallContext; + +/** + * Emitted after an attempt of an async task, such as manifest file cleanup, finishes. + * + * @param taskEntityId The ID of the TaskEntity. + * @param callContext The CallContext the task is being executed under. + * @param attempt The attempt number. Each retry of the task will have its own attempt number. The + * initial (non-retried) attempt starts counting from 1. + * @param success Whether the attempt succeeded. + */ +public record AfterAttemptTaskEvent( + long taskEntityId, CallContext callContext, int attempt, boolean success) + implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeAttemptTaskEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeAttemptTaskEvent.java new file mode 100644 index 0000000000..80c40b973b --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeAttemptTaskEvent.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.polaris.core.context.CallContext; + +/** + * Emitted before an attempt of an async task, such as manifest file cleanup, begins. + * + * @param taskEntityId The ID of the TaskEntity + * @param callContext The CallContext the task is being executed under. + * @param attempt The attempt number. Each retry of the task will have its own attempt number. The + * initial (non-retried) attempt starts counting from 1. + */ +public record BeforeAttemptTaskEvent(long taskEntityId, CallContext callContext, int attempt) + implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitEvent.java new file mode 100644 index 0000000000..c7f244af38 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitEvent.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +/** + * Emitted before the RateLimiterFilter rejects a request due to exceeding the rate limit. + * + * @param method The request's HTTP method + * @param absolutePath The request's absolute path + */ +public record BeforeRequestRateLimitEvent(String method, String absolutePath) + implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java b/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java index cd775cf50d..0787be7c9d 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java @@ -20,7 +20,9 @@ package org.apache.polaris.service.events; import java.util.Map; +import org.apache.iceberg.TableMetadata; import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.requests.CreateViewRequest; @@ -31,6 +33,7 @@ import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.LoadViewResponse; import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; +import org.apache.iceberg.view.ViewMetadata; import org.apache.polaris.service.types.CommitTableRequest; import org.apache.polaris.service.types.CommitViewRequest; import org.apache.polaris.service.types.NotificationRequest; @@ -43,54 +46,54 @@ public class IcebergRestCatalogEvents { // Namespace Events public record BeforeCreateNamespaceEvent( - String prefix, CreateNamespaceRequest createNamespaceRequest) {} + String catalogName, CreateNamespaceRequest createNamespaceRequest) {} public record AfterCreateNamespaceEvent( - String prefix, Namespace namespace, Map namespaceProperties) {} + String catalogName, Namespace namespace, Map namespaceProperties) {} - public record BeforeListNamespacesEvent(String prefix, String parent) {} + public record BeforeListNamespacesEvent(String catalogName, String parent) {} - public record AfterListNamespacesEvent(String prefix, String parent) {} + public record AfterListNamespacesEvent(String catalogName, String parent) {} - public record BeforeLoadNamespaceMetadataEvent(String prefix, String namespace) {} + public record BeforeLoadNamespaceMetadataEvent(String catalogName, String namespace) {} public record AfterLoadNamespaceMetadataEvent( - String prefix, Namespace namespace, Map namespaceProperties) {} + String catalogName, Namespace namespace, Map namespaceProperties) {} - public record BeforeCheckNamespaceExistsEvent(String prefix, String namespace) {} + public record BeforeCheckNamespaceExistsEvent(String catalogName, String namespace) {} - public record AfterCheckNamespaceExistsEvent(String prefix, String namespace) {} + public record AfterCheckNamespaceExistsEvent(String catalogName, String namespace) {} - public record BeforeDropNamespaceEvent(String prefix, String namespace) {} + public record BeforeDropNamespaceEvent(String catalogName, String namespace) {} - public record AfterDropNamespaceEvent(String prefix, String namespace) {} + public record AfterDropNamespaceEvent(String catalogName, String namespace) {} public record BeforeUpdateNamespacePropertiesEvent( - String prefix, + String catalogName, String namespace, UpdateNamespacePropertiesRequest updateNamespacePropertiesRequest) {} public record AfterUpdateNamespacePropertiesEvent( - String prefix, + String catalogName, String namespace, UpdateNamespacePropertiesResponse updateNamespacePropertiesResponse) {} // Table Events public record BeforeCreateTableEvent( - String prefix, + String catalogName, String namespace, CreateTableRequest createTableRequest, String accessDelegationMode) {} public record AfterCreateTableEvent( - String prefix, String namespace, LoadTableResponse loadTableResponse) {} + String catalogName, String namespace, LoadTableResponse loadTableResponse) {} - public record BeforeListTablesEvent(String prefix, String namespace) {} + public record BeforeListTablesEvent(String catalogName, String namespace) {} - public record AfterListTablesEvent(String prefix, String namespace) {} + public record AfterListTablesEvent(String catalogName, String namespace) {} public record BeforeLoadTableEvent( - String prefix, + String catalogName, String namespace, String table, String accessDelegationMode, @@ -98,82 +101,135 @@ public record BeforeLoadTableEvent( String snapshots) {} public record AfterLoadTableEvent( - String prefix, String namespace, LoadTableResponse loadTableResponse) {} + String catalogName, String namespace, LoadTableResponse loadTableResponse) {} - public record BeforeCheckTableExistsEvent(String prefix, String namespace, String table) {} + public record BeforeCheckTableExistsEvent(String catalogName, String namespace, String table) {} - public record AfterCheckTableExistsEvent(String prefix, String namespace, String table) {} + public record AfterCheckTableExistsEvent(String catalogName, String namespace, String table) {} public record BeforeDropTableEvent( - String prefix, String namespace, String table, Boolean purgeRequested) {} + String catalogName, String namespace, String table, Boolean purgeRequested) {} public record AfterDropTableEvent( - String prefix, String namespace, String table, Boolean purgeRequested) {} + String catalogName, String namespace, String table, Boolean purgeRequested) {} public record BeforeRegisterTableEvent( - String prefix, String namespace, RegisterTableRequest registerTableRequest) {} + String catalogName, String namespace, RegisterTableRequest registerTableRequest) {} public record AfterRegisterTableEvent( - String prefix, String namespace, LoadTableResponse loadTableResponse) {} + String catalogName, String namespace, LoadTableResponse loadTableResponse) {} - public record BeforeRenameTableEvent(String prefix, RenameTableRequest renameTableRequest) {} + public record BeforeRenameTableEvent(String catalogName, RenameTableRequest renameTableRequest) {} - public record AfterRenameTableEvent(String prefix, RenameTableRequest renameTableRequest) {} + public record AfterRenameTableEvent(String catalogName, RenameTableRequest renameTableRequest) {} public record BeforeUpdateTableEvent( - String prefix, String namespace, String sourceTable, CommitTableRequest commitTableRequest) {} + String catalogName, + String namespace, + String sourceTable, + CommitTableRequest commitTableRequest) {} public record AfterUpdateTableEvent( - String prefix, String namespace, String sourceTable, LoadTableResponse loadTableResponse) {} + String catalogName, + String namespace, + String sourceTable, + LoadTableResponse loadTableResponse) {} // View Events public record BeforeCreateViewEvent( - String prefix, String namespace, CreateViewRequest createViewRequest) {} + String catalogName, String namespace, CreateViewRequest createViewRequest) {} public record AfterCreateViewEvent( - String prefix, String namespace, LoadViewResponse loadViewResponse) {} + String catalogName, String namespace, LoadViewResponse loadViewResponse) {} - public record BeforeListViewsEvent(String prefix, String namespace) {} + public record BeforeListViewsEvent(String catalogName, String namespace) {} - public record AfterListViewsEvent(String prefix, String namespace) {} + public record AfterListViewsEvent(String catalogName, String namespace) {} - public record BeforeLoadViewEvent(String prefix, String namespace, String view) {} + public record BeforeLoadViewEvent(String catalogName, String namespace, String view) {} public record AfterLoadViewEvent( - String prefix, String namespace, LoadViewResponse loadViewResponse) {} + String catalogName, String namespace, LoadViewResponse loadViewResponse) {} - public record BeforeCheckViewExistsEvent(String prefix, String namespace, String view) {} + public record BeforeCheckViewExistsEvent(String catalogName, String namespace, String view) {} - public record AfterCheckViewExistsEvent(String prefix, String namespace, String view) {} + public record AfterCheckViewExistsEvent(String catalogName, String namespace, String view) {} - public record BeforeDropViewEvent(String prefix, String namespace, String view) {} + public record BeforeDropViewEvent(String catalogName, String namespace, String view) {} - public record AfterDropViewEvent(String prefix, String namespace, String view) {} + public record AfterDropViewEvent(String catalogName, String namespace, String view) {} - public record BeforeRenameViewEvent(String prefix, RenameTableRequest renameTableRequest) {} + public record BeforeRenameViewEvent(String catalogName, RenameTableRequest renameTableRequest) {} - public record AfterRenameViewEvent(String prefix, RenameTableRequest renameTableRequest) {} + public record AfterRenameViewEvent(String catalogName, RenameTableRequest renameTableRequest) {} public record BeforeReplaceViewEvent( - String prefix, String namespace, String sourceView, CommitViewRequest commitViewRequest) {} + String catalogName, + String namespace, + String sourceView, + CommitViewRequest commitViewRequest) {} public record AfterReplaceViewEvent( - String prefix, String namespace, String sourceView, LoadViewResponse loadViewResponse) {} + String catalogName, String namespace, String sourceView, LoadViewResponse loadViewResponse) {} // Credential Events - public record BeforeLoadCredentialsEvent(String prefix, String namespace, String table) {} + public record BeforeLoadCredentialsEvent(String catalogName, String namespace, String table) {} - public record AfterLoadCredentialsEvent(String prefix, String namespace, String table) {} + public record AfterLoadCredentialsEvent(String catalogName, String namespace, String table) {} // Notification Events public record BeforeSendNotificationEvent( - String prefix, String namespace, String table, NotificationRequest notificationRequest) {} + String catalogName, + String namespace, + String table, + NotificationRequest notificationRequest) {} public record AfterSendNotificationEvent( - String prefix, String namespace, String table, boolean result) {} + String catalogName, String namespace, String table, boolean result) {} // Configuration Events public record BeforeGetConfigEvent(String warehouse) {} public record AfterGetConfigEvent(ConfigResponse configResponse) {} + + // Legacy events + public record BeforeCommitTableEvent( + String catalogName, + TableIdentifier identifier, + TableMetadata beforeMetadata, + TableMetadata afterMetadata) + implements PolarisEvent {} + + public record AfterCommitTableEvent( + String catalogName, + TableIdentifier identifier, + TableMetadata beforeMetadata, + TableMetadata afterMetadata) + implements PolarisEvent {} + + public record BeforeCommitViewEvent( + String catalogName, + TableIdentifier identifier, + ViewMetadata beforeMetadata, + ViewMetadata afterMetadata) + implements PolarisEvent {} + + public record AfterCommitViewEvent( + String catalogName, + TableIdentifier identifier, + ViewMetadata beforeMetadata, + ViewMetadata afterMetadata) + implements PolarisEvent {} + + public record BeforeRefreshTableEvent(String catalogName, TableIdentifier tableIdentifier) + implements PolarisEvent {} + + public record AfterRefreshTableEvent(String catalogName, TableIdentifier tableIdentifier) + implements PolarisEvent {} + + public record BeforeRefreshViewEvent(String catalogName, TableIdentifier viewIdentifier) + implements PolarisEvent {} + + public record AfterRefreshViewEvent(String catalogName, TableIdentifier viewIdentifier) + implements PolarisEvent {} } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java index 14306712b0..ce57c01ba1 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java @@ -23,38 +23,38 @@ * Event details are documented under the event objects themselves. */ public abstract class PolarisEventListener { - /** {@link BeforeRequestRateLimitedEvent} */ - public void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) {} + /** {@link BeforeRequestRateLimitEvent} */ + public void onBeforeRequestRateLimit(BeforeRequestRateLimitEvent event) {} - /** {@link BeforeTableCommitedEvent} */ - public void onBeforeTableCommited(BeforeTableCommitedEvent event) {} + /** {@link IcebergRestCatalogEvents.BeforeCommitTableEvent} */ + public void onBeforeCommitTable(IcebergRestCatalogEvents.BeforeCommitTableEvent event) {} - /** {@link AfterTableCommitedEvent} */ - public void onAfterTableCommited(AfterTableCommitedEvent event) {} + /** {@link IcebergRestCatalogEvents.AfterCommitTableEvent} */ + public void onAfterCommitTable(IcebergRestCatalogEvents.AfterCommitTableEvent event) {} - /** {@link BeforeViewCommitedEvent} */ - public void onBeforeViewCommited(BeforeViewCommitedEvent event) {} + /** {@link IcebergRestCatalogEvents.BeforeCommitViewEvent} */ + public void onBeforeCommitView(IcebergRestCatalogEvents.BeforeCommitViewEvent event) {} - /** {@link AfterViewCommitedEvent} */ - public void onAfterViewCommited(AfterViewCommitedEvent event) {} + /** {@link IcebergRestCatalogEvents.AfterCommitViewEvent} */ + public void onAfterCommitView(IcebergRestCatalogEvents.AfterCommitViewEvent event) {} - /** {@link BeforeTableRefreshedEvent} */ - public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {} + /** {@link IcebergRestCatalogEvents.BeforeRefreshTableEvent} */ + public void onBeforeRefreshTable(IcebergRestCatalogEvents.BeforeRefreshTableEvent event) {} - /** {@link AfterTableRefreshedEvent} */ - public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {} + /** {@link IcebergRestCatalogEvents.AfterRefreshTableEvent} */ + public void onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) {} - /** {@link BeforeViewRefreshedEvent} */ - public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {} + /** {@link IcebergRestCatalogEvents.BeforeRefreshViewEvent} */ + public void onBeforeRefreshView(IcebergRestCatalogEvents.BeforeRefreshViewEvent event) {} - /** {@link AfterViewRefreshedEvent} */ - public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {} + /** {@link IcebergRestCatalogEvents.AfterRefreshViewEvent} */ + public void onAfterRefreshView(IcebergRestCatalogEvents.AfterRefreshViewEvent event) {} - /** {@link BeforeTaskAttemptedEvent} */ - public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {} + /** {@link BeforeAttemptTaskEvent} */ + public void onBeforeAttemptTask(BeforeAttemptTaskEvent event) {} - /** {@link AfterTaskAttemptedEvent} */ - public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {} + /** {@link AfterAttemptTaskEvent} */ + public void onAfterAttemptTask(AfterAttemptTaskEvent event) {} // Iceberg REST Catalog Namespace Events /** {@link IcebergRestCatalogEvents.BeforeCreateNamespaceEvent} */ diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java index 2e2538e890..6b3da8562e 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java @@ -36,57 +36,57 @@ public T getLatest(Class type) { } @Override - public void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) { + public void onBeforeRequestRateLimit(BeforeRequestRateLimitEvent event) { history.add(event); } @Override - public void onBeforeTableCommited(BeforeTableCommitedEvent event) { + public void onBeforeCommitTable(IcebergRestCatalogEvents.BeforeCommitTableEvent event) { history.add(event); } @Override - public void onAfterTableCommited(AfterTableCommitedEvent event) { + public void onAfterCommitTable(IcebergRestCatalogEvents.AfterCommitTableEvent event) { history.add(event); } @Override - public void onBeforeViewCommited(BeforeViewCommitedEvent event) { + public void onBeforeCommitView(IcebergRestCatalogEvents.BeforeCommitViewEvent event) { history.add(event); } @Override - public void onAfterViewCommited(AfterViewCommitedEvent event) { + public void onAfterCommitView(IcebergRestCatalogEvents.AfterCommitViewEvent event) { history.add(event); } @Override - public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) { + public void onBeforeRefreshTable(IcebergRestCatalogEvents.BeforeRefreshTableEvent event) { history.add(event); } @Override - public void onAfterTableRefreshed(AfterTableRefreshedEvent event) { + public void onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) { history.add(event); } @Override - public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) { + public void onBeforeRefreshView(IcebergRestCatalogEvents.BeforeRefreshViewEvent event) { history.add(event); } @Override - public void onAfterViewRefreshed(AfterViewRefreshedEvent event) { + public void onAfterRefreshView(IcebergRestCatalogEvents.AfterRefreshViewEvent event) { history.add(event); } @Override - public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) { + public void onBeforeAttemptTask(BeforeAttemptTaskEvent event) { history.add(event); } @Override - public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) { + public void onAfterAttemptTask(AfterAttemptTaskEvent event) { history.add(event); } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java b/runtime/service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java index 5b08e89bc2..5dd6fb7b40 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java @@ -28,7 +28,7 @@ import jakarta.ws.rs.ext.Provider; import java.io.IOException; import org.apache.polaris.service.config.FilterPriorities; -import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitEvent; import org.apache.polaris.service.events.PolarisEventListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,8 +54,8 @@ public RateLimiterFilter(RateLimiter rateLimiter, PolarisEventListener polarisEv @Override public void filter(ContainerRequestContext ctx) throws IOException { if (!rateLimiter.canProceed()) { - polarisEventListener.onBeforeRequestRateLimited( - new BeforeRequestRateLimitedEvent( + polarisEventListener.onBeforeRequestRateLimit( + new BeforeRequestRateLimitEvent( ctx.getMethod(), ctx.getUriInfo().getAbsolutePath().toString())); ctx.abortWith(Response.status(Response.Status.TOO_MANY_REQUESTS).build()); LOGGER.atDebug().log("Rate limiting request"); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java index 6ee681ead7..dca58919a0 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java @@ -43,8 +43,8 @@ import org.apache.polaris.core.entity.TaskEntity; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; -import org.apache.polaris.service.events.AfterTaskAttemptedEvent; -import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterAttemptTaskEvent; +import org.apache.polaris.service.events.BeforeAttemptTaskEvent; import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.tracing.TracingFilter; import org.slf4j.Logger; @@ -143,8 +143,8 @@ public void addTaskHandlerContext(long taskEntityId, CallContext callContext) { } protected void handleTask(long taskEntityId, CallContext ctx, int attempt) { - polarisEventListener.onBeforeTaskAttempted( - new BeforeTaskAttemptedEvent(taskEntityId, ctx, attempt)); + polarisEventListener.onBeforeAttemptTask( + new BeforeAttemptTaskEvent(taskEntityId, ctx, attempt)); boolean success = false; try { @@ -187,8 +187,8 @@ protected void handleTask(long taskEntityId, CallContext ctx, int attempt) { .log("Unable to execute async task"); } } finally { - polarisEventListener.onAfterTaskAttempted( - new AfterTaskAttemptedEvent(taskEntityId, ctx, attempt, success)); + polarisEventListener.onAfterAttemptTask( + new AfterAttemptTaskEvent(taskEntityId, ctx, attempt, success)); } } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java index fca6a969e4..055593726c 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java @@ -137,10 +137,7 @@ import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory; import org.apache.polaris.service.config.ReservedProperties; -import org.apache.polaris.service.events.AfterTableCommitedEvent; -import org.apache.polaris.service.events.AfterTableRefreshedEvent; -import org.apache.polaris.service.events.BeforeTableCommitedEvent; -import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.events.TestPolarisEventListener; import org.apache.polaris.service.exception.FakeAzureHttpResponse; @@ -2224,21 +2221,26 @@ public void testEventsAreEmitted() { table.updateProperties().set(key, valOld).commit(); table.updateProperties().set(key, valNew).commit(); - var beforeRefreshEvent = testPolarisEventListener.getLatest(BeforeTableRefreshedEvent.class); + var beforeRefreshEvent = + testPolarisEventListener.getLatest(IcebergRestCatalogEvents.BeforeRefreshTableEvent.class); Assertions.assertThat(beforeRefreshEvent.tableIdentifier()).isEqualTo(TestData.TABLE); - var afterRefreshEvent = testPolarisEventListener.getLatest(AfterTableRefreshedEvent.class); + var afterRefreshEvent = + testPolarisEventListener.getLatest(IcebergRestCatalogEvents.AfterRefreshTableEvent.class); Assertions.assertThat(afterRefreshEvent.tableIdentifier()).isEqualTo(TestData.TABLE); - var beforeTableEvent = testPolarisEventListener.getLatest(BeforeTableCommitedEvent.class); + var beforeTableEvent = + testPolarisEventListener.getLatest(IcebergRestCatalogEvents.BeforeCommitTableEvent.class); Assertions.assertThat(beforeTableEvent.identifier()).isEqualTo(TestData.TABLE); - Assertions.assertThat(beforeTableEvent.base().properties().get(key)).isEqualTo(valOld); - Assertions.assertThat(beforeTableEvent.metadata().properties().get(key)).isEqualTo(valNew); + Assertions.assertThat(beforeTableEvent.beforeMetadata().properties().get(key)) + .isEqualTo(valOld); + Assertions.assertThat(beforeTableEvent.afterMetadata().properties().get(key)).isEqualTo(valNew); - var afterTableEvent = testPolarisEventListener.getLatest(AfterTableCommitedEvent.class); + var afterTableEvent = + testPolarisEventListener.getLatest(IcebergRestCatalogEvents.AfterCommitTableEvent.class); Assertions.assertThat(afterTableEvent.identifier()).isEqualTo(TestData.TABLE); - Assertions.assertThat(afterTableEvent.base().properties().get(key)).isEqualTo(valOld); - Assertions.assertThat(afterTableEvent.metadata().properties().get(key)).isEqualTo(valNew); + Assertions.assertThat(afterTableEvent.beforeMetadata().properties().get(key)).isEqualTo(valOld); + Assertions.assertThat(afterTableEvent.afterMetadata().properties().get(key)).isEqualTo(valNew); } private static PageToken nextRequest(Page previousPage) { diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java index 51c1d08f5a..55f33c03ea 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java @@ -62,10 +62,7 @@ import org.apache.polaris.service.catalog.io.DefaultFileIOFactory; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.config.ReservedProperties; -import org.apache.polaris.service.events.AfterViewCommitedEvent; -import org.apache.polaris.service.events.AfterViewRefreshedEvent; -import org.apache.polaris.service.events.BeforeViewCommitedEvent; -import org.apache.polaris.service.events.BeforeViewRefreshedEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.events.TestPolarisEventListener; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; @@ -269,20 +266,27 @@ public void testEventsAreEmitted() { view.updateProperties().set(key, valOld).commit(); view.updateProperties().set(key, valNew).commit(); - var beforeRefreshEvent = testPolarisEventListener.getLatest(BeforeViewRefreshedEvent.class); + var beforeRefreshEvent = + testPolarisEventListener.getLatest(IcebergRestCatalogEvents.BeforeRefreshViewEvent.class); Assertions.assertThat(beforeRefreshEvent.viewIdentifier()).isEqualTo(TestData.TABLE); - var afterRefreshEvent = testPolarisEventListener.getLatest(AfterViewRefreshedEvent.class); + var afterRefreshEvent = + testPolarisEventListener.getLatest(IcebergRestCatalogEvents.AfterRefreshViewEvent.class); Assertions.assertThat(afterRefreshEvent.viewIdentifier()).isEqualTo(TestData.TABLE); - var beforeCommitEvent = testPolarisEventListener.getLatest(BeforeViewCommitedEvent.class); + var beforeCommitEvent = + testPolarisEventListener.getLatest(IcebergRestCatalogEvents.BeforeCommitViewEvent.class); Assertions.assertThat(beforeCommitEvent.identifier()).isEqualTo(TestData.TABLE); - Assertions.assertThat(beforeCommitEvent.base().properties().get(key)).isEqualTo(valOld); - Assertions.assertThat(beforeCommitEvent.metadata().properties().get(key)).isEqualTo(valNew); + Assertions.assertThat(beforeCommitEvent.beforeMetadata().properties().get(key)) + .isEqualTo(valOld); + Assertions.assertThat(beforeCommitEvent.afterMetadata().properties().get(key)) + .isEqualTo(valNew); - var afterCommitEvent = testPolarisEventListener.getLatest(AfterViewCommitedEvent.class); + var afterCommitEvent = + testPolarisEventListener.getLatest(IcebergRestCatalogEvents.AfterCommitViewEvent.class); Assertions.assertThat(afterCommitEvent.identifier()).isEqualTo(TestData.TABLE); - Assertions.assertThat(afterCommitEvent.base().properties().get(key)).isEqualTo(valOld); - Assertions.assertThat(afterCommitEvent.metadata().properties().get(key)).isEqualTo(valNew); + Assertions.assertThat(afterCommitEvent.beforeMetadata().properties().get(key)) + .isEqualTo(valOld); + Assertions.assertThat(afterCommitEvent.afterMetadata().properties().get(key)).isEqualTo(valNew); } } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java b/runtime/service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java index ff66a47255..f450d2b083 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java @@ -32,7 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Consumer; -import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitEvent; import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.events.TestPolarisEventListener; import org.apache.polaris.service.ratelimiter.RateLimiterFilterTest.Profile; @@ -148,9 +148,9 @@ public void testMetricsAreEmittedWhenRateLimiting() { } requestAsserter.accept(Status.TOO_MANY_REQUESTS); - BeforeRequestRateLimitedEvent event = + BeforeRequestRateLimitEvent event = ((TestPolarisEventListener) polarisEventListener) - .getLatest(BeforeRequestRateLimitedEvent.class); + .getLatest(BeforeRequestRateLimitEvent.class); assertThat(event.method()).isEqualTo("GET"); // Examples of expected metrics: diff --git a/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java b/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java index 03f9c88a3d..8eac95d0dc 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java @@ -25,8 +25,8 @@ import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.service.TestServices; -import org.apache.polaris.service.events.AfterTaskAttemptedEvent; -import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterAttemptTaskEvent; +import org.apache.polaris.service.events.BeforeAttemptTaskEvent; import org.apache.polaris.service.events.TestPolarisEventListener; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -80,7 +80,7 @@ public boolean canHandleTask(TaskEntity task) { @Override public boolean handleTask(TaskEntity task, CallContext callContext) { var beforeTaskAttemptedEvent = - testPolarisEventListener.getLatest(BeforeTaskAttemptedEvent.class); + testPolarisEventListener.getLatest(BeforeAttemptTaskEvent.class); Assertions.assertEquals(taskEntity.getId(), beforeTaskAttemptedEvent.taskEntityId()); Assertions.assertEquals(callContext, beforeTaskAttemptedEvent.callContext()); Assertions.assertEquals(attempt, beforeTaskAttemptedEvent.attempt()); @@ -90,7 +90,7 @@ public boolean handleTask(TaskEntity task, CallContext callContext) { executor.handleTask(taskEntity.getId(), polarisCallCtx, attempt); - var afterAttemptTaskEvent = testPolarisEventListener.getLatest(AfterTaskAttemptedEvent.class); + var afterAttemptTaskEvent = testPolarisEventListener.getLatest(AfterAttemptTaskEvent.class); Assertions.assertEquals(taskEntity.getId(), afterAttemptTaskEvent.taskEntityId()); Assertions.assertEquals(polarisCallCtx, afterAttemptTaskEvent.callContext()); Assertions.assertEquals(attempt, afterAttemptTaskEvent.attempt()); From ce9a705dd77b2fc57c3adf9568977931aff4c97f Mon Sep 17 00:00:00 2001 From: adnanhemani Date: Fri, 5 Sep 2025 14:32:50 -0700 Subject: [PATCH 5/8] spotlessapply --- .../polaris/service/events/listeners/PolarisEventListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java index c36d6720da..b38e9de9bb 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java @@ -21,8 +21,8 @@ import org.apache.polaris.service.events.AfterAttemptTaskEvent; import org.apache.polaris.service.events.BeforeAttemptTaskEvent; import org.apache.polaris.service.events.BeforeLimitRequestRateEvent; -import org.apache.polaris.service.events.IcebergRestCatalogEvents; import org.apache.polaris.service.events.CatalogGenericTableServiceEvents; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; /** * Represents an event listener that can respond to notable moments during Polaris's execution. From 1f9249bc9e6989b1a8eea19af98d543d90ded855 Mon Sep 17 00:00:00 2001 From: adnanhemani Date: Mon, 8 Sep 2025 15:19:33 -0700 Subject: [PATCH 6/8] fix compile error --- ...ebergRestCatalogEventServiceDelegator.java | 41 +++++++++---------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java index 9f66f712e1..2629ded7b6 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java @@ -96,12 +96,11 @@ @Decorator @Priority(1000) -public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatalogApiService { +public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatalogApiService, CatalogAdapter { @Inject @Delegate IcebergCatalogAdapter delegate; @Inject PolarisEventListener polarisEventListener; @Inject CatalogPrefixParser prefixParser; - @Inject CatalogAdapter catalogAdapter; @Override public Response createNamespace( @@ -145,7 +144,7 @@ public Response loadNamespaceMetadata( String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); polarisEventListener.onBeforeLoadNamespaceMetadata( new BeforeLoadNamespaceMetadataEvent( - catalogName, catalogAdapter.decodeNamespace(namespace))); + catalogName, decodeNamespace(namespace))); Response resp = delegate.loadNamespaceMetadata(prefix, namespace, realmContext, securityContext); GetNamespaceResponse getNamespaceResponse = (GetNamespaceResponse) resp.getEntity(); @@ -159,7 +158,7 @@ public Response loadNamespaceMetadata( public Response namespaceExists( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); polarisEventListener.onBeforeCheckExistsNamespace( new BeforeCheckExistsNamespaceEvent(catalogName, namespaceObj)); Response resp = delegate.namespaceExists(prefix, namespace, realmContext, securityContext); @@ -173,7 +172,7 @@ public Response dropNamespace( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); polarisEventListener.onBeforeDropNamespace( - new BeforeDropNamespaceEvent(catalogName, catalogAdapter.decodeNamespace(namespace))); + new BeforeDropNamespaceEvent(catalogName, decodeNamespace(namespace))); Response resp = delegate.dropNamespace(prefix, namespace, realmContext, securityContext); polarisEventListener.onAfterDropNamespace(new AfterDropNamespaceEvent(catalogName, namespace)); return resp; @@ -187,7 +186,7 @@ public Response updateProperties( RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); polarisEventListener.onBeforeUpdateNamespaceProperties( new BeforeUpdateNamespacePropertiesEvent( catalogName, namespaceObj, updateNamespacePropertiesRequest)); @@ -209,7 +208,7 @@ public Response createTable( RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); if (!createTableRequest.stageCreate()) { polarisEventListener.onBeforeCreateTable( new BeforeCreateTableEvent( @@ -243,7 +242,7 @@ public Response listTables( RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); polarisEventListener.onBeforeListTables(new BeforeListTablesEvent(catalogName, namespaceObj)); Response resp = delegate.listTables(prefix, namespace, pageToken, pageSize, realmContext, securityContext); @@ -262,7 +261,7 @@ public Response loadTable( RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); polarisEventListener.onBeforeLoadTable( new BeforeLoadTableEvent( catalogName, namespaceObj, table, accessDelegationMode, ifNoneMatchString, snapshots)); @@ -289,7 +288,7 @@ public Response tableExists( RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); polarisEventListener.onBeforeCheckExistsTable( new BeforeCheckExistsTableEvent(catalogName, namespaceObj, table)); Response resp = delegate.tableExists(prefix, namespace, table, realmContext, securityContext); @@ -307,7 +306,7 @@ public Response dropTable( RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); polarisEventListener.onBeforeDropTable( new BeforeDropTableEvent(catalogName, namespaceObj, table, purgeRequested)); Response resp = @@ -325,7 +324,7 @@ public Response registerTable( RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); polarisEventListener.onBeforeRegisterTable( new BeforeRegisterTableEvent(catalogName, namespaceObj, registerTableRequest)); Response resp = @@ -361,7 +360,7 @@ public Response updateTable( RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); polarisEventListener.onBeforeUpdateTable( new BeforeUpdateTableEvent(catalogName, namespaceObj, table, commitTableRequest)); Response resp = @@ -381,7 +380,7 @@ public Response createView( RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); polarisEventListener.onBeforeCreateView( new BeforeCreateViewEvent(catalogName, namespaceObj, createViewRequest)); Response resp = @@ -400,7 +399,7 @@ public Response listViews( RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); polarisEventListener.onBeforeListViews(new BeforeListViewsEvent(catalogName, namespaceObj)); Response resp = delegate.listViews(prefix, namespace, pageToken, pageSize, realmContext, securityContext); @@ -416,7 +415,7 @@ public Response loadCredentials( RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); polarisEventListener.onBeforeLoadCredentials( new BeforeLoadCredentialsEvent(catalogName, namespaceObj, table)); Response resp = @@ -434,7 +433,7 @@ public Response loadView( RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); polarisEventListener.onBeforeLoadView(new BeforeLoadViewEvent(catalogName, namespaceObj, view)); Response resp = delegate.loadView(prefix, namespace, view, realmContext, securityContext); polarisEventListener.onAfterLoadView( @@ -450,7 +449,7 @@ public Response viewExists( RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); polarisEventListener.onBeforeCheckExistsView( new BeforeCheckExistsViewEvent(catalogName, namespaceObj, view)); Response resp = delegate.viewExists(prefix, namespace, view, realmContext, securityContext); @@ -467,7 +466,7 @@ public Response dropView( RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); polarisEventListener.onBeforeDropView(new BeforeDropViewEvent(catalogName, namespaceObj, view)); Response resp = delegate.dropView(prefix, namespace, view, realmContext, securityContext); polarisEventListener.onAfterDropView(new AfterDropViewEvent(catalogName, namespaceObj, view)); @@ -498,7 +497,7 @@ public Response replaceView( RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); polarisEventListener.onBeforeReplaceView( new BeforeReplaceViewEvent(catalogName, namespaceObj, view, commitViewRequest)); Response resp = @@ -545,7 +544,7 @@ public Response sendNotification( RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = catalogAdapter.decodeNamespace(namespace); + Namespace namespaceObj = decodeNamespace(namespace); polarisEventListener.onBeforeSendNotification( new BeforeSendNotificationEvent(catalogName, namespaceObj, table, notificationRequest)); Response resp = From f3ef8df597cbf12635942af1f79f10726d5aafa2 Mon Sep 17 00:00:00 2001 From: adnanhemani Date: Mon, 8 Sep 2025 16:57:39 -0700 Subject: [PATCH 7/8] review comments from @singhpk234 --- ...ebergRestCatalogEventServiceDelegator.java | 30 +++++++++++-------- .../events/IcebergRestCatalogEvents.java | 9 ++++++ .../listeners/PolarisEventListener.java | 9 ++++++ 3 files changed, 35 insertions(+), 13 deletions(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java index 2629ded7b6..edf15d484d 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java @@ -43,6 +43,7 @@ import org.apache.polaris.service.catalog.CatalogPrefixParser; import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService; import org.apache.polaris.service.catalog.common.CatalogAdapter; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCheckExistsNamespaceEvent; import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCheckExistsTableEvent; import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCheckExistsViewEvent; @@ -96,7 +97,8 @@ @Decorator @Priority(1000) -public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatalogApiService, CatalogAdapter { +public class IcebergRestCatalogEventServiceDelegator + implements IcebergRestCatalogApiService, CatalogAdapter { @Inject @Delegate IcebergCatalogAdapter delegate; @Inject PolarisEventListener polarisEventListener; @@ -143,8 +145,7 @@ public Response loadNamespaceMetadata( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); polarisEventListener.onBeforeLoadNamespaceMetadata( - new BeforeLoadNamespaceMetadataEvent( - catalogName, decodeNamespace(namespace))); + new BeforeLoadNamespaceMetadataEvent(catalogName, decodeNamespace(namespace))); Response resp = delegate.loadNamespaceMetadata(prefix, namespace, realmContext, securityContext); GetNamespaceResponse getNamespaceResponse = (GetNamespaceResponse) resp.getEntity(); @@ -209,11 +210,9 @@ public Response createTable( SecurityContext securityContext) { String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); Namespace namespaceObj = decodeNamespace(namespace); - if (!createTableRequest.stageCreate()) { - polarisEventListener.onBeforeCreateTable( - new BeforeCreateTableEvent( - catalogName, namespaceObj, createTableRequest, accessDelegationMode)); - } + polarisEventListener.onBeforeCreateTable( + new BeforeCreateTableEvent( + catalogName, namespaceObj, createTableRequest, accessDelegationMode)); Response resp = delegate.createTable( prefix, @@ -509,17 +508,22 @@ public Response replaceView( return resp; } - /** - * Table Committed Events are already instrumented at a more granular level than the API itself. - */ @Override public Response commitTransaction( String prefix, CommitTransactionRequest commitTransactionRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.commitTransaction( - prefix, commitTransactionRequest, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + polarisEventListener.onBeforeCommitTransaction( + new IcebergRestCatalogEvents.BeforeCommitTransactionEvent( + catalogName, commitTransactionRequest)); + Response resp = + delegate.commitTransaction(prefix, commitTransactionRequest, realmContext, securityContext); + polarisEventListener.onAfterCommitTransaction( + new IcebergRestCatalogEvents.AfterCommitTransactionEvent( + catalogName, commitTransactionRequest)); + return resp; } /** This API is currently a no-op in Polaris. */ diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java b/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java index 4472d135e4..76d3cf2989 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java @@ -23,6 +23,7 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.requests.CreateViewRequest; @@ -184,6 +185,14 @@ public record BeforeLoadCredentialsEvent(String catalogName, Namespace namespace public record AfterLoadCredentialsEvent(String catalogName, Namespace namespace, String table) {} + // Transaction Events + public record BeforeCommitTransactionEvent( + String catalogName, CommitTransactionRequest commitTransactionRequest) {} + + // TODO: Add all PolarisEntities that were modified with this transaction. + public record AfterCommitTransactionEvent( + String catalogName, CommitTransactionRequest commitTransactionRequest) {} + // Notification Events public record BeforeSendNotificationEvent( String catalogName, diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java index d753b51afd..20dc304d0e 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java @@ -206,6 +206,15 @@ public void onBeforeLoadCredentials(IcebergRestCatalogEvents.BeforeLoadCredentia /** {@link IcebergRestCatalogEvents.AfterLoadCredentialsEvent} */ public void onAfterLoadCredentials(IcebergRestCatalogEvents.AfterLoadCredentialsEvent event) {} + // Iceberg REST Catalog Transactions Events + /** {@link IcebergRestCatalogEvents.BeforeCommitTransactionEvent} */ + public void onBeforeCommitTransaction( + IcebergRestCatalogEvents.BeforeCommitTransactionEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterCommitTransactionEvent} */ + public void onAfterCommitTransaction( + IcebergRestCatalogEvents.AfterCommitTransactionEvent event) {} + // Iceberg REST Catalog Notification Events /** {@link IcebergRestCatalogEvents.BeforeSendNotificationEvent} */ public void onBeforeSendNotification( From 20295a1f6168e568da418badfadf39c075b138f6 Mon Sep 17 00:00:00 2001 From: adnanhemani Date: Wed, 17 Sep 2025 13:56:15 -0700 Subject: [PATCH 8/8] merge from main --- .../PolarisPersistenceEventListener.java | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java index 4c205aa665..e9d43f0032 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java @@ -90,22 +90,6 @@ public void onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent ev processEvent(polarisEvent); } - @Override - public void onAfterCatalogCreated(AfterCatalogCreatedEvent event) { - ContextSpecificInformation contextSpecificInformation = getContextSpecificInformation(); - PolarisEvent polarisEvent = - new PolarisEvent( - event.catalogName(), - org.apache.polaris.service.events.PolarisEvent.createEventId(), - getRequestId(), - event.getClass().getSimpleName(), - contextSpecificInformation.timestamp(), - contextSpecificInformation.principalName(), - PolarisEvent.ResourceType.CATALOG, - event.catalogName()); - processEvent(polarisEvent); - } - protected record ContextSpecificInformation(long timestamp, @Nullable String principalName) {} abstract ContextSpecificInformation getContextSpecificInformation();