@@ -29,16 +29,17 @@ use std::fs::OpenOptions;
2929use std:: io:: BufReader ;
3030use std:: sync:: Arc ;
3131use std:: sync:: Mutex ;
32+ use std:: sync:: MutexGuard ;
3233use std:: sync:: RwLock ;
3334
3435use crate :: metadata;
35- use crate :: metadata:: STREAM_INFO ;
3636use crate :: option:: CONFIG ;
3737use crate :: response;
3838use crate :: storage:: ObjectStorage ;
3939use crate :: Error ;
4040
4141type LocalWriter = Mutex < Option < StreamWriter < std:: fs:: File > > > ;
42+ type LocalWriterGuard < ' a > = MutexGuard < ' a , Option < StreamWriter < std:: fs:: File > > > ;
4243
4344lazy_static ! {
4445 #[ derive( Default ) ]
@@ -47,102 +48,126 @@ lazy_static! {
4748
4849impl STREAM_WRITERS {
4950 // append to a existing stream
50- fn append_to_local ( stream : & str , record : & RecordBatch ) -> Result < ( ) , ( ) > {
51- let hashmap_guard = STREAM_WRITERS . read ( ) . unwrap ( ) ;
51+ fn append_to_local ( stream : & str , record : & RecordBatch ) -> Result < ( ) , StreamWriterError > {
52+ let hashmap_guard = STREAM_WRITERS
53+ . read ( )
54+ . map_err ( |_| StreamWriterError :: RwPoisioned ) ?;
55+
5256 match hashmap_guard. get ( stream) {
5357 Some ( localwriter) => {
54- let mut writer_guard = localwriter. lock ( ) . unwrap ( ) ;
58+ let mut writer_guard = localwriter
59+ . lock ( )
60+ . map_err ( |_| StreamWriterError :: MutexPoisioned ) ?;
61+
62+ // if it's some writer then we write without dropping any lock
63+ // hashmap cannot be brought mutably at any point until this finishes
5564 if let Some ( ref mut writer) = * writer_guard {
56- writer. write ( record) . map_err ( |_| ( ) ) ?;
65+ writer. write ( record) . map_err ( StreamWriterError :: Writer ) ?;
5766 } else {
58- drop ( writer_guard) ;
59- drop ( hashmap_guard) ;
60- STREAM_WRITERS :: set_entry ( stream, record) . unwrap ( ) ;
67+ // pass on this mutex to set entry so that it can be reused
68+ // we have a guard for underlying entry thus
69+ // hashmap must not be availible as mutable to any other thread
70+ STREAM_WRITERS :: set_entry ( writer_guard, stream, record) ?;
6171 }
6272 }
73+ // entry is not present thus we create it
6374 None => {
75+ // this requires mutable borrow of the map so we drop this read lock and wait for write lock
6476 drop ( hashmap_guard) ;
65- STREAM_WRITERS :: create_entry ( stream. to_string ( ) , record) . unwrap ( ) ;
77+ STREAM_WRITERS :: create_entry ( stream. to_string ( ) , record) ? ;
6678 }
6779 } ;
6880 Ok ( ( ) )
6981 }
7082
7183 // create a new entry with new stream_writer
72- // todo: error type
7384 // Only create entry for valid streams
74- fn create_entry ( stream : String , record : & RecordBatch ) -> Result < ( ) , ( ) > {
75- let mut hashmap_guard = STREAM_WRITERS . write ( ) . unwrap ( ) ;
76-
77- if STREAM_INFO . schema ( & stream) . is_err ( ) {
78- return Err ( ( ) ) ;
79- }
85+ fn create_entry ( stream : String , record : & RecordBatch ) -> Result < ( ) , StreamWriterError > {
86+ let mut hashmap_guard = STREAM_WRITERS
87+ . write ( )
88+ . map_err ( |_| StreamWriterError :: RwPoisioned ) ?;
8089
8190 let file = OpenOptions :: new ( )
8291 . append ( true )
8392 . create_new ( true )
8493 . open ( data_file_path ( & stream) )
85- . map_err ( |_| ( ) ) ?;
94+ . map_err ( StreamWriterError :: Io ) ?;
8695
87- let mut stream_writer = StreamWriter :: try_new ( file, & record. schema ( ) ) . map_err ( |_| ( ) ) ?;
88- stream_writer. write ( record) . map_err ( |_| ( ) ) ?;
96+ let mut stream_writer = StreamWriter :: try_new ( file, & record. schema ( ) )
97+ . expect ( "File and RecordBatch both are checked" ) ;
98+
99+ stream_writer
100+ . write ( record)
101+ . map_err ( StreamWriterError :: Writer ) ?;
89102
90103 hashmap_guard. insert ( stream, Mutex :: new ( Some ( stream_writer) ) ) ;
91104
92105 Ok ( ( ) )
93106 }
94107
95108 // Deleting a logstream requires that metadata is deleted first
96- pub fn delete_entry ( stream : & str ) -> Result < ( ) , ( ) > {
97- let mut hashmap_guard = STREAM_WRITERS . write ( ) . unwrap ( ) ;
98-
99- if STREAM_INFO . schema ( stream) . is_ok ( ) {
100- return Err ( ( ) ) ;
101- }
109+ pub fn delete_entry ( stream : & str ) -> Result < ( ) , StreamWriterError > {
110+ let mut hashmap_guard = STREAM_WRITERS
111+ . write ( )
112+ . map_err ( |_| StreamWriterError :: RwPoisioned ) ?;
102113
103114 hashmap_guard. remove ( stream) ;
104115
105116 Ok ( ( ) )
106117 }
107118
108- fn set_entry ( stream : & str , record : & RecordBatch ) -> Result < ( ) , ( ) > {
119+ fn set_entry (
120+ mut writer_guard : LocalWriterGuard ,
121+ stream : & str ,
122+ record : & RecordBatch ,
123+ ) -> Result < ( ) , StreamWriterError > {
109124 let file = OpenOptions :: new ( )
110125 . append ( true )
111126 . create_new ( true )
112127 . open ( data_file_path ( stream) )
113- . map_err ( |_| ( ) ) ?;
128+ . map_err ( StreamWriterError :: Io ) ?;
114129
115- let mut stream_writer = StreamWriter :: try_new ( file, & record. schema ( ) ) . map_err ( |_| ( ) ) ? ;
116- stream_writer . write ( record ) . map_err ( |_| ( ) ) ? ;
130+ let mut stream_writer = StreamWriter :: try_new ( file, & record. schema ( ) )
131+ . expect ( "File and RecordBatch both are checked" ) ;
117132
118- STREAM_WRITERS
119- . read ( )
120- . expect ( "Current Thread should not hold any lock" )
121- . get ( stream)
122- . expect ( "set entry is only called on valid entries" )
123- . lock ( )
124- . expect ( "Poisioning is not handled yet" )
125- . replace ( stream_writer) ; // replace the stream writer behind this mutex
133+ stream_writer
134+ . write ( record)
135+ . map_err ( StreamWriterError :: Writer ) ?;
136+
137+ writer_guard. replace ( stream_writer) ; // replace the stream writer behind this mutex
126138
127139 Ok ( ( ) )
128140 }
129141
130142 // Unset the entry so that
131- pub fn unset_entry ( stream : & str ) {
132- let guard = STREAM_WRITERS . read ( ) . unwrap ( ) ;
143+ pub fn unset_entry ( stream : & str ) -> Result < ( ) , StreamWriterError > {
144+ let guard = STREAM_WRITERS
145+ . read ( )
146+ . map_err ( |_| StreamWriterError :: RwPoisioned ) ?;
133147 let stream_writer = match guard. get ( stream) {
134148 Some ( writer) => writer,
135- None => return ,
149+ None => return Ok ( ( ) ) ,
136150 } ;
137151 stream_writer
138152 . lock ( )
139- . expect ( "Poisioning is not handled yet" )
153+ . map_err ( |_| StreamWriterError :: MutexPoisioned ) ?
140154 . take ( ) ;
155+
156+ Ok ( ( ) )
141157 }
142158}
143159
144160#[ derive( Debug , thiserror:: Error ) ]
145- enum StreamWriterError { }
161+ pub enum StreamWriterError {
162+ #[ error( "Arrow writer failed: {0}" ) ]
163+ Writer ( arrow:: error:: ArrowError ) ,
164+ #[ error( "Io Error when creating new file: {0}" ) ]
165+ Io ( std:: io:: Error ) ,
166+ #[ error( "RwLock was poisioned" ) ]
167+ RwPoisioned ,
168+ #[ error( "Mutex was poisioned" ) ]
169+ MutexPoisioned ,
170+ }
146171
147172fn data_file_path ( stream_name : & str ) -> String {
148173 format ! (
@@ -253,7 +278,7 @@ impl Event {
253278 let rb = event. next ( ) ?. ok_or ( Error :: MissingRecord ) ?;
254279 let stream_name = & self . stream_name ;
255280
256- STREAM_WRITERS :: append_to_local ( stream_name, & rb) . map_err ( |_| Error :: MissingRecord ) ? ;
281+ STREAM_WRITERS :: append_to_local ( stream_name, & rb) . unwrap ( ) ;
257282
258283 Ok ( 0 )
259284 }
0 commit comments