From 3ffafb841b8cb526aeb00cfbb949cea6000dadcd Mon Sep 17 00:00:00 2001 From: Aman Omer Date: Thu, 14 Nov 2019 11:30:33 +0530 Subject: [PATCH 1/3] Initial commit --- .../spark/sql/catalyst/expressions/Cast.scala | 4 +- .../expressions/postgreSQL/CastToDouble.scala | 43 +++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/postgreSQL/CastToDouble.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index f3b58fa3137b1..af72999db065d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -646,7 +646,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit } // DoubleConverter - private[this] def castToDouble(from: DataType): Any => Any = from match { + protected[this] def castToDouble(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => { val doubleStr = s.toString @@ -1482,7 +1482,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit } } - private[this] def castToDoubleCode(from: DataType, ctx: CodegenContext): CastFunction = { + protected[this] def castToDoubleCode(from: DataType, ctx: CodegenContext): CastFunction = { from match { case StringType => val doubleStr = ctx.freshVariable("doubleStr", StringType) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/postgreSQL/CastToDouble.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/postgreSQL/CastToDouble.scala new file mode 100644 index 0000000000000..f7ba529f98eda --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/postgreSQL/CastToDouble.scala @@ -0,0 +1,43 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions.postgreSQL + +import org.apache.spark.sql.catalyst.expressions.{CastBase, Expression, TimeZoneAwareExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.types.{DataType, DoubleType} + +case class CastToDouble(child: Expression, timeZoneId: Option[String]) extends CastBase{ + override def dataType: DataType = DoubleType + + override protected def ansiEnabled: Boolean = + throw new UnsupportedOperationException("PostgreSQL dialect doesn't support ansi mode") + + /** Returns a copy of this expression with the specified timeZoneId. 
*/ + override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = + copy(timeZoneId = Option(timeZoneId)) + + override def castToDouble(from: DataType): Any => Any = super.castToDouble(from) + + override def castToDoubleCode(from: DataType, ctx: CodegenContext): CastFunction = + super.castToDoubleCode(from, ctx) + + override def nullable: Boolean = child.nullable + + override def toString: String = s"PostgreCastToDouble($child as ${dataType.simpleString})" + + override def sql: String = s"CAST(${child.sql} AS ${dataType.sql})" +} From 6ad51c7460fc6ad4df72643b42e499647216e10f Mon Sep 17 00:00:00 2001 From: Aman Omer Date: Thu, 14 Nov 2019 19:07:09 +0530 Subject: [PATCH 2/3] Added PostgreCastToDouble --- pom.xml | 2 +- .../catalyst/analysis/PostgreSQLDialect.scala | 22 +++++- .../expressions/postgreSQL/CastToDouble.scala | 43 ---------- .../postgreSQL/PostgreCastToDouble.scala | 79 +++++++++++++++++++ 4 files changed, 100 insertions(+), 46 deletions(-) delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/postgreSQL/CastToDouble.scala create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/postgreSQL/PostgreCastToDouble.scala diff --git a/pom.xml b/pom.xml index 5110285547ab3..34163aecd5f12 100644 --- a/pom.xml +++ b/pom.xml @@ -118,7 +118,7 @@ 1.8 ${java.version} ${java.version} - 3.6.2 + 3.6.0 spark 1.7.16 1.2.17 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PostgreSQLDialect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PostgreSQLDialect.scala index e7f0e571804d3..49afbe0c4976a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PostgreSQLDialect.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PostgreSQLDialect.scala @@ -19,30 +19,48 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.internal.Logging import
org.apache.spark.sql.catalyst.expressions.Cast -import org.apache.spark.sql.catalyst.expressions.postgreSQL.PostgreCastToBoolean +import org.apache.spark.sql.catalyst.expressions.postgreSQL.{PostgreCastToBoolean, PostgreCastToDouble} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{BooleanType, StringType} +import org.apache.spark.sql.types.{BooleanType, DoubleType, StringType} object PostgreSQLDialect { val postgreSQLDialectRules: List[Rule[LogicalPlan]] = CastToBoolean :: + CastToDouble :: Nil object CastToBoolean extends Rule[LogicalPlan] with Logging { override def apply(plan: LogicalPlan): LogicalPlan = { // The SQL configuration `spark.sql.dialect` can be changed in runtime. // To make sure the configuration is effective, we have to check it during rule execution.
val conf = SQLConf.get if (conf.usePostgreSQLDialect) { plan.transformExpressions { case Cast(child, dataType, timeZoneId) if child.dataType != BooleanType && dataType == BooleanType => PostgreCastToBoolean(child, timeZoneId) } } else { plan } } } + + object CastToDouble extends Rule[LogicalPlan] with Logging { + override def apply(plan: LogicalPlan): LogicalPlan = { + // Like CastToBoolean, check the dialect at rule execution time because + // `spark.sql.dialect` can be changed in runtime. + val conf = SQLConf.get + if (conf.usePostgreSQLDialect) { + plan.transformExpressions { + case Cast(child, dataType, timeZoneId) + if child.dataType != DoubleType && dataType == DoubleType => + PostgreCastToDouble(child, timeZoneId) + } + } else { + plan + } + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/postgreSQL/CastToDouble.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/postgreSQL/CastToDouble.scala deleted file mode 100644 index f7ba529f98eda..0000000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/postgreSQL/CastToDouble.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.spark.sql.catalyst.expressions.postgreSQL - -import org.apache.spark.sql.catalyst.expressions.{CastBase, Expression, TimeZoneAwareExpression} -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext -import org.apache.spark.sql.types.{DataType, DoubleType} - -case class CastToDouble(child: Expression, timeZoneId: Option[String]) extends CastBase{ - override def dataType: DataType = DoubleType - - override protected def ansiEnabled: Boolean = - throw new UnsupportedOperationException("PostgreSQL dialect doesn't support ansi mode") - - /** Returns a copy of this expression with the specified timeZoneId. */ - override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = - copy(timeZoneId = Option(timeZoneId)) - - override def castToDouble(from: DataType): Any => Any = super.castToDouble(from) - - override def castToDoubleCode(from: DataType, ctx: CodegenContext): CastFunction = - super.castToDoubleCode(from, ctx) - - override def nullable: Boolean = child.nullable - - override def toString: String = s"PostgreCastToDouble($child as ${dataType.simpleString})" - - override def sql: String = s"CAST(${child.sql} AS ${dataType.sql})" -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/postgreSQL/PostgreCastToDouble.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/postgreSQL/PostgreCastToDouble.scala new file mode 100644 index 0000000000000..c207b47a005aa --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/postgreSQL/PostgreCastToDouble.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions.postgreSQL + +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.expressions.{CastBase, Expression, TimeZoneAwareExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.types.{BooleanType, DataType, DateType, DoubleType, NumericType, StringType, TimestampType} +import org.apache.spark.unsafe.types.UTF8String + +case class PostgreCastToDouble(child: Expression, timeZoneId: Option[String]) + extends CastBase { + + override protected def ansiEnabled = + throw new UnsupportedOperationException("PostgreSQL dialect doesn't support ansi mode") + + override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = + copy(timeZoneId = Option(timeZoneId)) + + override def checkInputDataTypes(): TypeCheckResult = child.dataType match { + case TimestampType | BooleanType | DateType => + TypeCheckResult.TypeCheckFailure(s"cannot cast type ${child.dataType} to double") + case _ => + TypeCheckResult.TypeCheckSuccess + } + + override def castToDouble(from: DataType): Any => Any = from match { + case StringType => + buildCast[UTF8String](_, s => { + val doubleStr = s.toString + try doubleStr.toDouble catch { + case _: NumberFormatException => + throw new 
IllegalArgumentException(s"invalid input syntax for type double: $doubleStr") + } + }) + case NumericType() => + super.castToDouble(from) + } + + override def castToDoubleCode(from: DataType, ctx: CodegenContext): CastFunction = from match { + case StringType => + val doubleStr = ctx.freshVariable("doubleStr", StringType) + (c, evPrim, _) => + code""" + final String $doubleStr = $c.toString(); + try { + $evPrim = Double.valueOf($doubleStr); + } catch (java.lang.NumberFormatException e) { + throw new IllegalArgumentException("invalid input syntax for type double: " + $doubleStr); + } + """ + case NumericType() => + super.castToDoubleCode(from, ctx) + } + + override def dataType: DataType = DoubleType + + override def nullable: Boolean = child.nullable + + override def toString: String = s"PostgreCastToDouble($child as ${dataType.simpleString})" + + override def sql: String = s"CAST(${child.sql} AS ${dataType.sql})" +} + From 0e46418e15af58171a5302f506789ae1bb348422 Mon Sep 17 00:00:00 2001 From: Aman Omer Date: Thu, 14 Nov 2019 19:13:59 +0530 Subject: [PATCH 3/3] pom.xml changes reverted --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 34163aecd5f12..5110285547ab3 100644 --- a/pom.xml +++ b/pom.xml @@ -118,7 +118,7 @@ 1.8 ${java.version} ${java.version} - 3.6.0 + 3.6.2 spark 1.7.16 1.2.17