-
Notifications
You must be signed in to change notification settings - Fork 330
Introducing AfterRefreshTableEventListener and Mixins #2836
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
adutra
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @vchag the mixin approach looks interesting. There are however a few items that need fixing before we can merge.
| public class ObjectMapperFactory { | ||
| private ObjectMapperFactory() {} | ||
|
|
||
| public static ObjectMapper create() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should not create an ObjectMapper like this, but rather use CDI.
- If possible, let's use the main ObjectMapper provided by Quarkus. You can customize it by adding your customizations in the
org.apache.polaris.service.config.PolarisIcebergObjectMapperCustomizerbean. - If it's not possible to reuse the main ObjectMapper (for example, because of incompatible configuration), then you should produce a new ObjectMapper bean.
I realize that the old version of AwsCloudWatchEventListener was also doing it wrong, so let's seize the opportunity to fix this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with the CDI based approach. I found option #2 to be better feasible.
Let's seize the opportunity!
| .registerModule(new Jdk8Module()) // If you never serialize Optional, you can remove the | ||
| // .registerModule(new Jdk8Module()) line. | ||
| .registerModule(new JavaTimeModule()) | ||
| .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain why we need to disable these features? I would prefer to customize the mapper as little as possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was an oversight on my part.... will remove them.
| * Mixins for Iceberg classes we don't control, to keep JSON concise. The @JsonValue marks | ||
| * toString() as the value to serialize. | ||
| */ | ||
| public class IcebergThirdPartyMixins { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename to IcebergMixins and make it final.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will rename.
| public class IcebergThirdPartyMixins { | ||
| private IcebergThirdPartyMixins() {} | ||
|
|
||
| public abstract static class NamespaceMixin { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this is the best thing to do when serializing namespaces and table identifiers. Granted, it's very convenient, but it would not correctly handle dots in namespace segments.
For example the following namespaces would produce the same string:
Namespace.of("one.two", "three.four", "ns");
Namespace.of("one", "two", "three", "four", "ns");I think it's wiser to serialize these two classes as objects, not scalars.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch!
How about we aligned the serialization behavior with how the Iceberg REST API represents namespaces and table identifiers?
Namespace.of("one", "two", "three", "four", "ns");
would result in:
{"namespace": ["one", "two", "three", "four", "ns"]}
TableIdentifier.of(Namespace.of("one", "two", "three", "four", "ns"), "table")
would result in:
{"namespace": ["one", "two", "three", "four", "ns"], "name": "table"}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed I think that's preferable. It's slightly verbose for simple cases, but it's the only representation that is compatible with all the corner cases.
| import java.io.IOException; | ||
| import org.apache.iceberg.catalog.TableIdentifier; | ||
|
|
||
| public class TableIdentifierToStringSerializer extends JsonSerializer<TableIdentifier> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this serializer? We already have IcebergThirdPartyMixins.TableIdentifierMixin.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is redundant. I'll go ahead and remove it.
| String realmId, | ||
| Collection<String> activatedRoles, | ||
| String eventType, | ||
| @JsonUnwrapped PolarisEvent event // flatten |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice 👍
| */ | ||
| public abstract class PropertyMapEventListener implements PolarisEventListener { | ||
| protected abstract void transformAndSendEvent(HashMap<String, Object> properties); | ||
| public abstract class AfterRefreshTableEventListener implements PolarisEventListener { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sorry but just renaming this class doesn't make it any more usable.
In its current form, this class does not qualify as a general-purpose component worthy of being distributed with Polaris OSS.
In my opinion this class must be removed.
BTW, AwsCloudWatchEventListener is also extremely problematic, as it only handles after-table-refresh events, and nothing else. In theory, AwsCloudWatchEventListener should either equally handle all 150+ event types, or make the event types to handle configurable.
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.polaris.service.events.jsonEventListener; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also need to modify this package name: jsonEventListener does not comply with java naming conventions:
https://docs.oracle.com/javase/tutorial/java/package/namingpkgs.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about we re-name it listener?
| import org.apache.polaris.service.events.json.serde.TableIdentifierToStringSerializer; | ||
|
|
||
| @JsonTypeName("AfterRefreshTableEvent") | ||
| public abstract class AfterRefreshTableEventMixin { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you need this mixin, we already have specified @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) in PolarisEventBaseMixin. That should be enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, Yes. We don't need this mixin.
| properties.put( | ||
| "activated_roles", ((PolarisPrincipal) securityContext.getUserPrincipal()).getRoles()); | ||
| // TODO: Add request ID when it is available | ||
| protected void transformAndSendEvent(IcebergRestCatalogEvents.AfterRefreshTableEvent event) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As said above, this class must handle all event types, or determine the event types to handle via configuration.
For example, we could introduce the following configuration option:
polaris.event-listener.aws-cloudwatch.event-types=\
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterRefreshTableEvent,\
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCommitTableEventI would be in favor of doing this change in this PR since it this class has the same shortcomings as PropertyMapEventListener.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh right. Lets introduce a new config for aws-cloudwatch.event-types.
| public class IcebergThirdPartyMixins { | ||
| private IcebergThirdPartyMixins() {} | ||
|
|
||
| public abstract static class NamespaceMixin { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed I think that's preferable. It's slightly verbose for simple cases, but it's the only representation that is compatible with all the corner cases.
| @JsonValue | ||
| public abstract String toString(); // serializes "namespace" as "db.sales" | ||
| @JsonIgnore | ||
| public abstract String toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need to declare this method?
| @JsonValue | ||
| public abstract String toString(); // serializes "table_identifier" as "db.sales.orders" | ||
| @JsonIgnore | ||
| public abstract String toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
|
||
| /** | ||
| * Base class for event listeners that with to generically forward all {@link PolarisEvent | ||
| * PolarisEvents} to an external sinks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| * PolarisEvents} to an external sinks | |
| * PolarisEvents} to an external sink. |
| import org.slf4j.LoggerFactory; | ||
|
|
||
| /** This mapper is isolated and used exclusively for CloudWatch event serializations */ | ||
| public class PolarisAWSCloudWatchObjectMapperProducer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need a separate mapper? This one seems fairly identical to the default one, except of course for the mixins. But you could add the mixins to the default mapper instead.
| @WithName("event-types") | ||
| @WithDefault("org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterRefreshTableEvent") | ||
| @Override | ||
| Set<String> eventTypes(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could be Set<Class<? extends PolarisEvent>>.
| * @return a set of event types | ||
| */ | ||
| @WithName("event-types") | ||
| @WithDefault("org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterRefreshTableEvent") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's not a good default. I'd say the default should be an empty set, and we should treat an empty set as "process all events".
WDYT?
Also, please add the corresponding property to application.properties, that helps users:
polaris/runtime/defaults/src/main/resources/application.properties
Lines 142 to 147 in 0acbb64
| # AWS CloudWatch event listener settings | |
| # polaris.event-listener.type=aws-cloudwatch | |
| # polaris.event-listener.aws-cloudwatch.log-group=polaris-cloudwatch-default-group | |
| # polaris.event-listener.aws-cloudwatch.log-stream=polaris-cloudwatch-default-stream | |
| # polaris.event-listener.aws-cloudwatch.region=us-east-1 | |
| # polaris.event-listener.aws-cloudwatch.synchronous-mode=false |
|
|
||
| @Inject | ||
| public PolarisAWSCloudWatchObjectMapperProducer( | ||
| @ConfigProperty(name = "polaris.aws.cloudwatch.max-body-size", defaultValue = "16M") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is this config property coming from? It doesn't start with the prefix used for other CloudWatch properties, which is polaris.event-listener.aws-cloudwatch..
| | `polaris.tasks.max-concurrent-tasks` | `100` | Define the max number of concurrent tasks. | | ||
| | `polaris.tasks.max-queued-tasks` | `1000` | Define the max number of tasks in queue. | | ||
| | `polaris.config.rollback.compaction.on-conflicts.enabled` | `false` | When set to true Polaris will apply the deconfliction by rollbacking those REPLACE operations snapshots which have the property of `polaris.internal.rollback.compaction.on-conflict` in their snapshot summary set to `rollback`, to resolve conflicts at the server end. | | ||
| | `polaris.event-listener.type` | `no-op` | Define the Polaris event listener type. Supported values are `no-op`, `aws-cloudwatch`. | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW persistence-in-memory-buffer is also supported.
| | `polaris.event-listener.aws-cloudwatch.log-stream` | `polaris-cloudwatch-default-stream`| Define the AWS CloudWatch log stream name for the event listener. Ensure that Polaris' IAM credentials have the following actions: "PutLogEvents", "DescribeLogStreams", and "DescribeLogGroups" on the specified log stream/group. If the specified log stream/group does not exist, then "CreateLogStream" and "CreateLogGroup" will also be required. | | ||
| | `polaris.event-listener.aws-cloudwatch.region` | `us-east-1` | Define the AWS region for the CloudWatch event listener. | | ||
| | `polaris.event-listener.aws-cloudwatch.synchronous-mode` | `false` | Define whether log events are sent to CloudWatch synchronously. When set to true, events are sent synchronously which may impact performance but ensures immediate delivery. When false (default), events are sent asynchronously for better performance. | | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Why remove this line?
|
Closing in favor of a new PR from a feature branch. Thanks for the reviews so far.....carrying over all context to the new PR. |
|
@vchag what's the status here? Are you still able to work in this issue? |
|
@adutra Yes, I'm on it. Will release it within the next 4 - 8 hrs. |
|
Here's the new PR #2962. |
These changes were created to address concerns raised in Issues #2630
What changes were proposed in this pull request?
Renaming
PropertyMapEventListenerfor clarity:The class
PropertyMapEventListenerwas misleadingly named, suggesting a generic event listening capability when its only overridden method was onAfterRefreshTable. To ensure the name accurately reflects its single responsibility, we have renamed it toAfterRefreshTableEventListener. This precise naming clearly communicates its purpose: to only process the AfterRefreshTableEvent.Streamlining Event-to-JSON Serialization:
The previous architecture involved a redundant, two-step conversion process: events were transformed into intermediate Maps by an abstract class, and only then was the resulting map serialized into JSON (as seen in
AwsCloudWatchEventListener).Since Jackson JSON is our chosen serialization format, we've removed this unnecessary intermediate mapping step. We introduced Jackson Mixins to provide native JSON serialization support directly on the event objects. This refactoring simplifies the serialization pipeline from:
Event->Map->JSONto
Event->JSONWhy are the changes needed?
Does this PR introduce any user-facing change?
How was this patch tested?
CHANGELOG.md