@@ -144,6 +144,13 @@ class StreamingContext private[streaming] (
}
}

  if (sc.conf.contains("spark.cores.max")) {
    val totalCores = sc.conf.getInt("spark.cores.max", 1)
Contributor
AFAIK, "spark.cores.max" is not set by default, with your change it will always return 1 if not set and throws exception as you wrote.

Besides if there're several receivers in streaming application, even > 1 is not sufficient. So simply checking "spark.cores.max" is not a feasible way to address the issue, and there may not be a good way to handle this problem properly, so I'd suggest to leave the current code as it is.
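For context, a minimal sketch of the SparkConf behavior under discussion (standard SparkConf API; the snippet is illustrative and not part of the PR):

```scala
import org.apache.spark.SparkConf

// spark.cores.max has no built-in default value.
val conf = new SparkConf()
conf.contains("spark.cores.max")   // false unless the user sets it
conf.getInt("spark.cores.max", 1)  // falls back to the caller-supplied default, here 1
```

Note that in the PR's code the getInt call only runs inside the contains guard, which is the point the author makes below.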

Contributor

The config spark.cores.max is used to limit the max number of cores that a single executor can require, and as @jerryshao pointed out, it's not set by default. I'm not convinced that validating the config helps with your issue.

Author

@jerryshao spark.cores.max is only read when the conf contains it, since the contains check guards the getInt call. This is the best opportunity we have to make the judgment.

jerryshao (Contributor), Aug 17, 2017

@SOmeONee I'm not quite following your comment here. You assume that the default value is "1", which changes the semantics of this configuration. What's more, checking whether it is <= 1 is not sufficient, as I mentioned above.

Contributor

@jiangxb1987 "spark.cores.max" is per application configuration to limit the numbers of cores can be requested for this application, it is not a per executor limitation.

> The config spark.cores.max is used to limit the max number of cores that a single executor can require

So if we have 2 receivers in one streaming application, the minimum number of cores should be > 2; checking against 1 here is still not feasible.

Since the number of receivers is only known at run time, checking the configuration will not work as expected.
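To make this concrete, here is a hypothetical sketch of what a receiver-aware check would have to look like; this is not the PR's code. It assumes it runs inside StreamingContext (where the private[streaming] DStreamGraph is reachable as graph and logWarning is available), and it can only run once the DStream graph is built, e.g. at start():

```scala
// Hypothetical: the receiver count is only known after the graph is assembled.
val numReceivers = graph.getReceiverInputStreams().length
// spark.cores.max caps cores for the whole application (standalone/Mesos);
// Int.MaxValue stands in for "unlimited" when it is not set.
val maxCores = sc.conf.getInt("spark.cores.max", Int.MaxValue)
if (numReceivers > 0 && maxCores <= numReceivers) {
  logWarning(s"spark.cores.max ($maxCores) should exceed the number of receivers " +
    s"($numReceivers), otherwise no cores are left to process the received data.")
}
```

Even this can only warn rather than fail fast at construction time, which is why the reviewers conclude that configuration-time validation is not useful here.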

Contributor

Yeah, it seems the validation here is not very useful.

    if (totalCores <= 1) {
      throw new SparkException("Total executor cores (spark.cores.max) must be greater than 1")
    }
  }

if (sc.conf.get("spark.master") == "local" || sc.conf.get("spark.master") == "local[1]") {
logWarning("spark.master should be set as local[n], n > 1 in local mode if you have receivers" +
" to get data, otherwise Spark jobs will not get resources to process the received data.")