7 changes: 3 additions & 4 deletions core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -22,8 +22,7 @@ import java.net.URI
import java.util.jar.{JarEntry, JarOutputStream}
import javax.net.ssl.SSLException

-import com.google.common.io.ByteStreams
-import org.apache.commons.io.{FileUtils, IOUtils}
+import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.lang3.RandomUtils
import org.scalatest.FunSuite

@@ -239,7 +238,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
def fileTransferTest(server: HttpFileServer, sm: SecurityManager = null): Unit = {
val randomContent = RandomUtils.nextBytes(100)
val file = File.createTempFile("FileServerSuite", "sslTests", tmpDir)
-FileUtils.writeByteArrayToFile(file, randomContent)
+Files.write(randomContent, file)
server.addFile(file)

val uri = new URI(server.serverUri + "/files/" + file.getName)
@@ -254,7 +253,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
Utils.setupSecureURLConnection(connection, sm)
}

-val buf = IOUtils.toByteArray(connection.getInputStream)
+val buf = ByteStreams.toByteArray(connection.getInputStream)
assert(buf === randomContent)
}

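The commons-io helpers removed above have direct Guava equivalents already on the test classpath: `Files.write(bytes, file)` replaces `FileUtils.writeByteArrayToFile(file, bytes)` (note the swapped argument order), and `ByteStreams.toByteArray(in)` replaces `IOUtils.toByteArray(in)`. A minimal standalone sketch of the same round trip, with an illustrative temp file and payload rather than the suite's own:

```scala
import java.io.{File, FileInputStream}

import com.google.common.io.{ByteStreams, Files}

object GuavaIoSketch {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("guava-io-sketch", ".bin")
    val content = Array.tabulate[Byte](100)(_.toByte)

    // Guava takes (bytes, file); commons-io's writeByteArrayToFile took (file, bytes).
    Files.write(content, file)

    // Read the bytes back: the Guava counterpart of IOUtils.toByteArray.
    val in = new FileInputStream(file)
    val roundTripped = try ByteStreams.toByteArray(in) finally in.close()

    assert(roundTripped.sameElements(content))
    file.delete()
  }
}
```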
InputOutputMetricsSuite.scala
@@ -21,7 +21,7 @@ import java.io.{File, FileWriter, PrintWriter}

import scala.collection.mutable.ArrayBuffer

-import org.apache.commons.lang.math.RandomUtils
+import org.apache.commons.lang3.RandomUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
@@ -60,7 +60,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext
tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt")
val pw = new PrintWriter(new FileWriter(tmpFile))
for (x <- 1 to numRecords) {
-pw.println(RandomUtils.nextInt(numBuckets))
+pw.println(RandomUtils.nextInt(0, numBuckets))
}
pw.close()

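Behavior note behind the one-argument to two-argument change: commons-lang's `RandomUtils.nextInt(n)` draws uniformly from `[0, n)`, while commons-lang3's `RandomUtils.nextInt(startInclusive, endExclusive)` takes explicit bounds, so `nextInt(0, numBuckets)` preserves the original range. A small sketch under an assumed bucket count:

```scala
import org.apache.commons.lang3.RandomUtils

object RandomBucketSketch extends App {
  val numBuckets = 12 // illustrative value, not the suite's
  // Equivalent to the old commons-lang nextInt(numBuckets): uniform over [0, numBuckets)
  val bucket = RandomUtils.nextInt(0, numBuckets)
  assert(bucket >= 0 && bucket < numBuckets)
  println(s"bucket = $bucket")
}
```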
NettyBlockTransferSecuritySuite.scala
@@ -17,22 +17,24 @@

package org.apache.spark.network.netty

+import java.io.InputStreamReader
import java.nio._
+import java.nio.charset.Charset
import java.util.concurrent.TimeUnit

import scala.concurrent.duration._
import scala.concurrent.{Await, Promise}
import scala.util.{Failure, Success, Try}

-import org.apache.commons.io.IOUtils
+import com.google.common.io.CharStreams
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.shuffle.BlockFetchingListener
import org.apache.spark.network.{BlockDataManager, BlockTransferService}
import org.apache.spark.storage.{BlockId, ShuffleBlockId}
import org.apache.spark.{SecurityManager, SparkConf}
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite, ShouldMatchers}
+import org.scalatest.{FunSuite, ShouldMatchers}

class NettyBlockTransferSecuritySuite extends FunSuite with MockitoSugar with ShouldMatchers {
test("security default off") {
@@ -113,7 +115,9 @@ class NettyBlockTransferSecuritySuite extends FunSuite with MockitoSugar with ShouldMatchers {

val result = fetchBlock(exec0, exec1, "1", blockId) match {
case Success(buf) =>
-IOUtils.toString(buf.createInputStream()) should equal(blockString)
+val actualString = CharStreams.toString(
+new InputStreamReader(buf.createInputStream(), Charset.forName("UTF-8")))
+actualString should equal(blockString)
buf.release()
Success()
case Failure(t) =>
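The Guava counterpart of `IOUtils.toString(inputStream)` works on a `Readable`, so the stream is wrapped in an `InputStreamReader` with an explicit charset, as the new test code does. A self-contained sketch of the same decoding step, using an illustrative in-memory payload:

```scala
import java.io.{ByteArrayInputStream, InputStreamReader}
import java.nio.charset.Charset

import com.google.common.io.CharStreams

object CharStreamsSketch extends App {
  val original = "block contents" // illustrative string, not the suite's block data
  val in = new ByteArrayInputStream(original.getBytes(Charset.forName("UTF-8")))
  // Decode the stream as UTF-8 while reading it into a String.
  val decoded = CharStreams.toString(new InputStreamReader(in, Charset.forName("UTF-8")))
  assert(decoded == original)
  println(decoded)
}
```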
4 changes: 4 additions & 0 deletions external/flume-sink/pom.xml
@@ -35,6 +35,10 @@
<url>http://spark.apache.org/</url>

<dependencies>
+<dependency>
+<groupId>org.apache.commons</groupId>
+<artifactId>commons-lang3</artifactId>
+</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
SparkAvroCallbackHandler.scala
@@ -21,9 +21,9 @@ import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable

-import org.apache.flume.Channel
-import org.apache.commons.lang.RandomStringUtils
import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.flume.Channel
+import org.apache.commons.lang3.RandomStringUtils

/**
* Class that implements the SparkFlumeProtocol, that is used by the Avro Netty Server to process
JDBCRDD.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.jdbc
import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, SQLException}
import java.util.Properties

-import org.apache.commons.lang.StringEscapeUtils.escapeSql
+import org.apache.commons.lang3.StringUtils
+
import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow}
@@ -239,6 +240,9 @@ private[sql] class JDBCRDD(
case _ => value
}

+private def escapeSql(value: String): String =
+if (value == null) null else StringUtils.replace(value, "'", "''")
+
/**
* Turns a single Filter into a String representing a SQL expression.
* Returns null for an unhandled filter.
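commons-lang3 dropped `StringEscapeUtils.escapeSql`, so the new private helper reimplements its behavior: pass `null` through and double any embedded single quotes. A standalone sketch of the same logic outside `JDBCRDD` (the sample inputs are illustrative):

```scala
import org.apache.commons.lang3.StringUtils

object EscapeSqlSketch {
  // Same idea as the helper above: null-safe doubling of single quotes.
  def escapeSql(value: String): String =
    if (value == null) null else StringUtils.replace(value, "'", "''")

  def main(args: Array[String]): Unit = {
    assert(escapeSql("O'Brien") == "O''Brien")
    assert(escapeSql(null) == null)
    println(escapeSql("it's")) // prints it''s
  }
}
```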
AbstractSparkSQLDriver.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver

import scala.collection.JavaConversions._

-import org.apache.commons.lang.exception.ExceptionUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
@@ -61,7 +61,7 @@ private[hive] abstract class AbstractSparkSQLDriver(
} catch {
case cause: Throwable =>
logError(s"Failed in [$command]", cause)
-new CommandProcessorResponse(1, ExceptionUtils.getFullStackTrace(cause), null)
+new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(cause), null)
}
}

SparkSQLCLIDriver.scala
@@ -24,18 +24,16 @@ import java.util.{ArrayList => JArrayList}

import jline.{ConsoleReader, History}

-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.cli.{CliDriver, CliSessionState, OptionsProcessor}
-import org.apache.hadoop.hive.common.LogUtils.LogInitializationException
-import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils, LogUtils}
+import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.exec.Utilities
-import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, SetProcessor, CommandProcessor, CommandProcessorFactory}
+import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, SetProcessor, CommandProcessor}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.thrift.transport.TSocket

import org.apache.spark.Logging
UDFListString.java
@@ -17,10 +17,10 @@

package org.apache.spark.sql.hive.execution;

-import org.apache.hadoop.hive.ql.exec.UDF;
-
import java.util.List;
-import org.apache.commons.lang.StringUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.ql.exec.UDF;

public class UDFListString extends UDF {

MetastoreDataSourcesSuite.scala
@@ -23,7 +23,6 @@ import scala.collection.mutable.ArrayBuffer

import org.scalatest.BeforeAndAfterEach

-import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.metastore.TableType
import org.apache.hadoop.hive.ql.metadata.Table
@@ -174,7 +173,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("SELECT * FROM jsonTable"),
Row("a", "b"))

-FileUtils.deleteDirectory(tempDir)
+Utils.deleteRecursively(tempDir)
sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)

@@ -190,7 +189,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
checkAnswer(
sql("SELECT * FROM jsonTable"),
Row("a1", "b1", "c1"))
-FileUtils.deleteDirectory(tempDir)
+Utils.deleteRecursively(tempDir)
}

test("drop, change, recreate") {
@@ -212,7 +211,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("SELECT * FROM jsonTable"),
Row("a", "b"))

-FileUtils.deleteDirectory(tempDir)
+Utils.deleteRecursively(tempDir)
sparkContext.parallelize(("a", "b", "c") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)

@@ -231,7 +230,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
checkAnswer(
sql("SELECT * FROM jsonTable"),
Row("a", "b", "c"))
-FileUtils.deleteDirectory(tempDir)
+Utils.deleteRecursively(tempDir)
}

test("invalidate cache and reload") {