1717 *
1818 */
1919use actix_web:: rt:: spawn;
20- use datafusion:: arrow;
2120use datafusion:: arrow:: datatypes:: Schema ;
2221use datafusion:: arrow:: error:: ArrowError ;
2322use datafusion:: arrow:: ipc:: writer:: StreamWriter ;
@@ -36,11 +35,10 @@ use std::sync::RwLock;
3635
3736use crate :: metadata;
3837use crate :: metadata:: LOCK_EXPECT ;
39- use crate :: option:: CONFIG ;
4038use crate :: s3;
41- use crate :: storage:: ObjectStorage ;
39+ use crate :: storage:: { ObjectStorage , StorageDir } ;
4240
43- use self :: error:: EventError ;
41+ use self :: error:: { EventError , StreamWriterError } ;
4442
4543type LocalWriter = Mutex < Option < StreamWriter < std:: fs:: File > > > ;
4644type LocalWriterGuard < ' a > = MutexGuard < ' a , Option < StreamWriter < std:: fs:: File > > > ;
@@ -91,18 +89,7 @@ impl STREAM_WRITERS {
9189 . write ( )
9290 . map_err ( |_| StreamWriterError :: RwPoisoned ) ?;
9391
94- let file = OpenOptions :: new ( )
95- . append ( true )
96- . create_new ( true )
97- . open ( data_file_path ( & stream) )
98- . map_err ( StreamWriterError :: Io ) ?;
99-
100- let mut stream_writer = StreamWriter :: try_new ( file, & record. schema ( ) )
101- . expect ( "File and RecordBatch both are checked" ) ;
102-
103- stream_writer
104- . write ( record)
105- . map_err ( StreamWriterError :: Writer ) ?;
92+ let stream_writer = init_new_stream_writer_file ( & stream, record) ?;
10693
10794 hashmap_guard. insert ( stream, Mutex :: new ( Some ( stream_writer) ) ) ;
10895
@@ -125,63 +112,51 @@ impl STREAM_WRITERS {
125112 stream : & str ,
126113 record : & RecordBatch ,
127114 ) -> Result < ( ) , StreamWriterError > {
128- let file = OpenOptions :: new ( )
129- . append ( true )
130- . create_new ( true )
131- . open ( data_file_path ( stream) )
132- . map_err ( StreamWriterError :: Io ) ?;
133-
134- let mut stream_writer = StreamWriter :: try_new ( file, & record. schema ( ) )
135- . expect ( "File and RecordBatch both are checked" ) ;
136-
137- stream_writer
138- . write ( record)
139- . map_err ( StreamWriterError :: Writer ) ?;
115+ let stream_writer = init_new_stream_writer_file ( stream, record) ?;
140116
141117 writer_guard. replace ( stream_writer) ; // replace the stream writer behind this mutex
142118
143119 Ok ( ( ) )
144120 }
145121
146- // Unset the entry so that
147- pub fn unset_entry ( stream : & str ) -> Result < ( ) , StreamWriterError > {
148- let guard = STREAM_WRITERS
122+ pub fn unset_all ( ) -> Result < ( ) , StreamWriterError > {
123+ let map = STREAM_WRITERS
149124 . read ( )
150125 . map_err ( |_| StreamWriterError :: RwPoisoned ) ?;
151- let stream_writer = match guard. get ( stream) {
152- Some ( writer) => writer,
153- None => return Ok ( ( ) ) ,
154- } ;
155- stream_writer
156- . lock ( )
157- . map_err ( |_| StreamWriterError :: MutexPoisoned ) ?
158- . take ( ) ;
126+
127+ for writer in map. values ( ) {
128+ if let Some ( mut streamwriter) = writer
129+ . lock ( )
130+ . map_err ( |_| StreamWriterError :: MutexPoisoned ) ?
131+ . take ( )
132+ {
133+ let _ = streamwriter. finish ( ) ;
134+ }
135+ }
159136
160137 Ok ( ( ) )
161138 }
162139}
163140
164- #[ derive( Debug , thiserror:: Error ) ]
165- pub enum StreamWriterError {
166- #[ error( "Arrow writer failed: {0}" ) ]
167- Writer ( arrow:: error:: ArrowError ) ,
168- #[ error( "Io Error when creating new file: {0}" ) ]
169- Io ( std:: io:: Error ) ,
170- #[ error( "RwLock was poisoned" ) ]
171- RwPoisoned ,
172- #[ error( "Mutex was poisoned" ) ]
173- MutexPoisoned ,
174- }
141+ fn init_new_stream_writer_file (
142+ stream_name : & str ,
143+ record : & RecordBatch ,
144+ ) -> Result < StreamWriter < std:: fs:: File > , StreamWriterError > {
145+ let dir = StorageDir :: new ( stream_name) ;
146+ let path = dir. path_by_current_time ( ) ;
147+
148+ std:: fs:: create_dir_all ( dir. data_path ) ?;
149+
150+ let file = OpenOptions :: new ( ) . create ( true ) . append ( true ) . open ( path) ?;
151+
152+ let mut stream_writer = StreamWriter :: try_new ( file, & record. schema ( ) )
153+ . expect ( "File and RecordBatch both are checked" ) ;
154+
155+ stream_writer
156+ . write ( record)
157+ . map_err ( StreamWriterError :: Writer ) ?;
175158
176- fn data_file_path ( stream_name : & str ) -> String {
177- format ! (
178- "{}/{}" ,
179- CONFIG
180- . parseable
181- . local_stream_data_path( stream_name)
182- . to_string_lossy( ) ,
183- "data.records"
184- )
159+ Ok ( stream_writer)
185160}
186161
187162#[ derive( Clone ) ]
@@ -309,7 +284,7 @@ impl Event {
309284 infer_json_schema ( & mut buf_reader, None )
310285 }
311286
312- fn get_reader ( & self , arrow_schema : arrow :: datatypes :: Schema ) -> json:: Reader < & [ u8 ] > {
287+ fn get_reader ( & self , arrow_schema : Schema ) -> json:: Reader < & [ u8 ] > {
313288 json:: Reader :: new (
314289 self . body . as_bytes ( ) ,
315290 Arc :: new ( arrow_schema) ,
@@ -348,8 +323,6 @@ pub mod error {
348323 use crate :: storage:: ObjectStorageError ;
349324 use datafusion:: arrow:: error:: ArrowError ;
350325
351- use super :: StreamWriterError ;
352-
353326 #[ derive( Debug , thiserror:: Error ) ]
354327 pub enum EventError {
355328 #[ error( "Missing Record from event body" ) ]
@@ -365,4 +338,16 @@ pub mod error {
365338 #[ error( "Schema Mismatch: {0}" ) ]
366339 ObjectStorage ( #[ from] ObjectStorageError ) ,
367340 }
341+
342+ #[ derive( Debug , thiserror:: Error ) ]
343+ pub enum StreamWriterError {
344+ #[ error( "Arrow writer failed: {0}" ) ]
345+ Writer ( #[ from] ArrowError ) ,
346+ #[ error( "Io Error when creating new file: {0}" ) ]
347+ Io ( #[ from] std:: io:: Error ) ,
348+ #[ error( "RwLock was poisoned" ) ]
349+ RwPoisoned ,
350+ #[ error( "Mutex was poisoned" ) ]
351+ MutexPoisoned ,
352+ }
368353}
0 commit comments