Conversation

@maropu (Member) commented Apr 25, 2017

What changes were proposed in this pull request?

This pr makes the handling of column name duplication more consistent. In the current master, error handling differs when hitting duplicate column names:

// json
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("""{"a":1, "a":1}""").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("json").schema(schema).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#12, a#13.;
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)

scala> spark.read.format("json").load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Duplicate column(s) : "a" found, cannot save to JSON format;
  at org.apache.spark.sql.execution.datasources.json.JsonDataSource.checkConstraints(JsonDataSource.scala:81)
  at org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:63)
  at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:57)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:176)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:176)

// csv
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("a,a", "1,1").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("csv").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#41, a#42.;
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:152)

// If `inferSchema` is true, the CSV format is duplicate-safe (see SPARK-16896)
scala> spark.read.format("csv").option("header", true).load("/tmp/data").show
+---+---+
| a0| a1|
+---+---+
|  1|  1|
+---+---+

// parquet
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq((1, 1)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet("/tmp/data")
scala> spark.read.format("parquet").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#110, a#111.;
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:152)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

With this patch applied, the results change to:


// json
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("""{"a":1, "a":1}""").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("json").schema(schema).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)

scala> spark.read.format("json").load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:156)


// csv
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("a,a", "1,1").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("csv").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)

scala> spark.read.format("csv").option("header", true).load("/tmp/data").show
+---+---+
| a0| a1|
+---+---+
|  1|  1|
+---+---+

// parquet
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq((1, 1)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet("/tmp/data")
scala> spark.read.format("parquet").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)

How was this patch tested?

Added tests in DataFrameReaderWriterSuite and SQLQueryTestSuite.
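To make the shape of those tests concrete, here is a minimal sketch of the kind of end-to-end check this adds; the test name, the `withTempDir` helper, and the exact assertion text are illustrative rather than copied from the patch:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.{IntegerType, StructType}

test("reject duplicate column names in a user-specified schema") {
  withTempDir { dir =>
    val path = dir.getCanonicalPath
    // Two data columns, but the user schema names both of them "a".
    Seq("1,1").toDF().coalesce(1).write.mode("overwrite").text(path)
    val dupSchema = new StructType().add("a", IntegerType).add("a", IntegerType)
    val e = intercept[AnalysisException] {
      spark.read.schema(dupSchema).csv(path).show()
    }
    assert(e.getMessage.contains("Found duplicate column(s)"))
  }
}
```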

@SparkQA commented Apr 25, 2017

Test build #76137 has finished for PR 17758 at commit 861a074.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 25, 2017

Test build #76138 has finished for PR 17758 at commit c222eae.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 26, 2017

Test build #76165 has finished for PR 17758 at commit de11a5b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 26, 2017

Test build #76167 has finished for PR 17758 at commit 05a7a61.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 12, 2017

Test build #76864 has finished for PR 17758 at commit 42f3fc9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu (Member, Author) commented May 13, 2017

@gatorsmile Could you check this and give me advice? Thanks!

@maropu (Member, Author) commented May 16, 2017

ping

@gatorsmile (Member)

Will review it tomorrow. Ping me if I forgot it.

@maropu (Member, Author) commented May 17, 2017

Thanks a lot!

@maropu (Member, Author) commented May 20, 2017

@gatorsmile ping

@maropu (Member, Author) commented May 23, 2017

ping

@gatorsmile (Member)

cc @wzhfy

Contributor:

Rename the file (SchemaUtil) to SchemaUtils as well?

Contributor:

nit: duplicate """

Contributor:

Is the `header` option necessary?

@wzhfy (Contributor) commented Jun 11, 2017:

We also need to add tests for case sensitivity.

@wzhfy (Contributor) commented Jun 11, 2017:

In addition to end-to-end tests, we can also add a suite for SchemaUtils and put the case sensitivity cases in it.
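As a sketch of what such a unit-level case-sensitivity check could look like — note the `checkColumnNameDuplication` signature here is inferred from the stack traces above, so treat it as an assumption:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.util.SchemaUtils

// Case-insensitive analysis: "a" and "A" collide, so this should throw.
intercept[AnalysisException] {
  SchemaUtils.checkColumnNameDuplication(
    Seq("a", "A"), "in the data schema", caseSensitiveAnalysis = false)
}

// Case-sensitive analysis: "a" and "A" are distinct names, so no error.
SchemaUtils.checkColumnNameDuplication(
  Seq("a", "A"), "in the data schema", caseSensitiveAnalysis = true)
```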

Member (Author):

Added SchemaUtilsSuite.

@maropu (Member, Author) commented Jun 11, 2017

Thanks! I'll update in a day

@wzhfy (Contributor) commented Jun 12, 2017

Hi @maropu, I just did a simple search and found other places also related to duplicate columns, e.g. InsertIntoHadoopFsRelationCommand, PartitioningUtils.normalizePartitionSpec, SessionCatalog.alterTableSchema. Can you do a more comprehensive search to find whether there are other places we missed? Let's make all of them as consistent as we can.

@maropu (Member, Author) commented Jun 12, 2017

ok, I'll also check again. Thanks!

@SparkQA commented Jun 13, 2017

Test build #77971 has finished for PR 17758 at commit ac3dcc3.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 13, 2017

Test build #77973 has finished for PR 17758 at commit 36f6849.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 13, 2017

Test build #77981 has finished for PR 17758 at commit 0bd161e.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 13, 2017

Test build #77986 has finished for PR 17758 at commit 1438b23.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 13, 2017

Test build #77991 has finished for PR 17758 at commit 46a3f30.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 6, 2017

Test build #79284 has finished for PR 17758 at commit 5c29a75.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

-- Catch case-sensitive name duplication
SET spark.sql.caseSensitive=true;

CREATE TABLE t(c0 STRING, c1 INT, c1 DOUBLE, c0 INT) USING parquet;
Contributor:

We didn't have test cases for create table before?

Member (Author):

In DDLSuite, we already have simple tests for duplicate columns. Would it be better to move these tests there?

Contributor:

We should keep them in one place. For now I think we still need to put them in DDLSuite because we need to run it with and without hive support. Can we pick some typical test cases here and move them to DDLSuite?

Member (Author):

ok, will update

Member (Author):

I moved some tests to DDLSuite and removed this file.

@SparkQA commented Jul 7, 2017

Test build #79320 has finished for PR 17758 at commit 8cbfbd0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 7, 2017

Test build #79323 has finished for PR 17758 at commit 5ed2c0d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member)

LGTM

}
}

val errorMsg = intercept[AnalysisException] {
Contributor:

Why is this test related to streaming?

Member (Author):

If we have streaming metadata logs, we take a different code path from the non-streaming cases in Dataset. So, the motivation of this test is to check for the duplication on that path as well.
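Roughly, the scenario that test covers looks like this (a hedged sketch; the paths, format, and schema names are illustrative):

```scala
import org.apache.spark.sql.types.{IntegerType, StructType}

// A streaming file sink leaves a _spark_metadata log in its output directory.
// A later *batch* read of that directory resolves files through the metadata
// log instead of a plain directory listing, i.e. a different code path.
val inSchema = new StructType().add("a", IntegerType)
val query = spark.readStream
  .schema(inSchema)
  .json("/tmp/stream-in")
  .writeStream
  .format("json")
  .option("checkpointLocation", "/tmp/ckpt")
  .start("/tmp/stream-out")
query.processAllAvailable()
query.stop()

// The duplicate-name check must fire on this metadata-log path as well.
val dupSchema = new StructType().add("a", IntegerType).add("a", IntegerType)
spark.read.schema(dupSchema).json("/tmp/stream-out")  // expected: AnalysisException
```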

@cloud-fan (Contributor)

thanks, merging to master!

@asfgit closed this in 647963a on Jul 10, 2017
@maropu (Member, Author) commented Jul 10, 2017

@cloud-fan Thanks! BTW, should we also handle other kinds of duplication (e.g., local temporary tables)?

scala> Seq((1, 1)).toDF("a", "a").createOrReplaceTempView("t")
scala> sql("SELECT * FROM t").show
+---+---+
|  a|  a|
+---+---+
|  1|  1|
+---+---+

This changes the existing behaviour though.

@cloud-fan (Contributor)

We can create a ticket first and discuss there.

@maropu (Member, Author) commented Jul 10, 2017

ok, I'll file a ticket first. Thanks.

@shaulre commented Nov 5, 2017

Still, when you load a JSON file through a Dataset[String] by doing spark.read.json(spark.read.textFile("json.file")), Spark does not throw any error and you get a DataFrame with duplicate columns. Is that expected behaviour and a feature, or is it actually a bug?

@maropu (Member, Author) commented Nov 5, 2017

Yea, I think that is expected behaviour; actually, some operations on DataFrame/Dataset accept the duplication, e.g.,

scala> Seq((1, 2)).toDF("a", "a").show
+---+---+
|  a|  a|
+---+---+
|  1|  2|
+---+---+

Also, this pr was not intended to change existing behaviour.

@shaulre commented Nov 5, 2017

Why is it ok to have duplicate columns when you read from an RDD/Dataset but not when you read directly from a file? Maybe it should be a configurable option?
When you have duplicate columns, how can you deal with column selection or renaming?

@maropu (Member, Author) commented Nov 5, 2017

Because it is by design: DataFrame join results can have duplicate column names. Anyway, we couldn't change this behaviour in a minor release.
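A quick illustration of that design (example names only): a join of two frames that share column names yields duplicates, and selection is disambiguated through the parent frames rather than the bare name:

```scala
val df1 = Seq((1, "x")).toDF("id", "v")
val df2 = Seq((1, "y")).toDF("id", "v")
val joined = df1.join(df2, df1("id") === df2("id"))

joined.printSchema()
// root
//  |-- id: integer (nullable = false)
//  |-- v: string (nullable = true)
//  |-- id: integer (nullable = false)
//  |-- v: string (nullable = true)

// Column selection works by referencing the originating frame:
joined.select(df1("v"), df2("v")).show()
```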

cloud-fan pushed a commit that referenced this pull request Jul 30, 2020
…lt datasources

### What changes were proposed in this pull request?
When `spark.sql.caseSensitive` is `false` (the default), check that there are no duplicate column names on the same level (top level or nested levels) when reading from the built-in datasources Parquet, ORC, Avro and JSON. If such duplicate columns exist, throw the exception:
```
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema:
```
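A hedged sketch of the nested case (the path and field names are illustrative): two struct fields that differ only in letter case collide once `spark.sql.caseSensitive` is `false`:

```scala
// Write JSON whose nested fields differ only in letter case.
Seq("""{"StructColumn": {"camelcase": 0, "CamelCase": 1}}""")
  .toDF().coalesce(1).write.mode("overwrite").text("/tmp/nested")

// With spark.sql.caseSensitive=false (the default), the read now fails:
// org.apache.spark.sql.AnalysisException:
//   Found duplicate column(s) in the data schema: `camelcase`
spark.read.json("/tmp/nested").show()
```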

### Why are the changes needed?
To make handling of duplicate nested columns similar to handling of duplicate top-level columns, i.e. output the same error when `spark.sql.caseSensitive` is `false`:
```Scala
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema: `camelcase`
```

Checking of top-level duplicates was introduced by #17758.

### Does this PR introduce _any_ user-facing change?
Yes. For the example from SPARK-32431:

ORC:
```scala
java.io.IOException: Error reading file: file:/private/var/folders/p3/dfs6mf655d7fnjrsjvldh0tc0000gn/T/spark-c02c2f9a-0cdc-4859-94fc-b9c809ca58b1/part-00001-63e8c3f0-7131-4ec9-be02-30b3fdd276f4-c000.snappy.orc
	at org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1329)
	at org.apache.orc.mapreduce.OrcMapreduceRecordReader.ensureBatch(OrcMapreduceRecordReader.java:78)
...
Caused by: java.io.EOFException: Read past end of RLE integer from compressed stream Stream for column 3 kind DATA position: 6 length: 6 range: 0 offset: 12 limit: 12 range 0 = 0 to 6 uncompressed: 3 to 3
	at org.apache.orc.impl.RunLengthIntegerReaderV2.readValues(RunLengthIntegerReaderV2.java:61)
	at org.apache.orc.impl.RunLengthIntegerReaderV2.next(RunLengthIntegerReaderV2.java:323)
```

JSON:
```scala
+------------+
|StructColumn|
+------------+
|        [,,]|
+------------+
```

Parquet:
```scala
+------------+
|StructColumn|
+------------+
|     [0,, 1]|
+------------+
```

Avro:
```scala
+------------+
|StructColumn|
+------------+
|        [,,]|
+------------+
```

After the changes, Parquet, ORC, JSON and Avro output the same error:
```scala
Found duplicate column(s) in the data schema: `camelcase`;
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema: `camelcase`;
	at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:112)
	at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:51)
	at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:67)
```

### How was this patch tested?
Run modified test suites:
```
$ build/sbt "sql/test:testOnly org.apache.spark.sql.FileBasedDataSourceSuite"
$ build/sbt "avro/test:testOnly org.apache.spark.sql.avro.*"
```
and added new UT to `SchemaUtilsSuite`.

Closes #29234 from MaxGekk/nested-case-insensitive-column.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan pushed a commit that referenced this pull request Aug 3, 2020
…atasource

### What changes were proposed in this pull request?
Check that there are no duplicate column names on the same level (top level or nested levels) when reading from the JDBC datasource. If such duplicate columns exist, throw the exception:
```
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the customSchema option value:
```
The check takes into account the SQL config `spark.sql.caseSensitive` (`false` by default).
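A hedged sketch of the failing case (the URL, table name, and field names are placeholders):

```scala
// Nested fields in customSchema that differ only in letter case are now
// rejected up front when spark.sql.caseSensitive=false (the default).
val df = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/testdb")
  .option("dbtable", "t")
  .option("customSchema", "s STRUCT<`camelcase`: INT, `CamelCase`: DOUBLE>")
  .load()
// org.apache.spark.sql.AnalysisException:
//   Found duplicate column(s) in the customSchema option value: `camelcase`
```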

### Why are the changes needed?
To make handling of duplicate nested columns similar to handling of duplicate top-level columns, i.e. output the same error:
```Scala
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the customSchema option value: `camelcase`
```

Checking of top-level duplicates was introduced by #17758, and duplicates in nested structures by #29234.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
Added new test suite `JdbcNestedDataSourceSuite`.

Closes #29317 from MaxGekk/jdbc-dup-nested-columns.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>