diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleSettings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleSettings.java index 3cfd8556244a9..0a157b8197a10 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleSettings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleSettings.java @@ -16,10 +16,15 @@ public class LifecycleSettings { public static final String LIFECYCLE_NAME = "index.lifecycle.name"; public static final String LIFECYCLE_INDEXING_COMPLETE = "index.lifecycle.indexing_complete"; + public static final String SLM_HISTORY_INDEX_ENABLED = "slm.history_index_enabled"; + public static final Setting LIFECYCLE_POLL_INTERVAL_SETTING = Setting.timeSetting(LIFECYCLE_POLL_INTERVAL, TimeValue.timeValueMinutes(10), TimeValue.timeValueSeconds(1), Setting.Property.Dynamic, Setting.Property.NodeScope); public static final Setting LIFECYCLE_NAME_SETTING = Setting.simpleString(LIFECYCLE_NAME, Setting.Property.Dynamic, Setting.Property.IndexScope); public static final Setting LIFECYCLE_INDEXING_COMPLETE_SETTING = Setting.boolSetting(LIFECYCLE_INDEXING_COMPLETE, false, Setting.Property.Dynamic, Setting.Property.IndexScope); + + public static final Setting SLM_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(SLM_HISTORY_INDEX_ENABLED, true, + Setting.Property.NodeScope); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryItem.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryItem.java new file mode 100644 index 0000000000000..1c5a9ef751e9d --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryItem.java @@ -0,0 +1,223 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.snapshotlifecycle.history; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE; + +/** + * Represents the record of a Snapshot Lifecycle Management action, so that it + * can be indexed in a history index or recorded to a log in a structured way + */ +public class SnapshotHistoryItem implements Writeable, ToXContentObject { + static final ParseField TIMESTAMP = new ParseField("@timestamp"); + static final ParseField POLICY_ID = new ParseField("policy"); + static final ParseField REPOSITORY = new ParseField("repository"); + static final ParseField SNAPSHOT_NAME = new ParseField("snapshot_name"); + static final ParseField OPERATION = new ParseField("operation"); + static final ParseField SUCCESS = new ParseField("success"); + private static final String CREATE_OPERATION = "CREATE"; + protected final long timestamp; + protected final String policyId; + protected final String repository; + protected final String snapshotName; + protected final String operation; + protected final boolean success; + + private final Map snapshotConfiguration; + @Nullable + private final String errorDetails; + + static final ParseField SNAPSHOT_CONFIG = new ParseField("configuration"); + static final ParseField ERROR_DETAILS = new ParseField("error_details"); + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("snapshot_lifecycle_history_item", true, + (a, id) -> { + final long timestamp = (long) a[0]; + final String policyId = (String) a[1]; + final String repository = (String) a[2]; + final String snapshotName = (String) a[3]; + final String operation = (String) a[4]; + final boolean success = (boolean) a[5]; + final Map snapshotConfiguration = (Map) a[6]; + final String errorDetails = (String) a[7]; + return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, operation, success, + snapshotConfiguration, errorDetails); + }); + + static { + PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIMESTAMP); + PARSER.declareString(ConstructingObjectParser.constructorArg(), POLICY_ID); + PARSER.declareString(ConstructingObjectParser.constructorArg(), REPOSITORY); + PARSER.declareString(ConstructingObjectParser.constructorArg(), SNAPSHOT_NAME); + PARSER.declareString(ConstructingObjectParser.constructorArg(), OPERATION); + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), SUCCESS); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.map(), SNAPSHOT_CONFIG); + PARSER.declareStringOrNull(ConstructingObjectParser.constructorArg(), ERROR_DETAILS); + } + + public static SnapshotHistoryItem parse(XContentParser parser, String name) { + return PARSER.apply(parser, name); + } + + SnapshotHistoryItem(long timestamp, String policyId, String repository, String snapshotName, String operation, + boolean success, Map snapshotConfiguration, String errorDetails) { + this.timestamp = timestamp; + this.policyId = Objects.requireNonNull(policyId); + this.repository = Objects.requireNonNull(repository); + this.snapshotName = Objects.requireNonNull(snapshotName); + this.operation = Objects.requireNonNull(operation); + this.success = success; + this.snapshotConfiguration = Objects.requireNonNull(snapshotConfiguration); + this.errorDetails = errorDetails; + } + + public static SnapshotHistoryItem successRecord(long timestamp, SnapshotLifecyclePolicy policy, String snapshotName) { + return new SnapshotHistoryItem(timestamp, policy.getId(), policy.getRepository(), snapshotName, CREATE_OPERATION, true, + policy.getConfig(), null); + } + + public static SnapshotHistoryItem failureRecord(long timeStamp, SnapshotLifecyclePolicy policy, String snapshotName, + Exception exception) throws IOException { + ToXContent.Params stacktraceParams = new ToXContent.MapParams(Collections.singletonMap(REST_EXCEPTION_SKIP_STACK_TRACE, "false")); + String exceptionString; + try (XContentBuilder causeXContentBuilder = JsonXContent.contentBuilder()) { + causeXContentBuilder.startObject(); + ElasticsearchException.generateThrowableXContent(causeXContentBuilder, stacktraceParams, exception); + causeXContentBuilder.endObject(); + exceptionString = BytesReference.bytes(causeXContentBuilder).utf8ToString(); + } + return new SnapshotHistoryItem(timeStamp, policy.getId(), policy.getRepository(), snapshotName, CREATE_OPERATION, false, + policy.getConfig(), exceptionString); + } + + public SnapshotHistoryItem(StreamInput in) throws IOException { + this.timestamp = in.readVLong(); + this.policyId = in.readString(); + this.repository = in.readString(); + this.snapshotName = in.readString(); + this.operation = in.readString(); + this.success = in.readBoolean(); + this.snapshotConfiguration = in.readMap(); + this.errorDetails = in.readOptionalString(); + } + + public Map getSnapshotConfiguration() { + return snapshotConfiguration; + } + + public String getErrorDetails() { + return errorDetails; + } + + public long getTimestamp() { + return timestamp; + } + + public String getPolicyId() { + return policyId; + } + + public String getRepository() { + return repository; + } + + public String getSnapshotName() { + return snapshotName; + } + + public String getOperation() { + return operation; + } + + public boolean isSuccess() { + return success; + } + + @Override + public final void writeTo(StreamOutput out) throws IOException { + out.writeVLong(timestamp); + out.writeString(policyId); + out.writeString(repository); + out.writeString(snapshotName); + out.writeString(operation); + out.writeBoolean(success); + out.writeMap(snapshotConfiguration); + out.writeOptionalString(errorDetails); + } + + @Override + public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.timeField(TIMESTAMP.getPreferredName(), "timestamp_string", timestamp); + builder.field(POLICY_ID.getPreferredName(), policyId); + builder.field(REPOSITORY.getPreferredName(), repository); + builder.field(SNAPSHOT_NAME.getPreferredName(), snapshotName); + builder.field(OPERATION.getPreferredName(), operation); + builder.field(SUCCESS.getPreferredName(), success); + builder.field(SNAPSHOT_CONFIG.getPreferredName(), snapshotConfiguration); + builder.field(ERROR_DETAILS.getPreferredName(), errorDetails); + } + builder.endObject(); + + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + boolean result; + if (this == o) result = true; + if (o == null || getClass() != o.getClass()) result = false; + SnapshotHistoryItem that1 = (SnapshotHistoryItem) o; + result = isSuccess() == that1.isSuccess() && + timestamp == that1.getTimestamp() && + Objects.equals(getPolicyId(), that1.getPolicyId()) && + Objects.equals(getRepository(), that1.getRepository()) && + Objects.equals(getSnapshotName(), that1.getSnapshotName()) && + Objects.equals(getOperation(), that1.getOperation()); + if (!result) return false; + SnapshotHistoryItem that = (SnapshotHistoryItem) o; + return Objects.equals(getSnapshotConfiguration(), that.getSnapshotConfiguration()) && + Objects.equals(getErrorDetails(), that.getErrorDetails()); + } + + @Override + public int hashCode() { + return Objects.hash(getTimestamp(), getPolicyId(), getRepository(), getSnapshotName(), getOperation(), isSuccess(), + getSnapshotConfiguration(), getErrorDetails()); + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryStore.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryStore.java new file mode 100644 index 0000000000000..83ceb17ba3a80 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryStore.java @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.snapshotlifecycle.history; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; + +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import static org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING; +import static org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotLifecycleTemplateRegistry.INDEX_TEMPLATE_VERSION; + +/** + * Records Snapshot Lifecycle Management actions as represented by {@link SnapshotHistoryItem} into an index + * for the purposes of querying and alerting. + */ +public class SnapshotHistoryStore { + private static final Logger logger = LogManager.getLogger(SnapshotHistoryStore.class); + private static final DateFormatter indexTimeFormat = DateFormatter.forPattern("yyyy.MM"); + + public static final String SLM_HISTORY_INDEX_PREFIX = ".slm-history-" + INDEX_TEMPLATE_VERSION + "-"; + + private final Client client; + private final ZoneId timeZone; + private final boolean slmHistoryEnabled; + + public SnapshotHistoryStore(Settings nodeSettings, Client client, ZoneId timeZone) { + this.client = client; + this.timeZone = timeZone; + slmHistoryEnabled = SLM_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings); + } + + /** + * Attempts to asynchronously index a snapshot lifecycle management history entry + * + * @param item The entry to index + */ + public void putAsync(SnapshotHistoryItem item) { + if (slmHistoryEnabled == false) { + logger.trace("not recording snapshot history item because [{}] is [false]: [{}]", + SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), item); + return; + } + final ZonedDateTime dateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(item.getTimestamp()), timeZone); + final String index = getHistoryIndexNameForTime(dateTime); + logger.trace("about to index snapshot history item in index [{}]: [{}]", index, item); + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + item.toXContent(builder, ToXContent.EMPTY_PARAMS); + IndexRequest request = new IndexRequest(index) + .source(builder); + client.index(request, ActionListener.wrap(indexResponse -> { + logger.debug("successfully indexed snapshot history item with id [{}] in index [{}]: [{}]", + indexResponse.getId(), index, item); + }, exception -> { + logger.error(new ParameterizedMessage("failed to index snapshot history item in index [{}]: [{}]", + index, item), exception); + })); + } catch (IOException exception) { + logger.error(new ParameterizedMessage("failed to index snapshot history item in index [{}]: [{}]", + index, item), exception); + } + } + + + static String getHistoryIndexNameForTime(ZonedDateTime time) { + return SLM_HISTORY_INDEX_PREFIX + indexTimeFormat.format(time); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotLifecycleTemplateRegistry.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotLifecycleTemplateRegistry.java new file mode 100644 index 0000000000000..27e5e63013e56 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotLifecycleTemplateRegistry.java @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.snapshotlifecycle.history; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata; +import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; +import org.elasticsearch.xpack.core.template.IndexTemplateConfig; +import org.elasticsearch.xpack.core.template.IndexTemplateRegistry; +import org.elasticsearch.xpack.core.template.LifecyclePolicyConfig; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.core.ClientHelper.INDEX_LIFECYCLE_ORIGIN; +import static org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING; + +/** + * Manages the index template and associated ILM policy for the Snapshot + * Lifecycle Management history index. + */ +public class SnapshotLifecycleTemplateRegistry extends IndexTemplateRegistry { + // history (please add a comment why you increased the version here) + // version 1: initial + public static final String INDEX_TEMPLATE_VERSION = "1"; + + public static final String SLM_TEMPLATE_VERSION_VARIABLE = "xpack.slm.template.version"; + public static final String SLM_TEMPLATE_NAME = ".slm-history"; + + public static final String SLM_POLICY_NAME = "slm-history-ilm-policy"; + + public static final IndexTemplateConfig TEMPLATE_SLM_HISTORY = new IndexTemplateConfig( + SLM_TEMPLATE_NAME, + "/slm-history.json", + INDEX_TEMPLATE_VERSION, + SLM_TEMPLATE_VERSION_VARIABLE + ); + + public static final LifecyclePolicyConfig SLM_HISTORY_POLICY = new LifecyclePolicyConfig( + SLM_POLICY_NAME, + "/slm-history-ilm-policy.json" + ); + + private final boolean slmHistoryEnabled; + + public SnapshotLifecycleTemplateRegistry(Settings nodeSettings, ClusterService clusterService, ThreadPool threadPool, Client client, + NamedXContentRegistry xContentRegistry) { + super(nodeSettings, clusterService, threadPool, client, xContentRegistry); + slmHistoryEnabled = SLM_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings); + } + + @Override + protected List getTemplateConfigs() { + if (slmHistoryEnabled == false) { + return Collections.emptyList(); + } + return Collections.singletonList(TEMPLATE_SLM_HISTORY); + } + + @Override + protected List getPolicyConfigs() { + if (slmHistoryEnabled == false) { + return Collections.emptyList(); + } + return Collections.singletonList(SLM_HISTORY_POLICY); + } + + @Override + protected String getOrigin() { + return INDEX_LIFECYCLE_ORIGIN; // TODO use separate SLM origin? + } + + public boolean validate(ClusterState state) { + boolean allTemplatesPresent = getTemplateConfigs().stream() + .map(IndexTemplateConfig::getTemplateName) + .allMatch(name -> state.metaData().getTemplates().containsKey(name)); + + Optional> maybePolicies = Optional + .ofNullable(state.metaData().custom(IndexLifecycleMetadata.TYPE)) + .map(IndexLifecycleMetadata::getPolicies); + Set policyNames = getPolicyConfigs().stream() + .map(LifecyclePolicyConfig::getPolicyName) + .collect(Collectors.toSet()); + + boolean allPoliciesPresent = maybePolicies + .map(policies -> policies.keySet() + .containsAll(policyNames)) + .orElse(false); + return allTemplatesPresent && allPoliciesPresent; + } +} diff --git a/x-pack/plugin/core/src/main/resources/slm-history-ilm-policy.json b/x-pack/plugin/core/src/main/resources/slm-history-ilm-policy.json new file mode 100644 index 0000000000000..8bccc4d23cb46 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/slm-history-ilm-policy.json @@ -0,0 +1,10 @@ +{ + "phases": { + "delete": { + "min_age": "60d", + "actions": { + "delete": {} + } + } + } +} diff --git a/x-pack/plugin/core/src/main/resources/slm-history.json b/x-pack/plugin/core/src/main/resources/slm-history.json new file mode 100644 index 0000000000000..762c398b2d9a2 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/slm-history.json @@ -0,0 +1,58 @@ +{ + "index_patterns": [ + ".slm-history-${xpack.slm.template.version}*" + ], + "order": 2147483647, + "settings": { + "index.number_of_shards": 1, + "index.number_of_replicas": 0, + "index.auto_expand_replicas": "0-1", + "index.lifecycle.name": "slm-history-ilm-policy", + "index.format": 1 + }, + "mappings": { + "_doc": { + "dynamic": false, + "properties": { + "@timestamp": { + "type": "date", + "format": "epoch_millis" + }, + "policy": { + "type": "keyword" + }, + "repository": { + "type": "keyword" + }, + "snapshot_name":{ + "type": "keyword" + }, + "operation": { + "type": "keyword" + }, + "success": { + "type": "boolean" + }, + "configuration": { + "type": "object", + "dynamic": false, + "properties": { + "indices": { + "type": "keyword" + }, + "partial": { + "type": "boolean" + }, + "include_global_state": { + "type": "boolean" + } + } + }, + "error_details": { + "type": "text", + "index": false + } + } + } + } +} \ No newline at end of file diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryItemTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryItemTests.java new file mode 100644 index 0000000000000..0622398e08fbe --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryItemTests.java @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.snapshotlifecycle.history; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +public class SnapshotHistoryItemTests extends AbstractSerializingTestCase { + + @Override + protected SnapshotHistoryItem doParseInstance(XContentParser parser) throws IOException { + return SnapshotHistoryItem.parse(parser, this.getClass().getCanonicalName()); + } + + @Override + protected Writeable.Reader instanceReader() { + return SnapshotHistoryItem::new; + } + + @Override + protected SnapshotHistoryItem createTestInstance() { + long timestamp = randomNonNegativeLong(); + String policyId = randomAlphaOfLengthBetween(5, 10); + String repository = randomAlphaOfLengthBetween(5, 10); + String snapshotName = randomAlphaOfLengthBetween(5, 10); + String operation = randomAlphaOfLengthBetween(5, 10); + boolean success = randomBoolean(); + Map snapshotConfig = randomSnapshotConfiguration(); + String errorDetails = randomBoolean() ? null : randomAlphaOfLengthBetween(10, 20); + + return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, operation, success, snapshotConfig, + errorDetails); + } + + @Override + protected SnapshotHistoryItem mutateInstance(SnapshotHistoryItem instance) { + final int branch = between(0, 7); + switch (branch) { + case 0: // New timestamp + return new SnapshotHistoryItem( + randomValueOtherThan(instance.getTimestamp(), ESTestCase::randomNonNegativeLong), + instance.getPolicyId(), instance.getRepository(), instance.getSnapshotName(), instance.getOperation(), + instance.isSuccess(), instance.getSnapshotConfiguration(), instance.getErrorDetails()); + case 1: // new policyId + return new SnapshotHistoryItem(instance.getTimestamp(), + randomValueOtherThan(instance.getPolicyId(), () -> randomAlphaOfLengthBetween(5, 10)), + instance.getSnapshotName(), instance.getRepository(), instance.getOperation(), instance.isSuccess(), + instance.getSnapshotConfiguration(), instance.getErrorDetails()); + case 2: // new repo name + return new SnapshotHistoryItem(instance.getTimestamp(), instance.getPolicyId(), instance.getSnapshotName(), + randomValueOtherThan(instance.getRepository(), () -> randomAlphaOfLengthBetween(5, 10)), + instance.getOperation(), instance.isSuccess(), instance.getSnapshotConfiguration(), instance.getErrorDetails()); + case 3: + return new SnapshotHistoryItem(instance.getTimestamp(), instance.getPolicyId(), instance.getRepository(), + randomValueOtherThan(instance.getSnapshotName(), () -> randomAlphaOfLengthBetween(5, 10)), + instance.getOperation(), instance.isSuccess(), instance.getSnapshotConfiguration(), instance.getErrorDetails()); + case 4: + return new SnapshotHistoryItem(instance.getTimestamp(), instance.getPolicyId(), instance.getRepository(), + instance.getSnapshotName(), + randomValueOtherThan(instance.getOperation(), () -> randomAlphaOfLengthBetween(5, 10)), + instance.isSuccess(), instance.getSnapshotConfiguration(), instance.getErrorDetails()); + case 5: + return new SnapshotHistoryItem(instance.getTimestamp(), instance.getPolicyId(), instance.getRepository(), + instance.getSnapshotName(), + instance.getOperation(), + instance.isSuccess() == false, + instance.getSnapshotConfiguration(), instance.getErrorDetails()); + case 6: + return new SnapshotHistoryItem(instance.getTimestamp(), instance.getPolicyId(), instance.getRepository(), + instance.getSnapshotName(), instance.getOperation(), instance.isSuccess(), + randomValueOtherThan(instance.getSnapshotConfiguration(), + SnapshotHistoryItemTests::randomSnapshotConfiguration), + instance.getErrorDetails()); + case 7: + return new SnapshotHistoryItem(instance.getTimestamp(), instance.getPolicyId(), instance.getRepository(), + instance.getSnapshotName(), instance.getOperation(), instance.isSuccess(), instance.getSnapshotConfiguration(), + randomValueOtherThan(instance.getErrorDetails(), () -> randomAlphaOfLengthBetween(10, 20))); + default: + throw new IllegalArgumentException("illegal randomization: " + branch); + } + } + + public static Map randomSnapshotConfiguration() { + Map configuration = new HashMap<>(); + configuration.put("indices", Arrays.asList(generateRandomStringArray(1, 10, false, false))); + if (frequently()) { + configuration.put("ignore_unavailable", randomBoolean()); + } + if (frequently()) { + configuration.put("include_global_state", randomBoolean()); + } + if (frequently()) { + configuration.put("partial", randomBoolean()); + } + return configuration; + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryStoreTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryStoreTests.java new file mode 100644 index 0000000000000..9457b2a36dd1e --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotHistoryStoreTests.java @@ -0,0 +1,178 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.snapshotlifecycle.history; + +import org.elasticsearch.action.index.IndexAction; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy; +import org.junit.After; +import org.junit.Before; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING; +import static org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotHistoryStore.getHistoryIndexNameForTime; +import static org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotLifecycleTemplateRegistry.INDEX_TEMPLATE_VERSION; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.core.IsEqual.equalTo; + +public class SnapshotHistoryStoreTests extends ESTestCase { + + private ThreadPool threadPool; + private SnapshotLifecycleTemplateRegistryTests.VerifyingClient client; + private SnapshotHistoryStore historyStore; + + @Before + public void setup() { + threadPool = new TestThreadPool(this.getClass().getName()); + client = new SnapshotLifecycleTemplateRegistryTests.VerifyingClient(threadPool); + historyStore = new SnapshotHistoryStore(Settings.EMPTY, client, ZoneOffset.UTC); + } + + @After + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testNoActionIfDisabled() { + Settings settings = Settings.builder().put(SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), false).build(); + SnapshotHistoryStore disabledHistoryStore = new SnapshotHistoryStore(settings, client, ZoneOffset.UTC); + String policyId = randomAlphaOfLength(5); + SnapshotLifecyclePolicy policy = randomSnapshotLifecyclePolicy(policyId); + final long timestamp = randomNonNegativeLong(); + SnapshotLifecyclePolicy.ResolverContext context = new SnapshotLifecyclePolicy.ResolverContext(timestamp); + String snapshotId = policy.generateSnapshotName(context); + SnapshotHistoryItem record = SnapshotHistoryItem.successRecord(timestamp, policy, snapshotId); + + client.setVerifier((a,r,l) -> { + fail("the history store is disabled, no action should have been taken"); + return null; + }); + disabledHistoryStore.putAsync(record); + } + + @SuppressWarnings("unchecked") + public void testPut() throws Exception { + String policyId = randomAlphaOfLength(5); + SnapshotLifecyclePolicy policy = randomSnapshotLifecyclePolicy(policyId); + final long timestamp = randomNonNegativeLong(); + SnapshotLifecyclePolicy.ResolverContext context = new SnapshotLifecyclePolicy.ResolverContext(timestamp); + String snapshotId = policy.generateSnapshotName(context); + { + SnapshotHistoryItem record = SnapshotHistoryItem.successRecord(timestamp, policy, snapshotId); + + AtomicInteger calledTimes = new AtomicInteger(0); + client.setVerifier((action, request, listener) -> { + calledTimes.incrementAndGet(); + assertThat(action, instanceOf(IndexAction.class)); + assertThat(request, instanceOf(IndexRequest.class)); + IndexRequest indexRequest = (IndexRequest) request; + assertEquals(getHistoryIndexNameForTime(Instant.ofEpochMilli(timestamp).atZone(ZoneOffset.UTC)), indexRequest.index()); + final String indexedDocument = indexRequest.source().utf8ToString(); + assertThat(indexedDocument, containsString(policy.getId())); + assertThat(indexedDocument, containsString(policy.getRepository())); + assertThat(indexedDocument, containsString(snapshotId)); + if (policy.getConfig() != null) { + assertContainsMap(indexedDocument, policy.getConfig()); + } + assertNotNull(listener); + return new IndexResponse(); + }); + + historyStore.putAsync(record); + assertBusy(() -> assertThat(calledTimes.get(), equalTo(1))); + } + + { + final String cause = randomAlphaOfLength(9); + Exception failureException = new RuntimeException(cause); + SnapshotHistoryItem record = SnapshotHistoryItem.failureRecord(timestamp, policy, snapshotId, failureException); + + AtomicInteger calledTimes = new AtomicInteger(0); + client.setVerifier((action, request, listener) -> { + calledTimes.incrementAndGet(); + assertThat(action, instanceOf(IndexAction.class)); + assertThat(request, instanceOf(IndexRequest.class)); + IndexRequest indexRequest = (IndexRequest) request; + assertEquals(getHistoryIndexNameForTime(Instant.ofEpochMilli(timestamp).atZone(ZoneOffset.UTC)), indexRequest.index()); + final String indexedDocument = indexRequest.source().utf8ToString(); + assertThat(indexedDocument, containsString(policy.getId())); + assertThat(indexedDocument, containsString(policy.getRepository())); + assertThat(indexedDocument, containsString(snapshotId)); + if (policy.getConfig() != null) { + assertContainsMap(indexedDocument, policy.getConfig()); + } + assertThat(indexedDocument, containsString("runtime_exception")); + assertThat(indexedDocument, containsString(cause)); + assertNotNull(listener); + return new IndexResponse(); + }); + + historyStore.putAsync(record); + assertBusy(() -> assertThat(calledTimes.get(), equalTo(1))); + } + } + + @SuppressWarnings("unchecked") + private void assertContainsMap(String indexedDocument, Map map) { + map.forEach((k, v) -> { + assertThat(indexedDocument, containsString(k)); + if (v instanceof Map) { + assertContainsMap(indexedDocument, (Map) v); + } if (v instanceof Iterable) { + ((Iterable) v).forEach(elem -> { + assertThat(indexedDocument, containsString(elem.toString())); + }); + } else { + assertThat(indexedDocument, containsString(v.toString())); + } + }); + } + + + public void testIndexNameGeneration() { + String indexTemplateVersion = INDEX_TEMPLATE_VERSION; + assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli((long) 0).atZone(ZoneOffset.UTC)), + equalTo(".slm-history-"+ indexTemplateVersion +"-1970.01")); + assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(100000000000L).atZone(ZoneOffset.UTC)), + equalTo(".slm-history-" + indexTemplateVersion + "-1973.03")); + assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(1416582852000L).atZone(ZoneOffset.UTC)), + equalTo(".slm-history-" + indexTemplateVersion + "-2014.11")); + assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(2833165811000L).atZone(ZoneOffset.UTC)), + equalTo(".slm-history-" + indexTemplateVersion + "-2059.10")); + } + + public static SnapshotLifecyclePolicy randomSnapshotLifecyclePolicy(String id) { + Map config = new HashMap<>(); + for (int i = 0; i < randomIntBetween(2, 5); i++) { + config.put(randomAlphaOfLength(4), randomAlphaOfLength(4)); + } + return new SnapshotLifecyclePolicy(id, + randomAlphaOfLength(4), + randomSchedule(), + randomAlphaOfLength(4), + config); + } + + private static String randomSchedule() { + return randomIntBetween(0, 59) + " " + + randomIntBetween(0, 59) + " " + + randomIntBetween(0, 12) + " * * ?"; + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotLifecycleTemplateRegistryTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotLifecycleTemplateRegistryTests.java new file mode 100644 index 0000000000000..7c086d53de408 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/snapshotlifecycle/history/SnapshotLifecycleTemplateRegistryTests.java @@ -0,0 +1,331 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.snapshotlifecycle.history; + +import org.elasticsearch.Version; +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterModule; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.TriFunction; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.indexlifecycle.DeleteAction; +import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction; +import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; +import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyMetadata; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType; +import org.elasticsearch.xpack.core.indexlifecycle.OperationMode; +import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType; +import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.elasticsearch.mock.orig.Mockito.when; +import static org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING; +import static org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotLifecycleTemplateRegistry.SLM_POLICY_NAME; +import static org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotLifecycleTemplateRegistry.SLM_TEMPLATE_NAME; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +public class SnapshotLifecycleTemplateRegistryTests extends ESTestCase { + private SnapshotLifecycleTemplateRegistry registry; + private NamedXContentRegistry xContentRegistry; + private ClusterService clusterService; + private ThreadPool threadPool; + private VerifyingClient client; + + @Before + public void createRegistryAndClient() { + threadPool = new TestThreadPool(this.getClass().getName()); + client = new VerifyingClient(threadPool); + clusterService = ClusterServiceUtils.createClusterService(threadPool); + List entries = new ArrayList<>(ClusterModule.getNamedXWriteables()); + entries.addAll(Arrays.asList( + new NamedXContentRegistry.Entry(LifecycleType.class, new ParseField(TimeseriesLifecycleType.TYPE), + (p) -> TimeseriesLifecycleType.INSTANCE), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse))); + xContentRegistry = new NamedXContentRegistry(entries); + registry = new SnapshotLifecycleTemplateRegistry(Settings.EMPTY, clusterService, threadPool, client, xContentRegistry); + } + + @After + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testDisabledDoesNotAddTemplates() { + Settings settings = Settings.builder().put(SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), false).build(); + SnapshotLifecycleTemplateRegistry disabledRegistry = new SnapshotLifecycleTemplateRegistry(settings, clusterService, threadPool, + client, xContentRegistry); + assertThat(disabledRegistry.getTemplateConfigs(), hasSize(0)); + assertThat(disabledRegistry.getPolicyConfigs(), hasSize(0)); + } + + public void testThatNonExistingTemplatesAreAddedImmediately() throws Exception { + DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + + ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), nodes); + + AtomicInteger calledTimes = new AtomicInteger(0); + client.setVerifier((action, request, listener) -> { + if (action instanceof PutIndexTemplateAction) { + calledTimes.incrementAndGet(); + assertThat(action, instanceOf(PutIndexTemplateAction.class)); + assertThat(request, instanceOf(PutIndexTemplateRequest.class)); + final PutIndexTemplateRequest putRequest = (PutIndexTemplateRequest) request; + assertThat(putRequest.name(), equalTo(SLM_TEMPLATE_NAME)); + assertThat(putRequest.settings().get("index.lifecycle.name"), equalTo(SLM_POLICY_NAME)); + assertNotNull(listener); + return new TestPutIndexTemplateResponse(true); + } else if (action instanceof PutLifecycleAction) { + // Ignore this, it's verified in another test + return new PutLifecycleAction.Response(true); + } else { + fail("client called with unexpected request:" + request.toString()); + return null; + } + }); + registry.clusterChanged(event); + assertBusy(() -> assertThat(calledTimes.get(), equalTo(registry.getTemplateConfigs().size()))); + + calledTimes.set(0); + // now delete one template from the cluster state and lets retry + ClusterChangedEvent newEvent = createClusterChangedEvent(Collections.emptyList(), nodes); + registry.clusterChanged(newEvent); + assertBusy(() -> assertThat(calledTimes.get(), equalTo(1))); + } + + public void testThatNonExistingPoliciesAreAddedImmediately() throws Exception { + DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + + AtomicInteger calledTimes = new AtomicInteger(0); + client.setVerifier((action, request, listener) -> { + if (action instanceof PutLifecycleAction) { + calledTimes.incrementAndGet(); + assertThat(action, instanceOf(PutLifecycleAction.class)); + assertThat(request, instanceOf(PutLifecycleAction.Request.class)); + final PutLifecycleAction.Request putRequest = (PutLifecycleAction.Request) request; + assertThat(putRequest.getPolicy().getName(), equalTo(SLM_POLICY_NAME)); + assertNotNull(listener); + return new PutLifecycleAction.Response(true); + } else if (action instanceof PutIndexTemplateAction) { + // Ignore this, it's verified in another test + return new TestPutIndexTemplateResponse(true); + } else { + fail("client called with unexpected request:" + request.toString()); + return null; + } + }); + + ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), nodes); + registry.clusterChanged(event); + assertBusy(() -> assertThat(calledTimes.get(), equalTo(1))); + } + + public void testPolicyAlreadyExists() { + DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + + Map policyMap = new HashMap<>(); + List policies = registry.getPolicyConfigs().stream() + .map(policyConfig -> policyConfig.load(xContentRegistry)) + .collect(Collectors.toList()); + assertThat(policies, hasSize(1)); + LifecyclePolicy policy = policies.get(0); + policyMap.put(policy.getName(), policy); + + client.setVerifier((action, request, listener) -> { + if (action instanceof PutIndexTemplateAction) { + // Ignore this, it's verified in another test + return new TestPutIndexTemplateResponse(true); + } else if (action instanceof PutLifecycleAction) { + fail("if the policy already exists it should be re-put"); + } else { + fail("client called with unexpected request:" + request.toString()); + } + return null; + }); + + ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), policyMap, nodes); + registry.clusterChanged(event); + } + + public void testPolicyAlreadyExistsButDiffers() throws IOException { + DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + + Map policyMap = new HashMap<>(); + String policyStr = "{\"phases\":{\"delete\":{\"min_age\":\"1m\",\"actions\":{\"delete\":{}}}}}"; + List policies = registry.getPolicyConfigs().stream() + .map(policyConfig -> policyConfig.load(xContentRegistry)) + .collect(Collectors.toList()); + assertThat(policies, hasSize(1)); + LifecyclePolicy policy = policies.get(0); + + client.setVerifier((action, request, listener) -> { + if (action instanceof PutIndexTemplateAction) { + // Ignore this, it's verified in another test + return new TestPutIndexTemplateResponse(true); + } else if (action instanceof PutLifecycleAction) { + fail("if the policy already exists it should be re-put"); + } else { + fail("client called with unexpected request:" + request.toString()); + } + return null; + }); + + try (XContentParser parser = XContentType.JSON.xContent() + .createParser(xContentRegistry, LoggingDeprecationHandler.THROW_UNSUPPORTED_OPERATION, policyStr)) { + LifecyclePolicy different = LifecyclePolicy.parse(parser, policy.getName()); + policyMap.put(policy.getName(), different); + ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), policyMap, nodes); + registry.clusterChanged(event); + } + } + + public void testThatMissingMasterNodeDoesNothing() { + DiscoveryNode localNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").add(localNode).build(); + + client.setVerifier((a,r,l) -> { + fail("if the master is missing nothing should happen"); + return null; + }); + + ClusterChangedEvent event = createClusterChangedEvent(Arrays.asList(SLM_TEMPLATE_NAME), nodes); + registry.clusterChanged(event); + } + + public void testValidate() { + assertFalse(registry.validate(createClusterState(Settings.EMPTY, Collections.emptyList(), Collections.emptyMap(), null))); + assertFalse(registry.validate(createClusterState(Settings.EMPTY, List.of(SLM_TEMPLATE_NAME), Collections.emptyMap(), null))); + + Map policyMap = new HashMap<>(); + policyMap.put(SLM_POLICY_NAME, new LifecyclePolicy(SLM_POLICY_NAME, new HashMap<>())); + assertFalse(registry.validate(createClusterState(Settings.EMPTY, Collections.emptyList(), policyMap, null))); + + assertTrue(registry.validate(createClusterState(Settings.EMPTY, List.of(SLM_TEMPLATE_NAME), policyMap, null))); + } + + // ------------- + + /** + * A client that delegates to a verifying function for action/request/listener + */ + public static class VerifyingClient extends NoOpClient { + + private TriFunction, ActionRequest, ActionListener, ActionResponse> verifier = (a,r,l) -> { + fail("verifier not set"); + return null; + }; + + VerifyingClient(ThreadPool threadPool) { + super(threadPool); + } + + @Override + @SuppressWarnings("unchecked") + protected void doExecute(Action action, + Request request, + ActionListener listener) { + listener.onResponse((Response) verifier.apply(action, request, listener)); + } + + public VerifyingClient setVerifier(TriFunction, ActionRequest, ActionListener, ActionResponse> verifier) { + this.verifier = verifier; + return this; + } + } + + private ClusterChangedEvent createClusterChangedEvent(List existingTemplateNames, DiscoveryNodes nodes) { + return createClusterChangedEvent(existingTemplateNames, Collections.emptyMap(), nodes); + } + + private ClusterChangedEvent createClusterChangedEvent(List existingTemplateNames, + Map existingPolicies, + DiscoveryNodes nodes) { + ClusterState cs = createClusterState(Settings.EMPTY, existingTemplateNames, existingPolicies, nodes); + ClusterChangedEvent realEvent = new ClusterChangedEvent("created-from-test", cs, + ClusterState.builder(new ClusterName("test")).build()); + ClusterChangedEvent event = spy(realEvent); + when(event.localNodeMaster()).thenReturn(nodes.isLocalNodeElectedMaster()); + + return event; + } + + private ClusterState createClusterState(Settings nodeSettings, + List existingTemplateNames, + Map existingPolicies, + DiscoveryNodes nodes) { + ImmutableOpenMap.Builder indexTemplates = ImmutableOpenMap.builder(); + for (String name : existingTemplateNames) { + indexTemplates.put(name, mock(IndexTemplateMetaData.class)); + } + + Map existingILMMeta = existingPolicies.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> new LifecyclePolicyMetadata(e.getValue(), Collections.emptyMap(), 1, 1))); + IndexLifecycleMetadata ilmMeta = new IndexLifecycleMetadata(existingILMMeta, OperationMode.RUNNING); + + return ClusterState.builder(new ClusterName("test")) + .metaData(MetaData.builder() + .templates(indexTemplates.build()) + .transientSettings(nodeSettings) + .putCustom(IndexLifecycleMetadata.TYPE, ilmMeta) + .build()) + .blocks(new ClusterBlocks.Builder().build()) + .nodes(nodes) + .build(); + } + + private static class TestPutIndexTemplateResponse extends AcknowledgedResponse { + TestPutIndexTemplateResponse(boolean acknowledged) { + super(acknowledged); + } + } +} diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleIT.java index 57dbe7d77fac3..59710aa6d8f96 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleIT.java @@ -36,6 +36,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.startsWith; public class SnapshotLifecycleIT extends ESRestTestCase { @@ -103,6 +104,8 @@ public void testFullPolicySnapshot() throws Exception { String lastSnapshotName = (String) lastSuccessObject.get("snapshot_name"); assertThat(lastSnapshotName, startsWith("snap-")); + + assertHistoryIsPresent(policyName, true, repoId); }); Request delReq = new Request("DELETE", "/_slm/policy/" + policyName); @@ -149,6 +152,7 @@ public void testPolicyFailure() throws Exception { assertNotNull(snapshotName); assertThat(snapshotName, startsWith("snap-")); } + assertHistoryIsPresent(policyName, false, repoName); }); Request delReq = new Request("DELETE", "/_slm/policy/" + policyName); @@ -190,6 +194,7 @@ public void testPolicyManualExecution() throws Exception { snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); } assertThat(snapshotResponseMap.size(), greaterThan(0)); + assertHistoryIsPresent(policyName, true, repoId); } catch (ResponseException e) { fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); } @@ -206,6 +211,53 @@ public void testPolicyManualExecution() throws Exception { }); } + // This method should be called inside an assertBusy, it has no retry logic of its own + private void assertHistoryIsPresent(String policyName, boolean success, String repository) throws IOException { + final Request historySearchRequest = new Request("GET", ".slm-history*/_search"); + historySearchRequest.setJsonEntity("{\n" + + " \"query\": {\n" + + " \"bool\": {\n" + + " \"must\": [\n" + + " {\n" + + " \"term\": {\n" + + " \"policy\": \"" + policyName + "\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"term\": {\n" + + " \"success\": " + success + "\n" + + " }\n" + + " },\n" + + " {\n" + + " \"term\": {\n" + + " \"repository\": \"" + repository + "\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"term\": {\n" + + " \"operation\": \"CREATE\"\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " }\n" + + "}"); + Response historyResponse; + try { + historyResponse = client().performRequest(historySearchRequest); + Map historyResponseMap; + try (InputStream is = historyResponse.getEntity().getContent()) { + historyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); + } + assertThat((int)((Map) ((Map) historyResponseMap.get("hits")).get("total")).get("value"), + greaterThanOrEqualTo(1)); + } catch (ResponseException e) { + // Throw AssertionError instead of an exception if the search fails so that assertBusy works as expected + logger.error(e); + fail("failed to perform search:" + e.getMessage()); + } + } + private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String repoId, String indexPattern, boolean ignoreUnavailable) throws IOException { Map snapConfig = new HashMap<>(); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java index 1136969ae2d38..e07533667e94c 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java @@ -60,7 +60,13 @@ import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction; import org.elasticsearch.xpack.core.indexlifecycle.action.StartILMAction; import org.elasticsearch.xpack.core.indexlifecycle.action.StopILMAction; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata; +import org.elasticsearch.xpack.core.snapshotlifecycle.action.DeleteSnapshotLifecycleAction; import org.elasticsearch.xpack.core.snapshotlifecycle.action.ExecuteSnapshotLifecycleAction; +import org.elasticsearch.xpack.core.snapshotlifecycle.action.GetSnapshotLifecycleAction; +import org.elasticsearch.xpack.core.snapshotlifecycle.action.PutSnapshotLifecycleAction; +import org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotHistoryStore; +import org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotLifecycleTemplateRegistry; import org.elasticsearch.xpack.indexlifecycle.action.RestDeleteLifecycleAction; import org.elasticsearch.xpack.indexlifecycle.action.RestExplainLifecycleAction; import org.elasticsearch.xpack.indexlifecycle.action.RestGetLifecycleAction; @@ -81,12 +87,8 @@ import org.elasticsearch.xpack.indexlifecycle.action.TransportRetryAction; import org.elasticsearch.xpack.indexlifecycle.action.TransportStartILMAction; import org.elasticsearch.xpack.indexlifecycle.action.TransportStopILMAction; -import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata; import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleService; import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleTask; -import org.elasticsearch.xpack.core.snapshotlifecycle.action.DeleteSnapshotLifecycleAction; -import org.elasticsearch.xpack.core.snapshotlifecycle.action.GetSnapshotLifecycleAction; -import org.elasticsearch.xpack.core.snapshotlifecycle.action.PutSnapshotLifecycleAction; import org.elasticsearch.xpack.snapshotlifecycle.action.RestDeleteSnapshotLifecycleAction; import org.elasticsearch.xpack.snapshotlifecycle.action.RestExecuteSnapshotLifecycleAction; import org.elasticsearch.xpack.snapshotlifecycle.action.RestGetSnapshotLifecycleAction; @@ -110,6 +112,7 @@ public class IndexLifecycle extends Plugin implements ActionPlugin { private final SetOnce indexLifecycleInitialisationService = new SetOnce<>(); private final SetOnce snapshotLifecycleService = new SetOnce<>(); + private final SetOnce snapshotHistoryStore = new SetOnce<>(); private Settings settings; private boolean enabled; private boolean transportClientMode; @@ -143,7 +146,8 @@ public List> getSettings() { LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING, LifecycleSettings.LIFECYCLE_NAME_SETTING, LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING, - RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING); + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING, + LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING); } @Override @@ -156,9 +160,12 @@ public Collection createComponents(Client client, ClusterService cluster } indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool, getClock(), System::currentTimeMillis, xContentRegistry)); + SnapshotLifecycleTemplateRegistry templateRegistry = new SnapshotLifecycleTemplateRegistry(settings, clusterService, threadPool, + client, xContentRegistry); + snapshotHistoryStore.set(new SnapshotHistoryStore(settings, client, getClock().getZone())); snapshotLifecycleService.set(new SnapshotLifecycleService(settings, - () -> new SnapshotLifecycleTask(client, clusterService), clusterService, getClock())); - return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get()); + () -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock())); + return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get()); } @Override diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTask.java index e11c59048ea5d..a31942ad4774e 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTask.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; @@ -27,6 +28,8 @@ import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotInvocationRecord; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicyMetadata; +import org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotHistoryItem; +import org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotHistoryStore; import org.elasticsearch.xpack.indexlifecycle.LifecyclePolicySecurityClient; import java.io.IOException; @@ -44,17 +47,19 @@ public class SnapshotLifecycleTask implements SchedulerEngine.Listener { private final Client client; private final ClusterService clusterService; + private final SnapshotHistoryStore historyStore; - public SnapshotLifecycleTask(final Client client, final ClusterService clusterService) { + public SnapshotLifecycleTask(final Client client, final ClusterService clusterService, final SnapshotHistoryStore historyStore) { this.client = client; this.clusterService = clusterService; + this.historyStore = historyStore; } @Override public void triggered(SchedulerEngine.Event event) { logger.debug("snapshot lifecycle policy task triggered from job [{}]", event.getJobName()); - final Optional snapshotName = maybeTakeSnapshot(event.getJobName(), client, clusterService); + final Optional snapshotName = maybeTakeSnapshot(event.getJobName(), client, clusterService, historyStore); // Would be cleaner if we could use Optional#ifPresentOrElse snapshotName.ifPresent(name -> @@ -72,7 +77,8 @@ public void triggered(SchedulerEngine.Event event) { * state in the policy's metadata * @return An optional snapshot name if the request was issued successfully */ - public static Optional maybeTakeSnapshot(final String jobId, final Client client, final ClusterService clusterService) { + public static Optional maybeTakeSnapshot(final String jobId, final Client client, final ClusterService clusterService, + final SnapshotHistoryStore historyStore) { Optional maybeMetadata = getSnapPolicyMetadata(jobId, clusterService.state()); String snapshotName = maybeMetadata.map(policyMetadata -> { CreateSnapshotRequest request = policyMetadata.getPolicy().toRequest(); @@ -85,16 +91,29 @@ public static Optional maybeTakeSnapshot(final String jobId, final Clien public void onResponse(CreateSnapshotResponse createSnapshotResponse) { logger.debug("snapshot response for [{}]: {}", policyMetadata.getPolicy().getId(), Strings.toString(createSnapshotResponse)); + final long timestamp = Instant.now().toEpochMilli(); clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(), - WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), Instant.now().toEpochMilli())); + WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp)); + historyStore.putAsync(SnapshotHistoryItem.successRecord(timestamp, policyMetadata.getPolicy(), request.snapshot())); } @Override public void onFailure(Exception e) { logger.error("failed to issue create snapshot request for snapshot lifecycle policy [{}]: {}", policyMetadata.getPolicy().getId(), e); + final long timestamp = Instant.now().toEpochMilli(); clusterService.submitStateUpdateTask("slm-record-failure-" + policyMetadata.getPolicy().getId(), - WriteJobStatus.failure(policyMetadata.getPolicy().getId(), request.snapshot(), Instant.now().toEpochMilli(), e)); + WriteJobStatus.failure(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp, e)); + final SnapshotHistoryItem failureRecord; + try { + failureRecord = SnapshotHistoryItem.failureRecord(timestamp, policyMetadata.getPolicy(), request.snapshot(), e); + historyStore.putAsync(failureRecord); + } catch (IOException ex) { + // This shouldn't happen unless there's an issue with serializing the original exception, which shouldn't happen + logger.error(new ParameterizedMessage( + "failed to record snapshot creation failure for snapshot lifecycle policy [{}]", + policyMetadata.getPolicy().getId()), e); + } } }); return request.snapshot(); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/TransportExecuteSnapshotLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/TransportExecuteSnapshotLifecycleAction.java index 883a153b01610..8ff080dc542ad 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/TransportExecuteSnapshotLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/TransportExecuteSnapshotLifecycleAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicyMetadata; import org.elasticsearch.xpack.core.snapshotlifecycle.action.ExecuteSnapshotLifecycleAction; +import org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotHistoryStore; import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleService; import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleTask; @@ -37,14 +38,16 @@ public class TransportExecuteSnapshotLifecycleAction private static final Logger logger = LogManager.getLogger(TransportExecuteSnapshotLifecycleAction.class); private final Client client; + private final SnapshotHistoryStore historyStore; @Inject public TransportExecuteSnapshotLifecycleAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - Client client) { + Client client, SnapshotHistoryStore historyStore) { super(ExecuteSnapshotLifecycleAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, ExecuteSnapshotLifecycleAction.Request::new); this.client = client; + this.historyStore = historyStore; } @Override protected String executor() { @@ -80,7 +83,7 @@ protected void masterOperation(final ExecuteSnapshotLifecycleAction.Request requ } final Optional snapshotName = SnapshotLifecycleTask.maybeTakeSnapshot(SnapshotLifecycleService.getJobId(policyMetadata), - client, clusterService); + client, clusterService, historyStore); if (snapshotName.isPresent()) { listener.onResponse(new ExecuteSnapshotLifecycleAction.Response(snapshotName.get())); } else { diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationTests.java index a1a37beb1d129..490804bdd6f74 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationTests.java @@ -101,6 +101,9 @@ protected Settings nodeSettings(int nodeOrdinal) { settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); settings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); settings.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s"); + + // This is necessary to prevent SLM installing a lifecycle policy, these tests assume a blank slate + settings.put(LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), false); return settings.build(); } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecyclePolicyTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecyclePolicyTests.java index cbce432406b0d..190c378937a17 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecyclePolicyTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecyclePolicyTests.java @@ -78,11 +78,15 @@ protected SnapshotLifecyclePolicy doParseInstance(XContentParser parser) throws @Override protected SnapshotLifecyclePolicy createTestInstance() { + id = randomAlphaOfLength(5); + return randomSnapshotLifecyclePolicy(id); + } + + public static SnapshotLifecyclePolicy randomSnapshotLifecyclePolicy(String id) { Map config = new HashMap<>(); for (int i = 0; i < randomIntBetween(2, 5); i++) { config.put(randomAlphaOfLength(4), randomAlphaOfLength(4)); } - id = randomAlphaOfLength(5); return new SnapshotLifecyclePolicy(id, randomAlphaOfLength(4), randomSchedule(), @@ -90,7 +94,7 @@ protected SnapshotLifecyclePolicy createTestInstance() { config); } - private String randomSchedule() { + private static String randomSchedule() { return randomIntBetween(0, 59) + " " + randomIntBetween(0, 59) + " " + randomIntBetween(0, 12) + " * * ?"; @@ -114,7 +118,7 @@ protected SnapshotLifecyclePolicy mutateInstance(SnapshotLifecyclePolicy instanc case 2: return new SnapshotLifecyclePolicy(instance.getId(), instance.getName(), - randomValueOtherThan(instance.getSchedule(), this::randomSchedule), + randomValueOtherThan(instance.getSchedule(), SnapshotLifecyclePolicyTests::randomSchedule), instance.getRepository(), instance.getConfig()); case 3: diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java index 794ca7df213de..801b774d418df 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java @@ -297,7 +297,7 @@ class FakeSnapshotTask extends SnapshotLifecycleTask { private final Consumer onTriggered; FakeSnapshotTask(Consumer onTriggered) { - super(null, null); + super(null, null, null); this.onTriggered = onTriggered; } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTaskTests.java index 02e9705008bb7..999486415a141 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTaskTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.snapshotlifecycle; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; @@ -14,11 +15,13 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.TriFunction; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; @@ -30,14 +33,19 @@ import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicyMetadata; +import org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotHistoryItem; +import org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotHistoryStore; import java.io.IOException; +import java.time.ZoneId; +import java.time.ZoneOffset; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -82,8 +90,10 @@ public void testSkipCreatingSnapshotWhenJobDoesNotMatch() { fail("should not have tried to take a snapshot"); return null; })) { + SnapshotHistoryStore historyStore = new VerifyingHistoryStore(null, ZoneOffset.UTC, + item -> fail("should not have tried to store an item")); - SnapshotLifecycleTask task = new SnapshotLifecycleTask(client, clusterService); + SnapshotLifecycleTask task = new SnapshotLifecycleTask(client, clusterService, historyStore); // Trigger the event, but since the job name does not match, it should // not run the function to create a snapshot @@ -128,14 +138,15 @@ public void testCreateSnapshotOnTrigger() { " }" + "}"; - final AtomicBoolean called = new AtomicBoolean(false); + final AtomicBoolean clientCalled = new AtomicBoolean(false); + final SetOnce snapshotName = new SetOnce<>(); try (ClusterService clusterService = ClusterServiceUtils.createClusterService(state, threadPool); // This verifying client will verify that we correctly invoked // client.admin().createSnapshot(...) with the appropriate // request. It also returns a mock real response VerifyingClient client = new VerifyingClient(threadPool, (action, request, listener) -> { - assertFalse(called.getAndSet(true)); + assertFalse(clientCalled.getAndSet(true)); assertThat(action, instanceOf(CreateSnapshotAction.class)); assertThat(request, instanceOf(CreateSnapshotRequest.class)); @@ -144,6 +155,7 @@ public void testCreateSnapshotOnTrigger() { SnapshotLifecyclePolicy policy = slpm.getPolicy(); assertThat(req.snapshot(), startsWith(policy.getName() + "-")); assertThat(req.repository(), equalTo(policy.getRepository())); + snapshotName.set(req.snapshot()); if (req.indices().length > 0) { assertThat(Arrays.asList(req.indices()), equalTo(policy.getConfig().get("indices"))); } @@ -158,13 +170,24 @@ public void testCreateSnapshotOnTrigger() { return null; } })) { - - SnapshotLifecycleTask task = new SnapshotLifecycleTask(client, clusterService); + final AtomicBoolean historyStoreCalled = new AtomicBoolean(false); + SnapshotHistoryStore historyStore = new VerifyingHistoryStore(null, ZoneOffset.UTC, + item -> { + assertFalse(historyStoreCalled.getAndSet(true)); + final SnapshotLifecyclePolicy policy = slpm.getPolicy(); + assertEquals(policy.getId(), item.getPolicyId()); + assertEquals(policy.getRepository(), item.getRepository()); + assertEquals(policy.getConfig(), item.getSnapshotConfiguration()); + assertEquals(snapshotName.get(), item.getSnapshotName()); + }); + + SnapshotLifecycleTask task = new SnapshotLifecycleTask(client, clusterService, historyStore); // Trigger the event with a matching job name for the policy task.triggered(new SchedulerEngine.Event(SnapshotLifecycleService.getJobId(slpm), System.currentTimeMillis(), System.currentTimeMillis())); - assertTrue("snapshot should be triggered once", called.get()); + assertTrue("snapshot should be triggered once", clientCalled.get()); + assertTrue("history store should be called once", historyStoreCalled.get()); } threadPool.shutdownNow(); @@ -203,4 +226,19 @@ private SnapshotLifecyclePolicyMetadata makePolicyMeta(final String id) { .setModifiedDate(1) .build(); } + + public static class VerifyingHistoryStore extends SnapshotHistoryStore { + + Consumer verifier; + + public VerifyingHistoryStore(Client client, ZoneId timeZone, Consumer verifier) { + super(Settings.EMPTY, client, timeZone); + this.verifier = verifier; + } + + @Override + public void putAsync(SnapshotHistoryItem item) { + verifier.accept(item); + } + } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle index e6fd8412c948b..15b25cfac7600 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle @@ -40,6 +40,7 @@ integTestCluster { setting 'xpack.security.enabled', 'true' setting 'xpack.ml.enabled', 'true' setting 'xpack.watcher.enabled', 'false' + setting 'xpack.ilm.enabled', 'false' setting 'logger.org.elasticsearch.xpack.ml.datafeed', 'TRACE' setting 'xpack.monitoring.enabled', 'false' setting 'xpack.security.authc.token.enabled', 'true'