37 changes: 37 additions & 0 deletions docs/sql-data-sources-generic-options.md
@@ -119,3 +119,40 @@ To load all files recursively, you can use:
{% include_example recursive_file_lookup r/RSparkSQLExample.R %}
</div>
</div>

### Modification Time Path Filters

`modifiedBefore` and `modifiedAfter` are options that can be
applied together or separately to achieve finer-grained control
over which files are loaded during a Spark batch query.
(Note that Structured Streaming file sources don't support these options.)

* `modifiedBefore`: an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* `modifiedAfter`: an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)

When a timezone option is not provided, the timestamps will be interpreted according
to the Spark session timezone (`spark.sql.session.timeZone`).
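
As a quick sketch of how the two timezone mechanisms interact, the following PySpark snippet sets the session timezone and then overrides it for a single read with the `timeZone` option (the same option used in the Java example below); the `/tmp/filter-demo` directory is hypothetical:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("modified-time-demo").getOrCreate()

# modifiedBefore/modifiedAfter timestamps are interpreted in the session
# timezone unless a per-read "timeZone" option overrides it.
spark.conf.set("spark.sql.session.timeZone", "UTC")

# Keep only files last modified inside the window, bounds read as UTC.
df = (spark.read.format("parquet")
      .option("modifiedAfter", "2020-06-01T05:30:00")
      .option("modifiedBefore", "2020-07-01T05:30:00")
      .load("/tmp/filter-demo"))  # hypothetical directory

# The same window, with both bounds interpreted in America/Chicago instead:
df_chicago = (spark.read.format("parquet")
              .option("modifiedAfter", "2020-06-01T05:30:00")
              .option("modifiedBefore", "2020-07-01T05:30:00")
              .option("timeZone", "America/Chicago")
              .load("/tmp/filter-demo"))
```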

To load files with paths matching a given modified time range, you can use:

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% include_example load_with_modified_time_filter scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example load_with_modified_time_filter java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example load_with_modified_time_filter python/sql/datasource.py %}
</div>

<div data-lang="r" markdown="1">
{% include_example load_with_modified_time_filter r/RSparkSQLExample.R %}
</div>
</div>
examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -147,6 +147,22 @@ private static void runGenericFileSourceOptionsExample(SparkSession spark) {
// |file1.parquet|
// +-------------+
// $example off:load_with_path_glob_filter$
// $example on:load_with_modified_time_filter$
Dataset<Row> beforeFilterDF = spark.read().format("parquet")
// Only load files modified before 07/01/2020 at 05:30
.option("modifiedBefore", "2020-07-01T05:30:00")
// Only load files modified after 06/01/2020 at 05:30
.option("modifiedAfter", "2020-06-01T05:30:00")
// Interpret both times above relative to CST timezone
.option("timeZone", "CST")
.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
// $example off:load_with_modified_time_filter$
}

private static void runBasicDataSourceExample(SparkSession spark) {
20 changes: 20 additions & 0 deletions examples/src/main/python/sql/datasource.py
@@ -67,6 +67,26 @@ def generic_file_source_options_example(spark):
# +-------------+
# $example off:load_with_path_glob_filter$

# $example on:load_with_modified_time_filter$
# Only load files modified before 07/01/2050 @ 08:30:00
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", modifiedBefore="2050-07-01T08:30:00")
df.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# +-------------+
# Only load files modified after 06/01/2050 @ 08:30:00
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", modifiedAfter="2050-06-01T08:30:00")
df.show()
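# (No file can have a modification time after a future date, so the
# result below is empty.)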
# +-------------+
# | file|
# +-------------+
# +-------------+
# $example off:load_with_modified_time_filter$
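# Both options can also be combined in a single read to keep only files
# whose modification time falls inside a window (a sketch; the result
# depends on the actual modification times of the files in dir1):
df = spark.read.load("examples/src/main/resources/dir1",
                     format="parquet",
                     modifiedAfter="2020-06-01T08:30:00",
                     modifiedBefore="2050-07-01T08:30:00")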


def basic_datasource_example(spark):
# $example on:generic_load_save_functions$
8 changes: 8 additions & 0 deletions examples/src/main/r/RSparkSQLExample.R
@@ -144,6 +144,14 @@ df <- read.df("examples/src/main/resources/dir1", "parquet", pathGlobFilter = "*
# 1 file1.parquet
# $example off:load_with_path_glob_filter$

# $example on:load_with_modified_time_filter$
beforeDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedBefore = "2020-07-01T05:30:00")
# file
# 1 file1.parquet
afterDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedAfter = "2020-06-01T05:30:00")
# file
# $example off:load_with_modified_time_filter$

# $example on:manual_save_options_orc$
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -81,6 +81,27 @@ object SQLDataSourceExample {
// |file1.parquet|
// +-------------+
// $example off:load_with_path_glob_filter$
// $example on:load_with_modified_time_filter$
val beforeFilterDF = spark.read.format("parquet")
// Files modified before 07/01/2020 at 05:30 are allowed
.option("modifiedBefore", "2020-07-01T05:30:00")
.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
val afterFilterDF = spark.read.format("parquet")
// Files modified after 06/01/2020 at 05:30 are allowed
.option("modifiedAfter", "2020-06-01T05:30:00")
.load("examples/src/main/resources/dir1");
afterFilterDF.show();
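// (The example files were last modified before 06/01/2020, so the
// after-filter excludes them all and the result below is empty.)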
// +-------------+
// | file|
// +-------------+
// +-------------+
// $example off:load_with_modified_time_filter$
}

private def runBasicDataSourceExample(spark: SparkSession): Unit = {
81 changes: 74 additions & 7 deletions python/pyspark/sql/readwriter.py
@@ -125,6 +125,12 @@ def option(self, key, value):
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.
* ``modifiedBefore``: an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* ``modifiedAfter``: an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
"""
self._jreader = self._jreader.option(key, to_str(value))
return self
@@ -149,6 +155,12 @@ def options(self, **options):
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.
* ``modifiedBefore``: an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* ``modifiedAfter``: an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
"""
for k in options:
self._jreader = self._jreader.option(k, to_str(options[k]))
@@ -203,7 +215,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
dropFieldIfAllNull=None, encoding=None, locale=None, pathGlobFilter=None,
recursiveFileLookup=None, allowNonNumericNumbers=None):
recursiveFileLookup=None, allowNonNumericNumbers=None,
modifiedBefore=None, modifiedAfter=None):
"""
Loads JSON files and returns the results as a :class:`DataFrame`.

@@ -322,6 +335,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
``+Infinity`` and ``Infinity``.
* ``-INF``: for negative infinity, alias ``-Infinity``.
* ``NaN``: for other not-a-numbers, like result of division by zero.
modifiedBefore (batch only) : an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
modifiedAfter (batch only) : an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)

Examples
--------
@@ -344,6 +364,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
locale=locale, pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter,
allowNonNumericNumbers=allowNonNumericNumbers)
if isinstance(path, str):
path = [path]
@@ -410,6 +431,15 @@ def parquet(self, *paths, **options):
disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa

modifiedBefore (batch only) : an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
modifiedAfter (batch only) : an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)

Examples
--------
>>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
@@ -418,13 +448,18 @@
"""
mergeSchema = options.get('mergeSchema', None)
pathGlobFilter = options.get('pathGlobFilter', None)
modifiedBefore = options.get('modifiedBefore', None)
modifiedAfter = options.get('modifiedAfter', None)
recursiveFileLookup = options.get('recursiveFileLookup', None)
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup)
recursiveFileLookup=recursiveFileLookup, modifiedBefore=modifiedBefore,
modifiedAfter=modifiedAfter)

return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))

def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
recursiveFileLookup=None):
recursiveFileLookup=None, modifiedBefore=None,
modifiedAfter=None):
"""
Loads text files and returns a :class:`DataFrame` whose schema starts with a
string column named "value", and followed by partitioned columns if there
@@ -453,6 +488,15 @@ def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
recursively scan a directory for files. Using this option disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa

modifiedBefore (batch only) : an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
modifiedAfter (batch only) : an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)

Examples
--------
>>> df = spark.read.text('python/test_support/sql/text-test.txt')
@@ -464,7 +508,9 @@ def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
"""
self._set_opts(
wholetext=wholetext, lineSep=lineSep, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup)
recursiveFileLookup=recursiveFileLookup, modifiedBefore=modifiedBefore,
modifiedAfter=modifiedAfter)

if isinstance(paths, str):
paths = [paths]
return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
@@ -476,7 +522,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
pathGlobFilter=None, recursiveFileLookup=None):
pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
r"""Loads a CSV file and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -631,6 +677,15 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
recursively scan a directory for files. Using this option disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa

modifiedBefore (batch only) : an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
modifiedAfter (batch only) : an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)

Examples
--------
>>> df = spark.read.csv('python/test_support/sql/ages.csv')
@@ -652,7 +707,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter)
if isinstance(path, str):
path = [path]
if type(path) == list:
Expand All @@ -679,7 +735,8 @@ def func(iterator):
else:
raise TypeError("path can be only string, list or RDD")

def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None):
def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None,
modifiedBefore=None, modifiedAfter=None):
"""Loads ORC files, returning the result as a :class:`DataFrame`.

.. versionadded:: 1.5.0
@@ -701,13 +758,23 @@ def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=N
disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa

modifiedBefore (batch only) : an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
modifiedAfter (batch only) : an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)

Examples
--------
>>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
>>> df.dtypes
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
"""
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter,
recursiveFileLookup=recursiveFileLookup)
if isinstance(path, str):
path = [path]