1616// under the License.
1717
1818use std:: collections:: HashMap ;
19- use std:: future:: Future ;
20- use std:: ops:: Deref ;
21- use std:: pin:: Pin ;
22- use std:: sync:: { Arc , RwLock } ;
23- use std:: task:: { Context , Poll } ;
19+ use std:: sync:: { Arc , OnceLock } ;
2420
2521use futures:: channel:: mpsc:: { channel, Sender } ;
2622use futures:: StreamExt ;
23+ use tokio:: sync:: Notify ;
2724
2825use crate :: runtime:: spawn;
2926use crate :: scan:: { DeleteFileContext , FileScanTask } ;
3027use crate :: spec:: { DataContentType , DataFile , Struct } ;
31- use crate :: { Error , ErrorKind , Result } ;
28+ use crate :: Result ;
3229
3330/// Index of delete files
3431#[ derive( Clone , Debug ) ]
3532pub ( crate ) struct DeleteFileIndex {
36- state : Arc < RwLock < DeleteFileIndexState > > ,
37- }
38-
39- #[ derive( Debug ) ]
40- enum DeleteFileIndexState {
41- Populating ,
42- Populated ( PopulatedDeleteFileIndex ) ,
33+ index : Arc < OnceLock < PopulatedDeleteFileIndex > > ,
34+ ready_notify : Arc < Notify > ,
4335}
4436
4537#[ derive( Debug ) ]
@@ -59,36 +51,50 @@ impl DeleteFileIndex {
5951 pub ( crate ) fn new ( ) -> ( DeleteFileIndex , Sender < DeleteFileContext > ) {
6052 // TODO: what should the channel limit be?
6153 let ( tx, rx) = channel ( 10 ) ;
62- let state = Arc :: new ( RwLock :: new ( DeleteFileIndexState :: Populating ) ) ;
54+ let index = Arc :: new ( OnceLock :: new ( ) ) ;
55+ let ready_notify = Arc :: new ( Notify :: new ( ) ) ;
6356 let delete_file_stream = rx. boxed ( ) ;
6457
6558 spawn ( {
66- let state = state. clone ( ) ;
59+ let index = index. clone ( ) ;
60+ let ready_notify = ready_notify. clone ( ) ;
6761 async move {
6862 let delete_files = delete_file_stream. collect :: < Vec < _ > > ( ) . await ;
6963
7064 let populated_delete_file_index = PopulatedDeleteFileIndex :: new ( delete_files) ;
7165
72- let mut guard = state. write ( ) . unwrap ( ) ;
73- * guard = DeleteFileIndexState :: Populated ( populated_delete_file_index) ;
66+ index
67+ . set ( populated_delete_file_index)
68+ . expect ( "delete file index should not be written by another thread" ) ;
69+ ready_notify. notify_waiters ( ) ;
7470 }
7571 } ) ;
7672
77- ( DeleteFileIndex { state } , tx)
73+ (
74+ DeleteFileIndex {
75+ index,
76+ ready_notify,
77+ } ,
78+ tx,
79+ )
7880 }
7981
8082 /// Gets all the delete files that apply to the specified data file.
81- ///
82- /// Returns a future that resolves to a Result<Vec<FileScanTaskDeleteFile>>
83- pub ( crate ) fn get_deletes_for_data_file < ' a > (
83+ pub ( crate ) async fn get_deletes_for_data_file (
8484 & self ,
85- data_file : & ' a DataFile ,
85+ data_file : & DataFile ,
8686 seq_num : Option < i64 > ,
87- ) -> DeletesForDataFile < ' a > {
88- DeletesForDataFile {
89- state : self . state . clone ( ) ,
90- data_file,
91- seq_num,
87+ ) -> Result < Vec < FileScanTask > > {
88+ match self . index . get ( ) {
89+ Some ( idx) => Ok ( idx. get_deletes_for_data_file ( data_file, seq_num) ) ,
90+ None => {
91+ self . ready_notify . notified ( ) . await ;
92+ Ok ( self
93+ . index
94+ . get ( )
95+ . unwrap ( )
96+ . get_deletes_for_data_file ( data_file, seq_num) )
97+ }
9298 }
9399 }
94100}
@@ -195,26 +201,3 @@ impl PopulatedDeleteFileIndex {
195201 results
196202 }
197203}
198-
199- /// Future for the `DeleteFileIndex::get_deletes_for_data_file` method
200- pub ( crate ) struct DeletesForDataFile < ' a > {
201- state : Arc < RwLock < DeleteFileIndexState > > ,
202- data_file : & ' a DataFile ,
203- seq_num : Option < i64 > ,
204- }
205-
206- impl Future for DeletesForDataFile < ' _ > {
207- type Output = Result < Vec < FileScanTask > > ;
208-
209- fn poll ( self : Pin < & mut Self > , _cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
210- match self . state . try_read ( ) {
211- Ok ( guard) => match guard. deref ( ) {
212- DeleteFileIndexState :: Populated ( idx) => Poll :: Ready ( Ok (
213- idx. get_deletes_for_data_file ( self . data_file , self . seq_num )
214- ) ) ,
215- _ => Poll :: Pending ,
216- } ,
217- Err ( err) => Poll :: Ready ( Err ( Error :: new ( ErrorKind :: Unexpected , err. to_string ( ) ) ) ) ,
218- }
219- }
220- }
0 commit comments