From a1948361540ab5ab8ba5653bdc0173d2c2840689 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 13 Mar 2019 11:36:45 -0500 Subject: [PATCH 1/3] [Data Frame] add auditor --- .../notifications/AbstractAuditMessage.java | 195 ++++++++++++++++++ .../common/notifications/AbstractAuditor.java | 74 +++++++ .../core/common/notifications/Level.java | 42 ++++ .../xpack/core/dataframe/DataFrameField.java | 1 + .../notifications/DataFrameAuditMessage.java | 65 ++++++ .../authz/store/ReservedRolesStore.java | 12 +- .../AbstractAuditMessageTests.java | 131 ++++++++++++ .../notifications/AbstractAuditorTests.java | 122 +++++++++++ .../core/common/notifications/LevelTests.java | 113 ++++++++++ .../DataFrameAuditMessageTests.java | 82 ++++++++ .../authz/store/ReservedRolesStoreTests.java | 2 + .../integration/DataFrameAuditorIT.java | 76 +++++++ .../xpack/dataframe/DataFrame.java | 15 +- .../notifications/DataFrameAuditor.java | 45 ++++ .../persistence/DataFrameInternalIndex.java | 53 +++++ ...FrameTransformPersistentTasksExecutor.java | 8 +- .../transforms/DataFrameTransformTask.java | 17 +- 17 files changed, 1043 insertions(+), 10 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessage.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/Level.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessage.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessageTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/LevelTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessageTests.java create mode 100644 x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java create mode 100644 x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/notifications/DataFrameAuditor.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessage.java new file mode 100644 index 0000000000000..3c57c57a2e28f --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessage.java @@ -0,0 +1,195 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.common.notifications; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Date; +import java.util.Objects; +import java.util.function.Supplier; + +public abstract class AbstractAuditMessage implements ToXContentObject, Writeable { + public static final ParseField TYPE = new ParseField("audit_message"); + + public static final ParseField MESSAGE = new ParseField("message"); + public static final ParseField LEVEL = new ParseField("level"); + public static final ParseField TIMESTAMP = new ParseField("timestamp"); + public static final ParseField NODE_NAME = new ParseField("node_name"); + + private String resourceId; + private String message; + private Level level; + private Date timestamp; + private String nodeName; + + protected AbstractAuditMessage() { + this.timestamp = new Date(); + } + + AbstractAuditMessage(String resourceId, String message, Level level, String nodeName) { + this.resourceId = resourceId; + this.message = message; + this.level = level; + this.timestamp = new Date(); + this.nodeName = nodeName; + } + + public AbstractAuditMessage(StreamInput in) throws IOException { + resourceId = in.readOptionalString(); + message = in.readOptionalString(); + if (in.readBoolean()) { + level = Level.readFromStream(in); + } + if (in.readBoolean()) { + timestamp = new Date(in.readLong()); + } + nodeName = in.readOptionalString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(resourceId); + out.writeOptionalString(message); + boolean hasLevel = level != null; + out.writeBoolean(hasLevel); + if (hasLevel) { + level.writeTo(out); + } + boolean hasTimestamp = timestamp != null; + out.writeBoolean(hasTimestamp); + if (hasTimestamp) { + out.writeLong(timestamp.getTime()); + } + out.writeOptionalString(nodeName); + } + + public final String getResourceId() { + return resourceId; + } + + public final void setResourceId(String resourceId) { + this.resourceId = resourceId; + } + + public final String getMessage() { + return message; + } + + public final void setMessage(String message) { + this.message = message; + } + + public final Level getLevel() { + return level; + } + + public final void setLevel(Level level) { + this.level = level; + } + + public final Date getTimestamp() { + return timestamp; + } + + public final void setTimestamp(Date timestamp) { + this.timestamp = timestamp; + } + + public final String getNodeName() { + return nodeName; + } + + public final void setNodeName(String nodeName) { + this.nodeName = nodeName; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + if (resourceId != null) { + builder.field(getResourceField(), resourceId); + } + if (message != null) { + builder.field(MESSAGE.getPreferredName(), message); + } + if (level != null) { + builder.field(LEVEL.getPreferredName(), level); + } + if (timestamp != null) { + builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime()); + } + if (nodeName != null) { + builder.field(NODE_NAME.getPreferredName(), nodeName); + } + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(resourceId, message, level, timestamp); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj instanceof AbstractAuditMessage == false) { + return false; + } + + AbstractAuditMessage other = (AbstractAuditMessage) obj; + return Objects.equals(resourceId, other.resourceId) && + Objects.equals(message, other.message) && + Objects.equals(level, other.level) && + Objects.equals(timestamp, other.timestamp); + } + + protected abstract String getResourceField(); + + public static final class AuditMessageBuilder { + + private final Supplier auditSupplier; + + public AuditMessageBuilder(Supplier auditSupplier) { + this.auditSupplier = Objects.requireNonNull(auditSupplier); + } + + public T activity(String resourceId, String message, String nodeName) { + return newMessage(Level.ACTIVITY, resourceId, message, nodeName); + } + + public T info(String resourceId, String message, String nodeName) { + return newMessage(Level.INFO, resourceId, message, nodeName); + } + + public T warning(String resourceId, String message, String nodeName) { + return newMessage(Level.WARNING, resourceId, message, nodeName); + } + + public T error(String resourceId, String message, String nodeName) { + return newMessage(Level.ERROR, resourceId, message, nodeName); + } + + private T newMessage(Level level, String resourceId, String message, String nodeName) { + T audit = auditSupplier.get(); + audit.setLevel(level); + audit.setResourceId(resourceId); + audit.setMessage(message); + audit.setNodeName(nodeName); + return audit; + } + + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java new file mode 100644 index 0000000000000..566da2a021b66 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.common.notifications; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; + +public abstract class AbstractAuditor { + + private final Client client; + private final String nodeName; + private final AbstractAuditMessage.AuditMessageBuilder messageBuilder; + + public AbstractAuditor(Client client, String nodeName, AbstractAuditMessage.AuditMessageBuilder messageBuilder) { + this.client = Objects.requireNonNull(client); + this.nodeName = Objects.requireNonNull(nodeName); + this.messageBuilder = Objects.requireNonNull(messageBuilder); + } + + public final void info(String resourceId, String message) { + indexDoc(messageBuilder.info(resourceId, message, nodeName)); + } + + public final void warning(String resourceId, String message) { + indexDoc(messageBuilder.warning(resourceId, message, nodeName)); + } + + public final void error(String resourceId, String message) { + indexDoc(messageBuilder.error(resourceId, message, nodeName)); + } + + protected abstract String getExecutionOrigin(); + + protected abstract String getAuditIndex(); + + protected abstract void onIndexResponse(IndexResponse response); + + protected abstract void onIndexFailure(Exception exception); + + private void indexDoc(ToXContent toXContent) { + IndexRequest indexRequest = new IndexRequest(getAuditIndex()); + indexRequest.source(toXContentBuilder(toXContent)); + indexRequest.timeout(TimeValue.timeValueSeconds(5)); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), + getExecutionOrigin(), + indexRequest, + ActionListener.wrap( + this::onIndexResponse, + this::onIndexFailure + ), client::index); + } + + private XContentBuilder toXContentBuilder(ToXContent toXContent) { + try (XContentBuilder jsonBuilder = jsonBuilder()) { + return toXContent.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/Level.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/Level.java new file mode 100644 index 0000000000000..b8cf5ada10f0e --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/Level.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.common.notifications; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Locale; + +public enum Level implements Writeable { + INFO, ACTIVITY, WARNING, ERROR; + + /** + * Case-insensitive from string method. + * + * @param value + * String representation + * @return The condition type + */ + public static Level fromString(String value) { + return Level.valueOf(value.toUpperCase(Locale.ROOT)); + } + + public static Level readFromStream(StreamInput in) throws IOException { + return in.readEnum(Level.class); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeEnum(this); + } + + @Override + public String toString() { + return name().toLowerCase(Locale.ROOT); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java index e25b2619b19f6..c8da21e596a5e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java @@ -32,6 +32,7 @@ public final class DataFrameField { public static final String REST_BASE_PATH = "/_data_frame/"; public static final String REST_BASE_PATH_TRANSFORMS = REST_BASE_PATH + "transforms/"; public static final String REST_BASE_PATH_TRANSFORMS_BY_ID = REST_BASE_PATH_TRANSFORMS + "{id}/"; + public static final String DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD = "transform_id"; // note: this is used to match tasks public static final String PERSISTENT_TASK_DESCRIPTION_PREFIX = "data_frame_"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessage.java new file mode 100644 index 0000000000000..b72d764b37452 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessage.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.dataframe.notifications; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage; +import org.elasticsearch.xpack.core.common.notifications.Level; +import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils; + +import java.io.IOException; +import java.util.Date; + +import static org.elasticsearch.xpack.core.dataframe.DataFrameField.DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD; + +public class DataFrameAuditMessage extends AbstractAuditMessage { + + private static final ParseField TRANSFORM_ID = new ParseField(DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD); + public static final ObjectParser PARSER = new ObjectParser<>("data_frame_audit_message", + true, + DataFrameAuditMessage::new); + + static { + PARSER.declareString(AbstractAuditMessage::setResourceId, TRANSFORM_ID); + PARSER.declareString(AbstractAuditMessage::setMessage, MESSAGE); + PARSER.declareField(AbstractAuditMessage::setLevel, p -> { + if (p.currentToken() == XContentParser.Token.VALUE_STRING) { + return Level.fromString(p.text()); + } + throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); + }, LEVEL, ObjectParser.ValueType.STRING); + PARSER.declareField(AbstractAuditMessage::setTimestamp, parser -> { + if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) { + return new Date(parser.longValue()); + } else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) { + return new Date(TimeUtils.dateStringToEpoch(parser.text())); + } + throw new IllegalArgumentException( + "unexpected token [" + parser.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]"); + }, TIMESTAMP, ObjectParser.ValueType.VALUE); + PARSER.declareString(AbstractAuditMessage::setNodeName, NODE_NAME); + } + + public DataFrameAuditMessage(StreamInput in) throws IOException { + super(in); + } + + public DataFrameAuditMessage(){ + super(); + } + + @Override + protected String getResourceField() { + return TRANSFORM_ID.getPreferredName(); + } + + public static AbstractAuditMessage.AuditMessageBuilder messageBuilder() { + return new AuditMessageBuilder<>(DataFrameAuditMessage::new); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStore.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStore.java index fe8ef032e8fe2..d2745e4ef8e2e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStore.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStore.java @@ -158,10 +158,18 @@ private static Map initializeReservedRoles() { null, MetadataUtils.DEFAULT_RESERVED_METADATA)) .put("data_frame_transforms_admin", new RoleDescriptor("data_frame_transforms_admin", new String[] { "manage_data_frame_transforms" }, - null, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null)) + new RoleDescriptor.IndicesPrivileges[]{ + RoleDescriptor.IndicesPrivileges.builder() + .indices(".data-frame-notifications*") + .privileges("view_index_metadata", "read").build() + }, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null)) .put("data_frame_transforms_user", new RoleDescriptor("data_frame_transforms_user", new String[] { "monitor_data_frame_transforms" }, - null, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null)) + new RoleDescriptor.IndicesPrivileges[]{ + RoleDescriptor.IndicesPrivileges.builder() + .indices(".data-frame-notifications*") + .privileges("view_index_metadata", "read").build() + }, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null)) .put("watcher_admin", new RoleDescriptor("watcher_admin", new String[] { "manage_watcher" }, new RoleDescriptor.IndicesPrivileges[] { RoleDescriptor.IndicesPrivileges.builder().indices(Watch.INDEX, TriggeredWatchStoreField.INDEX_NAME, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessageTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessageTests.java new file mode 100644 index 0000000000000..cffa1f0b565cd --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessageTests.java @@ -0,0 +1,131 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.common.notifications; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils; +import org.junit.Before; + +import java.io.IOException; +import java.util.Date; + +public class AbstractAuditMessageTests extends AbstractSerializingTestCase { + private long startMillis; + + static class TestAuditMessage extends AbstractAuditMessage { + private static final ParseField ID = new ParseField("test_id"); + public static final ObjectParser PARSER = new ObjectParser<>(AbstractAuditMessage.TYPE.getPreferredName(), + true, + TestAuditMessage::new); + + static { + PARSER.declareString(AbstractAuditMessage::setResourceId, ID); + PARSER.declareString(AbstractAuditMessage::setMessage, MESSAGE); + PARSER.declareField(AbstractAuditMessage::setLevel, p -> { + if (p.currentToken() == XContentParser.Token.VALUE_STRING) { + return Level.fromString(p.text()); + } + throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); + }, LEVEL, ObjectParser.ValueType.STRING); + PARSER.declareField(AbstractAuditMessage::setTimestamp, parser -> { + if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) { + return new Date(parser.longValue()); + } else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) { + return new Date(TimeUtils.dateStringToEpoch(parser.text())); + } + throw new IllegalArgumentException( + "unexpected token [" + parser.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]"); + }, TIMESTAMP, ObjectParser.ValueType.VALUE); + PARSER.declareString(AbstractAuditMessage::setNodeName, NODE_NAME); + } + + TestAuditMessage() { + super(); + } + + TestAuditMessage(StreamInput in) throws IOException { + super(in); + } + + TestAuditMessage(String resourceId, String message, Level level, String nodeName) { + super(resourceId, message, level, nodeName); + } + + @Override + protected String getResourceField() { + return "test_id"; + } + + static AbstractAuditMessage.AuditMessageBuilder newBuilder() { + return new AuditMessageBuilder<>(TestAuditMessage::new); + } + } + + @Before + public void setStartTime() { + startMillis = System.currentTimeMillis(); + } + + public void testNewInfo() { + TestAuditMessage info = TestAuditMessage.newBuilder().info("foo", "some info", "some_node"); + assertEquals("foo", info.getResourceId()); + assertEquals("some info", info.getMessage()); + assertEquals(Level.INFO, info.getLevel()); + assertDateBetweenStartAndNow(info.getTimestamp()); + } + + public void testNewWarning() { + TestAuditMessage warning = TestAuditMessage.newBuilder().warning("bar", "some warning", "some_node"); + assertEquals("bar", warning.getResourceId()); + assertEquals("some warning", warning.getMessage()); + assertEquals(Level.WARNING, warning.getLevel()); + assertDateBetweenStartAndNow(warning.getTimestamp()); + } + + + public void testNewError() { + TestAuditMessage error = TestAuditMessage.newBuilder().error("foo", "some error", "some_node"); + assertEquals("foo", error.getResourceId()); + assertEquals("some error", error.getMessage()); + assertEquals(Level.ERROR, error.getLevel()); + assertDateBetweenStartAndNow(error.getTimestamp()); + } + + public void testNewActivity() { + TestAuditMessage error = TestAuditMessage.newBuilder().activity("foo", "some error", "some_node"); + assertEquals("foo", error.getResourceId()); + assertEquals("some error", error.getMessage()); + assertEquals(Level.ACTIVITY, error.getLevel()); + assertDateBetweenStartAndNow(error.getTimestamp()); + } + + private void assertDateBetweenStartAndNow(Date timestamp) { + long timestampMillis = timestamp.getTime(); + assertTrue(timestampMillis >= startMillis); + assertTrue(timestampMillis <= System.currentTimeMillis()); + } + + @Override + protected TestAuditMessage doParseInstance(XContentParser parser) { + return TestAuditMessage.PARSER.apply(parser, null); + } + + @Override + protected TestAuditMessage createTestInstance() { + return new TestAuditMessage(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 200), + randomFrom(Level.values()), randomAlphaOfLengthBetween(1, 20)); + } + + @Override + protected Reader instanceReader() { + return TestAuditMessage::new; + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java new file mode 100644 index 0000000000000..84a52e3cea5cb --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java @@ -0,0 +1,122 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.common.notifications; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Before; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class AbstractAuditorTests extends ESTestCase { + private Client client; + private ArgumentCaptor indexRequestCaptor; + private static final String TEST_ORIGIN = "test_origin"; + private static final String TEST_INDEX = "test_index"; + + static class TestAuditor extends AbstractAuditor { + + TestAuditor(Client client, String nodeName) { + super(client, nodeName, AbstractAuditMessageTests.TestAuditMessage.newBuilder()); + } + + @Override + protected String getExecutionOrigin() { + return TEST_ORIGIN; + } + + @Override + protected String getAuditIndex() { + return TEST_INDEX; + } + + @Override + protected void onIndexResponse(IndexResponse response) { + + } + + @Override + protected void onIndexFailure(Exception exception) { + + } + } + + @Before + public void setUpMocks() { + client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + + indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class); + } + + public void testInfo() throws IOException { + TestAuditor auditor = new TestAuditor(client, "node_1"); + auditor.info("foo", "Here is my info"); + + verify(client).index(indexRequestCaptor.capture(), any()); + IndexRequest indexRequest = indexRequestCaptor.getValue(); + assertArrayEquals(new String[] {TEST_INDEX}, indexRequest.indices()); + assertEquals(TimeValue.timeValueSeconds(5), indexRequest.timeout()); + AbstractAuditMessageTests.TestAuditMessage auditMessage = parseAuditMessage(indexRequest.source()); + assertEquals("foo", auditMessage.getResourceId()); + assertEquals("Here is my info", auditMessage.getMessage()); + assertEquals(Level.INFO, auditMessage.getLevel()); + } + + public void testWarning() throws IOException { + TestAuditor auditor = new TestAuditor(client, "node_1"); + auditor.warning("bar", "Here is my warning"); + + verify(client).index(indexRequestCaptor.capture(), any()); + IndexRequest indexRequest = indexRequestCaptor.getValue(); + assertArrayEquals(new String[] {TEST_INDEX}, indexRequest.indices()); + assertEquals(TimeValue.timeValueSeconds(5), indexRequest.timeout()); + AbstractAuditMessageTests.TestAuditMessage auditMessage = parseAuditMessage(indexRequest.source()); + assertEquals("bar", auditMessage.getResourceId()); + assertEquals("Here is my warning", auditMessage.getMessage()); + assertEquals(Level.WARNING, auditMessage.getLevel()); + } + + public void testError() throws IOException { + TestAuditor auditor = new TestAuditor(client, "node_1"); + auditor.error("foobar", "Here is my error"); + + verify(client).index(indexRequestCaptor.capture(), any()); + IndexRequest indexRequest = indexRequestCaptor.getValue(); + assertArrayEquals(new String[] {TEST_INDEX}, indexRequest.indices()); + assertEquals(TimeValue.timeValueSeconds(5), indexRequest.timeout()); + AbstractAuditMessageTests.TestAuditMessage auditMessage = parseAuditMessage(indexRequest.source()); + assertEquals("foobar", auditMessage.getResourceId()); + assertEquals("Here is my error", auditMessage.getMessage()); + assertEquals(Level.ERROR, auditMessage.getLevel()); + } + + private AbstractAuditMessageTests.TestAuditMessage parseAuditMessage(BytesReference msg) throws IOException { + XContentParser parser = XContentFactory.xContent(XContentHelper.xContentType(msg)) + .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, msg.streamInput()); + return AbstractAuditMessageTests.TestAuditMessage.PARSER.apply(parser, null); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/LevelTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/LevelTests.java new file mode 100644 index 0000000000000..5b0e5dba89c97 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/LevelTests.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.common.notifications; + +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class LevelTests extends ESTestCase { + + public void testFromString() { + assertEquals(Level.INFO, Level.fromString("info")); + assertEquals(Level.INFO, Level.fromString("INFO")); + assertEquals(Level.ACTIVITY, Level.fromString("activity")); + assertEquals(Level.ACTIVITY, Level.fromString("ACTIVITY")); + assertEquals(Level.WARNING, Level.fromString("warning")); + assertEquals(Level.WARNING, Level.fromString("WARNING")); + assertEquals(Level.ERROR, Level.fromString("error")); + assertEquals(Level.ERROR, Level.fromString("ERROR")); + } + + public void testToString() { + assertEquals("info", Level.INFO.toString()); + assertEquals("activity", Level.ACTIVITY.toString()); + assertEquals("warning", Level.WARNING.toString()); + assertEquals("error", Level.ERROR.toString()); + } + + public void testValidOrdinals() { + assertThat(Level.INFO.ordinal(), equalTo(0)); + assertThat(Level.ACTIVITY.ordinal(), equalTo(1)); + assertThat(Level.WARNING.ordinal(), equalTo(2)); + assertThat(Level.ERROR.ordinal(), equalTo(3)); + } + + public void testwriteTo() throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + Level.INFO.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + assertThat(in.readVInt(), equalTo(0)); + } + } + + try (BytesStreamOutput out = new BytesStreamOutput()) { + Level.ACTIVITY.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + assertThat(in.readVInt(), equalTo(1)); + } + } + + try (BytesStreamOutput out = new BytesStreamOutput()) { + Level.WARNING.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + assertThat(in.readVInt(), equalTo(2)); + } + } + + try (BytesStreamOutput out = new BytesStreamOutput()) { + Level.ERROR.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + assertThat(in.readVInt(), equalTo(3)); + } + } + } + + public void testReadFrom() throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.writeVInt(0); + try (StreamInput in = out.bytes().streamInput()) { + assertThat(Level.readFromStream(in), equalTo(Level.INFO)); + } + } + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.writeVInt(1); + try (StreamInput in = out.bytes().streamInput()) { + assertThat(Level.readFromStream(in), equalTo(Level.ACTIVITY)); + } + } + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.writeVInt(2); + try (StreamInput in = out.bytes().streamInput()) { + assertThat(Level.readFromStream(in), equalTo(Level.WARNING)); + } + } + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.writeVInt(3); + try (StreamInput in = out.bytes().streamInput()) { + assertThat(Level.readFromStream(in), equalTo(Level.ERROR)); + } + } + } + + public void testInvalidReadFrom() throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.writeVInt(randomIntBetween(4, Integer.MAX_VALUE)); + try (StreamInput in = out.bytes().streamInput()) { + Level.readFromStream(in); + fail("Expected IOException"); + } catch (IOException e) { + assertThat(e.getMessage(), containsString("Unknown Level ordinal [")); + } + } + } + +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessageTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessageTests.java new file mode 100644 index 0000000000000..38bf4a4ef8915 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessageTests.java @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.dataframe.notifications; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.common.notifications.Level; +import org.junit.Before; + +import java.util.Date; + +public class DataFrameAuditMessageTests extends AbstractSerializingTestCase { + private long startMillis; + + @Before + public void setStartTime() { + startMillis = System.currentTimeMillis(); + } + + public void testNewInfo() { + DataFrameAuditMessage info = DataFrameAuditMessage.messageBuilder().info("foo", "some info", "some_node"); + assertEquals("foo", info.getResourceId()); + assertEquals("some info", info.getMessage()); + assertEquals(Level.INFO, info.getLevel()); + assertDateBetweenStartAndNow(info.getTimestamp()); + } + + public void testNewWarning() { + DataFrameAuditMessage warning = DataFrameAuditMessage.messageBuilder().warning("bar", "some warning", "some_node"); + assertEquals("bar", warning.getResourceId()); + assertEquals("some warning", warning.getMessage()); + assertEquals(Level.WARNING, warning.getLevel()); + assertDateBetweenStartAndNow(warning.getTimestamp()); + } + + + public void testNewError() { + DataFrameAuditMessage error = DataFrameAuditMessage.messageBuilder().error("foo", "some error", "some_node"); + assertEquals("foo", error.getResourceId()); + assertEquals("some error", error.getMessage()); + assertEquals(Level.ERROR, error.getLevel()); + assertDateBetweenStartAndNow(error.getTimestamp()); + } + + public void testNewActivity() { + DataFrameAuditMessage error = DataFrameAuditMessage.messageBuilder().activity("foo", "some error", "some_node"); + assertEquals("foo", error.getResourceId()); + assertEquals("some error", error.getMessage()); + assertEquals(Level.ACTIVITY, error.getLevel()); + assertDateBetweenStartAndNow(error.getTimestamp()); + } + + private void assertDateBetweenStartAndNow(Date timestamp) { + long timestampMillis = timestamp.getTime(); + assertTrue(timestampMillis >= startMillis); + assertTrue(timestampMillis <= System.currentTimeMillis()); + } + + @Override + protected DataFrameAuditMessage doParseInstance(XContentParser parser) { + return DataFrameAuditMessage.PARSER.apply(parser, null); + } + + @Override + protected DataFrameAuditMessage createTestInstance() { + DataFrameAuditMessage auditMessage = new DataFrameAuditMessage(); + auditMessage.setLevel(randomFrom(Level.values())); + auditMessage.setMessage(randomAlphaOfLengthBetween(1, 20)); + auditMessage.setNodeName(randomAlphaOfLengthBetween(1, 20)); + auditMessage.setResourceId(randomAlphaOfLengthBetween(1, 20)); + return auditMessage; + } + + @Override + protected Reader instanceReader() { + return DataFrameAuditMessage::new; + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java index cb39388ec9c50..1a25fa90ce2ca 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java @@ -1047,6 +1047,7 @@ public void testDataFrameTransformsAdminRole() { assertThat(role.cluster().check(StopDataFrameTransformAction.NAME, request), is(true)); assertThat(role.runAs().check(randomAlphaOfLengthBetween(1, 30)), is(false)); + assertOnlyReadAllowed(role, ".data-frame-notifications"); assertNoAccessAllowed(role, "foo"); assertNoAccessAllowed(role, ".data-frame-internal-1"); // internal use only @@ -1070,6 +1071,7 @@ public void testDataFrameTransformsUserRole() { assertThat(role.cluster().check(StopDataFrameTransformAction.NAME, request), is(false)); assertThat(role.runAs().check(randomAlphaOfLengthBetween(1, 30)), is(false)); + assertOnlyReadAllowed(role, ".data-frame-notifications"); assertNoAccessAllowed(role, "foo"); assertNoAccessAllowed(role, ".data-frame-internal-1"); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java new file mode 100644 index 0000000000000..a84f39f63e70c --- /dev/null +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.dataframe.integration; + +import org.elasticsearch.client.Request; +import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; +import org.junit.Before; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.Matchers.is; + +public class DataFrameAuditorIT extends DataFrameRestTestCase { + + private static final String TEST_USER_NAME = "df_admin_plus_data"; + private static final String DATA_ACCESS_ROLE = "test_data_access"; + private static final String BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS = + basicAuthHeaderValue(TEST_USER_NAME, TEST_PASSWORD_SECURE_STRING); + + private static boolean indicesCreated = false; + + // preserve indices in order to reuse source indices in several test cases + @Override + protected boolean preserveIndicesUponCompletion() { + return true; + } + + @Before + public void createIndexes() throws IOException { + + // it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack + if (indicesCreated) { + return; + } + + createReviewsIndex(); + indicesCreated = true; + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME); + setupUser(TEST_USER_NAME, Arrays.asList("data_frame_transforms_admin", DATA_ACCESS_ROLE)); + } + + @SuppressWarnings("unchecked") + public void testAuditorWritesAudits() throws Exception { + String transformId = "simplePivotForAudit"; + String dataFrameIndex = "pivot_reviews_user_id_above_20"; + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex); + String query = "\"match\": {\"user_id\": \"user_26\"}"; + + createPivotReviewsTransform(transformId, dataFrameIndex, query, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); + + startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); + + // Make sure we wrote to the audit + assertTrue(indexExists(DataFrameInternalIndex.AUDIT_INDEX)); + Request request = new Request("GET", DataFrameInternalIndex.AUDIT_INDEX + "/_search"); + request.setJsonEntity("{\"query\":{\"term\":{\"transform_id\":\"simplePivotForAudit\"}}}"); + Map response = entityAsMap(client().performRequest(request)); + Map hitRsp = (Map) ((List) ((Map)response.get("hits")).get("hits")).get(0); + Map source = (Map)hitRsp.get("_source"); + assertThat(source.get("transform_id"), equalTo(transformId)); + assertThat(source.get("level"), equalTo("info")); + assertThat(source.get("message"), is(notNullValue())); + assertThat(source.get("node_name"), is(notNullValue())); + assertThat(source.get("timestamp"), is(notNullValue())); + } +} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java index 67d16c654ad8d..44a53121a2794 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java @@ -55,6 +55,7 @@ import org.elasticsearch.xpack.dataframe.action.TransportPutDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.action.TransportStopDataFrameTransformAction; +import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.rest.action.RestDeleteDataFrameTransformAction; @@ -96,6 +97,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu private final Settings settings; private final boolean transportClientMode; private final SetOnce dataFrameTransformsConfigManager = new SetOnce<>(); + private final SetOnce dataFrameAuditor = new SetOnce<>(); private final SetOnce schedulerEngine = new SetOnce<>(); public DataFrame(Settings settings) { @@ -175,10 +177,10 @@ public Collection createComponents(Client client, ClusterService cluster if (enabled == false || transportClientMode) { return emptyList(); } - + dataFrameAuditor.set(new DataFrameAuditor(client, clusterService.getNodeName())); dataFrameTransformsConfigManager.set(new DataFrameTransformsConfigManager(client, xContentRegistry)); - return Collections.singletonList(dataFrameTransformsConfigManager.get()); + return Arrays.asList(dataFrameTransformsConfigManager.get(), dataFrameAuditor.get()); } @Override @@ -189,6 +191,11 @@ public UnaryOperator> getIndexTemplateMetaDat } catch (IOException e) { logger.error("Error creating data frame index template", e); } + try { + templates.put(DataFrameInternalIndex.AUDIT_INDEX, DataFrameInternalIndex.getAuditIndexTemplateMetaData()); + } catch (IOException e) { + logger.warn("Error creating data frame audit index", e); + } return templates; }; } @@ -204,8 +211,10 @@ public List> getPersistentTasksExecutor(ClusterServic // the transforms config manager should have been created assert dataFrameTransformsConfigManager.get() != null; + // the auditor should have been created + assert dataFrameAuditor.get() != null; return Collections.singletonList(new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(), - schedulerEngine.get(), threadPool)); + schedulerEngine.get(), dataFrameAuditor.get(), threadPool)); } @Override diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/notifications/DataFrameAuditor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/notifications/DataFrameAuditor.java new file mode 100644 index 0000000000000..419d0100cba43 --- /dev/null +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/notifications/DataFrameAuditor.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.dataframe.notifications; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; +import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage; + +import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN; +import static org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex.AUDIT_INDEX; + +public class DataFrameAuditor extends AbstractAuditor { + + private static final Logger logger = LogManager.getLogger(DataFrameAuditor.class); + + public DataFrameAuditor(Client client, String nodeName){ + super(client, nodeName, DataFrameAuditMessage.messageBuilder()); + } + + @Override + protected String getExecutionOrigin() { + return DATA_FRAME_ORIGIN; + } + + @Override + protected String getAuditIndex() { + return AUDIT_INDEX; + } + + @Override + protected void onIndexResponse(IndexResponse response) { + logger.trace("Successfully wrote audit message"); + } + + @Override + protected void onIndexFailure(Exception exception) { + logger.debug("Failed to write audit message", exception); + } +} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java index 1871bc067967e..25f3939680541 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java @@ -13,12 +13,15 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import java.io.IOException; import java.util.Collections; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; +import static org.elasticsearch.xpack.core.dataframe.DataFrameField.DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD; public final class DataFrameInternalIndex { @@ -27,11 +30,16 @@ public final class DataFrameInternalIndex { public static final String INDEX_TEMPLATE_PATTERN = ".data-frame-internal-"; public static final String INDEX_TEMPLATE_NAME = INDEX_TEMPLATE_PATTERN + INDEX_TEMPLATE_VERSION; public static final String INDEX_NAME = INDEX_TEMPLATE_NAME; + public static final String AUDIT_INDEX = ".data-frame-notifications"; // constants for mappings public static final String DYNAMIC = "dynamic"; public static final String PROPERTIES = "properties"; public static final String TYPE = "type"; + public static final String DATE = "date"; + public static final String TEXT = "text"; + public static final String FIELDS = "fields"; + public static final String RAW = "raw"; // data types public static final String DOUBLE = "double"; @@ -50,6 +58,51 @@ public static IndexTemplateMetaData getIndexTemplateMetaData() throws IOExceptio return dataFrameTemplate; } + public static IndexTemplateMetaData getAuditIndexTemplateMetaData() throws IOException { + IndexTemplateMetaData dataFrameTemplate = IndexTemplateMetaData.builder(AUDIT_INDEX) + .patterns(Collections.singletonList(AUDIT_INDEX)) + .version(Version.CURRENT.id) + .settings(Settings.builder() + // the configurations are expected to be small + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")) + .putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(auditMappings())) + .build(); + return dataFrameTemplate; + } + + private static XContentBuilder auditMappings() throws IOException { + XContentBuilder builder = jsonBuilder().startObject(); + builder.startObject(SINGLE_MAPPING_NAME); + addMetaInformation(builder); + builder.startObject(PROPERTIES) + .startObject(DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD) + .field(TYPE, KEYWORD) + .endObject() + .startObject(AbstractAuditMessage.LEVEL.getPreferredName()) + .field(TYPE, KEYWORD) + .endObject() + .startObject(AbstractAuditMessage.MESSAGE.getPreferredName()) + .field(TYPE, TEXT) + .startObject(FIELDS) + .startObject(RAW) + .field(TYPE, KEYWORD) + .endObject() + .endObject() + .endObject() + .startObject(AbstractAuditMessage.TIMESTAMP.getPreferredName()) + .field(TYPE, DATE) + .endObject() + .startObject(AbstractAuditMessage.NODE_NAME.getPreferredName()) + .field(TYPE, KEYWORD) + .endObject() + .endObject() + .endObject() + .endObject(); + + return builder; + } + private static XContentBuilder mappings() throws IOException { XContentBuilder builder = jsonBuilder(); builder.startObject(); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index fb2557edce901..889558b31815d 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -21,6 +21,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.dataframe.DataFrame; +import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import java.util.Map; @@ -33,13 +34,15 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx private final DataFrameTransformsConfigManager transformsConfigManager; private final SchedulerEngine schedulerEngine; private final ThreadPool threadPool; + private final DataFrameAuditor auditor; public DataFrameTransformPersistentTasksExecutor(Client client, DataFrameTransformsConfigManager transformsConfigManager, - SchedulerEngine schedulerEngine, ThreadPool threadPool) { + SchedulerEngine schedulerEngine, DataFrameAuditor auditor, ThreadPool threadPool) { super(DataFrameField.TASK_NAME, DataFrame.TASK_THREAD_POOL_NAME); this.client = client; this.transformsConfigManager = transformsConfigManager; this.schedulerEngine = schedulerEngine; + this.auditor = auditor; this.threadPool = threadPool; } @@ -67,6 +70,7 @@ static SchedulerEngine.Schedule next() { protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, PersistentTasksCustomMetaData.PersistentTask persistentTask, Map headers) { return new DataFrameTransformTask(id, type, action, parentTaskId, persistentTask.getParams(), - (DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager, schedulerEngine, threadPool, headers); + (DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager, schedulerEngine, auditor, threadPool, + headers); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index ace968ee8a6b3..2e9c008c5b88d 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction.Response; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; +import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import java.util.Map; @@ -50,6 +51,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S private final SchedulerEngine schedulerEngine; private final ThreadPool threadPool; private final DataFrameIndexer indexer; + private final DataFrameAuditor auditor; // the generation of this data frame, for v1 there will be only // 0: data frame not created or still indexing @@ -58,11 +60,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S public DataFrameTransformTask(long id, String type, String action, TaskId parentTask, DataFrameTransform transform, DataFrameTransformState state, Client client, DataFrameTransformsConfigManager transformsConfigManager, - SchedulerEngine schedulerEngine, ThreadPool threadPool, Map headers) { + SchedulerEngine schedulerEngine, DataFrameAuditor auditor, ThreadPool threadPool, Map headers) { super(id, type, action, DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transform.getId(), parentTask, headers); this.transform = transform; this.schedulerEngine = schedulerEngine; this.threadPool = threadPool; + this.auditor = auditor; IndexerState initialState = IndexerState.STOPPED; long initialGeneration = 0; Map initialPosition = null; @@ -84,7 +87,7 @@ public DataFrameTransformTask(long id, String type, String action, TaskId parent } this.indexer = new ClientDataFrameIndexer(transform.getId(), transformsConfigManager, new AtomicReference<>(initialState), - initialPosition, client); + initialPosition, client, auditor); this.generation = new AtomicReference(initialGeneration); } @@ -139,6 +142,7 @@ public synchronized void start(ActionListener listener) { updatePersistentTaskState(state, ActionListener.wrap( (task) -> { + auditor.info(transform.getId(), "Updated state to [" + state.getIndexerState() + "]"); logger.debug("Successfully updated state for data frame transform [" + transform.getId() + "] to [" + state.getIndexerState() + "][" + state.getPosition() + "]"); listener.onResponse(new StartDataFrameTransformAction.Response(true)); @@ -166,6 +170,7 @@ public synchronized void stop(ActionListener { + auditor.info(transform.getId(), "Updated state to [" + state.getIndexerState() + "]"); logger.debug("Successfully updated state for data frame transform [{}] to [{}]", transform.getId(), state.getIndexerState()); listener.onResponse(new StopDataFrameTransformAction.Response(true)); @@ -227,15 +232,17 @@ protected class ClientDataFrameIndexer extends DataFrameIndexer { private final Client client; private final DataFrameTransformsConfigManager transformsConfigManager; private final String transformId; + private final DataFrameAuditor auditor; private DataFrameTransformConfig transformConfig = null; public ClientDataFrameIndexer(String transformId, DataFrameTransformsConfigManager transformsConfigManager, - AtomicReference initialState, Map initialPosition, Client client) { + AtomicReference initialState, Map initialPosition, Client client, DataFrameAuditor auditor) { super(threadPool.executor(ThreadPool.Names.GENERIC), initialState, initialPosition); this.transformId = transformId; this.transformsConfigManager = transformsConfigManager; this.client = client; + this.auditor = auditor; } @Override @@ -270,6 +277,7 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { // todo: set job into failed state if (transformConfig.isValid() == false) { + auditor.error(transformId, "Cannot execute data frame transform as configuration is invalid"); throw new RuntimeException( DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId)); } @@ -313,16 +321,19 @@ protected void doSaveState(IndexerState indexerState, Map positi @Override protected void onFailure(Exception exc) { + auditor.error(transform.getId(), "Data frame transform failed with an exception: " + exc.getMessage()); logger.warn("Data frame transform [" + transform.getId() + "] failed with an exception: ", exc); } @Override protected void onFinish() { + auditor.info(transform.getId(), "Finished indexing for data frame transform"); logger.info("Finished indexing for data frame transform [" + transform.getId() + "]"); } @Override protected void onAbort() { + auditor.info(transform.getId(), "Received abort request, stopping indexer"); logger.info("Data frame transform [" + transform.getId() + "] received abort request, stopping indexer"); shutdown(); } From ad006db40c8c599769a6855767264c5203ae83dd Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 14 Mar 2019 09:44:35 -0500 Subject: [PATCH 2/3] Adjusting Level, Auditor, and message to address pr comments --- .../notifications/AbstractAuditMessage.java | 113 ++++-------------- .../{AbstractAuditor.java => Auditor.java} | 33 +++-- .../core/common/notifications/Level.java | 18 +-- .../notifications/DataFrameAuditMessage.java | 37 +++--- .../AbstractAuditMessageTests.java | 81 ++++++------- ...actAuditorTests.java => AuditorTests.java} | 38 +----- .../core/common/notifications/LevelTests.java | 83 +------------ .../DataFrameAuditMessageTests.java | 37 +++--- .../xpack/dataframe/DataFrame.java | 12 +- .../notifications/DataFrameAuditor.java | 45 ------- ...FrameTransformPersistentTasksExecutor.java | 12 +- .../transforms/DataFrameTransformTask.java | 15 ++- 12 files changed, 152 insertions(+), 372 deletions(-) rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/{AbstractAuditor.java => Auditor.java} (68%) rename x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/{AbstractAuditorTests.java => AuditorTests.java} (81%) delete mode 100644 x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/notifications/DataFrameAuditor.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessage.java index 3c57c57a2e28f..85327337730f4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessage.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessage.java @@ -6,9 +6,6 @@ package org.elasticsearch.xpack.core.common.notifications; import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -16,9 +13,8 @@ import java.io.IOException; import java.util.Date; import java.util.Objects; -import java.util.function.Supplier; -public abstract class AbstractAuditMessage implements ToXContentObject, Writeable { +public abstract class AbstractAuditMessage implements ToXContentObject { public static final ParseField TYPE = new ParseField("audit_message"); public static final ParseField MESSAGE = new ParseField("message"); @@ -26,108 +22,57 @@ public abstract class AbstractAuditMessage implements ToXContentObject, Writeabl public static final ParseField TIMESTAMP = new ParseField("timestamp"); public static final ParseField NODE_NAME = new ParseField("node_name"); - private String resourceId; - private String message; - private Level level; - private Date timestamp; - private String nodeName; + private final String resourceId; + private final String message; + private final Level level; + private final Date timestamp; + private final String nodeName; - protected AbstractAuditMessage() { - this.timestamp = new Date(); - } - - AbstractAuditMessage(String resourceId, String message, Level level, String nodeName) { + public AbstractAuditMessage(String resourceId, String message, Level level, String nodeName) { this.resourceId = resourceId; - this.message = message; - this.level = level; + this.message = Objects.requireNonNull(message); + this.level = Objects.requireNonNull(level); this.timestamp = new Date(); this.nodeName = nodeName; } - public AbstractAuditMessage(StreamInput in) throws IOException { - resourceId = in.readOptionalString(); - message = in.readOptionalString(); - if (in.readBoolean()) { - level = Level.readFromStream(in); - } - if (in.readBoolean()) { - timestamp = new Date(in.readLong()); - } - nodeName = in.readOptionalString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalString(resourceId); - out.writeOptionalString(message); - boolean hasLevel = level != null; - out.writeBoolean(hasLevel); - if (hasLevel) { - level.writeTo(out); - } - boolean hasTimestamp = timestamp != null; - out.writeBoolean(hasTimestamp); - if (hasTimestamp) { - out.writeLong(timestamp.getTime()); - } - out.writeOptionalString(nodeName); + protected AbstractAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) { + this.resourceId = resourceId; + this.message = Objects.requireNonNull(message); + this.level = Objects.requireNonNull(level); + this.timestamp = Objects.requireNonNull(timestamp); + this.nodeName = nodeName; } public final String getResourceId() { return resourceId; } - public final void setResourceId(String resourceId) { - this.resourceId = resourceId; - } - public final String getMessage() { return message; } - public final void setMessage(String message) { - this.message = message; - } - public final Level getLevel() { return level; } - public final void setLevel(Level level) { - this.level = level; - } - public final Date getTimestamp() { return timestamp; } - public final void setTimestamp(Date timestamp) { - this.timestamp = timestamp; - } - public final String getNodeName() { return nodeName; } - public final void setNodeName(String nodeName) { - this.nodeName = nodeName; - } - @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(); if (resourceId != null) { builder.field(getResourceField(), resourceId); } - if (message != null) { - builder.field(MESSAGE.getPreferredName(), message); - } - if (level != null) { - builder.field(LEVEL.getPreferredName(), level); - } - if (timestamp != null) { - builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime()); - } + builder.field(MESSAGE.getPreferredName(), message); + builder.field(LEVEL.getPreferredName(), level); + builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime()); if (nodeName != null) { builder.field(NODE_NAME.getPreferredName(), nodeName); } @@ -158,17 +103,7 @@ public boolean equals(Object obj) { protected abstract String getResourceField(); - public static final class AuditMessageBuilder { - - private final Supplier auditSupplier; - - public AuditMessageBuilder(Supplier auditSupplier) { - this.auditSupplier = Objects.requireNonNull(auditSupplier); - } - - public T activity(String resourceId, String message, String nodeName) { - return newMessage(Level.ACTIVITY, resourceId, message, nodeName); - } + public abstract static class AbstractBuilder { public T info(String resourceId, String message, String nodeName) { return newMessage(Level.INFO, resourceId, message, nodeName); @@ -182,14 +117,6 @@ public T error(String resourceId, String message, String nodeName) { return newMessage(Level.ERROR, resourceId, message, nodeName); } - private T newMessage(Level level, String resourceId, String message, String nodeName) { - T audit = auditSupplier.get(); - audit.setLevel(level); - audit.setResourceId(resourceId); - audit.setMessage(message); - audit.setNodeName(nodeName); - return audit; - } - + protected abstract T newMessage(Level level, String resourceId, String message, String nodeName); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/Auditor.java similarity index 68% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/Auditor.java index 566da2a021b66..01acb18900b2d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/Auditor.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.core.common.notifications; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; @@ -19,15 +21,24 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; -public abstract class AbstractAuditor { +public class Auditor { + private static final Logger logger = LogManager.getLogger(Auditor.class); private final Client client; private final String nodeName; - private final AbstractAuditMessage.AuditMessageBuilder messageBuilder; + private final String auditIndex; + private final String executionOrigin; + private final AbstractAuditMessage.AbstractBuilder messageBuilder; - public AbstractAuditor(Client client, String nodeName, AbstractAuditMessage.AuditMessageBuilder messageBuilder) { + public Auditor(Client client, + String nodeName, + String auditIndex, + String executionOrigin, + AbstractAuditMessage.AbstractBuilder messageBuilder) { this.client = Objects.requireNonNull(client); this.nodeName = Objects.requireNonNull(nodeName); + this.auditIndex = auditIndex; + this.executionOrigin = executionOrigin; this.messageBuilder = Objects.requireNonNull(messageBuilder); } @@ -43,20 +54,20 @@ public final void error(String resourceId, String message) { indexDoc(messageBuilder.error(resourceId, message, nodeName)); } - protected abstract String getExecutionOrigin(); - - protected abstract String getAuditIndex(); - - protected abstract void onIndexResponse(IndexResponse response); + protected void onIndexResponse(IndexResponse response) { + logger.trace("Successfully wrote audit message"); + } - protected abstract void onIndexFailure(Exception exception); + protected void onIndexFailure(Exception exception) { + logger.debug("Failed to write audit message", exception); + } private void indexDoc(ToXContent toXContent) { - IndexRequest indexRequest = new IndexRequest(getAuditIndex()); + IndexRequest indexRequest = new IndexRequest(auditIndex); indexRequest.source(toXContentBuilder(toXContent)); indexRequest.timeout(TimeValue.timeValueSeconds(5)); executeAsyncWithOrigin(client.threadPool().getThreadContext(), - getExecutionOrigin(), + executionOrigin, indexRequest, ActionListener.wrap( this::onIndexResponse, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/Level.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/Level.java index b8cf5ada10f0e..34aac72517ee0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/Level.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/Level.java @@ -5,15 +5,10 @@ */ package org.elasticsearch.xpack.core.common.notifications; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; - -import java.io.IOException; import java.util.Locale; -public enum Level implements Writeable { - INFO, ACTIVITY, WARNING, ERROR; +public enum Level { + INFO, WARNING, ERROR; /** * Case-insensitive from string method. @@ -26,15 +21,6 @@ public static Level fromString(String value) { return Level.valueOf(value.toUpperCase(Locale.ROOT)); } - public static Level readFromStream(StreamInput in) throws IOException { - return in.readEnum(Level.class); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeEnum(this); - } - @Override public String toString() { return name().toLowerCase(Locale.ROOT); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessage.java index b72d764b37452..7dab9be6ab3cc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessage.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessage.java @@ -6,35 +6,37 @@ package org.elasticsearch.xpack.core.dataframe.notifications; import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage; import org.elasticsearch.xpack.core.common.notifications.Level; import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils; -import java.io.IOException; import java.util.Date; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; import static org.elasticsearch.xpack.core.dataframe.DataFrameField.DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD; public class DataFrameAuditMessage extends AbstractAuditMessage { private static final ParseField TRANSFORM_ID = new ParseField(DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD); - public static final ObjectParser PARSER = new ObjectParser<>("data_frame_audit_message", + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "data_frame_audit_message", true, - DataFrameAuditMessage::new); + a -> new DataFrameAuditMessage((String)a[0], (String)a[1], (Level)a[2], (Date)a[3], (String)a[4])); static { - PARSER.declareString(AbstractAuditMessage::setResourceId, TRANSFORM_ID); - PARSER.declareString(AbstractAuditMessage::setMessage, MESSAGE); - PARSER.declareField(AbstractAuditMessage::setLevel, p -> { + PARSER.declareString(optionalConstructorArg(), TRANSFORM_ID); + PARSER.declareString(constructorArg(), MESSAGE); + PARSER.declareField(constructorArg(), p -> { if (p.currentToken() == XContentParser.Token.VALUE_STRING) { return Level.fromString(p.text()); } throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); }, LEVEL, ObjectParser.ValueType.STRING); - PARSER.declareField(AbstractAuditMessage::setTimestamp, parser -> { + PARSER.declareField(constructorArg(), parser -> { if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) { return new Date(parser.longValue()); } else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) { @@ -43,15 +45,15 @@ public class DataFrameAuditMessage extends AbstractAuditMessage { throw new IllegalArgumentException( "unexpected token [" + parser.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]"); }, TIMESTAMP, ObjectParser.ValueType.VALUE); - PARSER.declareString(AbstractAuditMessage::setNodeName, NODE_NAME); + PARSER.declareString(optionalConstructorArg(), NODE_NAME); } - public DataFrameAuditMessage(StreamInput in) throws IOException { - super(in); + public DataFrameAuditMessage(String resourceId, String message, Level level, String nodeName) { + super(resourceId, message, level, nodeName); } - public DataFrameAuditMessage(){ - super(); + protected DataFrameAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) { + super(resourceId, message, level, timestamp, nodeName); } @Override @@ -59,7 +61,12 @@ protected String getResourceField() { return TRANSFORM_ID.getPreferredName(); } - public static AbstractAuditMessage.AuditMessageBuilder messageBuilder() { - return new AuditMessageBuilder<>(DataFrameAuditMessage::new); + public static AbstractAuditMessage.AbstractBuilder builder() { + return new AbstractBuilder() { + @Override + protected DataFrameAuditMessage newMessage(Level level, String resourceId, String message, String nodeName) { + return new DataFrameAuditMessage(resourceId, message, level, nodeName); + } + }; } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessageTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessageTests.java index cffa1f0b565cd..8fb425698376e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessageTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessageTests.java @@ -6,66 +6,69 @@ package org.elasticsearch.xpack.core.common.notifications; import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.test.AbstractXContentTestCase; import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils; import org.junit.Before; -import java.io.IOException; import java.util.Date; -public class AbstractAuditMessageTests extends AbstractSerializingTestCase { +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + +public class AbstractAuditMessageTests extends AbstractXContentTestCase { private long startMillis; static class TestAuditMessage extends AbstractAuditMessage { private static final ParseField ID = new ParseField("test_id"); - public static final ObjectParser PARSER = new ObjectParser<>(AbstractAuditMessage.TYPE.getPreferredName(), + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + AbstractAuditMessage.TYPE.getPreferredName(), true, - TestAuditMessage::new); + a -> new TestAuditMessage((String)a[0], (String)a[1], (Level)a[2], (Date)a[3], (String)a[4])); static { - PARSER.declareString(AbstractAuditMessage::setResourceId, ID); - PARSER.declareString(AbstractAuditMessage::setMessage, MESSAGE); - PARSER.declareField(AbstractAuditMessage::setLevel, p -> { + PARSER.declareString(optionalConstructorArg(), ID); + PARSER.declareString(constructorArg(), MESSAGE); + PARSER.declareField(constructorArg(), p -> { if (p.currentToken() == XContentParser.Token.VALUE_STRING) { return Level.fromString(p.text()); } throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); }, LEVEL, ObjectParser.ValueType.STRING); - PARSER.declareField(AbstractAuditMessage::setTimestamp, parser -> { - if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) { - return new Date(parser.longValue()); - } else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) { - return new Date(TimeUtils.dateStringToEpoch(parser.text())); - } - throw new IllegalArgumentException( - "unexpected token [" + parser.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]"); - }, TIMESTAMP, ObjectParser.ValueType.VALUE); - PARSER.declareString(AbstractAuditMessage::setNodeName, NODE_NAME); - } - - TestAuditMessage() { - super(); - } - - TestAuditMessage(StreamInput in) throws IOException { - super(in); + PARSER.declareField(constructorArg(), parser -> { + if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) { + return new Date(parser.longValue()); + } else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) { + return new Date(TimeUtils.dateStringToEpoch(parser.text())); + } + throw new IllegalArgumentException( + "unexpected token [" + parser.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]"); + }, TIMESTAMP, ObjectParser.ValueType.VALUE); + PARSER.declareString(optionalConstructorArg(), NODE_NAME); } TestAuditMessage(String resourceId, String message, Level level, String nodeName) { super(resourceId, message, level, nodeName); } + TestAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) { + super(resourceId, message, level, timestamp, nodeName); + } + @Override protected String getResourceField() { return "test_id"; } - static AbstractAuditMessage.AuditMessageBuilder newBuilder() { - return new AuditMessageBuilder<>(TestAuditMessage::new); + static AbstractAuditMessage.AbstractBuilder newBuilder() { + return new AbstractBuilder() { + @Override + protected TestAuditMessage newMessage(Level level, String resourceId, String message, String nodeName) { + return new TestAuditMessage(resourceId, message, level, nodeName); + } + }; } } @@ -99,14 +102,6 @@ public void testNewError() { assertDateBetweenStartAndNow(error.getTimestamp()); } - public void testNewActivity() { - TestAuditMessage error = TestAuditMessage.newBuilder().activity("foo", "some error", "some_node"); - assertEquals("foo", error.getResourceId()); - assertEquals("some error", error.getMessage()); - assertEquals(Level.ACTIVITY, error.getLevel()); - assertDateBetweenStartAndNow(error.getTimestamp()); - } - private void assertDateBetweenStartAndNow(Date timestamp) { long timestampMillis = timestamp.getTime(); assertTrue(timestampMillis >= startMillis); @@ -119,13 +114,13 @@ protected TestAuditMessage doParseInstance(XContentParser parser) { } @Override - protected TestAuditMessage createTestInstance() { - return new TestAuditMessage(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 200), - randomFrom(Level.values()), randomAlphaOfLengthBetween(1, 20)); + protected boolean supportsUnknownFields() { + return true; } @Override - protected Reader instanceReader() { - return TestAuditMessage::new; + protected TestAuditMessage createTestInstance() { + return new TestAuditMessage(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 200), + randomFrom(Level.values()), randomAlphaOfLengthBetween(1, 20)); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AuditorTests.java similarity index 81% rename from x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java rename to x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AuditorTests.java index 84a52e3cea5cb..1389af62dc719 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AuditorTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.core.common.notifications; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; @@ -29,38 +28,13 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class AbstractAuditorTests extends ESTestCase { +public class AuditorTests extends ESTestCase { private Client client; private ArgumentCaptor indexRequestCaptor; private static final String TEST_ORIGIN = "test_origin"; private static final String TEST_INDEX = "test_index"; - - static class TestAuditor extends AbstractAuditor { - - TestAuditor(Client client, String nodeName) { - super(client, nodeName, AbstractAuditMessageTests.TestAuditMessage.newBuilder()); - } - - @Override - protected String getExecutionOrigin() { - return TEST_ORIGIN; - } - - @Override - protected String getAuditIndex() { - return TEST_INDEX; - } - - @Override - protected void onIndexResponse(IndexResponse response) { - - } - - @Override - protected void onIndexFailure(Exception exception) { - - } - } + private static final AbstractAuditMessage.AbstractBuilder builder = + AbstractAuditMessageTests.TestAuditMessage.newBuilder(); @Before public void setUpMocks() { @@ -73,7 +47,7 @@ public void setUpMocks() { } public void testInfo() throws IOException { - TestAuditor auditor = new TestAuditor(client, "node_1"); + Auditor auditor = new Auditor<>(client, "node_1", TEST_INDEX, TEST_ORIGIN, builder); auditor.info("foo", "Here is my info"); verify(client).index(indexRequestCaptor.capture(), any()); @@ -87,7 +61,7 @@ public void testInfo() throws IOException { } public void testWarning() throws IOException { - TestAuditor auditor = new TestAuditor(client, "node_1"); + Auditor auditor = new Auditor<>(client, "node_1", TEST_INDEX, TEST_ORIGIN, builder); auditor.warning("bar", "Here is my warning"); verify(client).index(indexRequestCaptor.capture(), any()); @@ -101,7 +75,7 @@ public void testWarning() throws IOException { } public void testError() throws IOException { - TestAuditor auditor = new TestAuditor(client, "node_1"); + Auditor auditor = new Auditor<>(client, "node_1", TEST_INDEX, TEST_ORIGIN, builder); auditor.error("foobar", "Here is my error"); verify(client).index(indexRequestCaptor.capture(), any()); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/LevelTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/LevelTests.java index 5b0e5dba89c97..a66d230b4678d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/LevelTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/LevelTests.java @@ -5,13 +5,8 @@ */ package org.elasticsearch.xpack.core.common.notifications; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.test.ESTestCase; -import java.io.IOException; - -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; public class LevelTests extends ESTestCase { @@ -19,8 +14,6 @@ public class LevelTests extends ESTestCase { public void testFromString() { assertEquals(Level.INFO, Level.fromString("info")); assertEquals(Level.INFO, Level.fromString("INFO")); - assertEquals(Level.ACTIVITY, Level.fromString("activity")); - assertEquals(Level.ACTIVITY, Level.fromString("ACTIVITY")); assertEquals(Level.WARNING, Level.fromString("warning")); assertEquals(Level.WARNING, Level.fromString("WARNING")); assertEquals(Level.ERROR, Level.fromString("error")); @@ -29,85 +22,13 @@ public void testFromString() { public void testToString() { assertEquals("info", Level.INFO.toString()); - assertEquals("activity", Level.ACTIVITY.toString()); assertEquals("warning", Level.WARNING.toString()); assertEquals("error", Level.ERROR.toString()); } public void testValidOrdinals() { assertThat(Level.INFO.ordinal(), equalTo(0)); - assertThat(Level.ACTIVITY.ordinal(), equalTo(1)); - assertThat(Level.WARNING.ordinal(), equalTo(2)); - assertThat(Level.ERROR.ordinal(), equalTo(3)); - } - - public void testwriteTo() throws Exception { - try (BytesStreamOutput out = new BytesStreamOutput()) { - Level.INFO.writeTo(out); - try (StreamInput in = out.bytes().streamInput()) { - assertThat(in.readVInt(), equalTo(0)); - } - } - - try (BytesStreamOutput out = new BytesStreamOutput()) { - Level.ACTIVITY.writeTo(out); - try (StreamInput in = out.bytes().streamInput()) { - assertThat(in.readVInt(), equalTo(1)); - } - } - - try (BytesStreamOutput out = new BytesStreamOutput()) { - Level.WARNING.writeTo(out); - try (StreamInput in = out.bytes().streamInput()) { - assertThat(in.readVInt(), equalTo(2)); - } - } - - try (BytesStreamOutput out = new BytesStreamOutput()) { - Level.ERROR.writeTo(out); - try (StreamInput in = out.bytes().streamInput()) { - assertThat(in.readVInt(), equalTo(3)); - } - } + assertThat(Level.WARNING.ordinal(), equalTo(1)); + assertThat(Level.ERROR.ordinal(), equalTo(2)); } - - public void testReadFrom() throws Exception { - try (BytesStreamOutput out = new BytesStreamOutput()) { - out.writeVInt(0); - try (StreamInput in = out.bytes().streamInput()) { - assertThat(Level.readFromStream(in), equalTo(Level.INFO)); - } - } - try (BytesStreamOutput out = new BytesStreamOutput()) { - out.writeVInt(1); - try (StreamInput in = out.bytes().streamInput()) { - assertThat(Level.readFromStream(in), equalTo(Level.ACTIVITY)); - } - } - try (BytesStreamOutput out = new BytesStreamOutput()) { - out.writeVInt(2); - try (StreamInput in = out.bytes().streamInput()) { - assertThat(Level.readFromStream(in), equalTo(Level.WARNING)); - } - } - try (BytesStreamOutput out = new BytesStreamOutput()) { - out.writeVInt(3); - try (StreamInput in = out.bytes().streamInput()) { - assertThat(Level.readFromStream(in), equalTo(Level.ERROR)); - } - } - } - - public void testInvalidReadFrom() throws Exception { - try (BytesStreamOutput out = new BytesStreamOutput()) { - out.writeVInt(randomIntBetween(4, Integer.MAX_VALUE)); - try (StreamInput in = out.bytes().streamInput()) { - Level.readFromStream(in); - fail("Expected IOException"); - } catch (IOException e) { - assertThat(e.getMessage(), containsString("Unknown Level ordinal [")); - } - } - } - } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessageTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessageTests.java index 38bf4a4ef8915..e845dd76fc679 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessageTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessageTests.java @@ -5,15 +5,14 @@ */ package org.elasticsearch.xpack.core.dataframe.notifications; -import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.test.AbstractXContentTestCase; import org.elasticsearch.xpack.core.common.notifications.Level; import org.junit.Before; import java.util.Date; -public class DataFrameAuditMessageTests extends AbstractSerializingTestCase { +public class DataFrameAuditMessageTests extends AbstractXContentTestCase { private long startMillis; @Before @@ -22,7 +21,7 @@ public void setStartTime() { } public void testNewInfo() { - DataFrameAuditMessage info = DataFrameAuditMessage.messageBuilder().info("foo", "some info", "some_node"); + DataFrameAuditMessage info = DataFrameAuditMessage.builder().info("foo", "some info", "some_node"); assertEquals("foo", info.getResourceId()); assertEquals("some info", info.getMessage()); assertEquals(Level.INFO, info.getLevel()); @@ -30,7 +29,7 @@ public void testNewInfo() { } public void testNewWarning() { - DataFrameAuditMessage warning = DataFrameAuditMessage.messageBuilder().warning("bar", "some warning", "some_node"); + DataFrameAuditMessage warning = DataFrameAuditMessage.builder().warning("bar", "some warning", "some_node"); assertEquals("bar", warning.getResourceId()); assertEquals("some warning", warning.getMessage()); assertEquals(Level.WARNING, warning.getLevel()); @@ -39,21 +38,13 @@ public void testNewWarning() { public void testNewError() { - DataFrameAuditMessage error = DataFrameAuditMessage.messageBuilder().error("foo", "some error", "some_node"); + DataFrameAuditMessage error = DataFrameAuditMessage.builder().error("foo", "some error", "some_node"); assertEquals("foo", error.getResourceId()); assertEquals("some error", error.getMessage()); assertEquals(Level.ERROR, error.getLevel()); assertDateBetweenStartAndNow(error.getTimestamp()); } - public void testNewActivity() { - DataFrameAuditMessage error = DataFrameAuditMessage.messageBuilder().activity("foo", "some error", "some_node"); - assertEquals("foo", error.getResourceId()); - assertEquals("some error", error.getMessage()); - assertEquals(Level.ACTIVITY, error.getLevel()); - assertDateBetweenStartAndNow(error.getTimestamp()); - } - private void assertDateBetweenStartAndNow(Date timestamp) { long timestampMillis = timestamp.getTime(); assertTrue(timestampMillis >= startMillis); @@ -66,17 +57,17 @@ protected DataFrameAuditMessage doParseInstance(XContentParser parser) { } @Override - protected DataFrameAuditMessage createTestInstance() { - DataFrameAuditMessage auditMessage = new DataFrameAuditMessage(); - auditMessage.setLevel(randomFrom(Level.values())); - auditMessage.setMessage(randomAlphaOfLengthBetween(1, 20)); - auditMessage.setNodeName(randomAlphaOfLengthBetween(1, 20)); - auditMessage.setResourceId(randomAlphaOfLengthBetween(1, 20)); - return auditMessage; + protected boolean supportsUnknownFields() { + return true; } @Override - protected Reader instanceReader() { - return DataFrameAuditMessage::new; + protected DataFrameAuditMessage createTestInstance() { + return new DataFrameAuditMessage( + randomBoolean() ? null : randomAlphaOfLength(10), + randomAlphaOfLengthBetween(1, 20), + randomFrom(Level.values()), + randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20) + ); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java index 44a53121a2794..2e899fe5cdeb0 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java @@ -40,6 +40,7 @@ import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.common.notifications.Auditor; import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction; @@ -47,6 +48,7 @@ import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.dataframe.action.TransportDeleteDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.action.TransportGetDataFrameTransformsAction; @@ -55,7 +57,6 @@ import org.elasticsearch.xpack.dataframe.action.TransportPutDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.action.TransportStopDataFrameTransformAction; -import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.rest.action.RestDeleteDataFrameTransformAction; @@ -81,6 +82,7 @@ import java.util.function.UnaryOperator; import static java.util.Collections.emptyList; +import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN; public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlugin { @@ -97,7 +99,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu private final Settings settings; private final boolean transportClientMode; private final SetOnce dataFrameTransformsConfigManager = new SetOnce<>(); - private final SetOnce dataFrameAuditor = new SetOnce<>(); + private final SetOnce> dataFrameAuditor = new SetOnce<>(); private final SetOnce schedulerEngine = new SetOnce<>(); public DataFrame(Settings settings) { @@ -177,7 +179,11 @@ public Collection createComponents(Client client, ClusterService cluster if (enabled == false || transportClientMode) { return emptyList(); } - dataFrameAuditor.set(new DataFrameAuditor(client, clusterService.getNodeName())); + dataFrameAuditor.set(new Auditor<>(client, + clusterService.getNodeName(), + DataFrameInternalIndex.AUDIT_INDEX, + DATA_FRAME_ORIGIN, + DataFrameAuditMessage.builder())); dataFrameTransformsConfigManager.set(new DataFrameTransformsConfigManager(client, xContentRegistry)); return Arrays.asList(dataFrameTransformsConfigManager.get(), dataFrameAuditor.get()); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/notifications/DataFrameAuditor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/notifications/DataFrameAuditor.java deleted file mode 100644 index 419d0100cba43..0000000000000 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/notifications/DataFrameAuditor.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.dataframe.notifications; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; -import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage; - -import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN; -import static org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex.AUDIT_INDEX; - -public class DataFrameAuditor extends AbstractAuditor { - - private static final Logger logger = LogManager.getLogger(DataFrameAuditor.class); - - public DataFrameAuditor(Client client, String nodeName){ - super(client, nodeName, DataFrameAuditMessage.messageBuilder()); - } - - @Override - protected String getExecutionOrigin() { - return DATA_FRAME_ORIGIN; - } - - @Override - protected String getAuditIndex() { - return AUDIT_INDEX; - } - - @Override - protected void onIndexResponse(IndexResponse response) { - logger.trace("Successfully wrote audit message"); - } - - @Override - protected void onIndexFailure(Exception exception) { - logger.debug("Failed to write audit message", exception); - } -} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 889558b31815d..cfe6993db2975 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -16,12 +16,13 @@ import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.common.notifications.Auditor; import org.elasticsearch.xpack.core.dataframe.DataFrameField; +import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.dataframe.DataFrame; -import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import java.util.Map; @@ -34,10 +35,13 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx private final DataFrameTransformsConfigManager transformsConfigManager; private final SchedulerEngine schedulerEngine; private final ThreadPool threadPool; - private final DataFrameAuditor auditor; + private final Auditor auditor; - public DataFrameTransformPersistentTasksExecutor(Client client, DataFrameTransformsConfigManager transformsConfigManager, - SchedulerEngine schedulerEngine, DataFrameAuditor auditor, ThreadPool threadPool) { + public DataFrameTransformPersistentTasksExecutor(Client client, + DataFrameTransformsConfigManager transformsConfigManager, + SchedulerEngine schedulerEngine, + Auditor auditor, + ThreadPool threadPool) { super(DataFrameField.TASK_NAME, DataFrame.TASK_THREAD_POOL_NAME); this.client = client; this.transformsConfigManager = transformsConfigManager; diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 2e9c008c5b88d..3bb69024dd887 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -22,8 +22,10 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.common.notifications.Auditor; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; +import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; @@ -34,7 +36,6 @@ import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction.Response; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; -import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import java.util.Map; @@ -51,7 +52,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S private final SchedulerEngine schedulerEngine; private final ThreadPool threadPool; private final DataFrameIndexer indexer; - private final DataFrameAuditor auditor; + private final Auditor auditor; // the generation of this data frame, for v1 there will be only // 0: data frame not created or still indexing @@ -59,8 +60,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S private final AtomicReference generation; public DataFrameTransformTask(long id, String type, String action, TaskId parentTask, DataFrameTransform transform, - DataFrameTransformState state, Client client, DataFrameTransformsConfigManager transformsConfigManager, - SchedulerEngine schedulerEngine, DataFrameAuditor auditor, ThreadPool threadPool, Map headers) { + DataFrameTransformState state, Client client, DataFrameTransformsConfigManager transformsConfigManager, + SchedulerEngine schedulerEngine, Auditor auditor, + ThreadPool threadPool, Map headers) { super(id, type, action, DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transform.getId(), parentTask, headers); this.transform = transform; this.schedulerEngine = schedulerEngine; @@ -232,12 +234,13 @@ protected class ClientDataFrameIndexer extends DataFrameIndexer { private final Client client; private final DataFrameTransformsConfigManager transformsConfigManager; private final String transformId; - private final DataFrameAuditor auditor; + private final Auditor auditor; private DataFrameTransformConfig transformConfig = null; public ClientDataFrameIndexer(String transformId, DataFrameTransformsConfigManager transformsConfigManager, - AtomicReference initialState, Map initialPosition, Client client, DataFrameAuditor auditor) { + AtomicReference initialState, Map initialPosition, Client client, + Auditor auditor) { super(threadPool.executor(ThreadPool.Names.GENERIC), initialState, initialPosition); this.transformId = transformId; this.transformsConfigManager = transformsConfigManager; From c55d74c9c10b965e63844882971c1d6f06083063 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 14 Mar 2019 10:46:24 -0500 Subject: [PATCH 3/3] Addressing PR comments --- .../security/authz/store/ReservedRolesStoreTests.java | 4 ++-- .../dataframe/persistence/DataFrameInternalIndex.java | 10 +++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java index 1a25fa90ce2ca..9d970cca55119 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java @@ -1047,7 +1047,7 @@ public void testDataFrameTransformsAdminRole() { assertThat(role.cluster().check(StopDataFrameTransformAction.NAME, request), is(true)); assertThat(role.runAs().check(randomAlphaOfLengthBetween(1, 30)), is(false)); - assertOnlyReadAllowed(role, ".data-frame-notifications"); + assertOnlyReadAllowed(role, ".data-frame-notifications-1"); assertNoAccessAllowed(role, "foo"); assertNoAccessAllowed(role, ".data-frame-internal-1"); // internal use only @@ -1071,7 +1071,7 @@ public void testDataFrameTransformsUserRole() { assertThat(role.cluster().check(StopDataFrameTransformAction.NAME, request), is(false)); assertThat(role.runAs().check(randomAlphaOfLengthBetween(1, 30)), is(false)); - assertOnlyReadAllowed(role, ".data-frame-notifications"); + assertOnlyReadAllowed(role, ".data-frame-notifications-1"); assertNoAccessAllowed(role, "foo"); assertNoAccessAllowed(role, ".data-frame-internal-1"); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java index 25f3939680541..91982ce49aaed 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java @@ -30,7 +30,10 @@ public final class DataFrameInternalIndex { public static final String INDEX_TEMPLATE_PATTERN = ".data-frame-internal-"; public static final String INDEX_TEMPLATE_NAME = INDEX_TEMPLATE_PATTERN + INDEX_TEMPLATE_VERSION; public static final String INDEX_NAME = INDEX_TEMPLATE_NAME; - public static final String AUDIT_INDEX = ".data-frame-notifications"; + + public static final String AUDIT_TEMPLATE_VERSION = "1"; + public static final String AUDIT_INDEX_PREFIX = ".data-frame-notifications-"; + public static final String AUDIT_INDEX = AUDIT_INDEX_PREFIX + AUDIT_TEMPLATE_VERSION; // constants for mappings public static final String DYNAMIC = "dynamic"; @@ -60,10 +63,10 @@ public static IndexTemplateMetaData getIndexTemplateMetaData() throws IOExceptio public static IndexTemplateMetaData getAuditIndexTemplateMetaData() throws IOException { IndexTemplateMetaData dataFrameTemplate = IndexTemplateMetaData.builder(AUDIT_INDEX) - .patterns(Collections.singletonList(AUDIT_INDEX)) + .patterns(Collections.singletonList(AUDIT_INDEX_PREFIX + "*")) .version(Version.CURRENT.id) .settings(Settings.builder() - // the configurations are expected to be small + // the audits are expected to be small .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")) .putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(auditMappings())) @@ -75,6 +78,7 @@ private static XContentBuilder auditMappings() throws IOException { XContentBuilder builder = jsonBuilder().startObject(); builder.startObject(SINGLE_MAPPING_NAME); addMetaInformation(builder); + builder.field(DYNAMIC, "false"); builder.startObject(PROPERTIES) .startObject(DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD) .field(TYPE, KEYWORD)