diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/test/groovy/SparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/spark_2.12/src/test/groovy/SparkListenerTest.groovy index e34cd1ab08e..4d91429e4c0 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/test/groovy/SparkListenerTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/test/groovy/SparkListenerTest.groovy @@ -1,12 +1,16 @@ import datadog.trace.instrumentation.spark.AbstractSparkListenerTest import datadog.trace.instrumentation.spark.DatadogSpark212Listener import org.apache.spark.SparkConf -import org.apache.spark.scheduler.SparkListener class SparkListenerTest extends AbstractSparkListenerTest { @Override - protected SparkListener getTestDatadogSparkListener() { + protected DatadogSpark212Listener getTestDatadogSparkListener() { def conf = new SparkConf() return new DatadogSpark212Listener(conf, "some_app_id", "some_version") } + + @Override + protected DatadogSpark212Listener getTestDatadogSparkListener(SparkConf conf) { + return new DatadogSpark212Listener(conf, "some_app_id", "some_version") + } } diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/test/groovy/SparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/spark_2.13/src/test/groovy/SparkListenerTest.groovy index a358f95e4f3..cbfb88b13df 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/test/groovy/SparkListenerTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/test/groovy/SparkListenerTest.groovy @@ -1,12 +1,16 @@ import datadog.trace.instrumentation.spark.AbstractSparkListenerTest import datadog.trace.instrumentation.spark.DatadogSpark213Listener import org.apache.spark.SparkConf -import org.apache.spark.scheduler.SparkListener class SparkListenerTest extends AbstractSparkListenerTest { @Override - protected SparkListener getTestDatadogSparkListener() { + protected DatadogSpark213Listener getTestDatadogSparkListener() { def conf = new 
SparkConf() return new DatadogSpark213Listener(conf, "some_app_id", "some_version") } + + @Override + protected DatadogSpark213Listener getTestDatadogSparkListener(SparkConf conf) { + return new DatadogSpark213Listener(conf, "some_app_id", "some_version") + } } diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 511515bc35c..6d17ad16e80 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -168,7 +168,7 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp } public void setupOpenLineage(DDTraceId traceId) { - log.debug("Setting up OpenLineage configuration"); + log.debug("Setting up OpenLineage configuration with trace id {}", traceId); if (openLineageSparkListener != null) { openLineageSparkConf.set("spark.openlineage.transport.type", "composite"); openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true"); @@ -273,17 +273,14 @@ private void captureOpenlineageContextIfPresent( AgentTracer.SpanBuilder builder, OpenlineageParentContext context) { builder.asChildOf(context); - builder.withSpanId(context.getChildRootSpanId()); - - log.debug( - "Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}", - context, - context.getTraceId(), - context.getChildRootSpanId()); + log.debug("Captured Openlineage context: {}, with trace_id: {}", context, context.getTraceId()); builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace()); builder.withTag("openlineage_parent_job_name", context.getParentJobName()); builder.withTag("openlineage_parent_run_id", 
context.getParentRunId()); + builder.withTag("openlineage_root_parent_job_namespace", context.getRootParentJobNamespace()); + builder.withTag("openlineage_root_parent_job_name", context.getRootParentJobName()); + builder.withTag("openlineage_root_parent_run_id", context.getRootParentRunId()); } @Override diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java index 6a0b28a70c0..51977d73f6c 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java @@ -7,10 +7,7 @@ import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.AgentTraceCollector; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; +import datadog.trace.util.FNV64Hash; import java.util.Collections; import java.util.Map; import java.util.Optional; @@ -27,16 +24,23 @@ public class OpenlineageParentContext implements AgentSpanContext { private final DDTraceId traceId; private final long spanId; - private final long childRootSpanId; private final String parentJobNamespace; private final String parentJobName; private final String parentRunId; + private final String rootParentJobNamespace; + private final String rootParentJobName; + private final String rootParentRunId; public static final String OPENLINEAGE_PARENT_JOB_NAMESPACE = "spark.openlineage.parentJobNamespace"; public static final String OPENLINEAGE_PARENT_JOB_NAME = "spark.openlineage.parentJobName"; public static final String 
OPENLINEAGE_PARENT_RUN_ID = "spark.openlineage.parentRunId"; + public static final String OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE = + "spark.openlineage.rootParentJobNamespace"; + public static final String OPENLINEAGE_ROOT_PARENT_JOB_NAME = + "spark.openlineage.rootParentJobName"; + public static final String OPENLINEAGE_ROOT_PARENT_RUN_ID = "spark.openlineage.rootParentRunId"; public static Optional from(SparkConf sparkConf) { if (!sparkConf.contains(OPENLINEAGE_PARENT_JOB_NAMESPACE) @@ -45,68 +49,84 @@ public static Optional from(SparkConf sparkConf) { return Optional.empty(); } + if (!sparkConf.contains(OPENLINEAGE_ROOT_PARENT_RUN_ID)) { + log.debug("Found parent info, but not root parent info. Can't construct valid trace id."); + return Optional.empty(); + } + String parentJobNamespace = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAMESPACE); String parentJobName = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAME); String parentRunId = sparkConf.get(OPENLINEAGE_PARENT_RUN_ID); if (!UUID.matcher(parentRunId).matches()) { + log.debug("OpenLineage parent run id is not a valid UUID: {}", parentRunId); + return Optional.empty(); + } + + String rootParentJobNamespace = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE); + String rootParentJobName = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAME); + String rootParentRunId = sparkConf.get(OPENLINEAGE_ROOT_PARENT_RUN_ID); + + if (!UUID.matcher(rootParentRunId).matches()) { + log.debug("OpenLineage root parent run id is not a valid UUID: {}", rootParentRunId); return Optional.empty(); } return Optional.of( - new OpenlineageParentContext(parentJobNamespace, parentJobName, parentRunId)); + new OpenlineageParentContext( + parentJobNamespace, + parentJobName, + parentRunId, + rootParentJobNamespace, + rootParentJobName, + rootParentRunId)); } - OpenlineageParentContext(String parentJobNamespace, String parentJobName, String parentRunId) { + OpenlineageParentContext( + String parentJobNamespace, + String parentJobName, + String parentRunId, + 
String rootParentJobNamespace, + String rootParentJobName, + String rootParentRunId) { log.debug( - "Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}", + "Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}, rootParentJobNamespace: {}, rootParentJobName: {}, rootParentRunId: {}", parentJobNamespace, parentJobName, - parentRunId); + parentRunId, + rootParentJobNamespace, + rootParentJobName, + rootParentRunId); this.parentJobNamespace = parentJobNamespace; this.parentJobName = parentJobName; this.parentRunId = parentRunId; - MessageDigest digest = null; - try { - digest = MessageDigest.getInstance("SHA-256"); - } catch (NoSuchAlgorithmException e) { - log.debug("Unable to find SHA-256 algorithm", e); - } + this.rootParentJobNamespace = rootParentJobNamespace; + this.rootParentJobName = rootParentJobName; + this.rootParentRunId = rootParentRunId; - if (digest != null && parentJobNamespace != null && parentRunId != null) { - traceId = computeTraceId(digest, parentJobNamespace, parentJobName, parentRunId); - spanId = DDSpanId.ZERO; - - childRootSpanId = - computeChildRootSpanId(digest, parentJobNamespace, parentJobName, parentRunId); + if (this.rootParentRunId != null) { + traceId = computeTraceId(this.rootParentRunId); + spanId = computeSpanId(this.parentRunId); + } else if (this.parentRunId != null) { + traceId = computeTraceId(this.parentRunId); + spanId = computeSpanId(this.parentRunId); } else { traceId = DDTraceId.ZERO; spanId = DDSpanId.ZERO; - - childRootSpanId = DDSpanId.ZERO; } log.debug("Created OpenlineageParentContext with traceId: {}, spanId: {}", traceId, spanId); } - private long computeChildRootSpanId( - MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) { - byte[] inputBytes = - (parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8); - byte[] hash = digest.digest(inputBytes); - - return 
ByteBuffer.wrap(hash).getLong(); + private long computeSpanId(String runId) { + return FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A); } - private DDTraceId computeTraceId( - MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) { - byte[] inputBytes = - (parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8); - byte[] hash = digest.digest(inputBytes); - - return DDTraceId.from(ByteBuffer.wrap(hash).getLong()); + private DDTraceId computeTraceId(String runId) { + log.debug("Generating traceID from runId: {}", runId); + return DDTraceId.from(FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A)); } @Override @@ -119,10 +139,6 @@ public long getSpanId() { return spanId; } - public long getChildRootSpanId() { - return childRootSpanId; - } - @Override public AgentTraceCollector getTraceCollector() { return AgentTracer.NoopAgentTraceCollector.INSTANCE; @@ -159,4 +175,16 @@ public String getParentJobName() { public String getParentRunId() { return parentRunId; } + + public String getRootParentJobNamespace() { + return rootParentJobNamespace; + } + + public String getRootParentJobName() { + return rootParentJobName; + } + + public String getRootParentRunId() { + return rootParentRunId; + } } diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy index 2e910eafe09..47367fbef82 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy @@ -4,10 +4,10 @@ import com.datadoghq.sketch.ddsketch.DDSketchProtoBinding import com.datadoghq.sketch.ddsketch.proto.DDSketch import 
com.datadoghq.sketch.ddsketch.store.CollapsingLowestDenseStore import datadog.trace.agent.test.AgentTestRunner +import org.apache.spark.SparkConf import org.apache.spark.Success$ import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.JobSucceeded$ -import org.apache.spark.scheduler.SparkListener import org.apache.spark.scheduler.SparkListenerApplicationEnd import org.apache.spark.scheduler.SparkListenerApplicationStart import org.apache.spark.scheduler.SparkListenerExecutorAdded @@ -30,7 +30,8 @@ import scala.collection.JavaConverters abstract class AbstractSparkListenerTest extends AgentTestRunner { - protected abstract SparkListener getTestDatadogSparkListener() + protected abstract AbstractDatadogSparkListener getTestDatadogSparkListener() + protected abstract AbstractDatadogSparkListener getTestDatadogSparkListener(SparkConf conf) protected applicationStartEvent(time=0L) { // Constructor of SparkListenerApplicationStart changed starting spark 3.0 @@ -463,6 +464,22 @@ abstract class AbstractSparkListenerTest extends AgentTestRunner { } } + def "sets up OpenLineage trace id properly"() { + setup: + def conf = new SparkConf() + conf.set("spark.openlineage.parentRunId", "ad3b6baa-8d88-3b38-8dbe-f06232249a84") + conf.set("spark.openlineage.parentJobNamespace", "default") + conf.set("spark.openlineage.parentJobName", "dag-push-to-s3-spark.upload_to_s3") + conf.set("spark.openlineage.rootParentRunId", "01964820-5280-7674-b04e-82fbed085f39") + conf.set("spark.openlineage.rootParentJobNamespace", "default") + conf.set("spark.openlineage.rootParentJobName", "dag-push-to-s3-spark") + def listener = getTestDatadogSparkListener(conf) + + expect: + listener.onApplicationStart(applicationStartEvent(1000L)) + assert listener.openLineageSparkConf.get("spark.openlineage.run.tags").contains("13959090542865903119") + } + def "test lastJobFailed is not set when job is cancelled"() { setup: def listener = getTestDatadogSparkListener() diff --git 
a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/OpenlineageParentContextTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/OpenlineageParentContextTest.groovy index 34ec29b42b4..0287ab06e87 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/OpenlineageParentContextTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/OpenlineageParentContextTest.groovy @@ -1,11 +1,10 @@ package datadog.trace.instrumentation.spark -import datadog.trace.api.DDSpanId import org.apache.spark.SparkConf import spock.lang.Specification class OpenlineageParentContextTest extends Specification { - def "should create none empty OpenLineageParentContext using SHA-256 for TraceID and root span SpanId if all required fields are present" () { + def "should create OpenLineageParentContext with particular trace id based on root parent id" () { given: SparkConf mockSparkConf = Mock(SparkConf) @@ -13,9 +12,11 @@ class OpenlineageParentContextTest extends Specification { mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> true mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default" mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3" mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> parentRunId + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> rootParentRunId then: Optional parentContext = OpenlineageParentContext.from(mockSparkConf) @@ -23,16 +24,16 
@@ class OpenlineageParentContextTest extends Specification { parentContext.get().getParentJobNamespace() == "default" parentContext.get().getParentJobName() == "dag-push-to-s3-spark.upload_to_s3" - parentContext.get().getParentRunId() == expectedParentRunId + parentContext.get().getRootParentRunId() == rootParentRunId + parentContext.get().getParentRunId() == parentRunId - parentContext.get().traceId.toLong() == expectedTraceId - parentContext.get().spanId == DDSpanId.ZERO - parentContext.get().childRootSpanId == expectedRootSpanId + parentContext.get().traceId.toString() == expectedTraceId + parentContext.get().spanId.toString() == expectedSpanId where: - parentRunId | expectedParentRunId | expectedTraceId | expectedRootSpanId - "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | 0xa475569dbce5e6cfL | 0xa475569dbce5e6cfL - "ad3b6baa-8d88-3b38-8dbe-f06232249a85" | "ad3b6baa-8d88-3b38-8dbe-f06232249a85" | 0x31da6680bd14991bL | 0x31da6680bd14991bL + rootParentRunId | parentRunId | expectedTraceId | expectedSpanId + "01964820-5280-7674-b04e-82fbed085f39" | "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | "13959090542865903119" | "2903780135964948649" + "1a1a1a1a-2b2b-3c3c-4d4d-5e5e5e5e5e5e" | "6f6f6f6f-5e5e-4d4d-3c3c-2b2b2b2b2b2b" | "15830118871223350489" | "8020087091656517257" } def "should create empty OpenLineageParentContext if any required field is missing" () { @@ -43,20 +44,24 @@ class OpenlineageParentContextTest extends Specification { mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> jobNamespacePresent mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> jobNamePresent mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> runIdPresent + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> rootParentIdPresent then: Optional parentContext = OpenlineageParentContext.from(mockSparkConf) parentContext.isPresent() == expected 
where: - jobNamespacePresent | jobNamePresent | runIdPresent | expected - true | true | false | false - true | false | true | false - false | true | true | false - true | false | false | false - false | true | false | false - false | false | true | false - false | false | false | false + jobNamespacePresent | jobNamePresent | runIdPresent | rootParentIdPresent | expected + true | true | true | false | false + true | true | false | false | false + true | true | true | false | false + true | true | false | true | false + true | false | true | false | false + false | true | true | true | false + true | false | false | false | false + false | true | false | false | false + false | false | true | true | false + false | false | false | false | false } def "should only generate a non-empty OpenlineageParentContext if parentRunId is a valid UUID" () { @@ -67,9 +72,12 @@ class OpenlineageParentContextTest extends Specification { mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> true mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default" mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3" mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> runId + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> runId + then: Optional parentContext = OpenlineageParentContext.from(mockSparkConf) @@ -83,5 +91,33 @@ class OpenlineageParentContextTest extends Specification { "6afeb6ee-729d-37f7-b8e6f47ca694" | false "6AFEB6EE-729D-37F7-AD73-B8E6F47CA694" | true } + + def "should only generate a non-empty OpenlineageParentContext if rootParentRunId is a valid UUID" () 
{ + given: + SparkConf mockSparkConf = Mock(SparkConf) + + when: + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> true + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default" + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3" + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> "6afeb6ee-729d-37f7-ad73-b8e6f47ca694" + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> rootParentRunId + + + then: + Optional parentContext = OpenlineageParentContext.from(mockSparkConf) + parentContext.isPresent() == expected + + where: + rootParentRunId | expected + "6afeb6ee-729d-37f7-ad73-b8e6f47ca694" | true + " " | false + "invalid-uuid" | false + "6afeb6ee-729d-37f7-b8e6f47ca694" | false + "6AFEB6EE-729D-37F7-AD73-B8E6F47CA694" | true + } }