44 changes: 41 additions & 3 deletions python/pyspark/sql/readwriter.py
@@ -146,14 +146,28 @@ def table(self, tableName):
return self._df(self._jreader.table(tableName))

@since(1.4)
def parquet(self, *path):
def parquet(self, *paths):
"""Loads a Parquet file, returning the result as a :class:`DataFrame`.
>>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
"""
return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))
return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, paths)))

@since(1.5)
def orc(self, path):
Contributor

As the parquet function supports multiple paths when loading, should the orc API support that as well?

"""
Loads an ORC file, returning the result as a :class:`DataFrame`.
::Note: Currently ORC support is only available together with
:class:`HiveContext`.
>>> df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
>>> df.dtypes
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
"""
return self._df(self._jreader.orc(path))

@since(1.4)
def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
@@ -378,6 +392,29 @@ def parquet(self, path, mode=None, partitionBy=None):
self.partitionBy(partitionBy)
self._jwrite.parquet(path)

def orc(self, path, mode=None, partitionBy=None):
"""Saves the content of the :class:`DataFrame` in ORC format at the specified path.
::Note: Currently ORC support is only available together with
:class:`HiveContext`.
:param path: the path in any Hadoop supported file system
:param mode: specifies the behavior of the save operation when data already exists.
* ``append``: Append contents of this :class:`DataFrame` to existing data.
* ``overwrite``: Overwrite existing data.
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.
:param partitionBy: names of partitioning columns
>>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
>>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
if partitionBy is not None:
self.partitionBy(partitionBy)
self._jwrite.orc(path)

@since(1.4)
def jdbc(self, url, table, mode=None, properties={}):
"""Saves the content of the :class:`DataFrame` to a external database table via JDBC.
@@ -408,7 +445,7 @@ def _test():
import os
import tempfile
from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext
from pyspark.sql import Row, SQLContext, HiveContext
import pyspark.sql.readwriter

os.chdir(os.environ["SPARK_HOME"])
@@ -420,6 +457,7 @@ def _test():
globs['os'] = os
globs['sc'] = sc
globs['sqlContext'] = SQLContext(sc)
globs['hiveContext'] = HiveContext(sc)
globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned')

(failure_count, test_count) = doctest.testmod(
Binary file not shown.
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -267,6 +267,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
}
}

/**
* Loads an ORC file and returns the result as a [[DataFrame]].
*
Contributor

Nit: Also add examples like you did at DataFrameWriter?

    * This is equivalent to:
    * {{{
    *   format("orc").load(path)
    * }}}

Contributor Author

The one in DataFrameWriter isn't an example, it says that method is equivalent to format("orc").load(path).
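
For readers skimming the thread, a minimal sketch of the equivalence being discussed; the HiveContext setup and the input path below are hypothetical, not part of this PR:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("orc-sketch").setMaster("local[*]"))
val hiveContext = new HiveContext(sc)

// The shorthand added by this PR and the generic form load the same data.
val viaOrc    = hiveContext.read.orc("/data/orc_table")                 // hypothetical path
val viaFormat = hiveContext.read.format("orc").load("/data/orc_table")
```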

* @param path input path
* @since 1.5.0
* @note Currently, this method can only be used together with `HiveContext`.
*/
def orc(path: String): DataFrame = format("orc").load(path)
Contributor

Multiple paths support?

Contributor Author

I did consider this, but intentionally didn't add multiple-path support in this PR. The problem is that we currently can't specify multiple paths via data source options, while the DataFrame reader/writer API relies on the "path" option to find the input path. If you check DataFrameReader.parquet, you'll see it's specially implemented, which I don't think is a good approach.

I'm thinking about adding multiple-value support for data source options in a more general way (maybe just simple comma-separated lists with proper comma escaping).
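
To make the constraint concrete, here is a rough sketch under the API as it stands after this PR; the paths are hypothetical, and `hiveContext` is the same assumed HiveContext as in the sketch above:

```scala
// Assumes `hiveContext` is a HiveContext, as in the earlier sketch.

// parquet() is specially implemented and accepts varargs paths:
val events = hiveContext.read.parquet("/data/events/part1", "/data/events/part2")

// The generic data source path travels through the single "path" option,
// so the new orc() method takes exactly one path:
val orcDf = hiveContext.read.orc("/data/orc_table")
// roughly the same as:
val orcDf2 = hiveContext.read.format("orc").option("path", "/data/orc_table").load()
```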


/**
* Returns the specified table as a [[DataFrame]].
*
12 changes: 12 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -280,6 +280,18 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*/
def parquet(path: String): Unit = format("parquet").save(path)

/**
* Saves the content of the [[DataFrame]] in ORC format at the specified path.
Contributor

You should document that this is only available if Hive is ...

Contributor Author

Yeah, right. Thanks for the reminder!

* This is equivalent to:
* {{{
* format("orc").save(path)
* }}}
*
* @since 1.5.0
* @note Currently, this method can only be used together with `HiveContext`.
*/
def orc(path: String): Unit = format("orc").save(path)
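
As a rough usage sketch of the writer side, mirroring how the updated test suites below call the new method; `hiveContext`, the input path, and the output path are all hypothetical:

```scala
import org.apache.spark.sql.SaveMode

// Assumes `hiveContext` is a HiveContext, as in the earlier sketches.
val df = hiveContext.read.orc("/data/orc_table")        // hypothetical input

df.write.orc("/tmp/orc_out")                            // same as df.write.format("orc").save("/tmp/orc_out")
df.write.mode(SaveMode.Overwrite).orc("/tmp/orc_out")   // the pattern used in the updated tests
```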

///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////
@@ -41,8 +41,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
.parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
.toDF("a", "b", "p1")
.write
.format("orc")
.save(partitionDir.toString)
.orc(partitionDir.toString)
}

val dataSchemaWithPartition =
@@ -49,13 +49,13 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {

def makeOrcFile[T <: Product: ClassTag: TypeTag](
data: Seq[T], path: File): Unit = {
data.toDF().write.format("orc").mode("overwrite").save(path.getCanonicalPath)
data.toDF().write.mode("overwrite").orc(path.getCanonicalPath)
}


def makeOrcFile[T <: Product: ClassTag: TypeTag](
df: DataFrame, path: File): Unit = {
df.write.format("orc").mode("overwrite").save(path.getCanonicalPath)
df.write.mode("overwrite").orc(path.getCanonicalPath)
}

protected def withTempTable(tableName: String)(f: => Unit): Unit = {
@@ -90,7 +90,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}

read.format("orc").load(base.getCanonicalPath).registerTempTable("t")
read.orc(base.getCanonicalPath).registerTempTable("t")

withTempTable("t") {
checkAnswer(
@@ -137,7 +137,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}

read.format("orc").load(base.getCanonicalPath).registerTempTable("t")
read.orc(base.getCanonicalPath).registerTempTable("t")

withTempTable("t") {
checkAnswer(
@@ -187,9 +187,8 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
}

read
.format("orc")
.option(ConfVars.DEFAULTPARTITIONNAME.varname, defaultPartitionName)
.load(base.getCanonicalPath)
.orc(base.getCanonicalPath)
.registerTempTable("t")

withTempTable("t") {
@@ -230,9 +229,8 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
}

read
.format("orc")
.option(ConfVars.DEFAULTPARTITIONNAME.varname, defaultPartitionName)
.load(base.getCanonicalPath)
.orc(base.getCanonicalPath)
.registerTempTable("t")

withTempTable("t") {
@@ -63,14 +63,14 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {

withOrcFile(data) { file =>
checkAnswer(
sqlContext.read.format("orc").load(file),
sqlContext.read.orc(file),
data.toDF().collect())
}
}

test("Read/write binary data") {
withOrcFile(BinaryData("test".getBytes("utf8")) :: Nil) { file =>
val bytes = read.format("orc").load(file).head().getAs[Array[Byte]](0)
val bytes = read.orc(file).head().getAs[Array[Byte]](0)
assert(new String(bytes, "utf8") === "test")
}
}
@@ -88,7 +88,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {

withOrcFile(data) { file =>
checkAnswer(
read.format("orc").load(file),
read.orc(file),
data.toDF().collect())
}
}
@@ -158,7 +158,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {

withOrcFile(data) { file =>
checkAnswer(
read.format("orc").load(file),
read.orc(file),
Row(Seq.fill(5)(null): _*))
}
}
@@ -310,7 +310,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
""".stripMargin)

val errorMessage = intercept[AnalysisException] {
sqlContext.read.format("orc").load(path)
sqlContext.read.orc(path)
}.getMessage

assert(errorMessage.contains("Failed to discover schema from ORC files"))
@@ -323,7 +323,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
|SELECT key, value FROM single
""".stripMargin)

val df = sqlContext.read.format("orc").load(path)
val df = sqlContext.read.orc(path)
assert(df.schema === singleRowDF.schema.asNullable)
checkAnswer(df, singleRowDF)
}
@@ -39,7 +39,7 @@ private[sql] trait OrcTest extends SQLTestUtils {
(data: Seq[T])
(f: String => Unit): Unit = {
withTempPath { file =>
sparkContext.parallelize(data).toDF().write.format("orc").save(file.getCanonicalPath)
sparkContext.parallelize(data).toDF().write.orc(file.getCanonicalPath)
f(file.getCanonicalPath)
}
}
@@ -51,7 +51,7 @@ private[sql] trait OrcTest extends SQLTestUtils {
protected def withOrcDataFrame[T <: Product: ClassTag: TypeTag]
(data: Seq[T])
(f: DataFrame => Unit): Unit = {
withOrcFile(data)(path => f(sqlContext.read.format("orc").load(path)))
withOrcFile(data)(path => f(sqlContext.read.orc(path)))
}

/**
@@ -70,11 +70,11 @@ private[sql] trait OrcTest extends SQLTestUtils {

protected def makeOrcFile[T <: Product: ClassTag: TypeTag](
data: Seq[T], path: File): Unit = {
data.toDF().write.format("orc").mode(SaveMode.Overwrite).save(path.getCanonicalPath)
data.toDF().write.mode(SaveMode.Overwrite).orc(path.getCanonicalPath)
}

protected def makeOrcFile[T <: Product: ClassTag: TypeTag](
df: DataFrame, path: File): Unit = {
df.write.format("orc").mode(SaveMode.Overwrite).save(path.getCanonicalPath)
df.write.mode(SaveMode.Overwrite).orc(path.getCanonicalPath)
}
}