Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,42 @@
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>AES_ENCRYPT</code>
* <ul>
* <li>SQL semantic: <code>AES_ENCRYPT(expr, key[, mode[, padding]])</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>AES_DECRYPT</code>
* <ul>
* <li>SQL semantic: <code>AES_DECRYPT(expr, key[, mode[, padding]])</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>SHA1</code>
* <ul>
* <li>SQL semantic: <code>SHA1(expr)</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>SHA2</code>
* <ul>
* <li>SQL semantic: <code>SHA2(expr, bitLength)</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>MD5</code>
* <ul>
* <li>SQL semantic: <code>MD5(expr)</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>CRC32</code>
* <ul>
* <li>SQL semantic: <code>CRC32(expr)</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* </ol>
* Note: SQL semantic conforms ANSI standard, so some expressions are not supported when ANSI off,
* including: add, subtract, multiply, divide, remainder, pmod.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ public String build(Expression expr) {
case "DATE_ADD":
case "DATE_DIFF":
case "TRUNC":
case "AES_ENCRYPT":
case "AES_DECRYPT":
case "SHA1":
case "SHA2":
case "MD5":
case "CRC32":
return visitSQLFunction(name,
Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new));
case "CASE_WHEN": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {
generateExpression(child).map(v => new V2Extract("WEEK", v))
case YearOfWeek(child) =>
generateExpression(child).map(v => new V2Extract("YEAR_OF_WEEK", v))
case encrypt: AesEncrypt => generateExpressionWithName("AES_ENCRYPT", encrypt.children)
case decrypt: AesDecrypt => generateExpressionWithName("AES_DECRYPT", decrypt.children)
case Crc32(child) => generateExpressionWithName("CRC32", Seq(child))
case Md5(child) => generateExpressionWithName("MD5", Seq(child))
case Sha1(child) => generateExpressionWithName("SHA1", Seq(child))
case sha2: Sha2 => generateExpressionWithName("SHA2", sha2.children)
// TODO supports other expressions
case ApplyFunctionExpression(function, children) =>
val childrenExpressions = children.flatMap(generateExpression(_))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[sql] object H2Dialect extends JdbcDialect {
Set("ABS", "COALESCE", "GREATEST", "LEAST", "RAND", "LOG", "LOG10", "LN", "EXP",
"POWER", "SQRT", "FLOOR", "CEIL", "ROUND", "SIN", "SINH", "COS", "COSH", "TAN",
"TANH", "COT", "ASIN", "ACOS", "ATAN", "ATAN2", "DEGREES", "RADIANS", "SIGN",
"PI", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM")
"PI", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE", "TRIM", "MD5", "SHA1", "SHA2")

override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)
Expand Down Expand Up @@ -235,5 +235,22 @@ private[sql] object H2Dialect extends JdbcDialect {
}
s"EXTRACT($newField FROM $source)"
}

override def visitSQLFunction(funcName: String, inputs: Array[String]): String = {
if (isSupportedFunction(funcName)) {
funcName match {
case "MD5" =>
"RAWTOHEX(HASH('MD5', " + inputs.mkString(",") + "))"
case "SHA1" =>
"RAWTOHEX(HASH('SHA-1', " + inputs.mkString(",") + "))"
case "SHA2" =>
"RAWTOHEX(HASH('SHA-" + inputs(1) + "'," + inputs(0) + "))"
case _ => super.visitSQLFunction(funcName, inputs)
}
} else {
throw new UnsupportedOperationException(
s"${this.getClass.getSimpleName} does not support function: $funcName");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel

val tempDir = Utils.createTempDir()
val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
val testBytes = Array[Byte](99.toByte, 134.toByte, 135.toByte, 200.toByte, 205.toByte) ++
Array.fill(15)(0.toByte)

val testH2Dialect = new JdbcDialect {
override def canHandle(url: String): Boolean = H2Dialect.canHandle(url)
Expand Down Expand Up @@ -178,6 +180,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
"('amy', '2022-05-19', '2022-05-19 00:00:00')").executeUpdate()
conn.prepareStatement("INSERT INTO \"test\".\"datetime\" VALUES " +
"('alex', '2022-05-18', '2022-05-18 00:00:00')").executeUpdate()

conn.prepareStatement("CREATE TABLE \"test\".\"binary1\" (name TEXT(32),b BINARY(20))")
.executeUpdate()
val stmt = conn.prepareStatement("INSERT INTO \"test\".\"binary1\" VALUES (?, ?)")
stmt.setString(1, "jen")
stmt.setBytes(2, testBytes)
stmt.executeUpdate()
}
H2Dialect.registerFunction("my_avg", IntegralAverage)
H2Dialect.registerFunction("my_strlen", StrLen(CharLength))
Expand Down Expand Up @@ -860,7 +869,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkSortRemoved(df2)
checkPushedInfo(df2,
"PushedFilters: [DEPT IS NOT NULL, DEPT > 1]",
"PushedTopN: ORDER BY [SALARY ASC NULLS FIRST] LIMIT 1")
"PushedTopN: ORDER BY [SALARY ASC NULLS FIRST] LIMIT 1")
checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
}

Expand Down Expand Up @@ -1190,6 +1199,52 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkAnswer(df8, Seq(Row("alex")))
}

test("scan with filter push-down with misc functions") {
val df1 = sql("SELECT name FROM h2.test.binary1 WHERE " +
"md5(b) = '4371fe0aa613bcb081543a37d241adcb'")
checkFiltersRemoved(df1)
val expectedPlanFragment1 = "PushedFilters: [B IS NOT NULL, " +
"MD5(B) = '4371fe0aa613bcb081543a37d241adcb']"
checkPushedInfo(df1, expectedPlanFragment1)
checkAnswer(df1, Seq(Row("jen")))

val df2 = sql("SELECT name FROM h2.test.binary1 WHERE " +
"sha1(b) = 'cf355e86e8666f9300ef12e996acd5c629e0b0a1'")
checkFiltersRemoved(df2)
val expectedPlanFragment2 = "PushedFilters: [B IS NOT NULL, " +
"SHA1(B) = 'cf355e86e8666f9300ef12e996acd5c629e0b0a1'],"
checkPushedInfo(df2, expectedPlanFragment2)
checkAnswer(df2, Seq(Row("jen")))

val df3 = sql("SELECT name FROM h2.test.binary1 WHERE " +
"sha2(b, 256) = '911732d10153f859dec04627df38b19290ec707ff9f83910d061421fdc476109'")
checkFiltersRemoved(df3)
val expectedPlanFragment3 = "PushedFilters: [B IS NOT NULL, (SHA2(B, 256)) = " +
"'911732d10153f859dec04627df38b19290ec707ff9f83910d061421fdc476109']"
checkPushedInfo(df3, expectedPlanFragment3)
checkAnswer(df3, Seq(Row("jen")))

val df4 = sql("SELECT * FROM h2.test.employee WHERE crc32(name) = '142689369'")
checkFiltersRemoved(df4, false)
val expectedPlanFragment4 = "PushedFilters: [NAME IS NOT NULL], "
checkPushedInfo(df4, expectedPlanFragment4)
checkAnswer(df4, Seq(Row(6, "jen", 12000, 1200, true)))

val df5 = sql("SELECT name FROM h2.test.employee WHERE " +
"aes_encrypt(cast(null as string), name) is null")
checkFiltersRemoved(df5, false)
val expectedPlanFragment5 = "PushedFilters: [], "
checkPushedInfo(df5, expectedPlanFragment5)
checkAnswer(df5, Seq(Row("amy"), Row("cathy"), Row("alex"), Row("david"), Row("jen")))

val df6 = sql("SELECT name FROM h2.test.employee WHERE " +
"aes_decrypt(cast(null as binary), name) is null")
checkFiltersRemoved(df6, false)
val expectedPlanFragment6 = "PushedFilters: [], "
checkPushedInfo(df6, expectedPlanFragment6)
checkAnswer(df6, Seq(Row("amy"), Row("cathy"), Row("alex"), Row("david"), Row("jen")))
}

test("scan with filter push-down with UDF") {
JdbcDialects.unregisterDialect(H2Dialect)
try {
Expand Down Expand Up @@ -1269,7 +1324,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
Seq(Row("test", "people", false), Row("test", "empty_table", false),
Row("test", "employee", false), Row("test", "item", false), Row("test", "dept", false),
Row("test", "person", false), Row("test", "view1", false), Row("test", "view2", false),
Row("test", "datetime", false)))
Row("test", "datetime", false), Row("test", "binary1", false)))
}

test("SQL API: create table as select") {
Expand Down Expand Up @@ -1819,12 +1874,12 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkFiltersRemoved(df)
checkAggregateRemoved(df)
checkPushedInfo(df,
"""
|PushedAggregates: [VAR_POP(BONUS), VAR_POP(DISTINCT BONUS),
|VAR_SAMP(BONUS), VAR_SAMP(DISTINCT BONUS)],
|PushedFilters: [DEPT IS NOT NULL, DEPT > 0],
|PushedGroupByExpressions: [DEPT],
|""".stripMargin.replaceAll("\n", " "))
"""
|PushedAggregates: [VAR_POP(BONUS), VAR_POP(DISTINCT BONUS),
|VAR_SAMP(BONUS), VAR_SAMP(DISTINCT BONUS)],
|PushedFilters: [DEPT IS NOT NULL, DEPT > 0],
|PushedGroupByExpressions: [DEPT],
|""".stripMargin.replaceAll("\n", " "))
checkAnswer(df, Seq(Row(10000d, 10000d, 20000d, 20000d),
Row(2500d, 2500d, 5000d, 5000d), Row(0d, 0d, null, null)))
}
Expand Down