34 commits
94ebd97
added sqlite conn to metastore, metastore new changed from sync to async
YaroslavLitvinov Oct 23, 2025
ebc781e
Merge remote-tracking branch 'origin/main' into yaro/sqlite-metastore
YaroslavLitvinov Oct 23, 2025
f551732
add patchdog workflow
YaroslavLitvinov Oct 23, 2025
0f10c4f
add workflow dispatch
YaroslavLitvinov Oct 23, 2025
5914c47
stage before merging main
YaroslavLitvinov Oct 24, 2025
0851f8b
Merge remote-tracking branch 'origin/main' into yaro/sqlite-metastore
YaroslavLitvinov Oct 24, 2025
9aa26a3
metastore tests separated from implementation; remove utoipa::ToSchema…
YaroslavLitvinov Oct 25, 2025
ed61103
metastore api simplify
YaroslavLitvinov Oct 25, 2025
76c45c8
added ID to RwObject
YaroslavLitvinov Oct 26, 2025
dd7aed6
broken, staged
YaroslavLitvinov Oct 27, 2025
5d3f7f4
staged wip
YaroslavLitvinov Oct 28, 2025
5305557
merged
YaroslavLitvinov Oct 29, 2025
b588851
stage volumes basic ops ok
YaroslavLitvinov Oct 29, 2025
51e493c
move from iter_volumes to get_volumes
YaroslavLitvinov Oct 29, 2025
82f8e9a
volume tests fixes
YaroslavLitvinov Oct 30, 2025
47b0224
wip
YaroslavLitvinov Nov 1, 2025
145ea36
make it compilable after databases support
YaroslavLitvinov Nov 2, 2025
5f2f634
run test server in separate thread
YaroslavLitvinov Nov 3, 2025
c793523
staged
YaroslavLitvinov Nov 3, 2025
7776c57
Merge remote-tracking branch 'origin/main' into yaro/sqlite-metastore
YaroslavLitvinov Nov 3, 2025
82fe602
wip
YaroslavLitvinov Nov 4, 2025
c1433fa
before changes
YaroslavLitvinov Nov 4, 2025
5877ce7
wip
YaroslavLitvinov Nov 5, 2025
797b957
draft
YaroslavLitvinov Nov 5, 2025
2e52499
some fixes, tests failing
YaroslavLitvinov Nov 6, 2025
c8dce28
fix known metastore bugs
YaroslavLitvinov Nov 6, 2025
f839468
lint
YaroslavLitvinov Nov 6, 2025
e337143
fmt
YaroslavLitvinov Nov 6, 2025
dbd70da
merged with main
YaroslavLitvinov Nov 6, 2025
1bd415d
Merge remote-tracking branch 'origin' into yaro/sqlite-metastore
YaroslavLitvinov Nov 6, 2025
ea97b3e
couple of snapshots
YaroslavLitvinov Nov 6, 2025
27774f7
staged work, use ids for compile time checks
YaroslavLitvinov Nov 7, 2025
e328080
fmt
YaroslavLitvinov Nov 7, 2025
b663667
use inmemory table metastore
YaroslavLitvinov Nov 7, 2025
1,801 changes: 195 additions & 1,606 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 4 additions & 6 deletions Cargo.toml
@@ -2,7 +2,6 @@
default-members = ["crates/embucketd"]
members = [
"crates/embucketd",
# "crates/embucket-seed",
"crates/api-iceberg-rest",
"crates/api-snowflake-rest",
"crates/api-ui",
@@ -14,9 +13,7 @@ members = [
"crates/core-executor",
"crates/core-history",
"crates/core-metastore",
"crates/core-utils",
"crates/api-sessions",
"crates/benchmarks",
"crates/core-sqlite"
]
resolver = "2"
@@ -34,7 +31,7 @@ lto = "off"
debug = false

[workspace.dependencies]
core-sqlite = { path = "crates/core-sqlite" } # features = ["vfs"]
core-sqlite = { path = "crates/core-sqlite" }
async-trait = { version = "0.1.84" }
aws-config = { version = "1.5.17" }
aws-credential-types = { version = "1.2.1", features = ["hardcoded-credentials"]}
@@ -69,7 +66,6 @@ regex = "1.11.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.9"
slatedb = { version = "0.8.2", features = ["moka"] }
snafu = { version = "0.8.5", features = ["futures"] }
tikv-jemallocator = { version = "0.6.0" }
strum = { version = "0.27.2", features = ["derive"] }
@@ -96,14 +92,16 @@ url = "2.5"
utoipa = { version = "5.3.1", features = ["uuid", "chrono"] }
utoipa-axum = { version = "0.2.0" }
utoipa-swagger-ui = { version = "9", features = ["axum"] }
uuid = { version = "1.10.0", features = ["v4", "serde"] }
uuid = { version = "1.18.1", features = ["v4", "v7", "serde"] }
validator = { version = "0.20.0", features = ["derive"] }
mockall = "0.13.1"
reqwest = "0.12.14"
insta = { version = "1.42.0", features = ["json", "filters", "redactions"] }
cfg-if = { version = "1.0.3" }
rusqlite = { version = "0.37.0", features = ["blob", "trace", "bundled"] }
deadpool-sqlite = { version = "0.12.1", features = ["tracing"] }
deadpool = { version = "0.12.3" }
deadpool-diesel = { version = "0.6.1", features = ["sqlite", "tracing"] }

[patch.crates-io]
datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "832c278922863064571c0a7c5716a3ff87ce5201" }
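Context on the uuid bump: 1.18.1 enables the v7 feature alongside v4. A minimal sketch of the difference, on the assumption (not confirmed by this diff) that the new time-ordered IDs from the "added ID to RwObject" commit are UUIDv7:

use uuid::Uuid;

fn main() {
    // v4 is fully random; v7 embeds a millisecond timestamp in the high bits,
    // so byte/lexicographic order follows creation order - handy for DB keys.
    let random_id = Uuid::new_v4();
    let ordered_id = Uuid::now_v7();
    println!("v4: {random_id}");
    println!("v7: {ordered_id}");
}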
1 change: 0 additions & 1 deletion crates/api-iceberg-rest/Cargo.toml
@@ -6,7 +6,6 @@ license-file.workspace = true

[dependencies]
core-metastore = { path = "../core-metastore" }
core-utils = { path = "../core-utils" }
error-stack-trace = { path = "../error-stack-trace" }
error-stack = { path = "../error-stack" }

15 changes: 5 additions & 10 deletions crates/api-iceberg-rest/src/error.rs
@@ -56,6 +56,7 @@ impl IntoResponse for Error {
fields(status_code),
skip(self)
)]
#[allow(clippy::match_same_arms)]
fn into_response(self) -> axum::response::Response {
tracing::error!(error_message = %self.output_msg(), "Iceberg API error");
let metastore_error = match self {
@@ -87,16 +88,10 @@ impl IntoResponse for Error {
| core_metastore::Error::TableNotFound { .. }
| core_metastore::Error::ObjectNotFound { .. } => http::StatusCode::NOT_FOUND,
core_metastore::Error::ObjectStore { .. }
| core_metastore::Error::ObjectStorePath { .. }
| core_metastore::Error::CreateDirectory { .. }
| core_metastore::Error::SlateDB { .. }
| core_metastore::Error::UtilSlateDB { .. }
| core_metastore::Error::Iceberg { .. }
| core_metastore::Error::IcebergSpec { .. }
| core_metastore::Error::Serde { .. }
| core_metastore::Error::TableMetadataBuilder { .. }
| core_metastore::Error::TableObjectStoreNotFound { .. }
| core_metastore::Error::UrlParse { .. } => http::StatusCode::INTERNAL_SERVER_ERROR,
| core_metastore::Error::ObjectStorePath { .. } => {
http::StatusCode::INTERNAL_SERVER_ERROR
}
_ => http::StatusCode::INTERNAL_SERVER_ERROR,
};

// Record the result as part of the current span.
23 changes: 9 additions & 14 deletions crates/api-iceberg-rest/src/handlers.rs
@@ -7,8 +7,9 @@ use crate::state::State as AppState;
use axum::http::StatusCode;
use axum::{Json, extract::Path, extract::Query, extract::State};
use core_metastore::error::{self as metastore_error};
use core_metastore::{SchemaIdent as MetastoreSchemaIdent, TableIdent as MetastoreTableIdent};
use core_utils::scan_iterator::ScanIterator;
use core_metastore::{
ListParams, SchemaIdent as MetastoreSchemaIdent, TableIdent as MetastoreTableIdent,
};
use iceberg_rest_catalog::models::{
CatalogConfig, CommitTableResponse, CreateNamespaceRequest, CreateNamespaceResponse,
CreateTableRequest, GetNamespaceResponse, ListNamespacesResponse, ListTablesResponse,
@@ -94,10 +95,8 @@ pub async fn list_namespaces(
) -> Result<Json<ListNamespacesResponse>> {
let schemas = state
.metastore
.iter_schemas(&database_name)
.collect()
.get_schemas(ListParams::default().by_parent_name(database_name.clone()))
.await
.context(metastore_error::UtilSlateDBSnafu)
.context(api_iceberg_rest_error::MetastoreSnafu {
operation: Operation::ListNamespaces,
})?;
@@ -263,20 +262,16 @@ pub async fn delete_table(
}

#[tracing::instrument(level = "debug", skip(state), err, ret(level = tracing::Level::TRACE))]
pub async fn list_tables(
pub async fn get_tables(
State(state): State<AppState>,
Path((database_name, schema_name)): Path<(String, String)>,
) -> Result<Json<ListTablesResponse>> {
let schema_ident = MetastoreSchemaIdent::new(database_name, schema_name);
let tables = state
.metastore
.iter_tables(&schema_ident)
.collect()
.await
.context(metastore_error::UtilSlateDBSnafu)
.context(api_iceberg_rest_error::MetastoreSnafu {
let tables = state.metastore.get_tables(&schema_ident).await.context(
api_iceberg_rest_error::MetastoreSnafu {
operation: Operation::ListTables,
})?;
},
)?;
Ok(Json(from_tables_list(tables)))
}

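For reviewers unfamiliar with the new core-metastore surface: the old iter_schemas(&db).collect().await pattern becomes a single get_schemas(ListParams) call. A rough sketch of the builder, reconstructed only from the call site above — the real ListParams lives in core-metastore and likely carries more filters:

// Hypothetical reconstruction; names beyond by_parent_name() are guesses.
#[derive(Debug, Default, Clone)]
pub struct ListParams {
    pub parent_name: Option<String>,
}

impl ListParams {
    #[must_use]
    pub fn by_parent_name(mut self, parent_name: String) -> Self {
        self.parent_name = Some(parent_name);
        self
    }
}

// Call-site shape, matching list_namespaces above:
// let schemas = metastore
//     .get_schemas(ListParams::default().by_parent_name(database_name.clone()))
//     .await?;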
4 changes: 2 additions & 2 deletions crates/api-iceberg-rest/src/router.rs
@@ -4,14 +4,14 @@ use axum::routing::{delete, get, post};

use crate::handlers::{
commit_table, create_namespace, create_table, delete_namespace, delete_table, get_config,
get_namespace, get_table, list_namespaces, list_tables, list_views, register_table,
get_namespace, get_table, get_tables, list_namespaces, list_views, register_table,
report_metrics,
};

pub fn create_router() -> Router<State> {
let table_router: Router<State> = Router::new()
.route("/", post(create_table))
.route("/", get(list_tables))
.route("/", get(get_tables))
.route("/{table}", get(get_table))
.route("/{table}", delete(delete_table))
.route("/{table}", post(commit_table))
2 changes: 1 addition & 1 deletion crates/api-iceberg-rest/src/state.rs
@@ -1,4 +1,4 @@
use core_metastore::metastore::Metastore;
use core_metastore::Metastore;
use std::sync::Arc;

use serde::{Deserialize, Serialize};
7 changes: 3 additions & 4 deletions crates/api-snowflake-rest/Cargo.toml
@@ -12,12 +12,10 @@ default-server = [
"dep:core-executor",
"dep:core-metastore",
"dep:core-history",
"dep:core-utils",
"dep:tower-sessions",
"dep:tower-http",
"dep:axum",
"dep:snafu",
"dep:tracing",
"dep:flate2",
"dep:indexmap",
"dep:datafusion",
@@ -30,18 +28,17 @@ api-sessions = { path = "../api-sessions", optional = true }
core-executor = { path = "../core-executor", optional = true }
core-metastore = { path = "../core-metastore", optional = true }
core-history = { path = "../core-history", optional = true }
core-utils = { path = "../core-utils", optional = true }

error-stack-trace = { path = "../error-stack-trace" }
error-stack = { path = "../error-stack" }

tracing-subscriber = { version = "0.3.20", features = ["env-filter", "registry", "fmt", "json"] }
tracing = { workspace = true }

tower-sessions = { workspace = true, optional = true }
tower-http = { workspace = true, optional = true }
axum = { workspace = true, optional = true }
snafu = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
flate2 = { version = "1", optional = true}
indexmap = { workspace = true, optional = true }
base64 = { version = "0.22" }
@@ -55,6 +52,8 @@ time = { workspace = true }
uuid = { workspace = true }
tokio = { workspace = true }
cfg-if = { workspace = true }
axum-server = "0.7.2"
futures = "0.3.31"

[dev-dependencies]
insta = { workspace = true }
5 changes: 3 additions & 2 deletions crates/api-snowflake-rest/src/server/handlers.rs
@@ -144,8 +144,9 @@ pub async fn abort(
request_id,
}): Json<AbortRequestBody>,
) -> Result<Json<serde_json::value::Value>> {
state
let _query_status = state
.execution_svc
.abort_query(RunningQueryId::ByRequestId(request_id, sql_text))?;
.abort_query(RunningQueryId::ByRequestId(request_id, sql_text))
.await?;
Ok(Json(serde_json::value::Value::Null))
}
8 changes: 4 additions & 4 deletions crates/api-snowflake-rest/src/server/router.rs
@@ -9,8 +9,8 @@ use super::state;
use axum::middleware;
use core_executor::service::CoreExecutionService;
use core_executor::utils::Config as UtilsConfig;
use core_history::SlateDBHistoryStore;
use core_metastore::SlateDBMetastore;
use core_history::HistoryStoreDb;
use core_metastore::MetastoreDb;
use std::sync::Arc;
use tower::ServiceBuilder;
use tower_http::compression::CompressionLayer;
@@ -30,8 +30,8 @@ pub fn create_router() -> Router<AppState> {
// TODO: We should consider using this by both main and tests
#[allow(clippy::needless_pass_by_value, clippy::expect_used)]
pub async fn make_app(
metastore: SlateDBMetastore,
history_store: SlateDBHistoryStore,
metastore: MetastoreDb,
history_store: HistoryStoreDb,
snowflake_rest_cfg: Config,
execution_cfg: UtilsConfig,
) -> Result<Router, Box<dyn std::error::Error>> {
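Putting the renamed types together, bootstrapping the app now looks roughly like this — a sketch assuming the in-memory constructors used by test_server.rs below; production presumably points at a real SQLite file instead:

// Illustrative wiring only; error handling elided.
let metastore = MetastoreDb::new_in_memory().await;
let history_store = HistoryStoreDb::new_in_memory().await;
let router = make_app(
    metastore,
    history_store,
    snowflake_rest_cfg,     // Config
    UtilsConfig::default(), // execution config
)
.await?;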
104 changes: 88 additions & 16 deletions crates/api-snowflake-rest/src/server/test_server.rs
@@ -1,26 +1,89 @@
use super::server_models::Config;
use crate::server::router::make_app;
use crate::server::server_models::Config as AppCfg;
use core_executor::utils::Config as UtilsConfig;
use core_history::SlateDBHistoryStore;
use core_metastore::SlateDBMetastore;
use core_history::HistoryStoreDb;
use core_metastore::MetastoreDb;
use std::net::SocketAddr;
use std::net::TcpListener;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
use tokio::runtime::Builder;
use tracing_subscriber::fmt::format::FmtSpan;

#[allow(clippy::expect_used)]
pub async fn run_test_rest_api_server(data_format: &str) -> SocketAddr {
let app_cfg = Config::new(data_format)
.expect("Failed to create server config")
.with_demo_credentials("embucket".to_string(), "embucket".to_string());
#[must_use]
pub fn server_default_cfg(data_format: &str) -> Option<(AppCfg, UtilsConfig)> {
Some((
Config::new(data_format)
.expect("Failed to create server config")
.with_demo_credentials("embucket".to_string(), "embucket".to_string()),
UtilsConfig::default().with_max_concurrency_level(2),
))
}

#[allow(clippy::expect_used)]
pub fn run_test_rest_api_server(server_cfg: Option<(AppCfg, UtilsConfig)>) -> SocketAddr {
let (app_cfg, executor_cfg) = server_cfg.unwrap_or_else(|| {
server_default_cfg("json").expect("Failed to create default server config")
});

let server_cond = Arc::new((Mutex::new(false), Condvar::new())); // Shared flag + condvar to signal server startup
let server_cond_clone = Arc::clone(&server_cond);

let listener = TcpListener::bind("0.0.0.0:0").expect("Failed to bind to address");
let addr = listener.local_addr().expect("Failed to get local address");

// Start a new thread for the server
let _handle = std::thread::spawn(move || {
// Create the Tokio runtime
let rt = Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime");

// Start the Axum server
rt.block_on(async {
let () = run_test_rest_api_server_with_config(
app_cfg,
executor_cfg,
listener,
server_cond_clone,
)
.await;
});
});
// Note: the thread handle is intentionally not joined; we don't need
// graceful thread termination once the tests have passed.

run_test_rest_api_server_with_config(app_cfg, UtilsConfig::default()).await
let (lock, cvar) = &*server_cond;
let timeout_duration = std::time::Duration::from_secs(1);

// Lock the mutex and wait for notification with timeout
let notified = lock.lock().expect("Failed to lock mutex");
let result = cvar
.wait_timeout(notified, timeout_duration)
.expect("Failed to wait for server start");

// Check if notified or timed out
if *result.0 {
tracing::info!("Test server is up and running.");
thread::sleep(Duration::from_millis(10));
} else {
tracing::error!("Timeout occurred while waiting for server start.");
}

addr
}

#[allow(clippy::unwrap_used, clippy::expect_used)]
pub async fn run_test_rest_api_server_with_config(
app_cfg: Config,
execution_cfg: UtilsConfig,
) -> SocketAddr {
let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
listener: std::net::TcpListener,
server_cond: Arc<(Mutex<bool>, Condvar)>,
) {
let addr = listener.local_addr().unwrap();

let traces_writer = std::fs::OpenOptions::new()
@@ -39,24 +102,33 @@ pub async fn run_test_rest_api_server_with_config(
.with_line_number(true)
.with_span_events(FmtSpan::NONE)
.with_level(true)
.with_max_level(tracing_subscriber::filter::LevelFilter::DEBUG)
.with_max_level(tracing_subscriber::filter::LevelFilter::TRACE)
.finish();

// Ignore the error: with parallel test execution only the first thread can
// set the global subscriber successfully, since all tests run in one process.
let _ = tracing::subscriber::set_global_default(subscriber);

let metastore = SlateDBMetastore::new_in_memory().await;
let history = SlateDBHistoryStore::new_in_memory().await;
tracing::info!("Starting server at {addr}");

let metastore = MetastoreDb::new_in_memory().await;
let history = HistoryStoreDb::new_in_memory().await;

let app = make_app(metastore, history, app_cfg, execution_cfg)
.await
.unwrap()
.into_make_service_with_connect_info::<SocketAddr>();

tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
// Lock the mutex and set the notification flag
{
let (lock, cvar) = &*server_cond;
let mut notify_server_started = lock.lock().unwrap();
*notify_server_started = true; // Set notification
cvar.notify_one(); // Notify the waiting thread
}

addr
tracing::info!("Server ready at {addr}");

// Serve the application
axum_server::from_tcp(listener).serve(app).await.unwrap();
}
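A sketch of how a test would consume the new synchronous bootstrap; the /health path and the reqwest call are illustrative, not part of this PR:

#[tokio::test]
async fn server_starts_and_accepts_connections() {
    // None falls back to server_default_cfg("json"); the call returns only
    // after the Condvar signals readiness (or the 1 s timeout elapses).
    let addr = run_test_rest_api_server(None);

    // Any route from the router works here; "/health" is a placeholder.
    let resp = reqwest::get(format!("http://{addr}/health"))
        .await
        .expect("server should accept connections");
    assert!(resp.status().is_client_error() || resp.status().is_success());
}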
1 change: 1 addition & 0 deletions crates/api-snowflake-rest/src/tests/client.rs
@@ -31,6 +31,7 @@ pub async fn http_req_with_headers<T: serde::de::DeserializeOwned>(
url: &String,
payload: String,
) -> Result<(HeaderMap, T), TestHttpError> {
tracing::trace!("Request: {method} {url}");
let res = client
.request(method.clone(), url)
.headers(headers)