Skip to content

Commit 6e0617d

Browse files
committed
feat: add update namespace
Signed-off-by: callum-ryan <[email protected]>
1 parent f2f3311 commit 6e0617d

File tree

1 file changed

+168
-80
lines changed

1 file changed

+168
-80
lines changed

crates/catalog/sql/src/catalog.rs

Lines changed: 168 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -118,51 +118,33 @@ impl SqlCatalog {
118118
};
119119

120120
sqlx::query(
121-
&("create table if not exists ".to_string()
122-
+ CATALOG_TABLE_VIEW_NAME
123-
+ " ("
124-
+ CATALOG_NAME
125-
+ " varchar(255) not null,"
126-
+ TABLE_NAMESPACE
127-
+ " varchar(255) not null,"
128-
+ TABLE_NAME
129-
+ " varchar(255) not null,"
130-
+ METADATA_LOCATION_PROP
131-
+ " varchar(255),"
132-
+ PREVIOUS_METADATA_LOCATION_PROP
133-
+ " varchar(255),"
134-
+ RECORD_TYPE
135-
+ " varchar(5), primary key ("
136-
+ CATALOG_NAME
137-
+ ", "
138-
+ TABLE_NAMESPACE
139-
+ ", "
140-
+ TABLE_NAME
141-
+ ")
142-
);"),
121+
&format!("create table if not exists {} ({} varchar(255) not null, {} varchar(255) not null, {} varchar(255) not null, {} varchar(255), {} varchar(255), {} varchar(5), primary key ({}, {}, {}))",
122+
CATALOG_TABLE_VIEW_NAME,
123+
CATALOG_NAME,
124+
TABLE_NAMESPACE,
125+
TABLE_NAME,
126+
METADATA_LOCATION_PROP,
127+
PREVIOUS_METADATA_LOCATION_PROP,
128+
RECORD_TYPE,
129+
CATALOG_NAME,
130+
TABLE_NAMESPACE,
131+
TABLE_NAME),
143132
)
144133
.execute(&pool)
145134
.await
146135
.map_err(from_sqlx_error)?;
147136

148137
sqlx::query(
149-
&("create table if not exists ".to_owned()
150-
+ NAMESPACE_PROPERTIES_TABLE_NAME
151-
+ " ( "
152-
+ CATALOG_NAME
153-
+ " varchar(255) not null, "
154-
+ NAMESPACE_NAME
155-
+ " varchar(255) not null, "
156-
+ NAMESPACE_PROPERTY_KEY
157-
+ " varchar(255), "
158-
+ NAMESPACE_PROPERTY_VALUE
159-
+ " varchar(255), primary key ("
160-
+ CATALOG_NAME
161-
+ ", "
162-
+ NAMESPACE_NAME
163-
+ ", "
164-
+ NAMESPACE_PROPERTY_KEY
165-
+ ") );"),
138+
&format!("create table if not exists {} ({} varchar(255) not null, {} varchar(255) not null, {} varchar(255), {} varchar(255), primary key ({}, {}, {}))",
139+
NAMESPACE_PROPERTIES_TABLE_NAME,
140+
CATALOG_NAME,
141+
NAMESPACE_NAME,
142+
NAMESPACE_PROPERTY_KEY,
143+
NAMESPACE_PROPERTY_VALUE,
144+
CATALOG_NAME,
145+
NAMESPACE_NAME,
146+
NAMESPACE_PROPERTY_KEY)
147+
166148
)
167149
.execute(&pool)
168150
.await
@@ -217,20 +199,23 @@ struct TableRef {
217199

218200
fn query_map(row: &AnyRow) -> std::result::Result<TableRef, sqlx::Error> {
219201
Ok(TableRef {
220-
table_namespace: row.try_get(0)?,
221-
table_name: row.try_get(1)?,
222-
metadata_location: row.try_get(2)?,
223-
_previous_metadata_location: row.try_get::<String, _>(3).map(Some).or_else(|err| {
224-
if let sqlx::Error::ColumnDecode {
225-
index: _,
226-
source: _,
227-
} = err
228-
{
229-
Ok(None)
230-
} else {
231-
Err(err)
232-
}
233-
})?,
202+
table_namespace: row.try_get(TABLE_NAMESPACE)?,
203+
table_name: row.try_get(TABLE_NAME)?,
204+
metadata_location: row.try_get(METADATA_LOCATION_PROP)?,
205+
_previous_metadata_location: row
206+
.try_get::<String, _>(PREVIOUS_METADATA_LOCATION_PROP)
207+
.map(Some)
208+
.or_else(|err| {
209+
if let sqlx::Error::ColumnDecode {
210+
index: _,
211+
source: _,
212+
} = err
213+
{
214+
Ok(None)
215+
} else {
216+
Err(err)
217+
}
218+
})?,
234219
})
235220
}
236221

@@ -242,8 +227,8 @@ struct NamespacePropRef {
242227

243228
fn query_map_namespace(row: &AnyRow) -> std::result::Result<NamespacePropRef, sqlx::Error> {
244229
Ok(NamespacePropRef {
245-
namespace_prop_key: row.try_get(1)?,
246-
namespace_prop_value: row.try_get(2)?,
230+
namespace_prop_key: row.try_get(NAMESPACE_PROPERTY_KEY)?,
231+
namespace_prop_value: row.try_get(NAMESPACE_PROPERTY_VALUE)?,
247232
})
248233
}
249234

@@ -304,27 +289,30 @@ impl Catalog for SqlCatalog {
304289
NAMESPACE_PROPERTY_VALUE
305290
);
306291

307-
self.execute_statement(
308-
&query_string,
309-
vec![
310-
Some(&catalog_name),
311-
Some(&namespace),
312-
None::<&String>,
313-
None::<&String>,
314-
],
315-
)
316-
.await?;
317-
for (key, value) in properties.iter() {
292+
if properties.is_empty() {
318293
self.execute_statement(
319294
&query_string,
320295
vec![
321296
Some(&catalog_name),
322297
Some(&namespace),
323-
Some(&key),
324-
Some(&value),
298+
None::<&String>,
299+
None::<&String>,
325300
],
326301
)
327302
.await?;
303+
} else {
304+
for (key, value) in properties.iter() {
305+
self.execute_statement(
306+
&query_string,
307+
vec![
308+
Some(&catalog_name),
309+
Some(&namespace),
310+
Some(&key),
311+
Some(&value),
312+
],
313+
)
314+
.await?;
315+
}
328316
}
329317
}
330318

@@ -381,10 +369,65 @@ impl Catalog for SqlCatalog {
381369

382370
async fn update_namespace(
383371
&self,
384-
_namespace: &NamespaceIdent,
385-
_properties: HashMap<String, String>,
372+
namespace: &NamespaceIdent,
373+
properties: HashMap<String, String>,
386374
) -> Result<()> {
387-
todo!()
375+
let catalog_name = self.name.clone();
376+
let namespace_name = namespace.join(".");
377+
let exists = self.namespace_exists(namespace).await?;
378+
if !exists {
379+
Err(Error::new(
380+
ErrorKind::Unexpected,
381+
"cannot update namespace that does not exist",
382+
))
383+
} else {
384+
let current_nsp = self.get_namespace(namespace).await?;
385+
let current_props = current_nsp.properties();
386+
387+
for (key, value) in properties {
388+
if current_props.contains_key(&key) {
389+
self.execute_statement(
390+
&format!(
391+
"update {} set {} = ?, {} = ? where {} = ? and {} = ? and {} = ?",
392+
NAMESPACE_PROPERTIES_TABLE_NAME,
393+
NAMESPACE_PROPERTY_KEY,
394+
NAMESPACE_PROPERTY_VALUE,
395+
NAMESPACE_PROPERTY_KEY,
396+
CATALOG_NAME,
397+
NAMESPACE_NAME
398+
),
399+
vec![
400+
Some(&key),
401+
Some(&value),
402+
Some(&key),
403+
Some(&catalog_name),
404+
Some(&namespace_name),
405+
],
406+
)
407+
.await?;
408+
} else {
409+
self.execute_statement(
410+
&format!(
411+
"insert into {} ({}, {}, {}, {}) values (?, ?, ?, ?)",
412+
NAMESPACE_PROPERTIES_TABLE_NAME,
413+
CATALOG_NAME,
414+
NAMESPACE_NAME,
415+
NAMESPACE_PROPERTY_KEY,
416+
NAMESPACE_PROPERTY_VALUE
417+
),
418+
vec![
419+
Some(&catalog_name),
420+
Some(&namespace_name),
421+
Some(&key),
422+
Some(&value),
423+
],
424+
)
425+
.await?;
426+
}
427+
}
428+
429+
Ok(())
430+
}
388431
}
389432

390433
async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
@@ -394,7 +437,10 @@ impl Catalog for SqlCatalog {
394437
if tbls.len() > 0 {
395438
Err(Error::new(
396439
ErrorKind::Unexpected,
397-
format!("unable to drop namespace it contains {} tables", tbls.len()),
440+
format!(
441+
"unable to drop namespace as it contains {} tables",
442+
tbls.len()
443+
),
398444
))
399445
} else {
400446
self.execute_statement(
@@ -603,7 +649,7 @@ impl Catalog for SqlCatalog {
603649
let src_table_exist = self.table_exists(src).await;
604650
let dst_table_exist = self.table_exists(dest).await;
605651

606-
let _pre_rename_check = match src_table_exist {
652+
match src_table_exist {
607653
Ok(res) => {
608654
if res {
609655
match dst_table_exist {
@@ -665,10 +711,6 @@ impl Catalog for SqlCatalog {
665711

666712
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
667713
todo!()
668-
// let table_ident = commit.identifier();
669-
// let requirements = commit.take_requirements();
670-
// let updates = commit.take_updates();
671-
// let table = self.load_table(table_ident).await?;
672714
}
673715
}
674716

@@ -678,7 +720,7 @@ pub mod tests {
678720

679721
use iceberg::{
680722
spec::{NestedField, PrimitiveType, Schema, Type},
681-
Catalog, NamespaceIdent, TableCreation, TableIdent,
723+
Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent,
682724
};
683725
use tempfile::TempDir;
684726

@@ -706,9 +748,10 @@ pub mod tests {
706748
let catalog = SqlCatalog::new(config).await.unwrap();
707749

708750
let namespace = NamespaceIdent::new("test".to_owned());
751+
let mut props = HashMap::from([("test_prop".to_string(), "test_prop_value".to_string())]);
709752

710753
catalog
711-
.create_namespace(&namespace, HashMap::new())
754+
.create_namespace(&namespace, props.clone())
712755
.await
713756
.unwrap();
714757

@@ -749,13 +792,58 @@ pub mod tests {
749792
.expect("Failed to list namespaces");
750793
assert_eq!(namespaces[0].encode_in_url(), "test");
751794

795+
let test_namespace = catalog
796+
.get_namespace(&namespace)
797+
.await
798+
.expect("Failed to get namespace");
799+
800+
assert_eq!(
801+
test_namespace,
802+
Namespace::with_properties(namespace.clone(), props.clone())
803+
);
804+
805+
props.insert("test_prop2".to_string(), "test_prop_value2".to_string());
806+
807+
catalog
808+
.update_namespace(&namespace, props.clone())
809+
.await
810+
.unwrap();
811+
812+
let test_namespace = catalog
813+
.get_namespace(&namespace)
814+
.await
815+
.expect("Failed to get namespace");
816+
817+
assert_eq!(
818+
test_namespace,
819+
Namespace::with_properties(namespace.clone(), props.clone())
820+
);
821+
752822
//load table points to a /var location - check why
753823

754824
let table = catalog.load_table(&identifier).await.unwrap();
755825

756826
assert!(table.metadata().location().ends_with("/warehouse/table1"));
757827

758-
//tear down the database and tables
828+
catalog.drop_table(&identifier).await.unwrap();
829+
830+
let exists = catalog
831+
.table_exists(&identifier)
832+
.await
833+
.expect("Table doesn't exist");
834+
835+
assert!(!exists);
836+
837+
catalog.drop_namespace(&namespace).await.unwrap();
838+
839+
let nsp_exists = catalog
840+
.namespace_exists(&namespace)
841+
.await
842+
.expect("Namespace doesn't exist");
843+
844+
assert!(!nsp_exists);
845+
846+
// tear down the database and tables
759847
sqlx::Sqlite::drop_database(sql_lite_uri).await.unwrap();
760848
}
761849
}

0 commit comments

Comments
 (0)