Skip to content

Commit 2069c4f

Browse files
committed
debugging
1 parent 87f2c9d commit 2069c4f

File tree

5 files changed

+67
-18
lines changed

5 files changed

+67
-18
lines changed

Cargo.lock

Lines changed: 27 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ parquet = "51.0.0"
2020

2121
### LiveTail server deps
2222
arrow-flight = "51.0.0"
23-
tonic = {version = "0.11.0", features = ["tls"] }
23+
tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] }
2424
tonic-web = "0.11.0"
2525
tower-http = { version = "0.4.4", features = ["cors"] }
2626

server/src/event/writer.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,14 @@ impl WriterTable {
128128
pub fn clear(&self, stream_name: &str) {
129129
let map = self.write().unwrap();
130130
if let Some(writer) = map.get(stream_name) {
131-
writer.lock().unwrap().mem.clear();
131+
let w = &mut writer.lock().unwrap().mem;
132+
dbg!(&w.read_buffer.len());
133+
dbg!(&w.mutable_buffer.inner.len());
134+
135+
dbg!(&w.mutable_buffer.inner);
136+
w.clear();
137+
dbg!(&w.read_buffer.len());
138+
dbg!(&w.mutable_buffer.inner.len());
132139
}
133140
}
134141

server/src/event/writer/mem_writer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ pub struct MemWriter<const N: usize> {
3434
schema: Schema,
3535
// for checking uniqueness of schema
3636
schema_map: HashSet<String>,
37-
read_buffer: Vec<RecordBatch>,
38-
mutable_buffer: MutableBuffer<N>,
37+
pub read_buffer: Vec<RecordBatch>,
38+
pub mutable_buffer: MutableBuffer<N>,
3939
}
4040

4141
impl<const N: usize> Default for MemWriter<N> {
@@ -91,7 +91,7 @@ fn concat_records(schema: &Arc<Schema>, record: &[RecordBatch]) -> RecordBatch {
9191
}
9292

9393
#[derive(Debug, Default)]
94-
struct MutableBuffer<const N: usize> {
94+
pub struct MutableBuffer<const N: usize> {
9595
pub inner: Vec<RecordBatch>,
9696
pub rows: usize,
9797
}

server/src/handlers/airplane.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ use arrow_array::RecordBatch;
22
use arrow_flight::flight_service_server::FlightServiceServer;
33
use arrow_flight::PollInfo;
44
use arrow_schema::{ArrowError, Schema};
5+
use arrow_select::concat::concat_batches;
56
use chrono::Utc;
7+
use crossterm::event;
68
use datafusion::common::tree_node::TreeNode;
79
use std::net::SocketAddr;
810
use std::sync::Arc;
911
use std::time::Instant;
12+
use tonic::codec::CompressionEncoding;
1013

1114
use futures_util::{Future, TryFutureExt};
1215

@@ -42,7 +45,7 @@ use crate::rbac::Users;
4245
const L_CURLY: char = '{';
4346
const R_CURLY: char = '}';
4447

45-
#[derive(Clone)]
48+
#[derive(Clone, Debug)]
4649
pub struct AirServiceImpl {}
4750

4851
#[tonic::async_trait]
@@ -158,7 +161,7 @@ impl FlightService for AirServiceImpl {
158161

159162
let time_delta = query.end - Utc::now();
160163

161-
let events = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 1 {
164+
let events = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2 {
162165
let sql = format!(
163166
"{}\"query\": \"select * from {}\"{}",
164167
L_CURLY, &stream_name, R_CURLY
@@ -172,6 +175,16 @@ impl FlightService for AirServiceImpl {
172175
let mut batches = run_do_get_rpc(im, sql.clone()).await?;
173176
minute_result.append(&mut batches);
174177
}
178+
let mr = minute_result.iter().map(|rb| rb).collect::<Vec<_>>();
179+
let schema = STREAM_INFO
180+
.schema(&stream_name)
181+
.map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))?;
182+
let rb = concat_batches(&schema, mr)
183+
.map_err(|err| Status::failed_precondition(format!("ArrowError: {}", err)))?;
184+
185+
let event = push_logs_unchecked(rb, &stream_name)
186+
.await
187+
.map_err(|err| Status::internal(err.to_string()))?;
175188
let mut events = vec![];
176189
for batch in minute_result {
177190
events.push(
@@ -180,7 +193,7 @@ impl FlightService for AirServiceImpl {
180193
.map_err(|err| Status::internal(err.to_string()))?,
181194
);
182195
}
183-
Some(events)
196+
Some(event)
184197
} else {
185198
None
186199
};
@@ -220,9 +233,10 @@ impl FlightService for AirServiceImpl {
220233
}
221234
let output = futures::stream::iter(flights.into_iter().map(Ok));
222235
if let Some(events) = events {
223-
for event in events {
224-
event.clear(&stream_name);
225-
}
236+
events.clear(&stream_name);
237+
// for event in events {
238+
// event.clear(&stream_name);
239+
// }
226240
}
227241

228242
let time = time.elapsed().as_secs_f64();
@@ -280,7 +294,12 @@ pub fn server() -> impl Future<Output = Result<(), Box<dyn std::error::Error + S
280294

281295
let service = AirServiceImpl {};
282296

283-
let svc = FlightServiceServer::new(service);
297+
let svc = FlightServiceServer::new(service)
298+
.max_encoding_message_size(usize::MAX)
299+
.max_decoding_message_size(usize::MAX)
300+
.send_compressed(CompressionEncoding::Gzip);
301+
302+
dbg!(&svc);
284303

285304
let cors = cross_origin_config();
286305

@@ -314,6 +333,7 @@ pub fn server() -> impl Future<Output = Result<(), Box<dyn std::error::Error + S
314333
};
315334

316335
server
336+
.max_frame_size(16 * 1024 * 1024 - 2)
317337
.accept_http1(true)
318338
.layer(cors)
319339
.layer(GrpcWebLayer::new())
@@ -322,6 +342,7 @@ pub fn server() -> impl Future<Output = Result<(), Box<dyn std::error::Error + S
322342
.map_err(err_map_fn)
323343
}
324344
None => Server::builder()
345+
.max_frame_size(16 * 1024 * 1024 - 2)
325346
.accept_http1(true)
326347
.layer(cors)
327348
.layer(GrpcWebLayer::new())

0 commit comments

Comments
 (0)