Skip to content

Commit 5f605a9

Browse files
committed
use TableSchema
1 parent 06167a4 commit 5f605a9

File tree

14 files changed

+53
-53
lines changed

14 files changed

+53
-53
lines changed

datafusion-examples/examples/csv_json_opener.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ use datafusion::{
3131
test_util::aggr_test_schema,
3232
};
3333

34-
use datafusion::datasource::physical_plan::FileScanConfigBuilder;
34+
use datafusion::datasource::{
35+
physical_plan::FileScanConfigBuilder, table_schema::TableSchema,
36+
};
3537
use futures::StreamExt;
3638
use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};
3739

@@ -67,7 +69,7 @@ async fn csv_opener() -> Result<()> {
6769

6870
let config = CsvSource::new(true, b',', b'"')
6971
.with_comment(Some(b'#'))
70-
.with_schema(schema)
72+
.with_schema(TableSchema::new(schema, vec![]))
7173
.with_batch_size(8192)
7274
.with_projection(&scan_config);
7375

datafusion/core/src/datasource/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub use datafusion_catalog::view;
4545
pub use datafusion_datasource::schema_adapter;
4646
pub use datafusion_datasource::sink;
4747
pub use datafusion_datasource::source;
48+
pub use datafusion_datasource::table_schema;
4849
pub use datafusion_execution::object_store;
4950
pub use datafusion_physical_expr::create_ordering;
5051

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ mod tests {
5454
use datafusion_datasource::source::DataSourceExec;
5555

5656
use datafusion_datasource::file::FileSource;
57-
use datafusion_datasource::{FileRange, PartitionedFile};
57+
use datafusion_datasource::{FileRange, PartitionedFile, TableSchema};
5858
use datafusion_datasource_parquet::source::ParquetSource;
5959
use datafusion_datasource_parquet::{
6060
DefaultParquetFileReaderFactory, ParquetFileReaderFactory, ParquetFormat,
@@ -186,7 +186,7 @@ mod tests {
186186
source = source.with_bloom_filter_on_read(false);
187187
}
188188

189-
source.with_schema(Arc::clone(&table_schema))
189+
source.with_schema(TableSchema::new(Arc::clone(&table_schema), vec![]))
190190
}
191191

192192
fn build_parquet_exec(

datafusion/core/src/test_util/parquet.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,10 @@ impl TestParquetFile {
186186
ParquetSource::new(parquet_options)
187187
.with_predicate(Arc::clone(&physical_filter_expr)),
188188
)
189-
.with_schema(Arc::clone(&self.schema));
189+
.with_schema(datafusion_datasource::TableSchema::new(
190+
Arc::clone(&self.schema),
191+
vec![],
192+
));
190193
let config = scan_config_builder.with_source(source).build();
191194
let parquet_exec = DataSourceExec::from_data_source(config);
192195

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use datafusion_datasource::{
2424
file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture,
2525
file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory,
2626
schema_adapter::SchemaAdapterFactory, source::DataSourceExec, PartitionedFile,
27+
TableSchema,
2728
};
2829
use datafusion_physical_expr_common::physical_expr::fmt_sql;
2930
use datafusion_physical_optimizer::PhysicalOptimizerRule;
@@ -156,9 +157,9 @@ impl FileSource for TestSource {
156157
})
157158
}
158159

159-
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
160+
fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
160161
Arc::new(TestSource {
161-
schema: Some(schema),
162+
schema: Some(schema.file_schema().clone()),
162163
..self.clone()
163164
})
164165
}

datafusion/datasource-arrow/src/source.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ use std::sync::Arc;
2020

2121
use datafusion_datasource::as_file_source;
2222
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
23+
use datafusion_datasource::TableSchema;
2324

2425
use arrow::buffer::Buffer;
25-
use arrow::datatypes::SchemaRef;
2626
use arrow_ipc::reader::FileDecoder;
2727
use datafusion_common::error::Result;
2828
use datafusion_common::{exec_datafusion_err, Statistics};
@@ -73,7 +73,7 @@ impl FileSource for ArrowSource {
7373
Arc::new(Self { ..self.clone() })
7474
}
7575

76-
fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
76+
fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
7777
Arc::new(Self { ..self.clone() })
7878
}
7979
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {

datafusion/datasource-avro/src/source.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use datafusion_datasource::file::FileSource;
2929
use datafusion_datasource::file_scan_config::FileScanConfig;
3030
use datafusion_datasource::file_stream::FileOpener;
3131
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
32+
use datafusion_datasource::TableSchema;
3233
use datafusion_physical_expr_common::sort_expr::LexOrdering;
3334
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
3435

@@ -84,9 +85,9 @@ impl FileSource for AvroSource {
8485
Arc::new(conf)
8586
}
8687

87-
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
88+
fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
8889
let mut conf = self.clone();
89-
conf.schema = Some(schema);
90+
conf.schema = Some(schema.file_schema().clone());
9091
Arc::new(conf)
9192
}
9293
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {

datafusion/datasource-csv/src/source.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType;
2929
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
3030
use datafusion_datasource::{
3131
as_file_source, calculate_range, FileRange, ListingTableUrl, PartitionedFile,
32-
RangeCalculation,
32+
RangeCalculation, TableSchema,
3333
};
3434

3535
use arrow::csv;
@@ -258,9 +258,9 @@ impl FileSource for CsvSource {
258258
Arc::new(conf)
259259
}
260260

261-
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
261+
fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
262262
let mut conf = self.clone();
263-
conf.file_schema = Some(schema);
263+
conf.file_schema = Some(schema.file_schema().clone());
264264
Arc::new(conf)
265265
}
266266

datafusion/datasource-json/src/source.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
3232
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
3333
use datafusion_datasource::{
3434
as_file_source, calculate_range, ListingTableUrl, PartitionedFile, RangeCalculation,
35+
TableSchema,
3536
};
3637
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
3738

@@ -122,7 +123,7 @@ impl FileSource for JsonSource {
122123
Arc::new(conf)
123124
}
124125

125-
fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
126+
fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
126127
Arc::new(Self { ..self.clone() })
127128
}
128129
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -463,8 +463,7 @@ impl FileFormat for ParquetFormat {
463463
metadata_size_hint = Some(metadata);
464464
}
465465

466-
let mut source = ParquetSource::new(self.options.clone())
467-
.with_table_partition_cols(conf.table_partition_cols().clone());
466+
let mut source = ParquetSource::new(self.options.clone());
468467

469468
// Use the CachedParquetFileReaderFactory
470469
let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

0 commit comments

Comments
 (0)