
Commit 82f2aa9

fix: bug in sending data more than 4MB
Need to update the `push_logs_unchecked` function as event processing has changed. Need to clean up the `do_get` function for airplane.
1 parent 5cd4986 commit 82f2aa9
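
Background, not part of the commit itself: gRPC implementations commonly cap incoming messages at about 4 MB by default, so a Flight `do_get` response has to go out as a stream of smaller `FlightData` messages rather than one hand-assembled payload. The `airplane.rs` diff below replaces the manual `IpcDataGenerator`/`DictionaryTracker` loop with `arrow_flight::encode::FlightDataEncoderBuilder`, which performs that chunking. The following sketch shows the same encoder pattern in isolation; the schema, data, size cap, and tokio/futures scaffolding are illustrative assumptions (the commit itself passes `usize::MAX` as the cap and leaves message sizing to the surrounding gRPC configuration).

// Sketch only: encode one large RecordBatch into several FlightData messages.
// Assumes the arrow-array, arrow-flight, arrow-schema, futures and tokio crates.
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch};
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_schema::{DataType, Field, Schema};
use futures::{stream, TryStreamExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from_iter_values(0..1_000_000))],
    )?;

    // Each encoded FlightData message is kept below the configured size,
    // so no single gRPC message exceeds the transport's ~4 MB default.
    let messages: Vec<_> = FlightDataEncoderBuilder::new()
        .with_max_flight_data_size(3 * 1024 * 1024)
        .with_schema(schema)
        .build(stream::iter([Ok(batch)]))
        .try_collect()
        .await?;

    println!("batch encoded into {} FlightData messages", messages.len());
    Ok(())
}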

File tree

4 files changed: +107 -55 lines changed


server/src/event/writer.rs

Lines changed: 4 additions & 9 deletions
@@ -67,7 +67,6 @@ impl Writer {
         Ok(())
     }
 
-
     fn push_mem(&mut self, schema_key: &str, rb: RecordBatch) -> Result<(), StreamWriterError> {
         self.mem.push(schema_key, rb);
         Ok(())
@@ -95,8 +94,8 @@ impl WriterTable {
                     stream_name,
                     schema_key,
                     record,
-                    parsed_timestamp,
-                )?;
+                    parsed_timestamp,
+                )?;
             } else {
                 stream_writer
                     .lock()
@@ -115,15 +114,11 @@ impl WriterTable {
                     stream_name,
                     schema_key,
                     record,
-                    parsed_timestamp,
-                )?;
+                    parsed_timestamp,
+                )?;
             } else {
                 writer.lock().unwrap().push_mem(stream_name, record)?;
             }
-        } else if CONFIG.parseable.mode != Mode::Query {
-            let mut writer = Writer::default();
-            writer.push(stream_name, schema_key, record, parsed_timestamp)?;
-            map.insert(stream_name.to_owned(), Mutex::new(writer));
         } else {
             let mut writer = Writer::default();
             writer.push(stream_name, schema_key, record, parsed_timestamp)?;

server/src/handlers/airplane.rs

Lines changed: 57 additions & 45 deletions
@@ -1,10 +1,10 @@
 use arrow_array::RecordBatch;
+use arrow_flight::encode::FlightDataEncoderBuilder;
 use arrow_flight::flight_service_server::FlightServiceServer;
 use arrow_flight::PollInfo;
 use arrow_schema::{ArrowError, Schema};
 use arrow_select::concat::concat_batches;
 use chrono::Utc;
-use crossterm::event;
 use datafusion::common::tree_node::TreeNode;
 use std::net::SocketAddr;
 use std::sync::Arc;
@@ -28,14 +28,14 @@ use crate::handlers::livetail::cross_origin_config;
 use crate::handlers::http::query::{authorize_and_set_filter_tags, into_query};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::storage::object_storage::commit_schema_to_storage;
-use crate::utils::arrow::flight::{get_query_from_ticket, run_do_get_rpc};
+use crate::utils::arrow::flight::{add_temporary_events, get_query_from_ticket, run_do_get_rpc};
 use arrow_flight::{
     flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
     FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
     SchemaResult, Ticket,
 };
-use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
-use futures::stream::BoxStream;
+use arrow_ipc::writer::IpcWriteOptions;
+use futures::{stream, TryStreamExt};
 use tonic::{Request, Response, Status, Streaming};
 
 use crate::handlers::livetail::extract_session_key;
@@ -50,13 +50,13 @@ pub struct AirServiceImpl {}
 
 #[tonic::async_trait]
 impl FlightService for AirServiceImpl {
-    type HandshakeStream = BoxStream<'static, Result<HandshakeResponse, Status>>;
-    type ListFlightsStream = BoxStream<'static, Result<FlightInfo, Status>>;
-    type DoGetStream = BoxStream<'static, Result<FlightData, Status>>;
-    type DoPutStream = BoxStream<'static, Result<PutResult, Status>>;
-    type DoActionStream = BoxStream<'static, Result<arrow_flight::Result, Status>>;
-    type ListActionsStream = BoxStream<'static, Result<ActionType, Status>>;
-    type DoExchangeStream = BoxStream<'static, Result<FlightData, Status>>;
+    type HandshakeStream = stream::BoxStream<'static, Result<HandshakeResponse, Status>>;
+    type ListFlightsStream = stream::BoxStream<'static, Result<FlightInfo, Status>>;
+    type DoGetStream = stream::BoxStream<'static, Result<FlightData, Status>>;
+    type DoPutStream = stream::BoxStream<'static, Result<PutResult, Status>>;
+    type DoActionStream = stream::BoxStream<'static, Result<arrow_flight::Result, Status>>;
+    type ListActionsStream = stream::BoxStream<'static, Result<ActionType, Status>>;
+    type DoExchangeStream = stream::BoxStream<'static, Result<FlightData, Status>>;
 
     async fn handshake(
         &self,
@@ -175,24 +175,26 @@ impl FlightService for AirServiceImpl {
                let mut batches = run_do_get_rpc(im, sql.clone()).await?;
                minute_result.append(&mut batches);
            }
-           let mr = minute_result.iter().map(|rb| rb).collect::<Vec<_>>();
-           let schema = STREAM_INFO
-               .schema(&stream_name)
-               .map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))?;
-           let rb = concat_batches(&schema, mr)
-               .map_err(|err| Status::failed_precondition(format!("ArrowError: {}", err)))?;
-
-           let event = push_logs_unchecked(rb, &stream_name)
-               .await
-               .map_err(|err| Status::internal(err.to_string()))?;
-           let mut events = vec![];
-           for batch in minute_result {
-               events.push(
-                   push_logs_unchecked(batch, &stream_name)
-                       .await
-                       .map_err(|err| Status::internal(err.to_string()))?,
-               );
-           }
+           let mr = minute_result.iter().collect::<Vec<_>>();
+           let event = add_temporary_events(&stream_name, mr).await?;
+
+           // let schema = STREAM_INFO
+           //     .schema(&stream_name)
+           //     .map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))?;
+           // let rb = concat_batches(&schema, mr)
+           //     .map_err(|err| Status::failed_precondition(format!("ArrowError: {}", err)))?;
+           //
+           // let event = push_logs_unchecked(rb, &stream_name)
+           //     .await
+           //     .map_err(|err| Status::internal(err.to_string()))?;
+           // let mut events = vec![];
+           // for batch in minute_result {
+           //     events.push(
+           //         push_logs_unchecked(batch, &stream_name)
+           //             .await
+           //             .map_err(|err| Status::internal(err.to_string()))?,
+           //     );
+           // }
           Some(event)
       } else {
           None
@@ -216,22 +218,30 @@ impl FlightService for AirServiceImpl {
            .map(|batch| batch.schema())
            .map(|s| s.as_ref().clone())
            .collect::<Vec<_>>();
-
        let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?;
-        let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
-        let schema_flight_data = SchemaAsIpc::new(&schema, &options);
-
-        let mut flights = vec![FlightData::from(schema_flight_data)];
-        let encoder = IpcDataGenerator::default();
-        let mut tracker = DictionaryTracker::new(false);
-        for batch in &results {
-            let (flight_dictionaries, flight_batch) = encoder
-                .encoded_batch(batch, &mut tracker, &options)
-                .map_err(|e| Status::internal(e.to_string()))?;
-            flights.extend(flight_dictionaries.into_iter().map(Into::into));
-            flights.push(flight_batch.into());
-        }
-        let output = futures::stream::iter(flights.into_iter().map(Ok));
+        let input_stream = futures::stream::iter(results.into_iter().map(Ok));
+
+        let flight_data_stream = FlightDataEncoderBuilder::new()
+            .with_max_flight_data_size(usize::MAX)
+            .with_schema(schema.into())
+            .build(input_stream);
+
+        let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string()));
+
+        // let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
+        // let schema_flight_data = SchemaAsIpc::new(&schema, &options);
+        //
+        // let mut flights = vec![FlightData::from(schema_flight_data)];
+        // let encoder = IpcDataGenerator::default();
+        // let mut tracker = DictionaryTracker::new(false);
+        // for batch in &results {
+        //     let (flight_dictionaries, flight_batch) = encoder
+        //         .encoded_batch(batch, &mut tracker, &options)
+        //         .map_err(|e| Status::internal(e.to_string()))?;
+        //     flights.extend(flight_dictionaries.into_iter().map(Into::into));
+        //     flights.push(flight_batch.into());
+        // }
+        // let output = futures::stream::iter(flights.into_iter().map(Ok));
        if let Some(events) = events {
            events.clear(&stream_name);
            // for event in events {
@@ -244,7 +254,9 @@ impl FlightService for AirServiceImpl {
            .with_label_values(&[&format!("flight-query-{}", stream_name)])
            .observe(time);
 
-        Ok(Response::new(Box::pin(output) as Self::DoGetStream))
+        Ok(Response::new(
+            Box::pin(flight_data_stream) as Self::DoGetStream
+        ))
    }
 
    async fn do_put(
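
For context only, a hedged sketch of the consuming side (not code from this repository): a Flight client reads the chunked `do_get` stream and reassembles it into `RecordBatch`es, which is essentially what `run_do_get_rpc` does via `try_collect`. The endpoint address, port, and ticket payload below are placeholders, and the example assumes `arrow-flight` with tonic's transport support enabled.

// Sketch only: fetch a do_get stream and decode it back into RecordBatches.
use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::error::FlightError;
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::Ticket;
use futures::TryStreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder endpoint and ticket; the real server address and query
    // format depend on the deployment.
    let mut client = FlightServiceClient::connect("http://localhost:8002").await?;
    let ticket = Ticket {
        ticket: br#"{"query": "select * from demo"}"#.to_vec().into(),
    };

    // The raw tonic stream yields FlightData messages; map transport errors
    // into FlightError and let the decoder reassemble the record batches,
    // however many chunks the server produced.
    let flight_data = client
        .do_get(ticket)
        .await?
        .into_inner()
        .map_err(FlightError::from);

    let batches: Vec<_> = FlightRecordBatchStream::new_from_flight_data(flight_data)
        .try_collect()
        .await?;

    println!("received {} record batches", batches.len());
    Ok(())
}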

server/src/handlers/http/ingest.rs

Lines changed: 11 additions & 1 deletion
@@ -102,13 +102,23 @@ pub async fn push_logs_unchecked(
     batches: RecordBatch,
     stream_name: &str,
 ) -> Result<event::Event, PostError> {
-    todo!("timepartition fix");
+    todo!("need to fix");
+
+    // let glob_storage = CONFIG.storage().get_object_store();
+    // let object_store_format = glob_storage
+    //     .get_object_store_format(&stream_name)
+    //     .await
+    //     .map_err(|_| PostError::StreamNotFound(stream_name.to_string()))?;
+
+    // let time_partition = object_store_format.time_partition;
+    // let time_partition_limit = object_store_format.time_partition_limit;
 
     // let event = event::Event {
     //     rb: batches,
     //     stream_name: stream_name.to_string(),
     //     origin_format: "json",
     //     origin_size: 0,
+    //     time_partition,
     //     is_first_event: true,
     // };
     // event.process_unchecked()?;

server/src/utils/arrow/flight.rs

Lines changed: 35 additions & 0 deletions
@@ -1,10 +1,14 @@
+use crate::event::Event;
+use crate::handlers::http::ingest::push_logs_unchecked;
 use crate::handlers::http::query::Query as QueryJson;
+use crate::metadata::STREAM_INFO;
 use crate::{
     handlers::http::modal::IngestorMetadata,
     option::{Mode, CONFIG},
 };
 use arrow_array::RecordBatch;
 use arrow_flight::Ticket;
+use arrow_select::concat::concat_batches;
 use futures::TryStreamExt;
 use serde_json::Value as JsonValue;
 use tonic::{Request, Status};
@@ -66,3 +70,34 @@ pub async fn run_do_get_rpc(im: IngestorMetadata, sql: String) -> Result<Vec<Rec
 
     Ok(response.try_collect().await?)
 }
+
+/// all the records from the ingesters are concatinated into one event and pushed to memory
+pub async fn add_temporary_events(
+    stream_name: &str,
+    minute_result: Vec<&RecordBatch>,
+) -> Result<
+    //Vec<Event>
+    Event,
+    Status,
+> {
+    let schema = STREAM_INFO
+        .schema(stream_name)
+        .map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))?;
+    let rb = concat_batches(&schema, minute_result)
+        .map_err(|err| Status::failed_precondition(format!("ArrowError: {}", err)))?;
+
+    let event = push_logs_unchecked(rb, stream_name)
+        .await
+        .map_err(|err| Status::internal(err.to_string()))?;
+    // let mut events = vec![];
+    // for batch in minute_result {
+    //     events.push(
+    //         push_logs_unchecked(batch, &stream_name)
+    //             .await
+    //             .map_err(|err| Status::internal(err.to_string()))?,
+    //     );
+    // }
+    //
+    // Ok(events)
+    Ok(event)
+}
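
To illustrate the merge step inside `add_temporary_events` above: `concat_batches` stitches the per-ingester `RecordBatch`es into a single batch under the stream's schema before it is pushed as one in-memory event. A minimal, self-contained sketch with a made-up schema and data:

// Sketch only: concatenate batches the way add_temporary_events does before
// handing the merged batch to push_logs_unchecked.
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use arrow_select::concat::concat_batches;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let from_ingester_a = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;
    let from_ingester_b = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![4, 5]))],
    )?;

    // Equivalent to `concat_batches(&schema, minute_result)` in the diff:
    // one merged batch, ready to be pushed as a single temporary event.
    let merged = concat_batches(&schema, [&from_ingester_a, &from_ingester_b])?;
    assert_eq!(merged.num_rows(), 5);
    Ok(())
}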
