From 4ff1b18d3db9f50ba7f3d31288d0da37736d6b5f Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 5 Sep 2017 19:45:42 +0800 Subject: [PATCH 1/4] data source v2 --- .../spark/sql/sources/v2/DataSourceV2.java | 37 +++ .../sql/sources/v2/DataSourceV2Options.java | 49 ++++ .../v2/SchemaRequiredDataSourceV2.java | 42 ++++ .../sql/sources/v2/reader/DataReader.java | 36 +++ .../sources/v2/reader/DataSourceV2Reader.java | 58 +++++ .../spark/sql/sources/v2/reader/ReadTask.java | 44 ++++ .../CatalystFilterPushDownSupport.java | 40 +++ .../reader/downward/ColumnPruningSupport.java | 36 +++ .../downward/FilterPushDownSupport.java | 35 +++ .../sources/v2/reader/scan/UnsafeRowScan.java | 49 ++++ .../sources/v2/reader/upward/Statistics.java | 28 +++ .../v2/reader/upward/StatisticsSupport.java | 26 ++ .../apache/spark/sql/DataFrameReader.scala | 48 +++- .../spark/sql/execution/SparkPlanner.scala | 2 + .../datasources/v2/DataSourceRDD.scala | 71 ++++++ .../datasources/v2/DataSourceV2Relation.scala | 41 ++++ .../datasources/v2/DataSourceV2ScanExec.scala | 90 +++++++ .../datasources/v2/DataSourceV2Strategy.scala | 93 +++++++ .../sources/v2/JavaAdvancedDataSourceV2.java | 133 ++++++++++ .../v2/JavaSchemaRequiredDataSource.java | 53 ++++ .../sources/v2/JavaSimpleDataSourceV2.java | 85 +++++++ .../sources/v2/JavaUnsafeRowDataSourceV2.java | 90 +++++++ .../sql/sources/v2/DataSourceV2Suite.scala | 231 ++++++++++++++++++ 23 files changed, 1410 insertions(+), 7 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/SchemaRequiredDataSourceV2.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/CatalystFilterPushDownSupport.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/ColumnPruningSupport.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/FilterPushDownSupport.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/scan/UnsafeRowScan.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/upward/Statistics.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/upward/StatisticsSupport.java create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java create mode 100644 
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java new file mode 100644 index 000000000000..15692b03d394 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.types.StructType; + +/** + * The main interface for data source v2 implementations. Users can mix in more interfaces to + * implement more functions other than just scan. + */ +public interface DataSourceV2 { + + /** + * Create a `DataSourceV2Reader` to scan the data for this data source. + * + * @param options the options for this data source reader, which is an immutable case-insensitive + * string-to-string map. + * @return a reader that implements the actual read logic. + */ + DataSourceV2Reader createReader(DataSourceV2Options options); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java new file mode 100644 index 000000000000..0e419b08d0dc --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; + +/** + * An immutable case-insensitive string-to-string map, which is used to represent data source + * options. 
+ */
+public class DataSourceV2Options {
+  private final Map<String, String> keyLowerCasedMap;
+
+  private String toLowerCase(String key) {
+    return key.toLowerCase(Locale.ROOT);
+  }
+
+  public DataSourceV2Options(Map<String, String> originalMap) {
+    keyLowerCasedMap = new HashMap<>(originalMap.size());
+    for (Map.Entry<String, String> entry : originalMap.entrySet()) {
+      keyLowerCasedMap.put(toLowerCase(entry.getKey()), entry.getValue());
+    }
+  }
+
+  /**
+   * Returns the option value to which the specified key is mapped, case-insensitively.
+   */
+  public Optional<String> get(String key) {
+    return Optional.ofNullable(keyLowerCasedMap.get(toLowerCase(key)));
+  }
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SchemaRequiredDataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SchemaRequiredDataSourceV2.java
new file mode 100644
index 000000000000..3b1b069c6358
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SchemaRequiredDataSourceV2.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A variant of `DataSourceV2` which requires users to provide a schema when reading data. A data
+ * source can implement both `DataSourceV2` and `SchemaRequiredDataSourceV2` if it supports both
+ * schema inference and user-specified schemas.
+ */
+public interface SchemaRequiredDataSourceV2 {
+
+  /**
+   * Create a `DataSourceV2Reader` to scan the data for this data source.
+   *
+   * @param schema the full schema of this data source reader. The full schema usually maps to the
+   *               physical schema of the underlying storage of this data source reader, e.g.
+   *               CSV files, JSON files, etc., while this reader may not read data with the full
+   *               schema, as column pruning or other optimizations may happen.
+   * @param options the options for this data source reader, which is an immutable case-insensitive
+   *                string-to-string map.
+   * @return a reader that implements the actual read logic.
+   */
+  DataSourceV2Reader createReader(StructType schema, DataSourceV2Options options);
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
new file mode 100644
index 000000000000..f171869fe8de
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Closeable;
+
+/**
+ * A data reader returned by a read task, responsible for outputting data for an RDD partition.
+ */
+public interface DataReader<T> extends Closeable {
+
+  /**
+   * Proceed to the next record. Returns false if there are no more records.
+   */
+  boolean next();
+
+  /**
+   * Return the current record. This method should return the same value until `next` is called.
+   */
+  T get();
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
new file mode 100644
index 000000000000..78b3bdde2634
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.List;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source reader that can mix in various query optimization interfaces and implement these
+ * optimizations. The actual scan logic should be delegated to `ReadTask`s that are returned by
+ * this data source reader.
+ *
+ * There are mainly 3 kinds of query optimizations:
+ * 1. push operators downward to the data source, e.g., column pruning, filter push-down, etc.
+ * 2. propagate information upward to Spark, e.g., report statistics, report ordering, etc.
+ * 3. special scans like columnar scan, unsafe row scan, etc. Note that a data source reader can
+ *    implement at most one special scan.
+ *
+ * Spark first applies all operator push-down optimizations which this data source supports. Then
+ * Spark collects information this data source provides for further optimizations. Finally Spark
+ * issues the scan request and does the actual data reading.
+ */
+public interface DataSourceV2Reader {
+
+  /**
+   * Returns the actual schema of this data source reader, which may be different from the physical
+   * schema of the underlying storage, as column pruning or other optimizations may happen.
+   */
+  StructType readSchema();
+
+  /**
+   * Returns a list of read tasks. Each task is responsible for outputting data for one RDD
+   * partition. That means the number of tasks returned here is the same as the number of RDD
+   * partitions this scan outputs.
+   *
+   * Note that this may not be a full scan if the data source reader mixes in other optimization
+   * interfaces like column pruning, filter push-down, etc. These optimizations are applied before
+   * Spark issues the scan request.
+   */
+  List<ReadTask<Row>> createReadTasks();
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java
new file mode 100644
index 000000000000..0382a183a3d8
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Serializable;
+
+/**
+ * A read task returned by a data source reader, responsible for creating the data reader.
+ * The relationship between `ReadTask` and `DataReader` is similar to `Iterable` and `Iterator`.
+ *
+ * Note that the read task will be serialized and sent to executors, and the data reader will be
+ * created on executors to do the actual reading.
+ */
+public interface ReadTask<T> extends Serializable {
+
+  /**
+   * The preferred locations where this read task can run faster, though Spark does not guarantee
+   * that the task will always run on these locations. Implementations should make sure the task
+   * can run on any location. Each location is a string representing the host name of an executor.
+   */
+  default String[] preferredLocations() {
+    return new String[0];
+  }
+
+  /**
+   * Returns a data reader to do the actual reading work for this read task.
+   */
+  DataReader<T> createReader();
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/CatalystFilterPushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/CatalystFilterPushDownSupport.java
new file mode 100644
index 000000000000..8bcb1d902a4e
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/CatalystFilterPushDownSupport.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.downward; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.catalyst.expressions.Expression; + +/** + * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to push down + * arbitrary expressions as predicates to the data source. This is an experimental and unstable + * interface + * + * Note that, if users implement both this interface and `FilterPushDownSupport`, Spark will ignore + * `FilterPushDownSupport` and only process this interface. + */ +@Experimental +@InterfaceStability.Unstable +public interface CatalystFilterPushDownSupport { + + /** + * Pushes down filters, and returns unsupported filters. + */ + Expression[] pushCatalystFilters(Expression[] filters); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/ColumnPruningSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/ColumnPruningSupport.java new file mode 100644 index 000000000000..c288ecb6dac4 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/ColumnPruningSupport.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.downward; + +import org.apache.spark.sql.types.StructType; + +/** + * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to only read the + * required columns/nested fields during scan. + */ +public interface ColumnPruningSupport { + + /** + * Apply column pruning w.r.t. the given requiredSchema. + * + * Implementation should try its best to prune the unnecessary columns/nested fields, but it's + * also OK to do the pruning partially, e.g., a data source may not be able to prune nested + * fields, and only prune top-level columns. 
+ */ + void pruneColumns(StructType requiredSchema); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/FilterPushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/FilterPushDownSupport.java new file mode 100644 index 000000000000..772f8aadf1ec --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/FilterPushDownSupport.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.downward; + +import org.apache.spark.sql.sources.Filter; + +/** + * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to push down + * filters to the data source and reduce the size of the data to be read. + * + * Note that, if users implement both this interface and `CatalystFilterPushDownSupport`, Spark + * will ignore this interface and only process `CatalystFilterPushDownSupport`. + */ +public interface FilterPushDownSupport { + + /** + * Pushes down filters, and returns unsupported filters. + */ + Filter[] pushFilters(Filter[] filters); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/scan/UnsafeRowScan.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/scan/UnsafeRowScan.java new file mode 100644 index 000000000000..d30b3b388c9f --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/scan/UnsafeRowScan.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.scan; + +import java.util.List; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.sources.v2.reader.ReadTask; + +/** + * A mix-in interface for `DataSourceV2Reader`. 
+ * Users can implement this interface to output
+ * unsafe rows directly and avoid the row copy at Spark side.
+ *
+ * Note that this is an experimental and unstable interface, as `UnsafeRow` is not public and
+ * may get changed in future Spark versions.
+ */
+@Experimental
+@InterfaceStability.Unstable
+public interface UnsafeRowScan extends DataSourceV2Reader {
+
+  @Override
+  default List<ReadTask<Row>> createReadTasks() {
+    throw new IllegalStateException("createReadTasks should not be called with UnsafeRowScan.");
+  }
+
+  /**
+   * Similar to `DataSourceV2Reader.createReadTasks`, but returns data in unsafe row format.
+   */
+  List<ReadTask<UnsafeRow>> createUnsafeRowReadTasks();
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/upward/Statistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/upward/Statistics.java
new file mode 100644
index 000000000000..4c921beca608
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/upward/Statistics.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.upward;
+
+import java.util.OptionalLong;
+
+/**
+ * An interface to represent statistics for a data source.
+ */
+public interface Statistics {
+  OptionalLong sizeInBytes();
+  OptionalLong numRows();
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/upward/StatisticsSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/upward/StatisticsSupport.java
new file mode 100644
index 000000000000..60f589f18841
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/upward/StatisticsSupport.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.upward;
+
+/**
+ * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to report
+ * statistics to Spark.
+ */ +public interface StatisticsSupport { + Statistics getStatistics(); +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index c69acc413e87..7d804ae95c52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -32,6 +32,8 @@ import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser import org.apache.spark.sql.execution.datasources.csv._ import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, SchemaRequiredDataSourceV2} import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.unsafe.types.UTF8String @@ -180,13 +182,45 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { "read files of Hive data source directly.") } - sparkSession.baseRelationToDataFrame( - DataSource.apply( - sparkSession, - paths = paths, - userSpecifiedSchema = userSpecifiedSchema, - className = source, - options = extraOptions.toMap).resolveRelation()) + val cls = DataSource.lookupDataSource(source) + val isDataSourceV2 = classOf[DataSourceV2].isAssignableFrom(cls) || + classOf[SchemaRequiredDataSourceV2].isAssignableFrom(cls) + if (isDataSourceV2) { + val dataSource = cls.newInstance() + val options = new DataSourceV2Options(extraOptions.asJava) + + val reader = (cls.newInstance(), userSpecifiedSchema) match { + case (ds: SchemaRequiredDataSourceV2, Some(schema)) => + ds.createReader(schema, options) + + case (ds: DataSourceV2, None) => + ds.createReader(options) + + case (_: SchemaRequiredDataSourceV2, None) => + throw new AnalysisException(s"A schema needs to be specified when using $dataSource.") + + case (ds: DataSourceV2, Some(schema)) => + val reader = ds.createReader(options) + if (reader.readSchema() != schema) { + throw new AnalysisException(s"$ds does not allow user-specified schemas.") + } + reader + + case _ => + throw new AnalysisException(s"$cls is not a valid Spark SQL Data Source.") + } + + Dataset.ofRows(sparkSession, DataSourceV2Relation(reader)) + } else { + // Code path for data source v1. 
+ sparkSession.baseRelationToDataFrame( + DataSource.apply( + sparkSession, + paths = paths, + userSpecifiedSchema = userSpecifiedSchema, + className = source, + options = extraOptions.toMap).resolveRelation()) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index 4e718d609c92..b143d44eae17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy import org.apache.spark.sql.internal.SQLConf class SparkPlanner( @@ -35,6 +36,7 @@ class SparkPlanner( def strategies: Seq[Strategy] = experimentalMethods.extraStrategies ++ extraPlanningStrategies ++ ( + DataSourceV2Strategy :: FileSourceStrategy :: DataSourceStrategy(conf) :: SpecialLimits :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala new file mode 100644 index 000000000000..31c04498ec0a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.ReadTask + +class DataSourceRDDPartition(val index: Int, val readTask: ReadTask[UnsafeRow]) + extends Partition with Serializable + +class DataSourceRDD( + sc: SparkContext, + @transient private val generators: java.util.List[ReadTask[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + override protected def getPartitions: Array[Partition] = { + var index = 0 + val iter = generators.iterator() + val res = new Array[Partition](generators.size()) + while (iter.hasNext) { + res(index) = new DataSourceRDDPartition(index, iter.next()) + index += 1 + } + res + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { + val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createReader() + context.addTaskCompletionListener(_ => reader.close()) + val iter = new Iterator[UnsafeRow] { + private[this] var valuePrepared = false + + override def hasNext: Boolean = { + if (!valuePrepared) { + valuePrepared = reader.next() + } + valuePrepared + } + + override def next(): UnsafeRow = { + if (!hasNext) { + throw new java.util.NoSuchElementException("End of stream") + } + valuePrepared = false + reader.get() + } + } + new InterruptibleIterator(context, iter) + } + + override def getPreferredLocations(split: Partition): Seq[String] = { + split.asInstanceOf[DataSourceRDDPartition].readTask.preferredLocations() + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala new file mode 100644 index 000000000000..887dded288a2 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader +import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport + +case class DataSourceV2Relation( + output: Seq[AttributeReference], + reader: DataSourceV2Reader) extends LeafNode { + + override def computeStats(): Statistics = reader match { + case r: StatisticsSupport => + Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes)) + case _ => + Statistics(sizeInBytes = conf.defaultSizeInBytes) + } +} + +object DataSourceV2Relation { + def apply(reader: DataSourceV2Reader): DataSourceV2Relation = { + new DataSourceV2Relation(reader.readSchema().toAttributes, reader) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala new file mode 100644 index 000000000000..1970cc2b9443 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import scala.collection.JavaConverters._ + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataSourceV2Reader, ReadTask} +import org.apache.spark.sql.sources.v2.reader.scan.UnsafeRowScan +import org.apache.spark.sql.types.StructType + +case class DataSourceV2ScanExec( + fullOutput: Array[AttributeReference], + @transient reader: DataSourceV2Reader, + // TODO: these 3 parameters are only used to determine the equality of the scan node, however, + // the reader also have this information, and ideally we can just rely on the equality of the + // reader. The only concern is, the reader implementation is outside of Spark and we have no + // control. 
+ readSchema: StructType, + @transient filters: ExpressionSet, + hashPartitionKeys: Seq[String]) extends LeafExecNode { + + def output: Seq[Attribute] = readSchema.map(_.name).map { name => + fullOutput.find(_.name == name).get + } + + override def references: AttributeSet = AttributeSet.empty + + override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + + override protected def doExecute(): RDD[InternalRow] = { + val readTasks: java.util.List[ReadTask[UnsafeRow]] = reader match { + case r: UnsafeRowScan => r.createUnsafeRowReadTasks() + case _ => + reader.createReadTasks().asScala.map { + new RowToUnsafeRowReadTask(_, reader.readSchema()): ReadTask[UnsafeRow] + }.asJava + } + + val inputRDD = new DataSourceRDD(sparkContext, readTasks) + .asInstanceOf[RDD[InternalRow]] + val numOutputRows = longMetric("numOutputRows") + inputRDD.map { r => + numOutputRows += 1 + r + } + } +} + +class RowToUnsafeRowReadTask(rowReadTask: ReadTask[Row], schema: StructType) + extends ReadTask[UnsafeRow] { + + override def preferredLocations: Array[String] = rowReadTask.preferredLocations + + override def createReader: DataReader[UnsafeRow] = { + new RowToUnsafeDataReader(rowReadTask.createReader, RowEncoder.apply(schema)) + } +} + +class RowToUnsafeDataReader(rowReader: DataReader[Row], encoder: ExpressionEncoder[Row]) + extends DataReader[UnsafeRow] { + + override def next: Boolean = rowReader.next + + override def get: UnsafeRow = encoder.toRow(rowReader.get).asInstanceOf[UnsafeRow] + + override def close(): Unit = rowReader.close() +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala new file mode 100644 index 000000000000..9740ca6a4dca --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.v2.reader.downward.{CatalystFilterPushDownSupport, ColumnPruningSupport, FilterPushDownSupport} + +object DataSourceV2Strategy extends Strategy { + // TODO: write path + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case PhysicalOperation(projects, filters, DataSourceV2Relation(output, reader)) => + val stayUpFilters: Seq[Expression] = reader match { + case r: CatalystFilterPushDownSupport => + r.pushCatalystFilters(filters.toArray) + + case r: FilterPushDownSupport => + // A map from original Catalyst expressions to corresponding translated data source + // filters. If a predicate is not in this map, it means it cannot be pushed down. + val translatedMap: Map[Expression, Filter] = filters.flatMap { p => + DataSourceStrategy.translateFilter(p).map(f => p -> f) + }.toMap + + // Catalyst predicate expressions that cannot be converted to data source filters. + val nonConvertiblePredicates = filters.filterNot(translatedMap.contains) + + // Data source filters that cannot be pushed down. An unhandled filter means + // the data source cannot guarantee the rows returned can pass the filter. + // As a result we must return it so Spark can plan an extra filter operator. + val unhandledFilters = r.pushFilters(translatedMap.values.toArray).toSet + val unhandledPredicates = translatedMap.filter { case (_, f) => + unhandledFilters.contains(f) + }.keys + + nonConvertiblePredicates ++ unhandledPredicates + + case _ => filters + } + + val attrMap = AttributeMap(output.zip(output)) + val projectSet = AttributeSet(projects.flatMap(_.references)) + val filterSet = AttributeSet(stayUpFilters.flatMap(_.references)) + + // Match original case of attributes. + // TODO: nested fields pruning + val requiredColumns = (projectSet ++ filterSet).toSeq.map(attrMap) + reader match { + case r: ColumnPruningSupport => + r.pruneColumns(requiredColumns.toStructType) + case _ => + } + + val scan = DataSourceV2ScanExec( + output.toArray, + reader, + reader.readSchema(), + ExpressionSet(filters), + Nil) + + val filterCondition = stayUpFilters.reduceLeftOption(And) + val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) + + val withProject = if (projects == withFilter.output) { + withFilter + } else { + ProjectExec(projects, withFilter) + } + + withProject :: Nil + + case _ => Nil + } +} diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java new file mode 100644 index 000000000000..4f644f010d16 --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.org.apache.spark.sql.sources.v2; + +import java.io.IOException; +import java.util.*; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericRow; +import org.apache.spark.sql.sources.Filter; +import org.apache.spark.sql.sources.GreaterThan; +import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.reader.DataReader; +import org.apache.spark.sql.sources.v2.reader.downward.ColumnPruningSupport; +import org.apache.spark.sql.sources.v2.reader.ReadTask; +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.sources.v2.reader.downward.FilterPushDownSupport; +import org.apache.spark.sql.types.StructType; + +public class JavaAdvancedDataSourceV2 implements DataSourceV2 { + + class Reader implements DataSourceV2Reader, ColumnPruningSupport, FilterPushDownSupport { + private StructType requiredSchema = new StructType().add("i", "int").add("j", "int"); + private Filter[] filters = new Filter[0]; + + @Override + public StructType readSchema() { + return requiredSchema; + } + + @Override + public void pruneColumns(StructType requiredSchema) { + this.requiredSchema = requiredSchema; + } + + @Override + public Filter[] pushFilters(Filter[] filters) { + this.filters = filters; + return new Filter[0]; + } + + @Override + public List> createReadTasks() { + List> res = new ArrayList<>(); + + Integer lowerBound = null; + for (Filter filter : filters) { + if (filter instanceof GreaterThan) { + GreaterThan f = (GreaterThan) filter; + if ("i".equals(f.attribute()) && f.value() instanceof Integer) { + lowerBound = (Integer) f.value(); + break; + } + } + } + + if (lowerBound == null) { + res.add(new JavaAdvancedReadTask(0, 5, requiredSchema)); + res.add(new JavaAdvancedReadTask(5, 10, requiredSchema)); + } else if (lowerBound < 4) { + res.add(new JavaAdvancedReadTask(lowerBound + 1, 5, requiredSchema)); + res.add(new JavaAdvancedReadTask(5, 10, requiredSchema)); + } else if (lowerBound < 9) { + res.add(new JavaAdvancedReadTask(lowerBound + 1, 10, requiredSchema)); + } + + return res; + } + } + + static class JavaAdvancedReadTask implements ReadTask, DataReader { + private int start; + private int end; + private StructType requiredSchema; + + JavaAdvancedReadTask(int start, int end, StructType requiredSchema) { + this.start = start; + this.end = end; + this.requiredSchema = requiredSchema; + } + + @Override + public DataReader createReader() { + return new JavaAdvancedReadTask(start - 1, end, requiredSchema); + } + + @Override + public boolean next() { + start += 1; + return start < end; + } + + @Override + public Row get() { + Object[] values = new Object[requiredSchema.size()]; + for (int i = 0; i < values.length; i++) { + if ("i".equals(requiredSchema.apply(i).name())) { + values[i] = start; + } else if 
("j".equals(requiredSchema.apply(i).name())) { + values[i] = -start; + } + } + return new GenericRow(values); + } + + @Override + public void close() throws IOException { + + } + } + + + @Override + public DataSourceV2Reader createReader(DataSourceV2Options options) { + return new Reader(); + } +} diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java new file mode 100644 index 000000000000..0a4930ad0df9 --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.org.apache.spark.sql.sources.v2; + +import java.util.List; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.SchemaRequiredDataSourceV2; +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.sources.v2.reader.ReadTask; +import org.apache.spark.sql.types.StructType; + +public class JavaSchemaRequiredDataSource implements SchemaRequiredDataSourceV2 { + + class Reader implements DataSourceV2Reader { + private final StructType schema; + + Reader(StructType schema) { + this.schema = schema; + } + + @Override + public StructType readSchema() { + return schema; + } + + @Override + public List> createReadTasks() { + return java.util.Collections.emptyList(); + } + } + + @Override + public DataSourceV2Reader createReader(StructType schema, DataSourceV2Options options) { + return new Reader(schema); + } +} diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java new file mode 100644 index 000000000000..3bb260457f69 --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.v2.reader.ReadTask;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.types.StructType;
+
+public class JavaSimpleDataSourceV2 implements DataSourceV2 {
+
+  class Reader implements DataSourceV2Reader {
+    private final StructType schema = new StructType().add("i", "int").add("j", "int");
+
+    @Override
+    public StructType readSchema() {
+      return schema;
+    }
+
+    @Override
+    public List<ReadTask<Row>> createReadTasks() {
+      return java.util.Arrays.asList(
+        new JavaSimpleReadTask(0, 5),
+        new JavaSimpleReadTask(5, 10));
+    }
+  }
+
+  static class JavaSimpleReadTask implements ReadTask<Row>, DataReader<Row> {
+    private int start;
+    private int end;
+
+    JavaSimpleReadTask(int start, int end) {
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public DataReader<Row> createReader() {
+      return new JavaSimpleReadTask(start - 1, end);
+    }
+
+    @Override
+    public boolean next() {
+      start += 1;
+      return start < end;
+    }
+
+    @Override
+    public Row get() {
+      return new GenericRow(new Object[] {start, -start});
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+  }
+
+  @Override
+  public DataSourceV2Reader createReader(DataSourceV2Options options) {
+    return new Reader();
+  }
+}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
new file mode 100644
index 000000000000..06043cd6d58c
--- /dev/null
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package test.org.apache.spark.sql.sources.v2; + +import java.io.IOException; +import java.util.List; + +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.reader.DataReader; +import org.apache.spark.sql.sources.v2.reader.ReadTask; +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.sources.v2.reader.scan.UnsafeRowScan; +import org.apache.spark.sql.types.StructType; + +public class JavaUnsafeRowDataSourceV2 implements DataSourceV2 { + + class Reader implements DataSourceV2Reader, UnsafeRowScan { + private final StructType schema = new StructType().add("i", "int").add("j", "int"); + + @Override + public StructType readSchema() { + return schema; + } + + @Override + public List> createUnsafeRowReadTasks() { + return java.util.Arrays.asList( + new JavaUnsafeRowReadTask(0, 5), + new JavaUnsafeRowReadTask(5, 10)); + } + } + + static class JavaUnsafeRowReadTask implements ReadTask, DataReader { + private int start; + private int end; + private UnsafeRow row; + + JavaUnsafeRowReadTask(int start, int end) { + this.start = start; + this.end = end; + this.row = new UnsafeRow(2); + row.pointTo(new byte[8 * 3], 8 * 3); + } + + @Override + public DataReader createReader() { + return new JavaUnsafeRowReadTask(start - 1, end); + } + + @Override + public boolean next() { + start += 1; + return start < end; + } + + @Override + public UnsafeRow get() { + row.setInt(0, start); + row.setInt(1, -start); + return row; + } + + @Override + public void close() throws IOException { + + } + } + + @Override + public DataSourceV2Reader createReader(DataSourceV2Options options) { + return new Reader(); + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala new file mode 100644 index 000000000000..3101261309da --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources.v2 + +import java.util.{ArrayList, List => JList} + +import test.org.apache.spark.sql.sources.v2._ + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.{Filter, GreaterThan} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.downward.{ColumnPruningSupport, FilterPushDownSupport} +import org.apache.spark.sql.sources.v2.reader.scan.UnsafeRowScan +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.StructType + +class DataSourceV2Suite extends QueryTest with SharedSQLContext { + import testImplicits._ + + test("simplest implementation") { + Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls => + withClue(cls.getName) { + val df = spark.read.format(cls.getName).load() + checkAnswer(df, (0 until 10).map(i => Row(i, -i))) + checkAnswer(df.select('j), (0 until 10).map(i => Row(-i))) + checkAnswer(df.filter('i > 5), (6 until 10).map(i => Row(i, -i))) + } + } + } + + test("advanced implementation") { + Seq(classOf[AdvancedDataSourceV2], classOf[JavaAdvancedDataSourceV2]).foreach { cls => + withClue(cls.getName) { + val df = spark.read.format(cls.getName).load() + checkAnswer(df, (0 until 10).map(i => Row(i, -i))) + checkAnswer(df.select('j), (0 until 10).map(i => Row(-i))) + checkAnswer(df.filter('i > 3), (4 until 10).map(i => Row(i, -i))) + checkAnswer(df.select('j).filter('i > 6), (7 until 10).map(i => Row(-i))) + checkAnswer(df.select('i).filter('i > 10), Nil) + } + } + } + + test("unsafe row implementation") { + Seq(classOf[UnsafeRowDataSourceV2], classOf[JavaUnsafeRowDataSourceV2]).foreach { cls => + withClue(cls.getName) { + val df = spark.read.format(cls.getName).load() + checkAnswer(df, (0 until 10).map(i => Row(i, -i))) + checkAnswer(df.select('j), (0 until 10).map(i => Row(-i))) + checkAnswer(df.filter('i > 5), (6 until 10).map(i => Row(i, -i))) + } + } + } + + test("schema required data source") { + Seq(classOf[SchemaRequiredDataSource], classOf[JavaSchemaRequiredDataSource]).foreach { cls => + withClue(cls.getName) { + val e = intercept[AnalysisException](spark.read.format(cls.getName).load()) + assert(e.message.contains("A schema needs to be specified")) + + val schema = new StructType().add("i", "int").add("s", "string") + val df = spark.read.format(cls.getName).schema(schema).load() + + assert(df.schema == schema) + assert(df.collect().isEmpty) + } + } + } +} + +class SimpleDataSourceV2 extends DataSourceV2 { + + class Reader extends DataSourceV2Reader { + override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int") + + override def createReadTasks(): JList[ReadTask[Row]] = { + java.util.Arrays.asList(new SimpleReadTask(0, 5), new SimpleReadTask(5, 10)) + } + } + + override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader +} + +class SimpleReadTask(start: Int, end: Int) extends ReadTask[Row] with DataReader[Row] { + private var current = start - 1 + + override def createReader(): DataReader[Row] = new SimpleReadTask(start, end) + + override def next(): Boolean = { + current += 1 + current < end + } + + override def get(): Row = Row(current, -current) + + override def close(): Unit = {} +} + + + +class AdvancedDataSourceV2 extends DataSourceV2 { + + class Reader extends DataSourceV2Reader + with ColumnPruningSupport with FilterPushDownSupport { + + var requiredSchema = 
new StructType().add("i", "int").add("j", "int") + var filters = Array.empty[Filter] + + override def pruneColumns(requiredSchema: StructType): Unit = { + this.requiredSchema = requiredSchema + } + + override def pushFilters(filters: Array[Filter]): Array[Filter] = { + this.filters = filters + Array.empty + } + + override def readSchema(): StructType = { + requiredSchema + } + + override def createReadTasks(): JList[ReadTask[Row]] = { + val lowerBound = filters.collect { + case GreaterThan("i", v: Int) => v + }.headOption + + val res = new ArrayList[ReadTask[Row]] + + if (lowerBound.isEmpty) { + res.add(new AdvancedReadTask(0, 5, requiredSchema)) + res.add(new AdvancedReadTask(5, 10, requiredSchema)) + } else if (lowerBound.get < 4) { + res.add(new AdvancedReadTask(lowerBound.get + 1, 5, requiredSchema)) + res.add(new AdvancedReadTask(5, 10, requiredSchema)) + } else if (lowerBound.get < 9) { + res.add(new AdvancedReadTask(lowerBound.get + 1, 10, requiredSchema)) + } + + res + } + } + + override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader +} + +class AdvancedReadTask(start: Int, end: Int, requiredSchema: StructType) + extends ReadTask[Row] with DataReader[Row] { + + private var current = start - 1 + + override def createReader(): DataReader[Row] = new AdvancedReadTask(start, end, requiredSchema) + + override def close(): Unit = {} + + override def next(): Boolean = { + current += 1 + current < end + } + + override def get(): Row = { + val values = requiredSchema.map(_.name).map { + case "i" => current + case "j" => -current + } + Row.fromSeq(values) + } +} + + +class UnsafeRowDataSourceV2 extends DataSourceV2 { + + class Reader extends DataSourceV2Reader with UnsafeRowScan { + override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int") + + override def createUnsafeRowReadTasks(): JList[ReadTask[UnsafeRow]] = { + java.util.Arrays.asList(new UnsafeRowReadTask(0, 5), new UnsafeRowReadTask(5, 10)) + } + } + + override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader +} + +class UnsafeRowReadTask(start: Int, end: Int) + extends ReadTask[UnsafeRow] with DataReader[UnsafeRow] { + + private val row = new UnsafeRow(2) + row.pointTo(new Array[Byte](8 * 3), 8 * 3) + + private var current = start - 1 + + override def createReader(): DataReader[UnsafeRow] = new UnsafeRowReadTask(start, end) + + override def next(): Boolean = { + current += 1 + current < end + } + override def get(): UnsafeRow = { + row.setInt(0, current) + row.setInt(1, -current) + row + } + + override def close(): Unit = {} +} + +class SchemaRequiredDataSource extends SchemaRequiredDataSourceV2 { + + class Reader(val readSchema: StructType) extends DataSourceV2Reader { + override def createReadTasks(): JList[ReadTask[Row]] = + java.util.Collections.emptyList() + } + + override def createReader(schema: StructType, options: DataSourceV2Options): DataSourceV2Reader = + new Reader(schema) +} From abcc606e006e9975d1507eed379a48a3134165ad Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 13 Sep 2017 23:09:04 +0800 Subject: [PATCH 2/4] naming updates --- .../spark/sql/sources/v2/DataSourceV2.java | 22 +++------- .../sql/sources/v2/DataSourceV2Options.java | 4 +- .../spark/sql/sources/v2/ReadSupport.java | 36 +++++++++++++++++ ...urceV2.java => ReadSupportWithSchema.java} | 11 +++-- .../sources/v2/reader/DataSourceV2Reader.java | 14 ++++--- .../v2/reader/{upward => }/Statistics.java | 2 +- ...a => SupportsPushDownCatalystFilters.java} | 10 
++--- ...port.java => SupportsPushDownFilters.java} | 8 ++-- ...a => SupportsPushDownRequiredColumns.java} | 15 ++++--- ...ort.java => SupportsReportStatistics.java} | 4 +- ...owScan.java => SupportsScanUnsafeRow.java} | 12 +++--- .../apache/spark/sql/DataFrameReader.scala | 16 ++++---- .../datasources/v2/DataSourceV2Relation.scala | 5 +-- .../datasources/v2/DataSourceV2ScanExec.scala | 5 +-- .../datasources/v2/DataSourceV2Strategy.scala | 8 ++-- .../sources/v2/JavaAdvancedDataSourceV2.java | 11 ++--- .../v2/JavaSchemaRequiredDataSource.java | 5 ++- .../sources/v2/JavaSimpleDataSourceV2.java | 3 +- .../sources/v2/JavaUnsafeRowDataSourceV2.java | 10 ++--- .../sources/v2/DataSourceV2OptionsSuite.scala | 40 +++++++++++++++++++ .../sql/sources/v2/DataSourceV2Suite.scala | 14 +++---- 21 files changed, 159 insertions(+), 96 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java rename sql/core/src/main/java/org/apache/spark/sql/sources/v2/{SchemaRequiredDataSourceV2.java => ReadSupportWithSchema.java} (79%) rename sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/{upward => }/Statistics.java (94%) rename sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/{downward/CatalystFilterPushDownSupport.java => SupportsPushDownCatalystFilters.java} (79%) rename sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/{downward/FilterPushDownSupport.java => SupportsPushDownFilters.java} (80%) rename sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/{downward/ColumnPruningSupport.java => SupportsPushDownRequiredColumns.java} (74%) rename sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/{upward/StatisticsSupport.java => SupportsReportStatistics.java} (91%) rename sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/{scan/UnsafeRowScan.java => SupportsScanUnsafeRow.java} (81%) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2OptionsSuite.scala diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java index 15692b03d394..315b6b9511ea 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java @@ -17,21 +17,11 @@ package org.apache.spark.sql.sources.v2; -import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; -import org.apache.spark.sql.types.StructType; - /** - * The main interface for data source v2 implementations. Users can mix in more interfaces to - * implement more functions other than just scan. + * The base interface for data source v2 implementations. + * + * Note that this is an empty interface, data source implementations should mix-in at least one of + * the plug-in interfaces like `ReadSupport`. Otherwise it's just a dummy data source which is + * un-readable/writable. */ -public interface DataSourceV2 { - - /** - * Create a `DataSourceV2Reader` to scan the data for this data source. - * - * @param options the options for this data source reader, which is an immutable case-insensitive - * string-to-string map. - * @return a reader that implements the actual read logic. 
- */ - DataSourceV2Reader createReader(DataSourceV2Options options); -} +public interface DataSourceV2 {} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java index 0e419b08d0dc..b86d82d25e8a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java @@ -23,8 +23,8 @@ import java.util.Optional; /** - * An immutable case-insensitive string-to-string map, which is used to represent data source - * options. + * An immutable string-to-string map in which keys are case-insensitive. This is used to represent + * data source options. */ public class DataSourceV2Options { private final Map keyLowerCasedMap; diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java new file mode 100644 index 000000000000..7bb710dc4836 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; + +/** + * A mix-in interface for `DataSourceV2`. Users can implement this interface to provide data reading + * ability and scan the data from the data source. + */ +public interface ReadSupport { + + /** + * Creates a `DataSourceV2Reader` to scan the data for this data source. + * + * @param options the options for this data source reader, which is an immutable case-insensitive + * string-to-string map. + * @return a reader that implements the actual read logic. + */ + DataSourceV2Reader createReader(DataSourceV2Options options); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SchemaRequiredDataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java similarity index 79% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/SchemaRequiredDataSourceV2.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java index 3b1b069c6358..9de3e2f08e6c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SchemaRequiredDataSourceV2.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java @@ -21,11 +21,14 @@ import org.apache.spark.sql.types.StructType; /** - * A variant of `DataSourceV2` which requires users to provide a schema when reading data. 
A data - * source can inherit both `DataSourceV2` and `SchemaRequiredDataSourceV2` if it supports both schema - * inference and user-specified schemas. + * A mix-in interface for `DataSourceV2`. Users can implement this interface to provide data reading + * ability and scan the data from the data source. + * + * This is a variant of `ReadSupport` that accepts user-specified schema when reading data. A data + * source can implement both `ReadSupport` and `ReadSupportWithSchema` if it supports both schema + * inference and user-specified schema. */ -public interface SchemaRequiredDataSourceV2 { +public interface ReadSupportWithSchema { /** * Create a `DataSourceV2Reader` to scan the data for this data source. diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java index 78b3bdde2634..4cf64182bfd8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java @@ -28,13 +28,15 @@ * this data source reader. * * There are mainly 3 kinds of query optimizations: - * 1. push operators downward to the data source, e.g., column pruning, filter push down, etc. - * 2. propagate information upward to Spark, e.g., report statistics, report ordering, etc. - * 3. special scans like columnar scan, unsafe row scan, etc. Note that a data source reader can - * implement at most one special scan. + * 1. Operators push-down. E.g., filter push-down, required columns push-down(aka column + * pruning), etc. These push-down interfaces are named like `SupportsPushDownXXX`. + * 2. Information Reporting. E.g., statistics reporting, ordering reporting, etc. These + * reporting interfaces are named like `SupportsReportingXXX`. + * 3. Special scan. E.g, columnar scan, unsafe row scan, etc. Note that a data source reader can + * implement at most one special scan. These scan interfaces are named like `SupportsScanXXX`. * - * Spark first applies all operator push-down optimizations which this data source supports. Then - * Spark collects information this data source provides for further optimizations. Finally Spark + * Spark first applies all operator push-down optimizations that this data source supports. Then + * Spark collects information this data source reported for further optimizations. Finally Spark * issues the scan request and does the actual data reading. */ public interface DataSourceV2Reader { diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/upward/Statistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java similarity index 94% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/upward/Statistics.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java index 4c921beca608..4123d2b55a5c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/upward/Statistics.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.sources.v2.reader.upward; +package org.apache.spark.sql.sources.v2.reader; import java.util.OptionalLong; diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/CatalystFilterPushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java similarity index 79% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/CatalystFilterPushDownSupport.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java index 8bcb1d902a4e..9268714cd4ef 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/CatalystFilterPushDownSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.sources.v2.reader.downward; +package org.apache.spark.sql.sources.v2.reader; import org.apache.spark.annotation.Experimental; import org.apache.spark.annotation.InterfaceStability; @@ -24,14 +24,14 @@ /** * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to push down * arbitrary expressions as predicates to the data source. This is an experimental and unstable - * interface + * interface as `Expression` is not public and may get changed in future Spark versions. * - * Note that, if users implement both this interface and `FilterPushDownSupport`, Spark will ignore - * `FilterPushDownSupport` and only process this interface. + * Note that, if users implement both this interface and `SupportsPushDownFilters`, Spark will + * ignore `SupportsPushDownFilters` and only process this interface. */ @Experimental @InterfaceStability.Unstable -public interface CatalystFilterPushDownSupport { +public interface SupportsPushDownCatalystFilters { /** * Pushes down filters, and returns unsupported filters. diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/FilterPushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java similarity index 80% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/FilterPushDownSupport.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java index 772f8aadf1ec..d9f33f76f287 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/FilterPushDownSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.sources.v2.reader.downward; +package org.apache.spark.sql.sources.v2.reader; import org.apache.spark.sql.sources.Filter; @@ -23,10 +23,10 @@ * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to push down * filters to the data source and reduce the size of the data to be read. * - * Note that, if users implement both this interface and `CatalystFilterPushDownSupport`, Spark - * will ignore this interface and only process `CatalystFilterPushDownSupport`. + * Note that, if users implement both this interface and `SupportsPushDownCatalystFilters`, Spark + * will ignore this interface and only process `SupportsPushDownCatalystFilters`. 
*/ -public interface FilterPushDownSupport { +public interface SupportsPushDownFilters { /** * Pushes down filters, and returns unsupported filters. diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/ColumnPruningSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java similarity index 74% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/ColumnPruningSupport.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java index c288ecb6dac4..5a575436273e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/downward/ColumnPruningSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java @@ -15,22 +15,25 @@ * limitations under the License. */ -package org.apache.spark.sql.sources.v2.reader.downward; +package org.apache.spark.sql.sources.v2.reader; import org.apache.spark.sql.types.StructType; /** - * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to only read the - * required columns/nested fields during scan. + * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to push down + * required columns and only read these columns during scan. */ -public interface ColumnPruningSupport { +public interface SupportsPushDownRequiredColumns { /** - * Apply column pruning w.r.t. the given requiredSchema. + * Applies column pruning w.r.t. the given requiredSchema. * - * Implementation should try its best to prune the unnecessary columns/nested fields, but it's + * Implementation should try its best to prune the unnecessary columns or nested fields, but it's * also OK to do the pruning partially, e.g., a data source may not be able to prune nested * fields, and only prune top-level columns. + * + * Note that, data source implementations should update `DataSourceReader.readSchema` after + * applying column pruning. */ void pruneColumns(StructType requiredSchema); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/upward/StatisticsSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java similarity index 91% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/upward/StatisticsSupport.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java index 60f589f18841..6378c70e86e5 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/upward/StatisticsSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.spark.sql.sources.v2.reader.upward; +package org.apache.spark.sql.sources.v2.reader; /** * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to report * statistics to Spark. 
 */ -public interface StatisticsSupport { +public interface SupportsReportStatistics { Statistics getStatistics(); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/scan/UnsafeRowScan.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java similarity index 81% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/scan/UnsafeRowScan.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java index d30b3b388c9f..2b26ec25ffd6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/scan/UnsafeRowScan.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.sources.v2.reader.scan; +package org.apache.spark.sql.sources.v2.reader; import java.util.List; @@ -28,18 +28,16 @@ /** * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to output - * unsafe rows directly and avoid the row copy at Spark side. - * - * Note that, this is an experimental and unstable interface, as `UnsafeRow` is not public and - * may get changed in future Spark versions. + * unsafe rows directly and avoid the row copy at Spark side. This is an experimental and unstable + * interface, as `UnsafeRow` is not public and may get changed in future Spark versions. */ @Experimental @InterfaceStability.Unstable -public interface UnsafeRowScan extends DataSourceV2Reader { +public interface SupportsScanUnsafeRow extends DataSourceV2Reader { @Override default List<ReadTask<Row>> createReadTasks() { - throw new IllegalStateException("createReadTasks should not be called with UnsafeRowScan."); + throw new IllegalStateException("createReadTasks should not be called with SupportsScanUnsafeRow."); } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 7d804ae95c52..78b668c04fd5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.csv._ import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, SchemaRequiredDataSourceV2} +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport, ReadSupportWithSchema} import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.unsafe.types.UTF8String @@ -183,23 +183,21 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { } val cls = DataSource.lookupDataSource(source) - val isDataSourceV2 = classOf[DataSourceV2].isAssignableFrom(cls) || - classOf[SchemaRequiredDataSourceV2].isAssignableFrom(cls) - if (isDataSourceV2) { + if (classOf[DataSourceV2].isAssignableFrom(cls)) { val dataSource = cls.newInstance() val options = new DataSourceV2Options(extraOptions.asJava) val reader = (cls.newInstance(), userSpecifiedSchema) match { - case (ds: SchemaRequiredDataSourceV2, Some(schema)) => + case (ds: ReadSupportWithSchema, Some(schema)) => ds.createReader(schema, options) - case (ds: DataSourceV2, None) => + case (ds: ReadSupport, None) =>
ds.createReader(options) - case (_: SchemaRequiredDataSourceV2, None) => + case (_: ReadSupportWithSchema, None) => throw new AnalysisException(s"A schema needs to be specified when using $dataSource.") - case (ds: DataSourceV2, Some(schema)) => + case (ds: ReadSupport, Some(schema)) => val reader = ds.createReader(options) if (reader.readSchema() != schema) { throw new AnalysisException(s"$ds does not allow user-specified schemas.") @@ -207,7 +205,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { reader case _ => - throw new AnalysisException(s"$cls is not a valid Spark SQL Data Source.") + throw new AnalysisException(s"$cls does not support data reading.") } Dataset.ofRows(sparkSession, DataSourceV2Relation(reader)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 887dded288a2..3c9b598fd07c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -19,15 +19,14 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader -import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport +import org.apache.spark.sql.sources.v2.reader.{DataSourceV2Reader, SupportsReportStatistics} case class DataSourceV2Relation( output: Seq[AttributeReference], reader: DataSourceV2Reader) extends LeafNode { override def computeStats(): Statistics = reader match { - case r: StatisticsSupport => + case r: SupportsReportStatistics => Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes)) case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index 1970cc2b9443..7999c0ceb574 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -26,8 +26,7 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.LeafExecNode import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.sources.v2.reader.{DataReader, DataSourceV2Reader, ReadTask} -import org.apache.spark.sql.sources.v2.reader.scan.UnsafeRowScan +import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.types.StructType case class DataSourceV2ScanExec( @@ -52,7 +51,7 @@ case class DataSourceV2ScanExec( override protected def doExecute(): RDD[InternalRow] = { val readTasks: java.util.List[ReadTask[UnsafeRow]] = reader match { - case r: UnsafeRowScan => r.createUnsafeRowReadTasks() + case r: SupportsScanUnsafeRow => r.createUnsafeRowReadTasks() case _ => reader.createReadTasks().asScala.map { new RowToUnsafeRowReadTask(_, reader.readSchema()): ReadTask[UnsafeRow] diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 9740ca6a4dca..b80f695b2a87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -24,17 +24,17 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.sources.v2.reader.downward.{CatalystFilterPushDownSupport, ColumnPruningSupport, FilterPushDownSupport} +import org.apache.spark.sql.sources.v2.reader._ object DataSourceV2Strategy extends Strategy { // TODO: write path override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projects, filters, DataSourceV2Relation(output, reader)) => val stayUpFilters: Seq[Expression] = reader match { - case r: CatalystFilterPushDownSupport => + case r: SupportsPushDownCatalystFilters => r.pushCatalystFilters(filters.toArray) - case r: FilterPushDownSupport => + case r: SupportsPushDownFilters => // A map from original Catalyst expressions to corresponding translated data source // filters. If a predicate is not in this map, it means it cannot be pushed down. val translatedMap: Map[Expression, Filter] = filters.flatMap { p => @@ -65,7 +65,7 @@ object DataSourceV2Strategy extends Strategy { // TODO: nested fields pruning val requiredColumns = (projectSet ++ filterSet).toSeq.map(attrMap) reader match { - case r: ColumnPruningSupport => + case r: SupportsPushDownRequiredColumns => r.pruneColumns(requiredColumns.toStructType) case _ => } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java index 4f644f010d16..50900e98dedb 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java @@ -26,16 +26,13 @@ import org.apache.spark.sql.sources.GreaterThan; import org.apache.spark.sql.sources.v2.DataSourceV2; import org.apache.spark.sql.sources.v2.DataSourceV2Options; -import org.apache.spark.sql.sources.v2.reader.DataReader; -import org.apache.spark.sql.sources.v2.reader.downward.ColumnPruningSupport; -import org.apache.spark.sql.sources.v2.reader.ReadTask; -import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; -import org.apache.spark.sql.sources.v2.reader.downward.FilterPushDownSupport; +import org.apache.spark.sql.sources.v2.ReadSupport; +import org.apache.spark.sql.sources.v2.reader.*; import org.apache.spark.sql.types.StructType; -public class JavaAdvancedDataSourceV2 implements DataSourceV2 { +public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport { - class Reader implements DataSourceV2Reader, ColumnPruningSupport, FilterPushDownSupport { + class Reader implements DataSourceV2Reader, SupportsPushDownRequiredColumns, SupportsPushDownFilters { private StructType requiredSchema = new StructType().add("i", "int").add("j", "int"); private Filter[] filters = new Filter[0]; diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java index 0a4930ad0df9..a174bd8092cb 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java @@ -20,13 +20,14 @@ import java.util.List; import org.apache.spark.sql.Row; +import org.apache.spark.sql.sources.v2.DataSourceV2; import org.apache.spark.sql.sources.v2.DataSourceV2Options; -import org.apache.spark.sql.sources.v2.SchemaRequiredDataSourceV2; +import org.apache.spark.sql.sources.v2.ReadSupportWithSchema; import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; import org.apache.spark.sql.sources.v2.reader.ReadTask; import org.apache.spark.sql.types.StructType; -public class JavaSchemaRequiredDataSource implements SchemaRequiredDataSourceV2 { +public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWithSchema { class Reader implements DataSourceV2Reader { private final StructType schema; diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java index 3bb260457f69..08469f14c257 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java @@ -24,12 +24,13 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow; import org.apache.spark.sql.sources.v2.DataSourceV2; import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.ReadSupport; import org.apache.spark.sql.sources.v2.reader.DataReader; import org.apache.spark.sql.sources.v2.reader.ReadTask; import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; import org.apache.spark.sql.types.StructType; -public class JavaSimpleDataSourceV2 implements DataSourceV2 { +public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport { class Reader implements DataSourceV2Reader { private final StructType schema = new StructType().add("i", "int").add("j", "int"); diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java index 06043cd6d58c..9efe7c791a93 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java @@ -23,15 +23,13 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow; import org.apache.spark.sql.sources.v2.DataSourceV2; import org.apache.spark.sql.sources.v2.DataSourceV2Options; -import org.apache.spark.sql.sources.v2.reader.DataReader; -import org.apache.spark.sql.sources.v2.reader.ReadTask; -import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; -import org.apache.spark.sql.sources.v2.reader.scan.UnsafeRowScan; +import org.apache.spark.sql.sources.v2.ReadSupport; +import org.apache.spark.sql.sources.v2.reader.*; import org.apache.spark.sql.types.StructType; -public class JavaUnsafeRowDataSourceV2 implements DataSourceV2 { +public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport { - class Reader implements DataSourceV2Reader, 
UnsafeRowScan { + class Reader implements DataSourceV2Reader, SupportsScanUnsafeRow { private final StructType schema = new StructType().add("i", "int").add("j", "int"); @Override diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2OptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2OptionsSuite.scala new file mode 100644 index 000000000000..933f4075bcc8 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2OptionsSuite.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2 + +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkFunSuite + +/** + * A simple test suite to verify `DataSourceV2Options`. + */ +class DataSourceV2OptionsSuite extends SparkFunSuite { + + test("key is case-insensitive") { + val options = new DataSourceV2Options(Map("foo" -> "bar").asJava) + assert(options.get("foo").get() == "bar") + assert(options.get("FoO").get() == "bar") + assert(!options.get("abc").isPresent) + } + + test("value is case-sensitive") { + val options = new DataSourceV2Options(Map("foo" -> "bAr").asJava) + assert(options.get("foo").get == "bAr") + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index 3101261309da..9ce93d7ae926 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -25,8 +25,6 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.sources.{Filter, GreaterThan} import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.downward.{ColumnPruningSupport, FilterPushDownSupport} -import org.apache.spark.sql.sources.v2.reader.scan.UnsafeRowScan import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.StructType @@ -84,7 +82,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { } } -class SimpleDataSourceV2 extends DataSourceV2 { +class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport { class Reader extends DataSourceV2Reader { override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int") @@ -114,10 +112,10 @@ class SimpleReadTask(start: Int, end: Int) extends ReadTask[Row] with DataReader -class AdvancedDataSourceV2 extends DataSourceV2 { +class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport { class Reader extends DataSourceV2Reader - with ColumnPruningSupport with FilterPushDownSupport { 
+ with SupportsPushDownRequiredColumns with SupportsPushDownFilters { var requiredSchema = new StructType().add("i", "int").add("j", "int") var filters = Array.empty[Filter] @@ -183,9 +181,9 @@ class AdvancedReadTask(start: Int, end: Int, requiredSchema: StructType) } -class UnsafeRowDataSourceV2 extends DataSourceV2 { +class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport { - class Reader extends DataSourceV2Reader with UnsafeRowScan { + class Reader extends DataSourceV2Reader with SupportsScanUnsafeRow { override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int") override def createUnsafeRowReadTasks(): JList[ReadTask[UnsafeRow]] = { @@ -219,7 +217,7 @@ class UnsafeRowReadTask(start: Int, end: Int) override def close(): Unit = {} } -class SchemaRequiredDataSource extends SchemaRequiredDataSourceV2 { +class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema { class Reader(val readSchema: StructType) extends DataSourceV2Reader { override def createReadTasks(): JList[ReadTask[Row]] = From a1301f5d86847a06adc872cad9a93a28884de3fe Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 14 Sep 2017 15:34:17 +0800 Subject: [PATCH 3/4] more comments --- .../java/org/apache/spark/sql/sources/v2/DataSourceV2.java | 3 ++- .../org/apache/spark/sql/sources/v2/reader/Statistics.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java index 315b6b9511ea..f6e6fe29d0db 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java @@ -18,7 +18,8 @@ package org.apache.spark.sql.sources.v2; /** - * The base interface for data source v2 implementations. + * The base interface for data source v2. Implementations must have a public, no arguments + * constructor. * * Note that this is an empty interface, data source implementations should mix-in at least one of * the plug-in interfaces like `ReadSupport`. Otherwise it's just a dummy data source which is diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java index 4123d2b55a5c..e7381d30494b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java @@ -20,7 +20,8 @@ import java.util.OptionalLong; /** - * An interface to represent statistics for a data source. + * An interface to represent statistics for a data source, which is returned by + * `SupportsReportStatistics`. 
*/ public interface Statistics { OptionalLong sizeInBytes(); From d2c86f4339d59f227bef61b1c97f9770ce1233b9 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 15 Sep 2017 11:51:13 +0800 Subject: [PATCH 4/4] document improvement --- .../spark/sql/sources/v2/DataSourceV2.java | 5 ++++- .../sql/sources/v2/DataSourceV2Options.java | 3 +++ .../spark/sql/sources/v2/ReadSupport.java | 8 +++++--- .../sql/sources/v2/ReadSupportWithSchema.java | 14 ++++++++------ .../spark/sql/sources/v2/reader/DataReader.java | 6 +++++- .../sources/v2/reader/DataSourceV2Reader.java | 17 ++++++++++++----- .../spark/sql/sources/v2/reader/ReadTask.java | 8 ++++++-- .../spark/sql/sources/v2/reader/Statistics.java | 5 ++++- .../reader/SupportsPushDownCatalystFilters.java | 13 ++++++++----- .../v2/reader/SupportsPushDownFilters.java | 11 +++++++---- .../reader/SupportsPushDownRequiredColumns.java | 9 ++++++--- .../v2/reader/SupportsReportStatistics.java | 11 +++++++++-- .../v2/reader/SupportsScanUnsafeRow.java | 10 ++++++---- .../datasources/v2/DataSourceRDD.scala | 15 ++++++--------- 14 files changed, 89 insertions(+), 46 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java index f6e6fe29d0db..dbcbe326a751 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java @@ -17,12 +17,15 @@ package org.apache.spark.sql.sources.v2; +import org.apache.spark.annotation.InterfaceStability; + /** * The base interface for data source v2. Implementations must have a public, no arguments * constructor. * * Note that this is an empty interface, data source implementations should mix-in at least one of - * the plug-in interfaces like `ReadSupport`. Otherwise it's just a dummy data source which is + * the plug-in interfaces like {@link ReadSupport}. Otherwise it's just a dummy data source which is * un-readable/writable. */ +@InterfaceStability.Evolving public interface DataSourceV2 {} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java index b86d82d25e8a..9a89c8193dd6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java @@ -22,10 +22,13 @@ import java.util.Map; import java.util.Optional; +import org.apache.spark.annotation.InterfaceStability; + /** * An immutable string-to-string map in which keys are case-insensitive. This is used to represent * data source options. */ +@InterfaceStability.Evolving public class DataSourceV2Options { private final Map keyLowerCasedMap; diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java index 7bb710dc4836..ab5254a688d5 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java @@ -17,16 +17,18 @@ package org.apache.spark.sql.sources.v2; +import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; /** - * A mix-in interface for `DataSourceV2`. Users can implement this interface to provide data reading - * ability and scan the data from the data source. 
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * provide data reading ability and scan the data from the data source. */ +@InterfaceStability.Evolving public interface ReadSupport { /** - * Creates a `DataSourceV2Reader` to scan the data for this data source. + * Creates a {@link DataSourceV2Reader} to scan the data from this data source. * * @param options the options for this data source reader, which is an immutable case-insensitive * string-to-string map. diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java index 9de3e2f08e6c..c13aeca2ef36 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java @@ -17,21 +17,23 @@ package org.apache.spark.sql.sources.v2; +import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; import org.apache.spark.sql.types.StructType; /** - * A mix-in interface for `DataSourceV2`. Users can implement this interface to provide data reading - * ability and scan the data from the data source. + * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * provide data reading ability and scan the data from the data source. * - * This is a variant of `ReadSupport` that accepts user-specified schema when reading data. A data - * source can implement both `ReadSupport` and `ReadSupportWithSchema` if it supports both schema - * inference and user-specified schema. + * This is a variant of {@link ReadSupport} that accepts user-specified schema when reading data. + * A data source can implement both {@link ReadSupport} and {@link ReadSupportWithSchema} if it + * supports both schema inference and user-specified schema. */ +@InterfaceStability.Evolving public interface ReadSupportWithSchema { /** - * Create a `DataSourceV2Reader` to scan the data for this data source. + * Create a {@link DataSourceV2Reader} to scan the data from this data source. * * @param schema the full schema of this data source reader. Full schema usually maps to the * physical schema of the underlying storage of this data source reader, e.g. diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java index f171869fe8de..cfafc1a57679 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java @@ -19,9 +19,13 @@ import java.io.Closeable; +import org.apache.spark.annotation.InterfaceStability; + /** - * A data reader returned by a read task and is responsible for outputting data for a RDD partition. + * A data reader returned by {@link ReadTask#createReader()} and is responsible for outputting data + * for a RDD partition. 
 */ +@InterfaceStability.Evolving public interface DataReader<T> extends Closeable { /** diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java index 4cf64182bfd8..48feb049c1de 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java @@ -19,26 +19,33 @@ import java.util.List; +import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.Row; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.ReadSupport; +import org.apache.spark.sql.sources.v2.ReadSupportWithSchema; import org.apache.spark.sql.types.StructType; /** - * A data source reader that can mix in various query optimization interfaces and implement these - * optimizations. The actual scan logic should be delegated to `ReadTask`s that are returned by - * this data source reader. + * A data source reader that is returned by + * {@link ReadSupport#createReader(DataSourceV2Options)} or + * {@link ReadSupportWithSchema#createReader(StructType, DataSourceV2Options)}. + * It can mix in various query optimization interfaces to speed up the data scan. The actual scan + * logic should be delegated to {@link ReadTask}s that are returned by {@link #createReadTasks()}. * * There are mainly 3 kinds of query optimizations: * 1. Operators push-down. E.g., filter push-down, required columns push-down(aka column * pruning), etc. These push-down interfaces are named like `SupportsPushDownXXX`. * 2. Information Reporting. E.g., statistics reporting, ordering reporting, etc. These * reporting interfaces are named like `SupportsReportingXXX`. - * 3. Special scan. E.g, columnar scan, unsafe row scan, etc. Note that a data source reader can - * implement at most one special scan. These scan interfaces are named like `SupportsScanXXX`. + * 3. Special scans. E.g, columnar scan, unsafe row scan, etc. These scan interfaces are named + * like `SupportsScanXXX`. * * Spark first applies all operator push-down optimizations that this data source supports. Then * Spark collects information this data source reported for further optimizations. Finally Spark * issues the scan request and does the actual data reading. */ +@InterfaceStability.Evolving public interface DataSourceV2Reader { /** diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java index 0382a183a3d8..7885bfcdd49e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java @@ -19,13 +19,17 @@ import java.io.Serializable; +import org.apache.spark.annotation.InterfaceStability; + /** - * A read task returned by a data source reader and is responsible to create the data reader. - * The relationship between `ReadTask` and `DataReader` is similar to `Iterable` and `Iterator`. + * A read task returned by {@link DataSourceV2Reader#createReadTasks()} and is responsible for + * creating the actual data reader. The relationship between {@link ReadTask} and {@link DataReader} + * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
 * * Note that, the read task will be serialized and sent to executors, then the data reader will be * created on executors and do the actual reading. */ +@InterfaceStability.Evolving public interface ReadTask<T> extends Serializable { /** diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java index e7381d30494b..e8cd7adbca07 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java @@ -19,10 +19,13 @@ import java.util.OptionalLong; +import org.apache.spark.annotation.InterfaceStability; + /** * An interface to represent statistics for a data source, which is returned by - * `SupportsReportStatistics`. + * {@link SupportsReportStatistics#getStatistics()}. */ +@InterfaceStability.Evolving public interface Statistics { OptionalLong sizeInBytes(); OptionalLong numRows(); diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java index 9268714cd4ef..19d706238ec8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java @@ -22,13 +22,16 @@ import org.apache.spark.sql.catalyst.expressions.Expression; /** - * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to push down - * arbitrary expressions as predicates to the data source. This is an experimental and unstable - * interface as `Expression` is not public and may get changed in future Spark versions. + * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this + * interface to push down arbitrary expressions as predicates to the data source. + * This is an experimental and unstable interface as {@link Expression} is not public and may get + * changed in the future Spark versions. * - * Note that, if users implement both this interface and `SupportsPushDownFilters`, Spark will - * ignore `SupportsPushDownFilters` and only process this interface. + * Note that, if data source readers implement both this interface and + * {@link SupportsPushDownFilters}, Spark will ignore {@link SupportsPushDownFilters} and only + * process this interface. */ +@InterfaceStability.Evolving @Experimental @InterfaceStability.Unstable public interface SupportsPushDownCatalystFilters { diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java index d9f33f76f287..d4b509e7080f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java @@ -17,15 +17,18 @@ package org.apache.spark.sql.sources.v2.reader; +import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.sources.Filter; /** - * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to push down - * filters to the data source and reduce the size of the data to be read. + * A mix-in interface for {@link DataSourceV2Reader}.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
index d9f33f76f287..d4b509e7080f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
@@ -17,15 +17,18 @@
 package org.apache.spark.sql.sources.v2.reader;

+import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.sources.Filter;

 /**
- * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to push down
- * filters to the data source and reduce the size of the data to be read.
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to push down filters to the data source and reduce the size of the data to be read.
  *
- * Note that, if users implement both this interface and `SupportsPushDownCatalystFilters`, Spark
- * will ignore this interface and only process `SupportsPushDownCatalystFilters`.
+ * Note that, if data source readers implement both this interface and
+ * {@link SupportsPushDownCatalystFilters}, Spark will ignore this interface and only process
+ * {@link SupportsPushDownCatalystFilters}.
  */
+@InterfaceStability.Evolving
 public interface SupportsPushDownFilters {

   /**
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
index 5a575436273e..fe0ac8ee0ee3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
@@ -17,12 +17,15 @@
 package org.apache.spark.sql.sources.v2.reader;

+import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.types.StructType;

 /**
- * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to push down
- * required columns and only read these columns during scan.
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to push down required columns to the data source and only read these columns during
+ * scan to reduce the size of the data to be read.
  */
+@InterfaceStability.Evolving
 public interface SupportsPushDownRequiredColumns {

   /**
@@ -32,7 +35,7 @@ public interface SupportsPushDownRequiredColumns {
    * also OK to do the pruning partially, e.g., a data source may not be able to prune nested
    * fields, and only prune top-level columns.
    *
-   * Note that, data source implementations should update `DataSourceReader.readSchema` after
+   * Note that, data source readers should update {@link DataSourceV2Reader#readSchema()} after
    * applying column pruning.
    */
   void pruneColumns(StructType requiredSchema);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
index 6378c70e86e5..c019d2f819ab 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
@@ -17,10 +17,17 @@
 package org.apache.spark.sql.sources.v2.reader;

+import org.apache.spark.annotation.InterfaceStability;
+
 /**
- * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to report
- * statistics to Spark.
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to report statistics to Spark.
  */
+@InterfaceStability.Evolving
 public interface SupportsReportStatistics {
+
+  /**
+   * Returns the basic statistics of this data source.
+   */
   Statistics getStatistics();
 }
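For illustration only, and not part of this patch: the column pruning contract in SupportsPushDownRequiredColumns above amounts to remembering the required schema and reporting it back from readSchema(). A hypothetical sketch (class and column names invented for the example):

import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.types.StructType;

// Hypothetical reader that accepts column pruning.
public class PruningReader implements DataSourceV2Reader, SupportsPushDownRequiredColumns {

  // Full schema of the underlying data; replaced when Spark pushes down the required columns.
  private StructType schema = new StructType().add("i", "int").add("j", "int");

  @Override
  public void pruneColumns(StructType requiredSchema) {
    // Remember which columns Spark actually needs ...
    this.schema = requiredSchema;
  }

  @Override
  public StructType readSchema() {
    // ... and report the pruned schema back, as the javadoc above requires.
    return this.schema;
  }

  @Override
  public List<ReadTask<Row>> createReadTasks() {
    // The read tasks should produce rows matching readSchema(); omitted in this sketch.
    return Collections.emptyList();
  }
}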
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
index 2b26ec25ffd6..829f9a078760 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
@@ -27,10 +27,12 @@
 import org.apache.spark.sql.sources.v2.reader.ReadTask;

 /**
- * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to output
- * unsafe rows directly and avoid the row copy at Spark side. This is an experimental and unstable
- * interface, as `UnsafeRow` is not public and may get changed in future Spark versions.
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to output {@link UnsafeRow} directly and avoid the row copy at Spark side.
+ * This is an experimental and unstable interface, as {@link UnsafeRow} is not public and may get
+ * changed in future Spark versions.
  */
+@InterfaceStability.Evolving
 @Experimental
 @InterfaceStability.Unstable
 public interface SupportsScanUnsafeRow extends DataSourceV2Reader {
@@ -41,7 +43,7 @@ default List<ReadTask<Row>> createReadTasks() {
   }

   /**
-   * Similar to `DataSourceV2Reader.createReadTasks`, but return data in unsafe row format.
+   * Similar to {@link DataSourceV2Reader#createReadTasks()}, but returns data in unsafe row format.
    */
   List<ReadTask<UnsafeRow>> createUnsafeRowReadTasks();
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
index 31c04498ec0a..b8fe5ac8e3d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
@@ -17,6 +17,8 @@
 package org.apache.spark.sql.execution.datasources.v2

+import scala.collection.JavaConverters._
+
 import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -27,18 +29,13 @@ class DataSourceRDDPartition(val index: Int, val readTask: ReadTask[UnsafeRow])

 class DataSourceRDD(
     sc: SparkContext,
-    @transient private val generators: java.util.List[ReadTask[UnsafeRow]])
+    @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
   extends RDD[UnsafeRow](sc, Nil) {

   override protected def getPartitions: Array[Partition] = {
-    var index = 0
-    val iter = generators.iterator()
-    val res = new Array[Partition](generators.size())
-    while (iter.hasNext) {
-      res(index) = new DataSourceRDDPartition(index, iter.next())
-      index += 1
-    }
-    res
+    readTasks.asScala.zipWithIndex.map {
+      case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+    }.toArray
   }

   override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {