Skip to content

Commit bc9b259

Browse files
panbingkun authored and MaxGekk committed
[SPARK-50066][SQL] Codegen Support for SchemaOfXml (by Invoke & RuntimeReplaceable)
### What changes were proposed in this pull request? This PR adds codegen support for `schema_of_xml`. ### Why are the changes needed? - Improve codegen coverage. - Simplify the code. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA & existing UTs (e.g. XmlFunctionsSuite#`*schema_of_xml*`). ### Was this patch authored or co-authored using generative AI tooling? No. Closes #48594 from panbingkun/SPARK-50066. Authored-by: panbingkun <[email protected]> Signed-off-by: Max Gekk <[email protected]>
1 parent bd94419 commit bc9b259

File tree

2 files changed

+58
-18
lines changed

2 files changed

+58
-18
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.expressions.xml
19+
20+
import org.apache.spark.sql.catalyst.xml.XmlInferSchema
21+
import org.apache.spark.sql.internal.SQLConf
22+
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}
23+
import org.apache.spark.unsafe.types.UTF8String
24+
25+
/**
 * Evaluation helpers for XML expressions. `schemaOfXml` is looked up by name and
 * invoked statically from `SchemaOfXml.replacement`.
 */
object XmlExpressionEvalUtils {
26+
27+
/**
 * Infers the data type of a single XML string and returns its SQL text form.
 *
 * @param xmlInferSchema inference helper used both to infer and to canonicalize the type
 * @param xml the XML document whose schema is inferred
 * @return the `sql` rendering of the canonicalized data type, as a UTF8String
 */
def schemaOfXml(xmlInferSchema: XmlInferSchema, xml: UTF8String): UTF8String = {
28+
// NOTE(review): `.get` assumes `infer` always yields a value here — confirm the
// caller rules out configurations in which it could be empty.
val dataType = xmlInferSchema.infer(xml.toString).get match {
29+
// Struct result: canonicalize it, falling back to an empty struct.
case st: StructType =>
30+
xmlInferSchema.canonicalizeType(st).getOrElse(StructType(Nil))
31+
// Array of structs: canonicalize the element type and keep the original
// `containsNull` flag; fall back to an array of empty structs.
case at: ArrayType if at.elementType.isInstanceOf[StructType] =>
32+
xmlInferSchema
33+
.canonicalizeType(at.elementType)
34+
.map(ArrayType(_, containsNull = at.containsNull))
35+
.getOrElse(ArrayType(StructType(Nil), containsNull = at.containsNull))
36+
// Any other type: canonicalize it, falling back to the configured default string type.
case other: DataType =>
37+
xmlInferSchema.canonicalizeType(other).getOrElse(SQLConf.get.defaultStringType)
38+
}
39+
40+
UTF8String.fromString(dataType.sql)
41+
}
42+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ import java.io.CharArrayWriter
2121
import org.apache.spark.sql.catalyst.InternalRow
2222
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
2323
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
24-
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode}
24+
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
25+
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
26+
import org.apache.spark.sql.catalyst.expressions.xml.XmlExpressionEvalUtils
2527
import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, FailureSafeParser, PermissiveMode}
2628
import org.apache.spark.sql.catalyst.util.TypeUtils._
2729
import org.apache.spark.sql.catalyst.xml.{StaxXmlGenerator, StaxXmlParser, ValidatorUtil, XmlInferSchema, XmlOptions}
@@ -149,7 +151,9 @@ case class XmlToStructs(
149151
case class SchemaOfXml(
150152
child: Expression,
151153
options: Map[String, String])
152-
extends UnaryExpression with CodegenFallback with QueryErrorsBase {
154+
extends UnaryExpression
155+
with RuntimeReplaceable
156+
with QueryErrorsBase {
153157

154158
def this(child: Expression) = this(child, Map.empty[String, String])
155159

@@ -192,26 +196,20 @@ case class SchemaOfXml(
192196
}
193197
}
194198

195-
override def eval(v: InternalRow): Any = {
196-
val dataType = xmlInferSchema.infer(xml.toString).get match {
197-
case st: StructType =>
198-
xmlInferSchema.canonicalizeType(st).getOrElse(StructType(Nil))
199-
case at: ArrayType if at.elementType.isInstanceOf[StructType] =>
200-
xmlInferSchema
201-
.canonicalizeType(at.elementType)
202-
.map(ArrayType(_, containsNull = at.containsNull))
203-
.getOrElse(ArrayType(StructType(Nil), containsNull = at.containsNull))
204-
case other: DataType =>
205-
xmlInferSchema.canonicalizeType(other).getOrElse(SQLConf.get.defaultStringType)
206-
}
207-
208-
UTF8String.fromString(dataType.sql)
209-
}
210-
211199
override def prettyName: String = "schema_of_xml"
212200

213201
override protected def withNewChildInternal(newChild: Expression): SchemaOfXml =
214202
copy(child = newChild)
203+
204+
// Object type for XmlInferSchema; used in `replacement` to type the inference-helper
// literal and to declare the matching StaticInvoke input type.
@transient private lazy val xmlInferSchemaObjectType = ObjectType(classOf[XmlInferSchema])
205+
206+
// Replaces this expression with a static call to `XmlExpressionEvalUtils.schemaOfXml`.
// The arguments are the inference helper (wrapped as an object-typed literal) and the
// child expression; the second Seq lists the corresponding input types in the same order.
override def replacement: Expression = StaticInvoke(
207+
XmlExpressionEvalUtils.getClass,
208+
dataType,
209+
"schemaOfXml",
210+
Seq(Literal(xmlInferSchema, xmlInferSchemaObjectType), child),
211+
Seq(xmlInferSchemaObjectType, child.dataType)
212+
)
215213
}
216214

217215
/**

0 commit comments

Comments
 (0)