-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-46400][CORE][SQL] When there are corrupted files in the local maven repo, skip this cache and try again #44343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8bdf9e1
a0848b8
fd78f4d
5a9dafc
a7f5497
44400c9
bd705f1
264a084
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,7 @@ import org.apache.ivy.Ivy | |
| import org.apache.ivy.core.LogOptions | ||
| import org.apache.ivy.core.module.descriptor.{Artifact, DefaultDependencyDescriptor, DefaultExcludeRule, DefaultModuleDescriptor, ExcludeRule} | ||
| import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId} | ||
| import org.apache.ivy.core.report.ResolveReport | ||
| import org.apache.ivy.core.report.{DownloadStatus, ResolveReport} | ||
| import org.apache.ivy.core.resolve.ResolveOptions | ||
| import org.apache.ivy.core.retrieve.RetrieveOptions | ||
| import org.apache.ivy.core.settings.IvySettings | ||
|
|
@@ -43,8 +43,8 @@ import org.apache.spark.util.ArrayImplicits._ | |
| private[spark] object MavenUtils extends Logging { | ||
| val JAR_IVY_SETTING_PATH_KEY: String = "spark.jars.ivySettings" | ||
|
|
||
| // // Exposed for testing | ||
| // var printStream = SparkSubmit.printStream | ||
| // Exposed for testing | ||
| // var printStream = SparkSubmit.printStream | ||
|
|
||
| // Exposed for testing. | ||
| // These components are used to make the default exclusion rules for Spark dependencies. | ||
|
|
@@ -113,7 +113,7 @@ private[spark] object MavenUtils extends Logging { | |
| splits(2) != null && splits(2).trim.nonEmpty, | ||
| s"The version cannot be null or " + | ||
| s"be whitespace. The version provided is: ${splits(2)}") | ||
| new MavenCoordinate(splits(0), splits(1), splits(2)) | ||
| MavenCoordinate(splits(0), splits(1), splits(2)) | ||
| }.toImmutableArraySeq | ||
| } | ||
|
|
||
|
|
@@ -128,24 +128,30 @@ private[spark] object MavenUtils extends Logging { | |
| } | ||
|
|
||
| /** | ||
| * Extracts maven coordinates from a comma-delimited string | ||
| * Create a ChainResolver used by Ivy to search for and resolve dependencies. | ||
| * | ||
| * @param defaultIvyUserDir | ||
| * The default user path for Ivy | ||
| * @param useLocalM2AsCache | ||
| * Whether to use the local maven repo as a cache | ||
| * @return | ||
| * A ChainResolver used by Ivy to search for and resolve dependencies. | ||
| */ | ||
| private[util] def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = { | ||
| private[util] def createRepoResolvers( | ||
| defaultIvyUserDir: File, | ||
| useLocalM2AsCache: Boolean = true): ChainResolver = { | ||
| // We need a chain resolver if we want to check multiple repositories | ||
| val cr = new ChainResolver | ||
| cr.setName("spark-list") | ||
|
|
||
| val localM2 = new IBiblioResolver | ||
| localM2.setM2compatible(true) | ||
| localM2.setRoot(m2Path.toURI.toString) | ||
| localM2.setUsepoms(true) | ||
| localM2.setName("local-m2-cache") | ||
| cr.add(localM2) | ||
| if (useLocalM2AsCache) { | ||
| val localM2 = new IBiblioResolver | ||
| localM2.setM2compatible(true) | ||
| localM2.setRoot(m2Path.toURI.toString) | ||
| localM2.setUsepoms(true) | ||
| localM2.setName("local-m2-cache") | ||
| cr.add(localM2) | ||
| } | ||
|
|
||
| val localIvy = new FileSystemResolver | ||
| val localIvyRoot = new File(defaultIvyUserDir, "local") | ||
|
|
@@ -263,18 +269,22 @@ private[spark] object MavenUtils extends Logging { | |
| * Comma-delimited string of remote repositories other than maven central | ||
| * @param ivyPath | ||
| * The path to the local ivy repository | ||
| * @param useLocalM2AsCache | ||
| * Whether or not use `local-m2 repo` as cache | ||
| * @return | ||
| * An IvySettings object | ||
| */ | ||
| def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String])(implicit | ||
| printStream: PrintStream): IvySettings = { | ||
| def buildIvySettings( | ||
| remoteRepos: Option[String], | ||
| ivyPath: Option[String], | ||
| useLocalM2AsCache: Boolean = true)(implicit printStream: PrintStream): IvySettings = { | ||
| val ivySettings: IvySettings = new IvySettings | ||
| processIvyPathArg(ivySettings, ivyPath) | ||
|
|
||
| // create a pattern matcher | ||
| ivySettings.addMatcher(new GlobPatternMatcher) | ||
| // create the dependency resolvers | ||
| val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir) | ||
| val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir, useLocalM2AsCache) | ||
| ivySettings.addResolver(repoResolver) | ||
| ivySettings.setDefaultResolver(repoResolver.getName) | ||
| processRemoteRepoArg(ivySettings, remoteRepos) | ||
|
|
@@ -310,7 +320,7 @@ private[spark] object MavenUtils extends Logging { | |
| JAR_IVY_SETTING_PATH_KEY) | ||
| } | ||
| require(file.exists(), s"Ivy settings file $file does not exist") | ||
| require(file.isFile(), s"Ivy settings file $file is not a normal file") | ||
| require(file.isFile, s"Ivy settings file $file is not a normal file") | ||
| val ivySettings: IvySettings = new IvySettings | ||
| try { | ||
| ivySettings.load(file) | ||
|
|
@@ -351,7 +361,7 @@ private[spark] object MavenUtils extends Logging { | |
| cr.add(brr) | ||
| // scalastyle:off println | ||
| printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}") | ||
| // scalastyle:on println | ||
| // scalastyle:on println | ||
| } | ||
|
|
||
| ivySettings.addResolver(cr) | ||
|
|
@@ -376,14 +386,38 @@ private[spark] object MavenUtils extends Logging { | |
| */ | ||
| private def clearIvyResolutionFiles( | ||
| mdId: ModuleRevisionId, | ||
| ivySettings: IvySettings, | ||
| defaultCacheFile: File, | ||
| ivyConfName: String): Unit = { | ||
| val currentResolutionFiles = Seq( | ||
| s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml", | ||
| s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.xml", | ||
| s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.properties") | ||
| currentResolutionFiles.foreach { filename => | ||
| new File(ivySettings.getDefaultCache, filename).delete() | ||
| new File(defaultCacheFile, filename).delete() | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Clear invalid cache files in ivy. The cache file is usually at | ||
| * ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml, | ||
| * ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml.original, and | ||
| * ~/.ivy2/cache/${groupId}/${artifactId}/ivydata-${version}.properties. | ||
| * Because when using `local-m2` repo as a cache, some invalid files were created. | ||
| * If not deleted here, an error prompt similar to `unknown resolver local-m2-cache` | ||
| * will be generated, making some confusion for users. | ||
| */ | ||
| private def clearInvalidIvyCacheFiles( | ||
| mdId: ModuleRevisionId, | ||
| defaultCacheFile: File): Unit = { | ||
| val cacheFiles = Seq( | ||
| s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" + | ||
| s"ivy-${mdId.getRevision}.xml", | ||
| s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" + | ||
| s"ivy-${mdId.getRevision}.xml.original", | ||
| s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" + | ||
| s"ivydata-${mdId.getRevision}.properties") | ||
| cacheFiles.foreach { filename => | ||
| new File(defaultCacheFile, filename).delete() | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -394,6 +428,8 @@ private[spark] object MavenUtils extends Logging { | |
| * Comma-delimited string of maven coordinates | ||
| * @param ivySettings | ||
| * An IvySettings containing resolvers to use | ||
| * @param noCacheIvySettings | ||
| * An IvySettings without the local-m2-cache resolver, used to retry resolution when the cache is corrupted | ||
| * @param transitive | ||
| * Whether resolving transitive dependencies, default is true | ||
| * @param exclusions | ||
|
|
@@ -405,6 +441,7 @@ private[spark] object MavenUtils extends Logging { | |
| def resolveMavenCoordinates( | ||
| coordinates: String, | ||
| ivySettings: IvySettings, | ||
| noCacheIvySettings: Option[IvySettings] = None, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where was a non-None
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added the input for
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, this is my bad, it should be passed in when
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The above issues have been resolved. |
||
| transitive: Boolean, | ||
| exclusions: Seq[String] = Nil, | ||
| isTest: Boolean = false)(implicit printStream: PrintStream): Seq[String] = { | ||
|
|
@@ -433,6 +470,8 @@ private[spark] object MavenUtils extends Logging { | |
| // scalastyle:on println | ||
|
|
||
| val ivy = Ivy.newInstance(ivySettings) | ||
| ivy.pushContext() | ||
|
|
||
| // Set resolve options to download transitive dependencies as well | ||
| val resolveOptions = new ResolveOptions | ||
| resolveOptions.setTransitive(transitive) | ||
|
|
@@ -445,6 +484,11 @@ private[spark] object MavenUtils extends Logging { | |
| } else { | ||
| resolveOptions.setDownload(true) | ||
| } | ||
| // retrieve all resolved dependencies | ||
| retrieveOptions.setDestArtifactPattern( | ||
| packagesDirectory.getAbsolutePath + File.separator + | ||
| "[organization]_[artifact]-[revision](-[classifier]).[ext]") | ||
| retrieveOptions.setConfs(Array(ivyConfName)) | ||
|
|
||
| // Add exclusion rules for Spark and Scala Library | ||
| addExclusionRules(ivySettings, ivyConfName, md) | ||
|
|
@@ -456,19 +500,44 @@ private[spark] object MavenUtils extends Logging { | |
| // resolve dependencies | ||
| val rr: ResolveReport = ivy.resolve(md, resolveOptions) | ||
| if (rr.hasError) { | ||
| throw new RuntimeException(rr.getAllProblemMessages.toString) | ||
| // SPARK-46302: When there are some corrupted jars in the local maven repo, | ||
| // we try to continue without the cache | ||
| val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED, true) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. My concern is that if the submission machine has a .m2 dir, it will most likely enter the retry path that skips local-m2-cache, because the local-m2-cache produced by building the Spark distribution also has this issue now. Perhaps it's difficult to obtain a perfect local-m2-cache now. |
||
| if (failedReports.nonEmpty && noCacheIvySettings.isDefined) { | ||
| val failedArtifacts = failedReports.map(r => r.getArtifact) | ||
| logInfo(s"Download failed: ${failedArtifacts.mkString("[", ", ", "]")}, " + | ||
| s"attempt to retry while skipping local-m2-cache.") | ||
| failedArtifacts.foreach(artifact => { | ||
| clearInvalidIvyCacheFiles(artifact.getModuleRevisionId, ivySettings.getDefaultCache) | ||
| }) | ||
| ivy.popContext() | ||
|
|
||
| val noCacheIvy = Ivy.newInstance(noCacheIvySettings.get) | ||
| noCacheIvy.pushContext() | ||
|
|
||
| val noCacheRr = noCacheIvy.resolve(md, resolveOptions) | ||
| if (noCacheRr.hasError) { | ||
| throw new RuntimeException(noCacheRr.getAllProblemMessages.toString) | ||
| } | ||
| noCacheIvy.retrieve(noCacheRr.getModuleDescriptor.getModuleRevisionId, retrieveOptions) | ||
| val dependencyPaths = resolveDependencyPaths( | ||
| noCacheRr.getArtifacts.toArray, packagesDirectory) | ||
| noCacheIvy.popContext() | ||
|
|
||
| dependencyPaths | ||
| } else { | ||
| throw new RuntimeException(rr.getAllProblemMessages.toString) | ||
| } | ||
| } else { | ||
| ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId, retrieveOptions) | ||
| val dependencyPaths = resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory) | ||
| ivy.popContext() | ||
|
|
||
| dependencyPaths | ||
| } | ||
| // retrieve all resolved dependencies | ||
| retrieveOptions.setDestArtifactPattern( | ||
| packagesDirectory.getAbsolutePath + File.separator + | ||
| "[organization]_[artifact]-[revision](-[classifier]).[ext]") | ||
| ivy.retrieve( | ||
| rr.getModuleDescriptor.getModuleRevisionId, | ||
| retrieveOptions.setConfs(Array(ivyConfName))) | ||
| resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory) | ||
| } finally { | ||
| System.setOut(sysOut) | ||
| clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings, ivyConfName) | ||
| clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings.getDefaultCache, ivyConfName) | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -477,37 +546,37 @@ private[spark] object MavenUtils extends Logging { | |
| coords: String, | ||
| ivySettings: IvySettings, | ||
| ivyConfName: String): ExcludeRule = { | ||
| val c = extractMavenCoordinates(coords)(0) | ||
| val c = extractMavenCoordinates(coords).head | ||
| val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", "*") | ||
| val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null) | ||
| rule.addConfiguration(ivyConfName) | ||
| rule | ||
| } | ||
|
|
||
| def isInvalidQueryString(tokens: Array[String]): Boolean = { | ||
| private def isInvalidQueryString(tokens: Array[String]): Boolean = { | ||
| tokens.length != 2 || StringUtils.isBlank(tokens(0)) || StringUtils.isBlank(tokens(1)) | ||
| } | ||
|
|
||
| /** | ||
| * Parse URI query string's parameter value of `transitive` and `exclude`. Other invalid | ||
| * parameters will be ignored. | ||
| * Parse URI query string's parameter value of `transitive`, `exclude` and `repos`. | ||
| * Other invalid parameters will be ignored. | ||
| * | ||
| * @param uri | ||
| * Ivy URI need to be downloaded. | ||
| * @return | ||
| * Tuple value of parameter `transitive` and `exclude` value. | ||
| * Tuple value of parameter `transitive`, `exclude` and `repos` value. | ||
| * | ||
| * 1. transitive: whether to download dependency jar of Ivy URI, default value is true and | ||
| * this parameter value is case-insensitive. This mimics Hive's behaviour for parsing the | ||
| * transitive parameter. Invalid value will be treated as false. Example: Input: | ||
| * exclude=org.mortbay.jetty:jetty&transitive=true Output: true | ||
| * | ||
| * 2. exclude: comma separated exclusions to apply when resolving transitive dependencies, | ||
| * consists of `group:module` pairs separated by commas. Example: Input: | ||
| * excludeorg.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http Output: | ||
| * [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http] | ||
| * 2. exclude: comma separated exclusions to apply when resolving transitive dependencies, | ||
| * consists of `group:module` pairs separated by commas. Example: Input: | ||
| * exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http Output: | ||
| * [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http] | ||
| * | ||
| * 3. repos: comma separated repositories to use when resolving dependencies. | ||
| * 3. repos: comma separated repositories to use when resolving dependencies. | ||
| */ | ||
| def parseQueryParams(uri: URI): (Boolean, String, String) = { | ||
| val uriQuery = uri.getQuery | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original code comments were incorrect.