This repository was archived by the owner on Aug 4, 2023. It is now read-only.
Merged
19 changes: 19 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,24 @@
# elastic-apm-http-client changelog

## v11.0.0

- Add support for coordinating data flushing in an AWS Lambda environment. The
following two API additions are used to ensure that (a) the Elastic Lambda
extension is signaled at invocation end [per spec](https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing)
and (b) a new intake request is not started when a Lambda function invocation
is not active.

- `Client#lambdaStart()` should be used to indicate when a Lambda function
invocation begins.
- `Client#flush([opts,] cb)` now supports an optional `opts.lambdaEnd`
boolean. Set it to true to indicate this is a flush at the end of a Lambda
function invocation.

This is a **BREAKING CHANGE**, because current versions of elastic-apm-node
depend on `^10.4.0`. If this were released as another 10.x, then usage of
current elastic-apm-node with this version of the client would break
behavior in a Lambda environment.

## v10.4.0

- Add APM Server version checking to the client. On creation the client will
34 changes: 33 additions & 1 deletion README.md
@@ -341,6 +341,33 @@ is provided. For example, in an AWS Lambda function some metadata is not
available until the first function invocation -- which is some async time after
Client creation.

### `client.lambdaStart()`

Tells the client that a Lambda function invocation has started.

#### Notes on Lambda usage

To properly handle [data flushing for instrumented Lambda functions](https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing),
this Client should be used as follows in a Lambda environment.

- When a Lambda invocation starts, `client.lambdaStart()` must be called.

The Client prevents intake requests to APM Server when in a Lambda environment
when a function invocation is *not* active. This is to ensure that an intake
request does not accidentally span a period when a Lambda VM is frozen,
which can lead to timeouts and lost APM data.

- When a Lambda invocation finishes, `client.flush({lambdaEnd: true}, cb)` must
be called.

Passing `lambdaEnd: true` tells the Client to (a) mark the lambda as inactive so
a subsequent intake request is not started until the next invocation, and
(b) signal the Elastic AWS Lambda Extension that this invocation is done.
The user's Lambda handler should not finish until `cb` is called. This
ensures that the extension receives tracing data and the end signal before
the Lambda Runtime freezes the VM.

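The Lambda usage notes above can be sketched end-to-end. The `client` object below is a hypothetical stub that only records call order so the lifecycle is visible; the handler body is illustrative and not part of this library:

```javascript
// Stub standing in for a real Client instance: it records which methods
// are called and in what order.
const calls = []
const client = {
  lambdaStart () { calls.push('lambdaStart') },
  sendSpan (span, cb) { calls.push('sendSpan'); if (cb) setImmediate(cb) },
  flush (opts, cb) {
    calls.push('flush(lambdaEnd=' + !!opts.lambdaEnd + ')')
    setImmediate(cb) // pretend data was sent and the extension was signaled
  }
}

function wrappedHandler (event, context, callback) {
  client.lambdaStart() // (1) mark the invocation active
  // ... run the user's handler, record spans, etc. ...
  client.sendSpan({ name: 'GET /' })
  client.flush({ lambdaEnd: true }, () => {
    // (2) only now is it safe to finish: the extension has received the data
    callback(null, { statusCode: 200 })
  })
}

wrappedHandler({}, {}, () => {
  console.log(calls.join(' -> '))
  // lambdaStart -> sendSpan -> flush(lambdaEnd=true)
})
```

The key point is that the handler's `callback` fires only from inside the `flush({lambdaEnd: true})` callback, so the VM is never frozen with unsent data.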

### `client.sendSpan(span[, callback])`

Send a span to the APM Server.
@@ -381,7 +408,7 @@ Arguments:
- `callback` - Callback is called when the `metricset` has been flushed to
the underlying system.

### `client.flush([callback])`
### `client.flush([opts,] [callback])`

Flush the internal buffer and end the current HTTP request to the APM
Server. If no HTTP request is in process nothing happens. In an AWS Lambda
@@ -390,6 +417,11 @@ because the APM agent always flushes at the end of a Lambda handler.

Arguments:

- `opts`:
- `opts.lambdaEnd` - An optional boolean indicating whether this is the final
flush at the end of the Lambda function invocation. If true, the client
performs the extra end-of-invocation handling described in the notes under
`client.lambdaStart()` above.
- `callback` - Callback is called when the internal buffer has been
flushed and the HTTP request ended. If no HTTP request is in progress
the callback is called in the next tick.
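Because `opts` is optional, both `flush(cb)` and `flush(opts, cb)` call forms are accepted. A minimal sketch of that argument shuffling, using a hypothetical `normalizeFlushArgs` helper rather than the client's internal code:

```javascript
// Mirrors the optional-`opts` handling: a function in the first position
// is treated as the callback, and a missing `opts` becomes an empty object.
function normalizeFlushArgs (opts, cb) {
  if (typeof opts === 'function') {
    cb = opts
    opts = {}
  } else if (!opts) {
    opts = {}
  }
  return { lambdaEnd: !!opts.lambdaEnd, hasCallback: typeof cb === 'function' }
}

console.log(normalizeFlushArgs(() => {}))
// { lambdaEnd: false, hasCallback: true }
console.log(normalizeFlushArgs({ lambdaEnd: true }, () => {}))
// { lambdaEnd: true, hasCallback: true }
```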
186 changes: 154 additions & 32 deletions index.js
@@ -7,6 +7,7 @@ const http = require('http')
const https = require('https')
const util = require('util')
const os = require('os')
const { performance } = require('perf_hooks')
const { URL } = require('url')
const zlib = require('zlib')

@@ -25,11 +26,12 @@ const truncate = require('./lib/truncate')

module.exports = Client

// This symbol is used as a marker in the client stream to indicate special
// These symbols are used as markers in the client stream to indicate special
// flush handling.
const kFlush = Symbol('flush')
const kLambdaEndFlush = Symbol('lambdaEndFlush')
function isFlushMarker (obj) {
return obj === kFlush
return obj === kFlush || obj === kLambdaEndFlush
}

const hostname = os.hostname()
@@ -103,6 +105,9 @@ function Client (opts) {
this._cloudMetadata = null
this._extraMetadata = null
this._metadataFilters = new Filters()
// _lambdaActive indicates if a Lambda function invocation is active. It is
// only meaningful if `isLambdaExecutionEnvironment`.
this._lambdaActive = false

// Internal runtime stats for developer debugging/tuning.
this._numEvents = 0 // number of events given to the client
@@ -187,6 +192,14 @@ function Client (opts) {
this._index = clientsToAutoEnd.length
clientsToAutoEnd.push(this)

// The 'beforeExit' event is significant in Lambda invocation completion
// handling, so we log it for debugging.
if (isLambdaExecutionEnvironment && this._log.isLevelEnabled('trace')) {
process.prependListener('beforeExit', () => {
this._log.trace('process "beforeExit"')
})
}

if (this._conf.centralConfig) {
this._pollConfig()
}
@@ -280,6 +293,7 @@ Client.prototype.config = function (opts) {
// http request options
this._conf.requestIntake = getIntakeRequestOptions(this._conf, this._agent)
this._conf.requestConfig = getConfigRequestOptions(this._conf, this._agent)
this._conf.requestSignalLambdaEnd = getSignalLambdaEndRequestOptions(this._conf, this._agent)

this._conf.metadata = getMetadata(this._conf)

@@ -307,8 +321,8 @@ Client.prototype.setExtraMetadata = function (extraMetadata) {
this._resetEncodedMetadata()

if (this._conf.expectExtraMetadata) {
this._log.trace('maybe uncork (expectExtraMetadata)')
this._maybeUncork()
this._log.trace('uncorked (expectExtraMetadata)')
}
}

@@ -429,7 +443,7 @@ Client.prototype._ref = function () {

Client.prototype._write = function (obj, enc, cb) {
if (isFlushMarker(obj)) {
this._writeFlush(cb)
this._writeFlush(obj, cb)
} else {
const t = process.hrtime()
const chunk = this._encode(obj, enc)
@@ -481,10 +495,10 @@ Client.prototype._writev = function (objs, cb) {
offset = flushIdx
} else if (flushIdx === objs.length - 1) {
// The next item is a flush marker, and it is the *last* item in the queue.
this._writeFlush(cb)
this._writeFlush(objs[flushIdx].chunk, cb)
} else {
// The next item in the queue is a flush.
this._writeFlush(processBatch)
this._writeFlush(objs[flushIdx].chunk, processBatch)
offset++
}
}
@@ -525,42 +539,55 @@ Client.prototype._writeBatch = function (objs, cb) {
}, '_writeBatch')
}

Client.prototype._writeFlush = function (cb) {
this._log.trace({ activeIntakeReq: this._activeIntakeReq }, '_writeFlush')
if (this._activeIntakeReq) {
// In a Lambda environment a flush is almost certainly a signal that the
// runtime environment is about to be frozen: tell the intake request
// to finish up quickly.
if (this._intakeRequestGracefulExitFn && isLambdaExecutionEnvironment) {
this._intakeRequestGracefulExitFn()
Client.prototype._writeFlush = function (flushMarker, cb) {
this._log.trace({ activeIntakeReq: this._activeIntakeReq, lambdaEnd: flushMarker === kLambdaEndFlush }, '_writeFlush')

let onFlushed = cb
if (isLambdaExecutionEnvironment && flushMarker === kLambdaEndFlush) {
onFlushed = () => {
// Signal the Elastic AWS Lambda extension that it is done passing data
// for this invocation, then call `cb()` so the wrapped Lambda handler
// can finish.
this._signalLambdaEnd(cb)
}
this._onIntakeReqConcluded = cb
}

if (this._activeIntakeReq) {
this._onIntakeReqConcluded = onFlushed
this._chopper.chop()
} else {
this._chopper.chop(cb)
this._chopper.chop(onFlushed)
}
}

Client.prototype._maybeCork = function () {
if (!this._writableState.corked && this._conf.bufferWindowTime !== -1) {
this.cork()
if (this._corkTimer && this._corkTimer.refresh) {
// the refresh function was added in Node 10.2.0
this._corkTimer.refresh()
} else {
this._corkTimer = setTimeout(() => {
this.uncork()
}, this._conf.bufferWindowTime)
if (!this._writableState.corked) {
if (isLambdaExecutionEnvironment && !this._lambdaActive) {
this.cork()
} else if (this._conf.bufferWindowTime !== -1) {
this.cork()
if (this._corkTimer && this._corkTimer.refresh) {
// the refresh function was added in Node 10.2.0
this._corkTimer.refresh()
} else {
this._corkTimer = setTimeout(() => {
this.uncork()
}, this._conf.bufferWindowTime)
}
}
} else if (this._writableState.length >= this._conf.bufferWindowSize) {
this._maybeUncork()
}
}

Client.prototype._maybeUncork = function () {
// client must remain corked until cloud metadata has been
// fetched-or-skipped.
if (!this._encodedMetadata) {
// The client must remain corked until cloud metadata has been
// fetched-or-skipped.
return
} else if (isLambdaExecutionEnvironment && !this._lambdaActive) {
// In a Lambda env, we must only uncork when an invocation is active,
// otherwise we could start an intake request just before the VM is frozen.
return
}

// to `_maybeUncork` have time to be added to the queue. If we didn't do
// this, that last write would trigger a single call to `_write`.
process.nextTick(() => {
if (this.destroyed === false) {
if (this.destroyed === false && !(isLambdaExecutionEnvironment && !this._lambdaActive)) {
this.uncork()
}
})
@@ -603,6 +630,10 @@ Client.prototype._encode = function (obj, enc) {
return ndjson.serialize(out)
}

Client.prototype.lambdaStart = function () {
this._lambdaActive = true
}

// With the cork/uncork handling on this stream, `this.write`ing on this
// stream when already destroyed will lead to:
// Error: Cannot call write after a stream was destroyed
@@ -652,14 +683,44 @@ Client.prototype.sendMetricSet = function (metricset, cb) {
return this.write({ metricset }, Client.encoding.METRICSET, cb)
}

Client.prototype.flush = function (cb) {
this._maybeUncork()
/**
* If possible, start a flush of currently queued APM events to APM server.
*
* "If possible," because there are some guards on uncorking. See `_maybeUncork`.
*
* @param {Object} opts - Optional.
* - {Boolean} opts.lambdaEnd - Optional. Default false. Setting this true
* tells the client to also handle the end of a Lambda function invocation.
* @param {Function} cb - Optional. `cb()` will be called when the data has
* been sent to APM Server (or the attempt has failed).
*/
Client.prototype.flush = function (opts, cb) {
if (typeof opts === 'function') {
cb = opts
opts = {}
} else if (!opts) {
opts = {}
}
const lambdaEnd = !!opts.lambdaEnd

// Write the special "flush" signal. We do this so that the order of writes
// and flushes is kept. If we were to just flush the client right here, the
// internal Writable buffer might still contain data that hasn't yet been
// given to the _write function.
return this.write(kFlush, cb)

if (lambdaEnd && isLambdaExecutionEnvironment && this._lambdaActive) {
// To flush the current data and ensure that subsequently sent events *in
// the same tick* do not start a new intake request, we must uncork
// synchronously -- rather than the nextTick uncork done in `_maybeUncork()`.
assert(this._encodedMetadata, 'client.flush({lambdaEnd:true}) must not be called before metadata has been set')
const rv = this.write(kLambdaEndFlush, cb)
this.uncork()
this._lambdaActive = false
return rv
} else {
this._maybeUncork()
return this.write(kFlush, cb)
}
}

// A handler that can be called on process "beforeExit" to attempt quick and
@@ -909,7 +970,12 @@ function getChoppedStreamHandler (client, onerror) {
// during a request. Given that the normal makeIntakeRequest behaviour
// is to keep a request open for up to 10s (`apiRequestTimeout`), we must
// manually unref the socket.
if (!intakeRequestGracefulExitCalled) {
//
// The exception is when in a Lambda environment, where we *do* want to
// keep the node process running to complete this intake request.
// Otherwise a 'beforeExit' event can be sent, which the Lambda runtime
// interprets as "the Lambda handler callback was never called".
if (!isLambdaExecutionEnvironment && !intakeRequestGracefulExitCalled) {
log.trace('intakeReq "socket": unref it')
intakeReqSocket.unref()
}
@@ -1048,6 +1114,55 @@ Client.prototype.supportsKeepingUnsampledTransaction = function () {
}
}

/**
* Signal to the Elastic AWS Lambda extension that a lambda function execution
* is done.
* https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing
*
* @param {Function} cb - Called when finished. There are no arguments.
*/
Client.prototype._signalLambdaEnd = function (cb) {
this._log.trace('_signalLambdaEnd start')
const startTime = performance.now()
const finish = errOrErrMsg => {
const durationMs = performance.now() - startTime
if (errOrErrMsg) {
this._log.error({ err: errOrErrMsg, durationMs }, 'error signaling lambda invocation done')
} else {
this._log.trace({ durationMs }, 'signaled lambda invocation done')
}
cb()
}

// We expect to be talking to the localhost Elastic Lambda extension, so we
// want a shorter timeout than `_conf.serverTimeout`.
const TIMEOUT_MS = 5000

const req = this._transportRequest(this._conf.requestSignalLambdaEnd, res => {
res.on('error', err => {
// Not sure this event can ever be emitted, but just in case.
res.destroy(err)
})
res.resume()
if (res.statusCode !== 202) {
finish(`unexpected response status code: ${res.statusCode}`)
return
}
res.on('end', function () {
finish()
})
})
req.setTimeout(TIMEOUT_MS)
req.on('timeout', () => {
this._log.trace('_signalLambdaEnd timeout')
req.destroy(new Error(`timeout (${TIMEOUT_MS}ms) signaling Lambda invocation done`))
})
req.on('error', err => {
finish(err)
})
req.end()
}

/**
* Fetch the APM Server version and set `this._apmServerVersion`.
* https://www.elastic.co/guide/en/apm/server/current/server-info.html
@@ -1164,6 +1279,13 @@ function getIntakeRequestOptions (opts, agent) {
return getBasicRequestOptions('POST', '/intake/v2/events', headers, opts, agent)
}

function getSignalLambdaEndRequestOptions (opts, agent) {
const headers = getHeaders(opts)
headers['Content-Length'] = 0

return getBasicRequestOptions('POST', '/intake/v2/events?flushed=true', headers, opts, agent)
}

function getConfigRequestOptions (opts, agent) {
const path = '/config/v1/agents?' + querystring.stringify({
'service.name': opts.serviceName,