From df017c8ad49137103a05cb8553ce237b217d46c5 Mon Sep 17 00:00:00 2001 From: Trent Mick Date: Tue, 22 Feb 2022 16:02:16 -0800 Subject: [PATCH 01/13] refactor: flush to kFlush and add isFlushMarker() function --- index.js | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/index.js b/index.js index 7c80e52..58582b8 100644 --- a/index.js +++ b/index.js @@ -25,7 +25,13 @@ const truncate = require('./lib/truncate') module.exports = Client -const flush = Symbol('flush') +// This symbol is used as a marker in the client stream to indicate special +// flush handling. +const kFlush = Symbol('flush') +function isFlushMarker (obj) { + return obj === kFlush +} + const hostname = os.hostname() const requiredOpts = [ 'agentName', @@ -422,7 +428,7 @@ Client.prototype._ref = function () { } Client.prototype._write = function (obj, enc, cb) { - if (obj === flush) { + if (isFlushMarker(obj)) { this._writeFlush(cb) } else { const t = process.hrtime() @@ -455,29 +461,29 @@ Client.prototype._writev = function (objs, cb) { let flushIdx = -1 const limit = Math.min(objs.length, offset + MAX_WRITE_BATCH_SIZE) for (let i = offset; i < limit; i++) { - if (objs[i].chunk === flush) { + if (isFlushMarker(objs[i].chunk)) { flushIdx = i break } } if (offset === 0 && flushIdx === -1 && objs.length <= MAX_WRITE_BATCH_SIZE) { - // A shortcut if there is no `flush` and the whole `objs` fits in a batch. + // A shortcut if there is no flush marker and the whole `objs` fits in a batch. this._writeBatch(objs, cb) } else if (flushIdx === -1) { - // No `flush` in this batch. + // No flush marker in this batch. this._writeBatch(objs.slice(offset, limit), limit === objs.length ? cb : processBatch) offset = limit } else if (flushIdx > offset) { - // There are some events in the queue before a `flush`. + // There are some events in the queue before a flush marker. this._writeBatch(objs.slice(offset, flushIdx), processBatch) offset = flushIdx } else if (flushIdx === objs.length - 1) { - // The next item is a flush, and it is the *last* item in the queue. + // The next item is a flush marker, and it is the *last* item in the queue. this._writeFlush(cb) } else { - // the next item in the queue is a flush + // The next item in the queue is a flush. this._writeFlush(processBatch) offset++ } @@ -653,7 +659,7 @@ Client.prototype.flush = function (cb) { // and flushes are kept. If we where to just flush the client right here, the // internal Writable buffer might still contain data that hasn't yet been // given to the _write function. 
- return this.write(flush, cb) + return this.write(kFlush, cb) } // A handler that can be called on process "beforeExit" to attempt quick and From d23c3f83342347b9ff89dcd9474db45f682135dc Mon Sep 17 00:00:00 2001 From: Trent Mick Date: Tue, 22 Feb 2022 16:05:11 -0800 Subject: [PATCH 02/13] refactor: clearer name for this internal var --- index.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/index.js b/index.js index 58582b8..4d8f218 100644 --- a/index.js +++ b/index.js @@ -93,7 +93,7 @@ function Client (opts) { this._corkTimer = null this._agent = null - this._active = false + this._activeIntakeReq = false this._onflushed = null this._transport = null this._configTimer = null @@ -526,8 +526,8 @@ Client.prototype._writeBatch = function (objs, cb) { } Client.prototype._writeFlush = function (cb) { - this._log.trace({ active: this._active }, '_writeFlush') - if (this._active) { + this._log.trace({ activeIntakeReq: this._activeIntakeReq }, '_writeFlush') + if (this._activeIntakeReq) { // In a Lambda environment a flush is almost certainly a signal that the // runtime environment is about to be frozen: tell the intake request // to finish up quickly. @@ -783,8 +783,8 @@ function getChoppedStreamHandler (client, onerror) { const intakeResTimeout = client._conf.intakeResTimeout const intakeResTimeoutOnEnd = client._conf.intakeResTimeoutOnEnd - // `_active` is used to coordinate the callback to `client.flush(db)`. - client._active = true + // `_activeIntakeReq` is used to coordinate the callback to `client.flush(db)`. + client._activeIntakeReq = true // Handle conclusion of this intake request. Each "part" is expected to call // `completePart()` at least once -- multiple calls are okay for cases like @@ -840,7 +840,7 @@ function getChoppedStreamHandler (client, onerror) { client._intakeRequestGracefulExitFn = null client.sent = client._numEventsEnqueued - client._active = false + client._activeIntakeReq = false const backoffDelayMs = client._getBackoffDelay(!!err) if (err) { log.trace({ timeline, bytesWritten, backoffDelayMs, err }, From dc85ec9fe7bb3872158844a5702a6d2be0adb0d5 Mon Sep 17 00:00:00 2001 From: Trent Mick Date: Tue, 22 Feb 2022 16:08:11 -0800 Subject: [PATCH 03/13] update test case for '_active' refactor --- test/basic.test.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/basic.test.js b/test/basic.test.js index bea5245..f24de2a 100644 --- a/test/basic.test.js +++ b/test/basic.test.js @@ -221,11 +221,11 @@ test('client.flush(callback) - with active request', function (t) { res.end() }) }).client({ bufferWindowTime: -1, apmServerVersion: '8.0.0' }, function (client) { - t.equal(client._active, false, 'no outgoing HTTP request to begin with') + t.equal(client._activeIntakeReq, false, 'no outgoing HTTP request to begin with') client.sendSpan({ foo: 42 }) - t.equal(client._active, true, 'an outgoing HTTP request should be active') + t.equal(client._activeIntakeReq, true, 'an outgoing HTTP request should be active') client.flush(function () { - t.equal(client._active, false, 'the outgoing HTTP request should be done') + t.equal(client._activeIntakeReq, false, 'the outgoing HTTP request should be done') client.end() server.close() t.end() @@ -256,9 +256,9 @@ test('client.flush(callback) - with queued request', function (t) { client.sendSpan({ req: 1 }) client.flush() client.sendSpan({ req: 2 }) - t.equal(client._active, true, 'an outgoing HTTP request should be active') + t.equal(client._activeIntakeReq, true, 'an 
outgoing HTTP request should be active') client.flush(function () { - t.equal(client._active, false, 'the outgoing HTTP request should be done') + t.equal(client._activeIntakeReq, false, 'the outgoing HTTP request should be done') client.end() server.close() t.end() From f44213b5f3d9c4cf1a9d6dca0393a5efec703db2 Mon Sep 17 00:00:00 2001 From: Trent Mick Date: Tue, 22 Feb 2022 16:08:44 -0800 Subject: [PATCH 04/13] refactor: clearer internal var name --- index.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/index.js b/index.js index 4d8f218..d017d76 100644 --- a/index.js +++ b/index.js @@ -94,7 +94,7 @@ function Client (opts) { this._corkTimer = null this._agent = null this._activeIntakeReq = false - this._onflushed = null + this._onIntakeReqConcluded = null this._transport = null this._configTimer = null this._backoffReconnectCount = 0 @@ -534,7 +534,7 @@ Client.prototype._writeFlush = function (cb) { if (this._intakeRequestGracefulExitFn && isLambdaExecutionEnvironment) { this._intakeRequestGracefulExitFn() } - this._onflushed = cb + this._onIntakeReqConcluded = cb this._chopper.chop() } else { this._chopper.chop(cb) @@ -850,9 +850,9 @@ function getChoppedStreamHandler (client, onerror) { log.trace({ timeline, bytesWritten, backoffDelayMs }, 'conclude intake request: success') } - if (client._onflushed) { - client._onflushed() - client._onflushed = null + if (client._onIntakeReqConcluded) { + client._onIntakeReqConcluded() + client._onIntakeReqConcluded = null } if (backoffDelayMs > 0) { From 70dfa74c71b7715fd502d47a7a12087ad2d07cf1 Mon Sep 17 00:00:00 2001 From: Trent Mick Date: Thu, 24 Feb 2022 10:29:01 -0800 Subject: [PATCH 05/13] feat: add support for Lambda data flushing This adds 'Client#lambdaStart()' and the 'lambdaEnd: true' option to 'Client#flush([opts,] [cb])' to support for flushing Lambda invocation tracing data and signaling the Elastic Lambda extension per spec. https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing Refs: https://github.com/elastic/apm-agent-nodejs/issues/2485 --- CHANGELOG.md | 14 +++++ README.md | 34 +++++++++- index.js | 174 +++++++++++++++++++++++++++++++++++++++++++-------- package.json | 2 +- 4 files changed, 195 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f67276..a54a42b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,19 @@ # elastic-apm-http-client changelog +## v10.5.0 + +- Add support for coordinating data flushing in an AWS Lambda environment. The + following two API additions are used to ensure that (a) the Elastic Lambda + extension is signaled at invocation end [per spec](https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing) + and (b) a new intake request is not started when a Lambda function invocation + is not active. + + - `Client#lambdaStart()` should be used to indicate when a Lambda function + invocation begins. + - `Client#flush([opts,] cb)` now supports an optional `opts.lambdaEnd` + boolean. Set it to true to indicate this is a flush at the end of a Lambda + function invocation. + ## v10.4.0 - Add APM Server version checking to the client. On creation the client will diff --git a/README.md b/README.md index 0b24a17..08e74f6 100644 --- a/README.md +++ b/README.md @@ -341,6 +341,33 @@ is provided. For example, in an AWS Lambda function some metadata is not available until the first function invocation -- which is some async time after Client creation. 
+### `client.lambdaStart()` + +Tells the client that a Lambda function invocation has started. + +#### Notes on Lambda usage + +To properly handle [data flushing for instrumented Lambda functions](https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing) +this Client should be used as follows in a Lambda environment. + +- When a Lambda invocation starts, `client.lambdaStart()` must be called. + + The Client prevents intake requests to APM Server when in a Lambda environment + when a function invocation is *not* active. This is to ensure that an intake + request does not accidentally span a period when a Lambda VM is frozen, + which can lead to timeouts and lost APM data. + +- When a Lambda invocation finishes, `client.flush({lambdaEnd: true}, cb)` must + be called. + + The `lambdaEnd: true` tells the Client to (a) mark the lambda as inactive so + a subsequent intake request is not started until the next invocation, and + (b) signal the Elastic AWS Lambda Extension the this invocation is done. + The user's Lambda handler should not finish until `cb` is called. This + ensures that the extension receives tracing data and the end signal before + the Lambda Runtime freezes the VM. + + ### `client.sendSpan(span[, callback])` Send a span to the APM Server. @@ -381,7 +408,7 @@ Arguments: - `callback` - Callback is called when the `metricset` have been flushed to the underlying system -### `client.flush([callback])` +### `client.flush([opts,] [callback])` Flush the internal buffer and end the current HTTP request to the APM Server. If no HTTP request is in process nothing happens. In an AWS Lambda @@ -390,6 +417,11 @@ because the APM agent always flushes at the end of a Lambda handler. Arguments: +- `opts`: + - `opts.lambdaEnd` - An optional boolean to indicate if this is the final + flush at the end of the Lambda function invocation. The client will do + some extra handling if this is the case. See notes in `client.lambdaStart()` + above. - `callback` - Callback is called when the internal buffer has been flushed and the HTTP request ended. If no HTTP request is in progress the callback is called in the next tick. diff --git a/index.js b/index.js index d017d76..8fdc2eb 100644 --- a/index.js +++ b/index.js @@ -7,6 +7,7 @@ const http = require('http') const https = require('https') const util = require('util') const os = require('os') +const { performance } = require('perf_hooks') const { URL } = require('url') const zlib = require('zlib') @@ -25,11 +26,12 @@ const truncate = require('./lib/truncate') module.exports = Client -// This symbol is used as a marker in the client stream to indicate special +// These symbols are used as markers in the client stream to indicate special // flush handling. const kFlush = Symbol('flush') +const kLambdaEndFlush = Symbol('lambdaEndFlush') function isFlushMarker (obj) { - return obj === kFlush + return obj === kFlush || obj === kLambdaEndFlush } const hostname = os.hostname() @@ -103,6 +105,9 @@ function Client (opts) { this._cloudMetadata = null this._extraMetadata = null this._metadataFilters = new Filters() + // _lambdaActive indicates if a Lambda function invocation is active. It is + // only meaningful if `isLambdaExecutionEnvironment`. + this._lambdaActive = false // Internal runtime stats for developer debugging/tuning. 
this._numEvents = 0 // number of events given to the client @@ -280,6 +285,7 @@ Client.prototype.config = function (opts) { // http request options this._conf.requestIntake = getIntakeRequestOptions(this._conf, this._agent) this._conf.requestConfig = getConfigRequestOptions(this._conf, this._agent) + this._conf.requestSignalLambdaEnd = getSignalLambdaEndRequestOptions(this._conf, this._agent) this._conf.metadata = getMetadata(this._conf) @@ -307,8 +313,8 @@ Client.prototype.setExtraMetadata = function (extraMetadata) { this._resetEncodedMetadata() if (this._conf.expectExtraMetadata) { + this._log.trace('maybe uncork (expectExtraMetadata)') this._maybeUncork() - this._log.trace('uncorked (expectExtraMetadata)') } } @@ -429,7 +435,7 @@ Client.prototype._ref = function () { Client.prototype._write = function (obj, enc, cb) { if (isFlushMarker(obj)) { - this._writeFlush(cb) + this._writeFlush(obj, cb) } else { const t = process.hrtime() const chunk = this._encode(obj, enc) @@ -481,10 +487,10 @@ Client.prototype._writev = function (objs, cb) { offset = flushIdx } else if (flushIdx === objs.length - 1) { // The next item is a flush marker, and it is the *last* item in the queue. - this._writeFlush(cb) + this._writeFlush(objs[flushIdx].chunk, cb) } else { // The next item in the queue is a flush. - this._writeFlush(processBatch) + this._writeFlush(objs[flushIdx].chunk, processBatch) offset++ } } @@ -525,32 +531,54 @@ Client.prototype._writeBatch = function (objs, cb) { }, '_writeBatch') } -Client.prototype._writeFlush = function (cb) { - this._log.trace({ activeIntakeReq: this._activeIntakeReq }, '_writeFlush') - if (this._activeIntakeReq) { - // In a Lambda environment a flush is almost certainly a signal that the - // runtime environment is about to be frozen: tell the intake request - // to finish up quickly. - if (this._intakeRequestGracefulExitFn && isLambdaExecutionEnvironment) { - this._intakeRequestGracefulExitFn() +Client.prototype._writeFlush = function (flushMarker, cb) { + this._log.trace({ activeIntakeReq: this._activeIntakeReq, + lambdaEnd: flushMarker === kLambdaEndFlush }, '_writeFlush') + + let onFlushed = cb + if (isLambdaExecutionEnvironment && flushMarker === kLambdaEndFlush) { + // XXX For now the "micdrop" (the `?flushed=true` signal to the ext) is + // disabled by default until there is an extension release after v0.0.3 + // with https://github.com/elastic/apm-aws-lambda/pull/132 which fixes + // panics reported in https://github.com/elastic/apm-aws-lambda/issues/133 + // To enable: XXX_ELASTIC_APM_ENABLE_MICDROP=1 + const micdropEnabled = process.env.XXX_ELASTIC_APM_ENABLE_MICDROP && + process.env.XXX_ELASTIC_APM_ENABLE_MICDROP !== '0' && + process.env.XXX_ELASTIC_APM_ENABLE_MICDROP !== 'false' + if (micdropEnabled) { + onFlushed = () => { + // Signal the Elastic AWS Lambda extension that it is done passing data + // for this invocation, then call `cb()` so the wrapped Lambda handler + // can finish. 
+ this._signalLambdaEnd(cb) + } + } else { + console.log('XXX micdrop disabled, use XXX_ELASTIC_APM_ENABLE_MICDROP=1 to enable') } - this._onIntakeReqConcluded = cb + } + + if (this._activeIntakeReq) { + this._onIntakeReqConcluded = onFlushed this._chopper.chop() } else { - this._chopper.chop(cb) + this._chopper.chop(onFlushed) } } Client.prototype._maybeCork = function () { - if (!this._writableState.corked && this._conf.bufferWindowTime !== -1) { - this.cork() - if (this._corkTimer && this._corkTimer.refresh) { - // the refresh function was added in Node 10.2.0 - this._corkTimer.refresh() - } else { - this._corkTimer = setTimeout(() => { - this.uncork() - }, this._conf.bufferWindowTime) + if (!this._writableState.corked) { + if (isLambdaExecutionEnvironment && !this._lambdaActive) { + this.cork() + } else if (this._conf.bufferWindowTime !== -1) { + this.cork() + if (this._corkTimer && this._corkTimer.refresh) { + // the refresh function was added in Node 10.2.0 + this._corkTimer.refresh() + } else { + this._corkTimer = setTimeout(() => { + this.uncork() + }, this._conf.bufferWindowTime) + } } } else if (this._writableState.length >= this._conf.bufferWindowSize) { this._maybeUncork() @@ -562,6 +590,8 @@ Client.prototype._maybeUncork = function () { // fetched-or-skipped. if (!this._encodedMetadata) { return + } else if (isLambdaExecutionEnvironment && !this._lambdaActive) { + return } if (this._writableState.corked) { @@ -603,6 +633,10 @@ Client.prototype._encode = function (obj, enc) { return ndjson.serialize(out) } +Client.prototype.lambdaStart = function () { + this._lambdaActive = true +} + // With the cork/uncork handling on this stream, `this.write`ing on this // stream when already destroyed will lead to: // Error: Cannot call write after a stream was destroyed @@ -652,14 +686,42 @@ Client.prototype.sendMetricSet = function (metricset, cb) { return this.write({ metricset }, Client.encoding.METRICSET, cb) } -Client.prototype.flush = function (cb) { +/** + * If possible, start a flush of currently queued APM events to APM server. + * + * "If possible," because there are some guards on uncorking. See `_maybeUncork`. + * + * @param {Object} opts - Optional. + * - {Boolean} opts.lambdaEnd - Optional. Default false. Setting this true + * tells the client to also handle the end of a Lambda function invocation. + * @param {Function} cb - Optional. `cb()` will be called when the data has + * be sent to APM Server (or failed in the attempt). + */ +Client.prototype.flush = function (opts, cb) { + if (typeof opts === 'function') { + cb = opts + opts = {} + } else if (!opts) { + opts = {} + } + const lambdaEnd = !!opts.lambdaEnd + this._maybeUncork() + let flushMarker = kFlush + if (lambdaEnd) { + flushMarker = kLambdaEndFlush + // Set this synchronously after _maybeUncork to ensure that subsequently + // sent events (via `sendSpan` et al) will result in corking the stream + // (in `_maybeCork`) until the *next* Lambda function invocation. + this._lambdaActive = false + } + // Write the special "flush" signal. We do this so that the order of writes // and flushes are kept. If we where to just flush the client right here, the // internal Writable buffer might still contain data that hasn't yet been // given to the _write function. 
- return this.write(kFlush, cb) + return this.write(flushMarker, cb) } // A handler that can be called on process "beforeExit" to attempt quick and @@ -1048,6 +1110,57 @@ Client.prototype.supportsKeepingUnsampledTransaction = function () { } } +/** + * Signal to the Elastic AWS Lambda extension that a lambda function execution + * is done. + * https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing + * + * @param {Function} cb() is called when finished. There are no arguments. + */ +Client.prototype._signalLambdaEnd = function (cb) { + this._log.trace('_signalLambdaEnd start') + const startTime = performance.now() + const finish = errOrErrMsg => { + const durationMs = performance.now() - startTime + let err = null + if (errOrErrMsg) { + this._log.error({ err: errOrErrMsg, durationMs }, 'error signaling lambda invocation done') + } else { + this._log.trace({ durationMs }, 'signaled lambda invocation done') + } + cb() + } + + // We expect to be talking to the localhost Elastic Lambda extension, so we + // want a shorter timeout than `_conf.serverTimeout`. + const TIMEOUT_MS = 5000 + + const req = this._transportRequest(this._conf.requestSignalLambdaEnd, res => { + res.on('error', err => { + // Not sure this event can ever be emitted, but just in case. + res.destroy(err) + }) + res.resume() + if (res.statusCode !== 202) { + finish(`unexpected response status code: ${res.statusCode}`) + return + } + res.on('end', function () { + finish() + }) + }) + req.setTimeout(TIMEOUT_MS) + req.on('timeout', () => { + this._log.trace('_signalLambdaEnd timeout') + req.destroy(new Error(`timeout (${TIMEOUT_MS}ms) signaling Lambda invocation done`)) + }) + req.on('error', err => { + finish(err) + }) + req.end() +} + + /** * Fetch the APM Server version and set `this._apmServerVersion`. * https://www.elastic.co/guide/en/apm/server/current/server-info.html @@ -1164,6 +1277,13 @@ function getIntakeRequestOptions (opts, agent) { return getBasicRequestOptions('POST', '/intake/v2/events', headers, opts, agent) } +function getSignalLambdaEndRequestOptions (opts, agent) { + const headers = getHeaders(opts) + headers['Content-Length'] = 0 + + return getBasicRequestOptions('POST', '/intake/v2/events?flushed=true', headers, opts, agent) +} + function getConfigRequestOptions (opts, agent) { const path = '/config/v1/agents?' 
+ querystring.stringify({ 'service.name': opts.serviceName, diff --git a/package.json b/package.json index 02661a5..9dd9594 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "elastic-apm-http-client", - "version": "10.4.0", + "version": "10.5.0", "description": "A low-level HTTP client for communicating with the Elastic APM intake API", "main": "index.js", "directories": { From 0933f537a7e056b9fa4f4de5e8e418f69a7f55b3 Mon Sep 17 00:00:00 2001 From: Trent Mick Date: Thu, 24 Feb 2022 10:39:06 -0800 Subject: [PATCH 06/13] make 'standard' happy --- index.js | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/index.js b/index.js index 8fdc2eb..1fe7777 100644 --- a/index.js +++ b/index.js @@ -532,8 +532,7 @@ Client.prototype._writeBatch = function (objs, cb) { } Client.prototype._writeFlush = function (flushMarker, cb) { - this._log.trace({ activeIntakeReq: this._activeIntakeReq, - lambdaEnd: flushMarker === kLambdaEndFlush }, '_writeFlush') + this._log.trace({ activeIntakeReq: this._activeIntakeReq, lambdaEnd: flushMarker === kLambdaEndFlush }, '_writeFlush') let onFlushed = cb if (isLambdaExecutionEnvironment && flushMarker === kLambdaEndFlush) { @@ -1122,7 +1121,6 @@ Client.prototype._signalLambdaEnd = function (cb) { const startTime = performance.now() const finish = errOrErrMsg => { const durationMs = performance.now() - startTime - let err = null if (errOrErrMsg) { this._log.error({ err: errOrErrMsg, durationMs }, 'error signaling lambda invocation done') } else { @@ -1160,7 +1158,6 @@ Client.prototype._signalLambdaEnd = function (cb) { req.end() } - /** * Fetch the APM Server version and set `this._apmServerVersion`. * https://www.elastic.co/guide/en/apm/server/current/server-info.html From 29e09ab361726452fe421b96fd9c477d7b814b54 Mon Sep 17 00:00:00 2001 From: Trent Mick Date: Mon, 28 Feb 2022 15:02:17 -0800 Subject: [PATCH 07/13] fix a broken merge --- index.js | 1 + 1 file changed, 1 insertion(+) diff --git a/index.js b/index.js index c94f318..1fe7777 100644 --- a/index.js +++ b/index.js @@ -32,6 +32,7 @@ const kFlush = Symbol('flush') const kLambdaEndFlush = Symbol('lambdaEndFlush') function isFlushMarker (obj) { return obj === kFlush || obj === kLambdaEndFlush +} const hostname = os.hostname() const requiredOpts = [ From 30c73317dad493df4b81b707db401fa9cefd70f1 Mon Sep 17 00:00:00 2001 From: Trent Mick Date: Wed, 2 Mar 2022 11:11:23 -0800 Subject: [PATCH 08/13] fix the occasional 'null' handler result due to unintended 'beforeExit' due to this socket unref Given that we now ensure the intake request and flush completes before calling the Lambda handler callback, we *want* to keep that socket ref'd. --- index.js | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/index.js b/index.js index 1fe7777..7b424ba 100644 --- a/index.js +++ b/index.js @@ -192,6 +192,14 @@ function Client (opts) { this._index = clientsToAutoEnd.length clientsToAutoEnd.push(this) + // The 'beforeExit' event is significant in Lambda invocation completion + // handling, so we log it for debugging. + if (isLambdaExecutionEnvironment && this._log.isLevelEnabled('trace')) { + process.prependListener('beforeExit', () => { + this._log.trace('process "beforeExit"') + }) + } + if (this._conf.centralConfig) { this._pollConfig() } @@ -585,11 +593,13 @@ Client.prototype._maybeCork = function () { } Client.prototype._maybeUncork = function () { - // client must remain corked until cloud metadata has been - // fetched-or-skipped. 
if (!this._encodedMetadata) { + // The client must remain corked until cloud metadata has been + // fetched-or-skipped. return } else if (isLambdaExecutionEnvironment && !this._lambdaActive) { + // In a Lambda env, we must only uncork when an invocation is active, + // otherwise we could start an intake request just before the VM is frozen. return } @@ -970,7 +980,12 @@ function getChoppedStreamHandler (client, onerror) { // during a request. Given that the normal makeIntakeRequest behaviour // is to keep a request open for up to 10s (`apiRequestTimeout`), we must // manually unref the socket. - if (!intakeRequestGracefulExitCalled) { + // + // The exception is when in a Lambda environment, where we *do* want to + // keep the node process running to complete this intake request. + // Otherwise a 'beforeExit' event can be sent, which the Lambda runtime + // interprets as "the Lambda handler callback was never called". + if (!isLambdaExecutionEnvironment && !intakeRequestGracefulExitCalled) { log.trace('intakeReq "socket": unref it') intakeReqSocket.unref() } From 54a025de893d96f843a5f909c3d4cc9409bd525e Mon Sep 17 00:00:00 2001 From: Trent Mick Date: Wed, 2 Mar 2022 17:04:47 -0800 Subject: [PATCH 09/13] grammar fix --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 08e74f6..fd4f7b8 100644 --- a/README.md +++ b/README.md @@ -362,7 +362,7 @@ this Client should be used as follows in a Lambda environment. The `lambdaEnd: true` tells the Client to (a) mark the lambda as inactive so a subsequent intake request is not started until the next invocation, and - (b) signal the Elastic AWS Lambda Extension the this invocation is done. + (b) signal the Elastic AWS Lambda Extension that this invocation is done. The user's Lambda handler should not finish until `cb` is called. This ensures that the extension receives tracing data and the end signal before the Lambda Runtime freezes the VM. From bc9454b5ba1a9c9cb6fb5f6f9abac7f7d6fdd2e1 Mon Sep 17 00:00:00 2001 From: Trent Mick Date: Wed, 2 Mar 2022 17:12:27 -0800 Subject: [PATCH 10/13] remove the for-dev XXX_ELASTIC_APM_ENABLE_MICDROP disabling of the micdrop by default --- index.js | 22 +++++----------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/index.js b/index.js index 7b424ba..def5601 100644 --- a/index.js +++ b/index.js @@ -544,23 +544,11 @@ Client.prototype._writeFlush = function (flushMarker, cb) { let onFlushed = cb if (isLambdaExecutionEnvironment && flushMarker === kLambdaEndFlush) { - // XXX For now the "micdrop" (the `?flushed=true` signal to the ext) is - // disabled by default until there is an extension release after v0.0.3 - // with https://github.com/elastic/apm-aws-lambda/pull/132 which fixes - // panics reported in https://github.com/elastic/apm-aws-lambda/issues/133 - // To enable: XXX_ELASTIC_APM_ENABLE_MICDROP=1 - const micdropEnabled = process.env.XXX_ELASTIC_APM_ENABLE_MICDROP && - process.env.XXX_ELASTIC_APM_ENABLE_MICDROP !== '0' && - process.env.XXX_ELASTIC_APM_ENABLE_MICDROP !== 'false' - if (micdropEnabled) { - onFlushed = () => { - // Signal the Elastic AWS Lambda extension that it is done passing data - // for this invocation, then call `cb()` so the wrapped Lambda handler - // can finish. 
- this._signalLambdaEnd(cb) - } - } else { - console.log('XXX micdrop disabled, use XXX_ELASTIC_APM_ENABLE_MICDROP=1 to enable') + onFlushed = () => { + // Signal the Elastic AWS Lambda extension that it is done passing data + // for this invocation, then call `cb()` so the wrapped Lambda handler + // can finish. + this._signalLambdaEnd(cb) } } From 8e56aa5a0bbd215a1d810aab0ce16bda6ee0cfb6 Mon Sep 17 00:00:00 2001 From: Trent Mick Date: Thu, 3 Mar 2022 15:37:45 -0800 Subject: [PATCH 11/13] add tests; a corking fix This fixes an issue with corking when events come after a client.flush({lambdaEnd: true}) in *the same tick*. Before this those events could slip into the stream and result in a subsequent intake request starting after the `lambdaEnd`... which would often result in those events being lost. --- index.js | 28 ++++--- lib/logging.js | 1 + package.json | 3 +- test/lambda-usage.test.js | 161 ++++++++++++++++++++++++++++++++++++++ test/run_tests.sh | 12 +++ 5 files changed, 190 insertions(+), 15 deletions(-) create mode 100644 test/lambda-usage.test.js create mode 100755 test/run_tests.sh diff --git a/index.js b/index.js index def5601..1a30978 100644 --- a/index.js +++ b/index.js @@ -596,7 +596,7 @@ Client.prototype._maybeUncork = function () { // to `_maybeUncork` have time to be added to the queue. If we didn't do // this, that last write would trigger a single call to `_write`. process.nextTick(() => { - if (this.destroyed === false) { + if (this.destroyed === false && !(isLambdaExecutionEnvironment && !this._lambdaActive)) { this.uncork() } }) @@ -703,22 +703,24 @@ Client.prototype.flush = function (opts, cb) { } const lambdaEnd = !!opts.lambdaEnd - this._maybeUncork() - - let flushMarker = kFlush - if (lambdaEnd) { - flushMarker = kLambdaEndFlush - // Set this synchronously after _maybeUncork to ensure that subsequently - // sent events (via `sendSpan` et al) will result in corking the stream - // (in `_maybeCork`) until the *next* Lambda function invocation. - this._lambdaActive = false - } - // Write the special "flush" signal. We do this so that the order of writes // and flushes are kept. If we where to just flush the client right here, the // internal Writable buffer might still contain data that hasn't yet been // given to the _write function. - return this.write(flushMarker, cb) + + if (lambdaEnd && isLambdaExecutionEnvironment && this._lambdaActive) { + // To flush the current data and ensure that subsequently sent events *in + // the same tick* do not start a new intake request, we must uncork + // synchronously -- rather than the nextTick uncork done in `_maybeUncork()`. 
+ assert(this._encodedMetadata, 'client.flush({lambdaEnd:true}) must not be called before metadata has been set') + const rv = this.write(kLambdaEndFlush, cb) + this.uncork() + this._lambdaActive = false + return rv + } else { + this._maybeUncork() + return this.write(kFlush, cb) + } } // A handler that can be called on process "beforeExit" to attempt quick and diff --git a/lib/logging.js b/lib/logging.js index 9d5d674..439737c 100644 --- a/lib/logging.js +++ b/lib/logging.js @@ -13,6 +13,7 @@ class NoopLogger { error () {} fatal () {} child () { return this } + isLevelEnabled (_level) { return false } } module.exports = { diff --git a/package.json b/package.json index 9dd9594..51a7cde 100644 --- a/package.json +++ b/package.json @@ -9,9 +9,8 @@ "files": [ "lib" ], - "// scripts.test": "quoting arg to tape to avoid too long argv, let tape do globbing", "scripts": { - "test": "standard && nyc tape \"test/*.test.js\"" + "test": "standard && nyc ./test/run_tests.sh" }, "engines": { "node": "^8.6.0 || 10 || >=12" diff --git a/test/lambda-usage.test.js b/test/lambda-usage.test.js new file mode 100644 index 0000000..8bde59a --- /dev/null +++ b/test/lambda-usage.test.js @@ -0,0 +1,161 @@ +'use strict' + +// Test the expected usage of this Client in an AWS Lambda environment. +// The "Notes on Lambda usage" section in the README.md describes the +// expected usage. + +// Must set this before the Client is imported so it thinks it is in a Lambda env. +process.env.AWS_LAMBDA_FUNCTION_NAME = 'myFn' + +const { URL } = require('url') +const zlib = require('zlib') +const test = require('tape') +const { APMServer } = require('./lib/utils') + +test('lambda usage', suite => { + let server + let client + let reqsToServer = [] + let lateSpanInSameTickCallbackCalled = false + let lateSpanInNextTickCallbackCalled = false + + test('setup mock APM server', t => { + server = APMServer(function (req, res) { + // Capture intake req data to this mock APM server to `reqsToServer`. + const reqInfo = { + method: req.method, + path: req.url, + url: new URL(req.url, 'http://localhost'), + headers: req.headers, + events: [] + } + let instream = req + if (req.headers['content-encoding'] === 'gzip') { + instream = req.pipe(zlib.createGunzip()) + } else { + instream.setEncoding('utf8') + } + let body = '' + instream.on('data', chunk => { + body += chunk + }) + instream.on('end', () => { + body + .split(/\n/g) // parse each line + .filter(line => line.trim()) // ... if it is non-empty + .forEach(line => { + reqInfo.events.push(JSON.parse(line)) // ... append to reqInfo.events + }) + reqsToServer.push(reqInfo) + res.writeHead(202) // the expected response from intake API endpoint + res.end('{}') + }) + }) + + server.client({ + apmServerVersion: '8.0.0', + centralConfig: false + }, function (client_) { + client = client_ + t.end() + }) + }) + + test('clients stays corked before .lambdaStart()', t => { + // Add more events than `bufferWindowSize` and wait for more than + // `bufferWindowTime`, and the Client should *still* be corked. + const aTrans = { name: 'aTrans', type: 'custom', result: 'success' /* ... 
*/ } + for (let i = 0; i < client._conf.bufferWindowSize + 1; i++) { + client.sendTransaction(aTrans) + } + setTimeout(() => { + t.equal(client._writableState.corked, 1, + 'corked after bufferWindowSize events and bufferWindowTime') + t.equal(reqsToServer.length, 0, 'no intake request was made to APM Server') + t.end() + }, client._conf.bufferWindowTime + 10) + }) + + test('lambda invocation', t => { + client.lambdaStart() // 1. start of invocation + setTimeout(() => { + client.sendTransaction({ name: 'GET /aStage/myFn', type: 'lambda', result: 'success' /* ... */ }) + client.sendSpan({ name: 'mySpan', type: 'custom', result: 'success' /* ... */ }) + + // 2. end of invocation + client.flush({ lambdaEnd: true }, function () { + t.ok(reqsToServer.length > 1, 'at least 2 intake requests to APM Server') + t.equal(reqsToServer[reqsToServer.length - 1].url.searchParams.get('flushed'), 'true', + 'the last intake request had "?flushed=true" query param') + + let allEvents = [] + reqsToServer.forEach(r => { allEvents = allEvents.concat(r.events) }) + t.equal(allEvents[allEvents.length - 2].transaction.name, 'GET /aStage/myFn', + 'second last event is the lambda transaction') + t.equal(allEvents[allEvents.length - 1].span.name, 'mySpan', + 'last event is the lambda span') + + reqsToServer = [] // reset + t.end() + }) + + // Explicitly send late events and flush *after* the + // `client.flush({lambdaEnd:true})` -- both in the same tick and next + // ticks -- to test that these get buffered until the next lambda + // invocation. + client.sendSpan({ name: 'lateSpanInSameTick', type: 'custom' /* ... */ }) + client.flush(function () { + lateSpanInSameTickCallbackCalled = true + }) + setImmediate(() => { + client.sendSpan({ name: 'lateSpanInNextTick', type: 'custom' /* ... */ }) + client.flush(function () { + lateSpanInNextTickCallbackCalled = true + }) + }) + }, 10) + }) + + // Give some time to make sure there isn't some unexpected short async + // interaction. 
+ test('pause between lambda invocations', t => { + setTimeout(() => { + t.end() + }, 1000) + }) + + test('second lambda invocation', t => { + t.equal(lateSpanInSameTickCallbackCalled, false, 'lateSpanInSameTick flush callback not yet called') + t.equal(lateSpanInNextTickCallbackCalled, false, 'lateSpanInNextTick flush callback not yet called') + t.equal(reqsToServer.length, 0, 'no intake request was made to APM Server since last lambdaEnd') + + client.lambdaStart() + setTimeout(() => { + client.flush({ lambdaEnd: true }, () => { + t.equal(reqsToServer.length, 3, '3 intake requests to APM Server') + t.equal(lateSpanInSameTickCallbackCalled, true, 'lateSpanInSameTick flush callback has now been called') + t.equal(lateSpanInNextTickCallbackCalled, true, 'lateSpanInNextTick flush callback has now been called') + + t.equal(reqsToServer[0].events.length, 2, + 'the first intake request has 2 events') + t.equal(reqsToServer[0].events[1].span.name, 'lateSpanInSameTick', + 'of which the second event is the lateSpanInSameTick') + t.equal(reqsToServer[1].events.length, 2, + 'the second intake request has 2 events') + t.equal(reqsToServer[1].events[1].span.name, 'lateSpanInNextTick', + 'of which the second event is the lateSpanInNextTick') + t.equal(reqsToServer[reqsToServer.length - 1].url.searchParams.get('flushed'), 'true', + 'the last intake request had "?flushed=true" query param') + t.end() + }) + }, 10) + }) + + test('teardown', t => { + server.close() + client.destroy() + t.end() + }) + + suite.end() +}) diff --git a/test/run_tests.sh b/test/run_tests.sh new file mode 100755 index 0000000..8f6df96 --- /dev/null +++ b/test/run_tests.sh @@ -0,0 +1,12 @@ +#!/bin/bash +# +# Run each test/*.test.js file in a separate process. +# + +TOP=$(cd $(dirname $0)/../ >/dev/null; pwd) + +ls $TOP/test/*.test.js | while read f; do + echo "" + echo "# runnign 'node $f'" + node $f +done From 0e9f22dd40c9310a8c6237e7020b9fafbc02ac1a Mon Sep 17 00:00:00 2001 From: Trent Mick Date: Thu, 3 Mar 2022 15:40:28 -0800 Subject: [PATCH 12/13] note for why we are running test files in separate processes now --- test/lambda-usage.test.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/lambda-usage.test.js b/test/lambda-usage.test.js index 8bde59a..e76c8ef 100644 --- a/test/lambda-usage.test.js +++ b/test/lambda-usage.test.js @@ -3,6 +3,8 @@ // Test the expected usage of this Client in an AWS Lambda environment. // The "Notes on Lambda usage" section in the README.md describes the // expected usage. +// +// Note: This test file needs to be run in its own process. // Must set this before the Client is imported so it thinks it is in a Lambda env. process.env.AWS_LAMBDA_FUNCTION_NAME = 'myFn' From 470b1ab0ee984365faceb0c2578d44cc338b56c0 Mon Sep 17 00:00:00 2001 From: Trent Mick Date: Thu, 3 Mar 2022 16:57:58 -0800 Subject: [PATCH 13/13] need to make this a new major --- CHANGELOG.md | 7 ++++++- package.json | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a54a42b..6b1b802 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # elastic-apm-http-client changelog -## v10.5.0 +## v11.0.0 - Add support for coordinating data flushing in an AWS Lambda environment. The following two API additions are used to ensure that (a) the Elastic Lambda @@ -14,6 +14,11 @@ boolean. Set it to true to indicate this is a flush at the end of a Lambda function invocation. + This is a **BREAKING CHANGE**, because current versions of elastic-apm-node + depend on `^10.4.0`. 
If this were released as another 10.x, then usage of + current elastic-apm-node with this version of the client would break + behavior in a Lambda environment. + ## v10.4.0 - Add APM Server version checking to the client. On creation the client will diff --git a/package.json b/package.json index 51a7cde..46b5ef5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "elastic-apm-http-client", - "version": "10.5.0", + "version": "11.0.0", "description": "A low-level HTTP client for communicating with the Elastic APM intake API", "main": "index.js", "directories": {
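
The intended call pattern for the Lambda support added in this series, as described in the README's "Notes on Lambda usage" section above, is roughly the following. This is a minimal sketch only: it assumes a promise-returning handler, the `wrapHandler`/`wrappedHandler` names are hypothetical, and the real wrapping logic lives in the elastic-apm-node agent rather than in this package.

    // Hypothetical wrapper showing the expected client usage per invocation.
    function wrapHandler (client, handler) {
      return async function wrappedHandler (event, context) {
        // Mark the invocation active so the client will uncork and allow
        // intake requests to APM Server.
        client.lambdaStart()
        try {
          return await handler(event, context)
        } finally {
          // Flush queued events and signal the Elastic Lambda extension
          // (the `?flushed=true` request) before the Lambda VM can be frozen.
          await new Promise(resolve => client.flush({ lambdaEnd: true }, resolve))
        }
      }
    }

The ordering matters: the handler result is not returned to the Lambda runtime until the `flush({ lambdaEnd: true })` callback fires, which matches the README's requirement that the user's Lambda handler should not finish until `cb` is called, and is the behavior exercised by the `test/lambda-usage.test.js` file added in this series.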