From 959e08cae9b76b0da544d1f487c5fc82d6941548 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Sun, 27 Jun 2021 21:05:00 +0530 Subject: [PATCH 01/11] Initial Changes: 1. Added ClockskewAdjuster.java, NoOpClockskewAdjuster.java and JaegarBasedClockskewAdjuster.java. 2. Added lombok dependency to gateway-service. --- gateway-service-impl/build.gradle.kts | 2 + .../service/span/ClockskewAdjuster.java | 9 ++ .../span/JaegarBasedClockskewAdjuster.java | 114 ++++++++++++++++++ .../service/span/NoOpClockskewAdjuster.java | 13 ++ .../gateway/service/span/SpanService.java | 5 +- 5 files changed, 142 insertions(+), 1 deletion(-) create mode 100644 gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java create mode 100644 gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java create mode 100644 gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java diff --git a/gateway-service-impl/build.gradle.kts b/gateway-service-impl/build.gradle.kts index 1e27419c..c999b63f 100644 --- a/gateway-service-impl/build.gradle.kts +++ b/gateway-service-impl/build.gradle.kts @@ -29,6 +29,8 @@ dependencies { // Needed by clusters snapshots implementation("com.fasterxml.jackson.core:jackson-annotations:2.11.1") implementation("com.fasterxml.jackson.core:jackson-databind:2.11.1") + annotationProcessor("org.projectlombok:lombok:1.18.18") + compileOnly("org.projectlombok:lombok:1.18.18") testImplementation("org.junit.jupiter:junit-jupiter:5.7.0") testImplementation("org.mockito:mockito-core:3.9.0") diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java new file mode 100644 index 00000000..6142186d --- /dev/null +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java @@ -0,0 +1,9 @@ +package org.hypertrace.gateway.service.span; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.hypertrace.gateway.service.v1.span.SpanEvent; + +public interface ClockskewAdjuster { + List adjustSpansForClockSkew(ImmutableList spans); +} diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java new file mode 100644 index 00000000..2221d1cc --- /dev/null +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java @@ -0,0 +1,114 @@ +package org.hypertrace.gateway.service.span; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import lombok.Data; +import lombok.experimental.Accessors; +import org.hypertrace.gateway.service.v1.common.Value; +import org.hypertrace.gateway.service.v1.common.ValueType; +import org.hypertrace.gateway.service.v1.span.SpanEvent; +import org.hypertrace.gateway.service.v1.span.SpanEvent.Builder; + +class JaegarBasedClockskewAdjuster implements ClockskewAdjuster { + + @Override + public List adjustSpansForClockSkew(ImmutableList spans) { + Map idToSpanMap = + spans.parallelStream() + .map( + spanEvent -> { + Map attrMap = spanEvent.getAttributesMap(); + String spanId = attrMap.get("EVENT.id").getString(); + String parentSpanId = attrMap.get("EVENT.parentSpanId").getString(); + long startTime = attrMap.get("EVENT.startTime").getLong(); + long endTime = attrMap.get("EVENT.endTime").getLong(); + return new Span() + .id(spanId) + .startTime(Instant.ofEpochMilli(startTime)) + .endTime(Instant.ofEpochMilli(endTime)) + .parentSpanId(parentSpanId) + .duration( + Duration.between( + Instant.ofEpochMilli(endTime), Instant.ofEpochMilli(startTime))) + .spanEvent(spanEvent); + }) + .collect(toMap(Span::id, span -> span)); + // update child spanId of each span + idToSpanMap.forEach((spanId, span) -> idToSpanMap.get(span.parentSpanId()).childSpanId(spanId)); + // Start from root spans, and adjust each parent-child pair + idToSpanMap.entrySet().stream() + .filter(entry -> null == entry.getValue().parentSpanId()) + .forEach(entry -> adjustSpan(entry.getValue(), idToSpanMap)); + return idToSpanMap.values().stream().map(Span::getAdjustedSpanEvent).collect(toList()); + } + + private void adjustSpan(Span span, Map idToSpanMap) { + if (null != span && null != span.childSpanId()) { + Span childSpan = idToSpanMap.get(span.childSpanId()); + Duration adjustment = getAdjustmentForChildSpan(childSpan, span); + adjustTimestamp(childSpan, adjustment); + adjustSpan(childSpan, idToSpanMap); + } + } + + private void adjustTimestamp(Span childSpan, Duration adjustment) { + childSpan.startTime().plus(adjustment); + } + + private Duration getAdjustmentForChildSpan(Span childSpan, Span parentSpan) { + // if child span is greater than parent span + if (childSpan.duration().compareTo(parentSpan.duration()) > 0) { + // in this case, we can only ensure that it does not start before its parent + if (childSpan.startTime().isBefore(parentSpan.startTime())) { + return Duration.between(parentSpan.startTime(), childSpan.startTime()); + } + return Duration.ofMillis(0); + } + // if child already fits in its parent, do not adjust + if (!childSpan.startTime().isBefore(parentSpan.startTime()) + && !childSpan.endTime().isAfter(parentSpan.endTime())) { + return Duration.ofMillis(0); + } + var latency = (parentSpan.duration().minus(childSpan.duration()).toMillis()) >> 1; + return Duration.between(childSpan.startTime(), parentSpan.startTime().plusMillis(latency)); + } + + @Data + @Accessors(chain = true, fluent = true) + private static class Span { + private String id; + private String parentSpanId; + private String childSpanId; + private Instant startTime; + private Instant endTime; + private Duration duration; + private SpanEvent spanEvent; + + public SpanEvent getAdjustedSpanEvent() { + Preconditions.checkArgument(null != spanEvent); + Preconditions.checkArgument(null != startTime); + Preconditions.checkArgument(null != endTime); + Builder builder = SpanEvent.newBuilder().putAllAttributes(spanEvent.getAttributesMap()); + builder.putAttributes( + "EVENT.startTime", + Value.newBuilder() + .setLong(startTime.toEpochMilli()) + .setValueType(ValueType.LONG) + .build()); + builder.putAttributes( + "EVENT.endTime", + Value.newBuilder() + .setLong(endTime.toEpochMilli()) + .setValueType(ValueType.LONG) + .build()); + return builder.build(); + } + } +} diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java new file mode 100644 index 00000000..27520705 --- /dev/null +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java @@ -0,0 +1,13 @@ +package org.hypertrace.gateway.service.span; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.hypertrace.gateway.service.v1.span.SpanEvent; + +public class NoOpClockskewAdjuster implements ClockskewAdjuster{ + + @Override + public List adjustSpansForClockSkew(ImmutableList spans) { + return spans; + } +} diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java index 95c0020c..8d6e457d 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java @@ -5,6 +5,7 @@ import static org.hypertrace.gateway.service.common.util.AttributeMetadataUtil.getTimestampAttributeId; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.micrometer.core.instrument.Timer; import java.time.Duration; @@ -44,6 +45,7 @@ public class SpanService { private final QueryServiceClient queryServiceClient; private final int requestTimeout; private final AttributeMetadataProvider attributeMetadataProvider; + private final ClockskewAdjuster clockskewAdjuster; private Timer queryExecutionTimer; @@ -54,6 +56,7 @@ public SpanService( this.queryServiceClient = queryServiceClient; this.requestTimeout = requestTimeout; this.attributeMetadataProvider = attributeMetadataProvider; + this.clockskewAdjuster = new JaegarBasedClockskewAdjuster(); initMetrics(); } @@ -139,7 +142,7 @@ List filterSpanEvents( spanEventsResult.add(spanEventBuilder.build()); } } - return spanEventsResult; + return clockskewAdjuster.adjustSpansForClockSkew(ImmutableList.copyOf(spanEventsResult)); } // Adds the sort, limit and offset information to the QueryService if it is requested From df7eda4b466918752c93fd5300bbff6cc045716d Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 28 Jun 2021 15:14:37 +0530 Subject: [PATCH 02/11] 1. Added SpanProcessingStage.java. 2. Added SpanTransformationPipeline.java. 3. Added factory for ClockskewAdjuster.java. --- .../gateway/service/GatewayServiceImpl.java | 14 ++++++-- .../service/span/ClockskewAdjuster.java | 10 +++--- .../service/span/ClockskewAdjusters.java | 19 +++++++++++ .../span/JaegarBasedClockskewAdjuster.java | 3 +- .../service/span/NoOpClockskewAdjuster.java | 8 ++--- .../service/span/SpanProcessingStage.java | 8 +++++ .../gateway/service/span/SpanService.java | 18 +++++++--- .../span/SpanTransformationPipeline.java | 34 +++++++++++++++++++ .../resources/configs/common/application.conf | 3 +- 9 files changed, 97 insertions(+), 20 deletions(-) create mode 100644 gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjusters.java create mode 100644 gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanProcessingStage.java create mode 100644 gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java index 5c7a9931..0cc2fa8f 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java @@ -26,6 +26,7 @@ import org.hypertrace.gateway.service.entity.config.LogConfig; import org.hypertrace.gateway.service.explore.ExploreService; import org.hypertrace.gateway.service.logevent.LogEventsService; +import org.hypertrace.gateway.service.span.ClockskewAdjuster; import org.hypertrace.gateway.service.span.SpanService; import org.hypertrace.gateway.service.trace.TracesService; import org.hypertrace.gateway.service.v1.baseline.BaselineEntitiesRequest; @@ -78,6 +79,8 @@ public GatewayServiceImpl(Config appConfig) { new QueryServiceClient(new QueryServiceConfig(qsConfig)); int qsRequestTimeout = getRequestTimeoutMillis(qsConfig); + Config clockskewConfig = appConfig.getConfig("clockskew.adjuster"); + EntityServiceClientConfig esConfig = EntityServiceClientConfig.from(appConfig); ManagedChannel entityServiceChannel = ManagedChannelBuilder.forAddress(esConfig.getHost(), esConfig.getPort()) @@ -91,7 +94,11 @@ public GatewayServiceImpl(Config appConfig) { new TracesService( queryServiceClient, qsRequestTimeout, attributeMetadataProvider, scopeFilterConfigs); this.spanService = - new SpanService(queryServiceClient, qsRequestTimeout, attributeMetadataProvider); + new SpanService( + queryServiceClient, + qsRequestTimeout, + attributeMetadataProvider, + ClockskewAdjuster.getAdjuster(appConfig)); this.entityService = new EntityService( queryServiceClient, @@ -174,8 +181,11 @@ public void getSpans( org.hypertrace.core.grpcutils.context.RequestContext.CURRENT .get() .getRequestHeaders()); + SpansResponse response = spanService.getSpansByFilter(context, request); - responseObserver.onNext(response); + SpansResponse spansResponse = spanService.processSpans(response.getSpansList()); + + responseObserver.onNext(spansResponse); responseObserver.onCompleted(); } catch (Exception e) { LOG.error("Error while handling spans request: {}", request, e); diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java index 6142186d..93ee4b3b 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java @@ -1,9 +1,9 @@ package org.hypertrace.gateway.service.span; -import com.google.common.collect.ImmutableList; -import java.util.List; -import org.hypertrace.gateway.service.v1.span.SpanEvent; +import com.typesafe.config.Config; -public interface ClockskewAdjuster { - List adjustSpansForClockSkew(ImmutableList spans); +public interface ClockskewAdjuster extends SpanProcessingStage { + static ClockskewAdjuster getAdjuster(Config appConfig) { + return ClockskewAdjusters.getAdjuster("noop"); + } } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjusters.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjusters.java new file mode 100644 index 00000000..cb1ace57 --- /dev/null +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjusters.java @@ -0,0 +1,19 @@ +package org.hypertrace.gateway.service.span; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +class ClockskewAdjusters { + private static final Map> REGISTRY = + new HashMap<>(); + + static { + REGISTRY.put("jaeger", JaegarBasedClockskewAdjuster::new); + REGISTRY.put("noop", NoOpClockskewAdjuster::new); + } + + static ClockskewAdjuster getAdjuster(String type) { + return REGISTRY.get(type).get(); + } +} diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java index 2221d1cc..4597eb65 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java @@ -4,7 +4,6 @@ import static java.util.stream.Collectors.toMap; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import java.time.Duration; import java.time.Instant; import java.util.List; @@ -19,7 +18,7 @@ class JaegarBasedClockskewAdjuster implements ClockskewAdjuster { @Override - public List adjustSpansForClockSkew(ImmutableList spans) { + public List process(List spans) { Map idToSpanMap = spans.parallelStream() .map( diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java index 27520705..7c7c7ac2 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java @@ -1,13 +1,11 @@ package org.hypertrace.gateway.service.span; -import com.google.common.collect.ImmutableList; +import java.util.ArrayList; import java.util.List; import org.hypertrace.gateway.service.v1.span.SpanEvent; public class NoOpClockskewAdjuster implements ClockskewAdjuster{ - - @Override - public List adjustSpansForClockSkew(ImmutableList spans) { - return spans; + public List process(List spans) { + return new ArrayList<>(spans); } } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanProcessingStage.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanProcessingStage.java new file mode 100644 index 00000000..48230330 --- /dev/null +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanProcessingStage.java @@ -0,0 +1,8 @@ +package org.hypertrace.gateway.service.span; + +import java.util.List; +import org.hypertrace.gateway.service.v1.span.SpanEvent; + +interface SpanProcessingStage { + List process(List spans); +} diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java index 8d6e457d..3dbd5b1e 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java @@ -5,7 +5,6 @@ import static org.hypertrace.gateway.service.common.util.AttributeMetadataUtil.getTimestampAttributeId; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.micrometer.core.instrument.Timer; import java.time.Duration; @@ -45,18 +44,19 @@ public class SpanService { private final QueryServiceClient queryServiceClient; private final int requestTimeout; private final AttributeMetadataProvider attributeMetadataProvider; - private final ClockskewAdjuster clockskewAdjuster; + private final SpanTransformationPipeline pipeline; private Timer queryExecutionTimer; public SpanService( QueryServiceClient queryServiceClient, int requestTimeout, - AttributeMetadataProvider attributeMetadataProvider) { + AttributeMetadataProvider attributeMetadataProvider, + ClockskewAdjuster clockskewAdjuster) { this.queryServiceClient = queryServiceClient; this.requestTimeout = requestTimeout; this.attributeMetadataProvider = attributeMetadataProvider; - this.clockskewAdjuster = new JaegarBasedClockskewAdjuster(); + pipeline = SpanTransformationPipeline.getNewPipeline().addProcessingStage(clockskewAdjuster); initMetrics(); } @@ -65,6 +65,14 @@ private void initMetrics() { PlatformMetricsRegistry.registerTimer("hypertrace.span.query.execution", ImmutableMap.of()); } + public SpansResponse processSpans(List spans) { + List processedSpans = pipeline.execute(spans); + return SpansResponse.newBuilder() + .addAllSpans(processedSpans) + .setTotal(processedSpans.size()) + .build(); + } + public SpansResponse getSpansByFilter(RequestContext context, SpansRequest request) { Instant start = Instant.now(); try { @@ -142,7 +150,7 @@ List filterSpanEvents( spanEventsResult.add(spanEventBuilder.build()); } } - return clockskewAdjuster.adjustSpansForClockSkew(ImmutableList.copyOf(spanEventsResult)); + return spanEventsResult; } // Adds the sort, limit and offset information to the QueryService if it is requested diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java new file mode 100644 index 00000000..0df16bc9 --- /dev/null +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java @@ -0,0 +1,34 @@ +package org.hypertrace.gateway.service.span; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import org.hypertrace.gateway.service.v1.span.SpanEvent; + +class SpanTransformationPipeline { + + private final Function, List> pipeline; + + private SpanTransformationPipeline() { + this.pipeline = ArrayList::new; + } + + public static SpanTransformationPipeline getNewPipeline() { + return new SpanTransformationPipeline(); + } + + private SpanTransformationPipeline( + Function, List> pipeline) { + this.pipeline = pipeline; + } + + public SpanTransformationPipeline addProcessingStage(SpanProcessingStage processingStage) { + Function, List> updatedPipeline = + pipeline.andThen(processingStage::process); + return new SpanTransformationPipeline(updatedPipeline); + } + + public List execute(List spans) { + return pipeline.apply(spans); + } +} diff --git a/gateway-service/src/main/resources/configs/common/application.conf b/gateway-service/src/main/resources/configs/common/application.conf index 0319460f..e428ecb0 100644 --- a/gateway-service/src/main/resources/configs/common/application.conf +++ b/gateway-service/src/main/resources/configs/common/application.conf @@ -1,6 +1,7 @@ main.class = org.hypertrace.gateway.service.GatewayServiceStarter service.name = gateway-service -service.admin.port = 50072 +service.admin.port = 5007 +clockskew.adjuster = noop entity.service.config = { host = localhost From 8389c3b1f84a7d5b4e614bbf7d119b9636e92636 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Tue, 29 Jun 2021 23:47:46 +0530 Subject: [PATCH 03/11] 1. Properly implemented JaegarBasedClockskewAdjuster.java. 2. Refactored pipeline code. --- .../gateway/service/GatewayServiceImpl.java | 2 - .../service/span/ClockskewAdjuster.java | 4 +- .../service/span/ClockskewAdjusters.java | 6 +- .../span/JaegarBasedClockskewAdjuster.java | 95 +++++++++---------- .../service/span/NoOpClockskewAdjuster.java | 2 +- .../gateway/service/span/SpanService.java | 9 +- .../span/SpanTransformationPipeline.java | 17 +++- ...tage.java => SpanTransformationStage.java} | 4 +- 8 files changed, 74 insertions(+), 65 deletions(-) rename gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/{SpanProcessingStage.java => SpanTransformationStage.java} (53%) diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java index 0cc2fa8f..4c542475 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java @@ -79,8 +79,6 @@ public GatewayServiceImpl(Config appConfig) { new QueryServiceClient(new QueryServiceConfig(qsConfig)); int qsRequestTimeout = getRequestTimeoutMillis(qsConfig); - Config clockskewConfig = appConfig.getConfig("clockskew.adjuster"); - EntityServiceClientConfig esConfig = EntityServiceClientConfig.from(appConfig); ManagedChannel entityServiceChannel = ManagedChannelBuilder.forAddress(esConfig.getHost(), esConfig.getPort()) diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java index 93ee4b3b..add66ccf 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java @@ -2,8 +2,8 @@ import com.typesafe.config.Config; -public interface ClockskewAdjuster extends SpanProcessingStage { +public interface ClockskewAdjuster extends SpanTransformationStage { static ClockskewAdjuster getAdjuster(Config appConfig) { - return ClockskewAdjusters.getAdjuster("noop"); + return new NoOpClockskewAdjuster(); } } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjusters.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjusters.java index cb1ace57..543ca961 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjusters.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjusters.java @@ -14,6 +14,10 @@ class ClockskewAdjusters { } static ClockskewAdjuster getAdjuster(String type) { - return REGISTRY.get(type).get(); + Supplier supplier = REGISTRY.get(type); + if (null == supplier) { + return REGISTRY.get("noop").get(); + } + return supplier.get(); } } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java index 4597eb65..60637c92 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java @@ -3,9 +3,9 @@ import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; -import com.google.common.base.Preconditions; import java.time.Duration; import java.time.Instant; +import java.util.HashMap; import java.util.List; import java.util.Map; import lombok.Data; @@ -13,47 +13,67 @@ import org.hypertrace.gateway.service.v1.common.Value; import org.hypertrace.gateway.service.v1.common.ValueType; import org.hypertrace.gateway.service.v1.span.SpanEvent; -import org.hypertrace.gateway.service.v1.span.SpanEvent.Builder; class JaegarBasedClockskewAdjuster implements ClockskewAdjuster { @Override - public List process(List spans) { + public List process(List spans) { + Map parentChildMap = new HashMap<>(); Map idToSpanMap = - spans.parallelStream() + spans.stream() .map( - spanEvent -> { - Map attrMap = spanEvent.getAttributesMap(); - String spanId = attrMap.get("EVENT.id").getString(); - String parentSpanId = attrMap.get("EVENT.parentSpanId").getString(); - long startTime = attrMap.get("EVENT.startTime").getLong(); - long endTime = attrMap.get("EVENT.endTime").getLong(); + spanBuilder -> { + Map attributesMap = spanBuilder.getAttributesMap(); + String spanId = attributesMap.get("EVENT.id").getString(), + parentSpanId = attributesMap.get("EVENT.parentSpanId").getString(); + Instant + startTime = + Instant.ofEpochMilli(attributesMap.get("EVENT.startTime").getLong()), + endTime = Instant.ofEpochMilli(attributesMap.get("EVENT.endTime").getLong()); + Duration duration = Duration.between(startTime, endTime); return new Span() .id(spanId) - .startTime(Instant.ofEpochMilli(startTime)) - .endTime(Instant.ofEpochMilli(endTime)) .parentSpanId(parentSpanId) - .duration( - Duration.between( - Instant.ofEpochMilli(endTime), Instant.ofEpochMilli(startTime))) - .spanEvent(spanEvent); + .startTime(startTime) + .endTime(endTime) + .duration(duration) + .spanBuilder(spanBuilder); + }) + .peek( + span -> { + if (null != span.parentSpanId()) { + parentChildMap.putIfAbsent(span.parentSpanId(), span.id()); + } }) .collect(toMap(Span::id, span -> span)); - // update child spanId of each span - idToSpanMap.forEach((spanId, span) -> idToSpanMap.get(span.parentSpanId()).childSpanId(spanId)); // Start from root spans, and adjust each parent-child pair - idToSpanMap.entrySet().stream() - .filter(entry -> null == entry.getValue().parentSpanId()) - .forEach(entry -> adjustSpan(entry.getValue(), idToSpanMap)); - return idToSpanMap.values().stream().map(Span::getAdjustedSpanEvent).collect(toList()); + return idToSpanMap.entrySet().stream() + .filter(span -> null == span.getValue().parentSpanId()) + .peek(entry -> adjustSpan(entry.getValue(), idToSpanMap, parentChildMap)) + .map( + entry -> { + Span updatedSpan = entry.getValue(); + entry + .getValue() + .spanBuilder() + .putAttributes( + "EVENT.startTime", + Value.newBuilder() + .setValueType(ValueType.LONG) + .setLong(updatedSpan.startTime().toEpochMilli()) + .build()); + return updatedSpan.spanBuilder(); + }) + .collect(toList()); } - private void adjustSpan(Span span, Map idToSpanMap) { - if (null != span && null != span.childSpanId()) { - Span childSpan = idToSpanMap.get(span.childSpanId()); + private void adjustSpan( + Span span, Map idToSpanMap, Map parentToChildMap) { + if (null != span) { + Span childSpan = idToSpanMap.get(parentToChildMap.get(span.id())); Duration adjustment = getAdjustmentForChildSpan(childSpan, span); adjustTimestamp(childSpan, adjustment); - adjustSpan(childSpan, idToSpanMap); + adjustSpan(childSpan, idToSpanMap, parentToChildMap); } } @@ -84,30 +104,9 @@ private Duration getAdjustmentForChildSpan(Span childSpan, Span parentSpan) { private static class Span { private String id; private String parentSpanId; - private String childSpanId; private Instant startTime; private Instant endTime; private Duration duration; - private SpanEvent spanEvent; - - public SpanEvent getAdjustedSpanEvent() { - Preconditions.checkArgument(null != spanEvent); - Preconditions.checkArgument(null != startTime); - Preconditions.checkArgument(null != endTime); - Builder builder = SpanEvent.newBuilder().putAllAttributes(spanEvent.getAttributesMap()); - builder.putAttributes( - "EVENT.startTime", - Value.newBuilder() - .setLong(startTime.toEpochMilli()) - .setValueType(ValueType.LONG) - .build()); - builder.putAttributes( - "EVENT.endTime", - Value.newBuilder() - .setLong(endTime.toEpochMilli()) - .setValueType(ValueType.LONG) - .build()); - return builder.build(); - } + private SpanEvent.Builder spanBuilder; } } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java index 7c7c7ac2..e4d0f75d 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java @@ -5,7 +5,7 @@ import org.hypertrace.gateway.service.v1.span.SpanEvent; public class NoOpClockskewAdjuster implements ClockskewAdjuster{ - public List process(List spans) { + public List process(List spans) { return new ArrayList<>(spans); } } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java index 3dbd5b1e..47e6b8c1 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java @@ -44,7 +44,7 @@ public class SpanService { private final QueryServiceClient queryServiceClient; private final int requestTimeout; private final AttributeMetadataProvider attributeMetadataProvider; - private final SpanTransformationPipeline pipeline; + private final SpanTransformationPipeline spanTransformationPipeline; private Timer queryExecutionTimer; @@ -56,7 +56,7 @@ public SpanService( this.queryServiceClient = queryServiceClient; this.requestTimeout = requestTimeout; this.attributeMetadataProvider = attributeMetadataProvider; - pipeline = SpanTransformationPipeline.getNewPipeline().addProcessingStage(clockskewAdjuster); + spanTransformationPipeline = SpanTransformationPipeline.getNewPipeline().addProcessingStage(clockskewAdjuster); initMetrics(); } @@ -66,7 +66,7 @@ private void initMetrics() { } public SpansResponse processSpans(List spans) { - List processedSpans = pipeline.execute(spans); + List processedSpans = spanTransformationPipeline.execute(spans); return SpansResponse.newBuilder() .addAllSpans(processedSpans) .setTotal(processedSpans.size()) @@ -100,7 +100,8 @@ Collection filterSpans( RequestContext context, SpansRequest request, Map attributeMetadataMap) { - return filterSpanEvents(context, request, attributeMetadataMap); + return spanTransformationPipeline + .execute(filterSpanEvents(context, request, attributeMetadataMap)); } @VisibleForTesting diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java index 0df16bc9..e094c9aa 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java @@ -1,13 +1,16 @@ package org.hypertrace.gateway.service.span; +import static java.util.stream.Collectors.toList; + import java.util.ArrayList; import java.util.List; import java.util.function.Function; import org.hypertrace.gateway.service.v1.span.SpanEvent; +import org.hypertrace.gateway.service.v1.span.SpanEvent.Builder; class SpanTransformationPipeline { - private final Function, List> pipeline; + private final Function, List> pipeline; private SpanTransformationPipeline() { this.pipeline = ArrayList::new; @@ -18,17 +21,21 @@ public static SpanTransformationPipeline getNewPipeline() { } private SpanTransformationPipeline( - Function, List> pipeline) { + Function, List> pipeline) { this.pipeline = pipeline; } - public SpanTransformationPipeline addProcessingStage(SpanProcessingStage processingStage) { - Function, List> updatedPipeline = + public SpanTransformationPipeline addProcessingStage(SpanTransformationStage processingStage) { + Function, List> updatedPipeline = pipeline.andThen(processingStage::process); return new SpanTransformationPipeline(updatedPipeline); } public List execute(List spans) { - return pipeline.apply(spans); + List mutableSpans = + spans.stream() + .map(span -> SpanEvent.newBuilder().putAllAttributes(span.getAttributesMap())) + .collect(toList()); + return pipeline.apply(mutableSpans).stream().map(SpanEvent.Builder::build).collect(toList()); } } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanProcessingStage.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java similarity index 53% rename from gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanProcessingStage.java rename to gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java index 48230330..238ec162 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanProcessingStage.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java @@ -3,6 +3,6 @@ import java.util.List; import org.hypertrace.gateway.service.v1.span.SpanEvent; -interface SpanProcessingStage { - List process(List spans); +interface SpanTransformationStage { + List process(List spans); } From eba61d0edb4e1f99511e00bce65861d3ffdb040e Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Wed, 30 Jun 2021 14:47:55 +0530 Subject: [PATCH 04/11] Fixed fallback config issue --- .../gateway/service/span/ClockskewAdjuster.java | 13 ++++++++++++- .../service/span/JaegarBasedClockskewAdjuster.java | 2 +- .../gateway/service/span/NoOpClockskewAdjuster.java | 2 +- .../service/span/SpanTransformationPipeline.java | 2 +- .../service/span/SpanTransformationStage.java | 2 +- 5 files changed, 16 insertions(+), 5 deletions(-) diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java index add66ccf..d8f21dbb 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java @@ -1,9 +1,20 @@ package org.hypertrace.gateway.service.span; import com.typesafe.config.Config; +import com.typesafe.config.ConfigException; public interface ClockskewAdjuster extends SpanTransformationStage { + + String CONFIG_PATH = "clockskew.adjuster"; + String FALLBACK_TYPE = "noop"; + static ClockskewAdjuster getAdjuster(Config appConfig) { - return new NoOpClockskewAdjuster(); + String type; + try { + type = appConfig.getString(CONFIG_PATH); + return ClockskewAdjusters.getAdjuster(type); + } catch (ConfigException.Missing | ConfigException.WrongType e) { + return ClockskewAdjusters.getAdjuster(FALLBACK_TYPE); + } } } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java index 60637c92..210d61e9 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java @@ -17,7 +17,7 @@ class JaegarBasedClockskewAdjuster implements ClockskewAdjuster { @Override - public List process(List spans) { + public List transform(List spans) { Map parentChildMap = new HashMap<>(); Map idToSpanMap = spans.stream() diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java index e4d0f75d..1e61437f 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java @@ -5,7 +5,7 @@ import org.hypertrace.gateway.service.v1.span.SpanEvent; public class NoOpClockskewAdjuster implements ClockskewAdjuster{ - public List process(List spans) { + public List transform(List spans) { return new ArrayList<>(spans); } } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java index e094c9aa..d985d37a 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java @@ -27,7 +27,7 @@ private SpanTransformationPipeline( public SpanTransformationPipeline addProcessingStage(SpanTransformationStage processingStage) { Function, List> updatedPipeline = - pipeline.andThen(processingStage::process); + pipeline.andThen(processingStage::transform); return new SpanTransformationPipeline(updatedPipeline); } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java index 238ec162..3a1530da 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java @@ -4,5 +4,5 @@ import org.hypertrace.gateway.service.v1.span.SpanEvent; interface SpanTransformationStage { - List process(List spans); + List transform(List spans); } From 81bbbdf68e7ea7515790977bb5ba542d0d7e3d40 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Wed, 30 Jun 2021 15:35:48 +0530 Subject: [PATCH 05/11] Added documentation --- .../gateway/service/GatewayServiceImpl.java | 3 +- .../service/span/ClockskewAdjuster.java | 42 ++++++++++++++++--- .../service/span/ClockskewAdjusters.java | 16 +++++-- .../span/JaegarBasedClockskewAdjuster.java | 15 +------ .../service/span/NoOpClockskewAdjuster.java | 2 +- .../gateway/service/span/SpanService.java | 15 ++----- .../span/SpanTransformationPipeline.java | 5 +++ .../service/span/SpanTransformationStage.java | 3 ++ 8 files changed, 64 insertions(+), 37 deletions(-) diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java index 4c542475..2e06a9cb 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java @@ -181,9 +181,8 @@ public void getSpans( .getRequestHeaders()); SpansResponse response = spanService.getSpansByFilter(context, request); - SpansResponse spansResponse = spanService.processSpans(response.getSpansList()); - responseObserver.onNext(spansResponse); + responseObserver.onNext(response); responseObserver.onCompleted(); } catch (Exception e) { LOG.error("Error while handling spans request: {}", request, e); diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java index d8f21dbb..3c43befd 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java @@ -1,20 +1,50 @@ package org.hypertrace.gateway.service.span; import com.typesafe.config.Config; -import com.typesafe.config.ConfigException; +import java.time.Duration; +import java.time.Instant; +import lombok.Data; +import lombok.experimental.Accessors; +import org.hypertrace.gateway.service.v1.span.SpanEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public interface ClockskewAdjuster extends SpanTransformationStage { +public abstract class ClockskewAdjuster implements SpanTransformationStage { - String CONFIG_PATH = "clockskew.adjuster"; - String FALLBACK_TYPE = "noop"; + private static final Logger LOG = LoggerFactory.getLogger(ClockskewAdjuster.class); - static ClockskewAdjuster getAdjuster(Config appConfig) { + private static final String CONFIG_PATH = "clockskew.adjuster"; + private static final String FALLBACK_TYPE = "noop"; + + /** + * Returns a concrete implementation of {@link ClockskewAdjuster} based on the supplied type. If + * an incorrect configuration is supplied, then it fallbacks to {@link NoOpClockskewAdjuster}. + * This method never throws and exception. Any exception propogated up from downstream is + * swallowed and the fallback adjuster is returned + * + * @param appConfig the app configuration + * @return a concrete implementation of {@link ClockskewAdjuster} + */ + public static ClockskewAdjuster getAdjuster(Config appConfig) { String type; try { type = appConfig.getString(CONFIG_PATH); return ClockskewAdjusters.getAdjuster(type); - } catch (ConfigException.Missing | ConfigException.WrongType e) { + } catch (Exception e) { + LOG.warn( + "Some exception occurred while trying to get the clockskew adjuster, falling back to no-op adjuster"); return ClockskewAdjusters.getAdjuster(FALLBACK_TYPE); } } + + @Data + @Accessors(chain = true, fluent = true) + static class Span { + private String id; + private String parentSpanId; + private Instant startTime; + private Instant endTime; + private Duration duration; + private SpanEvent.Builder spanBuilder; + } } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjusters.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjusters.java index 543ca961..013b9544 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjusters.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjusters.java @@ -3,20 +3,30 @@ import java.util.HashMap; import java.util.Map; import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** Factory to return concrete instances of {@link ClockskewAdjuster} based on the supplied type */ class ClockskewAdjusters { + private static final Map> REGISTRY = new HashMap<>(); + private static final Logger LOG = LoggerFactory.getLogger(ClockskewAdjusters.class); + + private static final String JAEGAR_BASED = "jaegar"; + private static final String NOOP_BASED = "noop"; + static { - REGISTRY.put("jaeger", JaegarBasedClockskewAdjuster::new); - REGISTRY.put("noop", NoOpClockskewAdjuster::new); + REGISTRY.put(JAEGAR_BASED, JaegarBasedClockskewAdjuster::new); + REGISTRY.put(NOOP_BASED, NoOpClockskewAdjuster::new); } static ClockskewAdjuster getAdjuster(String type) { Supplier supplier = REGISTRY.get(type); if (null == supplier) { - return REGISTRY.get("noop").get(); + LOG.warn("No clockskew adjuster for the supplied config, falling back to no-op adjuster"); + return REGISTRY.get(NOOP_BASED).get(); } return supplier.get(); } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java index 210d61e9..04914ba3 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java @@ -8,13 +8,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import lombok.Data; -import lombok.experimental.Accessors; import org.hypertrace.gateway.service.v1.common.Value; import org.hypertrace.gateway.service.v1.common.ValueType; import org.hypertrace.gateway.service.v1.span.SpanEvent; -class JaegarBasedClockskewAdjuster implements ClockskewAdjuster { +class JaegarBasedClockskewAdjuster extends ClockskewAdjuster { @Override public List transform(List spans) { @@ -98,15 +96,4 @@ private Duration getAdjustmentForChildSpan(Span childSpan, Span parentSpan) { var latency = (parentSpan.duration().minus(childSpan.duration()).toMillis()) >> 1; return Duration.between(childSpan.startTime(), parentSpan.startTime().plusMillis(latency)); } - - @Data - @Accessors(chain = true, fluent = true) - private static class Span { - private String id; - private String parentSpanId; - private Instant startTime; - private Instant endTime; - private Duration duration; - private SpanEvent.Builder spanBuilder; - } } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java index 1e61437f..52a4db9d 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java @@ -4,7 +4,7 @@ import java.util.List; import org.hypertrace.gateway.service.v1.span.SpanEvent; -public class NoOpClockskewAdjuster implements ClockskewAdjuster{ +public class NoOpClockskewAdjuster extends ClockskewAdjuster{ public List transform(List spans) { return new ArrayList<>(spans); } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java index 47e6b8c1..69ecc2b1 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java @@ -56,7 +56,8 @@ public SpanService( this.queryServiceClient = queryServiceClient; this.requestTimeout = requestTimeout; this.attributeMetadataProvider = attributeMetadataProvider; - spanTransformationPipeline = SpanTransformationPipeline.getNewPipeline().addProcessingStage(clockskewAdjuster); + spanTransformationPipeline = + SpanTransformationPipeline.getNewPipeline().addProcessingStage(clockskewAdjuster); initMetrics(); } @@ -65,14 +66,6 @@ private void initMetrics() { PlatformMetricsRegistry.registerTimer("hypertrace.span.query.execution", ImmutableMap.of()); } - public SpansResponse processSpans(List spans) { - List processedSpans = spanTransformationPipeline.execute(spans); - return SpansResponse.newBuilder() - .addAllSpans(processedSpans) - .setTotal(processedSpans.size()) - .build(); - } - public SpansResponse getSpansByFilter(RequestContext context, SpansRequest request) { Instant start = Instant.now(); try { @@ -100,8 +93,8 @@ Collection filterSpans( RequestContext context, SpansRequest request, Map attributeMetadataMap) { - return spanTransformationPipeline - .execute(filterSpanEvents(context, request, attributeMetadataMap)); + return spanTransformationPipeline.execute( + filterSpanEvents(context, request, attributeMetadataMap)); } @VisibleForTesting diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java index d985d37a..f2902fd5 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java @@ -31,6 +31,11 @@ public SpanTransformationPipeline addProcessingStage(SpanTransformationStage pro return new SpanTransformationPipeline(updatedPipeline); } + /** + * Processes the passed list of spans through the pipeline + * @param spans list of spans to process + * @return processed spans + */ public List execute(List spans) { List mutableSpans = spans.stream() diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java index 3a1530da..9e28b257 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java @@ -3,6 +3,9 @@ import java.util.List; import org.hypertrace.gateway.service.v1.span.SpanEvent; +/** + * Represents a transformation stage in the {@link SpanTransformationPipeline} + */ interface SpanTransformationStage { List transform(List spans); } From afaf2b744b6c646b6d7cc614dbc3fa4de13a0db0 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Wed, 30 Jun 2021 15:41:32 +0530 Subject: [PATCH 06/11] Some more documentation --- .../hypertrace/gateway/service/span/ClockskewAdjuster.java | 3 ++- .../gateway/service/span/SpanTransformationPipeline.java | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java index 3c43befd..e9a26d23 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java @@ -19,7 +19,7 @@ public abstract class ClockskewAdjuster implements SpanTransformationStage { /** * Returns a concrete implementation of {@link ClockskewAdjuster} based on the supplied type. If * an incorrect configuration is supplied, then it fallbacks to {@link NoOpClockskewAdjuster}. - * This method never throws and exception. Any exception propogated up from downstream is + * This method never throws an exception. Any exception propagated up from downstream is * swallowed and the fallback adjuster is returned * * @param appConfig the app configuration @@ -45,6 +45,7 @@ static class Span { private Instant startTime; private Instant endTime; private Duration duration; + //the mutable builder object private SpanEvent.Builder spanBuilder; } } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java index f2902fd5..ff293e6c 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java @@ -8,6 +8,9 @@ import org.hypertrace.gateway.service.v1.span.SpanEvent; import org.hypertrace.gateway.service.v1.span.SpanEvent.Builder; +/** + * A pipeline of handlers that transform a list of spans through it. This class is not thread-safe. + */ class SpanTransformationPipeline { private final Function, List> pipeline; @@ -33,6 +36,7 @@ public SpanTransformationPipeline addProcessingStage(SpanTransformationStage pro /** * Processes the passed list of spans through the pipeline + * * @param spans list of spans to process * @return processed spans */ From 9ef314322bed1c6ef4eec1e4846ae21b18013caa Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Wed, 30 Jun 2021 15:43:32 +0530 Subject: [PATCH 07/11] Removed unneeded file --- .../src/main/resources/configs/common/application.conf | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/gateway-service/src/main/resources/configs/common/application.conf b/gateway-service/src/main/resources/configs/common/application.conf index e428ecb0..0319460f 100644 --- a/gateway-service/src/main/resources/configs/common/application.conf +++ b/gateway-service/src/main/resources/configs/common/application.conf @@ -1,7 +1,6 @@ main.class = org.hypertrace.gateway.service.GatewayServiceStarter service.name = gateway-service -service.admin.port = 5007 -clockskew.adjuster = noop +service.admin.port = 50072 entity.service.config = { host = localhost From f27b1049cab6305bdf38aa55aba119292471ac59 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Sun, 4 Jul 2021 19:18:04 +0530 Subject: [PATCH 08/11] Pop back --- .../hypertrace/gateway/service/span/ClockskewAdjuster.java | 6 +++--- .../gateway/service/span/NoOpClockskewAdjuster.java | 2 +- .../gateway/service/span/SpanTransformationStage.java | 4 +--- .../hypertrace/gateway/service/GatewayServiceImplTest.java | 1 + 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java index e9a26d23..49a012c4 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java @@ -19,8 +19,8 @@ public abstract class ClockskewAdjuster implements SpanTransformationStage { /** * Returns a concrete implementation of {@link ClockskewAdjuster} based on the supplied type. If * an incorrect configuration is supplied, then it fallbacks to {@link NoOpClockskewAdjuster}. - * This method never throws an exception. Any exception propagated up from downstream is - * swallowed and the fallback adjuster is returned + * This method never throws an exception. Any exception propagated up from downstream is swallowed + * and the fallback adjuster is returned * * @param appConfig the app configuration * @return a concrete implementation of {@link ClockskewAdjuster} @@ -45,7 +45,7 @@ static class Span { private Instant startTime; private Instant endTime; private Duration duration; - //the mutable builder object + // the mutable builder object private SpanEvent.Builder spanBuilder; } } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java index 52a4db9d..72087b37 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java @@ -4,7 +4,7 @@ import java.util.List; import org.hypertrace.gateway.service.v1.span.SpanEvent; -public class NoOpClockskewAdjuster extends ClockskewAdjuster{ +public class NoOpClockskewAdjuster extends ClockskewAdjuster { public List transform(List spans) { return new ArrayList<>(spans); } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java index 9e28b257..784c2660 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java @@ -3,9 +3,7 @@ import java.util.List; import org.hypertrace.gateway.service.v1.span.SpanEvent; -/** - * Represents a transformation stage in the {@link SpanTransformationPipeline} - */ +/** Represents a transformation stage in the {@link SpanTransformationPipeline} */ interface SpanTransformationStage { List transform(List spans); } diff --git a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/GatewayServiceImplTest.java b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/GatewayServiceImplTest.java index e4c90b4f..ee34da6f 100644 --- a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/GatewayServiceImplTest.java +++ b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/GatewayServiceImplTest.java @@ -8,6 +8,7 @@ /** Unit tests for {@link GatewayServiceImpl} */ public class GatewayServiceImplTest { + private static Config appConfig; @BeforeAll From 8f00d524e1e9f4cd8652d12fc8327053f4fbf94c Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Tue, 6 Jul 2021 19:54:01 +0530 Subject: [PATCH 09/11] Added UTs for SpanTransformationPipeline.java --- .../span/SpanTransformationPipelineTest.java | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/SpanTransformationPipelineTest.java diff --git a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/SpanTransformationPipelineTest.java b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/SpanTransformationPipelineTest.java new file mode 100644 index 00000000..4146cc7f --- /dev/null +++ b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/SpanTransformationPipelineTest.java @@ -0,0 +1,85 @@ +package org.hypertrace.gateway.service.span; + +import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; +import org.hypertrace.gateway.service.v1.common.Value; +import org.hypertrace.gateway.service.v1.common.ValueType; +import org.hypertrace.gateway.service.v1.span.SpanEvent; +import org.hypertrace.gateway.service.v1.span.SpanEvent.Builder; +import org.junit.jupiter.api.Test; + +class SpanTransformationPipelineTest { + + private static final String ATTRIBUTE_KEY_ID = "id"; + + private static final List ORIGINAL_SPANS = + List.of( + createSpan(Map.of(ATTRIBUTE_KEY_ID, "firstSpan")), + createSpan(Map.of(ATTRIBUTE_KEY_ID, "secondSpan"))); + + @Test + void returnsUnchangedSpansIfNoTransformationsRegistered() { + assertEquals( + ORIGINAL_SPANS, SpanTransformationPipeline.getNewPipeline().execute(ORIGINAL_SPANS)); + } + + @Test + void appliesMultipleTransformationsInOrderOfRegistry() { + + var originalSpans = + List.of( + createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, "firstSpan")), + createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, "secondSpan"))); + + // first transformation stage + var firstTransformation = mock(SpanTransformationStage.class); + var firstTransformationResult = + List.of( + createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, "firstTransformationFirstSpan")), + createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, "firstTransformationSecondSpan"))); + + when(firstTransformation.transform(anyList())).thenReturn(firstTransformationResult); + + // second transformation stage + var secondTransformation = mock(SpanTransformationStage.class); + var secondTransformationResult = + List.of( + createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, "secondTransformationFirstSpan")), + createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, "secondTransformationSecondSpan"))); + + when(secondTransformation.transform(eq(firstTransformationResult))) + .thenReturn(secondTransformationResult); + + List pipelineTransformationExpectedResult = + secondTransformationResult.stream().map(Builder::build).collect(toList()); + + assertEquals( + pipelineTransformationExpectedResult, + SpanTransformationPipeline.getNewPipeline() + .addProcessingStage(firstTransformation) + .addProcessingStage(secondTransformation) + .execute(ORIGINAL_SPANS)); + } + + private static SpanEvent.Builder createSpanBuilder(Map attributes) { + Builder builder = SpanEvent.newBuilder(); + attributes.forEach( + (attributeName, attributeValue) -> { + builder.putAttributes( + attributeName, + Value.newBuilder().setValueType(ValueType.STRING).setString(attributeValue).build()); + }); + return builder; + } + + private static SpanEvent createSpan(Map attributes) { + return createSpanBuilder(attributes).build(); + } +} From 7e1367573e129770f11dbb6f9359190573ee04e6 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Tue, 6 Jul 2021 22:06:41 +0530 Subject: [PATCH 10/11] Added UTs for NoOpClockskewAdjuster.java, added @NonNull in SpanTransformationStage --- .../span/JaegarBasedClockskewAdjuster.java | 3 +- .../service/span/NoOpClockskewAdjuster.java | 3 +- .../service/span/SpanTransformationStage.java | 3 +- .../span/NoOpClockskewAdjusterTest.java | 45 +++++++++++++++++++ .../span/SpanTransformationPipelineTest.java | 6 --- 5 files changed, 51 insertions(+), 9 deletions(-) create mode 100644 gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjusterTest.java diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java index 04914ba3..59ce5be8 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java @@ -8,6 +8,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import lombok.NonNull; import org.hypertrace.gateway.service.v1.common.Value; import org.hypertrace.gateway.service.v1.common.ValueType; import org.hypertrace.gateway.service.v1.span.SpanEvent; @@ -15,7 +16,7 @@ class JaegarBasedClockskewAdjuster extends ClockskewAdjuster { @Override - public List transform(List spans) { + public List transform(@NonNull List spans) { Map parentChildMap = new HashMap<>(); Map idToSpanMap = spans.stream() diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java index 72087b37..5106e7cc 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java @@ -2,10 +2,11 @@ import java.util.ArrayList; import java.util.List; +import lombok.NonNull; import org.hypertrace.gateway.service.v1.span.SpanEvent; public class NoOpClockskewAdjuster extends ClockskewAdjuster { - public List transform(List spans) { + public List transform(@NonNull List spans) { return new ArrayList<>(spans); } } diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java index 784c2660..25f3ee4a 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java @@ -1,9 +1,10 @@ package org.hypertrace.gateway.service.span; import java.util.List; +import lombok.NonNull; import org.hypertrace.gateway.service.v1.span.SpanEvent; /** Represents a transformation stage in the {@link SpanTransformationPipeline} */ interface SpanTransformationStage { - List transform(List spans); + List transform(@NonNull List spans); } diff --git a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjusterTest.java b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjusterTest.java new file mode 100644 index 00000000..c2cf4f21 --- /dev/null +++ b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjusterTest.java @@ -0,0 +1,45 @@ +package org.hypertrace.gateway.service.span; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.hypertrace.gateway.service.v1.common.Value; +import org.hypertrace.gateway.service.v1.common.ValueType; +import org.hypertrace.gateway.service.v1.span.SpanEvent; +import org.hypertrace.gateway.service.v1.span.SpanEvent.Builder; +import org.junit.jupiter.api.Test; + +class NoOpClockskewAdjusterTest { + + private static final ClockskewAdjuster CLOCKSKEW_ADJUSTER = new NoOpClockskewAdjuster(); + + private static final String ATTRIBUTE_KEY_ID = "id"; + + @Test + void returnsUnchangedSpanList() { + var originalSpans = + List.of( + createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, UUID.randomUUID().toString())), + createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, UUID.randomUUID().toString()))); + assertEquals(originalSpans, CLOCKSKEW_ADJUSTER.transform(originalSpans)); + } + + @Test + void throwsNPEWhenNullSpanListArePassed() { + assertThrows(NullPointerException.class, () -> CLOCKSKEW_ADJUSTER.transform(null)); + } + + private static SpanEvent.Builder createSpanBuilder(Map attributes) { + Builder builder = SpanEvent.newBuilder(); + attributes.forEach( + (attributeName, attributeValue) -> { + builder.putAttributes( + attributeName, + Value.newBuilder().setValueType(ValueType.STRING).setString(attributeValue).build()); + }); + return builder; + } +} diff --git a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/SpanTransformationPipelineTest.java b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/SpanTransformationPipelineTest.java index 4146cc7f..77665960 100644 --- a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/SpanTransformationPipelineTest.java +++ b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/SpanTransformationPipelineTest.java @@ -32,12 +32,6 @@ void returnsUnchangedSpansIfNoTransformationsRegistered() { @Test void appliesMultipleTransformationsInOrderOfRegistry() { - - var originalSpans = - List.of( - createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, "firstSpan")), - createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, "secondSpan"))); - // first transformation stage var firstTransformation = mock(SpanTransformationStage.class); var firstTransformationResult = From 5fd806e0b79d2f6159f17a96f9e1920111bb9423 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Tue, 6 Jul 2021 22:12:12 +0530 Subject: [PATCH 11/11] Fixed formatting of SpanTransformationStage.java --- .../gateway/service/span/SpanTransformationStage.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java index 25f3ee4a..7fe1a329 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java @@ -6,5 +6,5 @@ /** Represents a transformation stage in the {@link SpanTransformationPipeline} */ interface SpanTransformationStage { - List transform(@NonNull List spans); + List transform(@NonNull List spans); }