Skip to content

Commit c471269

Browse files
Implement telemetry for CI Visibility (#6664)
1 parent 8f172ea commit c471269

File tree

32 files changed

+1194
-1
lines changed

32 files changed

+1194
-1
lines changed
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package datadog.trace.civisibility.telemetry;
2+
3+
import datadog.trace.api.civisibility.telemetry.CiVisibilityCountMetric;
4+
import datadog.trace.api.civisibility.telemetry.CiVisibilityDistributionMetric;
5+
import datadog.trace.api.civisibility.telemetry.CiVisibilityMetricCollector;
6+
import datadog.trace.api.civisibility.telemetry.CiVisibilityMetricData;
7+
import datadog.trace.api.civisibility.telemetry.TagValue;
8+
import java.util.ArrayList;
9+
import java.util.Arrays;
10+
import java.util.Collection;
11+
import java.util.Collections;
12+
import java.util.List;
13+
import java.util.concurrent.ArrayBlockingQueue;
14+
import java.util.concurrent.BlockingQueue;
15+
import java.util.concurrent.atomic.AtomicBoolean;
16+
import java.util.concurrent.atomic.AtomicLongArray;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
19+
20+
public class CiVisibilityMetricCollectorImpl implements CiVisibilityMetricCollector {
21+
22+
private static final Logger log = LoggerFactory.getLogger(CiVisibilityMetricCollectorImpl.class);
23+
24+
private static final int COUNTER_CARD_SIZE = 64;
25+
26+
private final BlockingQueue<CiVisibilityMetricData> rawMetricsQueue;
27+
private final BlockingQueue<DistributionSeriesPoint> rawDistributionPointsQueue;
28+
private final AtomicLongArray counters;
29+
/**
30+
* Cards are used to avoid iterating over the entire {@link
31+
* CiVisibilityMetricCollectorImpl#counters} array every time {@link
32+
* CiVisibilityMetricCollectorImpl#prepareMetrics} is called.
33+
*
34+
* <p>Every card corresponds to {@link CiVisibilityMetricCollectorImpl#COUNTER_CARD_SIZE} elements
35+
* of the counters array. If a card is "dirty" (set to {@code true}), it means one or more of the
36+
* counters that it "covers" was modified. If it is not, then none of the corresponding counters
37+
* were touched, and iterating that part of the array can be skipped.
38+
*/
39+
private final AtomicBoolean[] counterDirtyCards;
40+
41+
public CiVisibilityMetricCollectorImpl() {
42+
this(
43+
new ArrayBlockingQueue<>(RAW_QUEUE_SIZE),
44+
new ArrayBlockingQueue<>(RAW_QUEUE_SIZE),
45+
CiVisibilityCountMetric.count());
46+
}
47+
48+
CiVisibilityMetricCollectorImpl(
49+
final BlockingQueue<CiVisibilityMetricData> rawMetricsQueue,
50+
final BlockingQueue<DistributionSeriesPoint> rawDistributionPointsQueue,
51+
final int countersTotal) {
52+
this.rawMetricsQueue = rawMetricsQueue;
53+
this.rawDistributionPointsQueue = rawDistributionPointsQueue;
54+
this.counters = new AtomicLongArray(countersTotal);
55+
56+
counterDirtyCards = new AtomicBoolean[(countersTotal - 1) / COUNTER_CARD_SIZE + 1];
57+
for (int i = 0; i < counterDirtyCards.length; i++) {
58+
counterDirtyCards[i] = new AtomicBoolean(false);
59+
}
60+
}
61+
62+
@Override
63+
public void add(CiVisibilityDistributionMetric metric, int value, TagValue... tags) {
64+
DistributionSeriesPoint point = metric.createDataPoint(value, tags);
65+
if (!rawDistributionPointsQueue.offer(point)) {
66+
log.debug(
67+
"Discarding metric {}:{}:{} because the queue is full",
68+
metric,
69+
value,
70+
Arrays.toString(tags));
71+
}
72+
}
73+
74+
@Override
75+
public Collection<DistributionSeriesPoint> drainDistributionSeries() {
76+
if (!this.rawDistributionPointsQueue.isEmpty()) {
77+
List<DistributionSeriesPoint> drained =
78+
new ArrayList<>(this.rawDistributionPointsQueue.size());
79+
this.rawDistributionPointsQueue.drainTo(drained);
80+
return drained;
81+
}
82+
return Collections.emptyList();
83+
}
84+
85+
@Override
86+
public void add(CiVisibilityCountMetric metric, long value, TagValue... tags) {
87+
int counterIdx = metric.getIndex(tags);
88+
counters.getAndAdd(counterIdx, value);
89+
90+
int dirtyCardIdx = counterIdx / COUNTER_CARD_SIZE;
91+
counterDirtyCards[dirtyCardIdx].set(true);
92+
}
93+
94+
@Override
95+
public void prepareMetrics() {
96+
int metricIdx = 0;
97+
CiVisibilityCountMetric[] countMetrics = CiVisibilityCountMetric.values();
98+
99+
for (int dirtyCardIdx = 0; dirtyCardIdx < counterDirtyCards.length; dirtyCardIdx++) {
100+
boolean dirty = counterDirtyCards[dirtyCardIdx].getAndSet(false);
101+
if (!dirty) {
102+
// none of the counters in this card was touched
103+
continue;
104+
}
105+
106+
int beginIdx = dirtyCardIdx * COUNTER_CARD_SIZE;
107+
int endIdx = Math.min(beginIdx + COUNTER_CARD_SIZE, counters.length());
108+
for (int counterIdx = beginIdx; counterIdx < endIdx; counterIdx++) {
109+
while (countMetrics[metricIdx].getEndIndex() <= counterIdx) {
110+
metricIdx++;
111+
}
112+
113+
long counter = counters.getAndSet(counterIdx, 0);
114+
if (counter == 0) {
115+
continue;
116+
}
117+
118+
CiVisibilityCountMetric metric = countMetrics[metricIdx];
119+
TagValue[] tagValues = metric.getTagValues(counterIdx);
120+
CiVisibilityMetricData metricData = metric.createData(counter, tagValues);
121+
if (!rawMetricsQueue.offer(metricData)) {
122+
// re-updating the counter to avoid losing metric data
123+
add(metric, counter, tagValues);
124+
return;
125+
}
126+
}
127+
}
128+
}
129+
130+
@Override
131+
public Collection<CiVisibilityMetricData> drain() {
132+
if (!this.rawMetricsQueue.isEmpty()) {
133+
List<CiVisibilityMetricData> drained = new ArrayList<>(this.rawMetricsQueue.size());
134+
this.rawMetricsQueue.drainTo(drained);
135+
return drained;
136+
}
137+
return Collections.emptyList();
138+
}
139+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
package datadog.trace.civisibility.telemetry
2+
3+
import datadog.trace.api.civisibility.telemetry.CiVisibilityCountMetric
4+
import datadog.trace.api.civisibility.telemetry.CiVisibilityDistributionMetric
5+
import datadog.trace.api.civisibility.telemetry.CiVisibilityMetricData
6+
import datadog.trace.api.civisibility.telemetry.TagValue
7+
import datadog.trace.api.civisibility.telemetry.tag.Endpoint
8+
import datadog.trace.api.civisibility.telemetry.tag.EventType
9+
import datadog.trace.api.civisibility.telemetry.tag.Library
10+
import datadog.trace.api.telemetry.MetricCollector
11+
import spock.lang.Specification
12+
13+
/**
 * Spock specification for {@link CiVisibilityMetricCollectorImpl}: covers submission and draining
 * of distribution points and count metrics, aggregation rules for counters, tag validation, and an
 * exhaustive pairwise check of counter index calculation and card marking.
 */
class CiVisibilityMetricCollectorTest extends Specification {

  def "test distribution metric submission"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityDistributionMetric.ENDPOINT_PAYLOAD_BYTES, 123, Endpoint.CODE_COVERAGE)
    collector.prepareMetrics()
    def metrics = collector.drainDistributionSeries()

    then:
    metrics == [
      new MetricCollector.DistributionSeriesPoint(CiVisibilityDistributionMetric.ENDPOINT_PAYLOAD_BYTES.getName(), true, "civisibility", 123, [Endpoint.CODE_COVERAGE.asString()])
    ]
  }

  def "test distribution metrics are not aggregated"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityDistributionMetric.ENDPOINT_PAYLOAD_BYTES, 123, Endpoint.CODE_COVERAGE)
    collector.add(CiVisibilityDistributionMetric.ENDPOINT_PAYLOAD_BYTES, 456, Endpoint.CODE_COVERAGE)
    collector.prepareMetrics()
    def metrics = collector.drainDistributionSeries()

    then:
    // every submitted point is kept as a separate element, in submission order
    metrics == [
      new MetricCollector.DistributionSeriesPoint(CiVisibilityDistributionMetric.ENDPOINT_PAYLOAD_BYTES.getName(), true, "civisibility", 123, [Endpoint.CODE_COVERAGE.asString()]),
      new MetricCollector.DistributionSeriesPoint(CiVisibilityDistributionMetric.ENDPOINT_PAYLOAD_BYTES.getName(), true, "civisibility", 456, [Endpoint.CODE_COVERAGE.asString()])
    ]
  }

  def "test count metric submission"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS, 123)
    collector.prepareMetrics()
    def metrics = collector.drain()

    then:
    metrics == [new CiVisibilityMetricData(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS.getName(), 123)]
  }

  def "test count metric aggregation"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS, 123)
    collector.add(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS, 123)
    collector.prepareMetrics()
    def metrics = collector.drain()

    then:
    // both additions happen within one cycle, so they are summed into a single data point
    metrics == [new CiVisibilityMetricData(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS.getName(), 246)]
  }

  def "test count metrics submitted in different cycles are not aggregated"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS, 123)
    collector.prepareMetrics()

    collector.add(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS, 123)
    collector.prepareMetrics()

    def metrics = collector.drain()

    then:
    // prepareMetrics() resets the counter after each cycle, so every cycle yields its own
    // data point carrying only the value added during that cycle
    metrics == [
      new CiVisibilityMetricData(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS.getName(), 123),
      new CiVisibilityMetricData(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS.getName(), 123)
    ]
  }

  def "test count metrics with different tags are not aggregated"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityCountMetric.ITR_FORCED_RUN, 123, EventType.MODULE)
    collector.add(CiVisibilityCountMetric.ITR_FORCED_RUN, 456, EventType.SESSION)
    collector.prepareMetrics()

    def metrics = collector.drain()

    then:
    metrics == [
      new CiVisibilityMetricData(CiVisibilityCountMetric.ITR_FORCED_RUN.getName(), 123, EventType.MODULE),
      new CiVisibilityMetricData(CiVisibilityCountMetric.ITR_FORCED_RUN.getName(), 456, EventType.SESSION)
    ]
  }

  def "test different count metrics are not aggregated"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityCountMetric.ITR_FORCED_RUN, 123, EventType.MODULE)
    collector.add(CiVisibilityCountMetric.ITR_SKIPPED, 456, EventType.MODULE)
    collector.prepareMetrics()

    def metrics = collector.drain()

    then:
    metrics.size() == 2
    metrics.contains(new CiVisibilityMetricData(CiVisibilityCountMetric.ITR_FORCED_RUN.getName(), 123, EventType.MODULE))
    metrics.contains(new CiVisibilityMetricData(CiVisibilityCountMetric.ITR_SKIPPED.getName(), 456, EventType.MODULE))
  }

  def "test exception is thrown when a distribution metric is tagged with a tag that is not allowed for it"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityDistributionMetric.GIT_COMMAND_MS, 123, Library.JACOCO)

    then:
    thrown IllegalArgumentException
  }

  def "test exception is thrown when a count metric is tagged with a tag that is not allowed for it"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityCountMetric.ITR_FORCED_RUN, 123, Library.JACOCO)

    then:
    thrown IllegalArgumentException
  }

  /**
   * This test enumerates all possible metric+tags variants,
   * then tries submitting all possible variant pairs (combinations of 2 different metric+tags).
   * The goal is to ensure that index calculation logic and card-marking are done right.
   */
  def "test submission of all possible count metric pairs"() {
    setup:
    List<PossibleMetric> possibleMetrics = []

    for (CiVisibilityCountMetric metric : CiVisibilityCountMetric.values()) {
      def metricTags = metric.getTags()
      // iterate over all possible combinations of metric tags
      for (TagValue[] tags : cartesianProduct(metricTags)) {
        // append in place instead of rebuilding the list on every iteration
        possibleMetrics << new PossibleMetric(metric, tags)
      }
    }

    def collector = new CiVisibilityMetricCollectorImpl()

    expect:
    for (int i = 0; i < possibleMetrics.size() - 1; i++) {
      def firstMetric = possibleMetrics.get(i)
      for (int j = i + 1; j < possibleMetrics.size(); j++) {
        def secondMetric = possibleMetrics.get(j)

        // deriving metric values from indices (as indices are unique, it's convenient to check that every metric has the correct value when drained)
        // +1 is needed because 0 cannot be used as a metric value - metrics with value 0 are not submitted
        int firstMetricValue = i + 1
        int secondMetricValue = j + 1

        collector.add(firstMetric.metric, firstMetricValue, firstMetric.tags)
        collector.add(secondMetric.metric, secondMetricValue, secondMetric.tags)
        collector.prepareMetrics()

        def metrics = collector.drain()
        assert metrics.size() == 2
        assert metrics.contains(new CiVisibilityMetricData(firstMetric.metric.getName(), firstMetricValue, firstMetric.tags))
        assert metrics.contains(new CiVisibilityMetricData(secondMetric.metric.getName(), secondMetricValue, secondMetric.tags))
      }
    }
  }

  /**
   * Builds every tag tuple that can be formed by picking at most one enum constant from each of
   * the given tag classes, keeping the order of the classes. The empty tuple is included.
   */
  private Collection<TagValue[]> cartesianProduct(Class<? extends TagValue>[] sets) {
    Collection<TagValue[]> tuples = new ArrayList<>()
    cartesianProductBacktrack(sets, tuples, new ArrayDeque<>(), 0)
    return tuples
  }

  /**
   * Recursive step of {@link #cartesianProduct}: for the tag class at {@code offset}, explores the
   * branch where the tag is omitted and one branch per enum constant of that class.
   *
   * @param sets tag classes to combine
   * @param tuples accumulator that receives every completed tuple
   * @param currentTuple tag values chosen so far (used as a stack)
   * @param offset index of the tag class being decided
   */
  private void cartesianProductBacktrack(Class<? extends TagValue>[] sets, Collection<TagValue[]> tuples, Deque<TagValue> currentTuple, int offset) {
    if (offset == sets.length) {
      int idx = 0
      TagValue[] tuple = new TagValue[currentTuple.size()]
      // the deque iterates from the most recently pushed element, so fill the array back to front
      for (TagValue element : currentTuple) {
        tuple[tuple.length - ++idx] = element
      }
      tuples.add(tuple)
      return
    }

    // a branch where we omit current tag
    cartesianProductBacktrack(sets, tuples, currentTuple, offset + 1)

    for (TagValue element : sets[offset].getEnumConstants()) {
      currentTuple.push(element)
      cartesianProductBacktrack(sets, tuples, currentTuple, offset + 1)
      currentTuple.pop()
    }
  }

  /** A count metric paired with one concrete combination of tag values. */
  private static final class PossibleMetric {
    private final CiVisibilityCountMetric metric
    private final TagValue[] tags

    PossibleMetric(CiVisibilityCountMetric metric, TagValue[] tags) {
      this.metric = metric
      this.tags = tags
    }
  }
}

dd-trace-api/src/main/java/datadog/trace/api/config/CiVisibilityConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public final class CiVisibilityConfig {
5353
public static final String CIVISIBILITY_FLAKY_RETRY_ENABLED = "civisibility.flaky.retry.enabled";
5454
public static final String CIVISIBILITY_FLAKY_RETRY_COUNT = "civisibility.flaky.retry.count";
5555
public static final String CIVISIBILITY_MODULE_NAME = "civisibility.module.name";
56+
public static final String CIVISIBILITY_TELEMETRY_ENABLED = "civisibility.telemetry.enabled";
5657

5758
/* COVERAGE SETTINGS */
5859
public static final String CIVISIBILITY_CODE_COVERAGE_ENABLED =

internal-api/build.gradle

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,14 @@ excludedClassesCoverage += [
9191
"datadog.trace.api.civisibility.coverage.TestReport",
9292
"datadog.trace.api.civisibility.coverage.TestReportFileEntry",
9393
"datadog.trace.api.civisibility.coverage.TestReportFileEntry.Segment",
94-
"datadog.trace.api.civisibility.InstrumentationBridge",
9594
"datadog.trace.api.civisibility.events.BuildEventsHandler.ModuleInfo",
95+
"datadog.trace.api.civisibility.telemetry.tag.ErrorType",
96+
"datadog.trace.api.civisibility.telemetry.tag.ExitCode",
97+
"datadog.trace.api.civisibility.telemetry.CiVisibilityCountMetric.IndexHolder",
98+
"datadog.trace.api.civisibility.telemetry.CiVisibilityDistributionMetric",
99+
"datadog.trace.api.civisibility.telemetry.CiVisibilityMetricData",
100+
"datadog.trace.api.civisibility.telemetry.NoOpMetricCollector",
101+
"datadog.trace.api.civisibility.InstrumentationBridge",
96102
// POJO
97103
"datadog.trace.api.git.GitInfo",
98104
"datadog.trace.api.git.GitInfoProvider",

0 commit comments

Comments
 (0)