@@ -27,6 +27,7 @@
 import org.apache.lucene.search.CollectionStatistics;
 import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.CollectorManager;
 import org.apache.lucene.search.ConjunctionDISI;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.Explanation;
@@ -35,24 +36,31 @@
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.QueryCache;
 import org.apache.lucene.search.QueryCachingPolicy;
+import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.TermStatistics;
+import org.apache.lucene.search.TopFieldDocs;
+import org.apache.lucene.search.TotalHits;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.search.similarities.Similarity;
 import org.apache.lucene.util.BitSet;
 import org.apache.lucene.util.BitSetIterator;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.CombinedBitSet;
 import org.apache.lucene.util.SparseFixedBitSet;
+import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
+import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.dfs.AggregatedDfs;
 import org.elasticsearch.search.profile.Timer;
 import org.elasticsearch.search.profile.query.ProfileWeight;
 import org.elasticsearch.search.profile.query.QueryProfileBreakdown;
 import org.elasticsearch.search.profile.query.QueryProfiler;
 import org.elasticsearch.search.profile.query.QueryTimingType;
+import org.elasticsearch.search.query.QuerySearchResult;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
@@ -131,12 +139,86 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws |
         }
     }
 
+    private void checkCancelled() {
+        if (checkCancelled != null) {
+            checkCancelled.run();
+        }
+    }
+
+    public void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager manager,
+                       QuerySearchResult result, DocValueFormat[] formats, TotalHits totalHits) throws IOException {
+        final List<Collector> collectors = new ArrayList<>(leaves.size());
+        for (LeafReaderContext ctx : leaves) {
+            final Collector collector = manager.newCollector();
+            searchLeaf(ctx, weight, collector);
+            collectors.add(collector);
+        }
+        TopFieldDocs mergedTopDocs = (TopFieldDocs) manager.reduce(collectors);
+        // Lucene sets the shard index when merging topDocs from different collectors;
+        // reset it here, since ES assigns shard indices later, during its own reduce stage
+        for (ScoreDoc scoreDoc : mergedTopDocs.scoreDocs) {
+            scoreDoc.shardIndex = -1;
+        }
+        if (totalHits != null) { // totalHits was already precalculated for the whole index
+            mergedTopDocs = new TopFieldDocs(totalHits, mergedTopDocs.scoreDocs, mergedTopDocs.fields);
+        }
+        result.topDocs(new TopDocsAndMaxScore(mergedTopDocs, Float.NaN), formats);
+    }
+
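The new public entry point above runs each leaf sequentially against a fresh per-leaf collector, then reduces the collectors into a single TopFieldDocs. A minimal sketch of how a caller might drive it; the sort, query, searcher, and searchResult names are assumptions for illustration, and TopFieldCollector.createSharedManager is Lucene's stock factory for a reducible top-docs manager:

    // Hypothetical caller: sort, query, searcher and searchResult are assumed to exist.
    CollectorManager<TopFieldCollector, TopFieldDocs> manager =
        TopFieldCollector.createSharedManager(sort, 10, null, Integer.MAX_VALUE);
    Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE, 1f);
    // Passing null for totalHits keeps the hit counts computed during collection.
    searcher.search(searcher.getIndexReader().leaves(), weight, manager,
        searchResult, new DocValueFormat[] { DocValueFormat.RAW }, null);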
     @Override
     protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
-        final Weight cancellableWeight;
-        if (checkCancelled != null) {
-            cancellableWeight = new Weight(weight.getQuery()) {
+        for (LeafReaderContext ctx : leaves) { // search each subreader
+            searchLeaf(ctx, weight, collector);
+        }
+    }
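This override is the hook that Lucene's IndexSearcher dispatches collector-based searches through, so plain searches now also flow through searchLeaf and its per-leaf cancellation check. A minimal sketch, assuming searcher is an instance of this class and query has already been rewritten:

    // Counting hits funnels through the overridden search(leaves, weight, collector) above.
    TotalHitCountCollector counter = new TotalHitCountCollector();
    searcher.search(query, counter);
    int hits = counter.getTotalHits();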
+
+    /**
+     * Lower-level search API.
+     *
+     * {@link LeafCollector#collect(int)} is called for every matching document in
+     * the provided <code>ctx</code>.
+     */
+    private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
+        checkCancelled();
+        weight = wrapWeight(weight);
+        final LeafCollector leafCollector;
+        try {
+            leafCollector = collector.getLeafCollector(ctx);
+        } catch (CollectionTerminatedException e) {
+            // there is no doc of interest in this reader context
+            // continue with the following leaf
+            return;
+        }
+        Bits liveDocs = ctx.reader().getLiveDocs();
+        BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs);
+        if (liveDocsBitSet == null) {
+            BulkScorer bulkScorer = weight.bulkScorer(ctx);
+            if (bulkScorer != null) {
+                try {
+                    bulkScorer.score(leafCollector, liveDocs);
+                } catch (CollectionTerminatedException e) {
+                    // collection was terminated prematurely
+                    // continue with the following leaf
+                }
+            }
+        } else {
+            // if the role query result set is sparse then we should use the SparseFixedBitSet for advancing:
+            Scorer scorer = weight.scorer(ctx);
+            if (scorer != null) {
+                try {
+                    intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector,
+                            checkCancelled == null ? () -> { } : checkCancelled);
+                } catch (CollectionTerminatedException e) {
+                    // collection was terminated prematurely
+                    // continue with the following leaf
+                }
+            }
+        }
+    }
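The else branch above handles live docs backed by a sparse bit set, typically the result of a document-level security role query; rather than bulk scoring, the scorer is leap-frogged against the bit set. The real intersectScorerAndBitSet is defined outside this diff; a minimal sketch of that style of intersection, assuming it conjuncts the two iterators and polls the cancellation hook once per collected document:

    // Sketch only; the actual intersectScorerAndBitSet(...) is not part of this diff.
    static void intersect(Scorer scorer, BitSet bits, LeafCollector collector,
                          Runnable checkCancelled) throws IOException {
        // Conjunction of the bit set's iterator with the scorer's iterator
        DocIdSetIterator disi = ConjunctionDISI.intersectIterators(Arrays.asList(
            new BitSetIterator(bits, bits.approximateCardinality()), scorer.iterator()));
        collector.setScorer(scorer);
        for (int doc = disi.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = disi.nextDoc()) {
            checkCancelled.run();   // honor task cancellation while advancing
            collector.collect(doc);
        }
    }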
 
+    private Weight wrapWeight(Weight weight) {
+        if (checkCancelled != null) {
+            return new Weight(weight.getQuery()) {
                 @Override
                 public void extractTerms(Set<Term> terms) {
                     throw new UnsupportedOperationException();
@@ -168,48 +250,10 @@ public BulkScorer bulkScorer(LeafReaderContext context) throws IOException { |
                 }
             };
         } else {
-            cancellableWeight = weight;
+            return weight;
         }
-        searchInternal(leaves, cancellableWeight, collector);
     }
 
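wrapWeight only decorates the original Weight when a cancellation hook is installed; the collapsed middle of the anonymous class, whose bulkScorer method appears in the hunk header above, is where the hook runs during scoring. Installing the hook is outside this diff; a hedged sketch, assuming the class keeps its existing setCheckCancelled(Runnable) setter and a CancellableTask named task is in scope:

    // Illustrative wiring only; `task` is an assumed CancellableTask.
    searcher.setCheckCancelled(() -> {
        if (task.isCancelled()) {
            throw new TaskCancelledException("query was cancelled");
        }
    });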
-    private void searchInternal(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
-        for (LeafReaderContext ctx : leaves) { // search each subreader
-            final LeafCollector leafCollector;
-            try {
-                leafCollector = collector.getLeafCollector(ctx);
-            } catch (CollectionTerminatedException e) {
-                // there is no doc of interest in this reader context
-                // continue with the following leaf
-                continue;
-            }
-            Bits liveDocs = ctx.reader().getLiveDocs();
-            BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs);
-            if (liveDocsBitSet == null) {
-                BulkScorer bulkScorer = weight.bulkScorer(ctx);
-                if (bulkScorer != null) {
-                    try {
-                        bulkScorer.score(leafCollector, liveDocs);
-                    } catch (CollectionTerminatedException e) {
-                        // collection was terminated prematurely
-                        // continue with the following leaf
-                    }
-                }
-            } else {
-                // if the role query result set is sparse then we should use the SparseFixedBitSet for advancing:
-                Scorer scorer = weight.scorer(ctx);
-                if (scorer != null) {
-                    try {
-                        intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector,
-                                checkCancelled == null ? () -> {} : checkCancelled);
-                    } catch (CollectionTerminatedException e) {
-                        // collection was terminated prematurely
-                        // continue with the following leaf
-                    }
-                }
-            }
-        }
-    }
 
     private static BitSet getSparseBitSetOrNull(Bits liveDocs) {
         if (liveDocs instanceof SparseFixedBitSet) {