Merged

28 commits
9d43738
#19 ColumnImplicits and StructFieldImplicits
AdrianOlosutean Jan 11, 2022
b88af07
#21 functions from enceladus SchemaUtils put into SchemaUtils and Str…
AdrianOlosutean Jan 13, 2022
330c1fa
#19 tests and small code fix
AdrianOlosutean Jan 13, 2022
475949e
Merge branch 'feature/19-add-columnimplicits' into feature/21-encelad…
AdrianOlosutean Jan 13, 2022
56bfd97
#22 fixes
AdrianOlosutean Jan 13, 2022
32021cf
#22 headers
AdrianOlosutean Jan 13, 2022
25f1983
Merge remote-tracking branch 'origin/master' into feature/19-add-colu…
AdrianOlosutean Jan 13, 2022
1a5b004
#19 feedback
AdrianOlosutean Jan 14, 2022
2110f30
#19 renames
AdrianOlosutean Jan 19, 2022
f62b5c1
Merge remote-tracking branch 'origin/master' into feature/21-enceladu…
AdrianOlosutean Jan 19, 2022
5f2746a
#22 refactoring
AdrianOlosutean Jan 20, 2022
e58d3aa
#19 other feedback
AdrianOlosutean Jan 20, 2022
ade77e2
#22 refactoring
AdrianOlosutean Jan 21, 2022
2517a20
#22 feedback
AdrianOlosutean Jan 24, 2022
d9533d2
Merge remote-tracking branch 'origin/master' into feature/19-add-colu…
AdrianOlosutean Jan 25, 2022
4248a6f
Merge branch 'feature/19-add-columnimplicits' into feature/21-encelad…
AdrianOlosutean Jan 25, 2022
8bde069
#22 merge
AdrianOlosutean Jan 25, 2022
c359974
#22 docs + import fixes
AdrianOlosutean Jan 26, 2022
965908d
#22 bugfix
AdrianOlosutean Jan 28, 2022
3230c90
#22 some feedback
AdrianOlosutean Feb 3, 2022
09f3d20
#22 multiple changes to implicit classes
AdrianOlosutean Feb 7, 2022
e77fe1d
Merge remote-tracking branch 'origin/master' into feature/21-enceladu…
AdrianOlosutean Feb 8, 2022
11807ae
#22 merging
AdrianOlosutean Feb 8, 2022
2e7c01f
Merge branch 'master' into feature/21-enceladus-schema-utils
AdrianOlosutean Feb 8, 2022
fdcaabc
Merge remote-tracking branch 'origin/master' into feature/21-enceladu…
AdrianOlosutean Feb 10, 2022
825ada4
#22 isOfType proper implemenetation
AdrianOlosutean Feb 10, 2022
8a141d8
Merge remote-tracking branch 'origin/feature/21-enceladus-schema-util…
AdrianOlosutean Feb 10, 2022
1fa973c
#22 fix doc
AdrianOlosutean Feb 10, 2022
185 changes: 173 additions & 12 deletions README.md
@@ -43,7 +43,7 @@ val myListener = new MyQueryExecutionListener with NonFatalQueryExecutionListene
spark.listenerManager.register(myListener)
```

- ### Spark Schema Utils
+ ### Schema Utils

>
>**Note:**
@@ -55,31 +55,30 @@ spark.listenerManager.register(myListener)
>|Spark| 2.4 | 3.1 | 3.2 |
>|Json4s| 3.5 | 3.7 | 3.7 |
>|Jackson| 2.6 | 2.10 | 2.12 |
- _Spark Schema Utils_ provides methods for working with schemas, its comparison and alignment.
+ _Schema Utils_ provides methods for working with schemas, their comparison and alignment.

- 1. Schema comparison returning true/false. Ignores the order of columns
+ 1. Returns the parent path of a field. Returns an empty string if a root-level field name is provided.

```scala
- SchemaUtils.equivalentSchemas(schema1, schema2)
+ SchemaUtils.getParentPath(columnName)
```

- 2. Schema comparison returning difference. Ignores the order of columns
+ 2. Gets the paths of all array subfields of the given data type

```scala
- SchemaUtils.diff(schema1, schema2)
+ SchemaUtils.getAllArraySubPaths(other)
```

- 3. Schema selector generator which provides a List of columns to be used in a
- select to order and positionally filter columns of a DataFrame
+ 3. For a given list of field paths, determines whether any pair of paths is a subset of one another

```scala
- SchemaUtils.getDataFrameSelector(schema)
+ SchemaUtils.isCommonSubPath(paths)
```

- 4. Dataframe alignment method using the `getDataFrameSelector` method.
+ 4. Appends a new attribute to a path (or to an empty string)

```scala
- SchemaUtils.alignSchema(dataFrameToBeAligned, modelSchema)
+ SchemaUtils.appendPath(path, fieldName)
```
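
The two path helpers are inverses of a sort: `getParentPath` strips the last attribute and `appendPath` adds one. A minimal sketch (the `za.co.absa.spark.commons.utils.SchemaUtils` import path is an assumption based on the project's package layout):

```scala
import za.co.absa.spark.commons.utils.SchemaUtils  // import path assumed

object PathHelpersSketch extends App {
  // Parent of a nested field; a root-level field would yield ""
  val parent = SchemaUtils.getParentPath("address.city")
  // Append an attribute; appending to "" should return just the field name
  val rebuilt = SchemaUtils.appendPath(parent, "zipCode")
  println(s"parent = $parent, rebuilt = $rebuilt")  // e.g. "address" and "address.zipCode"
}
```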

### ColumnImplicits
@@ -97,18 +96,19 @@ _ColumnImplicits_ provides implicit methods for transforming Spark Columns
```scala
column.zeroBasedSubstr(startPos)
```

3. Returns a column with the requested substring, shifting the substring indexing to be zero-based, in accordance with Scala/Java.
If the provided starting position is negative, it is counted from the end of the string.
If the requested length exceeds the rest of the string, all the remaining characters are taken.


```scala
column.zeroBasedSubstr(startPos, length)
```
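
To make the zero-based (and negative) indexing concrete, a hedged sketch follows; the `ColumnEnhancements` implicit class name is an assumption, only the method name comes from this README:

```scala
import org.apache.spark.sql.SparkSession
import za.co.absa.spark.commons.implicits.ColumnImplicits.ColumnEnhancements  // class name assumed

object ZeroBasedSubstrSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("substr-demo").getOrCreate()
  import spark.implicits._

  val df = Seq("Spark").toDF("word")
  df.select(
    $"word".zeroBasedSubstr(1),      // 0-based: should yield "park", not "Spark"
    $"word".zeroBasedSubstr(-3, 2)   // negative start counts from the end: "ar"
  ).show()

  spark.stop()
}
```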

### StructFieldImplicits

_StructFieldImplicits_ provides implicit methods for working with StructField objects.

Among them, the metadata methods are:

1. Gets the metadata Option[String] value given a key
@@ -135,6 +135,156 @@ Of them, metadata methods are:
```scala
structField.metadata.hasKey(key)
```
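
A short sketch of the metadata accessors on a hand-built field. Both the implicit class name `StructFieldMetadataEnhancements` and the `getOptString` method name are assumptions (the latter inferred from the "metadata Option[String] value" item above); only `hasKey` is shown verbatim in this README:

```scala
import org.apache.spark.sql.types.{Metadata, StringType, StructField}
import za.co.absa.spark.commons.implicits.StructFieldImplicits.StructFieldMetadataEnhancements  // class name assumed

object MetadataSketch extends App {
  val field = StructField(
    "amount",
    StringType,
    metadata = Metadata.fromJson("""{"sourcecolumn": "AMT"}""")
  )
  // Option-based getter: Some("AMT") for a present key (method name assumed)
  println(field.metadata.getOptString("sourcecolumn"))
  // hasKey comes straight from the list above; "default" is absent here
  println(field.metadata.hasKey("default"))
}
```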

### ArrayTypeImplicits

_ArrayTypeImplicits_ provides implicit methods for working with ArrayType objects.


1. Checks if the ArrayType is equivalent to another

```scala
arrayType.isEquivalentArrayType(otherArrayType)
```

2. For an array of arrays, get the final element type at the bottom of the array

```scala
arrayType.getDeepestArrayType()
```
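
A sketch combining both methods on a nested array type (only the two documented methods are used; `getDeepestArrayType` is assumed to return the bottom element type, as its description says):

```scala
import org.apache.spark.sql.types.{ArrayType, IntegerType}
import za.co.absa.spark.commons.implicits.ArrayTypeImplicits.ArrayTypeEnhancements

object ArrayTypeSketch extends App {
  val nested = ArrayType(ArrayType(IntegerType))

  // Equivalence is checked recursively, ignoring nullability
  println(nested.isEquivalentArrayType(ArrayType(ArrayType(IntegerType, containsNull = false))))

  // Unwraps array-of-array nesting down to the element type (IntegerType here)
  println(nested.getDeepestArrayType())
}
```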

### DataTypeImplicits

_DataTypeImplicits_ provides implicit methods for working with DataType objects.


1. Checks if the datatype is equivalent to another

```scala
dataType.isEquivalentDataType(otherDt)
```

2. Checks if a casting between types always succeeds

```scala
dataType.doesCastAlwaysSucceed(otherDt)
```

3. Checks if the type is primitive

```scala
dataType.isPrimitive()
```
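
For example (a sketch using only the three documented methods; the widening-cast comment reflects the usual semantics of such a check, not a verified behaviour):

```scala
import org.apache.spark.sql.types.{DataType, IntegerType, LongType}
import za.co.absa.spark.commons.implicits.DataTypeImplicits.DataTypeEnhancements

object DataTypeSketch extends App {
  val dt: DataType = IntegerType

  println(dt.isEquivalentDataType(IntegerType))
  // A widening cast such as Int -> Long is a candidate for "always succeeds"
  println(dt.doesCastAlwaysSucceed(LongType))
  println(dt.isPrimitive())
}
```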

### StructTypeImplicits

_StructTypeImplicits_ provides implicit methods for working with StructType objects.


1. Get a field from a text path

```scala
structType.getField(path)
```
> **Comment on lines +183 to +187 (Contributor):** This is probably the source where it made sense.

2. Get a type of a field from a text path

```scala
structType.getFieldType(path)
```

3. Checks if the specified path is an array of structs

```scala
structType.isColumnArrayOfStruct(path)
```

4. Get nullability of a field from a text path

```scala
structType.getFieldNullability(path)
```

5. Checks if a field specified by a path exists

```scala
structType.fieldExists(path)
```

6. Get paths for all array fields in the schema

```scala
structType.getAllArrayPaths()
```

7. Get a closest unique column name

```scala
structType.getClosestUniqueName(desiredName)
```

8. Checks if a field is the only field in a struct

```scala
structType.isOnlyField(columnName)
```

9. Checks if two StructTypes are equivalent

```scala
structType.isEquivalent(other)
```

10. Returns a list of differences between this schema and the other

```scala
structType.diffSchema(otherSchema, parent)
```

11. Checks if a field is of the specified type

```scala
structType.isOfType[ArrayType](path)
```

12. Checks if the StructType is a subset of the specified StructType

```scala
structType.isSubset(other)
```

13. Returns a column selector that can be used to align the schema of a data frame.

```scala
structType.getDataFrameSelector()
```
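
A sketch tying a few of these together on a small nested schema (method names are from the list above; outputs are indicative, not verified):

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements

object StructTypeSketch extends App {
  val schema = StructType(Seq(
    StructField("id", LongType),
    StructField("address", StructType(Seq(
      StructField("city", StringType)
    )))
  ))

  // Nested lookup via a dot-separated text path
  println(schema.getFieldType("address.city"))
  println(schema.fieldExists("address.street"))  // not in the schema

  // Column selector usable with df.select(...) to fix column order
  println(schema.getDataFrameSelector())
}
```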

### StructTypeArrayImplicits

1. Get first array column's path out of complete path

```scala
structType.getFirstArrayPath(path)
```

2. Get all array columns' paths out of complete path.

```scala
structType.getAllArraysInPath(path)
```

3. For a given list of field paths determines the deepest common array path

```scala
structType.getDeepestCommonArrayPath(fieldPaths)
```

4. For a field path determines the deepest array path

```scala
structType.getDeepestArrayPath(path)
```

5. Checks if a field is an array that is not nested in another array

```scala
structType.isNonNestedArray(path)
```
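
On a schema with nested arrays, the helpers above can be sketched as follows (the implicit class name `StructTypeArrayEnhancements` is an assumption):

```scala
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeArrayEnhancements  // class name assumed

object ArrayPathSketch extends App {
  val schema = StructType(Seq(
    StructField("legs", ArrayType(StructType(Seq(
      StructField("conditions", ArrayType(StringType))
    ))))
  ))

  // First array column on the way to the leaf field ("legs")
  println(schema.getFirstArrayPath("legs.conditions"))
  // Every array column along the path ("legs", "legs.conditions")
  println(schema.getAllArraysInPath("legs.conditions"))
  // "legs" is a top-level array, i.e. not nested in another array
  println(schema.isNonNestedArray("legs"))
}
```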

# Spark Version Guard

@@ -176,4 +326,15 @@ _DataFrameImplicits_ provides methods for transformations on Dataframes

```scala
df.withColumnIfDoesNotExist((df: DataFrame, _) => df)(colName, colExpression)
```

3. Aligns the schema of a DataFrame to the selector for operations
where column order might be important (e.g. hashing the whole rows and using except)

```scala
df.alignSchema(structType)
```

```scala
df.alignSchema(listColumns)
```
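
A sketch of why alignment matters for positional operations such as `except` (the `DataFrameEnhancements` implicit class name is an assumption):

```scala
import org.apache.spark.sql.SparkSession
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements  // class name assumed

object AlignSchemaSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("align-demo").getOrCreate()
  import spark.implicits._

  val model = Seq((1L, "a")).toDF("id", "name")
  val other = Seq(("a", 1L)).toDF("name", "id")  // same columns, different order

  // Reorder `other` to the model's schema so rows compare positionally
  val aligned = other.alignSchema(model.schema)
  aligned.except(model).show()  // columns now line up by position

  spark.stop()
}
```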
@@ -0,0 +1,88 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spark.commons.implicits

import org.apache.spark.sql.types.{ArrayType, DataType, StructType}
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements

import scala.annotation.tailrec

object ArrayTypeImplicits {

implicit class ArrayTypeEnhancements(arrayType: ArrayType) {

/**
* Compares two array fields of a dataframe schema.
*
* @param other The second array to compare
* @return true if provided arrays are the same ignoring nullability
*/
@scala.annotation.tailrec
final def isEquivalentArrayType(other: ArrayType): Boolean = {
> **Comment (Contributor):** This seems very similar to `DataTypeEnhancements(dt: DataType).isEquivalentDataType()`. Couldn't one of these methods use the other instead of containing the same logic?
>
> **Reply (Contributor):** Let's fix only obvious errors now. We can improve and add items in the next release - a minor. Adrian rightly pointed out, endless improvements prevent release, and therefore usage.
>
> **Reply (Contributor):** Fine by me, makes sense.

arrayType.elementType match {
case arrayType1: ArrayType =>
other.elementType match {
case arrayType2: ArrayType => arrayType1.isEquivalentArrayType(arrayType2)
case _ => false
}
case structType1: StructType =>
other.elementType match {
case structType2: StructType => structType1.isEquivalent(structType2)
case _ => false
}
case _ => arrayType.elementType == other.elementType
}
}


/**
* Finds all differences of two ArrayTypes and returns their paths
*
* @param array2 The second array to compare
* @param parent Parent path. This is used for the accumulation of differences and their print out
* @return A Seq of the difference paths found in the array's schema
*/
@scala.annotation.tailrec
private[implicits] final def diffArray(array2: ArrayType, parent: String): Seq[String] = {
arrayType.elementType match {
case _ if arrayType.elementType.typeName != array2.elementType.typeName =>
Seq(s"$parent data type doesn't match (${arrayType.elementType.typeName}) vs (${array2.elementType.typeName})")
case arrayType1: ArrayType =>
  arrayType1.diffArray(array2.elementType.asInstanceOf[ArrayType], parent)
case structType1: StructType =>
  structType1.diffSchema(array2.elementType.asInstanceOf[StructType], parent)
case _ => Seq.empty[String]
}
}

/**
* For an array of arrays of arrays, ... get the final element type at the bottom of the array
*
* @return A non-array data type at the bottom of array nesting
*/
final def getDeepestArrayType(): DataType = {
@tailrec
def getDeepestArrayTypeHelper(arrayType: ArrayType): DataType = {
arrayType.elementType match {
case a: ArrayType => getDeepestArrayTypeHelper(a)
case b => b
}
}
getDeepestArrayTypeHelper(arrayType)
}
}
}
@@ -18,7 +18,9 @@ package za.co.absa.spark.commons.implicits

import java.io.ByteArrayOutputStream

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements

object DataFrameImplicits {

@@ -73,6 +75,24 @@ object DataFrameImplicits {
df.withColumn(colName, colExpr)
}
}

/**
* Using the column selector returned from [[StructTypeEnhancements.getDataFrameSelector]], aligns the columns of a DataFrame to the selector
* for operations where column order might be important (e.g. hashing the whole rows and using except)
*
* @param selector list of columns to select, defining the target column order
* @return DataFrame with columns aligned and filtered to the selector
*/
def alignSchema(selector: List[Column]): DataFrame = df.select(selector: _*)

/**
* Using the column selector derived from [[StructTypeEnhancements.getDataFrameSelector]], aligns the columns of a DataFrame for operations
* where column order might be important (e.g. hashing the whole rows and using except)
*
* @param structType model StructType for the alignment of df
* @return DataFrame with columns aligned and filtered to the model schema
*/
def alignSchema(structType: StructType): DataFrame = alignSchema(structType.getDataFrameSelector())
}

}