diff --git a/Cargo.lock b/Cargo.lock
index bd1993ed9..11f113ff7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -71,7 +71,7 @@ dependencies = [
  "tokio",
  "tokio-util",
  "tracing",
- "zstd",
+ "zstd 0.13.1",
 ]
 
 [[package]]
@@ -573,6 +573,7 @@ dependencies = [
  "arrow-schema",
  "flatbuffers",
  "lz4_flex",
+ "zstd 0.13.1",
 ]
 
 [[package]]
@@ -679,8 +680,8 @@ dependencies = [
  "pin-project-lite",
  "tokio",
  "xz2",
- "zstd",
- "zstd-safe",
+ "zstd 0.13.1",
+ "zstd-safe 7.1.0",
 ]
 
 [[package]]
@@ -1399,7 +1400,7 @@ dependencies = [
  "url",
  "uuid",
  "xz2",
- "zstd",
+ "zstd 0.13.1",
 ]
 
 [[package]]
@@ -2869,7 +2870,7 @@ dependencies = [
  "thrift",
  "tokio",
  "twox-hash",
- "zstd",
+ "zstd 0.13.1",
 ]
 
 [[package]]
@@ -4305,6 +4306,7 @@ dependencies = [
  "axum",
  "base64 0.21.7",
  "bytes",
+ "flate2",
  "h2",
  "http 0.2.12",
  "http-body 0.4.6",
@@ -4322,6 +4324,7 @@ dependencies = [
  "tower-layer",
  "tower-service",
  "tracing",
+ "zstd 0.12.4",
 ]
 
 [[package]]
@@ -5102,13 +5105,32 @@ dependencies = [
  "flate2",
 ]
 
+[[package]]
+name = "zstd"
+version = "0.12.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c"
+dependencies = [
+ "zstd-safe 6.0.6",
+]
+
 [[package]]
 name = "zstd"
 version = "0.13.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2d789b1514203a1120ad2429eae43a7bd32b90976a7bb8a05f7ec02fa88cc23a"
 dependencies = [
- "zstd-safe",
+ "zstd-safe 7.1.0",
 ]
 
+[[package]]
+name = "zstd-safe"
+version = "6.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581"
+dependencies = [
+ "libc",
+ "zstd-sys",
+]
+
 [[package]]
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 6620de080..5d9eb69e2 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -12,15 +12,13 @@ build = "build.rs"
 arrow-schema = { version = "51.0.0", features = ["serde"] }
 arrow-array = { version = "51.0.0" }
 arrow-json = "51.0.0"
-arrow-ipc = "51.0.0"
+arrow-ipc = { version = "51.0.0", features = ["zstd"] }
 arrow-select = "51.0.0"
 datafusion = "37.1.0"
 object_store = { version = "0.9.1", features = ["cloud", "aws"] }
 parquet = "51.0.0"
-
-### LiveTail server deps
-arrow-flight = "51.0.0"
-tonic = {version = "0.11.0", features = ["tls"] }
+arrow-flight = { version = "51.0.0", features = [ "tls" ] }
+tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] }
 tonic-web = "0.11.0"
 tower-http = { version = "0.4.4", features = ["cors"] }
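// Why two zstd stacks now appear in the lockfile: tonic 0.11 links zstd 0.12 /
// zstd-safe 6, while the arrow 51 crates use zstd 0.13 / zstd-safe 7. The new
// "zstd" features feed two separate compression layers used later in this PR.
// A minimal sketch of both knobs (illustrative only, not part of the patch;
// assumes `service` implements the generated FlightService trait):
use arrow_flight::flight_service_server::FlightServiceServer;
use arrow_ipc::writer::IpcWriteOptions;
use tonic::codec::CompressionEncoding;

// IPC-level: compress record batch bodies inside FlightData (arrow-ipc "zstd").
let write_options = IpcWriteOptions::default()
    .try_with_compression(Some(arrow_ipc::CompressionType::ZSTD))?;

// Transport-level: compress the gRPC messages themselves (tonic "zstd").
let svc = FlightServiceServer::new(service)
    .send_compressed(CompressionEncoding::Zstd)
    .accept_compressed(CompressionEncoding::Zstd);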
diff --git a/server/src/banner.rs b/server/src/banner.rs
index ca665ffa4..fa817eac1 100644
--- a/server/src/banner.rs
+++ b/server/src/banner.rs
@@ -49,11 +49,12 @@ fn print_ascii_art() {
 
 fn status_info(config: &Config, scheme: &str, id: Uid) {
     let address = format!(
-        "\"{}://{}\" ({}), \":{}\" (gRPC)",
+        "\"{}://{}\" ({}), \":{}\" (livetail), \":{}\" (flight protocol)",
         scheme,
         config.parseable.address,
         scheme.to_ascii_uppercase(),
-        config.parseable.grpc_port
+        config.parseable.grpc_port,
+        config.parseable.flight_port
     );
 
     let mut credentials =
diff --git a/server/src/cli.rs b/server/src/cli.rs
index 2ad9899cd..cd3f8cf7a 100644
--- a/server/src/cli.rs
+++ b/server/src/cli.rs
@@ -89,6 +89,9 @@ pub struct Cli {
 
     /// public address for the parseable server ingestor
     pub ingestor_endpoint: String,
+
+    /// port used by airplane (the Arrow Flight query service)
+    pub flight_port: u16,
 }
 
 impl Cli {
@@ -118,6 +121,7 @@ impl Cli {
     pub const INGESTOR_ENDPOINT: &'static str = "ingestor-endpoint";
     pub const DEFAULT_USERNAME: &'static str = "admin";
     pub const DEFAULT_PASSWORD: &'static str = "admin";
+    pub const FLIGHT_PORT: &'static str = "flight-port";
 
     pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
         self.local_staging_path.join(stream_name)
@@ -275,6 +279,16 @@ impl Cli {
                     .value_parser(value_parser!(u16))
                     .help("Port for gRPC server"),
             )
+            .arg(
+                Arg::new(Self::FLIGHT_PORT)
+                    .long(Self::FLIGHT_PORT)
+                    .env("P_FLIGHT_PORT")
+                    .value_name("PORT")
+                    .default_value("8002")
+                    .required(false)
+                    .value_parser(value_parser!(u16))
+                    .help("Port for the Arrow Flight query engine"),
+            )
             .arg(
                 Arg::new(Self::LIVETAIL_CAPACITY)
                     .long(Self::LIVETAIL_CAPACITY)
@@ -317,11 +331,11 @@ impl Cli {
                     .help("Mode of operation"),
             )
             .arg(
-                Arg::new(Self::INGESTOR_ENDPOINT)
-                    .long(Self::INGESTOR_ENDPOINT)
-                    .env("P_INGESTOR_ENDPOINT")
-                    .value_name("URL")
-                    .required(false)
+                Arg::new(Self::INGESTOR_ENDPOINT)
+                    .long(Self::INGESTOR_ENDPOINT)
+                    .env("P_INGESTOR_ENDPOINT")
+                    .value_name("URL")
+                    .required(false)
                     .help("URL to connect to this specific ingestor. Default is the address of the server.")
             )
             .arg(
@@ -401,6 +415,10 @@ impl FromArgMatches for Cli {
             .get_one::<u16>(Self::GRPC_PORT)
             .cloned()
             .expect("default for livetail port");
+        self.flight_port = m
+            .get_one::<u16>(Self::FLIGHT_PORT)
+            .cloned()
+            .expect("default for flight port");
         self.livetail_channel_capacity = m
             .get_one::<usize>(Self::LIVETAIL_CAPACITY)
             .cloned()
diff --git a/server/src/event.rs b/server/src/event.rs
index eeb95e0b0..396a95a58 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -28,7 +28,7 @@ use std::sync::Arc;
 
 use self::error::EventError;
 pub use self::writer::STREAM_WRITERS;
-use crate::metadata;
+use crate::{handlers::http::ingest::PostError, metadata};
 use chrono::NaiveDateTime;
 
 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
@@ -48,7 +48,7 @@ pub struct Event {
 // Events holds the schema related to a each event for a single log stream
 impl Event {
-    pub async fn process(self) -> Result<(), EventError> {
+    pub async fn process(&self) -> Result<(), EventError> {
         let mut key = get_schema_key(&self.rb.schema().fields);
         if self.time_partition.is_some() {
             let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
@@ -77,7 +77,7 @@ impl Event {
         crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
 
         if let Err(e) = metadata::STREAM_INFO
-            .check_alerts(&self.stream_name, self.rb)
+            .check_alerts(&self.stream_name, &self.rb)
             .await
         {
             log::error!("Error checking for alerts. {:?}", e);
@@ -86,6 +86,24 @@ impl Event {
         Ok(())
     }
 
+    pub fn process_unchecked(self) -> Result<Self, PostError> {
+        let key = get_schema_key(&self.rb.schema().fields);
+
+        Self::process_event(
+            &self.stream_name,
+            &key,
+            self.rb.clone(),
+            self.parsed_timestamp,
+        )
+        .map_err(PostError::Event)?;
+
+        Ok(self)
+    }
+
+    pub fn clear(&self, stream_name: &str) {
+        STREAM_WRITERS.clear(stream_name);
+    }
+
     // event process all events after the 1st event. Concatenates record batches
     // and puts them in memory store for each event.
     fn process_event(
diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs
index 737a0a514..ce0bf4372 100644
--- a/server/src/event/writer.rs
+++ b/server/src/event/writer.rs
@@ -22,11 +22,15 @@ mod mem_writer;
 
 use std::{
     collections::HashMap,
-    sync::{Arc, Mutex, RwLock},
+    sync::{Arc, Mutex, RwLock, RwLockWriteGuard},
+};
+
+use crate::{
+    option::{Mode, CONFIG},
+    utils,
 };
 
 use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter};
-use crate::utils;
 use arrow_array::{RecordBatch, TimestampMillisecondArray};
 use arrow_schema::Schema;
 use chrono::NaiveDateTime;
@@ -62,6 +66,11 @@ impl Writer {
         self.mem.push(schema_key, rb);
         Ok(())
     }
+
+    fn push_mem(&mut self, schema_key: &str, rb: RecordBatch) -> Result<(), StreamWriterError> {
+        self.mem.push(schema_key, rb);
+        Ok(())
+    }
 }
 
 #[derive(Deref, DerefMut, Default)]
@@ -80,7 +89,8 @@ impl WriterTable {
 
         match hashmap_guard.get(stream_name) {
             Some(stream_writer) => {
-                stream_writer.lock().unwrap().push(
+                self.handle_existing_writer(
+                    stream_writer,
                     stream_name,
                     schema_key,
                     record,
@@ -89,10 +99,51 @@ impl WriterTable {
             }
             None => {
                 drop(hashmap_guard);
-                let mut map = self.write().unwrap();
+                let map = self.write().unwrap();
                 // check for race condition
                 // if map contains entry then just
-                if let Some(writer) = map.get(stream_name) {
+                self.handle_missing_writer(map, stream_name, schema_key, record, parsed_timestamp)?;
+            }
+        };
+        Ok(())
+    }
+
+    fn handle_existing_writer(
+        &self,
+        stream_writer: &Mutex<Writer>,
+        stream_name: &str,
+        schema_key: &str,
+        record: RecordBatch,
+        parsed_timestamp: NaiveDateTime,
+    ) -> Result<(), StreamWriterError> {
+        if CONFIG.parseable.mode != Mode::Query {
+            stream_writer.lock().unwrap().push(
+                stream_name,
+                schema_key,
+                record,
+                parsed_timestamp,
+            )?;
+        } else {
+            stream_writer
+                .lock()
+                .unwrap()
+                .push_mem(stream_name, record)?;
+        }
+
+        Ok(())
+    }
+
+    fn handle_missing_writer(
+        &self,
+        mut map: RwLockWriteGuard<HashMap<String, Mutex<Writer>>>,
+        stream_name: &str,
+        schema_key: &str,
+        record: RecordBatch,
+        parsed_timestamp: NaiveDateTime,
+    ) -> Result<(), StreamWriterError> {
+        match map.get(stream_name) {
+            Some(writer) => {
+                if CONFIG.parseable.mode != Mode::Query {
                     writer.lock().unwrap().push(
                         stream_name,
                         schema_key,
@@ -100,15 +151,32 @@ impl WriterTable {
                         parsed_timestamp,
                     )?;
                 } else {
+                    writer.lock().unwrap().push_mem(stream_name, record)?;
+                }
+            }
+            None => {
+                if CONFIG.parseable.mode != Mode::Query {
                     let mut writer = Writer::default();
                     writer.push(stream_name, schema_key, record, parsed_timestamp)?;
                     map.insert(stream_name.to_owned(), Mutex::new(writer));
+                } else {
+                    let mut writer = Writer::default();
+                    writer.push_mem(schema_key, record)?;
+                    map.insert(stream_name.to_owned(), Mutex::new(writer));
                 }
             }
-        };
+        }
         Ok(())
     }
 
+    pub fn clear(&self, stream_name: &str) {
+        let map = self.write().unwrap();
+        if let Some(writer) = map.get(stream_name) {
+            let w = &mut writer.lock().unwrap().mem;
+            w.clear();
+        }
+    }
+
     pub fn delete_stream(&self, stream_name: &str) {
         self.write().unwrap().remove(stream_name);
     }
diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs
index 1b193eb4c..db729b8d1 100644
--- a/server/src/event/writer/file_writer.rs
+++ b/server/src/event/writer/file_writer.rs
@@ -29,6 +29,7 @@ use crate::storage::staging::StorageDir;
 use chrono::NaiveDateTime;
 
 pub struct ArrowWriter {
+    #[allow(dead_code)]
     pub file_path: PathBuf,
     pub writer: StreamWriter<File>,
 }
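// The writer split above is the core behavioural change: outside Query mode a
// batch goes to both the on-disk arrow file and the in-memory buffer, while a
// Query-mode server (which only stages batches fetched from ingestors) keeps
// them in memory alone, to be dropped once the flight response is built.
// Condensed sketch of the dispatch (names follow the patch, signatures
// simplified):
fn route(
    writer: &mut Writer,
    stream_name: &str,
    schema_key: &str,
    rb: RecordBatch,
    parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
    if CONFIG.parseable.mode != Mode::Query {
        writer.push(stream_name, schema_key, rb, parsed_timestamp) // disk + memory
    } else {
        writer.push_mem(schema_key, rb) // memory only; cleared after the query
    }
}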
diff --git a/server/src/event/writer/mem_writer.rs b/server/src/event/writer/mem_writer.rs
index 1f5ce4532..561f2c4e5 100644
--- a/server/src/event/writer/mem_writer.rs
+++ b/server/src/event/writer/mem_writer.rs
@@ -34,8 +34,8 @@ pub struct MemWriter {
     schema: Schema,
     // for checking uniqueness of schema
     schema_map: HashSet<String>,
-    read_buffer: Vec<RecordBatch>,
-    mutable_buffer: MutableBuffer,
+    pub read_buffer: Vec<RecordBatch>,
+    pub mutable_buffer: MutableBuffer,
 }
 
 impl Default for MemWriter {
@@ -62,6 +62,14 @@ impl MemWriter {
         }
     }
 
+    pub fn clear(&mut self) {
+        self.schema = Schema::empty();
+        self.schema_map.clear();
+        self.read_buffer.clear();
+        self.mutable_buffer.inner.clear();
+        self.mutable_buffer.rows = 0;
+    }
+
     pub fn recordbatch_cloned(&self, schema: &Arc<Schema>) -> Vec<RecordBatch> {
         let mut read_buffer = self.read_buffer.clone();
         if self.mutable_buffer.rows > 0 {
@@ -83,7 +91,7 @@ fn concat_records(schema: &Arc<Schema>, record: &[RecordBatch]) -> RecordBatch {
 }
 
 #[derive(Debug, Default)]
-struct MutableBuffer {
+pub struct MutableBuffer {
     pub inner: Vec<RecordBatch>,
     pub rows: usize,
 }
diff --git a/server/src/handlers.rs b/server/src/handlers.rs
index d610011cf..aa227122b 100644
--- a/server/src/handlers.rs
+++ b/server/src/handlers.rs
@@ -16,6 +16,7 @@
  *
  */
 
+pub mod airplane;
 pub mod http;
 pub mod livetail;
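// Everything below is the new `airplane` module: a standalone Arrow Flight
// endpoint that answers the same queries as the HTTP /query API. From a
// client's point of view the exchange looks roughly like this (hedged sketch;
// endpoint, credentials and timestamps are placeholders, assuming RFC 3339
// times as the HTTP API accepts):
use arrow_array::RecordBatch;
use arrow_flight::{FlightClient, Ticket};
use futures::TryStreamExt;
use tonic::transport::Channel;

async fn query_over_flight() -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
    // 8002 is the new default flight port.
    let channel = Channel::from_static("http://localhost:8002").connect().await?;
    let mut client = FlightClient::new(channel);
    // Same basic-auth token the HTTP API uses ("admin:admin" here).
    client.add_header("authorization", "Basic YWRtaW46YWRtaW4=")?;

    // The ticket is the JSON body the HTTP /query endpoint accepts.
    let ticket = serde_json::json!({
        "query": "select * from mystream",
        "startTime": "2024-04-01T00:00:00Z",
        "endTime": "2024-04-01T00:05:00Z"
    })
    .to_string();

    let stream = client.do_get(Ticket { ticket: ticket.into() }).await?;
    Ok(stream.try_collect().await?)
}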
diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs
new file mode 100644
index 000000000..8fd83f35d
--- /dev/null
+++ b/server/src/handlers/airplane.rs
@@ -0,0 +1,340 @@
+use arrow_array::RecordBatch;
+use arrow_flight::encode::FlightDataEncoderBuilder;
+use arrow_flight::flight_service_server::FlightServiceServer;
+use arrow_flight::PollInfo;
+use arrow_schema::ArrowError;
+
+use datafusion::common::tree_node::TreeNode;
+use serde_json::json;
+use std::net::SocketAddr;
+use std::sync::Arc;
+use std::time::Instant;
+use tonic::codec::CompressionEncoding;
+
+use futures_util::{Future, TryFutureExt};
+
+use tonic::transport::{Identity, Server, ServerTlsConfig};
+use tonic_web::GrpcWebLayer;
+
+use crate::event::commit_schema;
+use crate::handlers::http::cluster::get_ingestor_info;
+use crate::handlers::http::fetch_schema;
+
+use crate::metrics::QUERY_EXECUTE_TIME;
+use crate::option::{Mode, CONFIG};
+
+use crate::handlers::livetail::cross_origin_config;
+
+use crate::handlers::http::query::{authorize_and_set_filter_tags, into_query};
+use crate::query::{TableScanVisitor, QUERY_SESSION};
+use crate::storage::object_storage::commit_schema_to_storage;
+use crate::utils::arrow::flight::{
+    append_temporary_events, get_query_from_ticket, run_do_get_rpc, send_to_ingester,
+};
+use arrow_flight::{
+    flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
+    FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
+    SchemaResult, Ticket,
+};
+use arrow_ipc::writer::IpcWriteOptions;
+use futures::{stream, TryStreamExt};
+use tonic::{Request, Response, Status, Streaming};
+
+use crate::handlers::livetail::extract_session_key;
+use crate::metadata::STREAM_INFO;
+use crate::rbac::Users;
+
+#[derive(Clone, Debug)]
+pub struct AirServiceImpl {}
+
+#[tonic::async_trait]
+impl FlightService for AirServiceImpl {
+    type HandshakeStream = stream::BoxStream<'static, Result<HandshakeResponse, Status>>;
+    type ListFlightsStream = stream::BoxStream<'static, Result<FlightInfo, Status>>;
+    type DoGetStream = stream::BoxStream<'static, Result<FlightData, Status>>;
+    type DoPutStream = stream::BoxStream<'static, Result<PutResult, Status>>;
+    type DoActionStream = stream::BoxStream<'static, Result<arrow_flight::Result, Status>>;
+    type ListActionsStream = stream::BoxStream<'static, Result<ActionType, Status>>;
+    type DoExchangeStream = stream::BoxStream<'static, Result<FlightData, Status>>;
+
+    async fn handshake(
+        &self,
+        _request: Request<Streaming<HandshakeRequest>>,
+    ) -> Result<Response<Self::HandshakeStream>, Status> {
+        Err(Status::unimplemented(
+            "handshake is disabled in favour of direct authentication and authorization",
+        ))
+    }
+
+    /// list_flights is an operation that allows a client
+    /// to query a Flight server for information
+    /// about available datasets or "flights" that the server can provide.
+    async fn list_flights(
+        &self,
+        _request: Request<Criteria>,
+    ) -> Result<Response<Self::ListFlightsStream>, Status> {
+        Err(Status::unimplemented("Implement list_flights"))
+    }
+
+    async fn poll_flight_info(
+        &self,
+        _request: Request<FlightDescriptor>,
+    ) -> Result<Response<PollInfo>, Status> {
+        Err(Status::unimplemented("Implement poll_flight_info"))
+    }
+
+    async fn get_flight_info(
+        &self,
+        _request: Request<FlightDescriptor>,
+    ) -> Result<Response<FlightInfo>, Status> {
+        Err(Status::unimplemented("Implement get_flight_info"))
+    }
+
+    async fn get_schema(
+        &self,
+        request: Request<FlightDescriptor>,
+    ) -> Result<Response<SchemaResult>, Status> {
+        let table_name = request.into_inner().path;
+        let table_name = table_name[0].clone();
+
+        let schema = STREAM_INFO
+            .schema(&table_name)
+            .map_err(|err| Status::failed_precondition(err.to_string()))?;
+
+        let options = IpcWriteOptions::default();
+        let schema_result = SchemaAsIpc::new(&schema, &options)
+            .try_into()
+            .map_err(|err: ArrowError| Status::internal(err.to_string()))?;
+
+        Ok(Response::new(schema_result))
+    }
+    async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
+        let key = extract_session_key(req.metadata())?;
+
+        let ticket = get_query_from_ticket(req)?;
+
+        log::info!("query requested to airplane: {:?}", ticket);
+
+        // get the query session_state
+        let session_state = QUERY_SESSION.state();
+
+        // get the logical plan and extract the table name
+        let raw_logical_plan = session_state
+            .create_logical_plan(&ticket.query)
+            .await
+            .map_err(|err| {
+                log::error!("Datafusion Error: Failed to create logical plan: {}", err);
+                Status::internal("Failed to create logical plan")
+            })?;
+
+        // create a visitor to extract the table name
+        let mut visitor = TableScanVisitor::default();
+        let _ = raw_logical_plan.visit(&mut visitor);
+
+        let tables = visitor.into_inner();
+
+        if CONFIG.parseable.mode == Mode::Query {
+            // using http to get the schema. may update to use flight later
+            for table in tables {
+                if let Ok(new_schema) = fetch_schema(&table).await {
+                    // commit_schema merges the schema internally and updates the schema in storage.
+                    commit_schema_to_storage(&table, new_schema.clone())
+                        .await
+                        .map_err(|err| Status::internal(err.to_string()))?;
+                    commit_schema(&table, Arc::new(new_schema))
+                        .map_err(|err| Status::internal(err.to_string()))?;
+                }
+            }
+        }
+
+        // map payload to query
+        let mut query = into_query(&ticket, &session_state)
+            .await
+            .map_err(|_| Status::internal("Failed to parse query"))?;
+
+        // if table name is not present it is a Malformed Query
+        let stream_name = query
+            .first_table_name()
+            .ok_or_else(|| Status::invalid_argument("Malformed Query"))?;
+
+        let event =
+            if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) {
+                let sql = format!("select * from {}", &stream_name);
+                let start_time = ticket.start_time.clone();
+                let end_time = ticket.end_time.clone();
+                let out_ticket = json!({
+                    "query": sql,
+                    "startTime": start_time,
+                    "endTime": end_time
+                })
+                .to_string();
+
+                let ingester_metadatas = get_ingestor_info()
+                    .await
+                    .map_err(|err| Status::failed_precondition(err.to_string()))?;
+                let mut minute_result: Vec<RecordBatch> = vec![];
+
+                for im in ingester_metadatas {
+                    if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await {
+                        minute_result.append(&mut batches);
+                    }
+                }
+                let mr = minute_result.iter().collect::<Vec<_>>();
+                let event = append_temporary_events(&stream_name, mr).await?;
+                Some(event)
+            } else {
+                None
+            };
+        let permissions = Users.get_permissions(&key);
+
+        authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| {
+            Status::permission_denied("User does not have permission to access this stream")
+        })?;
+        let time = Instant::now();
+        let (results, _) = query
+            .execute(stream_name.clone())
+            .await
+            .map_err(|err| Status::internal(err.to_string()))?;
+
+        /*
+         * INFO: Not returning the schema with the data.
+         * Kept it in case it needs to be sent in the future.
+        let schemas = results
+            .iter()
+            .map(|batch| batch.schema())
+            .map(|s| s.as_ref().clone())
+            .collect::<Vec<_>>();
+        let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?;
+        */
+        let input_stream = futures::stream::iter(results.into_iter().map(Ok));
+        let write_options = IpcWriteOptions::default()
+            .try_with_compression(Some(arrow_ipc::CompressionType::ZSTD))
+            .map_err(|err| Status::failed_precondition(err.to_string()))?;
+
+        let flight_data_stream = FlightDataEncoderBuilder::new()
+            .with_max_flight_data_size(usize::MAX)
+            .with_options(write_options)
+            // .with_schema(schema.into())
+            .build(input_stream);
+
+        let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string()));
+
+        if let Some(event) = event {
+            event.clear(&stream_name);
+        }
+
+        let time = time.elapsed().as_secs_f64();
+        QUERY_EXECUTE_TIME
+            .with_label_values(&[&format!("flight-query-{}", stream_name)])
+            .observe(time);
+
+        Ok(Response::new(
+            Box::pin(flight_data_stream) as Self::DoGetStream
+        ))
+    }
+
+    async fn do_put(
+        &self,
+        _request: Request<Streaming<FlightData>>,
+    ) -> Result<Response<Self::DoPutStream>, Status> {
+        Err(Status::unimplemented(
+            "do_put not implemented because we are only using flight for querying",
+        ))
+    }
+
+    async fn do_action(
+        &self,
+        _request: Request<Action>,
+    ) -> Result<Response<Self::DoActionStream>, Status> {
+        Err(Status::unimplemented(
+            "do_action not implemented because we are only using flight for querying",
+        ))
+    }
+
+    async fn list_actions(
+        &self,
+        _request: Request<Empty>,
+    ) -> Result<Response<Self::ListActionsStream>, Status> {
+        Err(Status::unimplemented(
+            "list_actions not implemented because we are only using flight for querying",
+        ))
+    }
+
+    async fn do_exchange(
+        &self,
+        _request: Request<Streaming<FlightData>>,
+    ) -> Result<Response<Self::DoExchangeStream>, Status> {
+        Err(Status::unimplemented(
+            "do_exchange not implemented because we are only using flight for querying",
+        ))
+    }
+}
+
+pub fn server() -> impl Future<Output = Result<(), Box<dyn std::error::Error + Send>>> + Send {
+    let mut addr: SocketAddr = CONFIG
+        .parseable
+        .address
+        .parse()
+        .expect("valid socket address");
+    addr.set_port(CONFIG.parseable.flight_port);
+
+    let service = AirServiceImpl {};
+
+    let svc = FlightServiceServer::new(service)
+        .max_encoding_message_size(usize::MAX)
+        .max_decoding_message_size(usize::MAX)
+        .send_compressed(CompressionEncoding::Zstd)
+        .accept_compressed(CompressionEncoding::Zstd);
+
+    let cors = cross_origin_config();
+
+    let identity = match (
+        &CONFIG.parseable.tls_cert_path,
+        &CONFIG.parseable.tls_key_path,
+    ) {
+        (Some(cert), Some(key)) => {
+            match (std::fs::read_to_string(cert), std::fs::read_to_string(key)) {
+                (Ok(cert_file), Ok(key_file)) => {
+                    let identity = Identity::from_pem(cert_file, key_file);
+                    Some(identity)
+                }
+                _ => None,
+            }
+        }
+        (_, _) => None,
+    };
+
+    let config = identity.map(|id| ServerTlsConfig::new().identity(id));
+
+    // rust is treating closures as different types
+    let err_map_fn = |err| Box::new(err) as Box<dyn std::error::Error + Send>;
+
+    // match on config to decide if we want to use tls or not
+    match config {
+        Some(config) => {
+            let server = match Server::builder().tls_config(config) {
+                Ok(server) => server,
+                Err(_) => Server::builder(),
+            };
+
+            server
+                .max_frame_size(16 * 1024 * 1024 - 2)
+                .accept_http1(true)
+                .layer(cors)
+                .layer(GrpcWebLayer::new())
+                .add_service(svc)
+                .serve(addr)
+                .map_err(err_map_fn)
+        }
+        None => Server::builder()
+            .max_frame_size(16 * 1024 * 1024 - 2)
+            .accept_http1(true)
+            .layer(cors)
+            .layer(GrpcWebLayer::new())
+            .add_service(svc)
+            .serve(addr)
+            .map_err(err_map_fn),
+    }
+}
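// The distributed part of do_get above reduces to: when the query window
// touches the current minute, pull fresh batches from every ingestor over
// flight, stage them as a temporary in-memory event, execute locally, then
// evict the staged data so the next query does not double-count it. In
// outline (a sketch; error handling elided, names as in the patch):
let staged = if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) {
    let mut batches = Vec::new();
    for im in get_ingestor_info().await? {
        if let Ok(mut b) = run_do_get_rpc(im, out_ticket.clone()).await {
            batches.append(&mut b); // ingestors that fail to answer are skipped
        }
    }
    Some(append_temporary_events(&stream_name, batches.iter().collect()).await?)
} else {
    None
};
let (results, _) = query.execute(stream_name.clone()).await?;
if let Some(event) = staged {
    event.clear(&stream_name); // drop the staged ingestor batches
}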
diff --git a/server/src/handlers/http/cluster/utils.rs b/server/src/handlers/http/cluster/utils.rs
index 1caf9cd65..cea27bb04 100644
--- a/server/src/handlers/http/cluster/utils.rs
+++ b/server/src/handlers/http/cluster/utils.rs
@@ -16,7 +16,9 @@
  *
  */
 
-use crate::handlers::http::{logstream::error::StreamError, modal::IngestorMetadata};
+use crate::handlers::http::{
+    base_path_without_preceding_slash, logstream::error::StreamError, modal::IngestorMetadata,
+};
 use actix_web::http::header;
 use chrono::{DateTime, Utc};
 use http::StatusCode;
@@ -161,7 +163,11 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
 }
 
 pub async fn check_liveness(domain_name: &str) -> bool {
-    let uri = match Url::parse(&format!("{}liveness", domain_name)) {
+    let uri = match Url::parse(&format!(
+        "{}{}/liveness",
+        domain_name,
+        base_path_without_preceding_slash()
+    )) {
         Ok(uri) => uri,
         Err(err) => {
             log::error!("Node Indentifier Failed To Parse: {}", err);
diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs
index 7532ad59e..3926e3d7b 100644
--- a/server/src/handlers/http/ingest.rs
+++ b/server/src/handlers/http/ingest.rs
@@ -33,6 +33,7 @@ use crate::storage::{LogStream, ObjectStorageError};
 use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
 use crate::utils::json::convert_array_to_object;
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
+use arrow_array::RecordBatch;
 use arrow_schema::{Field, Schema};
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
@@ -40,6 +41,7 @@ use http::StatusCode;
 use serde_json::Value;
 use std::collections::{BTreeMap, HashMap};
 use std::sync::Arc;
+
 // Handler for POST /api/v1/ingest
 // ingests events by extracting stream name from header
 // creates if stream does not exist
@@ -96,6 +98,22 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse,
     Ok(HttpResponse::Ok().finish())
 }
 
+pub async fn push_logs_unchecked(
+    batches: RecordBatch,
+    stream_name: &str,
+) -> Result<event::Event, PostError> {
+    event::Event {
+        rb: batches,
+        stream_name: stream_name.to_string(),
+        origin_format: "json",
+        origin_size: 0,
+        parsed_timestamp: Utc::now().naive_utc(),
+        time_partition: None,
+        is_first_event: true, // NOTE: Maybe should be false
+    }
+    .process_unchecked()
+}
+
 async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result<(), PostError> {
     let glob_storage = CONFIG.storage().get_object_store();
     let object_store_format = glob_storage
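// The check_liveness change above matters once routes live under the API base
// path: the probe used to hit "{domain}liveness", which worked only by
// accident. Assuming base_path_without_preceding_slash() yields "api/v1" (the
// API_BASE_PATH/API_VERSION pair used elsewhere in the handlers), the URL now
// comes out as:
let domain = "http://ingestor-0:8000/"; // domain_name keeps its trailing slash
let uri = format!("{}{}/liveness", domain, "api/v1");
assert_eq!(uri, "http://ingestor-0:8000/api/v1/liveness");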
diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index c230981af..2ce245949 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -18,11 +18,13 @@
 
 use crate::analytics;
 use crate::banner;
+use crate::handlers::airplane;
 use crate::handlers::http::logstream;
 use crate::handlers::http::middleware::RouteExt;
 use crate::localcache::LocalCacheManager;
 use crate::metadata;
 use crate::metrics;
+use crate::migration::metadata_migration::migrate_ingester_metadata;
 use crate::rbac;
 use crate::rbac::role::Action;
 use crate::storage;
@@ -38,6 +40,10 @@ use super::IngestorMetadata;
 use super::OpenIdClient;
 use super::ParseableServer;
 
+use crate::{
+    handlers::http::{base_path, cross_origin_config},
+    option::CONFIG,
+};
 use actix_web::body::MessageBody;
 use actix_web::Scope;
 use actix_web::{web, App, HttpServer};
@@ -49,14 +55,9 @@ use itertools::Itertools;
 use once_cell::sync::Lazy;
 use relative_path::RelativePathBuf;
 
-use crate::{
-    handlers::http::{base_path, cross_origin_config},
-    option::CONFIG,
-};
-
 /// ! have to use a guard before using it
 pub static INGESTOR_META: Lazy<IngestorMetadata> =
-    Lazy::new(|| staging::get_ingestor_info().expect("dir is readable and writeable"));
+    Lazy::new(|| staging::get_ingestor_info().expect("ingestor metadata should be valid JSON"));
 
 #[derive(Default)]
 pub struct IngestServer;
@@ -113,6 +114,7 @@ impl ParseableServer for IngestServer {
         self.validate_credentials().await?;
 
         let metadata = storage::resolve_parseable_metadata().await?;
+        banner::print(&CONFIG, &metadata).await;
         rbac::map::init(&metadata);
 
         // set the info in the global metadata
@@ -214,6 +216,7 @@ impl IngestServer {
 
     // create the ingestor metadata and put the .ingestor.json file in the object store
     async fn set_ingestor_metadata(&self) -> anyhow::Result<()> {
+        migrate_ingester_metadata().await?;
         let store = CONFIG.storage().get_object_store();
 
         // find the meta file in staging if not generate new metadata
@@ -336,7 +339,10 @@ impl IngestServer {
         let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
             sync::object_store_sync();
 
+        tokio::spawn(airplane::server());
+
         let app = self.start(prometheus, CONFIG.parseable.openid.clone());
+
         tokio::pin!(app);
         loop {
             tokio::select! {
diff --git a/server/src/handlers/http/modal/mod.rs b/server/src/handlers/http/modal/mod.rs
index edd7bd3c3..8af1119e3 100644
--- a/server/src/handlers/http/modal/mod.rs
+++ b/server/src/handlers/http/modal/mod.rs
@@ -62,9 +62,11 @@ pub struct IngestorMetadata {
     pub bucket_name: String,
     pub token: String,
     pub ingestor_id: String,
+    pub flight_port: String,
 }
 
 impl IngestorMetadata {
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         port: String,
         domain_name: String,
@@ -73,6 +75,7 @@ impl IngestorMetadata {
         username: &str,
         password: &str,
         ingestor_id: String,
+        flight_port: String,
     ) -> Self {
         let token = base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password));
 
@@ -85,6 +88,7 @@ impl IngestorMetadata {
             bucket_name,
             token,
             ingestor_id,
+            flight_port,
         }
     }
 
@@ -110,9 +114,10 @@ mod test {
             "admin",
             "admin",
             "ingestor_id".to_string(),
+            "8002".to_string(),
         );
 
-        let rhs = serde_json::from_slice::<IngestorMetadata>(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=", "ingestor_id": "ingestor_id"}"#).unwrap();
+        let rhs = serde_json::from_slice::<IngestorMetadata>(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=", "ingestor_id": "ingestor_id","flight_port": "8002"}"#).unwrap();
 
         assert_eq!(rhs, lhs);
     }
@@ -127,13 +132,14 @@ mod test {
             "admin",
             "admin",
             "ingestor_id".to_string(),
+            "8002".to_string(),
         );
 
         let lhs = serde_json::to_string(&im)
             .unwrap()
             .try_into_bytes()
             .unwrap();
-        let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id"}"#
+        let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id","flight_port":"8002"}"#
             .try_into_bytes()
             .unwrap();
diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
index 1312b407f..607cc5653 100644
--- a/server/src/handlers/http/modal/query_server.rs
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -16,6 +16,7 @@
  *
  */
 
+use crate::handlers::airplane;
 use crate::handlers::http::cluster;
 use crate::handlers::http::middleware::RouteExt;
 use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
@@ -183,6 +184,8 @@ impl QueryServer {
             analytics::init_analytics_scheduler()?;
         }
 
+        tokio::spawn(airplane::server());
+
         self.start(prometheus, CONFIG.parseable.openid.clone())
             .await?;
diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs
index 72d377139..165581234 100644
--- a/server/src/handlers/http/modal/server.rs
+++ b/server/src/handlers/http/modal/server.rs
@@ -416,6 +416,7 @@ impl Server {
         }
 
         tokio::spawn(handlers::livetail::server());
+        tokio::spawn(handlers::airplane::server());
 
         let app = self.start(prometheus, CONFIG.parseable.openid.clone());
diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index 96e0766a9..534db8075 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -50,15 +50,15 @@ use crate::utils::actix::extract_session_key_from_req;
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct Query {
-    query: String,
-    start_time: String,
-    end_time: String,
+    pub query: String,
+    pub start_time: String,
+    pub end_time: String,
     #[serde(default)]
-    send_null: bool,
+    pub send_null: bool,
     #[serde(skip)]
-    fields: bool,
+    pub fields: bool,
     #[serde(skip)]
-    filter_tags: Option<Vec<String>>,
+    pub filter_tags: Option<Vec<String>>,
 }
 
 pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
@@ -99,6 +99,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespon
-fn authorize_and_set_filter_tags(
+pub fn authorize_and_set_filter_tags(
     query: &mut LogicalQuery,
     permissions: Vec<Permission>,
     table_name: &str,
@@ -183,7 +184,7 @@ impl FromRequest for Query {
     }
 }
 
-async fn into_query(
+pub async fn into_query(
     query: &Query,
     session_state: &SessionState,
 ) -> Result<LogicalQuery, QueryError> {
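// Making the Query fields pub lets the flight handler reuse the HTTP payload
// type as its ticket format. Deserialization behaves like this (sketch;
// `fields`/`filter_tags` carry #[serde(skip)], so they never come off the wire):
let ticket = br#"{"query":"select * from mystream",
                  "startTime":"2024-04-01T00:00:00Z",
                  "endTime":"2024-04-01T00:05:00Z"}"#;
let q: Query = serde_json::from_slice(ticket)?;
assert!(!q.send_null); // #[serde(default)] kicks in when the key is omitted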
diff --git a/server/src/handlers/livetail.rs b/server/src/handlers/livetail.rs
index 76b6f8005..3de8426e7 100644
--- a/server/src/handlers/livetail.rs
+++ b/server/src/handlers/livetail.rs
@@ -231,7 +231,7 @@ pub fn server() -> impl Future<Output = Result<(), Box<dyn std::error::Error +
 }
 
-fn extract_stream(body: &serde_json::Value) -> Result<&str, Status> {
+pub fn extract_stream(body: &serde_json::Value) -> Result<&str, Status> {
     body.as_object()
         .ok_or(Status::invalid_argument("expected object in request body"))?
         .get("stream")
@@ -240,7 +240,7 @@ fn extract_stream(body: &serde_json::Value) -> Result<&str, Status> {
         .ok_or(Status::invalid_argument("stream key value is invalid"))
 }
 
-fn extract_session_key(headers: &MetadataMap) -> Result<SessionKey, Status> {
+pub fn extract_session_key(headers: &MetadataMap) -> Result<SessionKey, Status> {
     // Extract username and password from the request using basic auth extractor.
     let basic = extract_basic_auth(headers).map(|creds| SessionKey::BasicAuth {
         username: creds.user_id,
@@ -286,6 +286,7 @@ fn extract_cookie(header: &MetadataMap) -> Option<Cookie> {
         .find(|cookie| cookie.name() == SESSION_COOKIE_NAME)
 }
 
-fn cross_origin_config() -> CorsLayer {
+#[inline(always)]
+pub fn cross_origin_config() -> CorsLayer {
     CorsLayer::very_permissive().allow_credentials(true)
 }
diff --git a/server/src/main.rs b/server/src/main.rs
index 04d6ed8b7..95cbcb919 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -46,11 +46,8 @@ use std::sync::Arc;
 use handlers::http::modal::ParseableServer;
 use option::{Mode, CONFIG};
 
-use crate::{
-    handlers::http::modal::{
-        ingest_server::IngestServer, query_server::QueryServer, server::Server,
-    },
-    // localcache::LocalCacheManager,
+use crate::handlers::http::modal::{
+    ingest_server::IngestServer, query_server::QueryServer, server::Server,
 };
 
 pub const STORAGE_UPLOAD_INTERVAL: u32 = 60;
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 4140f5156..ce8a9dad2 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -63,7 +63,7 @@ impl StreamInfo {
     pub async fn check_alerts(
         &self,
         stream_name: &str,
-        rb: RecordBatch,
+        rb: &RecordBatch,
     ) -> Result<(), CheckAlertError> {
         let map = self.read().expect(LOCK_EXPECT);
         let meta = map
diff --git a/server/src/migration.rs b/server/src/migration.rs
index 9e7e9a3db..d9c15fc4c 100644
--- a/server/src/migration.rs
+++ b/server/src/migration.rs
@@ -17,7 +17,7 @@
  *
  */
 
-mod metadata_migration;
+pub mod metadata_migration;
 mod schema_migration;
 mod stream_metadata_migration;
diff --git a/server/src/migration/metadata_migration.rs b/server/src/migration/metadata_migration.rs
index cbeee200a..72bab9db6 100644
--- a/server/src/migration/metadata_migration.rs
+++ b/server/src/migration/metadata_migration.rs
@@ -19,7 +19,12 @@
 use rand::distributions::DistString;
 use serde_json::{Map, Value as JsonValue};
 
-use crate::option::CONFIG;
+use crate::{
+    handlers::http::modal::IngestorMetadata,
+    option::CONFIG,
+    storage::{object_storage::ingestor_metadata_path, staging},
+};
+use actix_web::body::MessageBody;
 
 /*
 v1
@@ -118,3 +123,40 @@ pub fn update_v3(mut storage_metadata: JsonValue) -> JsonValue {
 
     storage_metadata
 }
+
+pub async fn migrate_ingester_metadata() -> anyhow::Result<()> {
+    let imp = ingestor_metadata_path(None);
+    let bytes = match CONFIG.storage().get_object_store().get_object(&imp).await {
+        Ok(bytes) => bytes,
+        Err(_) => {
+            log::debug!("No metadata found for ingestor, so migration is not required");
+            return Ok(());
+        }
+    };
+    let mut json = serde_json::from_slice::<JsonValue>(&bytes)?;
+    let meta = json
+        .as_object_mut()
+        .ok_or_else(|| anyhow::anyhow!("Unable to parse ingestor metadata"))?;
+    let fp = meta.get("flight_port");
+
+    if fp.is_none() {
+        meta.insert(
+            "flight_port".to_owned(),
+            JsonValue::String(CONFIG.parseable.flight_port.to_string()),
+        );
+    }
+    let bytes = serde_json::to_string(&json)?
+        .try_into_bytes()
+        .map_err(|err| anyhow::anyhow!(err))?;
+
+    let resource: IngestorMetadata = serde_json::from_value(json)?;
+    staging::put_ingestor_info(resource.clone())?;
+
+    CONFIG
+        .storage()
+        .get_object_store()
+        .put_object(&imp, bytes)
+        .await?;
+
+    Ok(())
+}
diff --git a/server/src/option.rs b/server/src/option.rs
index d11df0805..e607c2062 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -44,8 +44,11 @@ impl Config {
     fn new() -> Self {
         let cli = create_parseable_cli_command()
             .name("Parseable")
-            .about("A Cloud Native, log analytics platform")
-            .before_help("Log Lake for the cloud-native world")
+            .about(
+                r#"A Cloud Native, log analytics platform
+Log Lake for the cloud-native world
+"#,
+            )
             .arg_required_else_help(true)
             .subcommand_required(true)
             .color(clap::ColorChoice::Always)
@@ -192,11 +195,10 @@ fn create_parseable_cli_command() -> Command {
     command!()
         .name("Parseable")
         .bin_name("parseable")
-        .about("Parseable is a log storage and observability platform.")
         .propagate_version(true)
        .next_line_help(false)
        .help_template(
-            r#"
+            r#"{name} v{version}
 {about}
 Join the community at https://logg.ing/community.
diff --git a/server/src/query.rs b/server/src/query.rs
index aaac8d1cf..ce80ba2e1 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -18,7 +18,7 @@
 
 mod filter_optimizer;
 mod listing_table_builder;
-mod stream_schema_provider;
+pub mod stream_schema_provider;
 
 use chrono::{DateTime, Utc};
 use chrono::{NaiveDateTime, TimeZone};
diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs
index 43e58ce3a..ef0eb69ba 100644
--- a/server/src/query/stream_schema_provider.rs
+++ b/server/src/query/stream_schema_provider.rs
@@ -607,7 +607,7 @@ fn is_overlapping_query(
     false
 }
 
-fn include_now(filters: &[Expr], time_partition: Option<String>) -> bool {
+pub fn include_now(filters: &[Expr], time_partition: Option<String>) -> bool {
     let current_minute = Utc::now()
         .with_second(0)
         .and_then(|x| x.with_nanosecond(0))
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index b0846d0d8..c8871536e 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -536,7 +536,7 @@ pub async fn commit_schema_to_storage(
 }
 
 #[inline(always)]
-fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {
+pub fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {
     serde_json::to_vec(any)
         .map(|any| any.into())
         .expect("serialize cannot fail")
diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs
index abeac7062..bfd51a503 100644
--- a/server/src/storage/staging.rs
+++ b/server/src/storage/staging.rs
@@ -17,14 +17,6 @@
  *
  */
 
-use std::{
-    collections::HashMap,
-    fs,
-    path::{Path, PathBuf},
-    process,
-    sync::Arc,
-};
-
 use crate::{
     event::DEFAULT_TIMESTAMP_KEY,
     handlers::http::modal::{ingest_server::INGESTOR_META, IngestorMetadata, DEFAULT_VERSION},
@@ -36,6 +28,7 @@ use crate::{
         hostname_unchecked,
     },
 };
+use anyhow::anyhow;
 use arrow_schema::{ArrowError, Schema};
 use base64::Engine;
 use chrono::{NaiveDateTime, Timelike};
@@ -48,6 +41,14 @@ use parquet::{
     schema::types::ColumnPath,
 };
 use rand::distributions::DistString;
+use serde_json::Value as JsonValue;
+use std::{
+    collections::HashMap,
+    fs,
+    path::{Path, PathBuf},
+    process,
+    sync::Arc,
+};
 
 const ARROW_FILE_EXTENSION: &str = "data.arrows";
 const PARQUET_FILE_EXTENSION: &str = "data.parquet";
@@ -332,7 +333,21 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
 
     if flag {
         // get the ingestor metadata from staging
-        let mut meta: IngestorMetadata = serde_json::from_slice(&std::fs::read(path)?)?;
+        let mut meta: JsonValue = serde_json::from_slice(&std::fs::read(path)?)?;
+
+        // migrate the staging meta
+        let obj = meta
+            .as_object_mut()
+            .ok_or_else(|| anyhow!("Could not parse ingestor metadata JSON"))?;
+
+        if obj.get("flight_port").is_none() {
+            obj.insert(
+                "flight_port".to_owned(),
+                JsonValue::String(CONFIG.parseable.flight_port.to_string()),
+            );
+        }
+
+        let mut meta: IngestorMetadata = serde_json::from_value(meta)?;
 
         // compare url endpoint and port
         if meta.domain_name != url {
@@ -380,6 +395,7 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
             &CONFIG.parseable.username,
             &CONFIG.parseable.password,
             get_ingestor_id(),
+            CONFIG.parseable.flight_port.to_string(),
        );
 
         put_ingestor_info(out.clone())?;
@@ -392,7 +408,7 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
 /// # Parameters
 ///
 /// * `ingestor_info`: The ingestor info to be stored.
-fn put_ingestor_info(info: IngestorMetadata) -> anyhow::Result<()> {
+pub fn put_ingestor_info(info: IngestorMetadata) -> anyhow::Result<()> {
     let path = PathBuf::from(&CONFIG.parseable.local_staging_path);
     let file_name = format!("ingestor.{}.json", info.ingestor_id);
     let file_path = path.join(file_name);
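// Both migrations (the object-store copy in metadata_migration.rs and the
// staging copy above) boil down to the same backfill: read the metadata as
// loose JSON, insert "flight_port" if it is missing, then parse into the
// typed struct. A self-contained sketch of the pattern:
use serde_json::{json, Value};

fn backfill_flight_port(mut meta: Value, default_port: u16) -> Value {
    if let Some(obj) = meta.as_object_mut() {
        obj.entry("flight_port".to_owned())
            .or_insert_with(|| Value::String(default_port.to_string()));
    }
    meta
}

fn main() {
    // Pre-upgrade metadata gains the new field; existing values are untouched.
    let old = json!({"version": "v3", "port": "8000", "ingestor_id": "abc"});
    let new = backfill_flight_port(old, 8002);
    assert_eq!(new["flight_port"], "8002");
}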
diff --git a/server/src/utils/arrow.rs b/server/src/utils/arrow.rs
index 945b637fe..4bb9cde93 100644
--- a/server/src/utils/arrow.rs
+++ b/server/src/utils/arrow.rs
@@ -24,6 +24,7 @@ use arrow_schema::Schema;
 use itertools::Itertools;
 
 pub mod batch_adapter;
+pub mod flight;
 pub mod merged_reader;
 pub mod reverse_reader;
 
@@ -32,6 +33,28 @@ pub use batch_adapter::adapt_batch;
 pub use merged_reader::MergedRecordReader;
 use serde_json::{Map, Value};
 
+/// Example function for concatenating record batches (may not work):
+/// use arrow::record_batch::RecordBatch;
+/// use arrow::error::Result;
+///
+/// fn concat_batches(batch1: RecordBatch, batch2: RecordBatch) -> Result<RecordBatch> {
+///     let schema = batch1.schema();
+///     let columns = schema
+///         .fields()
+///         .iter()
+///         .enumerate()
+///         .map(|(i, _)| -> Result<_> {
+///             let array1 = batch1.column(i);
+///             let array2 = batch2.column(i);
+///             let array = arrow::compute::concat(&[array1.as_ref(), array2.as_ref()])?;
+///             Ok(array)
+///         })
+///         .collect::<Result<Vec<_>>>()?;
+///
+///     RecordBatch::try_new(schema.clone(), columns)
+/// }
+///
 /// Replaces columns in a record batch with new arrays.
 ///
 /// # Arguments
diff --git a/server/src/utils/arrow/flight.rs b/server/src/utils/arrow/flight.rs
new file mode 100644
index 000000000..113407c9d
--- /dev/null
+++ b/server/src/utils/arrow/flight.rs
@@ -0,0 +1,121 @@
+use crate::event::Event;
+use crate::handlers::http::ingest::push_logs_unchecked;
+use crate::handlers::http::query::Query as QueryJson;
+use crate::metadata::STREAM_INFO;
+use crate::query::stream_schema_provider::include_now;
+use crate::{
+    handlers::http::modal::IngestorMetadata,
+    option::{Mode, CONFIG},
+};
+
+use arrow_array::RecordBatch;
+use arrow_flight::Ticket;
+use arrow_select::concat::concat_batches;
+use datafusion::logical_expr::BinaryExpr;
+use datafusion::prelude::Expr;
+use datafusion::scalar::ScalarValue;
+use futures::TryStreamExt;
+
+use tonic::{Request, Status};
+
+use arrow_flight::FlightClient;
+use http::Uri;
+use tonic::transport::Channel;
+
+pub fn get_query_from_ticket(req: Request<Ticket>) -> Result<QueryJson, Status> {
+    serde_json::from_slice::<QueryJson>(&req.into_inner().ticket)
+        .map_err(|err| Status::internal(err.to_string()))
+}
+
+pub async fn run_do_get_rpc(
+    im: IngestorMetadata,
+    ticket: String,
+) -> Result<Vec<RecordBatch>, Status> {
+    let url = im
+        .domain_name
+        .rsplit_once(':')
+        .ok_or(Status::failed_precondition(
+            "Ingestor metadata is corrupted",
+        ))?
+        .0;
+    let url = format!("{}:{}", url, im.flight_port);
+    let url = url
+        .parse::<Uri>()
+        .map_err(|_| Status::failed_precondition("Ingestor metadata is corrupted"))?;
+    let channel = Channel::builder(url)
+        .connect()
+        .await
+        .map_err(|err| Status::failed_precondition(err.to_string()))?;
+
+    let client = FlightClient::new(channel);
+    let inn = client
+        .into_inner()
+        .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
+        .max_decoding_message_size(usize::MAX)
+        .max_encoding_message_size(usize::MAX);
+
+    let mut client = FlightClient::new_from_inner(inn);
+
+    client.add_header("authorization", &im.token)?;
+
+    let response = client
+        .do_get(Ticket {
+            ticket: ticket.into(),
+        })
+        .await?;
+
+    Ok(response.try_collect().await?)
+}
+
+/// All the records from the ingestors are concatenated into one event and pushed to memory
+pub async fn append_temporary_events(
+    stream_name: &str,
+    minute_result: Vec<&RecordBatch>,
+) -> Result<
+    //Vec<Event>
+    Event,
+    Status,
+> {
+    let schema = STREAM_INFO
+        .schema(stream_name)
+        .map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))?;
+    let rb = concat_batches(&schema, minute_result)
+        .map_err(|err| Status::failed_precondition(format!("ArrowError: {}", err)))?;
+
+    let event = push_logs_unchecked(rb, stream_name)
+        .await
+        .map_err(|err| Status::internal(err.to_string()))?;
+    Ok(event)
+}
+
+pub fn send_to_ingester(start: i64, end: i64) -> bool {
+    let filter_start = lit_timestamp_milli(
+        start, //query.start.timestamp_millis()
+    );
+    let filter_end = lit_timestamp_milli(
+        end, //query.end.timestamp_millis()
+    );
+
+    let expr_left = Expr::Column(datafusion::common::Column {
+        relation: None,
+        name: "p_timestamp".to_owned(),
+    });
+
+    let ex1 = BinaryExpr::new(
+        Box::new(expr_left.clone()),
+        datafusion::logical_expr::Operator::Gt,
+        Box::new(filter_start),
+    );
+    let ex2 = BinaryExpr::new(
+        Box::new(expr_left),
+        datafusion::logical_expr::Operator::Lt,
+        Box::new(filter_end),
+    );
+    let ex = [Expr::BinaryExpr(ex1), Expr::BinaryExpr(ex2)];
+
+    CONFIG.parseable.mode == Mode::Query && include_now(&ex, None)
+}
+
+fn lit_timestamp_milli(time: i64) -> Expr {
+    Expr::Literal(ScalarValue::TimestampMillisecond(Some(time), None))
+}
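// send_to_ingester builds a synthetic `p_timestamp > start AND p_timestamp < end`
// filter purely so it can reuse include_now(): fan-out happens only on a
// Query-mode server whose requested window reaches into the current minute,
// i.e. when ingestors may hold rows not yet synced to object storage.
// Illustrative call (times are placeholders):
let end = chrono::Utc::now().timestamp_millis();
let start = end - 5 * 60 * 1000; // the last five minutes
let fan_out = send_to_ingester(start, end); // true only in Mode::Query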