@@ -669,6 +669,19 @@ class JavaSparkContext(val sc: SparkContext)
sc.addFile(path)
}

/**
* Add a file to be downloaded with this Spark job on every node.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(fileName)` to find its download location.
*
* A directory can be given if the recursive option is set to true. Currently directories are only
* supported for Hadoop-supported filesystems.
*/
def addFile(path: String, recursive: Boolean): Unit = {

Member:
(Why do we need this?)

Contributor Author:
Because JavaSparkContext provides the Java stubs that PySpark calls.

Member:
You're calling the method on SparkContext:

self._jsc.sc().addFile(path, recursive)

I don't think this needs to be exposed?

Contributor Author:
Oh, I see. I found that _jsc.sc() and _jsc are used interchangeably in context.py. I will do some cleanup and unify them in follow-up work. Thanks for your comments.

Member:
Sounds good. There may be a reason the Java context is needed for some calls. I suppose that where the SparkContext could be used instead, that's simpler, but it doesn't really save anything because we wouldn't be able to take methods out of JavaSparkContext. That's why I was hoping to avoid adding a method to it.

Contributor Author:
@srowen After investigating the code, I found it's not very straightforward to clean up the interfaces in JavaSparkContext, since they are called by both Python and R. On the Python side we can use _jsc.sc() in some cases, but it gets messy if we use both JavaSparkContext and JavaSparkContext.sc on the R side. So I think we should leave it as it is, unless you have another suggestion. Thanks.

Member:
Yes, but can we undo this change? It doesn't seem like we need to duplicate this method in the Java API.

Contributor Author:
I don't think we need to undo this, since I will soon send a PR to support recursively adding files under a directory for SparkR, and it will leverage this API. Thanks.

Member:
OK, if that requires the Java context, I get it.

sc.addFile(path, recursive)
}
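
For readers following the conversation above: in PySpark, self._jsc is the Py4J proxy for this JavaSparkContext, and self._jsc.sc() unwraps it to the underlying Scala SparkContext. The sketch below is not part of this diff; _jsc is an internal attribute and the HDFS path is invented for illustration. It only shows the two call paths being discussed:

from pyspark import SparkContext, SparkFiles

sc = SparkContext(appName="addFile-recursive-demo")

# Route taken by pyspark/context.py further down: unwrap to the Scala SparkContext.
sc._jsc.sc().addFile("hdfs:///tmp/conf_dir", True)

# Equivalent route through the JavaSparkContext stub added in this hunk:
# sc._jsc.addFile("hdfs:///tmp/conf_dir", True)

# Either way, tasks locate the downloaded copy by name via SparkFiles.
print(SparkFiles.get("conf_dir"))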

/**
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
7 changes: 5 additions & 2 deletions python/pyspark/context.py
@@ -762,7 +762,7 @@ def accumulator(self, value, accum_param=None):
SparkContext._next_accum_id += 1
return Accumulator(SparkContext._next_accum_id - 1, value, accum_param)

def addFile(self, path):
def addFile(self, path, recursive=False):

Member:
This basically doesn't change the API, right? You can still call it as before with the same behavior.

It seems reasonable to me overall because it adds parity between the APIs, isn't complex, and doesn't change behavior.

Contributor Author:
Yes, it does not change the existing API.

"""
Add a file to be downloaded with this Spark job on every node.
The C{path} passed can be either a local file, a file in HDFS
@@ -773,6 +773,9 @@ def addFile(self, path):
L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>} with the
filename to find its download location.

A directory can be given if the recursive option is set to True.
Currently directories are only supported for Hadoop-supported filesystems.

>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "test.txt")
>>> with open(path, "w") as testFile:
@@ -785,7 +788,7 @@ def addFile(self, path):
>>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
[100, 200, 300, 400]
"""
self._jsc.sc().addFile(path)
self._jsc.sc().addFile(path, recursive)
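
A minimal usage sketch of the new recursive flag, mirroring the test added to python/pyspark/tests.py below. It assumes the script runs from the Spark source root, so the relative python/test_support/hello directory introduced by this pull request exists:

import os
from pyspark import SparkContext, SparkFiles

sc = SparkContext(appName="addFile-usage")

# Ship the whole directory to every node; plain files still work with the old one-argument call.
sc.addFile("python/test_support/hello", recursive=True)

def read_greeting(_):
    # On the executor, resolve the directory's download location by name.
    hello_dir = SparkFiles.get("hello")
    with open(os.path.join(hello_dir, "sub_hello", "sub_hello.txt")) as f:
        return f.readline().strip()

print(sc.parallelize([0]).map(read_greeting).first())  # prints "Sub Hello World!"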

def addPyFile(self, path):
"""
20 changes: 15 additions & 5 deletions python/pyspark/tests.py
@@ -409,13 +409,23 @@ def func(x):
self.assertEqual("Hello World!", res)

def test_add_file_locally(self):
path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
path = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
self.sc.addFile(path)
download_path = SparkFiles.get("hello.txt")
self.assertNotEqual(path, download_path)
with open(download_path) as test_file:
self.assertEqual("Hello World!\n", test_file.readline())

def test_add_file_recursively_locally(self):
path = os.path.join(SPARK_HOME, "python/test_support/hello")
self.sc.addFile(path, True)
download_path = SparkFiles.get("hello")
self.assertNotEqual(path, download_path)
with open(download_path + "/hello.txt") as test_file:
self.assertEqual("Hello World!\n", test_file.readline())
with open(download_path + "/sub_hello/sub_hello.txt") as test_file:
self.assertEqual("Sub Hello World!\n", test_file.readline())

Member:
minor: maybe the above block should be in a separate test like def test_add_file_locally_recursive?

def test_add_py_file_locally(self):
# To ensure that we're actually testing addPyFile's effects, check that
# this fails due to `userlibrary` not being on the Python path:
@@ -514,7 +524,7 @@ def test_transforming_pickle_file(self):

def test_cartesian_on_textfile(self):
# Regression test for
path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
path = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
a = self.sc.textFile(path)
result = a.cartesian(a).collect()
(x, y) = result[0]
@@ -751,7 +761,7 @@ def test_zip_with_different_serializers(self):
b = b._reserialize(MarshalSerializer())
self.assertEqual(a.zip(b).collect(), [(0, 100), (1, 101), (2, 102), (3, 103), (4, 104)])
# regression test for SPARK-4841
path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
path = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
t = self.sc.textFile(path)
cnt = t.count()
self.assertEqual(cnt, t.zip(t).count())
@@ -1214,7 +1224,7 @@ def test_oldhadoop(self):
ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
self.assertEqual(ints, ei)

hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
hellopath = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
oldconf = {"mapred.input.dir": hellopath}
hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
"org.apache.hadoop.io.LongWritable",
@@ -1233,7 +1243,7 @@ def test_newhadoop(self):
ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
self.assertEqual(ints, ei)

hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
hellopath = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
newconf = {"mapred.input.dir": hellopath}
hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
"org.apache.hadoop.io.LongWritable",
File renamed without changes: python/test_support/hello.txt → python/test_support/hello/hello.txt
1 change: 1 addition & 0 deletions python/test_support/hello/sub_hello/sub_hello.txt
@@ -0,0 +1 @@
Sub Hello World!