
Commit da979ee

Bugfix/48 no storer write (#51)
* #48 no storer write fix
  - hadoopfs default storer is used
  - hdfs test enabled for build, while s3 ignored
  - readme update
  - implicit saving test adding 1 (loader non-"", storer = "")
  - explicit saving test adding (loader non-"", storer = defined)
1 parent af7ff2c commit da979ee
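
In user-facing terms, the fixed scenario is: control measures tracking is enabled with a source _INFO file only, no storer is registered, and the _INFO file should still be written next to the DataFrame output on save. A rough sketch of that flow, based on the README example in this commit (the AtumImplicits import and the paths are assumptions for illustration, not part of the diff):

```scala
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SparkSession
import za.co.absa.atum.AtumImplicits._   // assumed import providing enableControlMeasuresTracking

object NoStorerWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("No-storer write sketch").getOrCreate()

    // Loader is defined (sourceInfoFile non-empty), but no storer is registered.
    implicit val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    spark.enableControlMeasuresTracking(sourceInfoFile = "data/input/_INFO")

    // With this fix, saving the DataFrame also writes data/output/_INFO
    // through the default Hadoop FS path inferred from the write.
    spark.read.parquet("data/input").write.parquet("data/output")
  }
}
```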

14 files changed, +187 -29 lines changed

README.md

Lines changed: 24 additions & 4 deletions
@@ -168,7 +168,7 @@ object ExampleSparkJob {
     import spark.implicits._
 
     // implicit FS is needed for enableControlMeasuresTracking, setCheckpoint calls, e.g. standard HDFS here:
-    implicit val localHdfs = FileSystem.get(new Configuration)
+    implicit val localHdfs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
 
     // Initializing library to hook up to Apache Spark
     spark.enableControlMeasuresTracking(sourceInfoFile = "data/input/_INFO")
@@ -196,8 +196,28 @@ in 'data/input/_INFO'. Two checkpoints are created. Any business logic can be in
 and saving it to Parquet format.
 
 ### Storing Measurements in AWS S3
-Starting with version 3.0.0, persistence support for AWS S3 has been added.
-AWS S3 can be both used for loading the measurement data from as well as saving the measurements back to.
+
+#### AWS S3 via Hadoop FS API
+Since version 3.1.0, persistence support for AWS S3 via Hadoop FS API is available. The usage is the same as with
+regular HDFS with the exception of providing a different file system, e.g.:
+```scala
+import java.net.URI
+import org.apache.hadoop.fs.FileSystem
+import org.apache.spark.sql.SparkSession
+
+val spark = SparkSession
+  .builder()
+  .appName("Example Spark Job")
+  .getOrCreate()
+
+val s3Uri = new URI("s3://my-awesome-bucket")
+implicit val fs = FileSystem.get(s3Uri, spark.sparkContext.hadoopConfiguration)
+
+```
+The rest of the usage is the same in the example listed above.
+
+#### AWS S3 via AWS SDK for S3
+Starting with version 3.0.0, there is also persistence support for AWS S3 via AWS SDK S3.
 
 The following example demonstrates the setup:
 ```scala
@@ -238,7 +258,7 @@ object S3Example {
 }
 
 ```
-The rest of the processing logic and programatic approach to the library remains unchanged.
+The rest of the processing logic and programmatic approach to the library remains unchanged.
 
 
 ## Atum library routines
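
For completeness, a sketch of how the S3-backed Hadoop FileSystem from the new README section plugs into the rest of the existing example. The bucket, paths, and checkpoint name are illustrative, and the AtumImplicits import and setCheckpoint call follow the usage the README comments describe, so treat this as a hedged continuation rather than library documentation:

```scala
import java.net.URI
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SparkSession
import za.co.absa.atum.AtumImplicits._   // assumed import providing the implicits used in the README

object S3ViaHadoopFsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Example Spark Job").getOrCreate()

    // S3 over the Hadoop FS API, as in the new README snippet
    val s3Uri = new URI("s3://my-awesome-bucket")
    implicit val fs: FileSystem = FileSystem.get(s3Uri, spark.sparkContext.hadoopConfiguration)

    // From here on, usage matches the HDFS example earlier in the README
    spark.enableControlMeasuresTracking(sourceInfoFile = "s3://my-awesome-bucket/input/_INFO")

    val measured = spark.read.parquet("s3://my-awesome-bucket/input")
      .setCheckpoint("after load")                          // checkpoint name is illustrative
    measured.write.parquet("s3://my-awesome-bucket/output") // _INFO is written next to the output
  }
}
```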

atum/src/main/scala/za/co/absa/atum/core/SparkQueryExecutionListener.scala

Lines changed: 18 additions & 13 deletions
@@ -17,14 +17,15 @@ package za.co.absa.atum.core
 
 import java.io.{PrintWriter, StringWriter}
 
-import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.util.QueryExecutionListener
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
 import software.amazon.awssdk.regions.Region
-import za.co.absa.atum.persistence.{HadoopFsControlMeasuresStorer, S3ControlMeasuresStorer, S3KmsSettings}
+import za.co.absa.atum.persistence.{S3ControlMeasuresStorer, S3KmsSettings}
 import za.co.absa.atum.utils.ExecutionPlanUtils._
-import za.co.absa.atum.utils.S3Utils
+import za.co.absa.atum.utils.{InfoFile, S3Utils}
 
 /**
  * The class is responsible for listening to DataSet save events and outputting corresponding control measurements.
@@ -39,12 +40,9 @@ class SparkQueryExecutionListener(cf: ControlFrameworkState) extends QueryExecut
         Atum.log.debug(s"SparkQueryExecutionListener.onSuccess for S3ControlMeasuresStorer: writing to ${s3storer.outputLocation.s3String}")
         writeInfoFileForQueryForSdkS3(qe, s3storer.outputLocation.region, s3storer.kmsSettings)(s3storer.credentialsProvider)
 
-      case Some(hadoopStorer: HadoopFsControlMeasuresStorer) =>
-        Atum.log.debug(s"SparkQueryExecutionListener.onSuccess: writing to Hadoop FS")
-        writeInfoFileForQuery(qe)(hadoopStorer.outputFs)
-
       case _ =>
-        Atum.log.info("No usable storer is set, therefore no data will be written the automatically with DF-save to an _INFO file.")
+        Atum.log.debug(s"SparkQueryExecutionListener.onSuccess: writing to Hadoop FS")
+        writeInfoFileForQuery(qe)
     }
 
     // Notify listeners
@@ -64,14 +62,21 @@ class SparkQueryExecutionListener(cf: ControlFrameworkState) extends QueryExecut
   }
 
   /** Write _INFO file with control measurements to the output directory based on the query plan */
-  private def writeInfoFileForQuery(qe: QueryExecution)(implicit outputFs: FileSystem): Unit = {
-    val infoFilePath = inferOutputInfoFileName(qe, cf.outputInfoFileName)
+  private[core] def writeInfoFileForQuery(qe: QueryExecution)(): Unit = {
+    val infoFileDir: Option[String] = inferOutputInfoFileDir(qe)
+
+    implicit val hadoopConf: Configuration = qe.sparkSession.sparkContext.hadoopConfiguration
+    val fsWithDir = infoFileDir
+      .map(InfoFile)
+      .flatMap(_.toOptFsPath) // path + FS based on HDFS or S3 over hadoopFS
 
     // Write _INFO file to the output directory
-    infoFilePath.foreach(path => {
+    fsWithDir.foreach { case (fs, dir) => {
+      val path = new Path(dir, cf.outputInfoFileName)
+
       Atum.log.info(s"Inferred _INFO Path = ${path.toUri.toString}")
-      cf.storeCurrentInfoFile(path)
-    })
+      cf.storeCurrentInfoFile(path)(fs)
+    }}
 
     // Write _INFO file to a registered storer
     if (cf.accumulator.isStorerLoaded) {
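
The fallback branch above hands the inferred output directory to InfoFile, which (per the inline comment) resolves a path plus FileSystem for either HDFS or S3 over the Hadoop FS API. A minimal sketch of that resolution using only the public Hadoop API; the object and method names here are illustrative, not the library's internals:

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object InfoFsResolutionSketch {
  // Given an inferred output directory (an HDFS path, a local path, or an s3/s3a URI),
  // pick the matching Hadoop FileSystem and build the _INFO path inside it.
  def resolveInfoFile(outputDir: String, infoFileName: String)
                     (implicit hadoopConf: Configuration): (FileSystem, Path) = {
    val fs = FileSystem.get(new URI(outputDir), hadoopConf)
    (fs, new Path(outputDir, infoFileName))
  }
}
```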

atum/src/main/scala/za/co/absa/atum/utils/ExecutionPlanUtils.scala

Lines changed: 13 additions & 2 deletions
@@ -97,11 +97,22 @@ object ExecutionPlanUtils {
    * @return The inferred output control measurements file path of the source dataset
    */
   def inferOutputInfoFileName(qe: QueryExecution, infoFileName: String = Constants.DefaultInfoFileName): Option[Path] = {
+    inferOutputInfoFileDir(qe).map { dir =>
+      new Path(dir, infoFileName)
+    }
+  }
+
+  /**
+   * Based on the `qe` supplied, output _INFO file path is inference is attempted
+   * @param qe QueryExecution - path inference basis
+   * @return optional inferred _INFO file path
+   */
+  def inferOutputInfoFileDir(qe: QueryExecution): Option[String] = {
     qe.analyzed match {
       case s: SaveIntoDataSourceCommand =>
-        Some(new Path(s.options("path"), infoFileName))
+        Some(s.options("path"))
       case h: InsertIntoHadoopFsRelationCommand =>
-        Some(new Path(h.outputPath, infoFileName))
+        Some(h.outputPath.toString)
       case a =>
         log.warn(s"Logical plan: ${qe.logical.treeString}")
         log.warn(s"Analyzed plan: ${qe.analyzed.treeString}")

atum/src/main/scala/za/co/absa/atum/utils/InfoFile.scala

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@ private[atum] case class InfoFile(infoFile: String) {
 
   private val validatedInfoFile: Option[String] = if (infoFile.isEmpty) None else Some(infoFile)
 
-  private def toOptFsPath(implicit hadoopConfiguration: Configuration): Option[(FileSystem, Path)] = {
+  def toOptFsPath(implicit hadoopConfiguration: Configuration): Option[(FileSystem, Path)] = {
     validatedInfoFile.map { definedInfoFile =>
       definedInfoFile.toS3Location match {

atum/src/test/scala/za/co/absa/atum/persistence/hdfs/ControlMeasuresHdfsStorerJsonSpec.scala

Lines changed: 4 additions & 3 deletions
@@ -4,16 +4,17 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
+import za.co.absa.atum.model.ControlMeasure
 import za.co.absa.atum.persistence.TestResources
 import za.co.absa.atum.utils.{FileUtils, HdfsFileUtils}
 
 class ControlMeasuresHdfsStorerJsonSpec extends AnyFlatSpec with Matchers {
 
   val expectedFilePath: String = TestResources.InputInfo.localPath
-  val inputControlMeasure = TestResources.InputInfo.controlMeasure
+  val inputControlMeasure: ControlMeasure = TestResources.InputInfo.controlMeasure
 
-  val hadoopConfiguration = new Configuration()
-  implicit val fs = FileSystem.get(hadoopConfiguration)
+  val hadoopConfiguration: Configuration = new Configuration()
+  implicit val fs: FileSystem = FileSystem.get(hadoopConfiguration)
 
   "ControlMeasuresHdfsStorerJsonFile" should "store json file to HDFS" in {

build-all.sh

File mode changed from 100755 to 100644.

examples/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -79,7 +79,7 @@
         <artifactId>scalatest-maven-plugin</artifactId>
         <version>${scalatest.maven.version}</version>
         <configuration>
-          <skipTests>true</skipTests>
+          <skipTests>false</skipTests>
         </configuration>
       </plugin>
       <!-- Uber jar generation -->

examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ object SampleMeasurements1 {
     import spark.implicits._
 
     val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
-    implicit val fs = FileSystem.get(hadoopConfiguration)
+    implicit val fs: FileSystem = FileSystem.get(hadoopConfiguration)
 
     // Initializing library to hook up to Apache Spark
     spark.enableControlMeasuresTracking(sourceInfoFile = "data/input/wikidata.csv.info")

examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ object SampleMeasurements2 {
     import spark.implicits._
 
     val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
-    implicit val fs = FileSystem.get(hadoopConfiguration)
+    implicit val fs: FileSystem = FileSystem.get(hadoopConfiguration)
 
     // Initializing library to hook up to Apache Spark
     // No need to specify datasetName and datasetVersion as it is stage 2 and it will be determined automatically

examples/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements1.scala

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ object SampleSdkS3Measurements1 {
     import spark.implicits._
 
     val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
-    implicit val fs = FileSystem.get(hadoopConfiguration)
+    implicit val fs: FileSystem = FileSystem.get(hadoopConfiguration)
 
     // This sample example relies on local credentials profile named "saml" with access to the s3 location defined below
     implicit val samlCredentialsProvider = S3Utils.getLocalProfileCredentialsProvider("saml")
