-
Notifications
You must be signed in to change notification settings - Fork 25.6k
[ML] add auditor to data frame plugin #40012
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a194836
ad006db
c55d74c
e0cdfab
d7b9b0b
4b089a5
c066494
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License; | ||
| * you may not use this file except in compliance with the Elastic License. | ||
| */ | ||
| package org.elasticsearch.xpack.core.common.notifications; | ||
|
|
||
| import org.elasticsearch.common.ParseField; | ||
| import org.elasticsearch.common.xcontent.ToXContent; | ||
| import org.elasticsearch.common.xcontent.ToXContentObject; | ||
| import org.elasticsearch.common.xcontent.XContentBuilder; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.Date; | ||
| import java.util.Objects; | ||
|
|
||
| public abstract class AbstractAuditMessage implements ToXContentObject { | ||
| public static final ParseField TYPE = new ParseField("audit_message"); | ||
|
|
||
| public static final ParseField MESSAGE = new ParseField("message"); | ||
| public static final ParseField LEVEL = new ParseField("level"); | ||
| public static final ParseField TIMESTAMP = new ParseField("timestamp"); | ||
| public static final ParseField NODE_NAME = new ParseField("node_name"); | ||
|
|
||
| private final String resourceId; | ||
| private final String message; | ||
| private final Level level; | ||
| private final Date timestamp; | ||
| private final String nodeName; | ||
|
|
||
| public AbstractAuditMessage(String resourceId, String message, Level level, String nodeName) { | ||
| this.resourceId = resourceId; | ||
| this.message = Objects.requireNonNull(message); | ||
| this.level = Objects.requireNonNull(level); | ||
| this.timestamp = new Date(); | ||
| this.nodeName = nodeName; | ||
| } | ||
|
|
||
| protected AbstractAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) { | ||
| this.resourceId = resourceId; | ||
| this.message = Objects.requireNonNull(message); | ||
| this.level = Objects.requireNonNull(level); | ||
| this.timestamp = Objects.requireNonNull(timestamp); | ||
| this.nodeName = nodeName; | ||
| } | ||
|
|
||
| public final String getResourceId() { | ||
| return resourceId; | ||
| } | ||
|
|
||
| public final String getMessage() { | ||
| return message; | ||
| } | ||
|
|
||
| public final Level getLevel() { | ||
| return level; | ||
| } | ||
|
|
||
| public final Date getTimestamp() { | ||
| return timestamp; | ||
| } | ||
|
|
||
| public final String getNodeName() { | ||
| return nodeName; | ||
| } | ||
|
|
||
| @Override | ||
| public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { | ||
| builder.startObject(); | ||
| if (resourceId != null) { | ||
| builder.field(getResourceField(), resourceId); | ||
| } | ||
| builder.field(MESSAGE.getPreferredName(), message); | ||
| builder.field(LEVEL.getPreferredName(), level); | ||
| builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime()); | ||
| if (nodeName != null) { | ||
| builder.field(NODE_NAME.getPreferredName(), nodeName); | ||
| } | ||
| builder.endObject(); | ||
| return builder; | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(resourceId, message, level, timestamp); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object obj) { | ||
| if (obj == null) { | ||
| return false; | ||
| } | ||
| if (obj instanceof AbstractAuditMessage == false) { | ||
| return false; | ||
| } | ||
|
|
||
| AbstractAuditMessage other = (AbstractAuditMessage) obj; | ||
| return Objects.equals(resourceId, other.resourceId) && | ||
| Objects.equals(message, other.message) && | ||
| Objects.equals(level, other.level) && | ||
| Objects.equals(timestamp, other.timestamp); | ||
| } | ||
|
|
||
| protected abstract String getResourceField(); | ||
|
|
||
| public abstract static class AbstractBuilder<T extends AbstractAuditMessage> { | ||
|
|
||
| public T info(String resourceId, String message, String nodeName) { | ||
| return newMessage(Level.INFO, resourceId, message, nodeName); | ||
| } | ||
|
|
||
| public T warning(String resourceId, String message, String nodeName) { | ||
| return newMessage(Level.WARNING, resourceId, message, nodeName); | ||
| } | ||
|
|
||
| public T error(String resourceId, String message, String nodeName) { | ||
| return newMessage(Level.ERROR, resourceId, message, nodeName); | ||
| } | ||
|
|
||
| protected abstract T newMessage(Level level, String resourceId, String message, String nodeName); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License; | ||
| * you may not use this file except in compliance with the Elastic License. | ||
| */ | ||
| package org.elasticsearch.xpack.core.common.notifications; | ||
|
|
||
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
| import org.elasticsearch.action.ActionListener; | ||
| import org.elasticsearch.action.index.IndexRequest; | ||
| import org.elasticsearch.action.index.IndexResponse; | ||
| import org.elasticsearch.client.Client; | ||
| import org.elasticsearch.common.unit.TimeValue; | ||
| import org.elasticsearch.common.xcontent.ToXContent; | ||
| import org.elasticsearch.common.xcontent.XContentBuilder; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.Objects; | ||
|
|
||
| import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; | ||
| import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; | ||
|
|
||
| public class Auditor<T extends AbstractAuditMessage> { | ||
|
|
||
| private static final Logger logger = LogManager.getLogger(Auditor.class); | ||
| private final Client client; | ||
| private final String nodeName; | ||
| private final String auditIndex; | ||
| private final String executionOrigin; | ||
| private final AbstractAuditMessage.AbstractBuilder<T> messageBuilder; | ||
|
|
||
| public Auditor(Client client, | ||
| String nodeName, | ||
| String auditIndex, | ||
| String executionOrigin, | ||
| AbstractAuditMessage.AbstractBuilder<T> messageBuilder) { | ||
| this.client = Objects.requireNonNull(client); | ||
| this.nodeName = Objects.requireNonNull(nodeName); | ||
| this.auditIndex = auditIndex; | ||
| this.executionOrigin = executionOrigin; | ||
| this.messageBuilder = Objects.requireNonNull(messageBuilder); | ||
| } | ||
|
|
||
| public final void info(String resourceId, String message) { | ||
| indexDoc(messageBuilder.info(resourceId, message, nodeName)); | ||
| } | ||
|
|
||
| public final void warning(String resourceId, String message) { | ||
| indexDoc(messageBuilder.warning(resourceId, message, nodeName)); | ||
| } | ||
|
|
||
| public final void error(String resourceId, String message) { | ||
| indexDoc(messageBuilder.error(resourceId, message, nodeName)); | ||
| } | ||
|
|
||
| protected void onIndexResponse(IndexResponse response) { | ||
| logger.trace("Successfully wrote audit message"); | ||
| } | ||
|
|
||
| protected void onIndexFailure(Exception exception) { | ||
| logger.debug("Failed to write audit message", exception); | ||
| } | ||
|
|
||
| private void indexDoc(ToXContent toXContent) { | ||
| IndexRequest indexRequest = new IndexRequest(auditIndex); | ||
| indexRequest.source(toXContentBuilder(toXContent)); | ||
| indexRequest.timeout(TimeValue.timeValueSeconds(5)); | ||
| executeAsyncWithOrigin(client.threadPool().getThreadContext(), | ||
| executionOrigin, | ||
| indexRequest, | ||
| ActionListener.wrap( | ||
| this::onIndexResponse, | ||
| this::onIndexFailure | ||
| ), client::index); | ||
| } | ||
|
|
||
| private XContentBuilder toXContentBuilder(ToXContent toXContent) { | ||
| try (XContentBuilder jsonBuilder = jsonBuilder()) { | ||
| return toXContent.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License; | ||
| * you may not use this file except in compliance with the Elastic License. | ||
| */ | ||
| package org.elasticsearch.xpack.core.common.notifications; | ||
|
|
||
| import java.util.Locale; | ||
|
|
||
| public enum Level { | ||
| INFO, WARNING, ERROR; | ||
|
|
||
| /** | ||
| * Case-insensitive from string method. | ||
| * | ||
| * @param value | ||
| * String representation | ||
| * @return The condition type | ||
| */ | ||
| public static Level fromString(String value) { | ||
| return Level.valueOf(value.toUpperCase(Locale.ROOT)); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return name().toLowerCase(Locale.ROOT); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License; | ||
| * you may not use this file except in compliance with the Elastic License. | ||
| */ | ||
| package org.elasticsearch.xpack.core.dataframe.notifications; | ||
|
|
||
| import org.elasticsearch.common.ParseField; | ||
| import org.elasticsearch.common.xcontent.ConstructingObjectParser; | ||
| import org.elasticsearch.common.xcontent.ObjectParser; | ||
| import org.elasticsearch.common.xcontent.XContentParser; | ||
| import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage; | ||
| import org.elasticsearch.xpack.core.common.notifications.Level; | ||
| import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils; | ||
|
|
||
| import java.util.Date; | ||
|
|
||
| import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; | ||
| import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; | ||
| import static org.elasticsearch.xpack.core.dataframe.DataFrameField.DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD; | ||
|
|
||
| public class DataFrameAuditMessage extends AbstractAuditMessage { | ||
|
|
||
| private static final ParseField TRANSFORM_ID = new ParseField(DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD); | ||
| public static final ConstructingObjectParser<DataFrameAuditMessage, Void> PARSER = new ConstructingObjectParser<>( | ||
| "data_frame_audit_message", | ||
| true, | ||
| a -> new DataFrameAuditMessage((String)a[0], (String)a[1], (Level)a[2], (Date)a[3], (String)a[4])); | ||
|
|
||
| static { | ||
| PARSER.declareString(optionalConstructorArg(), TRANSFORM_ID); | ||
| PARSER.declareString(constructorArg(), MESSAGE); | ||
| PARSER.declareField(constructorArg(), p -> { | ||
| if (p.currentToken() == XContentParser.Token.VALUE_STRING) { | ||
| return Level.fromString(p.text()); | ||
| } | ||
| throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); | ||
| }, LEVEL, ObjectParser.ValueType.STRING); | ||
| PARSER.declareField(constructorArg(), parser -> { | ||
| if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) { | ||
| return new Date(parser.longValue()); | ||
| } else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) { | ||
| return new Date(TimeUtils.dateStringToEpoch(parser.text())); | ||
| } | ||
| throw new IllegalArgumentException( | ||
| "unexpected token [" + parser.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]"); | ||
| }, TIMESTAMP, ObjectParser.ValueType.VALUE); | ||
| PARSER.declareString(optionalConstructorArg(), NODE_NAME); | ||
| } | ||
|
|
||
| public DataFrameAuditMessage(String resourceId, String message, Level level, String nodeName) { | ||
| super(resourceId, message, level, nodeName); | ||
| } | ||
|
|
||
| protected DataFrameAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) { | ||
| super(resourceId, message, level, timestamp, nodeName); | ||
| } | ||
|
|
||
| @Override | ||
| protected String getResourceField() { | ||
| return TRANSFORM_ID.getPreferredName(); | ||
| } | ||
|
|
||
| public static AbstractAuditMessage.AbstractBuilder<DataFrameAuditMessage> builder() { | ||
| return new AbstractBuilder<DataFrameAuditMessage>() { | ||
| @Override | ||
| protected DataFrameAuditMessage newMessage(Level level, String resourceId, String message, String nodeName) { | ||
| return new DataFrameAuditMessage(resourceId, message, level, nodeName); | ||
| } | ||
| }; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -158,10 +158,18 @@ private static Map<String, RoleDescriptor> initializeReservedRoles() { | |
| null, MetadataUtils.DEFAULT_RESERVED_METADATA)) | ||
| .put("data_frame_transforms_admin", new RoleDescriptor("data_frame_transforms_admin", | ||
| new String[] { "manage_data_frame_transforms" }, | ||
| null, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null)) | ||
| new RoleDescriptor.IndicesPrivileges[]{ | ||
| RoleDescriptor.IndicesPrivileges.builder() | ||
| .indices(".data-frame-notifications*") | ||
| .privileges("view_index_metadata", "read").build() | ||
| }, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null)) | ||
| .put("data_frame_transforms_user", new RoleDescriptor("data_frame_transforms_user", | ||
| new String[] { "monitor_data_frame_transforms" }, | ||
| null, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null)) | ||
| new RoleDescriptor.IndicesPrivileges[]{ | ||
| RoleDescriptor.IndicesPrivileges.builder() | ||
| .indices(".data-frame-notifications*") | ||
| .privileges("view_index_metadata", "read").build() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please open a separate PR on the stack docs similar to elastic/stack-docs#246 that updates the docs for the definition of these two roles. It causes problems for people who create their own custom roles if we don't accurately state which privileges our built in roles provide.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @droberts195, I definitely will if this PR gets approved and we are going to use the Auditor. |
||
| }, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null)) | ||
| .put("watcher_admin", new RoleDescriptor("watcher_admin", new String[] { "manage_watcher" }, | ||
| new RoleDescriptor.IndicesPrivileges[] { | ||
| RoleDescriptor.IndicesPrivileges.builder().indices(Watch.INDEX, TriggeredWatchStoreField.INDEX_NAME, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add
Objects.requireNotNullto those that need it. Probably all?