From 8bdf9e14d885900ede891f8b99b8e30387e2a0fe Mon Sep 17 00:00:00 2001 From: panbingkun Date: Thu, 14 Dec 2023 11:00:14 +0800 Subject: [PATCH 1/5] [SPARK-46400][CORE][SQL] When there are corrupted files in the local maven repo, try to skip this cache --- .../org/apache/spark/util/MavenUtils.scala | 53 ++++++++++++++----- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala index 2d7fba6f07d5b..ff158c1cbce01 100644 --- a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala +++ b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala @@ -27,7 +27,7 @@ import org.apache.ivy.Ivy import org.apache.ivy.core.LogOptions import org.apache.ivy.core.module.descriptor.{Artifact, DefaultDependencyDescriptor, DefaultExcludeRule, DefaultModuleDescriptor, ExcludeRule} import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId} -import org.apache.ivy.core.report.ResolveReport +import org.apache.ivy.core.report.{DownloadStatus, ResolveReport} import org.apache.ivy.core.resolve.ResolveOptions import org.apache.ivy.core.retrieve.RetrieveOptions import org.apache.ivy.core.settings.IvySettings @@ -132,20 +132,26 @@ private[spark] object MavenUtils extends Logging { * * @param defaultIvyUserDir * The default user path for Ivy + * @param useLocalM2AsCache + * Whether to use the local maven repo as a cache * @return * A ChainResolver used by Ivy to search for and resolve dependencies. */ - private[util] def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = { + private[util] def createRepoResolvers( + defaultIvyUserDir: File, + useLocalM2AsCache: Boolean = true): ChainResolver = { // We need a chain resolver if we want to check multiple repositories val cr = new ChainResolver cr.setName("spark-list") - val localM2 = new IBiblioResolver - localM2.setM2compatible(true) - localM2.setRoot(m2Path.toURI.toString) - localM2.setUsepoms(true) - localM2.setName("local-m2-cache") - cr.add(localM2) + if (useLocalM2AsCache) { + val localM2 = new IBiblioResolver + localM2.setM2compatible(true) + localM2.setRoot(m2Path.toURI.toString) + localM2.setUsepoms(true) + localM2.setName("local-m2-cache") + cr.add(localM2) + } val localIvy = new FileSystemResolver val localIvyRoot = new File(defaultIvyUserDir, "local") @@ -266,15 +272,17 @@ private[spark] object MavenUtils extends Logging { * @return * An IvySettings object */ - def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String])(implicit - printStream: PrintStream): IvySettings = { + def buildIvySettings( + remoteRepos: Option[String], + ivyPath: Option[String], + useLocalM2AsCache: Boolean = true)(implicit printStream: PrintStream): IvySettings = { val ivySettings: IvySettings = new IvySettings processIvyPathArg(ivySettings, ivyPath) // create a pattern matcher ivySettings.addMatcher(new GlobPatternMatcher) // create the dependency resolvers - val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir) + val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir, useLocalM2AsCache) ivySettings.addResolver(repoResolver) ivySettings.setDefaultResolver(repoResolver.getName) processRemoteRepoArg(ivySettings, remoteRepos) @@ -394,6 +402,8 @@ private[spark] object MavenUtils extends Logging { * Comma-delimited string of maven coordinates * @param ivySettings * An IvySettings containing resolvers to use + * @param noCacheIvySettings + * An no-cache IvySettings containing resolvers to use * @param transitive * Whether resolving transitive dependencies, default is true * @param exclusions @@ -405,6 +415,7 @@ private[spark] object MavenUtils extends Logging { def resolveMavenCoordinates( coordinates: String, ivySettings: IvySettings, + noCacheIvySettings: Option[IvySettings] = None, transitive: Boolean, exclusions: Seq[String] = Nil, isTest: Boolean = false)(implicit printStream: PrintStream): Seq[String] = { @@ -432,7 +443,7 @@ private[spark] object MavenUtils extends Logging { printStream.println(s"The jars for the packages stored in: $packagesDirectory") // scalastyle:on println - val ivy = Ivy.newInstance(ivySettings) + var ivy = Ivy.newInstance(ivySettings) // Set resolve options to download transitive dependencies as well val resolveOptions = new ResolveOptions resolveOptions.setTransitive(transitive) @@ -454,9 +465,23 @@ private[spark] object MavenUtils extends Logging { md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName)) } // resolve dependencies - val rr: ResolveReport = ivy.resolve(md, resolveOptions) + var rr: ResolveReport = ivy.resolve(md, resolveOptions) if (rr.hasError) { - throw new RuntimeException(rr.getAllProblemMessages.toString) + // SPARK-46302: When there are some corrupted jars in the maven repo, + // we try to continue without the cache + val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED, true) + if (failedReports.nonEmpty && noCacheIvySettings.isDefined) { + val failedArtifacts = failedReports.map( + fr => fr.getArtifact).mkString("[", ", ", "]") + logInfo(s"Download failed: $failedArtifacts, attempt to skip local maven cache.") + ivy = Ivy.newInstance(noCacheIvySettings.get) + rr = ivy.resolve(md, resolveOptions) + if (rr.hasError) { + throw new RuntimeException(rr.getAllProblemMessages.toString) + } + } else { + throw new RuntimeException(rr.getAllProblemMessages.toString) + } } // retrieve all resolved dependencies retrieveOptions.setDestArtifactPattern( From a0848b8c455ea01661116694cc07587ab70530af Mon Sep 17 00:00:00 2001 From: panbingkun Date: Thu, 14 Dec 2023 11:11:45 +0800 Subject: [PATCH 2/5] Trigger build From fd78f4d94e54489bc8405f01c1f53b7da512d7ae Mon Sep 17 00:00:00 2001 From: panbingkun Date: Thu, 14 Dec 2023 11:14:26 +0800 Subject: [PATCH 3/5] Trigger build From a7f54973c0192e58ade9fa7adf6cfe64134f329a Mon Sep 17 00:00:00 2001 From: panbingkun Date: Fri, 15 Dec 2023 09:08:54 +0800 Subject: [PATCH 4/5] update --- .../src/main/scala/org/apache/spark/util/MavenUtils.scala | 4 ++-- .../apache/spark/sql/hive/client/IsolatedClientLoader.scala | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala index ff158c1cbce01..6fd291439d5ee 100644 --- a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala +++ b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala @@ -403,7 +403,7 @@ private[spark] object MavenUtils extends Logging { * @param ivySettings * An IvySettings containing resolvers to use * @param noCacheIvySettings - * An no-cache IvySettings containing resolvers to use + * An no-cache IvySettings containing resolvers to use * @param transitive * Whether resolving transitive dependencies, default is true * @param exclusions @@ -473,7 +473,7 @@ private[spark] object MavenUtils extends Logging { if (failedReports.nonEmpty && noCacheIvySettings.isDefined) { val failedArtifacts = failedReports.map( fr => fr.getArtifact).mkString("[", ", ", "]") - logInfo(s"Download failed: $failedArtifacts, attempt to skip local maven cache.") + logInfo(s"Download failed: $failedArtifacts, attempt to skip local-m2-cache.") ivy = Ivy.newInstance(noCacheIvySettings.get) rr = ivy.resolve(md, resolveOptions) if (rr.hasError) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 74b33e6437fb6..337c3d529d57e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -134,6 +134,9 @@ private[hive] object IsolatedClientLoader extends Logging { MavenUtils.buildIvySettings( Some(remoteRepos), ivyPath), + Some(MavenUtils.buildIvySettings( + Some(remoteRepos), + ivyPath, useLocalM2AsCache = false)), transitive = true, exclusions = version.exclusions) } From 44400c9703e778f2a576834cf23eb8f1eb138411 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Sun, 17 Dec 2023 12:28:57 +0800 Subject: [PATCH 5/5] [SPARK-46400][CORE][SQL] When there are corrupted files in the local maven repo, skip this cache and try again --- .../org/apache/spark/util/MavenUtils.scala | 120 ++++++++++++------ .../hive/client/IsolatedClientLoader.scala | 3 +- 2 files changed, 84 insertions(+), 39 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala index 6fd291439d5ee..65530b7fa4739 100644 --- a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala +++ b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala @@ -43,8 +43,8 @@ import org.apache.spark.util.ArrayImplicits._ private[spark] object MavenUtils extends Logging { val JAR_IVY_SETTING_PATH_KEY: String = "spark.jars.ivySettings" -// // Exposed for testing -// var printStream = SparkSubmit.printStream + // Exposed for testing + // var printStream = SparkSubmit.printStream // Exposed for testing. // These components are used to make the default exclusion rules for Spark dependencies. @@ -113,7 +113,7 @@ private[spark] object MavenUtils extends Logging { splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " + s"be whitespace. The version provided is: ${splits(2)}") - new MavenCoordinate(splits(0), splits(1), splits(2)) + MavenCoordinate(splits(0), splits(1), splits(2)) }.toImmutableArraySeq } @@ -128,7 +128,7 @@ private[spark] object MavenUtils extends Logging { } /** - * Extracts maven coordinates from a comma-delimited string + * Create a ChainResolver used by Ivy to search for and resolve dependencies. * * @param defaultIvyUserDir * The default user path for Ivy @@ -269,6 +269,8 @@ private[spark] object MavenUtils extends Logging { * Comma-delimited string of remote repositories other than maven central * @param ivyPath * The path to the local ivy repository + * @param useLocalM2AsCache + * Whether or not use `local-m2 repo` as cache * @return * An IvySettings object */ @@ -318,7 +320,7 @@ private[spark] object MavenUtils extends Logging { JAR_IVY_SETTING_PATH_KEY) } require(file.exists(), s"Ivy settings file $file does not exist") - require(file.isFile(), s"Ivy settings file $file is not a normal file") + require(file.isFile, s"Ivy settings file $file is not a normal file") val ivySettings: IvySettings = new IvySettings try { ivySettings.load(file) @@ -359,7 +361,7 @@ private[spark] object MavenUtils extends Logging { cr.add(brr) // scalastyle:off println printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}") - // scalastyle:on println + // scalastyle:on println } ivySettings.addResolver(cr) @@ -384,14 +386,38 @@ private[spark] object MavenUtils extends Logging { */ private def clearIvyResolutionFiles( mdId: ModuleRevisionId, - ivySettings: IvySettings, + defaultCacheFile: File, ivyConfName: String): Unit = { val currentResolutionFiles = Seq( s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml", s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.xml", s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.properties") currentResolutionFiles.foreach { filename => - new File(ivySettings.getDefaultCache, filename).delete() + new File(defaultCacheFile, filename).delete() + } + } + + /** + * Clear invalid cache files in ivy. The cache file is usually at + * ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml, + * ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml.original, and + * ~/.ivy2/cache/${groupId}/${artifactId}/ivydata-${version}.properties. + * Because when using `local-m2` repo as a cache, some invalid files were created. + * If not deleted here, an error prompt similar to `unknown resolver local-m2-cache` + * will be generated, making some confusion for users. + */ + private def clearInvalidIvyCacheFiles( + mdId: ModuleRevisionId, + defaultCacheFile: File): Unit = { + val cacheFiles = Seq( + s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" + + s"ivy-${mdId.getRevision}.xml", + s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" + + s"ivy-${mdId.getRevision}.xml.original", + s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" + + s"ivydata-${mdId.getRevision}.properties") + cacheFiles.foreach { filename => + new File(defaultCacheFile, filename).delete() } } @@ -403,7 +429,7 @@ private[spark] object MavenUtils extends Logging { * @param ivySettings * An IvySettings containing resolvers to use * @param noCacheIvySettings - * An no-cache IvySettings containing resolvers to use + * An no-cache(local-m2-cache) IvySettings containing resolvers to use * @param transitive * Whether resolving transitive dependencies, default is true * @param exclusions @@ -443,7 +469,9 @@ private[spark] object MavenUtils extends Logging { printStream.println(s"The jars for the packages stored in: $packagesDirectory") // scalastyle:on println - var ivy = Ivy.newInstance(ivySettings) + val ivy = Ivy.newInstance(ivySettings) + ivy.pushContext() + // Set resolve options to download transitive dependencies as well val resolveOptions = new ResolveOptions resolveOptions.setTransitive(transitive) @@ -456,6 +484,11 @@ private[spark] object MavenUtils extends Logging { } else { resolveOptions.setDownload(true) } + // retrieve all resolved dependencies + retrieveOptions.setDestArtifactPattern( + packagesDirectory.getAbsolutePath + File.separator + + "[organization]_[artifact]-[revision](-[classifier]).[ext]") + retrieveOptions.setConfs(Array(ivyConfName)) // Add exclusion rules for Spark and Scala Library addExclusionRules(ivySettings, ivyConfName, md) @@ -465,35 +498,46 @@ private[spark] object MavenUtils extends Logging { md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName)) } // resolve dependencies - var rr: ResolveReport = ivy.resolve(md, resolveOptions) + val rr: ResolveReport = ivy.resolve(md, resolveOptions) if (rr.hasError) { - // SPARK-46302: When there are some corrupted jars in the maven repo, + // SPARK-46302: When there are some corrupted jars in the local maven repo, // we try to continue without the cache val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED, true) if (failedReports.nonEmpty && noCacheIvySettings.isDefined) { - val failedArtifacts = failedReports.map( - fr => fr.getArtifact).mkString("[", ", ", "]") - logInfo(s"Download failed: $failedArtifacts, attempt to skip local-m2-cache.") - ivy = Ivy.newInstance(noCacheIvySettings.get) - rr = ivy.resolve(md, resolveOptions) - if (rr.hasError) { - throw new RuntimeException(rr.getAllProblemMessages.toString) + val failedArtifacts = failedReports.map(r => r.getArtifact) + logInfo(s"Download failed: ${failedArtifacts.mkString("[", ", ", "]")}, " + + s"attempt to retry while skipping local-m2-cache.") + failedArtifacts.foreach(artifact => { + clearInvalidIvyCacheFiles(artifact.getModuleRevisionId, ivySettings.getDefaultCache) + }) + ivy.popContext() + + val noCacheIvy = Ivy.newInstance(noCacheIvySettings.get) + noCacheIvy.pushContext() + + val noCacheRr = noCacheIvy.resolve(md, resolveOptions) + if (noCacheRr.hasError) { + throw new RuntimeException(noCacheRr.getAllProblemMessages.toString) } + noCacheIvy.retrieve(noCacheRr.getModuleDescriptor.getModuleRevisionId, retrieveOptions) + val dependencyPaths = resolveDependencyPaths( + noCacheRr.getArtifacts.toArray, packagesDirectory) + noCacheIvy.popContext() + + dependencyPaths } else { throw new RuntimeException(rr.getAllProblemMessages.toString) } + } else { + ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId, retrieveOptions) + val dependencyPaths = resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory) + ivy.popContext() + + dependencyPaths } - // retrieve all resolved dependencies - retrieveOptions.setDestArtifactPattern( - packagesDirectory.getAbsolutePath + File.separator + - "[organization]_[artifact]-[revision](-[classifier]).[ext]") - ivy.retrieve( - rr.getModuleDescriptor.getModuleRevisionId, - retrieveOptions.setConfs(Array(ivyConfName))) - resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory) } finally { System.setOut(sysOut) - clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings, ivyConfName) + clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings.getDefaultCache, ivyConfName) } } } @@ -502,37 +546,37 @@ private[spark] object MavenUtils extends Logging { coords: String, ivySettings: IvySettings, ivyConfName: String): ExcludeRule = { - val c = extractMavenCoordinates(coords)(0) + val c = extractMavenCoordinates(coords).head val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", "*") val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null) rule.addConfiguration(ivyConfName) rule } - def isInvalidQueryString(tokens: Array[String]): Boolean = { + private def isInvalidQueryString(tokens: Array[String]): Boolean = { tokens.length != 2 || StringUtils.isBlank(tokens(0)) || StringUtils.isBlank(tokens(1)) } /** - * Parse URI query string's parameter value of `transitive` and `exclude`. Other invalid - * parameters will be ignored. + * Parse URI query string's parameter value of `transitive`, `exclude` and `repos`. + * Other invalid parameters will be ignored. * * @param uri * Ivy URI need to be downloaded. * @return - * Tuple value of parameter `transitive` and `exclude` value. + * Tuple value of parameter `transitive`, `exclude` and `repos` value. * * 1. transitive: whether to download dependency jar of Ivy URI, default value is true and * this parameter value is case-insensitive. This mimics Hive's behaviour for parsing the * transitive parameter. Invalid value will be treat as false. Example: Input: * exclude=org.mortbay.jetty:jetty&transitive=true Output: true * - * 2. exclude: comma separated exclusions to apply when resolving transitive dependencies, - * consists of `group:module` pairs separated by commas. Example: Input: - * excludeorg.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http Output: - * [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http] + * 2. exclude: comma separated exclusions to apply when resolving transitive dependencies, + * consists of `group:module` pairs separated by commas. Example: Input: + * excludeorg.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http Output: + * [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http] * - * 3. repos: comma separated repositories to use when resolving dependencies. + * 3. repos: comma separated repositories to use when resolving dependencies. */ def parseQueryParams(uri: URI): (Boolean, String, String) = { val uriQuery = uri.getQuery diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 337c3d529d57e..e88ba451c169e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -136,7 +136,8 @@ private[hive] object IsolatedClientLoader extends Logging { ivyPath), Some(MavenUtils.buildIvySettings( Some(remoteRepos), - ivyPath, useLocalM2AsCache = false)), + ivyPath, + useLocalM2AsCache = false)), transitive = true, exclusions = version.exclusions) }