Skip to content

Commit 9a73ba7

Browse files
committed
Add snapshots pending deletion in cluster state to delete snapshot when searchable snapshot index
1 parent b4bcb86 commit 9a73ba7

File tree

13 files changed

+1381
-69
lines changed

13 files changed

+1381
-69
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,6 @@ protected void masterOperation(
6464
ClusterState state,
6565
final ActionListener<AcknowledgedResponse> listener
6666
) {
67-
snapshotsService.deleteSnapshots(request, listener.map(v -> AcknowledgedResponse.TRUE));
67+
snapshotsService.deleteSnapshotsByName(request, listener.map(v -> AcknowledgedResponse.TRUE));
6868
}
6969
}

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ public static List<Entry> getNamedWriteables() {
125125
SnapshotDeletionsInProgress::readDiffFrom);
126126
registerClusterCustom(entries, RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress::new,
127127
RepositoryCleanupInProgress::readDiffFrom);
128+
registerClusterCustom(entries, SnapshotDeletionsInPending.TYPE, SnapshotDeletionsInPending::new,
129+
SnapshotDeletionsInPending::readDiffFrom);
128130
// Metadata
129131
registerMetadataCustom(entries, RepositoriesMetadata.TYPE, RepositoriesMetadata::new, RepositoriesMetadata::readDiffFrom);
130132
registerMetadataCustom(entries, IngestMetadata.TYPE, IngestMetadata::new, IngestMetadata::readDiffFrom);
Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.cluster;
10+
11+
import org.elasticsearch.Version;
12+
import org.elasticsearch.cluster.ClusterState.Custom;
13+
import org.elasticsearch.common.io.stream.StreamInput;
14+
import org.elasticsearch.common.io.stream.StreamOutput;
15+
import org.elasticsearch.common.io.stream.Writeable;
16+
import org.elasticsearch.snapshots.SnapshotId;
17+
import org.elasticsearch.xcontent.ToXContentObject;
18+
import org.elasticsearch.xcontent.XContentBuilder;
19+
20+
import java.io.IOException;
21+
import java.util.Collections;
22+
import java.util.Comparator;
23+
import java.util.Iterator;
24+
import java.util.Objects;
25+
import java.util.Set;
26+
import java.util.SortedSet;
27+
import java.util.TreeSet;
28+
import java.util.function.Consumer;
29+
30+
import static java.util.Collections.unmodifiableSortedSet;
31+
32+
/**
33+
* Represents snapshots marked as to be deleted and pending deletion.
34+
*/
35+
public class SnapshotDeletionsInPending extends AbstractNamedDiffable<Custom> implements Custom {
36+
37+
public static final SnapshotDeletionsInPending EMPTY = new SnapshotDeletionsInPending(Collections.emptySortedSet());
38+
public static final String TYPE = "snapshot_deletions_pending";
39+
40+
public static final int MAX_PENDING_DELETIONS = 500;
41+
42+
/**
43+
* A list of snapshots to delete, sorted by creation time
44+
*/
45+
private final SortedSet<Entry> entries;
46+
47+
private SnapshotDeletionsInPending(SortedSet<Entry> entries) {
48+
this.entries = unmodifiableSortedSet(Objects.requireNonNull(entries));
49+
assert entries.size() <= MAX_PENDING_DELETIONS : entries.size() + " > " + MAX_PENDING_DELETIONS;
50+
}
51+
52+
public SnapshotDeletionsInPending(StreamInput in) throws IOException {
53+
this(new TreeSet<>(in.readSet(Entry::new)));
54+
}
55+
56+
@Override
57+
public void writeTo(StreamOutput out) throws IOException {
58+
out.writeCollection(entries);
59+
}
60+
61+
@Override
62+
public String getWriteableName() {
63+
return TYPE;
64+
}
65+
66+
public boolean isEmpty() {
67+
return entries.isEmpty();
68+
}
69+
70+
public boolean contains(SnapshotId snapshotId) {
71+
return entries.stream().anyMatch(entry -> Objects.equals(entry.getSnapshotId(), snapshotId));
72+
}
73+
74+
public SortedSet<Entry> entries() {
75+
return entries;
76+
}
77+
78+
@Override
79+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
80+
builder.startArray(TYPE);
81+
for (Entry entry : entries) {
82+
entry.toXContent(builder, params);
83+
}
84+
builder.endArray();
85+
return builder;
86+
}
87+
88+
@Override
89+
public Version getMinimalSupportedVersion() {
90+
return Version.CURRENT.minimumCompatibilityVersion();
91+
}
92+
93+
public SnapshotDeletionsInPending withRemovedSnapshots(Set<SnapshotId> snapshotIds) {
94+
if (snapshotIds == null || snapshotIds.isEmpty()) {
95+
return this;
96+
}
97+
boolean changed = false;
98+
final SortedSet<Entry> updatedEntries = new TreeSet<>(entries);
99+
if (updatedEntries.removeIf(entry -> snapshotIds.contains(entry.getSnapshotId()))) {
100+
changed = true;
101+
}
102+
if (changed == false) {
103+
return this;
104+
} else if (updatedEntries.isEmpty()) {
105+
return EMPTY;
106+
} else {
107+
return new SnapshotDeletionsInPending(updatedEntries);
108+
}
109+
}
110+
111+
@Override
112+
public String toString() {
113+
final StringBuilder builder = new StringBuilder("SnapshotDeletionsInPending[");
114+
boolean prepend = true;
115+
116+
final Iterator<Entry> iterator = entries.stream().iterator();
117+
while (iterator.hasNext()) {
118+
if (prepend == false) {
119+
builder.append(',');
120+
}
121+
final Entry entry = iterator.next();
122+
builder.append('[').append(entry.repositoryName).append('/').append(entry.repositoryUuid).append(']');
123+
builder.append('[').append(entry.snapshotId).append(',').append(entry.creationTime).append(']');
124+
builder.append('\n');
125+
prepend = false;
126+
}
127+
builder.append(']');
128+
return builder.toString();
129+
}
130+
131+
public static class Entry implements Writeable, ToXContentObject, Comparable<Entry> {
132+
133+
private final String repositoryName;
134+
private final String repositoryUuid;
135+
private final SnapshotId snapshotId;
136+
private final long creationTime;
137+
138+
public Entry(String repositoryName, String repositoryUuid, SnapshotId snapshotId, long creationTime) {
139+
this.repositoryName = Objects.requireNonNull(repositoryName);
140+
this.repositoryUuid = Objects.requireNonNull(repositoryUuid);
141+
this.snapshotId = Objects.requireNonNull(snapshotId);
142+
this.creationTime = creationTime;
143+
}
144+
145+
private Entry(StreamInput in) throws IOException {
146+
this.repositoryName = in.readString();
147+
this.repositoryUuid = in.readString();
148+
this.creationTime = in.readVLong();
149+
this.snapshotId = new SnapshotId(in);
150+
}
151+
152+
@Override
153+
public void writeTo(StreamOutput out) throws IOException {
154+
out.writeString(repositoryName);
155+
out.writeString(repositoryUuid);
156+
out.writeVLong(creationTime);
157+
snapshotId.writeTo(out);
158+
}
159+
160+
public String getRepositoryName() {
161+
return repositoryName;
162+
}
163+
164+
public String getRepositoryUuid() {
165+
return repositoryUuid;
166+
}
167+
168+
public SnapshotId getSnapshotId() {
169+
return snapshotId;
170+
}
171+
172+
public long getCreationTime() {
173+
return creationTime;
174+
}
175+
176+
@Override
177+
public boolean equals(Object o) {
178+
if (this == o) return true;
179+
if (o == null || getClass() != o.getClass()) return false;
180+
Entry entry = (Entry) o;
181+
return creationTime == entry.creationTime
182+
&& Objects.equals(repositoryName, entry.repositoryName)
183+
&& Objects.equals(repositoryUuid, entry.repositoryUuid)
184+
&& Objects.equals(snapshotId, entry.snapshotId);
185+
}
186+
187+
@Override
188+
public int hashCode() {
189+
return Objects.hash(repositoryName, repositoryUuid, snapshotId, creationTime);
190+
}
191+
192+
@Override
193+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
194+
builder.startObject();
195+
{
196+
builder.field("repository_name", repositoryName);
197+
builder.field("repository_uuid", repositoryUuid);
198+
builder.timeField("creation_time_millis", "creation_time", creationTime);
199+
builder.field("snapshot", snapshotId);
200+
}
201+
builder.endObject();
202+
return builder;
203+
}
204+
205+
@Override
206+
public int compareTo(final Entry other) {
207+
return Comparator.comparingLong(Entry::getCreationTime)
208+
.reversed()
209+
.thenComparing(Entry::getSnapshotId)
210+
.compare(this, other);
211+
}
212+
}
213+
214+
public static final class Builder {
215+
216+
private final SortedSet<Entry> entries = new TreeSet<>();
217+
private final Consumer<Entry> consumer;
218+
219+
public Builder(SnapshotDeletionsInPending snapshotDeletionsInPending, Consumer<Entry> onLimitExceeded) {
220+
entries.addAll(snapshotDeletionsInPending.entries);
221+
this.consumer = onLimitExceeded;
222+
}
223+
224+
private void ensureLimit() {
225+
while (entries.size() >= MAX_PENDING_DELETIONS) {
226+
final Entry removed = entries.last();
227+
entries.remove(removed);
228+
if (consumer != null) {
229+
consumer.accept(removed);
230+
}
231+
}
232+
}
233+
234+
public Builder add(String repositoryName, String repositoryUuid, SnapshotId snapshotId, long creationTime) {
235+
ensureLimit();
236+
entries.add(new Entry(repositoryName, repositoryUuid, snapshotId, creationTime));
237+
return this;
238+
}
239+
240+
public SnapshotDeletionsInPending build() {
241+
ensureLimit();
242+
return entries.isEmpty() == false ? new SnapshotDeletionsInPending(entries) : EMPTY;
243+
}
244+
}
245+
246+
public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException {
247+
return readDiffFrom(Custom.class, TYPE, in);
248+
}
249+
}

server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,15 @@ public boolean hasExecutingDeletion(String repository) {
114114
return false;
115115
}
116116

117+
/**
118+
* Checks if the current {@link SnapshotDeletionsInProgress} contains the given {@link SnapshotId}
119+
*
120+
* @param snapshotId the snapshot id
121+
*/
122+
public boolean contains(SnapshotId snapshotId) {
123+
return getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(snapshotId));
124+
}
125+
117126
/**
118127
* Returns {@code true} if there are snapshot deletions in progress in the cluster,
119128
* returns {@code false} otherwise.

0 commit comments

Comments
 (0)