Commit cabe1df

zjffdu authored and holdenk committed
[SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc file in DataFrameReader.orc
Besides the issue in the Spark API, this also fixes two minor issues in PySpark:
- support reading from multiple input paths for ORC
- support reading from multiple input paths for text

Author: Jeff Zhang <[email protected]>

Closes #10307 from zjffdu/SPARK-12334.
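In PySpark, the patch enables the calls sketched below (a minimal usage example; the SparkSession setup and paths are hypothetical placeholders, not part of the patch):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # A single path string keeps working as before.
    df_one = spark.read.orc("/data/events/2016-01")

    # New in this patch: a list of paths is read into a single DataFrame.
    df_many = spark.read.orc(["/data/events/2016-01",
                              "/data/events/2016-02"])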
1 parent 30b18e6 commit cabe1df

File tree

4 files changed: 25 additions, 9 deletions

python/pyspark/sql/readwriter.py (8 additions, 6 deletions)

@@ -161,15 +161,15 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
              timeZone=None, wholeFile=None):
         """
-        Loads a JSON file and returns the results as a :class:`DataFrame`.
+        Loads JSON files and returns the results as a :class:`DataFrame`.
 
         `JSON Lines <http://jsonlines.org/>`_ (newline-delimited JSON) is supported by default.
         For JSON (one record per file), set the ``wholeFile`` parameter to ``true``.
 
         If the ``schema`` parameter is not specified, this function goes
         through the input once to determine the input schema.
 
-        :param path: string represents path to the JSON dataset,
+        :param path: string represents path to the JSON dataset, or a list of paths,
             or RDD of Strings storing JSON objects.
         :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
         :param primitivesAsString: infers all primitive values as a string type. If None is set,
@@ -252,7 +252,7 @@ def func(iterator):
             jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
             return self._df(self._jreader.json(jrdd))
         else:
-            raise TypeError("path can be only string or RDD")
+            raise TypeError("path can be only string, list or RDD")
 
     @since(1.4)
     def table(self, tableName):
@@ -269,7 +269,7 @@ def table(self, tableName):
 
     @since(1.4)
     def parquet(self, *paths):
-        """Loads a Parquet file, returning the result as a :class:`DataFrame`.
+        """Loads Parquet files, returning the result as a :class:`DataFrame`.
 
         You can set the following Parquet-specific option(s) for reading Parquet files:
             * ``mergeSchema``: sets whether we should merge schemas collected from all \
@@ -407,15 +407,17 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
 
     @since(1.5)
     def orc(self, path):
-        """Loads an ORC file, returning the result as a :class:`DataFrame`.
+        """Loads ORC files, returning the result as a :class:`DataFrame`.
 
         .. note:: Currently ORC support is only available together with Hive support.
 
         >>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
         >>> df.dtypes
         [('a', 'bigint'), ('b', 'int'), ('c', 'int')]
         """
-        return self._df(self._jreader.orc(path))
+        if isinstance(path, basestring):
+            path = [path]
+        return self._df(self._jreader.orc(_to_seq(self._spark._sc, path)))
 
     @since(1.4)
     def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
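The Scala-side reader accepts multiple paths (note the plural `@param paths` in the scaladoc further down), so the Python wrapper normalizes its argument first: a single string is wrapped in a one-element list, and the list is converted to a JVM sequence with `_to_seq` before crossing the Py4J boundary. The normalization idiom in isolation (a standalone sketch, not Spark internals; `jvm_reader` and `to_jvm_seq` are hypothetical stand-ins):

    def orc(path, jvm_reader, to_jvm_seq):
        # Normalize: a lone path string becomes a one-element list.
        if isinstance(path, str):  # the patch uses Python 2's basestring
            path = [path]
        # Hand the whole list to the sequence-based JVM API in one call.
        return jvm_reader(to_jvm_seq(path))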

python/pyspark/sql/tests.py (5 additions, 0 deletions)

@@ -450,6 +450,11 @@ def test_wholefile_csv(self):
                     Row(_c0=u'Hyukjin', _c1=u'25', _c2=u'I am Hyukjin\n\nI love Spark!')]
         self.assertEqual(ages_newlines.collect(), expected)
 
+    def test_read_multiple_orc_file(self):
+        df = self.spark.read.orc(["python/test_support/sql/orc_partitioned/b=0/c=0",
+                                  "python/test_support/sql/orc_partitioned/b=1/c=1"])
+        self.assertEqual(2, df.count())
+
     def test_udf_with_input_file_name(self):
         from pyspark.sql.functions import udf, input_file_name
         from pyspark.sql.types import StringType
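The new test reads two concrete partition directories of the ORC dataset that ships with Spark's Python test support files. For comparison, both styles of call side by side (a sketch assuming a live `spark` session and that bundled data layout):

    # Reading the dataset root relies on partition discovery for columns b and c.
    whole = spark.read.orc("python/test_support/sql/orc_partitioned")

    # Reading explicit partition directories scans only the listed paths;
    # the new test expects exactly two rows from these two directories.
    subset = spark.read.orc(["python/test_support/sql/orc_partitioned/b=0/c=0",
                             "python/test_support/sql/orc_partitioned/b=1/c=1"])
    assert subset.count() == 2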

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala (3 additions, 3 deletions)

@@ -262,7 +262,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   }
 
   /**
-   * Loads a JSON file and returns the results as a `DataFrame`.
+   * Loads JSON files and returns the results as a `DataFrame`.
    *
    * <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON) is supported by
    * default. For JSON (one record per file), set the `wholeFile` option to true.
@@ -438,7 +438,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   }
 
   /**
-   * Loads a CSV file and returns the result as a `DataFrame`.
+   * Loads CSV files and returns the result as a `DataFrame`.
    *
    * This function will go through the input once to determine the input schema if `inferSchema`
    * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
@@ -549,7 +549,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   }
 
   /**
-   * Loads an ORC file and returns the result as a `DataFrame`.
+   * Loads ORC files and returns the result as a `DataFrame`.
    *
    * @param paths input paths
    * @since 2.0.0

sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala (9 additions, 0 deletions)

@@ -33,6 +33,7 @@ import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.util.Utils
 
 case class AllDataTypesWithNonPrimitiveType(
     stringField: String,
@@ -611,4 +612,12 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       }
     }
   }
+
+  test("read from multiple orc input paths") {
+    val path1 = Utils.createTempDir()
+    val path2 = Utils.createTempDir()
+    makeOrcFile((1 to 10).map(Tuple1.apply), path1)
+    makeOrcFile((1 to 10).map(Tuple1.apply), path2)
+    assertResult(20)(read.orc(path1.getCanonicalPath, path2.getCanonicalPath).count())
+  }
 }
