From e0b09fb9474642c0a35ea1d20de7e4ca1f02d286 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Thu, 16 Jul 2015 20:59:41 +0800
Subject: [PATCH 1/2] Adds DataFrame reader/writer shortcut methods for ORC

---
 python/pyspark/sql/readwriter.py              | 33 ++++++++++++++++--
 .../sql/orc_partitioned/._SUCCESS.crc         | Bin 0 -> 8 bytes
 .../test_support/sql/orc_partitioned/_SUCCESS |  0
 ...9af031-b970-49d6-ad39-30460a0be2c8.orc.crc | Bin 0 -> 12 bytes
 ...0-829af031-b970-49d6-ad39-30460a0be2c8.orc | Bin 0 -> 168 bytes
 ...9af031-b970-49d6-ad39-30460a0be2c8.orc.crc | Bin 0 -> 12 bytes
 ...0-829af031-b970-49d6-ad39-30460a0be2c8.orc | Bin 0 -> 168 bytes
 .../apache/spark/sql/DataFrameReader.scala    |  8 +++++
 .../apache/spark/sql/DataFrameWriter.scala    | 11 ++++++
 .../hive/orc/OrcHadoopFsRelationSuite.scala   |  3 +-
 .../hive/orc/OrcPartitionDiscoverySuite.scala | 14 ++++----
 .../spark/sql/hive/orc/OrcQuerySuite.scala    | 12 +++----
 .../apache/spark/sql/hive/orc/OrcTest.scala   |  8 ++---
 13 files changed, 67 insertions(+), 22 deletions(-)
 create mode 100644 python/test_support/sql/orc_partitioned/._SUCCESS.crc
 create mode 100755 python/test_support/sql/orc_partitioned/_SUCCESS
 create mode 100644 python/test_support/sql/orc_partitioned/b=0/c=0/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc
 create mode 100755 python/test_support/sql/orc_partitioned/b=0/c=0/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc
 create mode 100644 python/test_support/sql/orc_partitioned/b=1/c=1/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc
 create mode 100755 python/test_support/sql/orc_partitioned/b=1/c=1/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 882a03090ec13..dc4acc8c1490e 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -146,14 +146,24 @@ def table(self, tableName):
         return self._df(self._jreader.table(tableName))
 
     @since(1.4)
-    def parquet(self, *path):
+    def parquet(self, *paths):
         """Loads a Parquet file, returning the result as a :class:`DataFrame`.
 
         >>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
         >>> df.dtypes
         [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
         """
-        return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))
+        return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, paths)))
+
+    @since(1.5)
+    def orc(self, path):
+        """Loads an ORC file, returning the result as a :class:`DataFrame`.
+
+        >>> df = sqlContext.read.orc('python/test_support/sql/orc_partitioned')
+        >>> df.dtypes
+        [('a', 'bigint'), ('b', 'int'), ('c', 'int')]
+        """
+        return self._df(self._jreader.orc(path))
 
     @since(1.4)
     def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
@@ -378,6 +388,25 @@ def parquet(self, path, mode=None, partitionBy=None):
             self.partitionBy(partitionBy)
         self._jwrite.parquet(path)
 
+    def orc(self, path, mode=None, partitionBy=None):
+        """Saves the content of the :class:`DataFrame` in ORC format at the specified path.
+
+        :param path: the path in any Hadoop supported file system
+        :param mode: specifies the behavior of the save operation when data already exists.
+
+            * ``append``: Append contents of this :class:`DataFrame` to existing data.
+            * ``overwrite``: Overwrite existing data.
+            * ``ignore``: Silently ignore this operation if data already exists.
+            * ``error`` (default case): Throw an exception if data already exists.
+        :param partitionBy: names of partitioning columns
+
+        >>> df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
+        """
+        self.mode(mode)
+        if partitionBy is not None:
+            self.partitionBy(partitionBy)
+        self._jwrite.orc(path)
+
     @since(1.4)
     def jdbc(self, url, table, mode=None, properties={}):
         """Saves the content of the :class:`DataFrame` to a external database table via JDBC.
diff --git a/python/test_support/sql/orc_partitioned/._SUCCESS.crc b/python/test_support/sql/orc_partitioned/._SUCCESS.crc
new file mode 100644
index 0000000000000000000000000000000000000000..3b7b044936a890cd8d651d349a752d819d71d22c
GIT binary patch
literal 8
PcmYc;N@ieSU}69O2$TUk

literal 0
HcmV?d00001

diff --git a/python/test_support/sql/orc_partitioned/_SUCCESS b/python/test_support/sql/orc_partitioned/_SUCCESS
new file mode 100755
index 0000000000000..e69de29bb2d1d
diff --git a/python/test_support/sql/orc_partitioned/b=0/c=0/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc b/python/test_support/sql/orc_partitioned/b=0/c=0/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc
new file mode 100644
index 0000000000000000000000000000000000000000..834cf0b7f227244a3ccda18809a0bb49d27b59d2
GIT binary patch
literal 12
TcmYc;N@ieSU}CV!x?uzW5r_im

literal 0
HcmV?d00001

diff --git a/python/test_support/sql/orc_partitioned/b=0/c=0/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc b/python/test_support/sql/orc_partitioned/b=0/c=0/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc
new file mode 100755
index 0000000000000000000000000000000000000000..49438018733565be297429b4f9349450441230f9
GIT binary patch
literal 168
zcmeYda^_`V;9?PC;$Tzk9-$Hm^fGr7_ER>tdO)gOz`6{6JV5RXb@0hV&KsbZTiB@>>uPT3IK<`7W)7I

literal 0
HcmV?d00001

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 9ad6e21da7bf7..818d99214bb4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -267,6 +267,14 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
     }
   }
 
+  /**
+   * Loads an ORC file and returns the result as a [[DataFrame]].
+   *
+   * @param path input path
+   * @since 1.5.0
+   */
+  def orc(path: String): DataFrame = format("orc").load(path)
+
   /**
    * Returns the specified table as a [[DataFrame]].
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 5548b26cb8f80..fcdc13b7f942a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -280,6 +280,17 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    */
   def parquet(path: String): Unit = format("parquet").save(path)
 
+  /**
+   * Saves the content of the [[DataFrame]] in ORC format at the specified path.
+   * This is equivalent to:
+   * {{{
+   *   format("orc").save(path)
+   * }}}
+   *
+   * @since 1.5.0
+   */
+  def orc(path: String): Unit = format("orc").save(path)
+
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
   ///////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 080af5bb23c16..af3f468aaa5e9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -41,8 +41,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
           .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
           .toDF("a", "b", "p1")
           .write
-          .format("orc")
-          .save(partitionDir.toString)
+          .orc(partitionDir.toString)
       }
 
       val dataSchemaWithPartition =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
index 3c2efe329bfd5..d463e8fd626f9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
@@ -49,13 +49,13 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
   def makeOrcFile[T <: Product: ClassTag: TypeTag](
       data: Seq[T], path: File): Unit = {
-    data.toDF().write.format("orc").mode("overwrite").save(path.getCanonicalPath)
+    data.toDF().write.mode("overwrite").orc(path.getCanonicalPath)
   }
 
   def makeOrcFile[T <: Product: ClassTag: TypeTag](
       df: DataFrame, path: File): Unit = {
-    df.write.format("orc").mode("overwrite").save(path.getCanonicalPath)
+    df.write.mode("overwrite").orc(path.getCanonicalPath)
   }
 
   protected def withTempTable(tableName: String)(f: => Unit): Unit = {
@@ -90,7 +90,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
           makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
       }
 
-      read.format("orc").load(base.getCanonicalPath).registerTempTable("t")
+      read.orc(base.getCanonicalPath).registerTempTable("t")
 
       withTempTable("t") {
         checkAnswer(
@@ -137,7 +137,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
           makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
       }
 
-      read.format("orc").load(base.getCanonicalPath).registerTempTable("t")
+      read.orc(base.getCanonicalPath).registerTempTable("t")
 
       withTempTable("t") {
         checkAnswer(
@@ -187,9 +187,8 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
       }
 
       read
-        .format("orc")
         .option(ConfVars.DEFAULTPARTITIONNAME.varname, defaultPartitionName)
-        .load(base.getCanonicalPath)
+        .orc(base.getCanonicalPath)
         .registerTempTable("t")
 
       withTempTable("t") {
@@ -230,9 +229,8 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
       }
 
       read
-        .format("orc")
         .option(ConfVars.DEFAULTPARTITIONNAME.varname, defaultPartitionName)
-        .load(base.getCanonicalPath)
+        .orc(base.getCanonicalPath)
         .registerTempTable("t")
 
       withTempTable("t") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index ca131faaeef05..744d462938141 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -63,14 +63,14 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
 
     withOrcFile(data) { file =>
       checkAnswer(
-        sqlContext.read.format("orc").load(file),
+        sqlContext.read.orc(file),
         data.toDF().collect())
     }
   }
 
   test("Read/write binary data") {
     withOrcFile(BinaryData("test".getBytes("utf8")) :: Nil) { file =>
-      val bytes = read.format("orc").load(file).head().getAs[Array[Byte]](0)
+      val bytes = read.orc(file).head().getAs[Array[Byte]](0)
       assert(new String(bytes, "utf8") === "test")
     }
   }
@@ -88,7 +88,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
 
     withOrcFile(data) { file =>
       checkAnswer(
-        read.format("orc").load(file),
+        read.orc(file),
         data.toDF().collect())
     }
   }
@@ -158,7 +158,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
 
     withOrcFile(data) { file =>
       checkAnswer(
-        read.format("orc").load(file),
+        read.orc(file),
         Row(Seq.fill(5)(null): _*))
     }
   }
@@ -310,7 +310,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
         """.stripMargin)
 
       val errorMessage = intercept[AnalysisException] {
-        sqlContext.read.format("orc").load(path)
+        sqlContext.read.orc(path)
       }.getMessage
 
       assert(errorMessage.contains("Failed to discover schema from ORC files"))
@@ -323,7 +323,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
           |SELECT key, value FROM single
         """.stripMargin)
 
-      val df = sqlContext.read.format("orc").load(path)
+      val df = sqlContext.read.orc(path)
       assert(df.schema === singleRowDF.schema.asNullable)
       checkAnswer(df, singleRowDF)
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala
index 5daf691aa8c53..9d76d6503a3e6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala
@@ -39,7 +39,7 @@ private[sql] trait OrcTest extends SQLTestUtils {
       (data: Seq[T])
       (f: String => Unit): Unit = {
     withTempPath { file =>
-      sparkContext.parallelize(data).toDF().write.format("orc").save(file.getCanonicalPath)
+      sparkContext.parallelize(data).toDF().write.orc(file.getCanonicalPath)
       f(file.getCanonicalPath)
     }
   }
@@ -51,7 +51,7 @@ private[sql] trait OrcTest extends SQLTestUtils {
   protected def withOrcDataFrame[T <: Product: ClassTag: TypeTag]
       (data: Seq[T])
       (f: DataFrame => Unit): Unit = {
-    withOrcFile(data)(path => f(sqlContext.read.format("orc").load(path)))
+    withOrcFile(data)(path => f(sqlContext.read.orc(path)))
   }
 
   /**
@@ -70,11 +70,11 @@ private[sql] trait OrcTest extends SQLTestUtils {
   protected def makeOrcFile[T <: Product: ClassTag: TypeTag](
       data: Seq[T], path: File): Unit = {
-    data.toDF().write.format("orc").mode(SaveMode.Overwrite).save(path.getCanonicalPath)
+    data.toDF().write.mode(SaveMode.Overwrite).orc(path.getCanonicalPath)
   }
 
   protected def makeOrcFile[T <: Product: ClassTag: TypeTag](
       df: DataFrame, path: File): Unit = {
-    df.write.format("orc").mode(SaveMode.Overwrite).save(path.getCanonicalPath)
+    df.write.mode(SaveMode.Overwrite).orc(path.getCanonicalPath)
   }
 }

From 284d0432f3f60ad25163ffa2165b3e90d97fc6a7 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Fri, 17 Jul 2015 16:45:45 +0800
Subject: [PATCH 2/2] Fixes PySpark test cases and addresses PR comments

---
 python/pyspark/sql/readwriter.py                   | 17 +++++++++++++----
 .../org/apache/spark/sql/DataFrameReader.scala     |  1 +
 .../org/apache/spark/sql/DataFrameWriter.scala     |  1 +
 3 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index dc4acc8c1490e..dea8bad79e187 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -157,9 +157,13 @@ def parquet(self, *paths):
 
     @since(1.5)
     def orc(self, path):
-        """Loads an ORC file, returning the result as a :class:`DataFrame`.
+        """
+        Loads an ORC file, returning the result as a :class:`DataFrame`.
+
+        ::Note: Currently ORC support is only available together with
+        :class:`HiveContext`.
 
-        >>> df = sqlContext.read.orc('python/test_support/sql/orc_partitioned')
+        >>> df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
         >>> df.dtypes
         [('a', 'bigint'), ('b', 'int'), ('c', 'int')]
         """
@@ -391,6 +395,9 @@ def parquet(self, path, mode=None, partitionBy=None):
     def orc(self, path, mode=None, partitionBy=None):
         """Saves the content of the :class:`DataFrame` in ORC format at the specified path.
 
+        ::Note: Currently ORC support is only available together with
+        :class:`HiveContext`.
+
        :param path: the path in any Hadoop supported file system
        :param mode: specifies the behavior of the save operation when data already exists.
 
@@ -400,7 +407,8 @@
             * ``error`` (default case): Throw an exception if data already exists.
         :param partitionBy: names of partitioning columns
 
-        >>> df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
+        >>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
+        >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         if partitionBy is not None:
@@ -437,7 +445,7 @@ def _test():
     import os
     import tempfile
     from pyspark.context import SparkContext
-    from pyspark.sql import Row, SQLContext
+    from pyspark.sql import Row, SQLContext, HiveContext
     import pyspark.sql.readwriter
 
     os.chdir(os.environ["SPARK_HOME"])
@@ -449,6 +457,7 @@ def _test():
     globs['os'] = os
     globs['sc'] = sc
     globs['sqlContext'] = SQLContext(sc)
+    globs['hiveContext'] = HiveContext(sc)
     globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned')
 
     (failure_count, test_count) = doctest.testmod(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 818d99214bb4b..f8cffecf18c31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -272,6 +272,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
    *
    * @param path input path
    * @since 1.5.0
+   * @note Currently, this method can only be used together with `HiveContext`.
    */
   def orc(path: String): DataFrame = format("orc").load(path)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index fcdc13b7f942a..3e7b9cd7976c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -288,6 +288,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * }}}
    *
    * @since 1.5.0
+   * @note Currently, this method can only be used together with `HiveContext`.
   */
  def orc(path: String): Unit = format("orc").save(path)
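
Reviewer note: for anyone who wants to exercise the new shortcuts end to end, below
is a minimal PySpark sketch of the API these two commits add. It assumes a
HiveContext (per the @note added in the second commit) and reuses the
orc_partitioned test fixture checked in above; the demo app name, the temporary
output path, and the choice of partition column are illustrative only and not part
of the patch.

    # Round-trips ORC data through the new DataFrameReader.orc /
    # DataFrameWriter.orc shortcuts (Spark 1.5). ORC support currently
    # requires a HiveContext rather than a plain SQLContext.
    import os
    import tempfile

    from pyspark import SparkContext
    from pyspark.sql import HiveContext

    sc = SparkContext(appName="orc-shortcut-demo")
    hiveContext = HiveContext(sc)

    # Shortcut for hiveContext.read.format("orc").load(path).
    df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')

    # Shortcut for df.write.format("orc").save(path); mode and partitionBy
    # are forwarded to the underlying DataFrameWriter.
    out = os.path.join(tempfile.mkdtemp(), 'data')
    df.write.orc(out, mode='overwrite', partitionBy=['b'])

    print(hiveContext.read.orc(out).dtypes)

Because both methods are thin sugar over format("orc") plus load/save, mirroring
the existing parquet shortcuts, options set via option()/mode()/partitionBy() on
the reader or writer continue to apply unchanged.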