@@ -24,6 +24,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -210,7 +211,7 @@ private static boolean isSymlink(File file) throws IOException {
* The unit is also considered the default if the given string does not specify a unit.
*/
public static long timeStringAs(String str, TimeUnit unit) {
String lower = str.toLowerCase().trim();
String lower = str.toLowerCase(Locale.ROOT).trim();

try {
Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(lower);
@@ -258,7 +259,7 @@ public static long timeStringAsSec(String str) {
* provided, a direct conversion to the provided unit is attempted.
*/
public static long byteStringAs(String str, ByteUnit unit) {
String lower = str.toLowerCase().trim();
String lower = str.toLowerCase(Locale.ROOT).trim();

try {
Matcher m = Pattern.compile("([0-9]+)([a-z]+)?").matcher(lower);
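Why the explicit locale matters for these parsers (a minimal sketch, not part of the patch): timeStringAs and byteStringAs lower-case the user-supplied suffix and match it against fixed ASCII keys such as "ms", "min" or "mb". With the JVM default locale left implicit, a Turkish locale maps the capital 'I' to the dotless 'ı', so an otherwise valid value stops parsing; Locale.ROOT pins the mapping:

import java.util.Locale

val tr = new Locale("tr", "TR")           // hypothetical runtime default locale
"30MIN".toLowerCase(tr)                   // "30mın" -- dotless ı, no longer matches the "min" suffix
"30MIN".toLowerCase(Locale.ROOT)          // "30min" -- stable regardless of the default locale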
@@ -17,6 +17,7 @@

package org.apache.spark.network.util;

import java.util.Locale;
import java.util.Properties;

import com.google.common.primitives.Ints;
@@ -75,7 +76,9 @@ public String getModuleName() {
}

/** IO mode: nio or epoll */
public String ioMode() { return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(); }
public String ioMode() {
return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(Locale.ROOT);
}

/** If true, we will prefer allocating off-heap byte buffers within Netty. */
public boolean preferDirectBufs() {
@@ -21,6 +21,7 @@

import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

public enum TaskSorting {
@@ -35,7 +36,7 @@ public enum TaskSorting {
}

public static TaskSorting fromString(String str) {
String lower = str.toLowerCase();
String lower = str.toLowerCase(Locale.ROOT);
for (TaskSorting t: values()) {
if (t.alternateNames.contains(lower)) {
return t;
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -361,7 +361,7 @@ class SparkContext(config: SparkConf) extends Logging {
*/
def setLogLevel(logLevel: String) {
// let's allow lowercase or mixed case too
val upperCased = logLevel.toUpperCase(Locale.ENGLISH)
val upperCased = logLevel.toUpperCase(Locale.ROOT)
require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),
s"Supplied level $logLevel did not match one of:" +
s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}")
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -19,6 +19,7 @@ package org.apache.spark

import java.io.File
import java.net.Socket
import java.util.Locale

import scala.collection.mutable
import scala.util.Properties
@@ -319,7 +320,8 @@
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
@@ -19,6 +19,7 @@ package org.apache.spark.executor

import java.net.URL
import java.nio.ByteBuffer
import java.util.Locale
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable
@@ -72,7 +73,7 @@ private[spark] class CoarseGrainedExecutorBackend(
def extractLogUrls: Map[String, String] = {
val prefix = "SPARK_LOG_URL_"
sys.env.filterKeys(_.startsWith(prefix))
.map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
.map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2))
}

override def receive: PartialFunction[Any, Unit] = {
@@ -18,6 +18,7 @@
package org.apache.spark.io

import java.io._
import java.util.Locale

import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.LZ4BlockOutputStream
@@ -66,7 +67,8 @@ private[spark] object CompressionCodec {
}

def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
val codecClass =
shortCompressionCodecNames.getOrElse(codecName.toLowerCase(Locale.ROOT), codecName)
val codec = try {
val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf])
Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
@@ -17,7 +17,7 @@

package org.apache.spark.metrics.sink

import java.util.Properties
import java.util.{Locale, Properties}
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
@@ -39,7 +39,7 @@ private[spark] class ConsoleSink(val property: Properties, val registry: MetricR
}

val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
}

@@ -42,7 +42,7 @@ private[spark] class CsvSink(val property: Properties, val registry: MetricRegis
}

val pollUnit: TimeUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
}

@@ -18,7 +18,7 @@
package org.apache.spark.metrics.sink

import java.net.InetSocketAddress
import java.util.Properties
import java.util.{Locale, Properties}
import java.util.concurrent.TimeUnit

import com.codahale.metrics.MetricRegistry
@@ -59,15 +59,15 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric
}

val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT)
}

val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX)

MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

val graphite = propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase) match {
val graphite = propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase(Locale.ROOT)) match {
case Some("udp") => new GraphiteUDP(new InetSocketAddress(host, port))
case Some("tcp") | None => new Graphite(new InetSocketAddress(host, port))
case Some(p) => throw new Exception(s"Invalid Graphite protocol: $p")
@@ -17,7 +17,7 @@

package org.apache.spark.metrics.sink

import java.util.Properties
import java.util.{Locale, Properties}
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{MetricRegistry, Slf4jReporter}
@@ -42,7 +42,7 @@ private[spark] class Slf4jSink(
}

val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT)
}
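Each of these sinks resolves its polling unit with TimeUnit.valueOf, which needs the exact enum constant name, so the upper-casing hits the same Turkish-locale pitfall from the other direction (again a hedged sketch, not part of the diff): a lower-case 'i' upper-cases to the dotted 'İ' and the lookup fails.

import java.util.Locale
import java.util.concurrent.TimeUnit

val tr = new Locale("tr", "TR")                        // hypothetical runtime default locale
"minutes".toUpperCase(tr)                              // "MİNUTES" -- TimeUnit.valueOf throws IllegalArgumentException
TimeUnit.valueOf("minutes".toUpperCase(Locale.ROOT))   // TimeUnit.MINUTES on any JVM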

@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import java.io._
import java.net.URI
import java.nio.charset.StandardCharsets
import java.util.Locale

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -316,7 +317,7 @@
}

private def sanitize(str: String): String = {
str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase(Locale.ROOT)
}

/**
@@ -18,7 +18,7 @@
package org.apache.spark.scheduler

import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}
import java.util.{Locale, NoSuchElementException, Properties}

import scala.util.control.NonFatal
import scala.xml.{Node, XML}
@@ -142,7 +142,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
defaultValue: SchedulingMode,
fileName: String): SchedulingMode = {

val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase
val xmlSchedulingMode =
(poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase(Locale.ROOT)
val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode found in " +
s"Fair Scheduler configuration file: $fileName, using " +
s"the default schedulingMode: $defaultValue for pool: $poolName"
@@ -18,7 +18,7 @@
package org.apache.spark.scheduler

import java.nio.ByteBuffer
import java.util.{Timer, TimerTask}
import java.util.{Locale, Timer, TimerTask}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

@@ -56,8 +56,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](
val maxTaskFailures: Int,
private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker],
isLocal: Boolean = false)
extends TaskScheduler with Logging
{
extends TaskScheduler with Logging {

import TaskSchedulerImpl._

@@ -135,12 +134,13 @@ private[spark] class TaskSchedulerImpl private[scheduler](
private var schedulableBuilder: SchedulableBuilder = null
// default scheduler is FIFO
private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
val schedulingMode: SchedulingMode = try {
SchedulingMode.withName(schedulingModeConf.toUpperCase)
} catch {
case e: java.util.NoSuchElementException =>
throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
}
val schedulingMode: SchedulingMode =
try {
SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
} catch {
case e: java.util.NoSuchElementException =>
throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
}

val rootPool: Pool = new Pool("", schedulingMode, 0, 0)

@@ -19,6 +19,7 @@ package org.apache.spark.serializer

import java.io._
import java.nio.ByteBuffer
import java.util.Locale
import javax.annotation.Nullable

import scala.collection.JavaConverters._
@@ -244,7 +245,8 @@ class KryoDeserializationStream(
kryo.readClassAndObject(input).asInstanceOf[T]
} catch {
// DeserializationStream uses the EOF exception to indicate stopping condition.
case e: KryoException if e.getMessage.toLowerCase.contains("buffer underflow") =>
case e: KryoException
if e.getMessage.toLowerCase(Locale.ROOT).contains("buffer underflow") =>
throw new EOFException
}
}
@@ -17,6 +17,7 @@

package org.apache.spark.ui.exec

import java.util.Locale
import javax.servlet.http.HttpServletRequest

import scala.xml.{Node, Text}
@@ -42,7 +43,8 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage
val v1 = if (threadTrace1.threadName.contains("Executor task launch")) 1 else 0
val v2 = if (threadTrace2.threadName.contains("Executor task launch")) 1 else 0
if (v1 == v2) {
threadTrace1.threadName.toLowerCase < threadTrace2.threadName.toLowerCase
threadTrace1.threadName.toLowerCase(Locale.ROOT) <
threadTrace2.threadName.toLowerCase(Locale.ROOT)
} else {
v1 > v2
}
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -17,7 +17,7 @@

package org.apache.spark.ui.jobs

import java.util.Date
import java.util.{Date, Locale}
import javax.servlet.http.HttpServletRequest

import scala.collection.mutable.{Buffer, ListBuffer}
@@ -77,7 +77,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
| 'content': '<div class="job-timeline-content" data-toggle="tooltip"' +
| 'data-placement="top" data-html="true"' +
| 'data-title="${jsEscapedName} (Stage ${stageId}.${attemptId})<br>' +
| 'Status: ${status.toUpperCase}<br>' +
| 'Status: ${status.toUpperCase(Locale.ROOT)}<br>' +
| 'Submitted: ${UIUtils.formatDate(new Date(submissionTime))}' +
| '${
if (status != "running") {
26 changes: 16 additions & 10 deletions core/src/test/scala/org/apache/spark/DebugFilesystem.scala
@@ -20,7 +20,6 @@ package org.apache.spark
import java.io.{FileDescriptor, InputStream}
import java.lang
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -31,21 +30,29 @@ import org.apache.spark.internal.Logging

object DebugFilesystem extends Logging {
// Stores the set of active streams and their creation sites.
private val openStreams = new ConcurrentHashMap[FSDataInputStream, Throwable]()
private val openStreams = mutable.Map.empty[FSDataInputStream, Throwable]

def clearOpenStreams(): Unit = {
def addOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized {
openStreams.put(stream, new Throwable())
}

def clearOpenStreams(): Unit = openStreams.synchronized {
openStreams.clear()
}

def assertNoOpenStreams(): Unit = {
val numOpen = openStreams.size()
def removeOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized {
openStreams.remove(stream)
}

def assertNoOpenStreams(): Unit = openStreams.synchronized {
val numOpen = openStreams.values.size
if (numOpen > 0) {
for (exc <- openStreams.values().asScala) {
for (exc <- openStreams.values) {
logWarning("Leaked filesystem connection created at:")
exc.printStackTrace()
}
throw new IllegalStateException(s"There are $numOpen possibly leaked file streams.",
openStreams.values().asScala.head)
openStreams.values.head)
}
}
}
@@ -60,8 +67,7 @@ class DebugFilesystem extends LocalFileSystem {

override def open(f: Path, bufferSize: Int): FSDataInputStream = {
val wrapped: FSDataInputStream = super.open(f, bufferSize)
openStreams.put(wrapped, new Throwable())

addOpenStream(wrapped)
new FSDataInputStream(wrapped.getWrappedStream) {
override def setDropBehind(dropBehind: lang.Boolean): Unit = wrapped.setDropBehind(dropBehind)

Expand Down Expand Up @@ -98,7 +104,7 @@ class DebugFilesystem extends LocalFileSystem {

override def close(): Unit = {
wrapped.close()
openStreams.remove(wrapped)
removeOpenStream(wrapped)
}

override def read(): Int = wrapped.read()
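The DebugFilesystem change swaps the ConcurrentHashMap for a plain mutable map whose every access goes through openStreams.synchronized, so assertNoOpenStreams sees a consistent snapshot while streams are being opened and closed concurrently. A minimal sketch of how a suite might drive these helpers (the suite name and ScalaTest mixins are assumptions, not part of this patch):

import org.scalatest.{BeforeAndAfterEach, FunSuite}

// Hypothetical suite: the code under test is expected to read local files
// through DebugFilesystem, e.g. by pointing fs.file.impl at it in the Hadoop
// configuration it uses.
class LeakCheckingSuite extends FunSuite with BeforeAndAfterEach {

  override def beforeEach(): Unit = {
    DebugFilesystem.clearOpenStreams()      // drop bookkeeping left over from earlier tests
  }

  override def afterEach(): Unit = {
    // Throws IllegalStateException, carrying the creation site of a leaked
    // stream, if any FSDataInputStream opened during the test was not closed.
    DebugFilesystem.assertNoOpenStreams()
  }

  test("streams opened during the test are closed") {
    // ... exercise code that reads files via the debug filesystem ...
  }
}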