
Commit 736f6ce

Merge branch 'master' into SPARK-16775-reduce-internal-warnings-from-deprecated-accumulator-api
2 parents: 46fa97d + e9fc0b6

File tree: 44 files changed (+832, -695 lines)


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 16 additions & 20 deletions

@@ -21,7 +21,7 @@ import java.io._
 import java.lang.reflect.Constructor
 import java.net.URI
 import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
-import java.util.concurrent.ConcurrentMap
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}

 import scala.collection.JavaConverters._
@@ -262,8 +262,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] def env: SparkEnv = _env

   // Used to store a URL for each static file/jar together with the file's local timestamp
-  private[spark] val addedFiles = HashMap[String, Long]()
-  private[spark] val addedJars = HashMap[String, Long]()
+  private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala
+  private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala

   // Keeps track of all persisted RDDs
   private[spark] val persistentRdds = {
@@ -1430,14 +1430,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       schemeCorrectedPath
     }
     val timestamp = System.currentTimeMillis
-    addedFiles(key) = timestamp
-
-    // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
-    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
-      hadoopConfiguration, timestamp, useCache = false)
-
-    logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
-    postEnvironmentUpdate()
+    if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
+      logInfo(s"Added file $path at $key with timestamp $timestamp")
+      // Fetch the file locally so that closures which are run on the driver can still use the
+      // SparkFiles API to access files.
+      Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
+        hadoopConfiguration, timestamp, useCache = false)
+      postEnvironmentUpdate()
+    }
   }

   /**
@@ -1705,12 +1705,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
             case exc: FileNotFoundException =>
              logError(s"Jar not found at $path")
              null
-            case e: Exception =>
-              // For now just log an error but allow to go through so spark examples work.
-              // The spark examples don't really need the jar distributed since its also
-              // the app jar.
-              logError("Error adding jar (" + e + "), was the --addJars option used?")
-              null
          }
        }
        // A JAR file which exists locally on every worker node
@@ -1721,11 +1715,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
        }
      }
      if (key != null) {
-        addedJars(key) = System.currentTimeMillis
-        logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
+        val timestamp = System.currentTimeMillis
+        if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
+          logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+          postEnvironmentUpdate()
+        }
      }
    }
-    postEnvironmentUpdate()
  }

  /**
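The new addFile/addJar bodies lean on the Scala view of a ConcurrentHashMap: the .asScala wrapper is a scala.collection.concurrent.Map whose putIfAbsent returns an Option, and None means this call inserted the key, so only the first registration performs the fetch and the environment update; later calls with the same path become no-ops. A minimal sketch of that pattern, with illustrative names rather than SparkContext's real fields:

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

object IdempotentRegistration {
  // The .asScala view is a scala.collection.concurrent.Map, whose
  // putIfAbsent returns Option[V]: None means this call inserted the key.
  private val addedFiles = new ConcurrentHashMap[String, Long]().asScala

  def addFile(key: String): Boolean = {
    val timestamp = System.currentTimeMillis
    if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
      // First registration: do the one-time work (fetch, env update) here.
      println(s"Added file $key with timestamp $timestamp")
      true
    } else {
      false // Already registered: the repeated call is a harmless no-op.
    }
  }
}

Swapping the plain mutable HashMap for this concurrent view also appears intended to make concurrent addFile/addJar calls safe on the map itself.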

core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala

Lines changed: 5 additions & 3 deletions

@@ -17,7 +17,7 @@

 package org.apache.spark.rpc.netty

-import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
 import javax.annotation.concurrent.GuardedBy

 import scala.collection.JavaConverters._
@@ -42,8 +42,10 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
     val inbox = new Inbox(ref, endpoint)
   }

-  private val endpoints = new ConcurrentHashMap[String, EndpointData]
-  private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
+  private val endpoints: ConcurrentMap[String, EndpointData] =
+    new ConcurrentHashMap[String, EndpointData]
+  private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] =
+    new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

   // Track the receivers whose inboxes may contain messages.
   private val receivers = new LinkedBlockingQueue[EndpointData]
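The Dispatcher change only widens the declared field types to the java.util.concurrent.ConcurrentMap interface instead of the concrete ConcurrentHashMap. A small sketch of the same idiom, with illustrative names:

import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}

class Registry {
  // Declaring against the interface keeps callers decoupled from the concrete
  // map class while still exposing atomic operations such as putIfAbsent.
  private val entries: ConcurrentMap[String, String] =
    new ConcurrentHashMap[String, String]

  def registerOnce(name: String, value: String): Boolean =
    entries.putIfAbsent(name, value) == null
}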

core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala

Lines changed: 8 additions & 4 deletions

@@ -66,14 +66,18 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
   }

   override def addFile(file: File): String = {
-    require(files.putIfAbsent(file.getName(), file) == null,
-      s"File ${file.getName()} already registered.")
+    val existingPath = files.putIfAbsent(file.getName, file)
+    require(existingPath == null || existingPath == file,
+      s"File ${file.getName} was already registered with a different path " +
+        s"(old path = $existingPath, new path = $file")
     s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }

   override def addJar(file: File): String = {
-    require(jars.putIfAbsent(file.getName(), file) == null,
-      s"JAR ${file.getName()} already registered.")
+    val existingPath = jars.putIfAbsent(file.getName, file)
+    require(existingPath == null || existingPath == file,
+      s"File ${file.getName} was already registered with a different path " +
+        s"(old path = $existingPath, new path = $file")
     s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }

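The relaxed require now tolerates re-adding the same File while still rejecting a different path that collides on the file name: java.util.concurrent.ConcurrentHashMap.putIfAbsent returns null on the first insertion and the previously stored value afterwards. A sketch of the check in isolation (illustrative names, not the NettyStreamManager fields):

import java.io.File
import java.util.concurrent.ConcurrentHashMap

object FileRegistry {
  private val files = new ConcurrentHashMap[String, File]()

  def add(file: File): Unit = {
    // null means this call registered the name; a non-null previous value is
    // only acceptable when it points at the very same path.
    val existing = files.putIfAbsent(file.getName, file)
    require(existing == null || existing == file,
      s"File ${file.getName} was already registered with a different path " +
        s"(old path = $existing, new path = $file)")
  }
}
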
core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 3 additions & 2 deletions

@@ -21,6 +21,7 @@ import java.io.{DataInputStream, DataOutputStream}
 import java.nio.ByteBuffer
 import java.util.Properties

+import scala.collection.mutable
 import scala.collection.mutable.HashMap

 import org.apache.spark._
@@ -198,8 +199,8 @@ private[spark] object Task {
   */
  def serializeWithDependencies(
      task: Task[_],
-      currentFiles: HashMap[String, Long],
-      currentJars: HashMap[String, Long],
+      currentFiles: mutable.Map[String, Long],
+      currentJars: mutable.Map[String, Long],
      serializer: SerializerInstance)
    : ByteBuffer = {

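serializeWithDependencies now accepts scala.collection.mutable.Map instead of mutable.HashMap, so both a plain HashMap and the .asScala view of a ConcurrentHashMap (what SparkContext now stores) satisfy the signature. A hedged sketch of why the looser type matters; serializeDeps stands in for the real method:

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable

object SignatureSketch {
  // Taking mutable.Map rather than mutable.HashMap lets both callers below compile.
  def serializeDeps(currentFiles: mutable.Map[String, Long]): Int = currentFiles.size

  def main(args: Array[String]): Unit = {
    val plainMap = mutable.HashMap("a.jar" -> 1L)
    val concurrentView = new ConcurrentHashMap[String, Long]().asScala
    concurrentView.put("b.jar", 2L)
    println(serializeDeps(plainMap))       // mutable.HashMap is a mutable.Map
    println(serializeDeps(concurrentView)) // concurrent.Map also extends mutable.Map
  }
}
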
core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 51 additions & 0 deletions

@@ -216,6 +216,57 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
     }
   }

+  test("cannot call addFile with different paths that have the same filename") {
+    val dir = Utils.createTempDir()
+    try {
+      val subdir1 = new File(dir, "subdir1")
+      val subdir2 = new File(dir, "subdir2")
+      assert(subdir1.mkdir())
+      assert(subdir2.mkdir())
+      val file1 = new File(subdir1, "file")
+      val file2 = new File(subdir2, "file")
+      Files.write("old", file1, StandardCharsets.UTF_8)
+      Files.write("new", file2, StandardCharsets.UTF_8)
+      sc = new SparkContext("local-cluster[1,1,1024]", "test")
+      sc.addFile(file1.getAbsolutePath)
+      def getAddedFileContents(): String = {
+        sc.parallelize(Seq(0)).map { _ =>
+          scala.io.Source.fromFile(SparkFiles.get("file")).mkString
+        }.first()
+      }
+      assert(getAddedFileContents() === "old")
+      intercept[IllegalArgumentException] {
+        sc.addFile(file2.getAbsolutePath)
+      }
+      assert(getAddedFileContents() === "old")
+    } finally {
+      Utils.deleteRecursively(dir)
+    }
+  }
+
+  // Regression tests for SPARK-16787
+  for (
+    schedulingMode <- Seq("local-mode", "non-local-mode");
+    method <- Seq("addJar", "addFile")
+  ) {
+    val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar").toString
+    val master = schedulingMode match {
+      case "local-mode" => "local"
+      case "non-local-mode" => "local-cluster[1,1,1024]"
+    }
+    test(s"$method can be called twice with same file in $schedulingMode (SPARK-16787)") {
+      sc = new SparkContext(master, "test")
+      method match {
+        case "addJar" =>
+          sc.addJar(jarPath)
+          sc.addJar(jarPath)
+        case "addFile" =>
+          sc.addFile(jarPath)
+          sc.addFile(jarPath)
+      }
+    }
+  }
+
   test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
     try {
       sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))

docs/js/api-docs.js

Lines changed: 20 additions & 0 deletions

@@ -41,3 +41,23 @@ function addBadges(allAnnotations, name, tag, html) {
     .add(annotations.closest("div.fullcomment").prevAll("h4.signature"))
     .prepend(html);
 }
+
+$(document).ready(function() {
+  var script = document.createElement('script');
+  script.type = 'text/javascript';
+  script.async = true;
+  script.onload = function(){
+    MathJax.Hub.Config({
+      displayAlign: "left",
+      tex2jax: {
+        inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
+        displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
+        processEscapes: true,
+        skipTags: ['script', 'noscript', 'style', 'textarea', 'pre', 'a']
+      }
+    });
+  };
+  script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
+    'cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML';
+  document.getElementsByTagName('head')[0].appendChild(script);
+});

docs/sql-programming-guide.md

Lines changed: 14 additions & 42 deletions

@@ -132,7 +132,7 @@ from a Hive table, or from [Spark data sources](#data-sources).

 As an example, the following creates a DataFrame based on the content of a JSON file:

-{% include_example create_DataFrames r/RSparkSQLExample.R %}
+{% include_example create_df r/RSparkSQLExample.R %}

 </div>
 </div>
@@ -180,7 +180,7 @@ In addition to simple column references and expressions, DataFrames also have a

 <div data-lang="r" markdown="1">

-{% include_example dataframe_operations r/RSparkSQLExample.R %}
+{% include_example untyped_ops r/RSparkSQLExample.R %}

 For a complete list of the types of operations that can be performed on a DataFrame refer to the [API Documentation](api/R/index.html).

@@ -214,7 +214,7 @@ The `sql` function on a `SparkSession` enables applications to run SQL queries p
 <div data-lang="r" markdown="1">
 The `sql` function enables applications to run SQL queries programmatically and returns the result as a `SparkDataFrame`.

-{% include_example sql_query r/RSparkSQLExample.R %}
+{% include_example run_sql r/RSparkSQLExample.R %}

 </div>
 </div>
@@ -377,7 +377,7 @@ In the simplest form, the default data source (`parquet` unless otherwise config

 <div data-lang="r" markdown="1">

-{% include_example source_parquet r/RSparkSQLExample.R %}
+{% include_example generic_load_save_functions r/RSparkSQLExample.R %}

 </div>
 </div>
@@ -400,13 +400,11 @@ using this syntax.
 </div>

 <div data-lang="python" markdown="1">
-
 {% include_example manual_load_options python/sql/datasource.py %}
 </div>
-<div data-lang="r" markdown="1">
-
-{% include_example source_json r/RSparkSQLExample.R %}

+<div data-lang="r" markdown="1">
+{% include_example manual_load_options r/RSparkSQLExample.R %}
 </div>
 </div>

@@ -425,13 +423,11 @@ file directly with SQL.
 </div>

 <div data-lang="python" markdown="1">
-
 {% include_example direct_sql python/sql/datasource.py %}
 </div>

 <div data-lang="r" markdown="1">
-
-{% include_example direct_query r/RSparkSQLExample.R %}
+{% include_example direct_sql r/RSparkSQLExample.R %}

 </div>
 </div>
@@ -523,7 +519,7 @@ Using the data from the above example:

 <div data-lang="r" markdown="1">

-{% include_example load_programmatically r/RSparkSQLExample.R %}
+{% include_example basic_parquet_example r/RSparkSQLExample.R %}

 </div>

@@ -839,7 +835,7 @@ Note that the file that is offered as _a json file_ is not a typical JSON file.
 line must contain a separate, self-contained valid JSON object. As a consequence,
 a regular multi-line JSON file will most often fail.

-{% include_example load_json_file r/RSparkSQLExample.R %}
+{% include_example json_dataset r/RSparkSQLExample.R %}

 </div>

@@ -925,7 +921,7 @@ You may need to grant write privilege to the user who starts the spark applicati
 When working with Hive one must instantiate `SparkSession` with Hive support. This
 adds support for finding tables in the MetaStore and writing queries using HiveQL.

-{% include_example hive_table r/RSparkSQLExample.R %}
+{% include_example spark_hive r/RSparkSQLExample.R %}

 </div>
 </div>
@@ -1067,43 +1063,19 @@ the Data Sources API. The following options are supported:
 <div class="codetabs">

 <div data-lang="scala" markdown="1">
-
-{% highlight scala %}
-val jdbcDF = spark.read.format("jdbc").options(
-  Map("url" -> "jdbc:postgresql:dbserver",
-  "dbtable" -> "schema.tablename")).load()
-{% endhighlight %}
-
+{% include_example jdbc_dataset scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
 </div>

 <div data-lang="java" markdown="1">
-
-{% highlight java %}
-
-Map<String, String> options = new HashMap<>();
-options.put("url", "jdbc:postgresql:dbserver");
-options.put("dbtable", "schema.tablename");
-
-Dataset<Row> jdbcDF = spark.read().format("jdbc"). options(options).load();
-{% endhighlight %}
-
-
+{% include_example jdbc_dataset java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
 </div>

 <div data-lang="python" markdown="1">
-
-{% highlight python %}
-
-df = spark.read.format('jdbc').options(url='jdbc:postgresql:dbserver', dbtable='schema.tablename').load()
-
-{% endhighlight %}
-
+{% include_example jdbc_dataset python/sql/datasource.py %}
 </div>

 <div data-lang="r" markdown="1">
-
-{% include_example jdbc r/RSparkSQLExample.R %}
-
+{% include_example jdbc_dataset r/RSparkSQLExample.R %}
 </div>

 <div data-lang="sql" markdown="1">