@@ -15,15 +15,13 @@ use datafusion_iceberg::catalog::catalog::IcebergCatalog;
1515use datafusion_iceberg:: planner:: IcebergQueryPlanner ;
1616use iceberg_rest_catalog:: apis:: configuration:: Configuration ;
1717use iceberg_rest_catalog:: catalog:: RestCatalog ;
18- use icelake:: TableIdentifier ;
1918use object_store:: path:: Path ;
2019use object_store:: { ObjectStore , PutPayload } ;
2120use runtime:: datafusion:: execution:: SqlExecutor ;
2221use rusoto_core:: { HttpClient , Region } ;
2322use rusoto_credential:: StaticProvider ;
2423use rusoto_s3:: { GetBucketAclRequest , S3Client , S3 } ;
2524use snafu:: ResultExt ;
26- use std:: collections:: HashMap ;
2725use std:: sync:: Arc ;
2826use url:: Url ;
2927use uuid:: Uuid ;
@@ -355,9 +353,8 @@ impl ControlService for ControlServiceImpl {
355353 data : Bytes ,
356354 file_name : String ,
357355 ) -> ControlPlaneResult < ( ) > {
358- //println!("{:?}", warehouse_id);
359-
360356 let warehouse = self . get_warehouse ( * warehouse_id) . await ?;
357+ let warehouse_name = warehouse. name . clone ( ) ;
361358 let storage_profile = self . get_profile ( warehouse. storage_profile_id ) . await ?;
362359 let object_store = storage_profile
363360 . get_object_store ( )
@@ -375,8 +372,25 @@ impl ControlService for ControlServiceImpl {
375372 . await
376373 . context ( crate :: error:: ObjectStoreSnafu ) ?;
377374
375+ // Create table from CSV
376+ let config = {
377+ let mut config = Configuration :: new ( ) ;
378+ config. base_path = "http://0.0.0.0:3000/catalog" . to_string ( ) ;
379+ config
380+ } ;
381+ let object_store_builder = storage_profile
382+ . get_object_store_builder ( )
383+ . context ( crate :: error:: InvalidStorageProfileSnafu ) ?;
384+ let rest_client = RestCatalog :: new (
385+ Some ( warehouse_id. to_string ( ) . as_str ( ) ) ,
386+ config,
387+ object_store_builder,
388+ ) ;
389+ let catalog = IcebergCatalog :: new ( Arc :: new ( rest_client) , None ) . await ?;
378390 let ctx = SessionContext :: new ( ) ;
391+ ctx. register_catalog ( warehouse_name. clone ( ) , Arc :: new ( catalog) ) ;
379392
393+ // Register CSV file as a table
380394 let storage_endpoint_url = storage_profile
381395 . endpoint
382396 . as_ref ( )
@@ -385,7 +399,7 @@ impl ControlService for ControlServiceImpl {
385399 let path_string = match & storage_profile. credentials {
386400 Credentials :: AccessKey ( _) => {
387401 // If the storage profile is AWS S3, modify the path_string with the S3 prefix
388- format ! ( "{storage_endpoint_url}/{path_string}" , )
402+ format ! ( "{storage_endpoint_url}/{path_string}" )
389403 }
390404 Credentials :: Role ( _) => path_string,
391405 } ;
@@ -395,11 +409,17 @@ impl ControlService for ControlServiceImpl {
395409 } ,
396410 ) ?;
397411 ctx. register_object_store ( & endpoint_url, Arc :: from ( object_store) ) ;
412+ ctx. register_csv ( table_name, path_string, CsvReadOptions :: new ( ) )
413+ . await ?;
398414
399- // println!("{:?}", data);
400- // Commented code is writing with iceberg-rust-jankaul
401- // Let it sit here just in case
402- //////////////////////////////////////
415+ let insert_query = format ! (
416+ "INSERT INTO {warehouse_name}.{database_name}.{table_name} SELECT * FROM {table_name}"
417+ ) ;
418+ let executor = SqlExecutor :: new ( ctx) . context ( crate :: error:: ExecutionSnafu ) ?;
419+ executor
420+ . execute_with_custom_plan ( & insert_query, warehouse_name. as_str ( ) )
421+ . await
422+ . context ( crate :: error:: ExecutionSnafu ) ?;
403423
404424 // let config = {
405425 // let mut config = Configuration::new();
@@ -438,65 +458,65 @@ impl ControlService for ControlServiceImpl {
438458
439459 //////////////////////////////////////
440460
441- let config = {
442- HashMap :: from ( [
443- ( "iceberg.catalog.type" . to_string ( ) , "rest" . to_string ( ) ) ,
444- (
445- "iceberg.catalog.demo.warehouse" . to_string ( ) ,
446- warehouse_id. to_string ( ) ,
447- ) ,
448- ( "iceberg.catalog.name" . to_string ( ) , "demo" . to_string ( ) ) ,
449- (
450- "iceberg.catalog.demo.uri" . to_string ( ) ,
451- "http://0.0.0.0:3000/catalog" . to_string ( ) ,
452- ) ,
453- (
454- "iceberg.table.io.region" . to_string ( ) ,
455- storage_profile. region . to_string ( ) ,
456- ) ,
457- (
458- "iceberg.table.io.endpoint" . to_string ( ) ,
459- storage_endpoint_url. to_string ( ) ,
460- ) ,
461- // (
462- // "iceberg.table.io.bucket".to_string(),
463- // "examples".to_string(),
464- // ),
465- // (
466- // "iceberg.table.io.access_key_id".to_string(),
467- // "minioadmin".to_string(),
468- // ),
469- // (
470- // "iceberg.table.io.secret_access_key".to_string(),
471- // "minioadmin".to_string(),
472- // ),
473- ] )
474- } ;
475- let catalog = icelake:: catalog:: load_catalog ( & config) . await ?;
476- let table_ident = TableIdentifier :: new ( vec ! [ database_name, table_name] ) ?;
477- let mut table = catalog. load_table ( & table_ident) . await ?;
478- let table_schema = table. current_arrow_schema ( ) ?;
479- println ! ( "{:?}" , table. table_name( ) ) ;
480-
481- let df = ctx
482- . read_csv ( path_string, CsvReadOptions :: new ( ) . schema ( & table_schema) )
483- . await ?;
484- let data = df. collect ( ) . await ?;
485-
486- let builder = table. writer_builder ( ) ?. rolling_writer_builder ( None ) ?;
487- let mut writer = table
488- . writer_builder ( ) ?
489- . build_append_only_writer ( builder)
490- . await ?;
491-
492- for r in data {
493- writer. write ( & r) . await ?;
494- }
495-
496- let res: Vec < icelake:: types:: DataFile > = writer. close ( ) . await ?;
497- let mut txn = icelake:: transaction:: Transaction :: new ( & mut table) ;
498- txn. append_data_file ( res) ;
499- txn. commit ( ) . await ?;
461+ // let config = {
462+ // HashMap::from([
463+ // ("iceberg.catalog.type".to_string(), "rest".to_string()),
464+ // (
465+ // "iceberg.catalog.demo.warehouse".to_string(),
466+ // warehouse_id.to_string(),
467+ // ),
468+ // ("iceberg.catalog.name".to_string(), "demo".to_string()),
469+ // (
470+ // "iceberg.catalog.demo.uri".to_string(),
471+ // "http://0.0.0.0:3000/catalog".to_string(),
472+ // ),
473+ // (
474+ // "iceberg.table.io.region".to_string(),
475+ // storage_profile.region.to_string(),
476+ // ),
477+ // (
478+ // "iceberg.table.io.endpoint".to_string(),
479+ // storage_endpoint_url.to_string(),
480+ // ),
481+ // // (
482+ // // "iceberg.table.io.bucket".to_string(),
483+ // // "examples".to_string(),
484+ // // ),
485+ // // (
486+ // // "iceberg.table.io.access_key_id".to_string(),
487+ // // "minioadmin".to_string(),
488+ // // ),
489+ // // (
490+ // // "iceberg.table.io.secret_access_key".to_string(),
491+ // // "minioadmin".to_string(),
492+ // // ),
493+ // ])
494+ // };
495+ // let catalog = icelake::catalog::load_catalog(&config).await?;
496+ // let table_ident = TableIdentifier::new(vec![database_name, table_name])?;
497+ // let mut table = catalog.load_table(&table_ident).await?;
498+ // let table_schema = table.current_arrow_schema()?;
499+ // println!("{:?}", table.table_name());
500+ //
501+ // let df = ctx
502+ // .read_csv(path_string, CsvReadOptions::new().schema(&table_schema))
503+ // .await?;
504+ // let data = df.collect().await?;
505+ //
506+ // let builder = table.writer_builder()?.rolling_writer_builder(None)?;
507+ // let mut writer = table
508+ // .writer_builder()?
509+ // .build_append_only_writer(builder)
510+ // .await?;
511+ //
512+ // for r in data {
513+ // writer.write(&r).await?;
514+ // }
515+ //
516+ // let res: Vec<icelake::types::DataFile> = writer.close().await?;
517+ // let mut txn = icelake::transaction::Transaction::new(&mut table);
518+ // txn.append_data_file(res);
519+ // txn.commit().await?;
500520
501521 Ok ( ( ) )
502522 }
0 commit comments