From f7693f0abfe0923868c1918ddcaeaece2c107c5d Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 19 Jan 2018 08:57:50 -0800 Subject: [PATCH 1/3] fix --- .../datasources}/HadoopFsRelationTest.scala | 11 ++-- .../json}/JsonHadoopFsRelationSuite.scala | 3 +- .../orc/OrcHadoopFsRelationSuite.scala | 46 +------------ .../ParquetHadoopFsRelationSuite.scala | 5 +- .../SimpleTextHadoopFsRelationSuite.scala | 1 + .../sql/sources/SimpleTextRelation.scala | 0 .../orc/HiveOrcHadoopFsRelationSuite.scala | 64 +++++++++++++++++++ 7 files changed, 76 insertions(+), 54 deletions(-) rename sql/{hive/src/test/scala/org/apache/spark/sql/sources => core/src/test/scala/org/apache/spark/sql/execution/datasources}/HadoopFsRelationTest.scala (99%) rename sql/{hive/src/test/scala/org/apache/spark/sql/sources => core/src/test/scala/org/apache/spark/sql/execution/datasources/json}/JsonHadoopFsRelationSuite.scala (97%) rename sql/{hive/src/test/scala/org/apache/spark/sql/hive => core/src/test/scala/org/apache/spark/sql/execution/datasources}/orc/OrcHadoopFsRelationSuite.scala (66%) rename sql/{hive/src/test/scala/org/apache/spark/sql/sources => core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet}/ParquetHadoopFsRelationSuite.scala (98%) rename sql/{hive => core}/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala (98%) rename sql/{hive => core}/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala (100%) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcHadoopFsRelationSuite.scala diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationTest.scala similarity index 99% rename from sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationTest.scala index 80aff446bc24b..938b2547d717d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationTest.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.sources +package org.apache.spark.sql.execution.datasources import java.io.File @@ -26,15 +26,14 @@ import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ import org.apache.spark.sql.execution.DataSourceScanExec -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.sources.SimpleTextSource +import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} import org.apache.spark.sql.types._ -abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with TestHiveSingleton { - import spark.implicits._ +abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with SharedSQLContext { + import testImplicits._ val dataSourceName: String diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonHadoopFsRelationSuite.scala similarity index 97% rename from sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonHadoopFsRelationSuite.scala index 27f398ebf301a..0bcba41e3797d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonHadoopFsRelationSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.sources +package org.apache.spark.sql.execution.datasources.json import java.math.BigDecimal @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.catalog.CatalogUtils +import org.apache.spark.sql.execution.datasources.HadoopFsRelationTest import org.apache.spark.sql.types._ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcHadoopFsRelationSuite.scala similarity index 66% rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcHadoopFsRelationSuite.scala index a1f054b8e3f44..b15f273074f01 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcHadoopFsRelationSuite.scala @@ -15,16 +15,14 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.hive.orc - -import java.io.File +package org.apache.spark.sql.execution.datasources.orc import org.apache.hadoop.fs.Path import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.catalog.CatalogUtils +import org.apache.spark.sql.execution.datasources.HadoopFsRelationTest import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.HadoopFsRelationTest import org.apache.spark.sql.types._ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { @@ -82,44 +80,4 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { } } } - - test("SPARK-13543: Support for specifying compression codec for ORC via option()") { - withTempPath { dir => - val path = s"${dir.getCanonicalPath}/table1" - val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b") - df.write - .option("compression", "ZlIb") - .orc(path) - - // Check if this is compressed as ZLIB. - val maybeOrcFile = new File(path).listFiles().find { f => - !f.getName.startsWith("_") && f.getName.endsWith(".zlib.orc") - } - assert(maybeOrcFile.isDefined) - val orcFilePath = maybeOrcFile.get.toPath.toString - val expectedCompressionKind = - OrcFileOperator.getFileReader(orcFilePath).get.getCompression - assert("ZLIB" === expectedCompressionKind.name()) - - val copyDf = spark - .read - .orc(path) - checkAnswer(df, copyDf) - } - } - - test("Default compression codec is snappy for ORC compression") { - withTempPath { file => - spark.range(0, 10).write - .orc(file.getCanonicalPath) - val expectedCompressionKind = - OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression - assert("SNAPPY" === expectedCompressionKind.name()) - } - } -} - -class HiveOrcHadoopFsRelationSuite extends OrcHadoopFsRelationSuite { - override val dataSourceName: String = - classOf[org.apache.spark.sql.hive.orc.OrcFileFormat].getCanonicalName } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetHadoopFsRelationSuite.scala similarity index 98% rename from sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetHadoopFsRelationSuite.scala index dce5bb7ddba66..dd77ffe92fd58 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetHadoopFsRelationSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.sources +package org.apache.spark.sql.execution.datasources.parquet import java.io.File @@ -25,7 +25,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.CatalogUtils -import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol +import org.apache.spark.sql.execution.datasources.{HadoopFsRelationTest, SQLHadoopMapReduceCommitProtocol} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -162,7 +162,6 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { } test("SPARK-11500: Not deterministic order of columns when using merging schemas.") { - import testImplicits._ withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") { withTempPath { dir => val pathOne = s"${dir.getCanonicalPath}/part=1" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala similarity index 98% rename from sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala index 2ec593b95c9b6..713b773fdbb2d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.catalog.CatalogUtils import org.apache.spark.sql.catalyst.expressions.PredicateHelper +import org.apache.spark.sql.execution.datasources.HadoopFsRelationTest import org.apache.spark.sql.types._ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with PredicateHelper { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala similarity index 100% rename from sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala rename to sql/core/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcHadoopFsRelationSuite.scala new file mode 100644 index 0000000000000..6f4b096647aee --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcHadoopFsRelationSuite.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import java.io.File
+
+import org.apache.spark.sql.execution.datasources.orc.OrcHadoopFsRelationSuite
+
+class HiveOrcHadoopFsRelationSuite extends OrcHadoopFsRelationSuite {
+  import testImplicits._
+
+  override val dataSourceName: String =
+    classOf[org.apache.spark.sql.hive.orc.OrcFileFormat].getCanonicalName
+
+  test("SPARK-13543: Support for specifying compression codec for ORC via option()") {
+    withTempPath { dir =>
+      val path = s"${dir.getCanonicalPath}/table1"
+      val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b")
+      df.write
+        .option("compression", "ZlIb")
+        .orc(path)
+
+      // Check if this is compressed as ZLIB.
+      val maybeOrcFile = new File(path).listFiles().find { f =>
+        !f.getName.startsWith("_") && f.getName.endsWith(".zlib.orc")
+      }
+      assert(maybeOrcFile.isDefined)
+      val orcFilePath = maybeOrcFile.get.toPath.toString
+      val expectedCompressionKind =
+        OrcFileOperator.getFileReader(orcFilePath).get.getCompression
+      assert("ZLIB" === expectedCompressionKind.name())
+
+      val copyDf = spark
+        .read
+        .orc(path)
+      checkAnswer(df, copyDf)
+    }
+  }
+
+  test("Default compression codec is snappy for ORC compression") {
+    withTempPath { file =>
+      spark.range(0, 10).write
+        .orc(file.getCanonicalPath)
+      val expectedCompressionKind =
+        OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
+      assert("SNAPPY" === expectedCompressionKind.name())
+    }
+  }
+}

From b83f859137ca9ed33c3c7e4295c433b7bbca6eee Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 19 Jan 2018 14:36:03 -0800
Subject: [PATCH 2/3] fix

---
 .../sql/execution/datasources/HadoopFsRelationTest.scala     | 2 +-
 .../datasources/json/JsonHadoopFsRelationSuite.scala         | 3 ++-
 .../execution/datasources/orc/OrcHadoopFsRelationSuite.scala | 5 ++++-
 .../datasources/parquet/ParquetHadoopFsRelationSuite.scala   | 3 ++-
 .../spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala  | 4 +++-
 .../spark/sql/hive/orc/HiveOrcHadoopFsRelationSuite.scala    | 5 +++--
 6 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationTest.scala
index 938b2547d717d..8a28f8a20bd55 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationTest.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
 
 
-abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with SharedSQLContext {
+abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
   import testImplicits._
 
   val dataSourceName: String
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonHadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonHadoopFsRelationSuite.scala
index 0bcba41e3797d..a8a72e1cc15d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonHadoopFsRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonHadoopFsRelationSuite.scala
@@ -24,9 +24,10 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.execution.datasources.HadoopFsRelationTest
+import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
-class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
+class JsonHadoopFsRelationSuite extends HadoopFsRelationTest with SharedSQLContext {
   override val dataSourceName: String = "json"
 
   private val badJson = "\u0000\u0000\u0000A\u0001AAA"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcHadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcHadoopFsRelationSuite.scala
index b15f273074f01..6ed1227d57ba2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcHadoopFsRelationSuite.scala
@@ -23,9 +23,12 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.execution.datasources.HadoopFsRelationTest
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
-class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
+class OrcHadoopFsRelationSuite extends OrcHadoopFsRelationBase with SharedSQLContext
+
+abstract class OrcHadoopFsRelationBase extends HadoopFsRelationTest {
   import testImplicits._
 
   override protected val enableAutoThreadAudit = false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetHadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetHadoopFsRelationSuite.scala
index dd77ffe92fd58..92ddffb62a785 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetHadoopFsRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetHadoopFsRelationSuite.scala
@@ -27,10 +27,11 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelationTest, SQLHadoopMapReduceCommitProtocol}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
 
-class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
+class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest with SharedSQLContext {
   import testImplicits._
 
   override val dataSourceName: String = "parquet"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 713b773fdbb2d..57700fe9f87a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -22,9 +22,11 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.execution.datasources.HadoopFsRelationTest
+import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
-class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with PredicateHelper {
+class SimpleTextHadoopFsRelationSuite
+  extends HadoopFsRelationTest with PredicateHelper with SharedSQLContext {
   override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
 
   // We have a very limited number of supported types at here since it is just for a
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcHadoopFsRelationSuite.scala
index 6f4b096647aee..55d909987d2d7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcHadoopFsRelationSuite.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.hive.orc
 
 import java.io.File
 
-import org.apache.spark.sql.execution.datasources.orc.OrcHadoopFsRelationSuite
+import org.apache.spark.sql.execution.datasources.orc.OrcHadoopFsRelationBase
+import org.apache.spark.sql.hive.test.TestHiveSingleton
 
-class HiveOrcHadoopFsRelationSuite extends OrcHadoopFsRelationSuite {
+class HiveOrcHadoopFsRelationSuite extends OrcHadoopFsRelationBase with TestHiveSingleton {
   import testImplicits._
 
   override val dataSourceName: String =

From 9c85b18c059e4ab3b4b25a5b2e414b4f0c67072f Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sat, 20 Jan 2018 08:02:43 -0800
Subject: [PATCH 3/3] fix test

---
 .../json/JsonHadoopFsRelationSuite.scala | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonHadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonHadoopFsRelationSuite.scala
index a8a72e1cc15d6..928726ed24eba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonHadoopFsRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonHadoopFsRelationSuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.execution.datasources.HadoopFsRelationTest
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
@@ -112,14 +113,16 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest with SharedSQLConte
   test("invalid json with leading nulls - from file (multiLine=true)") {
     import testImplicits._
-    withTempDir { tempDir =>
-      val path = tempDir.getAbsolutePath
-      Seq(badJson, """{"a":1}""").toDS().write.mode("overwrite").text(path)
-      val expected = s"""$badJson\n{"a":1}\n"""
-      val schema = new StructType().add("a", IntegerType).add("_corrupt_record", StringType)
-      val df =
-        spark.read.format(dataSourceName).option("multiLine", true).schema(schema).load(path)
-      checkAnswer(df, Row(null, expected))
+    withSQLConf(SQLConf.MAX_RECORDS_PER_FILE.key -> "2") {
+      withTempDir { tempDir =>
+        val path = tempDir.getAbsolutePath
+        Seq(badJson, """{"a":1}""").toDS().repartition(1).write.mode("overwrite").text(path)
+        val expected = s"""$badJson\n{"a":1}\n"""
+        val schema = new StructType().add("a", IntegerType).add("_corrupt_record", StringType)
+        val df =
+          spark.read.format(dataSourceName).option("multiLine", true).schema(schema).load(path)
+        checkAnswer(df, Row(null, expected))
+      }
     }
   }