Skip to content

Commit a5dd462

Browse files
committed
crate api-sessions & api-snowflake-rest
1 parent 0d2a7a4 commit a5dd462

File tree

15 files changed

+280
-245
lines changed

15 files changed

+280
-245
lines changed

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ members = [
1111
"crates/core-executor",
1212
"crates/core-history",
1313
"crates/core-metastore",
14-
"crates/core-utils",
14+
"crates/core-utils", "crates/api-sessions",
1515
]
1616
resolver = "2"
1717
package.license-file = "LICENSE"
@@ -33,6 +33,8 @@ uuid = { version = "1.10.0", features = ["v4", "serde"] }
3333
axum = { version = "0.8.1", features = ["multipart", "macros"] }
3434
axum-macros = "0.5"
3535
dashmap = "6.1.0"
36+
regex = "1.11"
37+
indexmap = "2.7.1"
3638
url = "2.5"
3739
tokio = { version = "1", features = ["full"] }
3840
async-trait = { version = "0.1.84" }

crates/api-sessions/Cargo.toml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[package]
2+
name = "api-sessions"
3+
version = "0.1.0"
4+
edition = "2024"
5+
license-file.workspace = true
6+
7+
[dependencies]
8+
core-executor = { path = "../core-executor" }
9+
10+
async-trait = { workspace = true }
11+
axum = { workspace = true }
12+
tower-sessions = "0.14"
13+
tokio = { workspace = true }
14+
time = "0.3.37"
15+
http = { workspace = true }
16+
serde = { workspace = true }
17+
serde_json = { workspace = true }
18+
snafu = { workspace = true }
19+
tracing = { workspace = true }
20+
uuid = { workspace = true }
21+
22+
[lints]
23+
workspace = true

crates/api-sessions/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod session;
2+
pub use crate::session::{DFSessionId, RequestSessionMemory, RequestSessionStore};

crates/runtime/src/http/session.rs renamed to crates/api-sessions/src/session.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
1-
use crate::http::error::ErrorResponse;
2-
use axum::{extract::FromRequestParts, response::IntoResponse, Json};
1+
use axum::{Json, extract::FromRequestParts, response::IntoResponse};
32
use http::request::Parts;
4-
use snafu::prelude::*;
3+
use serde::{Deserialize, Serialize};
54
use snafu::ResultExt;
5+
use snafu::prelude::*;
66
use std::{collections::HashMap, sync::Arc};
77
use time::OffsetDateTime;
88
use tokio::sync::Mutex;
99
use tower_sessions::{
10+
ExpiredDeletion, Session, SessionStore,
1011
session::{Id, Record},
11-
session_store, ExpiredDeletion, Session, SessionStore,
12+
session_store,
1213
};
1314

14-
use crate::execution::service::ExecutionService;
15+
use core_executor::service::ExecutionService;
16+
use uuid;
1517

1618
pub type RequestSessionMemory = Arc<Mutex<HashMap<Id, Record>>>;
1719

@@ -108,11 +110,7 @@ impl ExpiredDeletion for RequestSessionStore {
108110
.iter()
109111
.filter_map(
110112
|(id, Record { expiry_date, .. })| {
111-
if *expiry_date <= now {
112-
Some(*id)
113-
} else {
114-
None
115-
}
113+
if *expiry_date <= now { Some(*id) } else { None }
116114
},
117115
)
118116
.collect::<Vec<_>>();
@@ -176,6 +174,12 @@ pub enum SessionError {
176174
},
177175
}
178176

177+
#[derive(Debug, Serialize, Deserialize)]
178+
pub struct ErrorResponse {
179+
pub message: String,
180+
pub status_code: u16,
181+
}
182+
179183
impl IntoResponse for SessionError {
180184
fn into_response(self) -> axum::response::Response {
181185
let er = ErrorResponse {

crates/api-snowflake-rest/Cargo.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,21 @@ edition = "2024"
55
license-file.workspace = true
66

77
[dependencies]
8+
api-sessions = { path = "../api-sessions" }
9+
core-metastore = { path = "../core-metastore" }
10+
core-executor = { path = "../core-executor" }
11+
12+
axum = { workspace = true }
13+
flate2 = "1"
14+
regex = { workspace = true }
15+
base64 = "0.22"
16+
indexmap = { workspace = true }
17+
datafusion = { workspace = true }
18+
snafu = { workspace = true }
19+
serde = { workspace = true }
20+
serde_json = { workspace = true }
21+
tracing = { workspace = true }
22+
uuid = { workspace = true }
823

924
[lints]
1025
workspace = true
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
use axum::{Json, http, response::IntoResponse};
2+
use snafu::prelude::*;
3+
4+
use crate::schemas::JsonResponse;
5+
use core_executor::error::ExecutionError;
6+
use datafusion::arrow::error::ArrowError;
7+
8+
#[derive(Snafu, Debug)]
9+
#[snafu(visibility(pub(crate)))]
10+
pub enum DbtError {
11+
#[snafu(display("Failed to decompress GZip body"))]
12+
GZipDecompress { source: std::io::Error },
13+
14+
#[snafu(display("Failed to parse login request"))]
15+
LoginRequestParse { source: serde_json::Error },
16+
17+
#[snafu(display("Failed to parse query body"))]
18+
QueryBodyParse { source: serde_json::Error },
19+
20+
#[snafu(display("Missing auth token"))]
21+
MissingAuthToken,
22+
23+
#[snafu(display("Invalid warehouse_id format"))]
24+
InvalidWarehouseIdFormat { source: uuid::Error },
25+
26+
#[snafu(display("Missing DBT session"))]
27+
MissingDbtSession,
28+
29+
#[snafu(display("Invalid auth data"))]
30+
InvalidAuthData,
31+
32+
#[snafu(display("Feature not implemented"))]
33+
NotImplemented,
34+
35+
#[snafu(display("Failed to parse row JSON"))]
36+
RowParse { source: serde_json::Error },
37+
38+
#[snafu(display("UTF8 error: {source}"))]
39+
Utf8 { source: std::string::FromUtf8Error },
40+
41+
#[snafu(display("Arrow error: {source}"))]
42+
Arrow { source: ArrowError },
43+
44+
// #[snafu(transparent)]
45+
// Metastore {
46+
// source: core_metastore::error::MetastoreError,
47+
// },
48+
#[snafu(transparent)]
49+
Execution { source: ExecutionError },
50+
}
51+
52+
pub type DbtResult<T> = std::result::Result<T, DbtError>;
53+
54+
impl IntoResponse for DbtError {
55+
fn into_response(self) -> axum::response::Response<axum::body::Body> {
56+
if let Self::Execution { source } = self {
57+
return convert_into_response(&source);
58+
}
59+
// if let Self::Metastore { source } = self {
60+
// return source.into_response();
61+
// }
62+
63+
let status_code = match &self {
64+
Self::GZipDecompress { .. }
65+
| Self::LoginRequestParse { .. }
66+
| Self::QueryBodyParse { .. }
67+
| Self::InvalidWarehouseIdFormat { .. } => http::StatusCode::BAD_REQUEST,
68+
Self::RowParse { .. }
69+
| Self::Utf8 { .. }
70+
| Self::Arrow { .. }
71+
// | Self::Metastore { .. }
72+
| Self::Execution { .. }
73+
| Self::NotImplemented { .. } => http::StatusCode::OK,
74+
Self::MissingAuthToken | Self::MissingDbtSession | Self::InvalidAuthData => {
75+
http::StatusCode::UNAUTHORIZED
76+
}
77+
};
78+
79+
let message = match &self {
80+
Self::GZipDecompress { source } => format!("failed to decompress GZip body: {source}"),
81+
Self::LoginRequestParse { source } => {
82+
format!("failed to parse login request: {source}")
83+
}
84+
Self::QueryBodyParse { source } => format!("failed to parse query body: {source}"),
85+
Self::InvalidWarehouseIdFormat { source } => format!("invalid warehouse_id: {source}"),
86+
Self::RowParse { source } => format!("failed to parse row JSON: {source}"),
87+
Self::MissingAuthToken | Self::MissingDbtSession | Self::InvalidAuthData => {
88+
"session error".to_string()
89+
}
90+
Self::Utf8 { source } => {
91+
format!("Error encoding UTF8 string: {source}")
92+
}
93+
Self::Arrow { source } => {
94+
format!("Error encoding in Arrow format: {source}")
95+
}
96+
Self::NotImplemented => "feature not implemented".to_string(),
97+
// Self::Metastore { source } => source.to_string(),
98+
Self::Execution { source } => source.to_string(),
99+
};
100+
101+
let body = Json(JsonResponse {
102+
success: false,
103+
message: Some(message),
104+
// TODO: On error data field contains details about actual error
105+
// {'data': {'internalError': False, 'unredactedFromSecureObject': False, 'errorCode': '002003', 'age': 0, 'sqlState': '02000', 'queryId': '01bb407f-0002-97af-0004-d66e006a69fa', 'line': 1, 'pos': 14, 'type': 'COMPILATION'}}
106+
data: None,
107+
code: Some(status_code.as_u16().to_string()),
108+
});
109+
(status_code, body).into_response()
110+
}
111+
}
112+
113+
fn convert_into_response(error: &ExecutionError) -> axum::response::Response {
114+
let status_code = match error {
115+
ExecutionError::RegisterUDF { .. }
116+
| ExecutionError::RegisterUDAF { .. }
117+
| ExecutionError::InvalidTableIdentifier { .. }
118+
| ExecutionError::InvalidSchemaIdentifier { .. }
119+
| ExecutionError::InvalidFilePath { .. }
120+
| ExecutionError::InvalidBucketIdentifier { .. }
121+
| ExecutionError::TableProviderNotFound { .. }
122+
| ExecutionError::MissingDataFusionSession { .. }
123+
| ExecutionError::Utf8 { .. }
124+
| ExecutionError::VolumeNotFound { .. }
125+
| ExecutionError::ObjectStore { .. }
126+
| ExecutionError::ObjectAlreadyExists { .. }
127+
| ExecutionError::UnsupportedFileFormat { .. }
128+
| ExecutionError::RefreshCatalogList { .. }
129+
| ExecutionError::UrlParse { .. }
130+
| ExecutionError::JobError { .. }
131+
| ExecutionError::UploadFailed { .. } => http::StatusCode::BAD_REQUEST,
132+
ExecutionError::Arrow { .. }
133+
| ExecutionError::S3Tables { .. }
134+
| ExecutionError::Iceberg { .. }
135+
| ExecutionError::CatalogListDowncast { .. }
136+
| ExecutionError::CatalogDownCast { .. }
137+
| ExecutionError::RegisterCatalog { .. } => http::StatusCode::INTERNAL_SERVER_ERROR,
138+
ExecutionError::DatabaseNotFound { .. }
139+
| ExecutionError::TableNotFound { .. }
140+
| ExecutionError::SchemaNotFound { .. }
141+
| ExecutionError::CatalogNotFound { .. }
142+
| ExecutionError::Metastore { .. }
143+
| ExecutionError::DataFusion { .. }
144+
| ExecutionError::DataFusionQuery { .. } => http::StatusCode::OK,
145+
};
146+
147+
let message = match error {
148+
ExecutionError::DataFusion { source } => format!("DataFusion error: {source}"),
149+
ExecutionError::DataFusionQuery { source, query } => {
150+
format!("DataFusion error: {source}, query: {query}")
151+
}
152+
ExecutionError::InvalidTableIdentifier { ident } => {
153+
format!("Invalid table identifier: {ident}")
154+
}
155+
ExecutionError::InvalidSchemaIdentifier { ident } => {
156+
format!("Invalid schema identifier: {ident}")
157+
}
158+
ExecutionError::InvalidFilePath { path } => format!("Invalid file path: {path}"),
159+
ExecutionError::InvalidBucketIdentifier { ident } => {
160+
format!("Invalid bucket identifier: {ident}")
161+
}
162+
ExecutionError::Arrow { source } => format!("Arrow error: {source}"),
163+
ExecutionError::TableProviderNotFound { table_name } => {
164+
format!("No Table Provider found for table: {table_name}")
165+
}
166+
ExecutionError::MissingDataFusionSession { id } => {
167+
format!("Missing DataFusion session for id: {id}")
168+
}
169+
ExecutionError::Utf8 { source } => format!("Error encoding UTF8 string: {source}"),
170+
ExecutionError::Metastore { source } => format!("Metastore error: {source}"),
171+
ExecutionError::DatabaseNotFound { db } => format!("Database not found: {db}"),
172+
ExecutionError::TableNotFound { table } => format!("Table not found: {table}"),
173+
ExecutionError::SchemaNotFound { schema } => format!("Schema not found: {schema}"),
174+
ExecutionError::VolumeNotFound { volume } => format!("Volume not found: {volume}"),
175+
ExecutionError::ObjectStore { source } => format!("Object store error: {source}"),
176+
ExecutionError::ObjectAlreadyExists { type_name, name } => {
177+
format!("Object of type {type_name} with name {name} already exists")
178+
}
179+
ExecutionError::UnsupportedFileFormat { format } => {
180+
format!("Unsupported file format {format}")
181+
}
182+
ExecutionError::RefreshCatalogList { source } => {
183+
format!("Refresh catalog list error: {source}")
184+
}
185+
_ => "Internal server error".to_string(),
186+
};
187+
188+
let body = Json(JsonResponse {
189+
success: false,
190+
message: Some(message),
191+
// TODO: On error data field contains details about actual error
192+
// {'data': {'internalError': False, 'unredactedFromSecureObject': False, 'errorCode': '002003', 'age': 0, 'sqlState': '02000', 'queryId': '01bb407f-0002-97af-0004-d66e006a69fa', 'line': 1, 'pos': 14, 'type': 'COMPILATION'}}
193+
data: None,
194+
code: Some(status_code.as_u16().to_string()),
195+
});
196+
(status_code, body).into_response()
197+
}

crates/runtime/src/http/dbt/handlers.rs renamed to crates/api-snowflake-rest/src/handlers.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,22 @@
1-
use super::error::{self as dbt_error, DbtError, DbtResult};
2-
use crate::execution::query::QueryContext;
3-
use crate::execution::utils::DataSerializationFormat;
4-
use crate::http::dbt::schemas::{
1+
use crate::error::{self as dbt_error, DbtError, DbtResult};
2+
use crate::schemas::{
53
JsonResponse, LoginData, LoginRequestBody, LoginRequestQuery, LoginResponse, QueryRequest,
64
QueryRequestBody, ResponseData,
75
};
8-
use crate::http::session::DFSessionId;
9-
use crate::http::state::AppState;
6+
use crate::state::AppState;
7+
use api_sessions::DFSessionId;
8+
use axum::Json;
109
use axum::body::Bytes;
1110
use axum::extract::{Query, State};
1211
use axum::http::HeaderMap;
13-
use axum::Json;
1412
use base64;
1513
use base64::engine::general_purpose::STANDARD as engine_base64;
1614
use base64::prelude::*;
17-
use datafusion::arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
15+
use core_executor::{query::QueryContext, utils::DataSerializationFormat};
1816
use datafusion::arrow::ipc::MetadataVersion;
19-
use datafusion::arrow::json::writer::JsonArray;
17+
use datafusion::arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
2018
use datafusion::arrow::json::WriterBuilder;
19+
use datafusion::arrow::json::writer::JsonArray;
2120
use datafusion::arrow::record_batch::RecordBatch;
2221
use flate2::read::GzDecoder;
2322
use regex::Regex;
Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,5 @@
1-
pub fn add(left: u64, right: u64) -> u64 {
2-
left + right
3-
}
4-
5-
#[cfg(test)]
6-
mod tests {
7-
use super::*;
8-
9-
#[test]
10-
fn it_works() {
11-
let result = add(2, 2);
12-
assert_eq!(result, 4);
13-
}
14-
}
1+
pub mod error;
2+
pub mod handlers;
3+
pub mod router;
4+
pub mod schemas;
5+
pub mod state;

crates/runtime/src/http/dbt/router.rs renamed to crates/api-snowflake-rest/src/router.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
use crate::http::state::AppState;
2-
use axum::routing::post;
1+
use crate::handlers::{abort, login, query};
2+
use crate::state::AppState;
33
use axum::Router;
4-
5-
use crate::http::dbt::handlers::{abort, login, query};
4+
use axum::routing::post;
65

76
pub fn create_router() -> Router<AppState> {
87
Router::new()

crates/runtime/src/http/dbt/schemas.rs renamed to crates/api-snowflake-rest/src/schemas.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::error::{self as dbt_error, DbtResult};
2-
use crate::execution::models::ColumnInfo as ColumnInfoModel;
2+
use core_executor::models::ColumnInfo as ColumnInfoModel;
33
use indexmap::IndexMap;
44
use serde::{Deserialize, Serialize};
55
use snafu::ResultExt;

0 commit comments

Comments
 (0)