3030import org .apache .lucene .search .Scorer ;
3131import org .apache .lucene .search .Weight ;
3232import org .apache .lucene .store .Directory ;
33+ import org .apache .lucene .store .FilterDirectory ;
3334import org .apache .lucene .store .IOContext ;
35+ import org .apache .lucene .store .IndexInput ;
3436import org .apache .lucene .store .IndexOutput ;
3537import org .apache .lucene .store .Lock ;
3638import org .apache .lucene .store .TrackingDirectoryWrapper ;
3739import org .apache .lucene .util .Bits ;
3840import org .apache .lucene .util .BytesRef ;
3941import org .apache .lucene .util .FixedBitSet ;
42+ import org .elasticsearch .common .Strings ;
4043import org .elasticsearch .common .lucene .Lucene ;
4144import org .elasticsearch .core .internal .io .IOUtils ;
4245
4346import java .io .ByteArrayOutputStream ;
47+ import java .io .FileNotFoundException ;
4448import java .io .IOException ;
4549import java .io .PrintStream ;
50+ import java .nio .file .NoSuchFileException ;
4651import java .util .ArrayList ;
4752import java .util .Arrays ;
53+ import java .util .Collection ;
4854import java .util .Collections ;
4955import java .util .HashMap ;
56+ import java .util .HashSet ;
5057import java .util .List ;
5158import java .util .Map ;
59+ import java .util .Set ;
60+ import java .util .concurrent .atomic .AtomicInteger ;
5261import java .util .function .Supplier ;
5362
5463import static org .apache .lucene .codecs .compressing .CompressingStoredFieldsWriter .FIELDS_EXTENSION ;
@@ -60,23 +69,23 @@ public class SourceOnlySnapshot {
6069
6170 private static final String FIELDS_INDEX_EXTENSION = INDEX_EXTENSION_PREFIX + FIELDS_INDEX_EXTENSION_SUFFIX ;
6271 private static final String FIELDS_META_EXTENSION = INDEX_EXTENSION_PREFIX + FIELDS_META_EXTENSION_SUFFIX ;
63- private final Directory targetDirectory ;
72+ private final LinkedFilesDirectory targetDirectory ;
6473 private final Supplier <Query > deleteByQuerySupplier ;
6574
66- public SourceOnlySnapshot (Directory targetDirectory , Supplier <Query > deleteByQuerySupplier ) {
75+ public SourceOnlySnapshot (LinkedFilesDirectory targetDirectory , Supplier <Query > deleteByQuerySupplier ) {
6776 this .targetDirectory = targetDirectory ;
6877 this .deleteByQuerySupplier = deleteByQuerySupplier ;
6978 }
7079
71- public SourceOnlySnapshot (Directory targetDirectory ) {
80+ public SourceOnlySnapshot (LinkedFilesDirectory targetDirectory ) {
7281 this (targetDirectory , null );
7382 }
7483
7584 public synchronized List <String > syncSnapshot (IndexCommit commit ) throws IOException {
7685 long generation ;
7786 Map <BytesRef , SegmentCommitInfo > existingSegments = new HashMap <>();
78- if (Lucene .indexExists (targetDirectory )) {
79- SegmentInfos existingsSegmentInfos = Lucene .readSegmentInfos (targetDirectory );
87+ if (Lucene .indexExists (targetDirectory . getWrapped () )) {
88+ SegmentInfos existingsSegmentInfos = Lucene .readSegmentInfos (targetDirectory . getWrapped () );
8089 for (SegmentCommitInfo info : existingsSegmentInfos ) {
8190 existingSegments .put (new BytesRef (info .info .getId ()), info );
8291 }
@@ -191,63 +200,78 @@ DirectoryReader wrapReader(DirectoryReader reader) throws IOException {
191200
192201 private SegmentCommitInfo syncSegment (SegmentCommitInfo segmentCommitInfo , LiveDocs liveDocs , FieldInfos fieldInfos ,
193202 Map <BytesRef , SegmentCommitInfo > existingSegments , List <String > createdFiles ) throws IOException {
194- SegmentInfo si = segmentCommitInfo .info ;
195- Codec codec = si .getCodec ();
196- final String segmentSuffix = "" ;
197- SegmentCommitInfo newInfo ;
198- final TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper (targetDirectory );
199- BytesRef segmentId = new BytesRef (si .getId ());
200- boolean exists = existingSegments .containsKey (segmentId );
201- if (exists == false ) {
202- SegmentInfo newSegmentInfo = new SegmentInfo (si .dir , si .getVersion (), si .getMinVersion (), si .name , si .maxDoc (), false ,
203- si .getCodec (), si .getDiagnostics (), si .getId (), si .getAttributes (), null );
204- // we drop the sort on purpose since the field we sorted on doesn't exist in the target index anymore.
205- newInfo = new SegmentCommitInfo (newSegmentInfo , 0 , 0 , -1 , -1 , -1 );
206- List <FieldInfo > fieldInfoCopy = new ArrayList <>(fieldInfos .size ());
207- for (FieldInfo fieldInfo : fieldInfos ) {
208- fieldInfoCopy .add (new FieldInfo (fieldInfo .name , fieldInfo .number ,
209- false , false , false , IndexOptions .NONE , DocValuesType .NONE , -1 , fieldInfo .attributes (), 0 , 0 , 0 ,
210- fieldInfo .isSoftDeletesField ()));
211- }
212- FieldInfos newFieldInfos = new FieldInfos (fieldInfoCopy .toArray (new FieldInfo [0 ]));
213- codec .fieldInfosFormat ().write (trackingDir , newSegmentInfo , segmentSuffix , newFieldInfos , IOContext .DEFAULT );
214- newInfo .setFieldInfosFiles (trackingDir .getCreatedFiles ());
215- String idxFile = IndexFileNames .segmentFileName (newSegmentInfo .name , segmentSuffix , FIELDS_INDEX_EXTENSION );
216- String dataFile = IndexFileNames .segmentFileName (newSegmentInfo .name , segmentSuffix , FIELDS_EXTENSION );
217- String metaFile = IndexFileNames .segmentFileName (newSegmentInfo .name , segmentSuffix , FIELDS_META_EXTENSION );
218- Directory sourceDir = newSegmentInfo .dir ;
203+ Directory toClose = null ;
204+ try {
205+ SegmentInfo si = segmentCommitInfo .info ;
206+ Codec codec = si .getCodec ();
207+ Directory sourceDir = si .dir ;
219208 if (si .getUseCompoundFile ()) {
220- sourceDir = codec .compoundFormat ().getCompoundReader (sourceDir , si , IOContext .DEFAULT );
209+ sourceDir = new LinkedFilesDirectory .CloseMePleaseWrapper (
210+ codec .compoundFormat ().getCompoundReader (sourceDir , si , IOContext .DEFAULT ));
211+ toClose = sourceDir ;
212+ }
213+ final String segmentSuffix = "" ;
214+ SegmentCommitInfo newInfo ;
215+ final TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper (targetDirectory );
216+ BytesRef segmentId = new BytesRef (si .getId ());
217+ boolean exists = existingSegments .containsKey (segmentId );
218+ if (exists == false ) {
219+ SegmentInfo newSegmentInfo = new SegmentInfo (targetDirectory , si .getVersion (), si .getMinVersion (), si .name , si .maxDoc (),
220+ false , si .getCodec (), si .getDiagnostics (), si .getId (), si .getAttributes (), null );
221+ // we drop the sort on purpose since the field we sorted on doesn't exist in the target index anymore.
222+ newInfo = new SegmentCommitInfo (newSegmentInfo , 0 , 0 , -1 , -1 , -1 );
223+ List <FieldInfo > fieldInfoCopy = new ArrayList <>(fieldInfos .size ());
224+ for (FieldInfo fieldInfo : fieldInfos ) {
225+ fieldInfoCopy .add (new FieldInfo (fieldInfo .name , fieldInfo .number ,
226+ false , false , false , IndexOptions .NONE , DocValuesType .NONE , -1 , fieldInfo .attributes (), 0 , 0 , 0 ,
227+ fieldInfo .isSoftDeletesField ()));
228+ }
229+ FieldInfos newFieldInfos = new FieldInfos (fieldInfoCopy .toArray (new FieldInfo [0 ]));
230+ codec .fieldInfosFormat ().write (trackingDir , newSegmentInfo , segmentSuffix , newFieldInfos , IOContext .DEFAULT );
231+ newInfo .setFieldInfosFiles (trackingDir .getCreatedFiles ());
232+ } else {
233+ newInfo = existingSegments .get (segmentId );
234+ assert newInfo .info .getUseCompoundFile () == false ;
221235 }
236+
237+ // link files for stored fields to target directory
238+ final String idxFile = IndexFileNames .segmentFileName (newInfo .info .name , segmentSuffix , FIELDS_INDEX_EXTENSION );
239+ final String dataFile = IndexFileNames .segmentFileName (newInfo .info .name , segmentSuffix , FIELDS_EXTENSION );
240+ final String metaFile = IndexFileNames .segmentFileName (newInfo .info .name , segmentSuffix , FIELDS_META_EXTENSION );
222241 trackingDir .copyFrom (sourceDir , idxFile , idxFile , IOContext .DEFAULT );
242+ assert targetDirectory .linkedFiles .containsKey (idxFile );
243+ assert trackingDir .getCreatedFiles ().contains (idxFile );
223244 trackingDir .copyFrom (sourceDir , dataFile , dataFile , IOContext .DEFAULT );
224- if (Arrays .asList (sourceDir .listAll ()).contains (metaFile )) { // only exists for Lucene 8.5+ indices
225- trackingDir .copyFrom (sourceDir , metaFile , metaFile , IOContext .DEFAULT );
226- }
227- if (sourceDir != newSegmentInfo .dir ) {
228- sourceDir .close ();
245+ assert targetDirectory .linkedFiles .containsKey (dataFile );
246+ assert trackingDir .getCreatedFiles ().contains (dataFile );
247+ if (Arrays .asList (sourceDir .listAll ()).contains (metaFile )) { // only exists for Lucene 8.5+ indices
248+ trackingDir .copyFrom (sourceDir , metaFile , metaFile , IOContext .DEFAULT );
249+ assert targetDirectory .linkedFiles .containsKey (metaFile );
250+ assert trackingDir .getCreatedFiles ().contains (metaFile );
229251 }
230- } else {
231- newInfo = existingSegments .get (segmentId );
232- assert newInfo .info .getUseCompoundFile () == false ;
233- }
234- if (liveDocs .bits != null && liveDocs .numDeletes != 0 && liveDocs .numDeletes != newInfo .getDelCount ()) {
235- if (newInfo .getDelCount () != 0 ) {
236- assert assertLiveDocs (liveDocs .bits , liveDocs .numDeletes );
252+
253+ if (liveDocs .bits != null && liveDocs .numDeletes != 0 && liveDocs .numDeletes != newInfo .getDelCount ()) {
254+ assert newInfo .getDelCount () == 0 || assertLiveDocs (liveDocs .bits , liveDocs .numDeletes );
255+ codec .liveDocsFormat ().writeLiveDocs (liveDocs .bits , trackingDir , newInfo , liveDocs .numDeletes - newInfo .getDelCount (),
256+ IOContext .DEFAULT );
257+ SegmentCommitInfo info = new SegmentCommitInfo (newInfo .info , liveDocs .numDeletes , 0 , newInfo .getNextDelGen (), -1 , -1 );
258+ info .setFieldInfosFiles (newInfo .getFieldInfosFiles ());
259+ info .info .setFiles (trackingDir .getCreatedFiles ());
260+ newInfo = info ;
237261 }
238- codec .liveDocsFormat ().writeLiveDocs (liveDocs .bits , trackingDir , newInfo , liveDocs .numDeletes - newInfo .getDelCount (),
239- IOContext .DEFAULT );
240- SegmentCommitInfo info = new SegmentCommitInfo (newInfo .info , liveDocs .numDeletes , 0 , newInfo .getNextDelGen (), -1 , -1 );
241- info .setFieldInfosFiles (newInfo .getFieldInfosFiles ());
242- info .info .setFiles (trackingDir .getCreatedFiles ());
243- newInfo = info ;
244- }
245- if (exists == false ) {
246- newInfo .info .setFiles (trackingDir .getCreatedFiles ());
247- codec .segmentInfoFormat ().write (trackingDir , newInfo .info , IOContext .DEFAULT );
262+ if (exists == false ) {
263+ newInfo .info .setFiles (trackingDir .getCreatedFiles ());
264+ codec .segmentInfoFormat ().write (trackingDir , newInfo .info , IOContext .DEFAULT );
265+ }
266+ final Set <String > createdFilesForThisSegment = trackingDir .getCreatedFiles ();
267+ createdFilesForThisSegment .remove (idxFile );
268+ createdFilesForThisSegment .remove (dataFile );
269+ createdFilesForThisSegment .remove (metaFile );
270+ createdFiles .addAll (createdFilesForThisSegment );
271+ return newInfo ;
272+ } finally {
273+ IOUtils .close (toClose );
248274 }
249- createdFiles .addAll (trackingDir .getCreatedFiles ());
250- return newInfo ;
251275 }
252276
253277 private boolean assertLiveDocs (Bits liveDocs , int deletes ) {
@@ -270,4 +294,165 @@ private static class LiveDocs {
270294 this .bits = bits ;
271295 }
272296 }
297+
298+ public static class LinkedFilesDirectory extends Directory {
299+
300+ private final Directory wrapped ;
301+ private final Map <String , Directory > linkedFiles = new HashMap <>();
302+
303+ public LinkedFilesDirectory (Directory wrapped ) {
304+ this .wrapped = wrapped ;
305+ }
306+
307+ public Directory getWrapped () {
308+ return wrapped ;
309+ }
310+
311+ @ Override
312+ public String [] listAll () throws IOException {
313+ Set <String > files = new HashSet <>();
314+ Collections .addAll (files , wrapped .listAll ());
315+ files .addAll (linkedFiles .keySet ());
316+ String [] result = files .toArray (Strings .EMPTY_ARRAY );
317+ Arrays .sort (result );
318+ return result ;
319+ }
320+
321+ @ Override
322+ public void deleteFile (String name ) throws IOException {
323+ final Directory directory = linkedFiles .remove (name );
324+ if (directory == null ) {
325+ wrapped .deleteFile (name );
326+ } else {
327+ try {
328+ wrapped .deleteFile (name );
329+ } catch (NoSuchFileException | FileNotFoundException e ) {
330+ // ignore
331+ } finally {
332+ directory .close ();
333+ }
334+ }
335+ }
336+
337+ @ Override
338+ public long fileLength (String name ) throws IOException {
339+ final Directory linkedDir = linkedFiles .get (name );
340+ if (linkedDir != null ) {
341+ return linkedDir .fileLength (name );
342+ } else {
343+ return wrapped .fileLength (name );
344+ }
345+ }
346+
347+ @ Override
348+ public IndexOutput createOutput (String name , IOContext context ) throws IOException {
349+ if (linkedFiles .containsKey (name )) {
350+ throw new IllegalArgumentException ("file cannot be created as linked file with name " + name + " already exists" );
351+ } else {
352+ return wrapped .createOutput (name , context );
353+ }
354+ }
355+
356+ @ Override
357+ public IndexOutput createTempOutput (String prefix , String suffix , IOContext context ) throws IOException {
358+ return wrapped .createTempOutput (prefix , suffix , context );
359+ }
360+
361+ @ Override
362+ public void sync (Collection <String > names ) throws IOException {
363+ final List <String > primaryNames = new ArrayList <>();
364+
365+ for (String name : names ) {
366+ if (linkedFiles .containsKey (name ) == false ) {
367+ primaryNames .add (name );
368+ }
369+ }
370+
371+ if (primaryNames .isEmpty () == false ) {
372+ wrapped .sync (primaryNames );
373+ }
374+ }
375+
376+ @ Override
377+ public void syncMetaData () throws IOException {
378+ wrapped .syncMetaData ();
379+ }
380+
381+ @ Override
382+ public void rename (String source , String dest ) throws IOException {
383+ if (linkedFiles .containsKey (source ) || linkedFiles .containsKey (dest )) {
384+ throw new IllegalArgumentException ("file cannot be renamed as linked file with name " + source + " or " + dest +
385+ " already exists" );
386+ } else {
387+ wrapped .rename (source , dest );
388+ }
389+ }
390+
391+ @ Override
392+ public IndexInput openInput (String name , IOContext context ) throws IOException {
393+ final Directory linkedDir = linkedFiles .get (name );
394+ if (linkedDir != null ) {
395+ return linkedDir .openInput (name , context );
396+ } else {
397+ return wrapped .openInput (name , context );
398+ }
399+ }
400+
401+ @ Override
402+ public Lock obtainLock (String name ) throws IOException {
403+ return wrapped .obtainLock (name );
404+ }
405+
406+ @ Override
407+ public void close () throws IOException {
408+ IOUtils .close (() -> IOUtils .close (linkedFiles .values ()), linkedFiles ::clear , wrapped );
409+ }
410+
411+ @ Override
412+ public void copyFrom (Directory from , String src , String dest , IOContext context ) throws IOException {
413+ if (src .equals (dest ) == false ) {
414+ throw new IllegalArgumentException ();
415+ } else {
416+ final Directory previous ;
417+ if (from instanceof CloseMePleaseWrapper ) {
418+ ((CloseMePleaseWrapper ) from ).incRef ();
419+ previous = linkedFiles .put (src , from );
420+ } else {
421+ previous = linkedFiles .put (src , new FilterDirectory (from ) {
422+ @ Override
423+ public void close () {
424+ // ignore
425+ }
426+ });
427+ }
428+ IOUtils .close (previous );
429+ }
430+ }
431+
432+ static class CloseMePleaseWrapper extends FilterDirectory {
433+
434+ private final AtomicInteger refCount = new AtomicInteger (1 );
435+
436+ CloseMePleaseWrapper (Directory in ) {
437+ super (in );
438+ }
439+
440+ public void incRef () {
441+ int ref = refCount .incrementAndGet ();
442+ assert ref > 1 ;
443+ }
444+
445+ @ Override
446+ public void close () throws IOException {
447+ if (refCount .decrementAndGet () == 0 ) {
448+ in .close ();
449+ }
450+ }
451+ }
452+
453+ @ Override
454+ public Set <String > getPendingDeletions () throws IOException {
455+ return wrapped .getPendingDeletions ();
456+ }
457+ }
273458}
0 commit comments