Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,62 @@ select to order and positionally filter columns of a DataFrame

```scala
SchemaUtils.alignSchema(dataFrameToBeAligned, modelSchema)
```
```

### ColumnImplicits

_ColumnImplicits_ provide implicit methods for transforming Spark Columns

1. Transforms the column into a booleaan column, checking if values are negative or positive infinity

```scala
column.isInfinite()
```
2. Returns column with requested substring. It shifts the substring indexation to be in accordance with Scala/ Java.
The provided starting position where to start the substring from, if negative it will be counted from end

```scala
column.zeroBasedSubstr(startPos)
```
3. Returns column with requested substring. It shifts the substring indexation to be in accordance with Scala/ Java.
If the provided starting position where to start the substring from is negative, it will be counted from end.
The length of the desired substring, if longer then the rest of the string, all the remaining characters are taken.


```scala
column.zeroBasedSubstr(startPos, length)
```

### StructFieldImplicits

_StructFieldImplicits_ provides implicit methods for working with StructField objects.
Of them, metadata methods are:

1. Gets the metadata Option[String] value given a key

```scala
structField.metadata.getOptString(key)
```

2. Gets the metadata Char value given a key if the value is a single character String, it returns the char,
otherwise None

```scala
structField.metadata.getOptChar(key)
```

3. Gets the metadata boolean value of a given key, given that it can be transformed into boolean

```scala
structField.metadata.getStringAsBoolean(key)
```

4. Checks the structfield if it has the provided key, returns a boolean

```scala
structField.metadata.hasKey(key)
```

# Spark Version Guard

A class which checks if the Spark job version is compatible with the Spark Versions supported by the library
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spark.commons.implicits

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

object ColumnImplicits {
implicit class ColumnEnhancements(column: Column) {
def isInfinite: Column = {
column.isin(Double.PositiveInfinity, Double.NegativeInfinity)
}

/**
* Spark strings are based on 1 unlike scala. The function shifts the substring indexation to be in accordance with
* Scala/ Java.
* Another enhancement is, that the function allows a negative index, denoting counting of the index from back
* This version takes the substring from the startPos until the end.
*
* @param startPos the index (zero based) where to start the substring from, if negative it's counted from end
* @return column with requested substring
*/
def zeroBasedSubstr(startPos: Int): Column = {
if (startPos >= 0) {
zeroBasedSubstr(startPos, Int.MaxValue - startPos)
} else {
zeroBasedSubstr(startPos, -startPos)
}
}

/**
* Spark strings are base on 1 unlike scala. The function shifts the substring indexation to be in accordance with
* Scala/ Java.
* Another enhancement is, that the function allows a negative index, denoting counting of the index from back
* This version takes the substring from the startPos and takes up to the given number of characters (less.
*
* @param startPos the index (zero based) where to start the substring from, if negative it's counted from end
* @param len length of the desired substring, if longer then the rest of the string, all the remaining characters are taken
* @return column with requested substring
*/
def zeroBasedSubstr(startPos: Int, len: Int): Column = {
if (startPos >= 0) {
column.substr(startPos + 1, len)
} else {
val startPosColumn = greatest(length(column) + startPos + 1, lit(1))
val lenColumn = lit(len) + when(length(column) + startPos <= 0, length(column) + startPos).otherwise(0)
column.substr(startPosColumn, lenColumn)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spark.commons.implicits

import org.apache.spark.sql.types._
import scala.util.Try

object StructFieldImplicits {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a StructFieldImplicits now it is MetadataImplicits

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed this one #21 (comment)

So 1 object 2 implicit classes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, since Metadata is related to StructType

implicit class StructFieldMetadataEnhancement(val metadata: Metadata) {
def getOptString(key: String): Option[String] = {
Try(metadata.getString(key)).toOption
}

def getOptChar(key: String): Option[Char] = {
val resultString = Try(metadata.getString(key)).toOption
resultString.flatMap { s =>
if (s != null && s.length == 1) {
Option(s(0))
} else {
None
}
}
}

def getOptStringAsBoolean(key: String): Option[Boolean] = {
Try(metadata.getString(key).toBoolean).toOption
}

def hasKey(key: String): Boolean = {
metadata.contains(key)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spark.commons.implicits

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.lit
import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.spark.commons.implicits.ColumnImplicits.ColumnEnhancements

class ColumnImplicitsTest extends AnyFunSuite{

private val column: Column = lit("abcdefgh")

test("zeroBasedSubstr with startPos") {
assertResult("cdefgh")(column.zeroBasedSubstr(2).expr.eval().toString)
assertResult("gh")(column.zeroBasedSubstr(-2).expr.eval().toString)
assertResult("")(column.zeroBasedSubstr(Int.MaxValue).expr.eval().toString)
assertResult("abcdefgh")(column.zeroBasedSubstr(Int.MinValue).expr.eval().toString)
}

test("zeroBasedSubstr with startPos and len") {
assertResult("cde")(column.zeroBasedSubstr(2, 3).expr.eval().toString)
assertResult("gh")(column.zeroBasedSubstr(-2, 7).expr.eval().toString)
assertResult("")(column.zeroBasedSubstr(Int.MaxValue, 1).expr.eval().toString)
assertResult("")(column.zeroBasedSubstr(Int.MaxValue, -3).expr.eval().toString)
assertResult("")(column.zeroBasedSubstr(4, -3).expr.eval().toString)
assertResult("")(column.zeroBasedSubstr(Int.MinValue,2).expr.eval().toString)
assertResult("")(column.zeroBasedSubstr(Int.MinValue,-3).expr.eval().toString)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spark.commons.implicits

import org.apache.spark.sql.types.{Metadata, StringType, StructField}
import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.spark.commons.implicits.StructFieldImplicits.StructFieldMetadataEnhancement

class StructFieldImplicitsTest extends AnyFunSuite {

def fieldWith(value123: String) = {
val value1 = s"""{ "a" : ${value123} }"""
StructField("uu", StringType, true, Metadata.fromJson(value1))
}

test("getOptString") {
assertResult(Some(""))(fieldWith("\"\"").metadata.getOptString("a"))
assertResult(None)(fieldWith("123").metadata.getOptString("a"))
assertResult(Some("ffbfg"))(fieldWith("\"ffbfg\"").metadata.getOptString("a"))
assertResult(Some(null))(fieldWith("null").metadata.getOptString("a"))
}

test("getOptChar") {
assertResult(None)(fieldWith("\"\"").metadata.getOptChar("a"))
assertResult(None)(fieldWith("123").metadata.getOptChar("a"))
assertResult(Some('g'))(fieldWith("\"g\"").metadata.getOptChar("a"))
assertResult(None)(fieldWith("\"abc\"").metadata.getOptChar("a"))
assertResult(None)(fieldWith("null").metadata.getOptChar("a"))
}

test("getOptStringAsBoolean") {
assertResult(None)(fieldWith("\"\"").metadata.getOptStringAsBoolean("a"))
assertResult(None)(fieldWith("123").metadata.getOptStringAsBoolean("a"))
assertResult(Some(true))(fieldWith("\"true\"").metadata.getOptStringAsBoolean("a"))
assertResult(Some(false))(fieldWith("\"false\"").metadata.getOptStringAsBoolean("a"))
assertResult(None)(fieldWith("false").metadata.getOptStringAsBoolean("a"))
assertResult(None)(fieldWith("true").metadata.getOptStringAsBoolean("a"))
assertResult(None)(fieldWith("null").metadata.getOptStringAsBoolean("a"))
}

test("hasKey") {
assertResult(true)(fieldWith("\"\"").metadata.hasKey("a"))
assertResult(false)(fieldWith("123").metadata.hasKey("b"))
assertResult(true)(fieldWith("\"hvh\"").metadata.hasKey("a"))
assertResult(true)(fieldWith("null").metadata.hasKey("a"))
}

}