Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public class AgentInfo {
"com.blogspot.mydailyjava.weaklockfree",
"com.lmax.disruptor",
"com.dslplatform.json",
"com.googlecode.concurrentlinkedhashmap"
"com.googlecode.concurrentlinkedhashmap",
"co.elastic.otel"
));

private static final Set<String> agentRootPackages = new HashSet<>(Arrays.asList(
Expand Down
5 changes: 5 additions & 0 deletions apm-agent-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@
<artifactId>HdrHistogram</artifactId>
<version>2.1.11</version>
</dependency>
<dependency>
<groupId>co.elastic.otel</groupId>
<artifactId>jvmti-access</artifactId>
<version>0.3.0</version>
</dependency>
<!--
We can't use caffeine due to requiring Java 7.
As recommended by the author, we use concurrentlinkedhashmap-lru instead:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package co.elastic.apm.agent.configuration;

import org.stagemonitor.configuration.ConfigurationOption;
import org.stagemonitor.configuration.ConfigurationOptionProvider;

import static co.elastic.apm.agent.tracer.configuration.RangeValidator.isInRange;

public class UniversalProfilingConfiguration extends ConfigurationOptionProvider {

private static final String PROFILING_CATEGORY = "Profiling";

private final ConfigurationOption<Boolean> enabled = ConfigurationOption.booleanOption()
.key("universal_profiling_integration_enabled")
.tags("added[1.50.0]", "internal")
.configurationCategory(PROFILING_CATEGORY)
.description("If enabled, the apm agent will correlate it's transaction with the profiling data from elastic universal profiling running on the same host.")
.buildWithDefault(false);

private final ConfigurationOption<Long> bufferSize = ConfigurationOption.longOption()
.key("universal_profiling_integration_buffer_size")
.addValidator(isInRange(64L, Long.MAX_VALUE))
.tags("added[1.50.0]", "internal")
.configurationCategory(PROFILING_CATEGORY)
.description("The feature needs to buffer ended local-root spans for a short duration to ensure that all of its profiling data has been received." +
"This configuration option configures the buffer size in number of spans. " +
"The higher the number of local root spans per second, the higher this buffer size should be set.\n" +
"The agent will log a warning if it is not capable of buffering a span due to insufficient buffer size. " +
"This will cause the span to be exported immediately instead with possibly incomplete profiling correlation data.")
.buildWithDefault(4096L);

private final ConfigurationOption<String> socketDir = ConfigurationOption.stringOption()
.key("universal_profiling_integration_socket_dir")
.tags("added[1.50.0]", "internal")
.configurationCategory(PROFILING_CATEGORY)
.description("The extension needs to bind a socket to a file for communicating with the universal profiling host agent." +
"This configuration option can be used to change the location. " +
"Note that the total path name (including the socket) must not exceed 100 characters due to OS restrictions.\n" +
"If unset, the value of the `java.io.tmpdir` system property will be used.")
.build();

public boolean isEnabled() {
return enabled.get();
}

public long getBufferSize() {
return bufferSize.get();
}

public String getSocketDir() {
String dir = socketDir.get();
return dir == null || dir.isEmpty() ? System.getProperty("java.io.tmpdir") : dir;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import co.elastic.apm.agent.tracer.dispatch.HeaderGetter;
import co.elastic.apm.agent.tracer.reference.ReferenceCounted;
import co.elastic.apm.agent.tracer.reference.ReferenceCountedMap;
import co.elastic.apm.agent.universalprofiling.UniversalProfilingIntegration;
import co.elastic.apm.agent.util.DependencyInjectingServiceLoader;
import co.elastic.apm.agent.util.ExecutorUtils;
import com.dslplatform.json.JsonWriter;
Expand Down Expand Up @@ -143,6 +144,8 @@ protected ActiveStack initialValue() {
private final SpanConfiguration spanConfiguration;
private final List<ActivationListener> activationListeners;
private final MetricRegistry metricRegistry;

private final UniversalProfilingIntegration profilingIntegration;
private final ScheduledThreadPoolExecutor sharedPool;
private final int approximateContextSize;
private Sampler sampler;
Expand Down Expand Up @@ -261,6 +264,7 @@ public void onChange(ConfigurationOption<?> configurationOption, Double oldValue
// sets the assertionsEnabled flag to true if indeed enabled
//noinspection AssertWithSideEffects
assert assertionsEnabled = true;
profilingIntegration = new UniversalProfilingIntegration();
}

@Override
Expand Down Expand Up @@ -341,6 +345,7 @@ private void afterTransactionStart(@Nullable ClassLoader initiatingClassLoader,
if (serviceInfo != null) {
transaction.getTraceContext().setServiceInfo(serviceInfo.getServiceName(), serviceInfo.getServiceVersion());
}
profilingIntegration.afterTransactionStart(transaction);
}

public Transaction noopTransaction() {
Expand Down Expand Up @@ -525,8 +530,9 @@ public void endTransaction(Transaction transaction) {
if (!transaction.isNoop() &&
(transaction.isSampled() || apmServerClient.supportsKeepingUnsampledTransaction())) {
// we do report non-sampled transactions (without the context)
reporter.report(transaction);
profilingIntegration.correlateAndReport(transaction);
} else {
profilingIntegration.drop(transaction);
transaction.decrementReferences();
}
}
Expand Down Expand Up @@ -633,6 +639,7 @@ public synchronized void stop() {
logger.debug("Tracer stop stack trace: ", new Throwable("Expected - for debugging purposes"));
}

profilingIntegration.stop();
try {
configurationRegistry.close();
reporter.close();
Expand Down Expand Up @@ -738,6 +745,7 @@ private synchronized void startSync() {
}
apmServerClient.start();
reporter.start();
profilingIntegration.start(this);
for (LifecycleListener lifecycleListener : lifecycleListeners) {
try {
lifecycleListener.start(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public static SystemInfo create(final @Nullable String configuredHostname, final
return systemInfo.findContainerDetails();
}

static boolean isWindows(String osName) {
public static boolean isWindows(String osName) {
return osName.startsWith("Windows");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ public int toBytes(byte[] bytes, int offset) {
return offset + data.length;
}

public void writeToBuffer(ByteBuffer buffer) {
buffer.put(data);
}

public void fromLongs(long... values) {
if (values.length * Long.BYTES != data.length) {
throw new IllegalArgumentException("Invalid number of long values");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package co.elastic.apm.agent.universalprofiling;

import co.elastic.apm.agent.impl.transaction.AbstractSpan;
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.otel.UniversalProfilingCorrelation;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class ProfilerSharedMemoryWriter {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


private static final Logger log = LoggerFactory.getLogger(ProfilerSharedMemoryWriter.class);

private static final int TLS_MINOR_VERSION_OFFSET = 0;
private static final int TLS_VALID_OFFSET = 2;
private static final int TLS_TRACE_PRESENT_OFFSET = 3;
private static final int TLS_TRACE_FLAGS_OFFSET = 4;
private static final int TLS_TRACE_ID_OFFSET = 5;
private static final int TLS_SPAN_ID_OFFSET = 21;
private static final int TLS_LOCAL_ROOT_SPAN_ID_OFFSET = 29;
static final int TLS_STORAGE_SIZE = 37;

private static volatile int writeForMemoryBarrier = 0;

static ByteBuffer generateProcessCorrelationStorage(String serviceName, @Nullable String environment, String socketFilePath) {
ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
buffer.order(ByteOrder.nativeOrder());
buffer.position(0);

buffer.putChar((char) 1); // layout-minor-version
writeUtf8Str(buffer, serviceName);
writeUtf8Str(buffer, environment == null ? "" : environment);
writeUtf8Str(buffer, socketFilePath);
return buffer;
}

private static void writeUtf8Str(ByteBuffer buffer, String str) {
byte[] utf8 = str.getBytes(StandardCharsets.UTF_8);
buffer.putInt(utf8.length);
buffer.put(utf8);
}

/**
* This method ensures that all writes which happened prior to this method call are not moved
* after the method call due to reordering.
*
* <p>This is realized based on the Java Memory Model guarantess for volatile variables. Relevant
* resources:
*
* <ul>
* <li><a
* href="https://stackoverflow.com/questions/17108541/happens-before-relationships-with-volatile-fields-and-synchronized-blocks-in-jav">StackOverflow
* topic</a>
* <li><a href="https://gee.cs.oswego.edu/dl/jmm/cookbook.html">JSR Compiler Cookbook</a>
* </ul>
*/
private static void memoryStoreStoreBarrier() {
writeForMemoryBarrier = 42;
}

static void updateThreadCorrelationStorage(@Nullable AbstractSpan<?> newSpan) {
try {
ByteBuffer tls = UniversalProfilingCorrelation.getCurrentThreadStorage(true, TLS_STORAGE_SIZE);
// tls might be null if unsupported or something went wrong on initialization
if (tls != null) {
// the valid flag is used to signal the host-agent that it is reading incomplete data
tls.put(TLS_VALID_OFFSET, (byte) 0);
memoryStoreStoreBarrier();
tls.putChar(TLS_MINOR_VERSION_OFFSET, (char) 1);

if (newSpan != null) {
Transaction tx = newSpan.getParentTransaction();
tls.put(TLS_TRACE_PRESENT_OFFSET, (byte) 1);
tls.put(TLS_TRACE_FLAGS_OFFSET, newSpan.getTraceContext().getFlags());
tls.position(TLS_TRACE_ID_OFFSET);
newSpan.getTraceContext().getTraceId().writeToBuffer(tls);
tls.position(TLS_SPAN_ID_OFFSET);
newSpan.getTraceContext().getId().writeToBuffer(tls);
tls.position(TLS_LOCAL_ROOT_SPAN_ID_OFFSET);
tx.getTraceContext().getId().writeToBuffer(tls);
} else {
tls.put(TLS_TRACE_PRESENT_OFFSET, (byte) 0);
}
memoryStoreStoreBarrier();
tls.put(TLS_VALID_OFFSET, (byte) 1);
}
} catch (Exception e) {
log.error("Failed to write profiling correlation tls", e);
}
}
}

Loading