161 changes: 125 additions & 36 deletions server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -27,7 +27,6 @@
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.document.LatLonDocValuesField;
-import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
@@ -36,15 +35,18 @@
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
@@ -66,19 +68,23 @@
import org.apache.lucene.store.Lock;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.Version;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.fielddata.IndexFieldData;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collection;
@@ -99,6 +105,9 @@ public class Lucene {
}

public static final String SOFT_DELETE_FIELD = "__soft_delete";
// We can't use hard-deletes together with soft-deletes because liveDocs can't distinguish documents that were hard-deleted
// from those that were soft-deleted. We use this numeric docValues field to exclude rolled-back documents from liveDocs instead.
public static final String ROLLED_BACK_FIELD = "__rolled_back";

public static final NamedAnalyzer STANDARD_ANALYZER = new NamedAnalyzer("_standard", AnalyzerScope.GLOBAL, new StandardAnalyzer());
public static final NamedAnalyzer KEYWORD_ANALYZER = new NamedAnalyzer("_keyword", AnalyzerScope.GLOBAL, new KeywordAnalyzer());
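For reviewers following along: with newSoftDeleteField() removed at the bottom of this diff, a document would presumably be marked as rolled back through a numeric docValues update, which is also what bumps the docValuesGen that the liveDocs cache below keys on. A minimal sketch, assuming an IndexWriter with soft deletes enabled and a hypothetical uid term; this is not the PR's actual call site:

// assumes: import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.Term;
void markRolledBack(IndexWriter writer, Term uid) throws IOException {
    // attaches the __rolled_back docValues field to the matching doc and increases the segment's docValuesGen
    writer.updateNumericDocValue(uid, Lucene.ROLLED_BACK_FIELD, 1L);
}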
@@ -837,28 +846,120 @@ public int length() {
}

/**
- * Wraps a directory reader to include all live docs.
- * The wrapped reader can be used to query all documents.
- *
- * @param in the input directory reader
- * @return the wrapped reader
+ * Creates a wrapper that makes all docs in the provided directory reader appear live, except those marked as rolled back.
+ * The wrapped reader should be used to query the history of a Lucene index.
*/
-public static DirectoryReader wrapAllDocsLive(DirectoryReader in) throws IOException {
-return new DirectoryReaderWithAllLiveDocs(in);
-}
+public static CheckedFunction<DirectoryReader, DirectoryReader, IOException> noDeletesReaderWrapper() {
+NoDeletesReaderWrapper wrapper = new NoDeletesReaderWrapper();
+return wrapper::wrapNoDeletes;
+}
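A usage sketch, assuming an already-open DirectoryReader named reader: callers would hold on to the returned function and apply it to every reader they open, so all wraps share one wrapper instance and therefore one liveDocs cache.

CheckedFunction<DirectoryReader, DirectoryReader, IOException> wrap = Lucene.noDeletesReaderWrapper();
DirectoryReader historyReader = wrap.apply(reader); // soft-deleted docs appear live; rolled-back docs do not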

// pkg level for testing
static final class NoDeletesReaderWrapper {
// Docs are marked as rolled back only by updating the rollback docValues field, and such an update increases the docValuesGen.
// We can use the core cache key of the reader as the cache key, then recompute the liveDocs whenever the cached entry has a different docValuesGen.
final Map<IndexReader.CacheKey, NoDeletesLiveDocs> liveDocsCache = ConcurrentCollections.newConcurrentMap();

DirectoryReader wrapNoDeletes(DirectoryReader in) throws IOException {
return new NoDeletesDirectoryReader(in);
}

private static final class NoDeletesLiveDocs {
final int numDocs;
final FixedBitSet bits;
final long docValuesGen;
NoDeletesLiveDocs(FixedBitSet bits, int numDocs, long docValuesGen) {
assert numDocs == bits.cardinality();
this.numDocs = numDocs;
this.bits = bits;
this.docValuesGen = docValuesGen;
}
}
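To illustrate the invariant the assert above pins down, here is a toy example with a 5-doc segment in which doc 3 is rolled back:

FixedBitSet bits = new FixedBitSet(5);
bits.set(0, 5); // every doc starts out live
bits.clear(3); // doc 3 is rolled back
assert bits.cardinality() == 4; // the numDocs that must accompany these bits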

private final class NoDeletesDirectoryReader extends FilterDirectoryReader {
NoDeletesDirectoryReader(DirectoryReader in) throws IOException {
super(in, new NoDeletesSubReaderWrapper());
}
@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return new NoDeletesDirectoryReader(in);
}
@Override
public CacheHelper getReaderCacheHelper() {
return null; // Modifying liveDocs
}
}

-private static final class DirectoryReaderWithAllLiveDocs extends FilterDirectoryReader {
-static final class SubReaderWithAllLiveDocs extends FilterLeafReader {
-SubReaderWithAllLiveDocs(LeafReader in) {
private final class NoDeletesSubReaderWrapper extends FilterDirectoryReader.SubReaderWrapper {
@Override
public LeafReader wrap(LeafReader leaf) {
try {
if (leaf.getLiveDocs() == null) {
return leaf;
}
DocIdSetIterator rollbackDocs = DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator(ROLLED_BACK_FIELD, leaf);
if (rollbackDocs == null) {
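// no rolled-back docs in this segment: null liveDocs exposes every doc, soft-deleted or not, as live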
return new SubReaderWithLiveDocs(leaf, null, leaf.maxDoc());
}
final SegmentReader segmentReader = segmentReader(leaf);
final long docValuesGen = segmentReader.getSegmentInfo().getDocValuesGen();
final IndexReader.CacheHelper cacheHelper = segmentReader.getCoreCacheHelper();
final NoDeletesLiveDocs liveDocs;
if (cacheHelper != null) {
liveDocs = liveDocsCache.compute(cacheHelper.getKey(), (k, v) -> {
if (v == null) {
// register the close listener only on the first computation for this core cache key
cacheHelper.addClosedListener(liveDocsCache::remove);
}
if (v == null || v.docValuesGen != docValuesGen) {
return makeLiveDocs(segmentReader, rollbackDocs, docValuesGen);
} else {
assert v.bits.equals(makeLiveDocs(leaf, rollbackDocs, docValuesGen).bits);
return v;
}
});
} else {
liveDocs = makeLiveDocs(segmentReader, rollbackDocs, docValuesGen);
}
return new SubReaderWithLiveDocs(leaf, liveDocs.bits, liveDocs.numDocs);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

NoDeletesLiveDocs makeLiveDocs(LeafReader leaf, DocIdSetIterator rollbackDocs, long docValuesGen) {
try {
int rollbackCount = 0;
FixedBitSet liveDocs = new FixedBitSet(leaf.maxDoc());
liveDocs.set(0, liveDocs.length());
int docId;
while ((docId = rollbackDocs.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
assert leaf.getLiveDocs().get(docId) == false : "doc [" + docId + "] is rolled back but not deleted";
liveDocs.clear(docId);
rollbackCount++;
}
return new NoDeletesLiveDocs(liveDocs, liveDocs.length() - rollbackCount, docValuesGen);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
}

private static final class SubReaderWithLiveDocs extends FilterLeafReader {
final Bits liveDocs;
final int numDocs;
SubReaderWithLiveDocs(LeafReader in, Bits liveDocs, int numDocs) {
super(in);
this.liveDocs = liveDocs;
this.numDocs = numDocs;
}
@Override
public Bits getLiveDocs() {
-return null;
+return liveDocs;
}
@Override
public int numDocs() {
-return maxDoc();
+return numDocs;
}
@Override
public CacheHelper getCoreCacheHelper() {
Expand All @@ -869,29 +970,17 @@ public CacheHelper getReaderCacheHelper() {
return null; // Modifying liveDocs
}
}
-DirectoryReaderWithAllLiveDocs(DirectoryReader in) throws IOException {
-super(in, new FilterDirectoryReader.SubReaderWrapper() {
-@Override
-public LeafReader wrap(LeafReader leaf) {
-return new SubReaderWithAllLiveDocs(leaf);
-}
-});
-}
-@Override
-protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
-return wrapAllDocsLive(in);
-}
-
-@Override
-public CacheHelper getReaderCacheHelper() {
-return null; // Modifying liveDocs
-}
}
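To make the effect concrete: a searcher over the wrapped reader counts soft-deleted documents as live, since numDocs() and getLiveDocs() now reflect only the rollback markers. A sketch, reusing the hypothetical historyReader from the usage note above:

// assumes: import org.apache.lucene.search.MatchAllDocsQuery;
IndexSearcher searcher = new IndexSearcher(historyReader);
int allDocs = searcher.count(new MatchAllDocsQuery()); // includes soft-deleted docs, excludes rolled-back docs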

-/**
- * Returns a numeric docvalues which can be used to soft-delete documents.
- */
-public static NumericDocValuesField newSoftDeleteField() {
-return new NumericDocValuesField(SOFT_DELETE_FIELD, 1);
+/** A shortcut for reading a single value from a numeric docValues field. The docId must have a value in the accessed field. */
+public static long readNumericDV(LeafReader leafReader, String field, int docId) throws IOException {
+NumericDocValues dv = leafReader.getNumericDocValues(field);
+if (dv == null) {
+throw new IllegalStateException("missing docValues [" + field + "]");
+}
+if (dv.advanceExact(docId) == false) {
+throw new IllegalStateException("doc [" + docId + "] does not exist in [" + field + "] docValues");
+}
+return dv.longValue();
}
}
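A usage sketch for readNumericDV, assuming a docId that is known to carry the soft-delete marker in its leaf (otherwise the helper throws IllegalStateException):

LeafReader leaf = historyReader.leaves().get(0).reader();
long marker = Lucene.readNumericDV(leaf, Lucene.SOFT_DELETE_FIELD, docId); // reads the field once for this doc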