Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.net.{HttpURLConnection, URI, URL}
import java.nio.charset.StandardCharsets
import java.security.SecureRandom
import java.security.cert.X509Certificate
import java.util.Arrays
import java.util.{Arrays, Properties}
import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
import java.util.jar.{JarEntry, JarOutputStream}
import javax.net.ssl._
Expand All @@ -35,6 +35,7 @@ import scala.sys.process.{Process, ProcessLogger}
import scala.util.Try

import com.google.common.io.{ByteStreams, Files}
import org.apache.log4j.PropertyConfigurator

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
Expand Down Expand Up @@ -256,6 +257,29 @@ private[spark] object TestUtils {
s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
}

/**
* config a log4j properties used for testsuite
*/
def configTestLog4j(level: String): Unit = {
val pro = new Properties()
pro.put("log4j.rootLogger", s"$level, console")
pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender")
pro.put("log4j.appender.console.target", "System.err")
pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout")
pro.put("log4j.appender.console.layout.ConversionPattern",
"%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
PropertyConfigurator.configure(pro)
}

/**
* Lists files recursively.
*/
def recursiveList(f: File): Array[File] = {
require(f.isDirectory)
val current = f.listFiles
current ++ current.filter(_.isDirectory).flatMap(recursiveList)
}

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.deploy

import java.io.{ByteArrayOutputStream, PrintStream}
import java.io.{ByteArrayOutputStream, File, PrintStream}
import java.lang.reflect.InvocationTargetException
import java.net.URI
import java.nio.charset.StandardCharsets
Expand Down Expand Up @@ -233,7 +233,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
// Set name from main class if not given
name = Option(name).orElse(Option(mainClass)).orNull
if (name == null && primaryResource != null) {
name = Utils.stripDirectory(primaryResource)
name = new File(primaryResource).getName()
}

// Action should be SUBMIT unless otherwise specified
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
Expand Down Expand Up @@ -141,8 +142,7 @@ private[spark] class Executor(
conf.getSizeAsBytes("spark.task.maxDirectResultSize", 1L << 20),
RpcUtils.maxMessageSizeBytes(conf))

// Limit of bytes for total size of results (default is 1GB)
private val maxResultSize = Utils.getMaxResultSize(conf)
private val maxResultSize = conf.get(MAX_RESULT_SIZE)

// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.util.regex.PatternSyntaxException
import scala.util.matching.Regex

import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.util.Utils

private object ConfigHelpers {

Expand All @@ -45,7 +46,7 @@ private object ConfigHelpers {
}

def stringToSeq[T](str: String, converter: String => T): Seq[T] = {
str.split(",").map(_.trim()).filter(_.nonEmpty).map(converter)
Utils.stringToSeq(str).map(converter)
}

def seqToString[T](v: Seq[T], stringConverter: T => String): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,4 +520,9 @@ package object config {
.checkValue(v => v > 0, "The threshold should be positive.")
.createWithDefault(10000000)

private[spark] val MAX_RESULT_SIZE = ConfigBuilder("spark.driver.maxResultSize")
.doc("Size limit for results.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1g")

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ private[spark] class TaskSetManager(
val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)

// Limit of bytes for total size of results (default is 1GB)
val maxResultSize = Utils.getMaxResultSize(conf)
val maxResultSize = conf.get(config.MAX_RESULT_SIZE)

val speculationEnabled = conf.getBoolean("spark.speculation", false)

Expand Down
Loading