Skip to content

Commit 04cb636

Browse files
dakronetlrx
authored andcommitted
Add a NoopEngine implementation (elastic#31163)
This adds a new Engine implementation that does.. nothing. Any operations throw an `UnsupportedOperationException` when tried. This engine is intended as a building block for replicated closed indices in subsequent code changes. Relates to elastic#31141
1 parent eae5487 commit 04cb636

File tree

4 files changed

+513
-1
lines changed

4 files changed

+513
-1
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,7 @@ private void fillSegmentInfo(SegmentReader segmentReader, boolean verbose, boole
970970
*/
971971
public abstract List<Segment> segments(boolean verbose);
972972

973-
public final boolean refreshNeeded() {
973+
public boolean refreshNeeded() {
974974
if (store.tryIncRef()) {
975975
/*
976976
we need to inc the store here since we acquire a searcher and that might keep a file open on the
Lines changed: 384 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,384 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.engine;
21+
22+
import org.apache.lucene.index.DirectoryReader;
23+
import org.apache.lucene.index.IndexCommit;
24+
import org.apache.lucene.index.SegmentInfos;
25+
import org.elasticsearch.common.Nullable;
26+
import org.elasticsearch.index.seqno.SeqNoStats;
27+
import org.elasticsearch.index.seqno.SequenceNumbers;
28+
import org.elasticsearch.index.translog.Translog;
29+
import org.elasticsearch.index.translog.TranslogConfig;
30+
import org.elasticsearch.index.translog.TranslogCorruptedException;
31+
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
32+
import org.elasticsearch.index.translog.TranslogStats;
33+
34+
import java.io.Closeable;
35+
import java.io.IOException;
36+
import java.util.Arrays;
37+
import java.util.List;
38+
import java.util.Map;
39+
import java.util.concurrent.CountDownLatch;
40+
import java.util.function.BiFunction;
41+
import java.util.function.LongSupplier;
42+
import java.util.stream.Stream;
43+
44+
/**
45+
* NoopEngine is an engine implementation that does nothing but the bare minimum
46+
* required in order to have an engine. All attempts to do something (search,
47+
* index, get), throw {@link UnsupportedOperationException}. This does maintain
48+
* a translog with a deletion policy so that when flushing, no translog is
49+
* retained on disk (setting a retention size and age of 0).
50+
*
51+
* It's also important to notice that this does list the commits of the Store's
52+
* Directory so that the last commit's user data can be read for the historyUUID
53+
* and last committed segment info.
54+
*/
55+
final class NoopEngine extends Engine {
56+
57+
private static final Translog.Snapshot EMPTY_TRANSLOG_SNAPSHOT = new Translog.Snapshot() {
58+
@Override
59+
public int totalOperations() {
60+
return 0;
61+
}
62+
63+
@Override
64+
public Translog.Operation next() {
65+
return null;
66+
}
67+
68+
@Override
69+
public void close() {
70+
}
71+
};
72+
73+
private static final TranslogStats EMPTY_TRANSLOG_STATS = new TranslogStats(0, 0, 0, 0, 0);
74+
private static final Translog.Location EMPTY_TRANSLOG_LOCATION = new Translog.Location(0, 0, 0);
75+
76+
private final IndexCommit lastCommit;
77+
private final long localCheckpoint;
78+
private final long maxSeqNo;
79+
private final String historyUUID;
80+
private final SegmentInfos lastCommittedSegmentInfos;
81+
82+
NoopEngine(EngineConfig engineConfig) {
83+
super(engineConfig);
84+
85+
store.incRef();
86+
boolean success = false;
87+
88+
try {
89+
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
90+
List<IndexCommit> indexCommits = DirectoryReader.listCommits(store.directory());
91+
lastCommit = indexCommits.get(indexCommits.size() - 1);
92+
historyUUID = lastCommit.getUserData().get(HISTORY_UUID_KEY);
93+
localCheckpoint = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
94+
maxSeqNo = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
95+
96+
// The deletion policy for the translog should not keep any translogs around, so the min age/size is set to -1
97+
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1);
98+
99+
// The translog is opened and closed to validate that the translog UUID from lucene is the same as the one in the translog
100+
try (Translog translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier())) {
101+
if (translog.totalOperations() != 0) {
102+
throw new IllegalArgumentException("expected 0 translog operations but there were " + translog.totalOperations());
103+
}
104+
}
105+
106+
success = true;
107+
} catch (IOException | TranslogCorruptedException e) {
108+
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
109+
} finally {
110+
if (success == false) {
111+
if (isClosed.get() == false) {
112+
// failure we need to dec the store reference
113+
store.decRef();
114+
}
115+
}
116+
}
117+
logger.trace("created new NoopEngine");
118+
}
119+
120+
private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy,
121+
LongSupplier globalCheckpointSupplier) throws IOException {
122+
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
123+
final String translogUUID = loadTranslogUUIDFromLastCommit();
124+
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
125+
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier,
126+
engineConfig.getPrimaryTermSupplier());
127+
}
128+
129+
/**
130+
* Reads the current stored translog ID from the last commit data.
131+
*/
132+
@Nullable
133+
private String loadTranslogUUIDFromLastCommit() {
134+
final Map<String, String> commitUserData = lastCommittedSegmentInfos.getUserData();
135+
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
136+
throw new IllegalStateException("commit doesn't contain translog generation id");
137+
}
138+
return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
139+
}
140+
141+
@Override
142+
protected SegmentInfos getLastCommittedSegmentInfos() {
143+
return lastCommittedSegmentInfos;
144+
}
145+
146+
@Override
147+
public String getHistoryUUID() {
148+
return historyUUID;
149+
}
150+
151+
@Override
152+
public long getWritingBytes() {
153+
return 0;
154+
}
155+
156+
@Override
157+
public long getIndexThrottleTimeInMillis() {
158+
return 0;
159+
}
160+
161+
@Override
162+
public boolean isThrottled() {
163+
return false;
164+
}
165+
166+
@Override
167+
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
168+
}
169+
170+
@Override
171+
public IndexResult index(Index index) {
172+
throw new UnsupportedOperationException("indexing is not supported on a noop engine");
173+
}
174+
175+
@Override
176+
public DeleteResult delete(Delete delete) {
177+
throw new UnsupportedOperationException("deletion is not supported on a noop engine");
178+
}
179+
180+
@Override
181+
public NoOpResult noOp(NoOp noOp) {
182+
throw new UnsupportedOperationException("noop is not supported on a noop engine");
183+
}
184+
185+
@Override
186+
public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException {
187+
throw new UnsupportedOperationException("synced flush is not supported on a noop engine");
188+
}
189+
190+
@Override
191+
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
192+
throw new UnsupportedOperationException("gets are not supported on a noop engine");
193+
}
194+
195+
@Override
196+
public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
197+
throw new UnsupportedOperationException("searching is not supported on a noop engine");
198+
}
199+
200+
@Override
201+
public boolean isTranslogSyncNeeded() {
202+
return false;
203+
}
204+
205+
@Override
206+
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) {
207+
throw new UnsupportedOperationException("translog synchronization should never be needed");
208+
}
209+
210+
@Override
211+
public void syncTranslog() {
212+
}
213+
214+
@Override
215+
public Closeable acquireTranslogRetentionLock() {
216+
return () -> { };
217+
}
218+
219+
@Override
220+
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) {
221+
return EMPTY_TRANSLOG_SNAPSHOT;
222+
}
223+
224+
@Override
225+
public int estimateTranslogOperationsFromMinSeq(long minSeqNo) {
226+
return 0;
227+
}
228+
229+
@Override
230+
public TranslogStats getTranslogStats() {
231+
return EMPTY_TRANSLOG_STATS;
232+
}
233+
234+
@Override
235+
public Translog.Location getTranslogLastWriteLocation() {
236+
return EMPTY_TRANSLOG_LOCATION;
237+
}
238+
239+
@Override
240+
public long getLocalCheckpoint() {
241+
return this.localCheckpoint;
242+
}
243+
244+
@Override
245+
public void waitForOpsToComplete(long seqNo) {
246+
}
247+
248+
@Override
249+
public void resetLocalCheckpoint(long localCheckpoint) {
250+
assert localCheckpoint == getLocalCheckpoint() : "expected reset to existing local checkpoint of " +
251+
getLocalCheckpoint() + " got: " + localCheckpoint;
252+
}
253+
254+
@Override
255+
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
256+
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
257+
}
258+
259+
@Override
260+
public long getLastSyncedGlobalCheckpoint() {
261+
return 0;
262+
}
263+
264+
@Override
265+
public long getIndexBufferRAMBytesUsed() {
266+
return 0;
267+
}
268+
269+
@Override
270+
public List<Segment> segments(boolean verbose) {
271+
return Arrays.asList(getSegmentInfo(lastCommittedSegmentInfos, verbose));
272+
}
273+
274+
@Override
275+
public void refresh(String source) throws EngineException {
276+
}
277+
278+
// Override the refreshNeeded method so that we don't attempt to acquire a searcher checking if we need to refresh
279+
@Override
280+
public boolean refreshNeeded() {
281+
// We never need to refresh a noop engine so always return false
282+
return false;
283+
}
284+
285+
@Override
286+
public void writeIndexingBuffer() throws EngineException {
287+
}
288+
289+
@Override
290+
public boolean shouldPeriodicallyFlush() {
291+
return false;
292+
}
293+
294+
@Override
295+
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
296+
return new CommitId(lastCommittedSegmentInfos.getId());
297+
}
298+
299+
@Override
300+
public CommitId flush() throws EngineException {
301+
return new CommitId(lastCommittedSegmentInfos.getId());
302+
}
303+
304+
@Override
305+
public void trimUnreferencedTranslogFiles() throws EngineException {
306+
307+
}
308+
309+
@Override
310+
public boolean shouldRollTranslogGeneration() {
311+
return false;
312+
}
313+
314+
@Override
315+
public void rollTranslogGeneration() throws EngineException {
316+
}
317+
318+
@Override
319+
public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade,
320+
boolean upgradeOnlyAncientSegments) throws EngineException {
321+
}
322+
323+
@Override
324+
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException {
325+
return new Engine.IndexCommitRef(lastCommit, () -> {});
326+
}
327+
328+
@Override
329+
public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
330+
return acquireLastIndexCommit(false);
331+
}
332+
333+
/**
334+
* Closes the engine without acquiring the write lock. This should only be
335+
* called while the write lock is hold or in a disaster condition ie. if the engine
336+
* is failed.
337+
*/
338+
@Override
339+
protected void closeNoLock(String reason, CountDownLatch closedLatch) {
340+
if (isClosed.compareAndSet(false, true)) {
341+
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() :
342+
"Either the write lock must be held or the engine must be currently be failing itself";
343+
try {
344+
store.decRef();
345+
logger.debug("engine closed [{}]", reason);
346+
} finally {
347+
closedLatch.countDown();
348+
}
349+
350+
}
351+
}
352+
353+
@Override
354+
public void activateThrottling() {
355+
throw new UnsupportedOperationException("closed engine can't throttle");
356+
}
357+
358+
@Override
359+
public void deactivateThrottling() {
360+
throw new UnsupportedOperationException("closed engine can't throttle");
361+
}
362+
363+
@Override
364+
public void restoreLocalCheckpointFromTranslog() {
365+
}
366+
367+
@Override
368+
public int fillSeqNoGaps(long primaryTerm) {
369+
return 0;
370+
}
371+
372+
@Override
373+
public Engine recoverFromTranslog() {
374+
return this;
375+
}
376+
377+
@Override
378+
public void skipTranslogRecovery() {
379+
}
380+
381+
@Override
382+
public void maybePruneDeletes() {
383+
}
384+
}

0 commit comments

Comments
 (0)