28 changes: 24 additions & 4 deletions README.md
@@ -168,7 +168,7 @@ object ExampleSparkJob {
import spark.implicits._

// implicit FS is needed for enableControlMeasuresTracking, setCheckpoint calls, e.g. standard HDFS here:
implicit val localHdfs = FileSystem.get(new Configuration)
implicit val localHdfs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

// Initializing library to hook up to Apache Spark
spark.enableControlMeasuresTracking(sourceInfoFile = "data/input/_INFO")
@@ -196,8 +196,28 @@ in 'data/input/_INFO'. Two checkpoints are created. Any business logic can be in
and saving it to Parquet format.

### Storing Measurements in AWS S3
Starting with version 3.0.0, persistence support for AWS S3 has been added.
AWS S3 can be both used for loading the measurement data from as well as saving the measurements back to.

#### AWS S3 via Hadoop FS API
Since version 3.1.0, persistence support for AWS S3 via the Hadoop FS API is available. The usage is the same as with
regular HDFS, except that a different file system is supplied, e.g.:
```scala
import java.net.URI
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("Example Spark Job")
.getOrCreate()

val s3Uri = new URI("s3://my-awesome-bucket")
implicit val fs = FileSystem.get(s3Uri, spark.sparkContext.hadoopConfiguration)

```
The rest of the usage is the same as in the example listed above.
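For illustration, the pieces above can be combined as follows. This is a minimal sketch only: the bucket name, the `_INFO` location and the workflow name are placeholders, and the calls are the same ones used in the HDFS example earlier in this README.
```scala
import java.net.URI
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SparkSession
import za.co.absa.atum.AtumImplicits._

val spark = SparkSession
  .builder()
  .appName("Example Spark Job")
  .getOrCreate()

// the implicit FS now points at S3 instead of HDFS (placeholder bucket)
implicit val fs: FileSystem =
  FileSystem.get(new URI("s3://my-awesome-bucket"), spark.sparkContext.hadoopConfiguration)

// same tracking calls as in the HDFS example above, only the file system differs
spark.enableControlMeasuresTracking(sourceInfoFile = "s3://my-awesome-bucket/data/input/_INFO")
  .setControlMeasuresWorkflow("Example S3 job")
```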

#### AWS S3 via AWS SDK for S3
Starting with version 3.0.0, there is also persistence support for AWS S3 via the AWS SDK for S3.

The following example demonstrates the setup:
```scala
@@ -238,7 +258,7 @@ object S3Example {
}

```
The rest of the processing logic and programatic approach to the library remains unchanged.
The rest of the processing logic and programmatic approach to the library remains unchanged.


## Atum library routines
@@ -17,14 +17,15 @@ package za.co.absa.atum.core

import java.io.{PrintWriter, StringWriter}

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.regions.Region
import za.co.absa.atum.persistence.{HadoopFsControlMeasuresStorer, S3ControlMeasuresStorer, S3KmsSettings}
import za.co.absa.atum.persistence.{S3ControlMeasuresStorer, S3KmsSettings}
import za.co.absa.atum.utils.ExecutionPlanUtils._
import za.co.absa.atum.utils.S3Utils
import za.co.absa.atum.utils.{InfoFile, S3Utils}

/**
* The class is responsible for listening to DataSet save events and outputting corresponding control measurements.
@@ -39,12 +40,9 @@ class SparkQueryExecutionListener(cf: ControlFrameworkState) extends QueryExecut
Atum.log.debug(s"SparkQueryExecutionListener.onSuccess for S3ControlMeasuresStorer: writing to ${s3storer.outputLocation.s3String}")
writeInfoFileForQueryForSdkS3(qe, s3storer.outputLocation.region, s3storer.kmsSettings)(s3storer.credentialsProvider)

case Some(hadoopStorer: HadoopFsControlMeasuresStorer) =>
Atum.log.debug(s"SparkQueryExecutionListener.onSuccess: writing to Hadoop FS")
writeInfoFileForQuery(qe)(hadoopStorer.outputFs)

case _ =>
Atum.log.info("No usable storer is set, therefore no data will be written the automatically with DF-save to an _INFO file.")
Atum.log.debug(s"SparkQueryExecutionListener.onSuccess: writing to Hadoop FS")
writeInfoFileForQuery(qe)
@dk1844 (Collaborator, Author) commented on Dec 14, 2020:
Missing the info file write here was the main cause.

}

// Notify listeners
@@ -64,14 +62,21 @@ class SparkQueryExecutionListener(cf: ControlFrameworkState) extends QueryExecut
}

/** Write _INFO file with control measurements to the output directory based on the query plan */
private def writeInfoFileForQuery(qe: QueryExecution)(implicit outputFs: FileSystem): Unit = {
val infoFilePath = inferOutputInfoFileName(qe, cf.outputInfoFileName)
private[core] def writeInfoFileForQuery(qe: QueryExecution)(): Unit = {
val infoFileDir: Option[String] = inferOutputInfoFileDir(qe)

implicit val hadoopConf: Configuration = qe.sparkSession.sparkContext.hadoopConfiguration
val fsWithDir = infoFileDir
.map(InfoFile)
.flatMap(_.toOptFsPath) // path + FS based on HDFS or S3 over hadoopFS

// Write _INFO file to the output directory
infoFilePath.foreach(path => {
fsWithDir.foreach { case (fs, dir) => {
val path = new Path(dir, cf.outputInfoFileName)

Atum.log.info(s"Inferred _INFO Path = ${path.toUri.toString}")
cf.storeCurrentInfoFile(path)
})
cf.storeCurrentInfoFile(path)(fs)
}}

// Write _INFO file to a registered storer
if (cf.accumulator.isStorerLoaded) {
15 changes: 13 additions & 2 deletions atum/src/main/scala/za/co/absa/atum/utils/ExecutionPlanUtils.scala
@@ -97,11 +97,22 @@ object ExecutionPlanUtils {
* @return The inferred output control measurements file path of the source dataset
*/
def inferOutputInfoFileName(qe: QueryExecution, infoFileName: String = Constants.DefaultInfoFileName): Option[Path] = {
inferOutputInfoFileDir(qe).map { dir =>
new Path(dir, infoFileName)
}
}

/**
* Based on the `qe` supplied, inference of the output _INFO file directory is attempted
* @param qe QueryExecution - basis for the path inference
* @return optionally, the inferred _INFO file directory
*/
def inferOutputInfoFileDir(qe: QueryExecution): Option[String] = {
qe.analyzed match {
case s: SaveIntoDataSourceCommand =>
Some(new Path(s.options("path"), infoFileName))
Some(s.options("path"))
case h: InsertIntoHadoopFsRelationCommand =>
Some(new Path(h.outputPath, infoFileName))
Some(h.outputPath.toString)
case a =>
log.warn(s"Logical plan: ${qe.logical.treeString}")
log.warn(s"Analyzed plan: ${qe.analyzed.treeString}")
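For context, the refactor above splits the inference in two: `inferOutputInfoFileDir` extracts the output directory from the analyzed plan (`SaveIntoDataSourceCommand`'s `options("path")` or `InsertIntoHadoopFsRelationCommand`'s `outputPath`), and `inferOutputInfoFileName` appends the info file name to it. A tiny sketch of that composition, with a hypothetical output directory:
```scala
import org.apache.hadoop.fs.Path

// "s3a://my-bucket/data/output" is a hypothetical stand-in for whatever
// inferOutputInfoFileDir extracts from the analyzed plan.
val inferredDir = "s3a://my-bucket/data/output"
val infoFilePath = new Path(inferredDir, "_INFO") // default info file name

println(infoFilePath) // s3a://my-bucket/data/output/_INFO
```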
2 changes: 1 addition & 1 deletion atum/src/main/scala/za/co/absa/atum/utils/InfoFile.scala
@@ -12,7 +12,7 @@ private[atum] case class InfoFile(infoFile: String) {

private val validatedInfoFile: Option[String] = if (infoFile.isEmpty) None else Some(infoFile)

private def toOptFsPath(implicit hadoopConfiguration: Configuration): Option[(FileSystem, Path)] = {
def toOptFsPath(implicit hadoopConfiguration: Configuration): Option[(FileSystem, Path)] = {
validatedInfoFile.map { definedInfoFile =>
definedInfoFile.toS3Location match {

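Making `toOptFsPath` visible outside `InfoFile` is what lets `SparkQueryExecutionListener` resolve the inferred directory to a concrete `FileSystem` plus `Path`, whether that directory lives on HDFS or on S3 over the Hadoop FS API. The actual implementation goes through `toS3Location` as shown above; a rough, simplified sketch of the same idea using only the plain Hadoop API would be:
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Simplified sketch only: resolve a directory string to the owning FileSystem
// (HDFS, local FS, or S3 via the Hadoop FS API) and the Path within it.
def resolveFsAndPath(dir: String)(implicit hadoopConf: Configuration): (FileSystem, Path) = {
  val path = new Path(dir)
  val fs = path.getFileSystem(hadoopConf) // chooses hdfs://, file://, s3a://, ... by scheme
  (fs, path)
}

implicit val conf: Configuration = new Configuration()
val (fs, outDir) = resolveFsAndPath("s3a://my-bucket/data/output") // hypothetical location
val infoFile = new Path(outDir, "_INFO")
```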
@@ -4,16 +4,17 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.model.ControlMeasure
import za.co.absa.atum.persistence.TestResources
import za.co.absa.atum.utils.{FileUtils, HdfsFileUtils}

class ControlMeasuresHdfsStorerJsonSpec extends AnyFlatSpec with Matchers {

val expectedFilePath: String = TestResources.InputInfo.localPath
val inputControlMeasure = TestResources.InputInfo.controlMeasure
val inputControlMeasure: ControlMeasure = TestResources.InputInfo.controlMeasure

val hadoopConfiguration = new Configuration()
implicit val fs = FileSystem.get(hadoopConfiguration)
val hadoopConfiguration: Configuration = new Configuration()
implicit val fs: FileSystem = FileSystem.get(hadoopConfiguration)

"ControlMeasuresHdfsStorerJsonFile" should "store json file to HDFS" in {

build-all.sh: no content changes; file mode changed 100755 → 100644
2 changes: 1 addition & 1 deletion examples/pom.xml
@@ -79,7 +79,7 @@
<artifactId>scalatest-maven-plugin</artifactId>
<version>${scalatest.maven.version}</version>
<configuration>
<skipTests>true</skipTests>
<skipTests>false</skipTests>
</configuration>
</plugin>
<!-- Uber jar generation -->
@@ -29,7 +29,7 @@ object SampleMeasurements1 {
import spark.implicits._

val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
implicit val fs = FileSystem.get(hadoopConfiguration)
implicit val fs: FileSystem = FileSystem.get(hadoopConfiguration)

// Initializing library to hook up to Apache Spark
spark.enableControlMeasuresTracking(sourceInfoFile = "data/input/wikidata.csv.info")
@@ -30,7 +30,7 @@ object SampleMeasurements2 {
import spark.implicits._

val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
implicit val fs = FileSystem.get(hadoopConfiguration)
implicit val fs: FileSystem = FileSystem.get(hadoopConfiguration)

// Initializing library to hook up to Apache Spark
// No need to specify datasetName and datasetVersion as it is stage 2 and it will be determined automatically
@@ -32,7 +32,7 @@ object SampleSdkS3Measurements1 {
import spark.implicits._

val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
implicit val fs = FileSystem.get(hadoopConfiguration)
implicit val fs: FileSystem = FileSystem.get(hadoopConfiguration)

// This sample example relies on local credentials profile named "saml" with access to the s3 location defined below
implicit val samlCredentialsProvider = S3Utils.getLocalProfileCredentialsProvider("saml")
@@ -34,7 +34,7 @@ object SampleSdkS3Measurements2 {
import spark.implicits._

val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
implicit val fs = FileSystem.get(hadoopConfiguration)
implicit val fs: FileSystem = FileSystem.get(hadoopConfiguration)

// This sample example relies on local credentials profile named "saml" with access to the s3 location defined below
// AND by having explicitly defined KMS Key ID
@@ -0,0 +1,76 @@
package za.co.absa.atum

import org.apache.hadoop.fs.FileSystem
import org.apache.log4j.LogManager
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.model.{Checkpoint, Measurement}
import za.co.absa.atum.persistence.ControlMeasuresParser
import za.co.absa.atum.utils.SparkTestBase

class HdfsInfoIntegrationSuite extends AnyFlatSpec with SparkTestBase with Matchers with BeforeAndAfterAll {

private val log = LogManager.getLogger(this.getClass)
val tempDir: String = LocalFsTestUtils.createLocalTemporaryDirectory("hdfsTestOutput")

override def afterAll: Unit = {
LocalFsTestUtils.safeDeleteTestDir(tempDir)
}

private val inputCsv = "data/input/wikidata.csv"
private def readSparkInputCsv(inputCsvPath: String): DataFrame = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(inputCsvPath)

private def writeSparkData(df: DataFrame, outputPath: String): Unit =
df.write.mode(SaveMode.Overwrite)
.parquet(outputPath)

{
Collaborator commented:
I am not sure I get how this is styled. Shouldn't there be some keyword here?

@dk1844 (Collaborator, Author) replied on Jan 4, 2021:
What do you mean? The {} block is used to limit the visibility of val outputPath, as a logical constraint. Or are you discussing the formatting of the block?

Collaborator replied:
Ah, interesting, I hadn't thought about it that way. I have never seen a standalone block like this in Scala, so it seemed odd to me.

val outputPath = s"$tempDir/outputCheck1"
// the implicit variant only writes to the derived outputPath; the explicit variant writes to both the implicitly derived path and the explicit one
Seq(
("implicit output _INFO path only", "", Seq(s"$outputPath/_INFO")),
("implicit & explicit output _INFO path", s"$outputPath/extra/_INFO2", Seq(s"$outputPath/_INFO", s"$outputPath/extra/_INFO2"))
).foreach { case (testCaseName, destinationInfoFilePath, expectedPaths) =>

"_INFO" should s"be written on spark.write ($testCaseName)" in {
import spark.implicits._
import za.co.absa.atum.AtumImplicits._

val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
implicit val fs: FileSystem = FileSystem.get(hadoopConfiguration)

// Initializing library to hook up to Apache Spark
spark.enableControlMeasuresTracking(sourceInfoFile = "data/input/wikidata.csv.info", destinationInfoFile = destinationInfoFilePath)
.setControlMeasuresWorkflow("Job 1")

val df1 = readSparkInputCsv(inputCsv)
df1.setCheckpoint("Checkpoint0")
val filteredDf1 = df1.filter($"total_response_size" > 1000)
filteredDf1.setCheckpoint("Checkpoint1") // stateful, do not need return value
writeSparkData(filteredDf1, outputPath) // implicit output _INFO file path is derived from this path passed to spark.write

spark.disableControlMeasuresTracking()

expectedPaths.foreach { expectedPath =>
log.info(s"Checking $expectedPath to contain expected values")

val infoContentJson = LocalFsTestUtils.readFileAsString(expectedPath)
val infoControlMeasures = ControlMeasuresParser.fromJson(infoContentJson)

infoControlMeasures.checkpoints.map(_.name) shouldBe Seq("Source", "Raw", "Checkpoint0", "Checkpoint1")
val checkpoint0 = infoControlMeasures.checkpoints.collectFirst { case c: Checkpoint if c.name == "Checkpoint0" => c }.get
checkpoint0.controls should contain(Measurement("recordCount", "count", "*", "5000"))

val checkpoint1 = infoControlMeasures.checkpoints.collectFirst { case c: Checkpoint if c.name == "Checkpoint1" => c }.get
checkpoint1.controls should contain(Measurement("recordCount", "count", "*", "4964"))
}
}
}
}

}
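Regarding the standalone `{ ... }` block discussed in the review comments above: it is plain Scala block scoping, so `val outputPath` is visible only between the braces. A minimal sketch of the idiom, independent of this test:
```scala
object BlockScopingExample extends App {
  {
    // outputPath is only visible inside this block
    val outputPath = "/tmp/outputCheck1" // hypothetical path
    println(s"writing to $outputPath")
  }
  // outputPath is out of scope here; referring to it would not compile
}
```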
43 changes: 43 additions & 0 deletions examples/src/test/scala/za/co/absa/atum/LocalFsTestUtils.scala
@@ -0,0 +1,43 @@
package za.co.absa.atum

import java.io.File
import java.nio.file.Files

import org.apache.commons.io.FileUtils
import org.apache.log4j.LogManager

import scala.io.Source
import scala.util.control.NonFatal

object LocalFsTestUtils {
private val log = LogManager.getLogger(this.getClass)

/**
* Creates a temporary directory in the local filesystem.
*
* @param prefix A prefix to use for the temporary directory.
* @return A path to a temporary directory.
*/
def createLocalTemporaryDirectory(prefix: String): String = {
val tmpPath = Files.createTempDirectory(prefix)
tmpPath.toAbsolutePath.toString
}

def safeDeleteTestDir(path: String): Unit = {
try {
FileUtils.deleteDirectory(new File(path))
} catch {
case NonFatal(_) => log.warn(s"Unable to delete a test directory $path")
}
}

def readFileAsString(filename: String, lineSeparator: String = "\n"): String = {
val sourceFile = Source.fromFile(filename)
try {
sourceFile.getLines().mkString(lineSeparator)
} finally {
sourceFile.close()
}
}

}
@@ -15,10 +15,12 @@

package za.co.absa.atum.examples

import org.scalatest.Ignore
import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.atum.utils._

class SampleMeasurementsS3RunnerSpec extends AnyFunSuite
@Ignore
Collaborator commented:
Why is this here?

@dk1844 (Collaborator, Author) replied:
Because unlike the hadoop-fs tests, these tests should not be run against actual S3. Thus, they now:

  1. serve as an example
  2. can be run manually, provided certain conditions are met (files exist on S3 inside a specified bucket, KEY ID is supplied, local saml profile is supplied)

class SampleMeasurementsS3RunnerExampleSpec extends AnyFunSuite
with SparkJobRunnerMethods
with SparkLocalMaster {
