@@ -25,6 +25,7 @@ use datafusion::{
2525 catalog:: { CatalogProvider , SchemaProvider } ,
2626 datasource:: { TableProvider , TableType } ,
2727} ;
28+ use datafusion_ffi:: schema_provider:: { FFI_SchemaProvider , ForeignSchemaProvider } ;
2829use datafusion_ffi:: table_provider:: { FFI_TableProvider , ForeignTableProvider } ;
2930use pyo3:: exceptions:: PyKeyError ;
3031use pyo3:: prelude:: * ;
@@ -48,8 +49,8 @@ pub struct PyTable {
4849 pub table : Arc < dyn TableProvider > ,
4950}
5051
51- impl PyCatalog {
52- pub fn new ( catalog : Arc < dyn CatalogProvider > ) -> Self {
52+ impl From < Arc < dyn CatalogProvider > > for PyCatalog {
53+ fn from ( catalog : Arc < dyn CatalogProvider > ) -> Self {
5354 Self { catalog }
5455 }
5556}
@@ -72,6 +73,13 @@ impl PyTable {
7273
7374#[ pymethods]
7475impl PyCatalog {
76+ #[ new]
77+ fn new ( catalog : PyObject ) -> Self {
78+ let catalog_provider =
79+ Arc :: new ( RustWrappedPyCatalogProvider :: new ( catalog) ) as Arc < dyn CatalogProvider > ;
80+ catalog_provider. into ( )
81+ }
82+
7583 fn names ( & self ) -> Vec < String > {
7684 self . catalog . schema_names ( )
7785 }
@@ -286,3 +294,109 @@ impl SchemaProvider for RustWrappedPySchemaProvider {
286294 } )
287295 }
288296}
297+
298+ #[ derive( Debug ) ]
299+ struct RustWrappedPyCatalogProvider {
300+ catalog_provider : PyObject ,
301+ }
302+
303+ impl RustWrappedPyCatalogProvider {
304+ fn new ( catalog_provider : PyObject ) -> Self {
305+ Self { catalog_provider }
306+ }
307+
308+ fn schema_inner ( & self , name : & str ) -> PyResult < Option < Arc < dyn SchemaProvider > > > {
309+ Python :: with_gil ( |py| {
310+ let provider = self . catalog_provider . bind ( py) ;
311+
312+ let py_schema = provider. call_method1 ( "schema" , ( name, ) ) ?;
313+ if py_schema. is_none ( ) {
314+ return Ok ( None ) ;
315+ }
316+
317+ if py_schema. hasattr ( "__datafusion_schema_provider__" ) ? {
318+ let capsule = provider
319+ . getattr ( "__datafusion_schema_provider__" ) ?
320+ . call0 ( ) ?;
321+ let capsule = capsule. downcast :: < PyCapsule > ( ) . map_err ( py_datafusion_err) ?;
322+ validate_pycapsule ( capsule, "datafusion_schema_provider" ) ?;
323+
324+ let provider = unsafe { capsule. reference :: < FFI_SchemaProvider > ( ) } ;
325+ let provider: ForeignSchemaProvider = provider. into ( ) ;
326+
327+ Ok ( Some ( Arc :: new ( provider) as Arc < dyn SchemaProvider > ) )
328+ } else {
329+ let py_schema = RustWrappedPySchemaProvider :: new ( py_schema. into ( ) ) ;
330+
331+ Ok ( Some ( Arc :: new ( py_schema) as Arc < dyn SchemaProvider > ) )
332+ }
333+ } )
334+ }
335+ }
336+
337+ #[ async_trait]
338+ impl CatalogProvider for RustWrappedPyCatalogProvider {
339+ fn as_any ( & self ) -> & dyn Any {
340+ self
341+ }
342+
343+ fn schema_names ( & self ) -> Vec < String > {
344+ Python :: with_gil ( |py| {
345+ let provider = self . catalog_provider . bind ( py) ;
346+ provider
347+ . getattr ( "schema_names" )
348+ . and_then ( |names| names. extract :: < Vec < String > > ( ) )
349+ . unwrap_or_default ( )
350+ } )
351+ }
352+
353+ fn schema ( & self , name : & str ) -> Option < Arc < dyn SchemaProvider > > {
354+ self . schema_inner ( name) . unwrap_or_else ( |err| {
355+ log:: error!( "CatalogProvider schema returned error: {err}" ) ;
356+ None
357+ } )
358+ }
359+
360+ fn register_schema (
361+ & self ,
362+ name : & str ,
363+ schema : Arc < dyn SchemaProvider > ,
364+ ) -> datafusion:: common:: Result < Option < Arc < dyn SchemaProvider > > > {
365+ let py_schema: PyDatabase = schema. into ( ) ;
366+ Python :: with_gil ( |py| {
367+ let provider = self . catalog_provider . bind ( py) ;
368+ let schema = provider
369+ . call_method1 ( "register_schema" , ( name, py_schema) )
370+ . map_err ( to_datafusion_err) ?;
371+ if schema. is_none ( ) {
372+ return Ok ( None ) ;
373+ }
374+
375+ let schema = Arc :: new ( RustWrappedPySchemaProvider :: new ( schema. into ( ) ) )
376+ as Arc < dyn SchemaProvider > ;
377+
378+ Ok ( Some ( schema) )
379+ } )
380+ }
381+
382+ fn deregister_schema (
383+ & self ,
384+ name : & str ,
385+ cascade : bool ,
386+ ) -> datafusion:: common:: Result < Option < Arc < dyn SchemaProvider > > > {
387+ Python :: with_gil ( |py| {
388+ let provider = self . catalog_provider . bind ( py) ;
389+ let schema = provider
390+ . call_method1 ( "deregister_schema" , ( name, cascade) )
391+ . map_err ( to_datafusion_err) ?;
392+ if schema. is_none ( ) {
393+ return Ok ( None ) ;
394+ }
395+
396+ let schema = Arc :: new ( RustWrappedPySchemaProvider :: new ( schema. into ( ) ) )
397+ as Arc < dyn SchemaProvider > ;
398+
399+ Ok ( Some ( schema) )
400+ } )
401+ }
402+ }
0 commit comments