4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -349,7 +349,7 @@ class Column(protected[sql] val expr: Expression) extends Logging {
* @group expr_ops
* @since 1.4.0
*/
def when(condition: Column, value: Any):Column = this.expr match {
def when(condition: Column, value: Any): Column = this.expr match {
case CaseWhen(branches: Seq[Expression]) =>
CaseWhen(branches ++ Seq(lit(condition).expr, lit(value).expr))
case _ =>
@@ -378,7 +378,7 @@ class Column(protected[sql] val expr: Expression) extends Logging {
* @group expr_ops
* @since 1.4.0
*/
def otherwise(value: Any):Column = this.expr match {
def otherwise(value: Any): Column = this.expr match {
case CaseWhen(branches: Seq[Expression]) =>
if (branches.size % 2 == 0) {
CaseWhen(branches :+ lit(value).expr)
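The two Column.scala hunks above only add a space after the colon in the return-type annotations of `when` and `otherwise`; the behavior is unchanged. As a reminder of what these methods do, here is a minimal usage sketch (the DataFrame `df` and its `age` column are hypothetical):

import org.apache.spark.sql.functions._

// Chain when/otherwise to build a single CASE WHEN ... ELSE ... expression column.
val ageGroup = when(df("age") < 18, "minor")
  .when(df("age") < 65, "adult")
  .otherwise("senior")

df.select(df("age"), ageGroup.as("ageGroup")).show()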
18 changes: 9 additions & 9 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -255,7 +255,7 @@ class DataFrame private[sql](
val newCols = logicalPlan.output.zip(colNames).map { case (oldAttribute, newName) =>
Column(oldAttribute).as(newName)
}
select(newCols :_*)
select(newCols : _*)
}

/**
@@ -500,7 +500,7 @@ class DataFrame private[sql](
*/
@scala.annotation.varargs
def sort(sortCol: String, sortCols: String*): DataFrame = {
sort((sortCol +: sortCols).map(apply) :_*)
sort((sortCol +: sortCols).map(apply) : _*)
}

/**
@@ -531,7 +531,7 @@ class DataFrame private[sql](
* @since 1.3.0
*/
@scala.annotation.varargs
def orderBy(sortCol: String, sortCols: String*): DataFrame = sort(sortCol, sortCols :_*)
def orderBy(sortCol: String, sortCols: String*): DataFrame = sort(sortCol, sortCols : _*)

/**
* Returns a new [[DataFrame]] sorted by the given expressions.
@@ -540,7 +540,7 @@
* @since 1.3.0
*/
@scala.annotation.varargs
def orderBy(sortExprs: Column*): DataFrame = sort(sortExprs :_*)
def orderBy(sortExprs: Column*): DataFrame = sort(sortExprs : _*)

/**
* Selects column based on the column name and return it as a [[Column]].
@@ -611,7 +611,7 @@ class DataFrame private[sql](
* @since 1.3.0
*/
@scala.annotation.varargs
def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) :_*)
def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)

/**
* Selects a set of SQL expressions. This is a variant of `select` that accepts
@@ -825,7 +825,7 @@ class DataFrame private[sql](
* @since 1.3.0
*/
def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
groupBy().agg(aggExpr, aggExprs :_*)
groupBy().agg(aggExpr, aggExprs : _*)
}

/**
@@ -863,7 +863,7 @@ class DataFrame private[sql](
* @since 1.3.0
*/
@scala.annotation.varargs
def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs :_*)
def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs : _*)

/**
* Returns a new [[DataFrame]] by taking the first `n` rows. The difference between this function
@@ -1039,7 +1039,7 @@ class DataFrame private[sql](
val name = field.name
if (resolver(name, colName)) col.as(colName) else Column(name)
}
select(colNames :_*)
select(colNames : _*)
} else {
select(Column("*"), col.as(colName))
}
@@ -1262,7 +1262,7 @@ class DataFrame private[sql](
* @group action
* @since 1.3.0
*/
override def collectAsList(): java.util.List[Row] = java.util.Arrays.asList(rdd.collect() :_*)
override def collectAsList(): java.util.List[Row] = java.util.Arrays.asList(rdd.collect() : _*)

/**
* Returns the number of rows in the [[DataFrame]].
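Most of the DataFrame.scala changes above simply insert a space before `_*`, Scala's sequence-to-varargs ascription, to satisfy the style checker. A self-contained sketch of the idiom, with hypothetical names:

// `xs: _*` expands a Seq into the argument list of a varargs method.
def joinWords(words: String*): String = words.mkString(" ")

val ws = Seq("select", "a", "b", "c")
val joined = joinWords(ws: _*)   // preferred spacing: "ws: _*" rather than "ws:_*"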
@@ -28,5 +28,5 @@ private[sql] case class DataFrameHolder(df: DataFrame) {
// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
def toDF(): DataFrame = df

def toDF(colNames: String*): DataFrame = df.toDF(colNames :_*)
def toDF(colNames: String*): DataFrame = df.toDF(colNames : _*)
}
10 changes: 5 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -247,7 +247,7 @@ class GroupedData protected[sql](
*/
@scala.annotation.varargs
def mean(colNames: String*): DataFrame = {
aggregateNumericColumns(colNames:_*)(Average)
aggregateNumericColumns(colNames : _*)(Average)
}

/**
@@ -259,7 +259,7 @@
*/
@scala.annotation.varargs
def max(colNames: String*): DataFrame = {
aggregateNumericColumns(colNames:_*)(Max)
aggregateNumericColumns(colNames : _*)(Max)
}

/**
@@ -271,7 +271,7 @@
*/
@scala.annotation.varargs
def avg(colNames: String*): DataFrame = {
aggregateNumericColumns(colNames:_*)(Average)
aggregateNumericColumns(colNames : _*)(Average)
}

/**
@@ -283,7 +283,7 @@
*/
@scala.annotation.varargs
def min(colNames: String*): DataFrame = {
aggregateNumericColumns(colNames:_*)(Min)
aggregateNumericColumns(colNames : _*)(Min)
}

/**
@@ -295,6 +295,6 @@
*/
@scala.annotation.varargs
def sum(colNames: String*): DataFrame = {
aggregateNumericColumns(colNames:_*)(Sum)
aggregateNumericColumns(colNames : _*)(Sum)
}
}
@@ -298,7 +298,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
implicit class StringToColumn(val sc: StringContext) {
def $(args: Any*): ColumnName = {
new ColumnName(sc.s(args :_*))
new ColumnName(sc.s(args : _*))
}
}

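For context, the `StringToColumn` implicit shown above is what powers the `$"columnName"` shorthand once the SQLContext implicits are imported; a minimal usage sketch (the DataFrame `people` is hypothetical):

import sqlContext.implicits._   // brings StringToColumn into scope

// $"name" expands to new ColumnName("name") via the implicit class above.
people.select($"name", $"age" + 1).show()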
@@ -54,15 +54,15 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
}
}

protected val AS = Keyword("AS")
protected val CACHE = Keyword("CACHE")
protected val CLEAR = Keyword("CLEAR")
protected val IN = Keyword("IN")
protected val LAZY = Keyword("LAZY")
protected val SET = Keyword("SET")
protected val SHOW = Keyword("SHOW")
protected val TABLE = Keyword("TABLE")
protected val TABLES = Keyword("TABLES")
protected val AS = Keyword("AS")
protected val CACHE = Keyword("CACHE")
protected val CLEAR = Keyword("CLEAR")
protected val IN = Keyword("IN")
protected val LAZY = Keyword("LAZY")
protected val SET = Keyword("SET")
protected val SHOW = Keyword("SHOW")
protected val TABLE = Keyword("TABLE")
protected val TABLES = Keyword("TABLES")
protected val UNCACHE = Keyword("UNCACHE")

override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | show | others
@@ -236,7 +236,7 @@ private[sql] case class InMemoryColumnarTableScan(
case GreaterThanOrEqual(a: AttributeReference, l: Literal) => l <= statsFor(a).upperBound
case GreaterThanOrEqual(l: Literal, a: AttributeReference) => statsFor(a).lowerBound <= l

case IsNull(a: Attribute) => statsFor(a).nullCount > 0
case IsNull(a: Attribute) => statsFor(a).nullCount > 0
case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0
}

@@ -296,7 +296,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
.sliding(2)
.map {
case Seq(a) => true
case Seq(a,b) => a compatibleWith b
case Seq(a, b) => a.compatibleWith(b)
}.exists(!_)

// Adds Exchange or Sort operators as required
@@ -243,8 +243,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case (predicate, None) => predicate
// Filter needs to be applied above when it contains partitioning
// columns
case (predicate, _) if(!predicate.references.map(_.name).toSet
.intersect (partitionColNames).isEmpty) => predicate
case (predicate, _)
if !predicate.references.map(_.name).toSet.intersect(partitionColNames).isEmpty =>
predicate
}
}
} else {
@@ -270,7 +271,7 @@
projectList,
filters,
identity[Seq[Expression]], // All filters still need to be evaluated.
InMemoryColumnarTableScan(_, filters, mem)) :: Nil
InMemoryColumnarTableScan(_, filters, mem)) :: Nil
case _ => Nil
}
}
@@ -39,7 +39,7 @@ case class BroadcastLeftSemiJoinHash(
override def output: Seq[Attribute] = left.output

protected override def doExecute(): RDD[Row] = {
val buildIter= buildPlan.execute().map(_.copy()).collect().toIterator
val buildIter = buildPlan.execute().map(_.copy()).collect().toIterator
val hashSet = new java.util.HashSet[Row]()
var currentRow: Row = null

@@ -89,7 +89,7 @@ private[sql] object FrequentItems extends Logging {
(name, originalSchema.fields(index).dataType)
}

val freqItems = df.select(cols.map(Column(_)):_*).rdd.aggregate(countMaps)(
val freqItems = df.select(cols.map(Column(_)) : _*).rdd.aggregate(countMaps)(
seqOp = (counts, row) => {
var i = 0
while (i < numCols) {
@@ -110,7 +110,7 @@
}
)
val justItems = freqItems.map(m => m.baseMap.keys.toSeq)
val resultRow = Row(justItems:_*)
val resultRow = Row(justItems : _*)
// append frequent Items to the column name for easy debugging
val outputCols = colInfo.map { v =>
StructField(v._1 + "_freqItems", ArrayType(v._2, false))
@@ -187,7 +187,7 @@ object functions {
*/
@scala.annotation.varargs
def countDistinct(columnName: String, columnNames: String*): Column =
countDistinct(Column(columnName), columnNames.map(Column.apply) :_*)
countDistinct(Column(columnName), columnNames.map(Column.apply) : _*)

/**
* Aggregate function: returns the approximate number of distinct items in a group.
44 changes: 23 additions & 21 deletions sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -52,6 +52,7 @@ private[sql] object JDBCRDD extends Logging {
scale: Int,
signed: Boolean): DataType = {
val answer = sqlType match {
// scalastyle:off
Contributor: Why do we want to turn off the check here?

Contributor (Author): just too many things I didn't bother changing.

Contributor: I see... Regex replacing rocks here :)
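As the discussion above notes, the `// scalastyle:off` / `// scalastyle:on` pair disables every scalastyle check for the enclosed lines. When only one rule is noisy, scalastyle also accepts a rule id so the other checks stay active; a small illustrative sketch (the rule id and values are examples, not part of this patch):

// scalastyle:off magic.number
val retryBackoffMillis = Seq(100, 200, 400, 800, 1600)   // literals that could trip the magic-number check
// scalastyle:on magic.number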
case java.sql.Types.ARRAY => null
case java.sql.Types.BIGINT => if (signed) { LongType } else { DecimalType.Unlimited }
case java.sql.Types.BINARY => BinaryType
@@ -92,7 +93,8 @@
case java.sql.Types.TINYINT => IntegerType
case java.sql.Types.VARBINARY => BinaryType
case java.sql.Types.VARCHAR => StringType
case _ => null
case _ => null
// scalastyle:on
}

if (answer == null) throw new SQLException("Unsupported type " + sqlType)
@@ -323,19 +325,19 @@ private[sql] class JDBCRDD(
*/
def getConversions(schema: StructType): Array[JDBCConversion] = {
schema.fields.map(sf => sf.dataType match {
case BooleanType => BooleanConversion
case DateType => DateConversion
case BooleanType => BooleanConversion
case DateType => DateConversion
case DecimalType.Unlimited => DecimalConversion(None)
case DecimalType.Fixed(d) => DecimalConversion(Some(d))
case DoubleType => DoubleConversion
case FloatType => FloatConversion
case IntegerType => IntegerConversion
case LongType =>
case DecimalType.Fixed(d) => DecimalConversion(Some(d))
case DoubleType => DoubleConversion
case FloatType => FloatConversion
case IntegerType => IntegerConversion
case LongType =>
if (sf.metadata.contains("binarylong")) BinaryLongConversion else LongConversion
case StringType => StringConversion
case TimestampType => TimestampConversion
case BinaryType => BinaryConversion
case _ => throw new IllegalArgumentException(s"Unsupported field $sf")
case StringType => StringConversion
case TimestampType => TimestampConversion
case BinaryType => BinaryConversion
case _ => throw new IllegalArgumentException(s"Unsupported field $sf")
}).toArray
}

@@ -376,8 +378,8 @@
while (i < conversions.length) {
val pos = i + 1
conversions(i) match {
case BooleanConversion => mutableRow.setBoolean(i, rs.getBoolean(pos))
case DateConversion =>
case BooleanConversion => mutableRow.setBoolean(i, rs.getBoolean(pos))
case DateConversion =>
// DateUtils.fromJavaDate does not handle null value, so we need to check it.
val dateVal = rs.getDate(pos)
if (dateVal != null) {
@@ -407,14 +409,14 @@
} else {
mutableRow.update(i, Decimal(decimalVal))
}
case DoubleConversion => mutableRow.setDouble(i, rs.getDouble(pos))
case FloatConversion => mutableRow.setFloat(i, rs.getFloat(pos))
case IntegerConversion => mutableRow.setInt(i, rs.getInt(pos))
case LongConversion => mutableRow.setLong(i, rs.getLong(pos))
case DoubleConversion => mutableRow.setDouble(i, rs.getDouble(pos))
case FloatConversion => mutableRow.setFloat(i, rs.getFloat(pos))
case IntegerConversion => mutableRow.setInt(i, rs.getInt(pos))
case LongConversion => mutableRow.setLong(i, rs.getLong(pos))
// TODO(davies): use getBytes for better performance, if the encoding is UTF-8
case StringConversion => mutableRow.setString(i, rs.getString(pos))
case TimestampConversion => mutableRow.update(i, rs.getTimestamp(pos))
case BinaryConversion => mutableRow.update(i, rs.getBytes(pos))
case StringConversion => mutableRow.setString(i, rs.getString(pos))
case TimestampConversion => mutableRow.update(i, rs.getTimestamp(pos))
case BinaryConversion => mutableRow.update(i, rs.getBytes(pos))
case BinaryLongConversion => {
val bytes = rs.getBytes(pos)
var ans = 0L
@@ -124,7 +124,7 @@ private[sql] object InferSchema {
case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull)
case ArrayType(struct: StructType, containsNull) =>
ArrayType(nullTypeToStringType(struct), containsNull)
case struct: StructType =>nullTypeToStringType(struct)
case struct: StructType => nullTypeToStringType(struct)
case other: DataType => other
}

@@ -33,7 +33,7 @@ private[sql] object JacksonGenerator {
*/
def apply(rowSchema: StructType, gen: JsonGenerator)(row: Row): Unit = {
def valWriter: (DataType, Any) => Unit = {
case (_, null) | (NullType, _) => gen.writeNull()
case (_, null) | (NullType, _) => gen.writeNull()
case (StringType, v: String) => gen.writeString(v)
case (TimestampType, v: java.sql.Timestamp) => gen.writeString(v.toString)
case (IntegerType, v: Int) => gen.writeNumber(v)
@@ -48,16 +48,16 @@
case (DateType, v) => gen.writeString(v.toString)
case (udt: UserDefinedType[_], v) => valWriter(udt.sqlType, udt.serialize(v))

case (ArrayType(ty, _), v: Seq[_] ) =>
case (ArrayType(ty, _), v: Seq[_]) =>
gen.writeStartArray()
v.foreach(valWriter(ty,_))
v.foreach(valWriter(ty, _))
gen.writeEndArray()

case (MapType(kv,vv, _), v: Map[_,_]) =>
case (MapType(kv, vv, _), v: Map[_, _]) =>
gen.writeStartObject()
v.foreach { p =>
gen.writeFieldName(p._1.toString)
valWriter(vv,p._2)
valWriter(vv, p._2)
}
gen.writeEndObject()
