Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class Auditor<T extends AbstractAuditMessage> {
public abstract class AbstractAuditor<T extends AbstractAuditMessage> {

private static final Logger logger = LogManager.getLogger(Auditor.class);
private static final Logger logger = LogManager.getLogger(AbstractAuditor.class);
private final Client client;
private final String nodeName;
private final String auditIndex;
private final String executionOrigin;
private final AbstractAuditMessage.AbstractBuilder<T> messageBuilder;

public Auditor(Client client,
String nodeName,
String auditIndex,
String executionOrigin,
AbstractAuditMessage.AbstractBuilder<T> messageBuilder) {
public AbstractAuditor(Client client,
String nodeName,
String auditIndex,
String executionOrigin,
AbstractAuditMessage.AbstractBuilder<T> messageBuilder) {
this.client = Objects.requireNonNull(client);
this.nodeName = Objects.requireNonNull(nodeName);
this.auditIndex = auditIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.core.ml.job.results.ReservedFieldNames;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.core.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.core.ml.notifications.AnomalyDetectionAuditMessage;
import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext;

import java.io.IOException;
Expand Down Expand Up @@ -1122,21 +1122,21 @@ public static XContentBuilder auditMessageMapping() throws IOException {
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AuditMessage.LEVEL.getPreferredName())
.startObject(AnomalyDetectionAuditMessage.LEVEL.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AuditMessage.MESSAGE.getPreferredName())
.startObject(AnomalyDetectionAuditMessage.MESSAGE.getPreferredName())
.field(TYPE, TEXT)
.startObject(FIELDS)
.startObject(RAW)
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject()
.startObject(AuditMessage.TIMESTAMP.getPreferredName())
.startObject(AnomalyDetectionAuditMessage.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(AuditMessage.NODE_NAME.getPreferredName())
.startObject(AnomalyDetectionAuditMessage.NODE_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.endObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class AuditMessage extends AbstractAuditMessage {
public class AnomalyDetectionAuditMessage extends AbstractAuditMessage {

public static final ConstructingObjectParser<AuditMessage, Void> PARSER = new ConstructingObjectParser<>(
public static final ConstructingObjectParser<AnomalyDetectionAuditMessage, Void> PARSER = new ConstructingObjectParser<>(
"ml_audit_message",
true,
a -> new AuditMessage((String)a[0], (String)a[1], (Level)a[2], (Date)a[3], (String)a[4]));
a -> new AnomalyDetectionAuditMessage((String)a[0], (String)a[1], (Level)a[2], (Date)a[3], (String)a[4]));

static {
PARSER.declareString(optionalConstructorArg(), Job.ID);
Expand All @@ -41,11 +41,11 @@ public class AuditMessage extends AbstractAuditMessage {
PARSER.declareString(optionalConstructorArg(), NODE_NAME);
}

public AuditMessage(String resourceId, String message, Level level, String nodeName) {
public AnomalyDetectionAuditMessage(String resourceId, String message, Level level, String nodeName) {
super(resourceId, message, level, nodeName);
}

protected AuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
protected AnomalyDetectionAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
super(resourceId, message, level, timestamp, nodeName);
}

Expand All @@ -54,11 +54,11 @@ protected String getResourceField() {
return Job.ID.getPreferredName();
}

public static AbstractBuilder<AuditMessage> builder() {
return new AbstractBuilder<AuditMessage>() {
public static AbstractBuilder<AnomalyDetectionAuditMessage> builder() {
return new AbstractBuilder<AnomalyDetectionAuditMessage>() {
@Override
protected AuditMessage newMessage(Level level, String resourceId, String message, String nodeName) {
return new AuditMessage(resourceId, message, level, nodeName);
protected AnomalyDetectionAuditMessage newMessage(Level level, String resourceId, String message, String nodeName) {
return new AnomalyDetectionAuditMessage(resourceId, message, level, nodeName);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class AuditorTests extends ESTestCase {
private Client client;
private ArgumentCaptor<IndexRequest> indexRequestCaptor;
public class AbstractAuditorTests extends ESTestCase {

private static final String TEST_NODE_NAME = "node_1";
private static final String TEST_ORIGIN = "test_origin";
private static final String TEST_INDEX = "test_index";
private static final AbstractAuditMessage.AbstractBuilder<AbstractAuditMessageTests.TestAuditMessage> builder =
AbstractAuditMessageTests.TestAuditMessage.newBuilder();


private Client client;
private ArgumentCaptor<IndexRequest> indexRequestCaptor;

@Before
public void setUpMocks() {
client = mock(Client.class);
Expand All @@ -47,7 +48,7 @@ public void setUpMocks() {
}

public void testInfo() throws IOException {
Auditor<AbstractAuditMessageTests.TestAuditMessage> auditor = new Auditor<>(client, "node_1", TEST_INDEX, TEST_ORIGIN, builder);
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = new TestAuditor(client);
auditor.info("foo", "Here is my info");

verify(client).index(indexRequestCaptor.capture(), any());
Expand All @@ -61,7 +62,7 @@ public void testInfo() throws IOException {
}

public void testWarning() throws IOException {
Auditor<AbstractAuditMessageTests.TestAuditMessage> auditor = new Auditor<>(client, "node_1", TEST_INDEX, TEST_ORIGIN, builder);
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = new TestAuditor(client);
auditor.warning("bar", "Here is my warning");

verify(client).index(indexRequestCaptor.capture(), any());
Expand All @@ -75,7 +76,7 @@ public void testWarning() throws IOException {
}

public void testError() throws IOException {
Auditor<AbstractAuditMessageTests.TestAuditMessage> auditor = new Auditor<>(client, "node_1", TEST_INDEX, TEST_ORIGIN, builder);
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = new TestAuditor(client);
auditor.error("foobar", "Here is my error");

verify(client).index(indexRequestCaptor.capture(), any());
Expand All @@ -93,4 +94,10 @@ private AbstractAuditMessageTests.TestAuditMessage parseAuditMessage(BytesRefere
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, msg.streamInput());
return AbstractAuditMessageTests.TestAuditMessage.PARSER.apply(parser, null);
}

static class TestAuditor extends AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> {
TestAuditor(Client client) {
super(client, TEST_NODE_NAME, TEST_INDEX, TEST_ORIGIN, AbstractAuditMessageTests.TestAuditMessage.newBuilder());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@
public class LevelTests extends ESTestCase {

public void testFromString() {
assertEquals(Level.INFO, Level.fromString("info"));
assertEquals(Level.INFO, Level.fromString("INFO"));
assertEquals(Level.WARNING, Level.fromString("warning"));
assertEquals(Level.WARNING, Level.fromString("WARNING"));
assertEquals(Level.ERROR, Level.fromString("error"));
assertEquals(Level.ERROR, Level.fromString("ERROR"));
assertThat(Level.fromString("info"), equalTo(Level.INFO));
assertThat(Level.fromString("INFO"), equalTo(Level.INFO));
assertThat(Level.fromString("warning"), equalTo(Level.WARNING));
assertThat(Level.fromString("WARNING"), equalTo(Level.WARNING));
assertThat(Level.fromString("error"), equalTo(Level.ERROR));
assertThat(Level.fromString("ERROR"), equalTo(Level.ERROR));
}

public void testToString() {
assertEquals("info", Level.INFO.toString());
assertEquals("warning", Level.WARNING.toString());
assertEquals("error", Level.ERROR.toString());
assertThat(Level.INFO.toString(), equalTo("info"));
assertThat(Level.WARNING.toString(), equalTo("warning"));
assertThat(Level.ERROR.toString(), equalTo("error"));
}

public void testValidOrdinals() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testAuditorWritesAudits() throws Exception {
assertBusy(() -> {
assertTrue(indexExists(DataFrameInternalIndex.AUDIT_INDEX));
});
// Since calls to write the Auditor are sent and forgot (async) we could have returned from the start,
// Since calls to write the AbstractAuditor are sent and forgot (async) we could have returned from the start,
// finished the job (as this is a very short DF job), all without the audit being fully written.
assertBusy(() -> {
refreshIndex(DataFrameInternalIndex.AUDIT_INDEX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.elasticsearch.xpack.dataframe.notifications;

import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.common.notifications.Auditor;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;

Expand All @@ -15,7 +15,7 @@
/**
* DataFrameAuditor class that abstracts away generic templating for easier injection
*/
public class DataFrameAuditor extends Auditor<DataFrameAuditMessage> {
public class DataFrameAuditor extends AbstractAuditor<DataFrameAuditMessage> {
public DataFrameAuditor(Client client, String nodeName) {
super(client, nodeName, DataFrameInternalIndex.AUDIT_INDEX, DATA_FRAME_ORIGIN, DataFrameAuditMessage.builder());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@
import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcessFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.process.DummyController;
import org.elasticsearch.xpack.ml.process.MlController;
import org.elasticsearch.xpack.ml.process.MlControllerHolder;
Expand Down Expand Up @@ -470,7 +470,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
return Collections.singletonList(new JobManagerHolder());
}

Auditor auditor = new Auditor(client, clusterService.getNodeName());
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
JobResultsPersister jobResultsPersister = new JobResultsPersister(client);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,28 @@
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;

import java.util.Objects;


public class MlAssignmentNotifier implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(MlAssignmentNotifier.class);

private final Auditor auditor;
private final AnomalyDetectionAuditor auditor;
private final MlConfigMigrator mlConfigMigrator;
private final ThreadPool threadPool;

MlAssignmentNotifier(Settings settings, Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) {
MlAssignmentNotifier(Settings settings, AnomalyDetectionAuditor auditor, ThreadPool threadPool, Client client,
ClusterService clusterService) {
this.auditor = auditor;
this.mlConfigMigrator = new MlConfigMigrator(settings, client, clusterService);
this.threadPool = threadPool;
clusterService.addListener(this);
}

MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, MlConfigMigrator mlConfigMigrator, ClusterService clusterService) {
MlAssignmentNotifier(AnomalyDetectionAuditor auditor, ThreadPool threadPool, MlConfigMigrator mlConfigMigrator,
ClusterService clusterService) {
this.auditor = auditor;
this.mlConfigMigrator = mlConfigMigrator;
this.threadPool = threadPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -53,14 +53,14 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
private final ThreadPool threadPool;
private final Client client;
private final ClusterService clusterService;
private final Auditor auditor;
private final AnomalyDetectionAuditor auditor;
private final PersistentTasksService persistentTasksService;
private final JobConfigProvider jobConfigProvider;
private final DatafeedConfigProvider datafeedConfigProvider;

@Inject
public TransportCloseJobAction(TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
ClusterService clusterService, Client client, Auditor auditor,
ClusterService clusterService, Client client, AnomalyDetectionAuditor auditor,
PersistentTasksService persistentTasksService, JobConfigProvider jobConfigProvider,
DatafeedConfigProvider datafeedConfigProvider) {
// We fork in innerTaskOperation(...), so we can use ThreadPool.Names.SAME here:
Expand Down Expand Up @@ -242,8 +242,10 @@ static void addJobAccordingToState(String jobId, PersistentTasksCustomMetaData t
}
}

static TransportCloseJobAction.WaitForCloseRequest buildWaitForCloseRequest(List<String> openJobIds, List<String> closingJobIds,
PersistentTasksCustomMetaData tasks, Auditor auditor) {
static TransportCloseJobAction.WaitForCloseRequest buildWaitForCloseRequest(List<String> openJobIds,
List<String> closingJobIds,
PersistentTasksCustomMetaData tasks,
AnomalyDetectionAuditor auditor) {
TransportCloseJobAction.WaitForCloseRequest waitForCloseRequest = new TransportCloseJobAction.WaitForCloseRequest();

for (String jobId : openJobIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
import org.elasticsearch.xpack.ml.job.retention.UnusedStateRemover;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

import java.util.Arrays;
Expand Down Expand Up @@ -54,7 +54,7 @@ protected void doExecute(Task task, DeleteExpiredDataAction.Request request,
}

private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener) {
Auditor auditor = new Auditor(client, clusterService.getNodeName());
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, auditor),
new ExpiredForecastsRemover(client, threadPool),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;

Expand All @@ -94,7 +94,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo

private final Client client;
private final PersistentTasksService persistentTasksService;
private final Auditor auditor;
private final AnomalyDetectionAuditor auditor;
private final JobResultsProvider jobResultsProvider;
private final JobConfigProvider jobConfigProvider;
private final DatafeedConfigProvider datafeedConfigProvider;
Expand All @@ -113,7 +113,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
public TransportDeleteJobAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
Client client, Auditor auditor, JobResultsProvider jobResultsProvider,
Client client, AnomalyDetectionAuditor auditor, JobResultsProvider jobResultsProvider,
JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider,
MlMemoryTracker memoryTracker) {
super(DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
Expand Down
Loading