  */
 use datafusion::arrow;
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::ipc::writer::StreamWriter;
 use datafusion::arrow::json;
 use datafusion::arrow::json::reader::infer_json_schema;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader};
-use datafusion::parquet::file::properties::WriterProperties;
-use datafusion::parquet::file::serialized_reader::SerializedFileReader;
+use lazy_static::lazy_static;
 use log::error;
-use std::fs;
+use std::collections::HashMap;
+use std::fs::OpenOptions;
 use std::io::BufReader;
 use std::sync::Arc;
+use std::sync::Mutex;
+use std::sync::RwLock;
 
 use crate::metadata;
+use crate::metadata::STREAM_INFO;
 use crate::option::CONFIG;
 use crate::response;
 use crate::storage::ObjectStorage;
 use crate::Error;
 
+type LocalWriter = Mutex<Option<StreamWriter<std::fs::File>>>;
+
+lazy_static! {
+    #[derive(Default)]
+    pub static ref STREAM_WRITERS: RwLock<HashMap<String, LocalWriter>> = RwLock::new(HashMap::new());
+}
+
+impl STREAM_WRITERS {
+    // append to an existing stream
+    fn append_to_local(stream: &str, record: &RecordBatch) -> Result<(), ()> {
+        let hashmap_guard = STREAM_WRITERS.read().unwrap();
+        match hashmap_guard.get(stream) {
+            Some(localwriter) => {
+                let mut writer_guard = localwriter.lock().unwrap();
+                if let Some(ref mut writer) = *writer_guard {
+                    writer.write(record).map_err(|_| ())?;
+                } else {
+                    drop(writer_guard);
+                    drop(hashmap_guard);
+                    STREAM_WRITERS::set_entry(stream, record).unwrap();
+                }
+            }
+            None => {
+                drop(hashmap_guard);
+                STREAM_WRITERS::create_entry(stream.to_string(), record).unwrap();
+            }
+        };
+        Ok(())
+    }
+
+    // create a new entry with a new stream_writer
+    // todo: error type
+    // Only create an entry for valid streams
+    fn create_entry(stream: String, record: &RecordBatch) -> Result<(), ()> {
+        let mut hashmap_guard = STREAM_WRITERS.write().unwrap();
+
+        if STREAM_INFO.schema(&stream).is_err() {
+            return Err(());
+        }
+
+        let file = OpenOptions::new()
+            .append(true)
+            .create_new(true)
+            .open(data_file_path(&stream))
+            .map_err(|_| ())?;
+
+        let mut stream_writer = StreamWriter::try_new(file, &record.schema()).map_err(|_| ())?;
+        stream_writer.write(record).map_err(|_| ())?;
+
+        hashmap_guard.insert(stream, Mutex::new(Some(stream_writer)));
+
+        Ok(())
+    }
+
+    // Deleting a logstream requires that its metadata is deleted first
+    pub fn delete_entry(stream: &str) -> Result<(), ()> {
+        let mut hashmap_guard = STREAM_WRITERS.write().unwrap();
+
+        if STREAM_INFO.schema(stream).is_ok() {
+            return Err(());
+        }
+
+        hashmap_guard.remove(stream);
+
+        Ok(())
+    }
+
+    fn set_entry(stream: &str, record: &RecordBatch) -> Result<(), ()> {
+        let file = OpenOptions::new()
+            .append(true)
+            .create_new(true)
+            .open(data_file_path(stream))
+            .map_err(|_| ())?;
+
+        let mut stream_writer = StreamWriter::try_new(file, &record.schema()).map_err(|_| ())?;
+        stream_writer.write(record).map_err(|_| ())?;
+
+        STREAM_WRITERS
+            .read()
+            .expect("Current thread should not hold any lock")
+            .get(stream)
+            .expect("set_entry is only called on valid entries")
+            .lock()
+            .expect("Poisoning is not handled yet")
+            .replace(stream_writer); // replace the stream writer behind this mutex
+
+        Ok(())
+    }
+
+    // Unset the entry so that the writer behind it is dropped for this stream
+    pub fn unset_entry(stream: &str) {
+        let guard = STREAM_WRITERS.read().unwrap();
+        let stream_writer = match guard.get(stream) {
+            Some(writer) => writer,
+            None => return,
+        };
+        stream_writer
+            .lock()
+            .expect("Poisoning is not handled yet")
+            .take();
+    }
+}
+
+#[derive(Debug, thiserror::Error)]
+enum StreamWriterError {}
+
+fn data_file_path(stream_name: &str) -> String {
+    format!(
+        "{}/{}",
+        CONFIG
+            .parseable
+            .local_stream_data_path(stream_name)
+            .to_string_lossy(),
+        "data.records"
+    )
+}
+
 #[derive(Clone)]
 pub struct Event {
     pub body: String,
@@ -44,17 +164,6 @@ pub struct Event {
 // Events holds the schema related to each event for a single log stream
 
 impl Event {
-    fn data_file_path(&self) -> String {
-        format!(
-            "{}/{}",
-            CONFIG
-                .parseable
-                .local_stream_data_path(&self.stream_name)
-                .to_string_lossy(),
-            "data.parquet"
-        )
-    }
-
     pub async fn process(
         &self,
         storage: &impl ObjectStorage,
@@ -65,12 +174,11 @@ impl Event {
         })?;
 
         let event = self.get_reader(inferred_schema.clone());
-        let size = self.body_size();
 
         let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
         let is_first_event = stream_schema.is_none();
 
-        let compressed_size = if let Some(existing_schema) = stream_schema {
+        if let Some(existing_schema) = stream_schema {
             // validate schema before processing the event
             if existing_schema != inferred_schema {
                 return Err(Error::SchemaMismatch(self.stream_name.clone()));
@@ -84,11 +192,6 @@ impl Event {
                 .await?
         };
 
-        if let Err(e) = metadata::STREAM_INFO.update_stats(&self.stream_name, size, compressed_size)
-        {
-            error!("Couldn't update stream stats. {:?}", e);
-        }
-
         if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await {
             error!("Error checking for alerts. {:?}", e);
         }
@@ -115,59 +218,44 @@ impl Event {
         storage: &impl ObjectStorage,
     ) -> Result<u64, Error> {
         let rb = event.next()?.ok_or(Error::MissingRecord)?;
+        let stream_name = &self.stream_name;
 
-        // Store record batch to Parquet file on local cache
-        let compressed_size = self.convert_arrow_parquet(rb)?;
+        // Store record batch on local cache
+        STREAM_WRITERS::create_entry(stream_name.clone(), &rb).unwrap();
 
         // Put the inferred schema to object store
-        let stream_name = &self.stream_name;
-
         storage
             .put_schema(stream_name.clone(), &schema)
             .await
             .map_err(|e| response::EventError {
                 msg: format!(
                     "Failed to upload schema for log stream {} due to err: {}",
-                    self.stream_name, e
+                    stream_name, e
                 ),
             })?;
 
         // set the schema in memory for this stream
         metadata::STREAM_INFO
-            .set_schema(&self.stream_name, schema)
+            .set_schema(stream_name, schema)
             .map_err(|e| response::EventError {
                 msg: format!(
                     "Failed to set schema for log stream {} due to err: {}",
-                    &self.stream_name, e
+                    stream_name, e
                 ),
             })?;
 
-        Ok(compressed_size)
+        Ok(0)
     }
 
     // process_event handles all events after the 1st event. It appends each
     // record batch to the local stream writer for this log stream.
     fn process_event<R: std::io::Read>(&self, mut event: json::Reader<R>) -> Result<u64, Error> {
-        let next_event_rb = event.next()?.ok_or(Error::MissingRecord)?;
-
-        let compressed_size = match self.convert_parquet_rb_reader() {
-            Ok(mut arrow_reader) => {
-                let mut total_size = 0;
-                let rb = arrow_reader.get_record_reader(2048).unwrap();
-                for prev_rb in rb {
-                    let new_rb = RecordBatch::concat(
-                        &std::sync::Arc::new(arrow_reader.get_schema().unwrap()),
-                        &[next_event_rb.clone(), prev_rb.unwrap()],
-                    )?;
-                    total_size += self.convert_arrow_parquet(new_rb)?;
-                }
+        let rb = event.next()?.ok_or(Error::MissingRecord)?;
+        let stream_name = &self.stream_name;
 
-                total_size
-            }
-            Err(_) => self.convert_arrow_parquet(next_event_rb)?,
-        };
+        STREAM_WRITERS::append_to_local(stream_name, &rb).map_err(|_| Error::MissingRecord)?;
 
-        Ok(compressed_size)
+        Ok(0)
     }
 
     // inferSchema is a constructor to Schema
@@ -187,32 +275,4 @@ impl Event {
             json::reader::DecoderOptions::new().with_batch_size(1024),
         )
     }
-
-    fn body_size(&self) -> u64 {
-        self.body.as_bytes().len() as u64
-    }
-
-    // convert arrow record batch to parquet
-    // and write it to local cache path as a data.parquet file.
-    fn convert_arrow_parquet(&self, rb: RecordBatch) -> Result<u64, Error> {
-        let parquet_path = self.data_file_path();
-        let parquet_file = fs::File::create(&parquet_path)?;
-        let props = WriterProperties::builder().build();
-        let mut writer =
-            ArrowWriter::try_new(parquet_file, Arc::new(self.infer_schema()?), Some(props))?;
-        writer.write(&rb)?;
-        writer.close()?;
-
-        let compressed_size = fs::metadata(parquet_path)?.len();
-
-        Ok(compressed_size)
-    }
-
-    pub fn convert_parquet_rb_reader(&self) -> Result<ParquetFileArrowReader, Error> {
-        let file = fs::File::open(&self.data_file_path())?;
-        let file_reader = SerializedFileReader::new(file)?;
-        let arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
-
-        Ok(arrow_reader)
-    }
 }
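
For context on the pattern this commit moves to, below is a minimal self-contained sketch (not part of the diff) of appending Arrow record batches to a single local file with arrow's IPC StreamWriter, which is what STREAM_WRITERS keeps open per stream. The `example_append` helper, the schema, and the sample data are illustrative assumptions, not code from the repository.

// Minimal sketch: write several RecordBatches to one Arrow IPC stream file.
// `example_append`, the schema, and the data are assumptions for illustration.
use std::fs::OpenOptions;
use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::ipc::writer::StreamWriter;
use datafusion::arrow::record_batch::RecordBatch;

fn example_append(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Stand-in for the schema inferred from an incoming JSON event
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // create_entry opens data.records with create_new(true); later batches for the
    // same stream reuse the StreamWriter kept behind the Mutex in STREAM_WRITERS.
    let file = OpenOptions::new().append(true).create_new(true).open(path)?;
    let mut writer = StreamWriter::try_new(file, &schema)?;
    writer.write(&batch)?; // first event
    writer.write(&batch)?; // subsequent events append to the same stream
    writer.finish()?; // write the end-of-stream marker when the file is done
    Ok(())
}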