CodeFormatter.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.expressions.codegen

import org.apache.commons.lang3.StringUtils
import java.util.regex.Matcher

/**
* An utility class that indents a block of code based on the curly braces and parentheses.
@@ -26,13 +26,17 @@ import org.apache.commons.lang3.StringUtils
* Written by Matei Zaharia.
*/
object CodeFormatter {
val commentHolder = """\/\*(.+?)\*\/""".r

def format(code: CodeAndComment): String = {
new CodeFormatter().addLines(
StringUtils.replaceEach(
code.body,
code.comment.keys.toArray,
code.comment.values.toArray)
).result
val formatter = new CodeFormatter
code.body.split("\n").foreach { line =>
val commentReplaced = commentHolder.replaceAllIn(
line.trim,
m => code.comment.get(m.group(1)).map(Matcher.quoteReplacement).getOrElse(m.group(0)))
formatter.addLine(commentReplaced)
}
formatter.result()
Review comment from @cloud-fan (Contributor, Author), May 24, 2016:
cc @sarutak, here I assume the placeholder will always take an entire line, is that correct?

Reply (Contributor):
How slow is it if we use a regexp to match the placeholder here?

Reply (Member):
Thanks, SGTM!
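
To make the thread concrete, here is a minimal sketch (not part of the patch; the placeholder name and comment text are invented) of how the regex-based replacement above behaves, and why Matcher.quoteReplacement is needed:

import java.util.regex.Matcher

// Same pattern as the patch: captures the name inside a /*...*/ placeholder.
val commentHolder = """\/\*(.+?)\*\/""".r

// Hypothetical placeholder-to-comment map, as built up by registerComment.
val comments = Map("project_c1" -> "// input[0, bigint, false] + $1")

// quoteReplacement is required because the comment text may contain '$' or '\',
// which replaceAllIn would otherwise interpret as group references.
val replaced = commentHolder.replaceAllIn(
  "/*project_c1*/",
  m => comments.get(m.group(1)).map(Matcher.quoteReplacement).getOrElse(m.group(0)))

// replaced == "// input[0, bigint, false] + $1"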

}

def stripExtraNewLines(input: String): String = {
@@ -53,16 +57,28 @@
def stripOverlappingComments(codeAndComment: CodeAndComment): CodeAndComment = {
val code = new StringBuilder
val map = codeAndComment.comment

def getComment(line: String): Option[String] = {
Review comment from @cloud-fan (Contributor, Author):
The previous version doesn't handle the case where the line is not in the comments map (a sketch of this case follows the hunk below).

if (line.startsWith("/*") && line.endsWith("*/")) {
map.get(line.substring(2, line.length - 2))
} else {
None
}
}

var lastLine: String = "dummy"
codeAndComment.body.split('\n').foreach { l =>
val line = l.trim()
val skip = lastLine.startsWith("/*") && lastLine.endsWith("*/") &&
line.startsWith("/*") && line.endsWith("*/") &&
map(lastLine).substring(3).contains(map(line).substring(3))

val skip = getComment(lastLine).zip(getComment(line)).exists {
case (lastComment, currentComment) =>
lastComment.substring(3).contains(currentComment.substring(3))
}

if (!skip) {
code.append(line)
code.append("\n")
code.append(line).append("\n")
}

lastLine = line
}
new CodeAndComment(code.result().trim(), map)
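
A minimal sketch (illustration only, not part of the patch) of the case that review comment refers to: generated code can contain a line that looks like a /*...*/ comment but was never registered as a placeholder. The old code indexed the map directly; getComment returns None instead, so such a line is simply kept:

val map = Map("c1" -> "// a registered comment")

// A real comment emitted into the generated code, not a placeholder.
val line = "/** not a placeholder */"

def getComment(l: String): Option[String] =
  if (l.startsWith("/*") && l.endsWith("*/")) map.get(l.substring(2, l.length - 2))
  else None

// Old code: once two consecutive lines both look like /*...*/ comments, map(line)
// throws NoSuchElementException because this line is not a key of the map.
// New code: getComment(line) is None, so the overlap check is skipped safely.
assert(getComment(line).isEmpty)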
@@ -117,8 +133,9 @@ private class CodeFormatter {
} else {
indentString
}
code.append(f"/* ${currentLine}%03d */ ")
code.append(f"/* ${currentLine}%03d */")
if (line.trim().length > 0) {
code.append(" ") // add a space after the line number comment.
code.append(thisLineIndent)
if (inCommentBlock && line.startsWith("*") || line.startsWith("*/")) code.append(" ")
code.append(line)
@@ -129,10 +146,5 @@
currentLine += 1
}

private def addLines(code: String): CodeFormatter = {
code.split('\n').foreach(s => addLine(s.trim()))
this
}

private def result(): String = code.result()
}
CodeGenerator.scala
@@ -717,28 +717,18 @@ class CodegenContext {
*/
def getPlaceHolderToComments(): collection.Map[String, String] = placeHolderToComments

/**
* Register a multi-line comment and return the corresponding place holder
*/
private def registerMultilineComment(text: String): String = {
val placeHolder = s"/*${freshName("c")}*/"
val comment = text.split("(\r\n)|\r|\n").mkString("/**\n * ", "\n * ", "\n */")
placeHolderToComments += (placeHolder -> comment)
placeHolder
}

/**
* Register a comment and return the corresponding place holder
*/
def registerComment(text: String): String = {
if (text.contains("\n") || text.contains("\r")) {
registerMultilineComment(text)
val name = freshName("c")
val comment = if (text.contains("\n") || text.contains("\r")) {
text.split("(\r\n)|\r|\n").mkString("/**\n * ", "\n * ", "\n */")
} else {
val placeHolder = s"/*${freshName("c")}*/"
val safeComment = s"// $text"
placeHolderToComments += (placeHolder -> safeComment)
placeHolder
s"// $text"
}
placeHolderToComments += (name -> comment)
s"/*$name*/"
}
}
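
Tying the two changes together, a rough sketch of the round trip (illustration only; the exact name produced by freshName may differ):

import org.apache.spark.sql.catalyst.expressions.codegen._

val ctx = new CodegenContext
val holder = ctx.registerComment("input[0, bigint, false] + 1")
// holder is a placeholder such as "/*c1*/", and the context now maps that name
// to the text "// input[0, bigint, false] + 1".

val code = new CodeAndComment(s"$holder\nint value = i + 1;", ctx.getPlaceHolderToComments())
CodeFormatter.format(code)
// /* 001 */ // input[0, bigint, false] + 1
// /* 002 */ int value = i + 1;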

CodeGenerationSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.objects.CreateExternalRow
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -137,6 +138,19 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}

test("SPARK-14224: split wide external row creation into blocks due to JVM code size limit") {
val length = 5000
val schema = StructType(Seq.fill(length)(StructField("int", IntegerType)))
val expressions = Seq(CreateExternalRow(Seq.fill(length)(Literal(1)), schema))
val plan = GenerateMutableProjection.generate(expressions)
val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq(Row.fromSeq(Seq.fill(length)(1)))

if (!checkResult(actual, expected)) {
fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
}
}

test("test generated safe and unsafe projection") {
val schema = new StructType(Array(
StructField("a", StringType, true),
CodeFormatterSuite.scala
@@ -23,9 +23,10 @@ import org.apache.spark.sql.catalyst.util._

class CodeFormatterSuite extends SparkFunSuite {

def testCase(name: String)(input: String)(expected: String): Unit = {
def testCase(name: String)(
input: String, comment: Map[String, String] = Map.empty)(expected: String): Unit = {
test(name) {
val sourceCode = new CodeAndComment(input, Map.empty)
val sourceCode = new CodeAndComment(input.trim, comment)
Review comment from @cloud-fan (Contributor, Author), May 24, 2016:
Always trim the input, so that we can write

"""
  |abc
  |xyz
""".stripMargin

instead of

"""abc
  |xyz""".stripMargin

if (CodeFormatter.format(sourceCode).trim !== expected.trim) {
fail(
s"""
@@ -43,19 +44,21 @@ class CodeFormatterSuite extends SparkFunSuite {
|/*project_c2*/
""".stripMargin,
Map(
"/*project_c4*/" -> "// (((input[0, bigint, false] + 1) + 2) + 3))",
"/*project_c3*/" -> "// ((input[0, bigint, false] + 1) + 2)",
"/*project_c2*/" -> "// (input[0, bigint, false] + 1)"
"project_c4" -> "// (((input[0, bigint, false] + 1) + 2) + 3))",
"project_c3" -> "// ((input[0, bigint, false] + 1) + 2)",
"project_c2" -> "// (input[0, bigint, false] + 1)"
))

val reducedCode = CodeFormatter.stripOverlappingComments(code)
assert(reducedCode.body === "/*project_c4*/")
}

testCase("basic example") {
"""class A {
"""
|class A {
|blahblah;
|}""".stripMargin
|}
""".stripMargin
}{
"""
|/* 001 */ class A {
@@ -65,11 +68,13 @@
}

testCase("nested example") {
"""class A {
"""
|class A {
| if (c) {
|duh;
|}
|}""".stripMargin
|}
""".stripMargin
} {
"""
|/* 001 */ class A {
@@ -81,9 +86,11 @@
}

testCase("single line") {
"""class A {
"""
|class A {
| if (c) {duh;}
|}""".stripMargin
|}
""".stripMargin
}{
"""
|/* 001 */ class A {
@@ -93,9 +100,11 @@
}

testCase("if else on the same line") {
"""class A {
"""
|class A {
| if (c) {duh;} else {boo;}
|}""".stripMargin
|}
""".stripMargin
}{
"""
|/* 001 */ class A {
@@ -105,10 +114,12 @@
}

testCase("function calls") {
"""foo(
"""
|foo(
|a,
|b,
|c)""".stripMargin
|c)
""".stripMargin
}{
"""
|/* 001 */ foo(
@@ -119,10 +130,12 @@
}

testCase("single line comments") {
"""// This is a comment about class A { { { ( (
"""
|// This is a comment about class A { { { ( (
|class A {
|class body;
|}""".stripMargin
|}
""".stripMargin
}{
"""
|/* 001 */ // This is a comment about class A { { { ( (
@@ -133,10 +146,12 @@
}

testCase("single line comments /* */ ") {
"""/** This is a comment about class A { { { ( ( */
"""
|/** This is a comment about class A { { { ( ( */
|class A {
|class body;
|}""".stripMargin
|}
""".stripMargin
}{
"""
|/* 001 */ /** This is a comment about class A { { { ( ( */
@@ -147,12 +162,14 @@
}

testCase("multi-line comments") {
""" /* This is a comment about
"""
| /* This is a comment about
|class A {
|class body; ...*/
|class A {
|class body;
|}""".stripMargin
|}
""".stripMargin
}{
"""
|/* 001 */ /* This is a comment about
@@ -164,30 +181,56 @@
""".stripMargin
}

// scalastyle:off whitespace.end.of.line
testCase("reduce empty lines") {
CodeFormatter.stripExtraNewLines(
"""class A {
"""
|class A {
|
|
| /*** comment1 */
| /*
| * multi
| * line
| * comment
| */
|
| class body;
|
|
| if (c) {duh;}
| else {boo;}
|}""".stripMargin)
|}
""".stripMargin.trim)
}{
"""
|/* 001 */ class A {
|/* 002 */ /*** comment1 */
|/* 003 */ class body;
|/* 004 */
|/* 005 */ if (c) {duh;}
|/* 006 */ else {boo;}
|/* 007 */ }
|/* 002 */ /*
|/* 003 */ * multi
|/* 004 */ * line
|/* 005 */ * comment
|/* 006 */ */
|/* 007 */ class body;
|/* 008 */
|/* 009 */ if (c) {duh;}
|/* 010 */ else {boo;}
|/* 011 */ }
""".stripMargin
}

testCase("comment place holder")(
"""
|/*c1*/
|class A
|/*c2*/
|class B
|/*c1*//*c2*/
""".stripMargin, Map("c1" -> "/*abc*/", "c2" -> "/*xyz*/")
) {
"""
|/* 001 */ /*abc*/
|/* 002 */ class A
|/* 003 */ /*xyz*/
|/* 004 */ class B
|/* 005 */ /*abc*//*xyz*/
""".stripMargin
}
// scalastyle:on whitespace.end.of.line
}
ParquetQuerySuite.scala
@@ -581,21 +581,11 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
assert(CatalystReadSupport.expandUDT(schema) === expected)
}

test("read/write wide table") {
withTempPath { dir =>
val path = dir.getCanonicalPath

val df = spark.range(1000).select(Seq.tabulate(1000) {i => ('id + i).as(s"c$i")} : _*)
df.write.mode(SaveMode.Overwrite).parquet(path)
checkAnswer(spark.read.parquet(path), df)
}
}

test("returning batch for wide table") {
withSQLConf("spark.sql.codegen.maxFields" -> "100") {
withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "10") {
withTempPath { dir =>
val path = dir.getCanonicalPath
val df = spark.range(100).select(Seq.tabulate(110) {i => ('id + i).as(s"c$i")} : _*)
val df = spark.range(10).select(Seq.tabulate(11) {i => ('id + i).as(s"c$i")} : _*)
df.write.mode(SaveMode.Overwrite).parquet(path)

// do not return batch, because whole stage codegen is disabled for wide table (>200 columns)
@@ -605,7 +595,7 @@
checkAnswer(df2, df)

// return batch
val columns = Seq.tabulate(90) {i => s"c$i"}
val columns = Seq.tabulate(9) {i => s"c$i"}
val df3 = df2.selectExpr(columns : _*)
assert(
df3.queryExecution.sparkPlan.find(_.isInstanceOf[BatchedDataSourceScanExec]).isDefined,