@@ -22,11 +22,15 @@ mod mem_writer;
2222
2323use std:: {
2424 collections:: HashMap ,
25- sync:: { Arc , Mutex , RwLock } ,
25+ sync:: { Arc , Mutex , RwLock , RwLockWriteGuard } ,
26+ } ;
27+
28+ use crate :: {
29+ option:: { Mode , CONFIG } ,
30+ utils,
2631} ;
2732
2833use self :: { errors:: StreamWriterError , file_writer:: FileWriter , mem_writer:: MemWriter } ;
29- use crate :: utils;
3034use arrow_array:: { RecordBatch , TimestampMillisecondArray } ;
3135use arrow_schema:: Schema ;
3236use chrono:: NaiveDateTime ;
@@ -62,6 +66,11 @@ impl Writer {
6266 self . mem . push ( schema_key, rb) ;
6367 Ok ( ( ) )
6468 }
69+
70+ fn push_mem ( & mut self , schema_key : & str , rb : RecordBatch ) -> Result < ( ) , StreamWriterError > {
71+ self . mem . push ( schema_key, rb) ;
72+ Ok ( ( ) )
73+ }
6574}
6675
6776#[ derive( Deref , DerefMut , Default ) ]
@@ -80,7 +89,8 @@ impl WriterTable {
8089
8190 match hashmap_guard. get ( stream_name) {
8291 Some ( stream_writer) => {
83- stream_writer. lock ( ) . unwrap ( ) . push (
92+ self . handle_existing_writer (
93+ stream_writer,
8494 stream_name,
8595 schema_key,
8696 record,
@@ -89,26 +99,84 @@ impl WriterTable {
8999 }
90100 None => {
91101 drop ( hashmap_guard) ;
92- let mut map = self . write ( ) . unwrap ( ) ;
102+ let map = self . write ( ) . unwrap ( ) ;
93103 // check for race condition
94104 // if map contains entry then just
95- if let Some ( writer) = map. get ( stream_name) {
105+ self . handle_missing_writer ( map, stream_name, schema_key, record, parsed_timestamp) ?;
106+ }
107+ } ;
108+ Ok ( ( ) )
109+ }
110+
111+ fn handle_existing_writer (
112+ & self ,
113+ stream_writer : & Mutex < Writer > ,
114+ stream_name : & str ,
115+ schema_key : & str ,
116+ record : RecordBatch ,
117+ parsed_timestamp : NaiveDateTime ,
118+ ) -> Result < ( ) , StreamWriterError > {
119+ if CONFIG . parseable . mode != Mode :: Query {
120+ stream_writer. lock ( ) . unwrap ( ) . push (
121+ stream_name,
122+ schema_key,
123+ record,
124+ parsed_timestamp,
125+ ) ?;
126+ } else {
127+ stream_writer
128+ . lock ( )
129+ . unwrap ( )
130+ . push_mem ( stream_name, record) ?;
131+ }
132+
133+ Ok ( ( ) )
134+ }
135+
136+ fn handle_missing_writer (
137+ & self ,
138+ mut map : RwLockWriteGuard < HashMap < String , Mutex < Writer > > > ,
139+ stream_name : & str ,
140+ schema_key : & str ,
141+ record : RecordBatch ,
142+ parsed_timestamp : NaiveDateTime ,
143+ ) -> Result < ( ) , StreamWriterError > {
144+ match map. get ( stream_name) {
145+ Some ( writer) => {
146+ if CONFIG . parseable . mode != Mode :: Query {
96147 writer. lock ( ) . unwrap ( ) . push (
97148 stream_name,
98149 schema_key,
99150 record,
100151 parsed_timestamp,
101152 ) ?;
102153 } else {
154+ writer. lock ( ) . unwrap ( ) . push_mem ( stream_name, record) ?;
155+ }
156+ }
157+ None => {
158+ if CONFIG . parseable . mode != Mode :: Query {
103159 let mut writer = Writer :: default ( ) ;
104160 writer. push ( stream_name, schema_key, record, parsed_timestamp) ?;
105161 map. insert ( stream_name. to_owned ( ) , Mutex :: new ( writer) ) ;
162+ } else {
163+ let mut writer = Writer :: default ( ) ;
164+ writer. push_mem ( schema_key, record) ?;
165+ map. insert ( stream_name. to_owned ( ) , Mutex :: new ( writer) ) ;
106166 }
107167 }
108- } ;
168+ }
109169 Ok ( ( ) )
110170 }
111171
172+ pub fn clear ( & self , stream_name : & str ) {
173+ let map = self . write ( ) . unwrap ( ) ;
174+ if let Some ( writer) = map. get ( stream_name) {
175+ let w = & mut writer. lock ( ) . unwrap ( ) . mem ;
176+ w. clear ( ) ;
177+ }
178+ }
179+
112180 pub fn delete_stream ( & self , stream_name : & str ) {
113181 self . write ( ) . unwrap ( ) . remove ( stream_name) ;
114182 }
0 commit comments