Skip to content

Commit 321b0be

Browse files
committed
fix: bug when sending data larger than 4 MB
Need to update the `push_logs_unchecked` function, as event processing has changed. Need to clean up the `do_get` function for airplane.
1 parent 5cd4986 commit 321b0be

File tree

6 files changed

+178
-97
lines changed

6 files changed

+178
-97
lines changed

server/src/event.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use std::sync::Arc;
2828

2929
use self::error::EventError;
3030
pub use self::writer::STREAM_WRITERS;
31-
use crate::metadata;
31+
use crate::{handlers::http::ingest::PostError, metadata};
3232
use chrono::NaiveDateTime;
3333

3434
pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
@@ -86,12 +86,18 @@ impl Event {
8686
Ok(())
8787
}
8888

89-
pub fn process_unchecked(&self) -> Result<(), EventError> {
89+
pub fn process_unchecked(self) -> Result<Self, PostError> {
9090
let key = get_schema_key(&self.rb.schema().fields);
9191

92-
Self::process_event(&self.stream_name, &key, self.rb.clone(), self.parsed_timestamp)?;
92+
Self::process_event(
93+
&self.stream_name,
94+
&key,
95+
self.rb.clone(),
96+
self.parsed_timestamp,
97+
)
98+
.map_err(PostError::Event)?;
9399

94-
Ok(())
100+
Ok(self)
95101
}
96102

97103
pub fn clear(&self, stream_name: &str) {

server/src/event/writer.rs

Lines changed: 67 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ mod mem_writer;
2222

2323
use std::{
2424
collections::HashMap,
25-
sync::{Arc, Mutex, RwLock},
25+
sync::{Arc, Mutex, RwLock, RwLockWriteGuard},
2626
};
2727

2828
use crate::{
@@ -67,7 +67,6 @@ impl Writer {
6767
Ok(())
6868
}
6969

70-
7170
fn push_mem(&mut self, schema_key: &str, rb: RecordBatch) -> Result<(), StreamWriterError> {
7271
self.mem.push(schema_key, rb);
7372
Ok(())
@@ -90,61 +89,97 @@ impl WriterTable {
9089

9190
match hashmap_guard.get(stream_name) {
9291
Some(stream_writer) => {
93-
if CONFIG.parseable.mode != Mode::Query {
94-
stream_writer.lock().unwrap().push(
95-
stream_name,
96-
schema_key,
97-
record,
92+
self.handle_existing_writer(
93+
stream_writer,
94+
stream_name,
95+
schema_key,
96+
record,
9897
parsed_timestamp,
9998
)?;
100-
} else {
101-
stream_writer
102-
.lock()
103-
.unwrap()
104-
.push_mem(stream_name, record)?;
105-
}
10699
}
107100
None => {
108101
drop(hashmap_guard);
109-
let mut map = self.write().unwrap();
102+
let map = self.write().unwrap();
110103
// check for race condition
111104
// if map contains entry then just
112-
if let Some(writer) = map.get(stream_name) {
113-
if CONFIG.parseable.mode != Mode::Query {
114-
writer.lock().unwrap().push(
115-
stream_name,
116-
schema_key,
117-
record,
105+
self.handle_missing_writer(
106+
map,
107+
stream_name,
108+
schema_key,
109+
record,
110+
parsed_timestamp,
111+
)?;
112+
}
113+
};
114+
Ok(())
115+
}
116+
117+
fn handle_existing_writer(
118+
&self,
119+
stream_writer: &Mutex<Writer>,
120+
stream_name: &str,
121+
schema_key: &str,
122+
record: RecordBatch,
123+
parsed_timestamp: NaiveDateTime,
124+
) -> Result<(), StreamWriterError> {
125+
if CONFIG.parseable.mode != Mode::Query {
126+
stream_writer.lock().unwrap().push(
127+
stream_name,
128+
schema_key,
129+
record,
130+
parsed_timestamp,
131+
)?;
132+
} else {
133+
stream_writer
134+
.lock()
135+
.unwrap()
136+
.push_mem(stream_name, record)?;
137+
}
138+
139+
Ok(())
140+
}
141+
142+
fn handle_missing_writer(
143+
&self,
144+
mut map: RwLockWriteGuard<HashMap<String, Mutex<Writer>>>,
145+
stream_name: &str,
146+
schema_key: &str,
147+
record: RecordBatch,
148+
parsed_timestamp: NaiveDateTime,
149+
) -> Result<(), StreamWriterError> {
150+
match map.get(stream_name) {
151+
Some(writer) => {
152+
if CONFIG.parseable.mode != Mode::Query {
153+
writer.lock().unwrap().push(
154+
stream_name,
155+
schema_key,
156+
record,
118157
parsed_timestamp,
119158
)?;
120-
} else {
121-
writer.lock().unwrap().push_mem(stream_name, record)?;
122-
}
123-
} else if CONFIG.parseable.mode != Mode::Query {
159+
} else {
160+
writer.lock().unwrap().push_mem(stream_name, record)?;
161+
}
162+
}
163+
None => {
164+
if CONFIG.parseable.mode != Mode::Query {
124165
let mut writer = Writer::default();
125166
writer.push(stream_name, schema_key, record, parsed_timestamp)?;
126167
map.insert(stream_name.to_owned(), Mutex::new(writer));
127168
} else {
128169
let mut writer = Writer::default();
129-
writer.push(stream_name, schema_key, record, parsed_timestamp)?;
170+
writer.push_mem(schema_key, record)?;
130171
map.insert(stream_name.to_owned(), Mutex::new(writer));
131172
}
132173
}
133-
};
174+
}
134175
Ok(())
135176
}
136177

137178
pub fn clear(&self, stream_name: &str) {
138179
let map = self.write().unwrap();
139180
if let Some(writer) = map.get(stream_name) {
140181
let w = &mut writer.lock().unwrap().mem;
141-
dbg!(&w.read_buffer.len());
142-
dbg!(&w.mutable_buffer.inner.len());
143-
144-
dbg!(&w.mutable_buffer.inner);
145182
w.clear();
146-
dbg!(&w.read_buffer.len());
147-
dbg!(&w.mutable_buffer.inner.len());
148183
}
149184
}
150185

server/src/handlers/airplane.rs

Lines changed: 63 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use arrow_array::RecordBatch;
2+
use arrow_flight::encode::FlightDataEncoderBuilder;
23
use arrow_flight::flight_service_server::FlightServiceServer;
34
use arrow_flight::PollInfo;
45
use arrow_schema::{ArrowError, Schema};
5-
use arrow_select::concat::concat_batches;
66
use chrono::Utc;
7-
use crossterm::event;
87
use datafusion::common::tree_node::TreeNode;
98
use std::net::SocketAddr;
109
use std::sync::Arc;
@@ -19,7 +18,6 @@ use tonic_web::GrpcWebLayer;
1918
use crate::event::commit_schema;
2019
use crate::handlers::http::cluster::get_ingestor_info;
2120
use crate::handlers::http::fetch_schema;
22-
use crate::handlers::http::ingest::push_logs_unchecked;
2321
use crate::metrics::QUERY_EXECUTE_TIME;
2422
use crate::option::{Mode, CONFIG};
2523

@@ -28,14 +26,14 @@ use crate::handlers::livetail::cross_origin_config;
2826
use crate::handlers::http::query::{authorize_and_set_filter_tags, into_query};
2927
use crate::query::{TableScanVisitor, QUERY_SESSION};
3028
use crate::storage::object_storage::commit_schema_to_storage;
31-
use crate::utils::arrow::flight::{get_query_from_ticket, run_do_get_rpc};
29+
use crate::utils::arrow::flight::{append_temporary_events, get_query_from_ticket, run_do_get_rpc};
3230
use arrow_flight::{
3331
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
3432
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
3533
SchemaResult, Ticket,
3634
};
37-
use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
38-
use futures::stream::BoxStream;
35+
use arrow_ipc::writer::IpcWriteOptions;
36+
use futures::{stream, TryStreamExt};
3937
use tonic::{Request, Response, Status, Streaming};
4038

4139
use crate::handlers::livetail::extract_session_key;
@@ -50,13 +48,13 @@ pub struct AirServiceImpl {}
5048

5149
#[tonic::async_trait]
5250
impl FlightService for AirServiceImpl {
53-
type HandshakeStream = BoxStream<'static, Result<HandshakeResponse, Status>>;
54-
type ListFlightsStream = BoxStream<'static, Result<FlightInfo, Status>>;
55-
type DoGetStream = BoxStream<'static, Result<FlightData, Status>>;
56-
type DoPutStream = BoxStream<'static, Result<PutResult, Status>>;
57-
type DoActionStream = BoxStream<'static, Result<arrow_flight::Result, Status>>;
58-
type ListActionsStream = BoxStream<'static, Result<ActionType, Status>>;
59-
type DoExchangeStream = BoxStream<'static, Result<FlightData, Status>>;
51+
type HandshakeStream = stream::BoxStream<'static, Result<HandshakeResponse, Status>>;
52+
type ListFlightsStream = stream::BoxStream<'static, Result<FlightInfo, Status>>;
53+
type DoGetStream = stream::BoxStream<'static, Result<FlightData, Status>>;
54+
type DoPutStream = stream::BoxStream<'static, Result<PutResult, Status>>;
55+
type DoActionStream = stream::BoxStream<'static, Result<arrow_flight::Result, Status>>;
56+
type ListActionsStream = stream::BoxStream<'static, Result<ActionType, Status>>;
57+
type DoExchangeStream = stream::BoxStream<'static, Result<FlightData, Status>>;
6058

6159
async fn handshake(
6260
&self,
@@ -175,24 +173,27 @@ impl FlightService for AirServiceImpl {
175173
let mut batches = run_do_get_rpc(im, sql.clone()).await?;
176174
minute_result.append(&mut batches);
177175
}
178-
let mr = minute_result.iter().map(|rb| rb).collect::<Vec<_>>();
179-
let schema = STREAM_INFO
180-
.schema(&stream_name)
181-
.map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))?;
182-
let rb = concat_batches(&schema, mr)
183-
.map_err(|err| Status::failed_precondition(format!("ArrowError: {}", err)))?;
184-
185-
let event = push_logs_unchecked(rb, &stream_name)
186-
.await
187-
.map_err(|err| Status::internal(err.to_string()))?;
188-
let mut events = vec![];
189-
for batch in minute_result {
190-
events.push(
191-
push_logs_unchecked(batch, &stream_name)
192-
.await
193-
.map_err(|err| Status::internal(err.to_string()))?,
194-
);
195-
}
176+
let mr = minute_result.iter().collect::<Vec<_>>();
177+
dbg!(&mr.len());
178+
let event = append_temporary_events(&stream_name, mr).await?;
179+
180+
// let schema = STREAM_INFO
181+
// .schema(&stream_name)
182+
// .map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))?;
183+
// let rb = concat_batches(&schema, mr)
184+
// .map_err(|err| Status::failed_precondition(format!("ArrowError: {}", err)))?;
185+
//
186+
// let event = push_logs_unchecked(rb, &stream_name)
187+
// .await
188+
// .map_err(|err| Status::internal(err.to_string()))?;
189+
// let mut events = vec![];
190+
// for batch in minute_result {
191+
// events.push(
192+
// push_logs_unchecked(batch, &stream_name)
193+
// .await
194+
// .map_err(|err| Status::internal(err.to_string()))?,
195+
// );
196+
// }
196197
Some(event)
197198
} else {
198199
None
@@ -210,28 +211,39 @@ impl FlightService for AirServiceImpl {
210211
.await
211212
.map_err(|err| Status::internal(err.to_string()))
212213
.unwrap();
214+
if results.len() > 1 {
215+
dbg!(&results.len());
216+
}
213217

214218
let schemas = results
215219
.iter()
216220
.map(|batch| batch.schema())
217221
.map(|s| s.as_ref().clone())
218222
.collect::<Vec<_>>();
219-
220223
let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?;
221-
let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
222-
let schema_flight_data = SchemaAsIpc::new(&schema, &options);
223-
224-
let mut flights = vec![FlightData::from(schema_flight_data)];
225-
let encoder = IpcDataGenerator::default();
226-
let mut tracker = DictionaryTracker::new(false);
227-
for batch in &results {
228-
let (flight_dictionaries, flight_batch) = encoder
229-
.encoded_batch(batch, &mut tracker, &options)
230-
.map_err(|e| Status::internal(e.to_string()))?;
231-
flights.extend(flight_dictionaries.into_iter().map(Into::into));
232-
flights.push(flight_batch.into());
233-
}
234-
let output = futures::stream::iter(flights.into_iter().map(Ok));
224+
let input_stream = futures::stream::iter(results.into_iter().map(Ok));
225+
226+
let flight_data_stream = FlightDataEncoderBuilder::new()
227+
.with_max_flight_data_size(usize::MAX)
228+
.with_schema(schema.into())
229+
.build(input_stream);
230+
231+
let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string()));
232+
233+
// let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
234+
// let schema_flight_data = SchemaAsIpc::new(&schema, &options);
235+
//
236+
// let mut flights = vec![FlightData::from(schema_flight_data)];
237+
// let encoder = IpcDataGenerator::default();
238+
// let mut tracker = DictionaryTracker::new(false);
239+
// for batch in &results {
240+
// let (flight_dictionaries, flight_batch) = encoder
241+
// .encoded_batch(batch, &mut tracker, &options)
242+
// .map_err(|e| Status::internal(e.to_string()))?;
243+
// flights.extend(flight_dictionaries.into_iter().map(Into::into));
244+
// flights.push(flight_batch.into());
245+
// }
246+
// let output = futures::stream::iter(flights.into_iter().map(Ok));
235247
if let Some(events) = events {
236248
events.clear(&stream_name);
237249
// for event in events {
@@ -244,7 +256,9 @@ impl FlightService for AirServiceImpl {
244256
.with_label_values(&[&format!("flight-query-{}", stream_name)])
245257
.observe(time);
246258

247-
Ok(Response::new(Box::pin(output) as Self::DoGetStream))
259+
Ok(Response::new(
260+
Box::pin(flight_data_stream) as Self::DoGetStream
261+
))
248262
}
249263

250264
async fn do_put(
@@ -297,9 +311,8 @@ pub fn server() -> impl Future<Output = Result<(), Box<dyn std::error::Error + S
297311
let svc = FlightServiceServer::new(service)
298312
.max_encoding_message_size(usize::MAX)
299313
.max_decoding_message_size(usize::MAX)
300-
.send_compressed(CompressionEncoding::Gzip);
301-
302-
dbg!(&svc);
314+
.send_compressed(CompressionEncoding::Zstd)
315+
.accept_compressed(CompressionEncoding::Zstd);
303316

304317
let cors = cross_origin_config();
305318

server/src/handlers/http/ingest.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,17 +102,16 @@ pub async fn push_logs_unchecked(
102102
batches: RecordBatch,
103103
stream_name: &str,
104104
) -> Result<event::Event, PostError> {
105-
todo!("timepartition fix");
106-
107-
// let event = event::Event {
108-
// rb: batches,
109-
// stream_name: stream_name.to_string(),
110-
// origin_format: "json",
111-
// origin_size: 0,
112-
// is_first_event: true,
113-
// };
114-
// event.process_unchecked()?;
115-
// Ok(event)
105+
event::Event {
106+
rb: batches,
107+
stream_name: stream_name.to_string(),
108+
origin_format: "json",
109+
origin_size: 0,
110+
parsed_timestamp: Utc::now().naive_utc(),
111+
time_partition: None,
112+
is_first_event: true, // NOTE: Maybe should be false
113+
}
114+
.process_unchecked()
116115
}
117116

118117
async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result<(), PostError> {

server/src/handlers/http/query.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
9999
let time = Instant::now();
100100

101101
let (records, fields) = query.execute(table_name.clone()).await?;
102+
if records.len() > 1 {
103+
dbg!(&records.len());
104+
}
102105
let response = QueryResponse {
103106
records,
104107
fields,

0 commit comments

Comments (0)