From 42fc77990ccd0118a61227a7aab441c67059fd6e Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Wed, 16 Apr 2025 15:51:07 +0200 Subject: [PATCH 1/3] spark instrumentation use openlineage trace id if can generate it out of root parent information Signed-off-by: Maciej Obuchowski --- .../spark/AbstractDatadogSparkListener.java | 5 +- .../spark/OpenlineageParentContext.java | 96 ++++++++++++------- 2 files changed, 67 insertions(+), 34 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 511515bc35c..f866460b84f 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -168,7 +168,7 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp } public void setupOpenLineage(DDTraceId traceId) { - log.debug("Setting up OpenLineage configuration"); + log.error("Setting up OpenLineage configuration with trace id {}", traceId); if (openLineageSparkListener != null) { openLineageSparkConf.set("spark.openlineage.transport.type", "composite"); openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true"); @@ -284,6 +284,9 @@ private void captureOpenlineageContextIfPresent( builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace()); builder.withTag("openlineage_parent_job_name", context.getParentJobName()); builder.withTag("openlineage_parent_run_id", context.getParentRunId()); + builder.withTag("openlineage_root_parent_job_namespace", context.getRootParentJobNamespace()); + builder.withTag("openlineage_root_parent_job_name", context.getRootParentJobName()); + 
builder.withTag("openlineage_root_parent_run_id", context.getRootParentRunId()); } @Override diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java index 6a0b28a70c0..a0d58074753 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java @@ -7,10 +7,7 @@ import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.AgentTraceCollector; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; +import datadog.trace.util.FNV64Hash; import java.util.Collections; import java.util.Map; import java.util.Optional; @@ -32,11 +29,19 @@ public class OpenlineageParentContext implements AgentSpanContext { private final String parentJobNamespace; private final String parentJobName; private final String parentRunId; + private final String rootParentJobNamespace; + private final String rootParentJobName; + private final String rootParentRunId; public static final String OPENLINEAGE_PARENT_JOB_NAMESPACE = "spark.openlineage.parentJobNamespace"; public static final String OPENLINEAGE_PARENT_JOB_NAME = "spark.openlineage.parentJobName"; public static final String OPENLINEAGE_PARENT_RUN_ID = "spark.openlineage.parentRunId"; + public static final String OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE = + "spark.openlineage.rootParentJobNamespace"; + public static final String OPENLINEAGE_ROOT_PARENT_JOB_NAME = + "spark.openlineage.rootParentJobName"; + public static final String 
OPENLINEAGE_ROOT_PARENT_RUN_ID = "spark.openlineage.rootParentRunId"; public static Optional from(SparkConf sparkConf) { if (!sparkConf.contains(OPENLINEAGE_PARENT_JOB_NAMESPACE) @@ -53,60 +58,73 @@ public static Optional from(SparkConf sparkConf) { return Optional.empty(); } + if (!sparkConf.contains(OPENLINEAGE_ROOT_PARENT_RUN_ID)) { + log.error("Have parent info, but not root parent info. Can't construct valid trace id."); + return Optional.empty(); + } + + String rootParentJobNamespace = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE, ""); + String rootParentJobName = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAME, ""); + String rootParentRunId = sparkConf.get(OPENLINEAGE_ROOT_PARENT_RUN_ID, ""); + return Optional.of( - new OpenlineageParentContext(parentJobNamespace, parentJobName, parentRunId)); + new OpenlineageParentContext( + parentJobNamespace, + parentJobName, + parentRunId, + rootParentJobNamespace, + rootParentJobName, + rootParentRunId)); } - OpenlineageParentContext(String parentJobNamespace, String parentJobName, String parentRunId) { + OpenlineageParentContext( + String parentJobNamespace, + String parentJobName, + String parentRunId, + String rootParentJobNamespace, + String rootParentJobName, + String rootParentRunId) { log.debug( - "Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}", + "Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}, rootParentJobNamespace: {}, rootParentJobName: {}, rootParentRunId: {}", parentJobNamespace, parentJobName, - parentRunId); + parentRunId, + rootParentJobNamespace, + rootParentJobName, + rootParentRunId); this.parentJobNamespace = parentJobNamespace; this.parentJobName = parentJobName; this.parentRunId = parentRunId; - MessageDigest digest = null; - try { - digest = MessageDigest.getInstance("SHA-256"); - } catch (NoSuchAlgorithmException e) { - log.debug("Unable to find SHA-256 algorithm", e); - } + 
this.rootParentJobNamespace = rootParentJobNamespace; + this.rootParentJobName = rootParentJobName; + this.rootParentRunId = rootParentRunId; - if (digest != null && parentJobNamespace != null && parentRunId != null) { - traceId = computeTraceId(digest, parentJobNamespace, parentJobName, parentRunId); + if (this.parentRunId != null) { + traceId = computeTraceId(this.parentRunId); spanId = DDSpanId.ZERO; - childRootSpanId = - computeChildRootSpanId(digest, parentJobNamespace, parentJobName, parentRunId); + if (this.rootParentRunId != null) { + childRootSpanId = computeSpanId(this.rootParentRunId); + } else { + childRootSpanId = DDSpanId.ZERO; + } } else { traceId = DDTraceId.ZERO; spanId = DDSpanId.ZERO; - childRootSpanId = DDSpanId.ZERO; } log.debug("Created OpenlineageParentContext with traceId: {}, spanId: {}", traceId, spanId); } - private long computeChildRootSpanId( - MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) { - byte[] inputBytes = - (parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8); - byte[] hash = digest.digest(inputBytes); - - return ByteBuffer.wrap(hash).getLong(); + private long computeSpanId(String runId) { + return FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A); } - private DDTraceId computeTraceId( - MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) { - byte[] inputBytes = - (parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8); - byte[] hash = digest.digest(inputBytes); - - return DDTraceId.from(ByteBuffer.wrap(hash).getLong()); + private DDTraceId computeTraceId(String runId) { + return DDTraceId.from(FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A)); } @Override @@ -159,4 +177,16 @@ public String getParentJobName() { public String getParentRunId() { return parentRunId; } + + public String getRootParentJobNamespace() { + return rootParentJobNamespace; + } + + public String 
getRootParentJobName() { + return rootParentJobName; + } + + public String getRootParentRunId() { + return rootParentRunId; + } } From cc7cdde36a3f040d75694e4259c0887b599b2eb5 Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Wed, 23 Apr 2025 13:38:57 +0200 Subject: [PATCH 2/3] spark: use OpenLineage context to generate trace/span id if present Signed-off-by: Maciej Obuchowski --- .../spark/AbstractDatadogSparkListener.java | 8 +-- .../spark/OpenlineageParentContext.java | 38 +++++----- .../spark/OpenlineageParentContextTest.groovy | 70 ++++++++++++++----- 3 files changed, 72 insertions(+), 44 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index f866460b84f..96bda5e9515 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -273,13 +273,7 @@ private void captureOpenlineageContextIfPresent( AgentTracer.SpanBuilder builder, OpenlineageParentContext context) { builder.asChildOf(context); - builder.withSpanId(context.getChildRootSpanId()); - - log.debug( - "Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}", - context, - context.getTraceId(), - context.getChildRootSpanId()); + log.debug("Captured Openlineage context: {}, with trace_id: {}", context, context.getTraceId()); builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace()); builder.withTag("openlineage_parent_job_name", context.getParentJobName()); diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java 
b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java index a0d58074753..280c6e3f1da 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java @@ -24,7 +24,6 @@ public class OpenlineageParentContext implements AgentSpanContext { private final DDTraceId traceId; private final long spanId; - private final long childRootSpanId; private final String parentJobNamespace; private final String parentJobName; @@ -50,23 +49,29 @@ public static Optional from(SparkConf sparkConf) { return Optional.empty(); } + if (!sparkConf.contains(OPENLINEAGE_ROOT_PARENT_RUN_ID)) { + log.error("Have parent info, but not root parent info. Can't construct valid trace id."); + return Optional.empty(); + } + String parentJobNamespace = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAMESPACE); String parentJobName = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAME); String parentRunId = sparkConf.get(OPENLINEAGE_PARENT_RUN_ID); if (!UUID.matcher(parentRunId).matches()) { + log.error("OpenLineage parent run id is not a valid UUID: {}", parentRunId); return Optional.empty(); } - if (!sparkConf.contains(OPENLINEAGE_ROOT_PARENT_RUN_ID)) { - log.error("Have parent info, but not root parent info. 
Can't construct valid trace id."); + String rootParentJobNamespace = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE); + String rootParentJobName = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAME); + String rootParentRunId = sparkConf.get(OPENLINEAGE_ROOT_PARENT_RUN_ID); + + if (!UUID.matcher(rootParentRunId).matches()) { + log.error("OpenLineage root parent run id is not a valid UUID: {}", parentRunId); return Optional.empty(); } - String rootParentJobNamespace = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE, ""); - String rootParentJobName = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAME, ""); - String rootParentRunId = sparkConf.get(OPENLINEAGE_ROOT_PARENT_RUN_ID, ""); - return Optional.of( new OpenlineageParentContext( parentJobNamespace, @@ -101,19 +106,15 @@ public static Optional from(SparkConf sparkConf) { this.rootParentJobName = rootParentJobName; this.rootParentRunId = rootParentRunId; - if (this.parentRunId != null) { + if (this.rootParentRunId != null) { + traceId = computeTraceId(this.rootParentRunId); + spanId = computeSpanId(this.parentRunId); + } else if (this.parentRunId != null) { traceId = computeTraceId(this.parentRunId); - spanId = DDSpanId.ZERO; - - if (this.rootParentRunId != null) { - childRootSpanId = computeSpanId(this.rootParentRunId); - } else { - childRootSpanId = DDSpanId.ZERO; - } + spanId = computeSpanId(this.parentRunId); } else { traceId = DDTraceId.ZERO; spanId = DDSpanId.ZERO; - childRootSpanId = DDSpanId.ZERO; } log.debug("Created OpenlineageParentContext with traceId: {}, spanId: {}", traceId, spanId); @@ -124,6 +125,7 @@ private long computeSpanId(String runId) { } private DDTraceId computeTraceId(String runId) { + log.debug("Generating traceID from runId: {}", runId); return DDTraceId.from(FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A)); } @@ -137,10 +139,6 @@ public long getSpanId() { return spanId; } - public long getChildRootSpanId() { - return childRootSpanId; - } - @Override public AgentTraceCollector 
getTraceCollector() { return AgentTracer.NoopAgentTraceCollector.INSTANCE; diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/OpenlineageParentContextTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/OpenlineageParentContextTest.groovy index 34ec29b42b4..0287ab06e87 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/OpenlineageParentContextTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/OpenlineageParentContextTest.groovy @@ -1,11 +1,10 @@ package datadog.trace.instrumentation.spark -import datadog.trace.api.DDSpanId import org.apache.spark.SparkConf import spock.lang.Specification class OpenlineageParentContextTest extends Specification { - def "should create none empty OpenLineageParentContext using SHA-256 for TraceID and root span SpanId if all required fields are present" () { + def "should create OpenLineageParentContext with particular trace id based on root parent id" () { given: SparkConf mockSparkConf = Mock(SparkConf) @@ -13,9 +12,11 @@ class OpenlineageParentContextTest extends Specification { mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> true mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default" mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3" mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> parentRunId + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> rootParentRunId then: 
Optional parentContext = OpenlineageParentContext.from(mockSparkConf) @@ -23,16 +24,16 @@ class OpenlineageParentContextTest extends Specification { parentContext.get().getParentJobNamespace() == "default" parentContext.get().getParentJobName() == "dag-push-to-s3-spark.upload_to_s3" - parentContext.get().getParentRunId() == expectedParentRunId + parentContext.get().getRootParentRunId() == rootParentRunId + parentContext.get().getParentRunId() == parentRunId - parentContext.get().traceId.toLong() == expectedTraceId - parentContext.get().spanId == DDSpanId.ZERO - parentContext.get().childRootSpanId == expectedRootSpanId + parentContext.get().traceId.toString() == expectedTraceId + parentContext.get().spanId.toString() == expectedSpanId where: - parentRunId | expectedParentRunId | expectedTraceId | expectedRootSpanId - "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | 0xa475569dbce5e6cfL | 0xa475569dbce5e6cfL - "ad3b6baa-8d88-3b38-8dbe-f06232249a85" | "ad3b6baa-8d88-3b38-8dbe-f06232249a85" | 0x31da6680bd14991bL | 0x31da6680bd14991bL + rootParentRunId | parentRunId | expectedTraceId | expectedSpanId + "01964820-5280-7674-b04e-82fbed085f39" | "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | "13959090542865903119" | "2903780135964948649" + "1a1a1a1a-2b2b-3c3c-4d4d-5e5e5e5e5e5e" | "6f6f6f6f-5e5e-4d4d-3c3c-2b2b2b2b2b2b" | "15830118871223350489" | "8020087091656517257" } def "should create empty OpenLineageParentContext if any required field is missing" () { @@ -43,20 +44,24 @@ class OpenlineageParentContextTest extends Specification { mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> jobNamespacePresent mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> jobNamePresent mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> runIdPresent + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> rootParentIdPresent then: Optional 
parentContext = OpenlineageParentContext.from(mockSparkConf) parentContext.isPresent() == expected where: - jobNamespacePresent | jobNamePresent | runIdPresent | expected - true | true | false | false - true | false | true | false - false | true | true | false - true | false | false | false - false | true | false | false - false | false | true | false - false | false | false | false + jobNamespacePresent | jobNamePresent | runIdPresent | rootParentIdPresent | expected + true | true | true | false | false + true | true | false | false | false + true | true | true | false | false + true | true | false | true | false + true | false | true | false | false + false | true | true | true | false + true | false | false | false | false + false | true | false | false | false + false | false | true | true | false + false | false | false | false | false } def "should only generate a non-empty OpenlineageParentContext if parentRunId is a valid UUID" () { @@ -67,9 +72,12 @@ class OpenlineageParentContextTest extends Specification { mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> true mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default" mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3" mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> runId + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> runId + then: Optional parentContext = OpenlineageParentContext.from(mockSparkConf) @@ -83,5 +91,33 @@ class OpenlineageParentContextTest extends Specification { "6afeb6ee-729d-37f7-b8e6f47ca694" | false "6AFEB6EE-729D-37F7-AD73-B8E6F47CA694" | true } + + 
def "should only generate a non-empty OpenlineageParentContext if rootParentRunId is a valid UUID" () { + given: + SparkConf mockSparkConf = Mock(SparkConf) + + when: + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> true + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default" + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3" + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> "6afeb6ee-729d-37f7-ad73-b8e6f47ca694" + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> rootParentRunId + + + then: + Optional parentContext = OpenlineageParentContext.from(mockSparkConf) + parentContext.isPresent() == expected + + where: + rootParentRunId | expected + "6afeb6ee-729d-37f7-ad73-b8e6f47ca694" | true + " " | false + "invalid-uuid" | false + "6afeb6ee-729d-37f7-b8e6f47ca694" | false + "6AFEB6EE-729D-37F7-AD73-B8E6F47CA694" | true + } } From dd7895c99a2010b0be749c186e034780ebdcd1a5 Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Wed, 23 Apr 2025 17:47:52 +0200 Subject: [PATCH 3/3] code review fixes, add version tag Signed-off-by: Maciej Obuchowski --- .../src/test/groovy/SparkListenerTest.groovy | 8 +++++-- .../src/test/groovy/SparkListenerTest.groovy | 8 +++++-- .../spark/AbstractDatadogSparkListener.java | 2 +- .../spark/OpenlineageParentContext.java | 6 +++--- .../spark/AbstractSparkListenerTest.groovy | 21 +++++++++++++++++-- 5 files changed, 35 insertions(+), 10 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/test/groovy/SparkListenerTest.groovy 
b/dd-java-agent/instrumentation/spark/spark_2.12/src/test/groovy/SparkListenerTest.groovy index e34cd1ab08e..4d91429e4c0 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/test/groovy/SparkListenerTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/test/groovy/SparkListenerTest.groovy @@ -1,12 +1,16 @@ import datadog.trace.instrumentation.spark.AbstractSparkListenerTest import datadog.trace.instrumentation.spark.DatadogSpark212Listener import org.apache.spark.SparkConf -import org.apache.spark.scheduler.SparkListener class SparkListenerTest extends AbstractSparkListenerTest { @Override - protected SparkListener getTestDatadogSparkListener() { + protected DatadogSpark212Listener getTestDatadogSparkListener() { def conf = new SparkConf() return new DatadogSpark212Listener(conf, "some_app_id", "some_version") } + + @Override + protected DatadogSpark212Listener getTestDatadogSparkListener(SparkConf conf) { + return new DatadogSpark212Listener(conf, "some_app_id", "some_version") + } } diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/test/groovy/SparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/spark_2.13/src/test/groovy/SparkListenerTest.groovy index a358f95e4f3..cbfb88b13df 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/test/groovy/SparkListenerTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/test/groovy/SparkListenerTest.groovy @@ -1,12 +1,16 @@ import datadog.trace.instrumentation.spark.AbstractSparkListenerTest import datadog.trace.instrumentation.spark.DatadogSpark213Listener import org.apache.spark.SparkConf -import org.apache.spark.scheduler.SparkListener class SparkListenerTest extends AbstractSparkListenerTest { @Override - protected SparkListener getTestDatadogSparkListener() { + protected DatadogSpark213Listener getTestDatadogSparkListener() { def conf = new SparkConf() return new DatadogSpark213Listener(conf, "some_app_id", "some_version") } + + @Override + 
protected DatadogSpark213Listener getTestDatadogSparkListener(SparkConf conf) { + return new DatadogSpark213Listener(conf, "some_app_id", "some_version") + } } diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 96bda5e9515..6d17ad16e80 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -168,7 +168,7 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp } public void setupOpenLineage(DDTraceId traceId) { - log.error("Setting up OpenLineage configuration with trace id {}", traceId); + log.debug("Setting up OpenLineage configuration with trace id {}", traceId); if (openLineageSparkListener != null) { openLineageSparkConf.set("spark.openlineage.transport.type", "composite"); openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true"); diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java index 280c6e3f1da..51977d73f6c 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java @@ -50,7 +50,7 @@ public static Optional from(SparkConf sparkConf) { } if (!sparkConf.contains(OPENLINEAGE_ROOT_PARENT_RUN_ID)) { - log.error("Have parent info, but not root parent info. 
Can't construct valid trace id."); + log.debug("Found parent info, but not root parent info. Can't construct valid trace id."); return Optional.empty(); } @@ -59,7 +59,7 @@ public static Optional from(SparkConf sparkConf) { String parentRunId = sparkConf.get(OPENLINEAGE_PARENT_RUN_ID); if (!UUID.matcher(parentRunId).matches()) { - log.error("OpenLineage parent run id is not a valid UUID: {}", parentRunId); + log.debug("OpenLineage parent run id is not a valid UUID: {}", parentRunId); return Optional.empty(); } @@ -68,7 +68,7 @@ public static Optional from(SparkConf sparkConf) { String rootParentRunId = sparkConf.get(OPENLINEAGE_ROOT_PARENT_RUN_ID); if (!UUID.matcher(rootParentRunId).matches()) { - log.error("OpenLineage root parent run id is not a valid UUID: {}", parentRunId); + log.debug("OpenLineage root parent run id is not a valid UUID: {}", rootParentRunId); return Optional.empty(); } diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy index 2e910eafe09..47367fbef82 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy @@ -4,10 +4,10 @@ import com.datadoghq.sketch.ddsketch.DDSketchProtoBinding import com.datadoghq.sketch.ddsketch.proto.DDSketch import com.datadoghq.sketch.ddsketch.store.CollapsingLowestDenseStore import datadog.trace.agent.test.AgentTestRunner +import org.apache.spark.SparkConf import org.apache.spark.Success$ import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.JobSucceeded$ -import org.apache.spark.scheduler.SparkListener import org.apache.spark.scheduler.SparkListenerApplicationEnd 
import org.apache.spark.scheduler.SparkListenerApplicationStart import org.apache.spark.scheduler.SparkListenerExecutorAdded @@ -30,7 +30,8 @@ import scala.collection.JavaConverters abstract class AbstractSparkListenerTest extends AgentTestRunner { - protected abstract SparkListener getTestDatadogSparkListener() + protected abstract AbstractDatadogSparkListener getTestDatadogSparkListener() + protected abstract AbstractDatadogSparkListener getTestDatadogSparkListener(SparkConf conf) protected applicationStartEvent(time=0L) { // Constructor of SparkListenerApplicationStart changed starting spark 3.0 @@ -463,6 +464,22 @@ abstract class AbstractSparkListenerTest extends AgentTestRunner { } } + def "sets up OpenLineage trace id properly"() { + setup: + def conf = new SparkConf() + conf.set("spark.openlineage.parentRunId", "ad3b6baa-8d88-3b38-8dbe-f06232249a84") + conf.set("spark.openlineage.parentJobNamespace", "default") + conf.set("spark.openlineage.parentJobName", "dag-push-to-s3-spark.upload_to_s3") + conf.set("spark.openlineage.rootParentRunId", "01964820-5280-7674-b04e-82fbed085f39") + conf.set("spark.openlineage.rootParentJobNamespace", "default") + conf.set("spark.openlineage.rootParentJobName", "dag-push-to-s3-spark") + def listener = getTestDatadogSparkListener(conf) + + expect: + listener.onApplicationStart(applicationStartEvent(1000L)) + assert listener.openLineageSparkConf.get("spark.openlineage.run.tags").contains("13959090542865903119") + } + def "test lastJobFailed is not set when job is cancelled"() { setup: def listener = getTestDatadogSparkListener()