
Conversation

@zjffdu (Contributor) commented Aug 24, 2016

What changes were proposed in this pull request?

Spark adds sparkr.zip to the archives only when running in YARN mode (SparkSubmit.scala):

    if (args.isR && clusterManager == YARN) {
      val sparkRPackagePath = RUtils.localSparkRPackagePath
      if (sparkRPackagePath.isEmpty) {
        printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
      }
      val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
      if (!sparkRPackageFile.exists()) {
        printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
      }
      val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString

      // Distribute the SparkR package.
      // Assigns a symbol link name "sparkr" to the shipped package.
      args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr")

      // Distribute the R package archive containing all the built R packages.
      if (!RUtils.rPackages.isEmpty) {
        val rPackageFile =
          RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)
        if (!rPackageFile.exists()) {
          printErrorAndExit("Failed to zip all the built R packages.")
        }

        val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString
        // Assigns a symbol link name "rpkg" to the shipped package.
        args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
      }
    }

So it is necessary to pass spark.master from the R process to the JVM; otherwise sparkr.zip won't be distributed to the executors. Besides that, I also pass spark.yarn.keytab and spark.yarn.principal to the Spark side, because the JVM process needs them to access a secured cluster.
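
As a rough illustration of the idea (not the exact patch), the R side has to fold these properties into the spark-submit options before the JVM backend is launched, so that SparkSubmit sees them at the point where args.archives is decided. `forwardToSubmitOpts` and `confEnv` below are hypothetical names used only for this sketch:

```r
forwardToSubmitOpts <- function(submitOpts, confEnv) {
  # Properties that SparkSubmit needs to see at launch time.
  forwarded <- c("spark.master", "spark.yarn.keytab", "spark.yarn.principal")
  for (key in forwarded) {
    if (exists(key, envir = confEnv, inherits = FALSE)) {
      value <- get(key, envir = confEnv)
      submitOpts <- paste0("--conf ", key, "=", value, " ", submitOpts)
    }
  }
  submitOpts
}

# Example: produces "--conf spark.master=yarn-client sparkr-shell"
confEnv <- new.env()
assign("spark.master", "yarn-client", envir = confEnv)
forwardToSubmitOpts("sparkr-shell", confEnv)
```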

How was this patch tested?

Verified manually in RStudio using the following code.

Sys.setenv(SPARK_HOME="/Users/jzhang/github/spark")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sparkR.session(master="yarn-client", sparkConfig = list(spark.executor.instances="1"))
df <- as.DataFrame(mtcars)
head(df)

@SparkQA commented Aug 24, 2016

Test build #64339 has finished for PR 14784 at commit 20c36c4.

  • This patch fails R style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 24, 2016

Test build #64340 has finished for PR 14784 at commit 635d98d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zjffdu (Contributor Author) commented Aug 24, 2016

@shivaram @felixcheung Could you help review it? Thanks

R/pkg/R/sparkR.R Outdated
Member

as you can see on L240, "master" is passed to createSparkContext. If this is not being set as spark.master, could you track down what's going on?

@zjffdu (Contributor Author) commented Aug 24, 2016

It is passed to createSparkContext, but that is too late: it needs to be passed to the JVM when the JVM is started, otherwise sparkr.zip is not added to args.archives, as the code above shows. Another approach would be to duplicate the SparkSubmit.scala logic in createSparkContext.
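
To spell out the ordering issue, here is a minimal stand-in sketch; `launch_jvm_backend` and `create_spark_context` are hypothetical names, not SparkR internals. The JVM backend is launched via spark-submit first, and createSparkContext only runs afterwards:

```r
# Stand-in functions (not SparkR APIs) showing the two phases and why the
# second one is too late to affect args.archives in SparkSubmit.
launch_jvm_backend <- function(submitOpts) {
  # Phase 1: spark-submit parses submitOpts and decides what to ship
  # (this is where sparkr.zip gets added for YARN).
  message("spark-submit ", submitOpts)
}

create_spark_context <- function(master) {
  # Phase 2: runs over the already-started backend; changing master here
  # can no longer add sparkr.zip to args.archives.
  message("createSparkContext(master = \"", master, "\")")
}

launch_jvm_backend("--conf spark.master=yarn-client sparkr-shell")
create_spark_context("yarn-client")
```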

@felixcheung (Member) commented Aug 25, 2016

ok I see. So starting programmatically with master set to a cluster manager has been broken from early on, then.
@shivaram I think we need to port this fix to 2.0.1?

@felixcheung (Member) commented Aug 25, 2016

also I'd move this

if (nzchar(master)) {
  assign("spark.master", master, envir = sparkConfigMap)
}

to above L357, and change it to

if (nzchar(master)) {
  sparkConfigMap[["spark.master"]] <- master
}

This is because if the user has explicitly set
sparkR.session(spark.master = "yarn-client"), it should override whatever master is (that's in L361).

Contributor Author

@felixcheung sorry, I don't get what you mean. L357 will override whatever master is; that's why I made the change after it, so that we can pass the correct master to sparkConfigMap.

Member

Right, I think that could go either way

scala> val a = SparkSession.builder().master("yarn").config("spark.master", "local[3]").getOrCreate()
scala> a.conf.get("master")
java.util.NoSuchElementException: master
scala> a.conf.get("spark.master")
res8: String = local[3]

I think the current R implementation mimics the programmatic behavior rather than the command line.

We could certainly change it - just want to highlight that this is a breaking change if we silently flip the precedence order, and I would think we should avoid that unless we have a reason to.

Contributor

That just seems like an ordering issue? For example

> val a = SparkSession.builder().config("spark.master", "local[3]").master("yarn").getOrCreate()
> a.conf.get("spark.master")
res1: String = yarn

Member

It is - we are turning the Scala Builder syntax of multiple function calls into a single call in R; we just pick the order that is most prominently featured, which is

SparkSession.builder().master("yarn").config("spark.master", "local[3]")
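
For reference, a rough R-side equivalent of that builder chain (illustrative only; the question in this thread is which of the two master values should win):

```r
library(SparkR)
# Roughly corresponds to SparkSession.builder().master("yarn").config("spark.master", "local[3]")
sparkR.session(master = "yarn", sparkConfig = list(spark.master = "local[3]"))
```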

Contributor Author

I think this is not a critical issue as it is very rare to happen. We can skip it for now. And I think it is better to handle it on the Scala side (maybe throw an exception in this scenario).

Contributor Author

@shivaram @felixcheung Any more comments?

@SparkQA commented Aug 29, 2016

Test build #64566 has finished for PR 14784 at commit 986cddc.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@shivaram (Contributor) commented Sep 8, 2016

@zjffdu Sorry for the delay. I think the change looks pretty good. We could just add a logWarning in case we find a collision? Also, could you bring this up to date with the master branch?
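
A hedged sketch of what such a collision warning could look like on the R side; `warn_on_master_collision` and `paramMap` are illustrative names, not the final code:

```r
warn_on_master_collision <- function(master, paramMap) {
  # Warn when an explicit spark.master named parameter collides with a
  # non-empty master argument and the two values differ.
  if (exists("spark.master", envir = paramMap, inherits = FALSE) &&
      nzchar(master) && !identical(master, paramMap[["spark.master"]])) {
    warning("spark.master overrides the master argument (", master, ")")
  }
}

# Example: emits a warning because the two values differ.
paramMap <- new.env()
assign("spark.master", "yarn-client", envir = paramMap)
warn_on_master_collision("local[2]", paramMap)
```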

@felixcheung (Member) commented Sep 8, 2016

@shivaram @zjffdu my preference would be to keep the existing precedence order, from our discussion above.

If we think we should change it at this point, then we should handle everything consistently, in that other explicit parameters should also take precedence this way; this includes master, appName, and possibly previously unmapped parameters like sparkHome, enableHiveSupport, sparkJars (== spark.jars), sparkPackages (== spark.jars.packages?). And we would need to update the roxygen2 docs for the change.

@zjffdu (Contributor Author) commented Sep 19, 2016

@shivaram @felixcheung Sorry for the late response. I just rebased the PR and also made spark.master take precedence over master. Please help review.

@SparkQA commented Sep 19, 2016

Test build #65589 has finished for PR 14784 at commit e635164.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

R/pkg/R/sparkR.R Outdated
Member

I'm a bit confused. To be concrete I think we were talking about something like:

    sparkConfigMap <- convertNamedListToEnv(sparkConfig)
    namedParams <- list(...)
    if (length(namedParams) > 0) {
      paramMap <- convertNamedListToEnv(namedParams)
      # Override for certain named parameters
      if (exists("spark.master", envir = paramMap)) {
        master <- paramMap[["spark.master"]]
      }
      if (exists("spark.app.name", envir = paramMap)) {
        appName <- paramMap[["spark.app.name"]]
      }
      overrideEnvs(sparkConfigMap, paramMap)
    }
    if (nzchar(master)) {
      sparkConfigMap[["spark.master"]] <- master
    }
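
With the ordering sketched above, an explicitly named spark.master parameter ends up winning over the master argument; an illustrative call (not from the PR):

```r
library(SparkR)
# Under the suggested ordering, paramMap[["spark.master"]] replaces `master`
# before the final nzchar(master) assignment, so the session would use
# spark.master = "yarn-client" rather than "local[2]".
sparkR.session(master = "local[2]", spark.master = "yarn-client")
```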

R/pkg/R/sparkR.R Outdated
@felixcheung (Member) commented Sep 19, 2016

I think this is from outdated code - could you check your merge with master, or edit this manually?

@SparkQA commented Sep 20, 2016

Test build #65626 has finished for PR 14784 at commit c91d02a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung (Member)

LGTM. Thanks for finding and fixing this problem.

@felixcheung (Member)

merged this to master and branch-2.0

@asfgit asfgit closed this in f62ddc5 Sep 23, 2016
asfgit pushed a commit that referenced this pull request Sep 23, 2016
… running sparkr in RStudio

## What changes were proposed in this pull request?

Spark adds sparkr.zip to the archives only when running in YARN mode (SparkSubmit.scala):
```
    if (args.isR && clusterManager == YARN) {
      val sparkRPackagePath = RUtils.localSparkRPackagePath
      if (sparkRPackagePath.isEmpty) {
        printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
      }
      val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
      if (!sparkRPackageFile.exists()) {
        printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
      }
      val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString

      // Distribute the SparkR package.
      // Assigns a symbol link name "sparkr" to the shipped package.
      args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr")

      // Distribute the R package archive containing all the built R packages.
      if (!RUtils.rPackages.isEmpty) {
        val rPackageFile =
          RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)
        if (!rPackageFile.exists()) {
          printErrorAndExit("Failed to zip all the built R packages.")
        }

        val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString
        // Assigns a symbol link name "rpkg" to the shipped package.
        args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
      }
    }
```
So it is necessary to pass spark.master from the R process to the JVM; otherwise sparkr.zip won't be distributed to the executors. Besides that, I also pass spark.yarn.keytab and spark.yarn.principal to the Spark side, because the JVM process needs them to access a secured cluster.

## How was this patch tested?

Verified manually in RStudio using the following code.
```
Sys.setenv(SPARK_HOME="/Users/jzhang/github/spark")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sparkR.session(master="yarn-client", sparkConfig = list(spark.executor.instances="1"))
df <- as.DataFrame(mtcars)
head(df)

```


Author: Jeff Zhang <[email protected]>

Closes #14784 from zjffdu/SPARK-17210.

(cherry picked from commit f62ddc5)
Signed-off-by: Felix Cheung <[email protected]>