diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6f67276..6b1b802 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,24 @@
 # elastic-apm-http-client changelog
 
+## v11.0.0
+
+- Add support for coordinating data flushing in an AWS Lambda environment. The
+  following two API additions are used to ensure that (a) the Elastic Lambda
+  extension is signaled at invocation end [per spec](https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing)
+  and (b) a new intake request is not started when a Lambda function invocation
+  is not active.
+
+  - `Client#lambdaStart()` should be used to indicate when a Lambda function
+    invocation begins.
+  - `Client#flush([opts,] cb)` now supports an optional `opts.lambdaEnd`
+    boolean. Set it to true to indicate this is a flush at the end of a Lambda
+    function invocation.
+
+  This is a **BREAKING CHANGE**, because current versions of elastic-apm-node
+  depend on `^10.4.0`. If this were released as another 10.x, then usage of
+  current elastic-apm-node with this version of the client would break
+  behavior in a Lambda environment.
+
 ## v10.4.0
 
 - Add APM Server version checking to the client. On creation the client will
diff --git a/README.md b/README.md
index 0b24a17..fd4f7b8 100644
--- a/README.md
+++ b/README.md
@@ -341,6 +341,33 @@
 is provided. For example, in an AWS Lambda function some metadata is not
 available until the first function invocation -- which is some async time
 after Client creation.
 
+### `client.lambdaStart()`
+
+Tells the client that a Lambda function invocation has started.
+
+#### Notes on Lambda usage
+
+To properly handle [data flushing for instrumented Lambda functions](https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing),
+this Client should be used as follows in a Lambda environment.
+
+- When a Lambda invocation starts, `client.lambdaStart()` must be called.
+
+  When in a Lambda environment, the Client prevents intake requests to APM
+  Server while a function invocation is *not* active. This ensures that an
+  intake request does not accidentally span a period when a Lambda VM is
+  frozen, which can lead to timeouts and lost APM data.
+
+- When a Lambda invocation finishes, `client.flush({lambdaEnd: true}, cb)` must
+  be called.
+
+  The `lambdaEnd: true` option tells the Client to (a) mark the lambda as
+  inactive so a subsequent intake request is not started until the next
+  invocation, and (b) signal the Elastic AWS Lambda Extension that this
+  invocation is done. The user's Lambda handler should not finish until `cb`
+  is called. This ensures that the extension receives tracing data and the
+  end signal before the Lambda Runtime freezes the VM.
+
+
 ### `client.sendSpan(span[, callback])`
 
 Send a span to the APM Server.
@@ -381,7 +408,7 @@ Arguments:
 - `callback` - Callback is called when the `metricset` have been flushed to
   the underlying system
 
-### `client.flush([callback])`
+### `client.flush([opts,] [callback])`
 
 Flush the internal buffer and end the current HTTP request to the APM
 Server. If no HTTP request is in process nothing happens. In an AWS Lambda
@@ -390,6 +417,11 @@
 because the APM agent always flushes at the end of a Lambda handler.
 
 Arguments:
 
+- `opts`:
+  - `opts.lambdaEnd` - An optional boolean to indicate whether this is the
+    final flush at the end of the Lambda function invocation. The client will
+    do some extra handling if this is the case. See notes in
+    `client.lambdaStart()` above.
 - `callback` - Callback is called when the internal buffer has been flushed
   and the HTTP request ended. If no HTTP request is in progress the callback
   is called in the next tick.
diff --git a/index.js b/index.js
index d017d76..1a30978 100644
--- a/index.js
+++ b/index.js
@@ -7,6 +7,7 @@
 const http = require('http')
 const https = require('https')
 const util = require('util')
 const os = require('os')
+const { performance } = require('perf_hooks')
 const { URL } = require('url')
 const zlib = require('zlib')
@@ -25,11 +26,12 @@ const truncate = require('./lib/truncate')
 
 module.exports = Client
 
-// This symbol is used as a marker in the client stream to indicate special
+// These symbols are used as markers in the client stream to indicate special
 // flush handling.
 const kFlush = Symbol('flush')
+const kLambdaEndFlush = Symbol('lambdaEndFlush')
 function isFlushMarker (obj) {
-  return obj === kFlush
+  return obj === kFlush || obj === kLambdaEndFlush
 }
 
 const hostname = os.hostname()
@@ -103,6 +105,9 @@ function Client (opts) {
   this._cloudMetadata = null
   this._extraMetadata = null
   this._metadataFilters = new Filters()
+  // _lambdaActive indicates if a Lambda function invocation is active. It is
+  // only meaningful if `isLambdaExecutionEnvironment`.
+  this._lambdaActive = false
 
   // Internal runtime stats for developer debugging/tuning.
   this._numEvents = 0 // number of events given to the client
@@ -187,6 +192,14 @@ function Client (opts) {
   this._index = clientsToAutoEnd.length
   clientsToAutoEnd.push(this)
 
+  // The 'beforeExit' event is significant in Lambda invocation completion
+  // handling, so we log it for debugging.
+  if (isLambdaExecutionEnvironment && this._log.isLevelEnabled('trace')) {
+    process.prependListener('beforeExit', () => {
+      this._log.trace('process "beforeExit"')
+    })
+  }
+
   if (this._conf.centralConfig) {
     this._pollConfig()
   }
@@ -280,6 +293,7 @@ Client.prototype.config = function (opts) {
   // http request options
   this._conf.requestIntake = getIntakeRequestOptions(this._conf, this._agent)
   this._conf.requestConfig = getConfigRequestOptions(this._conf, this._agent)
+  this._conf.requestSignalLambdaEnd = getSignalLambdaEndRequestOptions(this._conf, this._agent)
 
   this._conf.metadata = getMetadata(this._conf)
 
@@ -307,8 +321,8 @@ Client.prototype.setExtraMetadata = function (extraMetadata) {
   this._resetEncodedMetadata()
 
   if (this._conf.expectExtraMetadata) {
+    this._log.trace('maybe uncork (expectExtraMetadata)')
     this._maybeUncork()
-    this._log.trace('uncorked (expectExtraMetadata)')
   }
 }
 
@@ -429,7 +443,7 @@ Client.prototype._ref = function () {
 
 Client.prototype._write = function (obj, enc, cb) {
   if (isFlushMarker(obj)) {
-    this._writeFlush(cb)
+    this._writeFlush(obj, cb)
   } else {
     const t = process.hrtime()
     const chunk = this._encode(obj, enc)
@@ -481,10 +495,10 @@ Client.prototype._writev = function (objs, cb) {
       offset = flushIdx
     } else if (flushIdx === objs.length - 1) {
       // The next item is a flush marker, and it is the *last* item in the queue.
-      this._writeFlush(cb)
+      this._writeFlush(objs[flushIdx].chunk, cb)
     } else {
       // The next item in the queue is a flush.
-      this._writeFlush(processBatch)
+      this._writeFlush(objs[flushIdx].chunk, processBatch)
       offset++
     }
   }
@@ -525,32 +539,41 @@ Client.prototype._writeBatch = function (objs, cb) {
   }, '_writeBatch')
 }
 
-Client.prototype._writeFlush = function (cb) {
-  this._log.trace({ activeIntakeReq: this._activeIntakeReq }, '_writeFlush')
-  if (this._activeIntakeReq) {
-    // In a Lambda environment a flush is almost certainly a signal that the
-    // runtime environment is about to be frozen: tell the intake request
-    // to finish up quickly.
-    if (this._intakeRequestGracefulExitFn && isLambdaExecutionEnvironment) {
-      this._intakeRequestGracefulExitFn()
+Client.prototype._writeFlush = function (flushMarker, cb) {
+  this._log.trace({ activeIntakeReq: this._activeIntakeReq, lambdaEnd: flushMarker === kLambdaEndFlush }, '_writeFlush')
+
+  let onFlushed = cb
+  if (isLambdaExecutionEnvironment && flushMarker === kLambdaEndFlush) {
+    onFlushed = () => {
+      // Signal the Elastic AWS Lambda extension that it is done passing data
+      // for this invocation, then call `cb()` so the wrapped Lambda handler
+      // can finish.
+      this._signalLambdaEnd(cb)
     }
-    this._onIntakeReqConcluded = cb
+  }
+
+  if (this._activeIntakeReq) {
+    this._onIntakeReqConcluded = onFlushed
     this._chopper.chop()
   } else {
-    this._chopper.chop(cb)
+    this._chopper.chop(onFlushed)
  }
 }
 
 Client.prototype._maybeCork = function () {
-  if (!this._writableState.corked && this._conf.bufferWindowTime !== -1) {
-    this.cork()
-    if (this._corkTimer && this._corkTimer.refresh) {
-      // the refresh function was added in Node 10.2.0
-      this._corkTimer.refresh()
-    } else {
-      this._corkTimer = setTimeout(() => {
-        this.uncork()
-      }, this._conf.bufferWindowTime)
+  if (!this._writableState.corked) {
+    if (isLambdaExecutionEnvironment && !this._lambdaActive) {
+      this.cork()
+    } else if (this._conf.bufferWindowTime !== -1) {
+      this.cork()
+      if (this._corkTimer && this._corkTimer.refresh) {
+        // the refresh function was added in Node 10.2.0
+        this._corkTimer.refresh()
+      } else {
+        this._corkTimer = setTimeout(() => {
+          this.uncork()
+        }, this._conf.bufferWindowTime)
+      }
     }
   } else if (this._writableState.length >= this._conf.bufferWindowSize) {
     this._maybeUncork()
@@ -558,9 +581,13 @@
 }
 
 Client.prototype._maybeUncork = function () {
-  // client must remain corked until cloud metadata has been
-  // fetched-or-skipped.
   if (!this._encodedMetadata) {
+    // The client must remain corked until cloud metadata has been
+    // fetched-or-skipped.
+    return
+  } else if (isLambdaExecutionEnvironment && !this._lambdaActive) {
+    // In a Lambda env, we must only uncork when an invocation is active,
+    // otherwise we could start an intake request just before the VM is frozen.
     return
   }
 
@@ -569,7 +596,7 @@
   // to `_maybeUncork` have time to be added to the queue. If we didn't do
   // this, that last write would trigger a single call to `_write`.
   process.nextTick(() => {
-    if (this.destroyed === false) {
+    if (this.destroyed === false && !(isLambdaExecutionEnvironment && !this._lambdaActive)) {
       this.uncork()
     }
   })
@@ -603,6 +630,10 @@ Client.prototype._encode = function (obj, enc) {
   return ndjson.serialize(out)
 }
 
+Client.prototype.lambdaStart = function () {
+  this._lambdaActive = true
+}
+
 // With the cork/uncork handling on this stream, `this.write`ing on this
 // stream when already destroyed will lead to:
 //   Error: Cannot call write after a stream was destroyed
@@ -652,14 +683,44 @@ Client.prototype.sendMetricSet = function (metricset, cb) {
   return this.write({ metricset }, Client.encoding.METRICSET, cb)
 }
 
-Client.prototype.flush = function (cb) {
-  this._maybeUncork()
+/**
+ * If possible, start a flush of currently queued APM events to APM server.
+ *
+ * "If possible," because there are some guards on uncorking. See `_maybeUncork`.
+ *
+ * @param {Object} opts - Optional.
+ *    - {Boolean} opts.lambdaEnd - Optional. Default false. Setting this true
+ *      tells the client to also handle the end of a Lambda function invocation.
+ * @param {Function} cb - Optional. `cb()` will be called when the data has
+ *    been sent to APM Server (or failed in the attempt).
+ */
+Client.prototype.flush = function (opts, cb) {
+  if (typeof opts === 'function') {
+    cb = opts
+    opts = {}
+  } else if (!opts) {
+    opts = {}
+  }
+  const lambdaEnd = !!opts.lambdaEnd
 
   // Write the special "flush" signal. We do this so that the order of writes
   // and flushes are kept. If we were to just flush the client right here, the
   // internal Writable buffer might still contain data that hasn't yet been
   // given to the _write function.
-  return this.write(kFlush, cb)
+
+  if (lambdaEnd && isLambdaExecutionEnvironment && this._lambdaActive) {
+    // To flush the current data and ensure that subsequently sent events *in
+    // the same tick* do not start a new intake request, we must uncork
+    // synchronously -- rather than the nextTick uncork done in `_maybeUncork()`.
+    assert(this._encodedMetadata, 'client.flush({lambdaEnd:true}) must not be called before metadata has been set')
+    const rv = this.write(kLambdaEndFlush, cb)
+    this.uncork()
+    this._lambdaActive = false
+    return rv
+  } else {
+    this._maybeUncork()
+    return this.write(kFlush, cb)
+  }
 }
 
 // A handler that can be called on process "beforeExit" to attempt quick and
@@ -909,7 +970,12 @@ function getChoppedStreamHandler (client, onerror) {
       // during a request. Given that the normal makeIntakeRequest behaviour
       // is to keep a request open for up to 10s (`apiRequestTimeout`), we must
      // manually unref the socket.
-      if (!intakeRequestGracefulExitCalled) {
+      //
+      // The exception is when in a Lambda environment, where we *do* want to
+      // keep the node process running to complete this intake request.
+      // Otherwise a 'beforeExit' event can be sent, which the Lambda runtime
+      // interprets as "the Lambda handler callback was never called".
+      if (!isLambdaExecutionEnvironment && !intakeRequestGracefulExitCalled) {
        log.trace('intakeReq "socket": unref it')
        intakeReqSocket.unref()
       }
@@ -1048,6 +1114,55 @@ Client.prototype.supportsKeepingUnsampledTransaction = function () {
   }
 }
 
+/**
+ * Signal to the Elastic AWS Lambda extension that a lambda function execution
+ * is done.
+ * https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing
+ *
+ * @param {Function} cb() is called when finished. There are no arguments.
+ */
+Client.prototype._signalLambdaEnd = function (cb) {
+  this._log.trace('_signalLambdaEnd start')
+  const startTime = performance.now()
+  const finish = errOrErrMsg => {
+    const durationMs = performance.now() - startTime
+    if (errOrErrMsg) {
+      this._log.error({ err: errOrErrMsg, durationMs }, 'error signaling lambda invocation done')
+    } else {
+      this._log.trace({ durationMs }, 'signaled lambda invocation done')
+    }
+    cb()
+  }
+
+  // We expect to be talking to the localhost Elastic Lambda extension, so we
+  // want a shorter timeout than `_conf.serverTimeout`.
+  const TIMEOUT_MS = 5000
+
+  const req = this._transportRequest(this._conf.requestSignalLambdaEnd, res => {
+    res.on('error', err => {
+      // Not sure this event can ever be emitted, but just in case.
+      res.destroy(err)
+    })
+    res.resume()
+    if (res.statusCode !== 202) {
+      finish(`unexpected response status code: ${res.statusCode}`)
+      return
+    }
+    res.on('end', function () {
+      finish()
+    })
+  })
+  req.setTimeout(TIMEOUT_MS)
+  req.on('timeout', () => {
+    this._log.trace('_signalLambdaEnd timeout')
+    req.destroy(new Error(`timeout (${TIMEOUT_MS}ms) signaling Lambda invocation done`))
+  })
+  req.on('error', err => {
+    finish(err)
+  })
+  req.end()
+}
+
 /**
  * Fetch the APM Server version and set `this._apmServerVersion`.
  * https://www.elastic.co/guide/en/apm/server/current/server-info.html
@@ -1164,6 +1279,13 @@ function getIntakeRequestOptions (opts, agent) {
   return getBasicRequestOptions('POST', '/intake/v2/events', headers, opts, agent)
 }
 
+function getSignalLambdaEndRequestOptions (opts, agent) {
+  const headers = getHeaders(opts)
+  headers['Content-Length'] = 0
+
+  return getBasicRequestOptions('POST', '/intake/v2/events?flushed=true', headers, opts, agent)
+}
+
 function getConfigRequestOptions (opts, agent) {
   const path = '/config/v1/agents?' + querystring.stringify({
     'service.name': opts.serviceName,
diff --git a/lib/logging.js b/lib/logging.js
index 9d5d674..439737c 100644
--- a/lib/logging.js
+++ b/lib/logging.js
@@ -13,6 +13,7 @@ class NoopLogger {
   error () {}
   fatal () {}
   child () { return this }
+  isLevelEnabled (_level) { return false }
 }
 
 module.exports = {
diff --git a/package.json b/package.json
index 02661a5..46b5ef5 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "elastic-apm-http-client",
-  "version": "10.4.0",
+  "version": "11.0.0",
   "description": "A low-level HTTP client for communicating with the Elastic APM intake API",
   "main": "index.js",
   "directories": {
@@ -9,9 +9,8 @@
   "files": [
     "lib"
   ],
-  "// scripts.test": "quoting arg to tape to avoid too long argv, let tape do globbing",
   "scripts": {
-    "test": "standard && nyc tape \"test/*.test.js\""
+    "test": "standard && nyc ./test/run_tests.sh"
   },
   "engines": {
     "node": "^8.6.0 || 10 || >=12"
diff --git a/test/lambda-usage.test.js b/test/lambda-usage.test.js
new file mode 100644
index 0000000..e76c8ef
--- /dev/null
+++ b/test/lambda-usage.test.js
@@ -0,0 +1,163 @@
+'use strict'
+
+// Test the expected usage of this Client in an AWS Lambda environment.
+// The "Notes on Lambda usage" section in the README.md describes the
+// expected usage.
+//
+// Note: This test file needs to be run in its own process.
+
+// Must set this before the Client is imported so it thinks it is in a Lambda env.
+process.env.AWS_LAMBDA_FUNCTION_NAME = 'myFn'
+
+const { URL } = require('url')
+const zlib = require('zlib')
+const test = require('tape')
+const { APMServer } = require('./lib/utils')
+
+test('lambda usage', suite => {
+  let server
+  let client
+  let reqsToServer = []
+  let lateSpanInSameTickCallbackCalled = false
+  let lateSpanInNextTickCallbackCalled = false
+
+  test('setup mock APM server', t => {
+    server = APMServer(function (req, res) {
+      // Capture intake req data to this mock APM server to `reqsToServer`.
+      const reqInfo = {
+        method: req.method,
+        path: req.url,
+        url: new URL(req.url, 'http://localhost'),
+        headers: req.headers,
+        events: []
+      }
+      let instream = req
+      if (req.headers['content-encoding'] === 'gzip') {
+        instream = req.pipe(zlib.createGunzip())
+      } else {
+        instream.setEncoding('utf8')
+      }
+      let body = ''
+      instream.on('data', chunk => {
+        body += chunk
+      })
+      instream.on('end', () => {
+        body
+          .split(/\n/g) // parse each line
+          .filter(line => line.trim()) // ... if it is non-empty
+          .forEach(line => {
+            reqInfo.events.push(JSON.parse(line)) // ... append to reqInfo.events
+          })
+        reqsToServer.push(reqInfo)
+        res.writeHead(202) // the expected response from intake API endpoint
+        res.end('{}')
+      })
+    })
+
+    server.client({
+      apmServerVersion: '8.0.0',
+      centralConfig: false
+    }, function (client_) {
+      client = client_
+      t.end()
+    })
+  })
+
+  test('client stays corked before .lambdaStart()', t => {
+    // Add more events than `bufferWindowSize` and wait for more than
+    // `bufferWindowTime`, and the Client should *still* be corked.
+    const aTrans = { name: 'aTrans', type: 'custom', result: 'success' /* ... */ }
+    for (let i = 0; i < client._conf.bufferWindowSize + 1; i++) {
+      client.sendTransaction(aTrans)
+    }
+    setTimeout(() => {
+      t.equal(client._writableState.corked, 1,
+        'corked after bufferWindowSize events and bufferWindowTime')
+      t.equal(reqsToServer.length, 0, 'no intake request was made to APM Server')
+      t.end()
+    }, client._conf.bufferWindowTime + 10)
+  })
+
+  test('lambda invocation', t => {
+    client.lambdaStart() // 1. start of invocation
+    setTimeout(() => {
+      client.sendTransaction({ name: 'GET /aStage/myFn', type: 'lambda', result: 'success' /* ... */ })
+      client.sendSpan({ name: 'mySpan', type: 'custom', result: 'success' /* ... */ })
+
+      // 2. end of invocation
+      client.flush({ lambdaEnd: true }, function () {
+        t.ok(reqsToServer.length > 1, 'at least 2 intake requests to APM Server')
+        t.equal(reqsToServer[reqsToServer.length - 1].url.searchParams.get('flushed'), 'true',
+          'the last intake request had "?flushed=true" query param')
+
+        let allEvents = []
+        reqsToServer.forEach(r => { allEvents = allEvents.concat(r.events) })
+        t.equal(allEvents[allEvents.length - 2].transaction.name, 'GET /aStage/myFn',
+          'second last event is the lambda transaction')
+        t.equal(allEvents[allEvents.length - 1].span.name, 'mySpan',
+          'last event is the lambda span')
+
+        reqsToServer = [] // reset
+        t.end()
+      })
+
+      // Explicitly send late events and flush *after* the
+      // `client.flush({lambdaEnd:true})` -- both in the same tick and next
+      // ticks -- to test that these get buffered until the next lambda
+      // invocation.
+      client.sendSpan({ name: 'lateSpanInSameTick', type: 'custom' /* ... */ })
+      client.flush(function () {
+        lateSpanInSameTickCallbackCalled = true
+      })
+      setImmediate(() => {
+        client.sendSpan({ name: 'lateSpanInNextTick', type: 'custom' /* ... */ })
+        client.flush(function () {
+          lateSpanInNextTickCallbackCalled = true
+        })
+      })
+    }, 10)
+  })
+
+  // Give some time to make sure there isn't some unexpected short async
+  // interaction.
+  test('pause between lambda invocations', t => {
+    setTimeout(() => {
+      t.end()
+    }, 1000)
+  })
+
+  test('second lambda invocation', t => {
+    t.equal(lateSpanInSameTickCallbackCalled, false, 'lateSpanInSameTick flush callback not yet called')
+    t.equal(lateSpanInNextTickCallbackCalled, false, 'lateSpanInNextTick flush callback not yet called')
+    t.equal(reqsToServer.length, 0, 'no intake request was made to APM Server since last lambdaEnd')
+
+    client.lambdaStart()
+    setTimeout(() => {
+      client.flush({ lambdaEnd: true }, () => {
+        t.equal(reqsToServer.length, 3, '3 intake requests to APM Server')
+        t.equal(lateSpanInSameTickCallbackCalled, true, 'lateSpanInSameTick flush callback has now been called')
+        t.equal(lateSpanInNextTickCallbackCalled, true, 'lateSpanInNextTick flush callback has now been called')
+
+        t.equal(reqsToServer[0].events.length, 2,
+          'the first intake request has 2 events')
+        t.equal(reqsToServer[0].events[1].span.name, 'lateSpanInSameTick',
+          'of which the second event is the lateSpanInSameTick')
+        t.equal(reqsToServer[1].events.length, 2,
+          'the second intake request has 2 events')
+        t.equal(reqsToServer[1].events[1].span.name, 'lateSpanInNextTick',
+          'of which the second event is the lateSpanInNextTick')
+        t.equal(reqsToServer[reqsToServer.length - 1].url.searchParams.get('flushed'), 'true',
+          'the last intake request had "?flushed=true" query param')
+        t.end()
+      })
+    }, 10)
+  })
+
+  test('teardown', t => {
+    server.close()
+    client.destroy()
+    t.end()
+  })
+
+  suite.end()
+})
diff --git a/test/run_tests.sh b/test/run_tests.sh
new file mode 100755
index 0000000..8f6df96
--- /dev/null
+++ b/test/run_tests.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+#
+# Run each test/*.test.js file in a separate process.
+#
+
+TOP=$(cd $(dirname $0)/../ >/dev/null; pwd)
+
+ls $TOP/test/*.test.js | while read f; do
+    echo ""
+    echo "# running 'node $f'"
+    node $f
+done
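The README's "Notes on Lambda usage" above describe the call pattern this changeset enables, but the diff itself carries no end-to-end example. The sketch below shows one way an instrumentation wrapper might drive the client across a single invocation, following those notes; the `Client` constructor option values and the `wrapHandler`/`userHandler` names are illustrative assumptions, not part of this changeset.

```js
// Minimal usage sketch (assumptions noted above): mark the invocation active
// with lambdaStart(), and do not let the handler finish until the
// flush({ lambdaEnd: true }) callback has fired.
const Client = require('elastic-apm-http-client')

const client = new Client({
  agentName: 'my-agent', // required Client options; the values are examples only
  agentVersion: '1.0.0',
  serviceName: 'my-lambda-service',
  userAgent: 'my-agent/1.0.0'
})

function wrapHandler (userHandler) {
  return async (event, context) => {
    client.lambdaStart() // invocation is active: intake requests may now start
    try {
      return await userHandler(event, context)
    } finally {
      // Flush queued events and signal the Elastic AWS Lambda extension that
      // this invocation is done, *before* the handler is allowed to finish.
      await new Promise(resolve => client.flush({ lambdaEnd: true }, resolve))
    }
  }
}

exports.handler = wrapHandler(async (event, context) => {
  client.sendTransaction({ name: 'GET /aStage/myFn', type: 'lambda', result: 'success' })
  return { statusCode: 200 }
})
```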
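For reference, the end-of-invocation signal that `getSignalLambdaEndRequestOptions()` and `_signalLambdaEnd()` implement is simply an empty `POST` to `/intake/v2/events?flushed=true`, with a `202` response expected. A rough stand-alone equivalent using plain `http` is sketched below; the `localhost:8200` address is an assumption for illustration only (the client itself reuses its configured server URL and transport options).

```js
// Illustrative only: send the "lambda invocation done" signal by hand.
// Assumes the local Elastic Lambda extension is listening on localhost:8200.
const http = require('http')

const req = http.request({
  method: 'POST',
  host: 'localhost',
  port: 8200,
  path: '/intake/v2/events?flushed=true',
  headers: { 'Content-Length': 0 } // no body; the query param is the signal
}, res => {
  res.resume()
  console.log('lambda-end signal response status:', res.statusCode) // 202 expected
})
req.setTimeout(5000, () => req.destroy(new Error('timeout signaling lambda invocation done')))
req.on('error', err => console.error('lambda-end signal failed:', err.message))
req.end()
```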