Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public class InternalEngine extends Engine {
private final AtomicInteger throttleRequestCount = new AtomicInteger();
private final EngineConfig.OpenMode openMode;
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
private static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
Expand Down Expand Up @@ -178,6 +179,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
}
logger.trace("recovered [{}]", seqNoStats);
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
// norelease
/*
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means
Expand Down Expand Up @@ -226,6 +228,17 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
logger.trace("created new InternalEngine");
}

private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) {
long commitMaxUnsafeAutoIdTimestamp = Long.MIN_VALUE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's wrong with writer.getLiveCommitData().getOrDefault(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.MIN_VALUE);?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's an iterable :

public final synchronized Iterable<Map.Entry<String,String>> getLiveCommitData() {

I can throw it into a HashMap, if you prefer?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wondered if there was something better than iterating too but there's not since IndexWriter#getLiveCommitData only returns an Iterable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yeah oh well :) fair enough...

for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
if (entry.getKey().equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
commitMaxUnsafeAutoIdTimestamp = Long.parseLong(entry.getValue());
break;
}
}
maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp));
}

private static SequenceNumbersService sequenceNumberService(
final ShardId shardId,
final IndexSettings indexSettings,
Expand Down Expand Up @@ -500,7 +513,7 @@ private boolean canOptimizeAddDocument(Index index) {
return true;
case LOCAL_TRANSLOG_RECOVERY:
assert index.isRetry();
return false; // even if retry is set we never optimize local recovery
return true; // allow to optimize in order to update the max safe time stamp
default:
throw new IllegalArgumentException("unknown origin " + index.origin());
}
Expand Down Expand Up @@ -1770,6 +1783,7 @@ private long commitIndexWriter(final IndexWriter writer, final Translog translog
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3220,11 +3220,40 @@ public void testEngineMaxTimestampIsInitialized() throws IOException {

}

long maxTimestamp = Math.abs(randomLong());
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
maxTimestamp, null))) {
assertEquals(maxTimestamp, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
final long timestamp1 = Math.abs(randomLong());
final Path storeDir = createTempDir();
final Path translogDir = createTempDir();
try (Store store = createStore(newFSDirectory(storeDir));
Engine engine = new InternalEngine(
config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, timestamp1, null))) {
assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
}
final long timestamp2 = randomNonNegativeLong();
final long timestamp3 = randomNonNegativeLong();
final long maxTimestamp12 = Math.max(timestamp1, timestamp2);
final long maxTimestamp123 = Math.max(maxTimestamp12, timestamp3);
try (Store store = createStore(newFSDirectory(storeDir));
Engine engine = new InternalEngine(
copy(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, timestamp2, null),
randomFrom(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)))) {
assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
if (engine.config().getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
// recover from translog and commit maxTimestamp12
engine.recoverFromTranslog();
// force flush as the were no ops performed
engine.flush(true, false);
}
final ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
engine.index(appendOnlyPrimary(doc, true, timestamp3));
assertEquals(maxTimestamp123, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
}
try (Store store = createStore(newFSDirectory(storeDir));
Engine engine = new InternalEngine(
config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
engine.recoverFromTranslog();
assertEquals(maxTimestamp123, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
}
}

Expand Down