Skip to content

Commit 8cf66a1

Browse files
authored
Implement MERGE INTO (#147)
* # This is a combination of 14 commits. # This is the 1st commit message: Upload CSV to table # This is the commit message #2: Upload CSV to table # This is the commit message #3: Fix linter # This is the commit message #4: Remove unused error # This is the commit message #5: Fix clippy # This is the commit message #6: Fix clippy # This is the commit message #7: Fix linter # This is the commit message #8: Fix clippy # This is the commit message #9: Fix clippy # This is the commit message #10: Fix clippy # This is the commit message #11: Fix fmt # This is the commit message #12: Add pre-commit # This is the commit message #13: Parse merge into # This is the commit message #14: Parse merge into * Merge * Add full message text into the error * Fix * Update MERGE to insert only not matched records * Update deps * Update MERGE to insert only not matched records
1 parent f783b56 commit 8cf66a1

File tree

9 files changed

+233
-68
lines changed

9 files changed

+233
-68
lines changed

.pre-commit-config.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
repos:
2+
- repo: https://github.com/doublify/pre-commit-rust
3+
rev: v1.0
4+
hooks:
5+
- id: cargo-check
6+
args: [ "--workspace" ]
7+
- id: fmt
8+
args: [ "--", "--check" ]
9+
- id: clippy
10+
args: [ "--", "-D", "warnings" ]

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ datafusion = { version = "43" }
3535
snafu = { version = "0.8.5", features = ["futures"] }
3636

3737
[patch.crates-io]
38-
datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" }
38+
datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" }
3939

4040

4141
[workspace.lints.clippy]

crates/control_plane/Cargo.toml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ flatbuffers = { version = "24.3.25" }
1818
#iceberg-rest-catalog = { git = "https://github.com/JanKaul/iceberg-rust.git", rev = "836f11f" }
1919
#datafusion_iceberg = { git = "https://github.com/JanKaul/iceberg-rust.git", rev = "836f11f" }
2020

21-
datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" }
22-
datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" }
23-
datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" }
21+
datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" }
22+
datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" }
23+
datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" }
2424

25-
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" }
26-
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" }
27-
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" }
25+
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
26+
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
27+
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
2828

2929
arrow = { version = "53" }
3030
arrow-json = { version = "53" }

crates/nexus/src/http/dbt/error.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,24 @@ impl IntoResponse for DbtError {
5757
Self::NotImplemented => http::StatusCode::NOT_IMPLEMENTED,
5858
};
5959

60+
let message = match &self {
61+
Self::GZipDecompress { source } => format!("failed to decompress GZip body: {source}"),
62+
Self::LoginRequestParse { source } => {
63+
format!("failed to parse login request: {source}")
64+
}
65+
Self::QueryBodyParse { source } => format!("failed to parse query body: {source}"),
66+
Self::InvalidWarehouseIdFormat { source } => format!("invalid warehouse_id: {source}"),
67+
Self::ControlService { source } => source.to_string(),
68+
Self::RowParse { source } => format!("failed to parse row JSON: {source}"),
69+
Self::MissingAuthToken | Self::MissingDbtSession | Self::InvalidAuthData => {
70+
"session error".to_string()
71+
}
72+
Self::NotImplemented => "feature not implemented".to_string(),
73+
};
74+
6075
let body = Json(JsonResponse {
6176
success: false,
62-
message: Some(self.to_string()),
77+
message: Some(message),
6378
data: None,
6479
code: Some(status_code.as_u16().to_string()),
6580
});

crates/nexus/src/http/dbt/handlers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub async fn login(
3232

3333
//println!("Received login request: {:?}", query);
3434
//println!("Body data parameters: {:?}", body_json);
35-
let token = uuid::Uuid::new_v4().to_string();
35+
let token = Uuid::new_v4().to_string();
3636

3737
let warehouses = state
3838
.control_svc

crates/nexus/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ async fn buffer_and_print<B>(
199199
body: B,
200200
) -> Result<Bytes, (StatusCode, String)>
201201
where
202-
B: axum::body::HttpBody<Data = Bytes>,
202+
B: axum::body::HttpBody<Data = Bytes> + Send,
203203
B::Error: std::fmt::Display,
204204
{
205205
let bytes = match body.collect().await {

crates/runtime/Cargo.toml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ serde = { workspace = true }
1414
serde_json = { workspace = true }
1515
object_store = { workspace = true }
1616

17-
datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" }
18-
datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" }
19-
datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" }
17+
datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" }
18+
datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" }
19+
datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" }
2020

21-
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" }
22-
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" }
23-
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" }
21+
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
22+
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
23+
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
2424

2525
arrow = { version = "53" }
2626
arrow-json = { version = "53" }

0 commit comments

Comments
 (0)