diff --git a/README.md b/README.md index d9401a6d..483033ee 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,62 @@ select to order and positionally filter columns of a DataFrame ```scala SchemaUtils.alignSchema(dataFrameToBeAligned, modelSchema) - ``` + ``` + +### ColumnImplicits + +_ColumnImplicits_ provides implicit methods for transforming Spark Columns + +1. Transforms the column into a boolean column, checking if values are negative or positive infinity + + ```scala + column.isInfinite + ``` +2. Returns a column with the requested substring. It shifts the substring indexing to be zero-based, in accordance with Scala/Java. + If the provided starting position is negative, it is counted from the end of the string. + + ```scala + column.zeroBasedSubstr(startPos) + ``` +3. Returns a column with the requested substring. It shifts the substring indexing to be zero-based, in accordance with Scala/Java. + If the provided starting position is negative, it is counted from the end of the string. + If the requested length is longer than the rest of the string, all the remaining characters are taken. + + + ```scala + column.zeroBasedSubstr(startPos, length) + ``` + +### StructFieldImplicits + +_StructFieldImplicits_ provides implicit methods for working with StructField objects. +Of them, metadata methods are: + +1. Gets the metadata value for a given key as an Option[String] + + ```scala + structField.metadata.getOptString(key) + ``` + +2. Gets the metadata value for a given key as an Option[Char]: if the value is a single-character String, it returns the char, + otherwise None + + ```scala + structField.metadata.getOptChar(key) + ``` + +3. Gets the metadata value for a given key as an Option[Boolean], provided the String value can be parsed as a boolean + + ```scala + structField.metadata.getOptStringAsBoolean(key) + ``` + +4. 
object ColumnImplicits {
  implicit class ColumnEnhancements(column: Column) {

    /**
     * Builds a column that tests whether the value is one of `Double.PositiveInfinity` or
     * `Double.NegativeInfinity`.
     *
     * @return the column produced by `isin` over the two infinity values
     */
    def isInfinite: Column = {
      val infinities = Seq(Double.PositiveInfinity, Double.NegativeInfinity)
      column.isin(infinities: _*)
    }

    /**
     * Zero-based substring running from `startPos` towards the end of the string.
     *
     * Spark's `substr` counts positions from 1; this method shifts the indexing to match Scala/Java.
     * A negative `startPos` is counted from the end of the string.
     *
     * The work is delegated to the two-argument overload with a length of `Int.MaxValue - startPos`
     * for a non-negative `startPos`, and `-startPos` for a negative one.
     * NOTE(review): `-startPos` wraps around for `Int.MinValue`; confirm the result is still as intended
     * when Spark is configured to fail on integer overflow.
     *
     * @param startPos the index (zero based) where to start the substring from, if negative it's counted from end
     * @return column with requested substring
     */
    def zeroBasedSubstr(startPos: Int): Column = {
      val requestedLength = if (startPos >= 0) Int.MaxValue - startPos else -startPos
      zeroBasedSubstr(startPos, requestedLength)
    }

    /**
     * Zero-based substring starting at `startPos` and taking up to `len` characters.
     *
     * Spark's `substr` counts positions from 1; this method shifts the indexing to match Scala/Java.
     * A negative `startPos` is counted from the end of the string.
     *
     * @param startPos the index (zero based) where to start the substring from, if negative it's counted from end
     * @param len      length of the desired substring, if longer than the rest of the string, all the remaining
     *                 characters are taken
     * @return column with requested substring
     */
    def zeroBasedSubstr(startPos: Int, len: Int): Column = {
      if (startPos < 0) {
        // Zero-based offset of the requested start, measured from the beginning of the string;
        // it is non-positive when the start falls at or before the first character.
        val offsetFromStart = length(column) + startPos
        // Convert to Spark's one-based position, never going below the first character.
        val oneBasedStart = greatest(offsetFromStart + 1, lit(1))
        // Adding the non-positive offset to the requested length drops the characters
        // that would lie before the beginning of the string.
        val overshoot = when(offsetFromStart <= 0, offsetFromStart).otherwise(0)
        column.substr(oneBasedStart, lit(len) + overshoot)
      } else {
        column.substr(startPos + 1, len)
      }
    }
  }
}
object StructFieldImplicits {
  implicit class StructFieldMetadataEnhancement(val metadata: Metadata) {

    /**
     * Reads the value stored under `key` as a String.
     *
     * @param key the metadata key to look up
     * @return the result of `metadata.getString(key)` wrapped in `Some`, or `None` if that call throws
     */
    def getOptString(key: String): Option[String] =
      Try(metadata.getString(key)).toOption

    /**
     * Reads the value stored under `key` as a single character.
     *
     * @param key the metadata key to look up
     * @return `Some(char)` when the value can be read as a non-null String of length exactly 1,
     *         `None` otherwise
     */
    def getOptChar(key: String): Option[Char] =
      Try(metadata.getString(key)).toOption.collect {
        case value if value != null && value.length == 1 => value.charAt(0)
      }

    /**
     * Reads the value stored under `key` as a String and converts it with `toBoolean`.
     *
     * @param key the metadata key to look up
     * @return `Some(boolean)` when both the read and the conversion succeed, `None` if either throws
     */
    def getOptStringAsBoolean(key: String): Option[Boolean] =
      Try(metadata.getString(key)).map(_.toBoolean).toOption

    /**
     * Tells whether the metadata contains `key`.
     *
     * @param key the metadata key to look up
     * @return the result of `metadata.contains(key)`
     */
    def hasKey(key: String): Boolean =
      metadata.contains(key)
  }
}
class ColumnImplicitsTest extends AnyFunSuite {

  private val column: Column = lit("abcdefgh")

  // Evaluates the column's expression directly and renders the result as a String.
  private def evaluated(result: Column): String = result.expr.eval().toString

  test("zeroBasedSubstr with startPos") {
    // startPos -> expected substring of "abcdefgh"
    val cases: Seq[(Int, String)] = Seq(
      (2, "cdefgh"),
      (-2, "gh"),
      (Int.MaxValue, ""),
      (Int.MinValue, "abcdefgh")
    )
    cases.foreach { case (startPos, expected) =>
      assertResult(expected)(evaluated(column.zeroBasedSubstr(startPos)))
    }
  }

  test("zeroBasedSubstr with startPos and len") {
    // (startPos, len) -> expected substring of "abcdefgh"
    val cases: Seq[(Int, Int, String)] = Seq(
      (2, 3, "cde"),
      (-2, 7, "gh"),
      (Int.MaxValue, 1, ""),
      (Int.MaxValue, -3, ""),
      (4, -3, ""),
      (Int.MinValue, 2, ""),
      (Int.MinValue, -3, "")
    )
    cases.foreach { case (startPos, len, expected) =>
      assertResult(expected)(evaluated(column.zeroBasedSubstr(startPos, len)))
    }
  }

}
class StructFieldImplicitsTest extends AnyFunSuite {

  /**
   * Builds a nullable String field named "uu" whose metadata is parsed from the JSON object
   * `{ "a" : <value123> }`; `value123` is inserted verbatim, so String values must carry their own quotes.
   */
  def fieldWith(value123: String): StructField = {
    val metadataJson = s"""{ "a" : $value123 }"""
    StructField("uu", StringType, nullable = true, metadata = Metadata.fromJson(metadataJson))
  }

  test("getOptString") {
    // raw JSON value stored under "a" -> expected result
    val cases: Seq[(String, Option[String])] = Seq(
      ("\"\"", Some("")),
      ("123", None),
      ("\"ffbfg\"", Some("ffbfg")),
      ("null", Some(null))
    )
    cases.foreach { case (rawValue, expected) =>
      assertResult(expected)(fieldWith(rawValue).metadata.getOptString("a"))
    }
  }

  test("getOptChar") {
    // raw JSON value stored under "a" -> expected result
    val cases: Seq[(String, Option[Char])] = Seq(
      ("\"\"", None),
      ("123", None),
      ("\"g\"", Some('g')),
      ("\"abc\"", None),
      ("null", None)
    )
    cases.foreach { case (rawValue, expected) =>
      assertResult(expected)(fieldWith(rawValue).metadata.getOptChar("a"))
    }
  }

  test("getOptStringAsBoolean") {
    // raw JSON value stored under "a" -> expected result
    val cases: Seq[(String, Option[Boolean])] = Seq(
      ("\"\"", None),
      ("123", None),
      ("\"true\"", Some(true)),
      ("\"false\"", Some(false)),
      ("false", None),
      ("true", None),
      ("null", None)
    )
    cases.foreach { case (rawValue, expected) =>
      assertResult(expected)(fieldWith(rawValue).metadata.getOptStringAsBoolean("a"))
    }
  }

  test("hasKey") {
    // (raw JSON value stored under "a", key to look up) -> expected result
    val cases: Seq[(String, String, Boolean)] = Seq(
      ("\"\"", "a", true),
      ("123", "b", false),
      ("\"hvh\"", "a", true),
      ("null", "a", true)
    )
    cases.foreach { case (rawValue, key, expected) =>
      assertResult(expected)(fieldWith(rawValue).metadata.hasKey(key))
    }
  }

}