Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import datadog.trace.instrumentation.spark.AbstractSparkListenerTest
import datadog.trace.instrumentation.spark.DatadogSpark212Listener
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener

class SparkListenerTest extends AbstractSparkListenerTest {
@Override
protected SparkListener getTestDatadogSparkListener() {
protected DatadogSpark212Listener getTestDatadogSparkListener() {
def conf = new SparkConf()
return new DatadogSpark212Listener(conf, "some_app_id", "some_version")
}

@Override
protected DatadogSpark212Listener getTestDatadogSparkListener(SparkConf conf) {
return new DatadogSpark212Listener(conf, "some_app_id", "some_version")
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import datadog.trace.instrumentation.spark.AbstractSparkListenerTest
import datadog.trace.instrumentation.spark.DatadogSpark213Listener
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener

class SparkListenerTest extends AbstractSparkListenerTest {
@Override
protected SparkListener getTestDatadogSparkListener() {
protected DatadogSpark213Listener getTestDatadogSparkListener() {
def conf = new SparkConf()
return new DatadogSpark213Listener(conf, "some_app_id", "some_version")
}

@Override
protected DatadogSpark213Listener getTestDatadogSparkListener(SparkConf conf) {
return new DatadogSpark213Listener(conf, "some_app_id", "some_version")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
}

public void setupOpenLineage(DDTraceId traceId) {
log.debug("Setting up OpenLineage configuration");
log.debug("Setting up OpenLineage configuration with trace id {}", traceId);
if (openLineageSparkListener != null) {
openLineageSparkConf.set("spark.openlineage.transport.type", "composite");
openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
Expand Down Expand Up @@ -273,17 +273,14 @@ private void captureOpenlineageContextIfPresent(
AgentTracer.SpanBuilder builder, OpenlineageParentContext context) {
builder.asChildOf(context);

builder.withSpanId(context.getChildRootSpanId());

log.debug(
"Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}",
context,
context.getTraceId(),
context.getChildRootSpanId());
log.debug("Captured Openlineage context: {}, with trace_id: {}", context, context.getTraceId());

builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace());
builder.withTag("openlineage_parent_job_name", context.getParentJobName());
builder.withTag("openlineage_parent_run_id", context.getParentRunId());
builder.withTag("openlineage_root_parent_job_namespace", context.getRootParentJobNamespace());
builder.withTag("openlineage_root_parent_job_name", context.getRootParentJobName());
builder.withTag("openlineage_root_parent_run_id", context.getRootParentRunId());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.api.AgentTraceCollector;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import datadog.trace.util.FNV64Hash;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
Expand All @@ -27,16 +24,23 @@ public class OpenlineageParentContext implements AgentSpanContext {

private final DDTraceId traceId;
private final long spanId;
private final long childRootSpanId;

private final String parentJobNamespace;
private final String parentJobName;
private final String parentRunId;
private final String rootParentJobNamespace;
private final String rootParentJobName;
private final String rootParentRunId;

public static final String OPENLINEAGE_PARENT_JOB_NAMESPACE =
"spark.openlineage.parentJobNamespace";
public static final String OPENLINEAGE_PARENT_JOB_NAME = "spark.openlineage.parentJobName";
public static final String OPENLINEAGE_PARENT_RUN_ID = "spark.openlineage.parentRunId";
public static final String OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE =
"spark.openlineage.rootParentJobNamespace";
public static final String OPENLINEAGE_ROOT_PARENT_JOB_NAME =
"spark.openlineage.rootParentJobName";
public static final String OPENLINEAGE_ROOT_PARENT_RUN_ID = "spark.openlineage.rootParentRunId";

public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
if (!sparkConf.contains(OPENLINEAGE_PARENT_JOB_NAMESPACE)
Expand All @@ -45,68 +49,84 @@ public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
return Optional.empty();
}

if (!sparkConf.contains(OPENLINEAGE_ROOT_PARENT_RUN_ID)) {
log.debug("Found parent info, but not root parent info. Can't construct valid trace id.");
return Optional.empty();
}

String parentJobNamespace = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAMESPACE);
String parentJobName = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAME);
String parentRunId = sparkConf.get(OPENLINEAGE_PARENT_RUN_ID);

if (!UUID.matcher(parentRunId).matches()) {
log.debug("OpenLineage parent run id is not a valid UUID: {}", parentRunId);
return Optional.empty();
}

String rootParentJobNamespace = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE);
String rootParentJobName = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAME);
String rootParentRunId = sparkConf.get(OPENLINEAGE_ROOT_PARENT_RUN_ID);

if (!UUID.matcher(rootParentRunId).matches()) {
log.debug("OpenLineage root parent run id is not a valid UUID: {}", parentRunId);
return Optional.empty();
}

return Optional.of(
new OpenlineageParentContext(parentJobNamespace, parentJobName, parentRunId));
new OpenlineageParentContext(
parentJobNamespace,
parentJobName,
parentRunId,
rootParentJobNamespace,
rootParentJobName,
rootParentRunId));
}

OpenlineageParentContext(String parentJobNamespace, String parentJobName, String parentRunId) {
OpenlineageParentContext(
String parentJobNamespace,
String parentJobName,
String parentRunId,
String rootParentJobNamespace,
String rootParentJobName,
String rootParentRunId) {
log.debug(
"Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}",
"Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}, rootParentJobNamespace: {}, rootParentJobName: {}, rootParentRunId: {}",
parentJobNamespace,
parentJobName,
parentRunId);
parentRunId,
rootParentJobNamespace,
rootParentJobName,
rootParentRunId);

this.parentJobNamespace = parentJobNamespace;
this.parentJobName = parentJobName;
this.parentRunId = parentRunId;

MessageDigest digest = null;
try {
digest = MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException e) {
log.debug("Unable to find SHA-256 algorithm", e);
}
this.rootParentJobNamespace = rootParentJobNamespace;
this.rootParentJobName = rootParentJobName;
this.rootParentRunId = rootParentRunId;

if (digest != null && parentJobNamespace != null && parentRunId != null) {
traceId = computeTraceId(digest, parentJobNamespace, parentJobName, parentRunId);
spanId = DDSpanId.ZERO;

childRootSpanId =
computeChildRootSpanId(digest, parentJobNamespace, parentJobName, parentRunId);
if (this.rootParentRunId != null) {
traceId = computeTraceId(this.rootParentRunId);
spanId = computeSpanId(this.parentRunId);
} else if (this.parentRunId != null) {
traceId = computeTraceId(this.parentRunId);
spanId = computeSpanId(this.parentRunId);
} else {
traceId = DDTraceId.ZERO;
spanId = DDSpanId.ZERO;

childRootSpanId = DDSpanId.ZERO;
}

log.debug("Created OpenlineageParentContext with traceId: {}, spanId: {}", traceId, spanId);
}

private long computeChildRootSpanId(
MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) {
byte[] inputBytes =
(parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8);
byte[] hash = digest.digest(inputBytes);

return ByteBuffer.wrap(hash).getLong();
private long computeSpanId(String runId) {
return FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A);
}

private DDTraceId computeTraceId(
MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) {
byte[] inputBytes =
(parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8);
byte[] hash = digest.digest(inputBytes);

return DDTraceId.from(ByteBuffer.wrap(hash).getLong());
private DDTraceId computeTraceId(String runId) {
log.debug("Generating traceID from runId: {}", runId);
return DDTraceId.from(FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A));
}

@Override
Expand All @@ -119,10 +139,6 @@ public long getSpanId() {
return spanId;
}

public long getChildRootSpanId() {
return childRootSpanId;
}

@Override
public AgentTraceCollector getTraceCollector() {
return AgentTracer.NoopAgentTraceCollector.INSTANCE;
Expand Down Expand Up @@ -159,4 +175,16 @@ public String getParentJobName() {
public String getParentRunId() {
return parentRunId;
}

public String getRootParentJobNamespace() {
return rootParentJobNamespace;
}

public String getRootParentJobName() {
return rootParentJobName;
}

public String getRootParentRunId() {
return rootParentRunId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import com.datadoghq.sketch.ddsketch.DDSketchProtoBinding
import com.datadoghq.sketch.ddsketch.proto.DDSketch
import com.datadoghq.sketch.ddsketch.store.CollapsingLowestDenseStore
import datadog.trace.agent.test.AgentTestRunner
import org.apache.spark.SparkConf
import org.apache.spark.Success$
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.JobSucceeded$
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.scheduler.SparkListenerApplicationEnd
import org.apache.spark.scheduler.SparkListenerApplicationStart
import org.apache.spark.scheduler.SparkListenerExecutorAdded
Expand All @@ -30,7 +30,8 @@ import scala.collection.JavaConverters

abstract class AbstractSparkListenerTest extends AgentTestRunner {

protected abstract SparkListener getTestDatadogSparkListener()
protected abstract AbstractDatadogSparkListener getTestDatadogSparkListener()
protected abstract AbstractDatadogSparkListener getTestDatadogSparkListener(SparkConf conf)

protected applicationStartEvent(time=0L) {
// Constructor of SparkListenerApplicationStart changed starting spark 3.0
Expand Down Expand Up @@ -463,6 +464,22 @@ abstract class AbstractSparkListenerTest extends AgentTestRunner {
}
}

def "sets up OpenLineage trace id properly"() {
setup:
def conf = new SparkConf()
conf.set("spark.openlineage.parentRunId", "ad3b6baa-8d88-3b38-8dbe-f06232249a84")
conf.set("spark.openlineage.parentJobNamespace", "default")
conf.set("spark.openlineage.parentJobName", "dag-push-to-s3-spark.upload_to_s3")
conf.set("spark.openlineage.rootParentRunId", "01964820-5280-7674-b04e-82fbed085f39")
conf.set("spark.openlineage.rootParentJobNamespace", "default")
conf.set("spark.openlineage.rootParentJobName", "dag-push-to-s3-spark")
def listener = getTestDatadogSparkListener(conf)

expect:
listener.onApplicationStart(applicationStartEvent(1000L))
assert listener.openLineageSparkConf.get("spark.openlineage.run.tags").contains("13959090542865903119")
}

def "test lastJobFailed is not set when job is cancelled"() {
setup:
def listener = getTestDatadogSparkListener()
Expand Down
Loading