1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.rst
@@ -53,6 +53,7 @@ Spark Context APIs

SparkContext.PACKAGE_EXTENSIONS
SparkContext.accumulator
SparkContext.addArchive
SparkContext.addFile
SparkContext.addPyFile
SparkContext.applicationId
44 changes: 44 additions & 0 deletions python/pyspark/context.py
@@ -1278,6 +1278,50 @@ def addPyFile(self, path: str) -> None:

importlib.invalidate_caches()

def addArchive(self, path: str) -> None:
"""
Add an archive to be downloaded with this Spark job on every node.
The ``path`` passed can be either a local file, a file in HDFS
(or other Hadoop-supported filesystems), or an HTTP, HTTPS or
FTP URI.

To access the archive in Spark jobs, use :meth:`SparkFiles.get` with the
archive's filename to find the directory it was unpacked into. The given
path must point to a .zip, .tar, .tar.gz, .tgz or .jar file.

.. versionadded:: 3.3.0

Notes
-----
A path can be added only once. Subsequent additions of the same path are ignored.
This API is experimental.

Examples
--------
Creates a zip archive containing a text file with '100' written in it.

>>> import zipfile
>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "test.txt")
>>> zip_path = os.path.join(tempdir, "test.zip")
>>> with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipped:
...     with open(path, "w") as f:
...         _ = f.write("100")
...     zipped.write(path, os.path.basename(path))
>>> sc.addArchive(zip_path)

Reads '100' back as an integer from the text file inside the unpacked
archive and uses it to multiply the data in the RDD.

>>> def func(iterator):
...     with open("%s/test.txt" % SparkFiles.get("test.zip")) as f:
...         v = int(f.readline())
...     return [x * int(v) for x in iterator]
>>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
[100, 200, 300, 400]
"""
self._jsc.sc().addArchive(path)

def setCheckpointDir(self, dirName: str) -> None:
"""
Set the directory under which RDDs are going to be checkpointed. The
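Taken together, the two hunks add `addArchive` to the API reference and to `SparkContext` itself. Below is a minimal editorial sketch (not part of this PR) of how a driver script could use the new method together with `SparkFiles.get`; the archive name `deps.zip`, the file `lookup.txt` it contains, and the app name are illustrative assumptions.

# Editorial sketch: end-to-end use of SparkContext.addArchive (added in 3.3.0).
# "deps.zip", "lookup.txt" and the app name are made up for this example.
import os
import zipfile

from pyspark import SparkContext, SparkFiles

sc = SparkContext(appName="addArchiveSketch")

# Build a small zip archive on the driver containing one text file.
with open("lookup.txt", "w") as f:
    f.write("7")
with zipfile.ZipFile("deps.zip", "w", zipfile.ZIP_DEFLATED) as z:
    z.write("lookup.txt")

# Ship the archive to every node; Spark unpacks it into a directory that
# SparkFiles.get("deps.zip") resolves to on the executors.
sc.addArchive("deps.zip")

def scale(iterator):
    # Read the multiplier from the file inside the unpacked archive.
    with open(os.path.join(SparkFiles.get("deps.zip"), "lookup.txt")) as f:
        factor = int(f.readline())
    return [x * factor for x in iterator]

print(sc.parallelize([1, 2, 3]).mapPartitions(scale).collect())  # [7, 14, 21]
sc.stop()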