Skip to content

Commit 5baa904

Browse files
Revert "Use OpenLineage root parent information to generate trace id (#8726)"
This reverts commit 44de9a3.
1 parent 75634fe commit 5baa904

File tree

6 files changed

+72
-158
lines changed

6 files changed

+72
-158
lines changed
Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
11
import datadog.trace.instrumentation.spark.AbstractSparkListenerTest
22
import datadog.trace.instrumentation.spark.DatadogSpark212Listener
33
import org.apache.spark.SparkConf
4+
import org.apache.spark.scheduler.SparkListener
45

56
class SparkListenerTest extends AbstractSparkListenerTest {
67
@Override
7-
protected DatadogSpark212Listener getTestDatadogSparkListener() {
8+
protected SparkListener getTestDatadogSparkListener() {
89
def conf = new SparkConf()
910
return new DatadogSpark212Listener(conf, "some_app_id", "some_version")
1011
}
11-
12-
@Override
13-
protected DatadogSpark212Listener getTestDatadogSparkListener(SparkConf conf) {
14-
return new DatadogSpark212Listener(conf, "some_app_id", "some_version")
15-
}
1612
}
Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
11
import datadog.trace.instrumentation.spark.AbstractSparkListenerTest
22
import datadog.trace.instrumentation.spark.DatadogSpark213Listener
33
import org.apache.spark.SparkConf
4+
import org.apache.spark.scheduler.SparkListener
45

56
class SparkListenerTest extends AbstractSparkListenerTest {
67
@Override
7-
protected DatadogSpark213Listener getTestDatadogSparkListener() {
8+
protected SparkListener getTestDatadogSparkListener() {
89
def conf = new SparkConf()
910
return new DatadogSpark213Listener(conf, "some_app_id", "some_version")
1011
}
11-
12-
@Override
13-
protected DatadogSpark213Listener getTestDatadogSparkListener(SparkConf conf) {
14-
return new DatadogSpark213Listener(conf, "some_app_id", "some_version")
15-
}
1612
}

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
168168
}
169169

170170
public void setupOpenLineage(DDTraceId traceId) {
171-
log.debug("Setting up OpenLineage configuration with trace id {}", traceId);
171+
log.debug("Setting up OpenLineage configuration");
172172
if (openLineageSparkListener != null) {
173173
openLineageSparkConf.set("spark.openlineage.transport.type", "composite");
174174
openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
@@ -273,14 +273,17 @@ private void captureOpenlineageContextIfPresent(
273273
AgentTracer.SpanBuilder builder, OpenlineageParentContext context) {
274274
builder.asChildOf(context);
275275

276-
log.debug("Captured Openlineage context: {}, with trace_id: {}", context, context.getTraceId());
276+
builder.withSpanId(context.getChildRootSpanId());
277+
278+
log.debug(
279+
"Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}",
280+
context,
281+
context.getTraceId(),
282+
context.getChildRootSpanId());
277283

278284
builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace());
279285
builder.withTag("openlineage_parent_job_name", context.getParentJobName());
280286
builder.withTag("openlineage_parent_run_id", context.getParentRunId());
281-
builder.withTag("openlineage_root_parent_job_namespace", context.getRootParentJobNamespace());
282-
builder.withTag("openlineage_root_parent_job_name", context.getRootParentJobName());
283-
builder.withTag("openlineage_root_parent_run_id", context.getRootParentRunId());
284287
}
285288

286289
@Override

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java

Lines changed: 41 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
88
import datadog.trace.bootstrap.instrumentation.api.AgentTraceCollector;
99
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
10-
import datadog.trace.util.FNV64Hash;
10+
import java.nio.ByteBuffer;
11+
import java.nio.charset.StandardCharsets;
12+
import java.security.MessageDigest;
13+
import java.security.NoSuchAlgorithmException;
1114
import java.util.Collections;
1215
import java.util.Map;
1316
import java.util.Optional;
@@ -24,23 +27,16 @@ public class OpenlineageParentContext implements AgentSpanContext {
2427

2528
private final DDTraceId traceId;
2629
private final long spanId;
30+
private final long childRootSpanId;
2731

2832
private final String parentJobNamespace;
2933
private final String parentJobName;
3034
private final String parentRunId;
31-
private final String rootParentJobNamespace;
32-
private final String rootParentJobName;
33-
private final String rootParentRunId;
3435

3536
public static final String OPENLINEAGE_PARENT_JOB_NAMESPACE =
3637
"spark.openlineage.parentJobNamespace";
3738
public static final String OPENLINEAGE_PARENT_JOB_NAME = "spark.openlineage.parentJobName";
3839
public static final String OPENLINEAGE_PARENT_RUN_ID = "spark.openlineage.parentRunId";
39-
public static final String OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE =
40-
"spark.openlineage.rootParentJobNamespace";
41-
public static final String OPENLINEAGE_ROOT_PARENT_JOB_NAME =
42-
"spark.openlineage.rootParentJobName";
43-
public static final String OPENLINEAGE_ROOT_PARENT_RUN_ID = "spark.openlineage.rootParentRunId";
4440

4541
public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
4642
if (!sparkConf.contains(OPENLINEAGE_PARENT_JOB_NAMESPACE)
@@ -49,84 +45,68 @@ public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
4945
return Optional.empty();
5046
}
5147

52-
if (!sparkConf.contains(OPENLINEAGE_ROOT_PARENT_RUN_ID)) {
53-
log.debug("Found parent info, but not root parent info. Can't construct valid trace id.");
54-
return Optional.empty();
55-
}
56-
5748
String parentJobNamespace = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAMESPACE);
5849
String parentJobName = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAME);
5950
String parentRunId = sparkConf.get(OPENLINEAGE_PARENT_RUN_ID);
6051

6152
if (!UUID.matcher(parentRunId).matches()) {
62-
log.debug("OpenLineage parent run id is not a valid UUID: {}", parentRunId);
63-
return Optional.empty();
64-
}
65-
66-
String rootParentJobNamespace = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE);
67-
String rootParentJobName = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAME);
68-
String rootParentRunId = sparkConf.get(OPENLINEAGE_ROOT_PARENT_RUN_ID);
69-
70-
if (!UUID.matcher(rootParentRunId).matches()) {
71-
log.debug("OpenLineage root parent run id is not a valid UUID: {}", parentRunId);
7253
return Optional.empty();
7354
}
7455

7556
return Optional.of(
76-
new OpenlineageParentContext(
77-
parentJobNamespace,
78-
parentJobName,
79-
parentRunId,
80-
rootParentJobNamespace,
81-
rootParentJobName,
82-
rootParentRunId));
57+
new OpenlineageParentContext(parentJobNamespace, parentJobName, parentRunId));
8358
}
8459

85-
OpenlineageParentContext(
86-
String parentJobNamespace,
87-
String parentJobName,
88-
String parentRunId,
89-
String rootParentJobNamespace,
90-
String rootParentJobName,
91-
String rootParentRunId) {
60+
OpenlineageParentContext(String parentJobNamespace, String parentJobName, String parentRunId) {
9261
log.debug(
93-
"Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}, rootParentJobNamespace: {}, rootParentJobName: {}, rootParentRunId: {}",
62+
"Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}",
9463
parentJobNamespace,
9564
parentJobName,
96-
parentRunId,
97-
rootParentJobNamespace,
98-
rootParentJobName,
99-
rootParentRunId);
65+
parentRunId);
10066

10167
this.parentJobNamespace = parentJobNamespace;
10268
this.parentJobName = parentJobName;
10369
this.parentRunId = parentRunId;
10470

105-
this.rootParentJobNamespace = rootParentJobNamespace;
106-
this.rootParentJobName = rootParentJobName;
107-
this.rootParentRunId = rootParentRunId;
71+
MessageDigest digest = null;
72+
try {
73+
digest = MessageDigest.getInstance("SHA-256");
74+
} catch (NoSuchAlgorithmException e) {
75+
log.debug("Unable to find SHA-256 algorithm", e);
76+
}
10877

109-
if (this.rootParentRunId != null) {
110-
traceId = computeTraceId(this.rootParentRunId);
111-
spanId = computeSpanId(this.parentRunId);
112-
} else if (this.parentRunId != null) {
113-
traceId = computeTraceId(this.parentRunId);
114-
spanId = computeSpanId(this.parentRunId);
78+
if (digest != null && parentJobNamespace != null && parentRunId != null) {
79+
traceId = computeTraceId(digest, parentJobNamespace, parentJobName, parentRunId);
80+
spanId = DDSpanId.ZERO;
81+
82+
childRootSpanId =
83+
computeChildRootSpanId(digest, parentJobNamespace, parentJobName, parentRunId);
11584
} else {
11685
traceId = DDTraceId.ZERO;
11786
spanId = DDSpanId.ZERO;
87+
88+
childRootSpanId = DDSpanId.ZERO;
11889
}
11990

12091
log.debug("Created OpenlineageParentContext with traceId: {}, spanId: {}", traceId, spanId);
12192
}
12293

123-
private long computeSpanId(String runId) {
124-
return FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A);
94+
private long computeChildRootSpanId(
95+
MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) {
96+
byte[] inputBytes =
97+
(parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8);
98+
byte[] hash = digest.digest(inputBytes);
99+
100+
return ByteBuffer.wrap(hash).getLong();
125101
}
126102

127-
private DDTraceId computeTraceId(String runId) {
128-
log.debug("Generating traceID from runId: {}", runId);
129-
return DDTraceId.from(FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A));
103+
private DDTraceId computeTraceId(
104+
MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) {
105+
byte[] inputBytes =
106+
(parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8);
107+
byte[] hash = digest.digest(inputBytes);
108+
109+
return DDTraceId.from(ByteBuffer.wrap(hash).getLong());
130110
}
131111

132112
@Override
@@ -139,6 +119,10 @@ public long getSpanId() {
139119
return spanId;
140120
}
141121

122+
public long getChildRootSpanId() {
123+
return childRootSpanId;
124+
}
125+
142126
@Override
143127
public AgentTraceCollector getTraceCollector() {
144128
return AgentTracer.NoopAgentTraceCollector.INSTANCE;
@@ -175,16 +159,4 @@ public String getParentJobName() {
175159
public String getParentRunId() {
176160
return parentRunId;
177161
}
178-
179-
public String getRootParentJobNamespace() {
180-
return rootParentJobNamespace;
181-
}
182-
183-
public String getRootParentJobName() {
184-
return rootParentJobName;
185-
}
186-
187-
public String getRootParentRunId() {
188-
return rootParentRunId;
189-
}
190162
}

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ import com.datadoghq.sketch.ddsketch.DDSketchProtoBinding
44
import com.datadoghq.sketch.ddsketch.proto.DDSketch
55
import com.datadoghq.sketch.ddsketch.store.CollapsingLowestDenseStore
66
import datadog.trace.agent.test.AgentTestRunner
7-
import org.apache.spark.SparkConf
87
import org.apache.spark.Success$
98
import org.apache.spark.executor.TaskMetrics
109
import org.apache.spark.scheduler.JobSucceeded$
10+
import org.apache.spark.scheduler.SparkListener
1111
import org.apache.spark.scheduler.SparkListenerApplicationEnd
1212
import org.apache.spark.scheduler.SparkListenerApplicationStart
1313
import org.apache.spark.scheduler.SparkListenerExecutorAdded
@@ -30,8 +30,7 @@ import scala.collection.JavaConverters
3030

3131
abstract class AbstractSparkListenerTest extends AgentTestRunner {
3232

33-
protected abstract AbstractDatadogSparkListener getTestDatadogSparkListener()
34-
protected abstract AbstractDatadogSparkListener getTestDatadogSparkListener(SparkConf conf)
33+
protected abstract SparkListener getTestDatadogSparkListener()
3534

3635
protected applicationStartEvent(time=0L) {
3736
// Constructor of SparkListenerApplicationStart changed starting spark 3.0
@@ -464,22 +463,6 @@ abstract class AbstractSparkListenerTest extends AgentTestRunner {
464463
}
465464
}
466465

467-
def "sets up OpenLineage trace id properly"() {
468-
setup:
469-
def conf = new SparkConf()
470-
conf.set("spark.openlineage.parentRunId", "ad3b6baa-8d88-3b38-8dbe-f06232249a84")
471-
conf.set("spark.openlineage.parentJobNamespace", "default")
472-
conf.set("spark.openlineage.parentJobName", "dag-push-to-s3-spark.upload_to_s3")
473-
conf.set("spark.openlineage.rootParentRunId", "01964820-5280-7674-b04e-82fbed085f39")
474-
conf.set("spark.openlineage.rootParentJobNamespace", "default")
475-
conf.set("spark.openlineage.rootParentJobName", "dag-push-to-s3-spark")
476-
def listener = getTestDatadogSparkListener(conf)
477-
478-
expect:
479-
listener.onApplicationStart(applicationStartEvent(1000L))
480-
assert listener.openLineageSparkConf.get("spark.openlineage.run.tags").contains("13959090542865903119")
481-
}
482-
483466
def "test lastJobFailed is not set when job is cancelled"() {
484467
setup:
485468
def listener = getTestDatadogSparkListener()

0 commit comments

Comments (0)