Preserve compressed agent data and compress agent data that is not already compressed #72
Conversation
💚 Build Succeeded
Looks like a great start and the general shape of things looks correct to me. Two things to address and (I think) we'll be set:

- I've not done it before, but Felix's suggestion of using an `io.Pipe()` to avoid the `compressedBytes` buffer variable is sound (see the sketch after this list).
- I have a question about error handling in the new code that reads the agent request body, which might lead to a little extra work (or an explanation).
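For reference, here's a minimal sketch of what I understand the `io.Pipe()` approach to look like; the helper name, endpoint URL, and payload are illustrative, not this PR's actual code:

```go
package main

import (
	"compress/gzip"
	"io"
	"log"
	"net/http"
)

// sendCompressed is a hypothetical helper: it streams gzip-compressed data
// to url without an intermediate buffer. The goroutine writes into the pipe
// while client.Do reads from it.
func sendCompressed(client *http.Client, url string, data []byte) error {
	pr, pw := io.Pipe()
	go func() {
		gw := gzip.NewWriter(pw)
		_, err := gw.Write(data)
		if err == nil {
			err = gw.Close()
		}
		// Pass any compression error to the reading side of the pipe.
		pw.CloseWithError(err)
	}()

	req, err := http.NewRequest(http.MethodPost, url, pr)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Encoding", "gzip")
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	return resp.Body.Close()
}

func main() {
	data := []byte(`{"metadata":{"service":{"name":"test"}}}`)
	if err := sendCompressed(http.DefaultClient, "http://localhost:8200/intake/v2/events", data); err != nil {
		log.Printf("failed to send compressed data: %v", err)
	}
}
```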
```go
if r.Body == nil {
	log.Println("Could not get bytes from agent request body")
} else {
	rawBytes, _ := ioutil.ReadAll(r.Body)
```
It looks like `ioutil.ReadAll` doesn't promise success. Should we be handling the error here instead of suppressing it with an `_`? Or is that not needed?
I'll ask what the apm server does, as we should probably handle the error similarly to how they would.
https://github.com/elastic/apm-server/blob/master/beater/api/intake/handler.go#L72-L100
If the apm server can't read the request, it doesn't log the error but rather writes the error back to the client. I'm not sure we would want to write errors back to the agent, as the agent can't do anything more with the information than the extension can, and we don't want to hold up the lambda function. It seems to me like the best way to handle this would be to log the error and move on. @astorm what do you think? Maybe @felixbarny has some thoughts as well.
SGTM. If we find use cases that require sending a response to the agent, we can always add it later.
@estolfo logging the error and moving on seems sufficient to me as well.

@felixbarny If the extension tries to compress non-compressed agent data and fails, what should the extension do? Send the data non-compressed?

The compression happens while sending data to APM Server. That means if an error happens, some data might have already been sent to the Server. I think the primary source of errors would be network issues rather than issues related to compressing the data. In these cases, I think it's fine to log the error and discard the data.
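A sketch of the log-and-move-on handling agreed on above; the handler shape and route are assumed for illustration, not the extension's exact code:

```go
package main

import (
	"io/ioutil"
	"log"
	"net/http"
)

// handleAgentData is a hypothetical handler: on a read failure it logs the
// error and returns, without writing an error response back to the agent.
func handleAgentData(w http.ResponseWriter, r *http.Request) {
	if r.Body == nil {
		log.Println("Could not get bytes from agent request body")
		return
	}
	rawBytes, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("Could not read agent request body: %v", err)
		return
	}
	_ = rawBytes // forward to the data channel in the real code
}

func main() {
	http.HandleFunc("/intake/v2/events", handleAgentData)
	log.Fatal(http.ListenAndServe(":8200", nil))
}
```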
```go
gw.Close()
pw.Close()
if err != nil {
	log.Printf("Failed to compress data: %v", err)
```
[minor suggestion]
IIUC, this function will only be executed once the request has been established and is then streamed (via chunked encoding) to the server. If that's correct, the message may be a bit misleading, as the more common source of failure is a network issue when streaming the data to APM Server.
log.Printf("Failed to compress data: %v", err) | |
log.Printf("Failed to send compressed data: %v", err) |
We are actually not streaming data yet, but rather sending in batches. At this point in the code, the actual failure is that the data could not be compressed in a goroutine. If compression failed in that goroutine, we will also get an error that is logged on line 52 (`failed to create a new request when posting to APM server`) and the function will return. So I think the error message here is accurate and should stay as-is.
I think this streams the batches. See also https://medium.com/stupid-gopher-tricks/streaming-data-in-go-without-buffering-3285ddd2a1e5#2342
Yes, you're right that the actual send could be chunked by the Go internal transport code; I thought you were referring to the way that we send the data as it's received from the agent. But at this point in the code, I'm calling `io.Copy`, which ends up calling Write on the `gzipWriter` before it gets to the network code. So any compression failures would be returned by `io.Copy`. Network errors would be returned by `resp, err := client.Do(req)`.

But that actually brings up another point: the error returned from `client.Do(req)` actually wraps the compression error, which means we don't really need to do anything with the error returned from `io.Copy`. In other words, we could remove the logging of the error and just rely on the error returned in this line.
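For what it's worth, here's a toy demo of that propagation (not the extension's code): an error handed to `pw.CloseWithError` surfaces on the reader side of the pipe, which in the real code is the transport inside `client.Do`:

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"io/ioutil"
)

func main() {
	pr, pw := io.Pipe()
	go func() {
		// Simulate a compression failure on the writing side.
		pw.CloseWithError(errors.New("gzip: compression failed"))
	}()
	// The reading side sees the writer's error. In the real code the reader
	// is the HTTP transport inside client.Do, so the error it returns
	// reflects the compression failure.
	_, err := ioutil.ReadAll(pr)
	fmt.Println(err) // gzip: compression failed
}
```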
> I'm calling io.Copy, which ends up calling Write on the gzipWriter before it gets to the network code.

What I'm understanding from the blog that I linked is that `io.Copy` blocks until the `PipeReader` is being read from. This happens when `client.Do(req)` is invoked and the data from the `PipeReader` is written into the HTTP request body via chunked encoding. It's because `io.Copy` blocks that we have to execute it in a concurrently running goroutine. If we didn't do that, we'd never get to the `client.Do(req)` part.

See also the docs for `io.Pipe`:

> each Write to the PipeWriter blocks until it has satisfied one or more Reads from the PipeReader that fully consume the written data. The data is copied directly from the Write to the corresponding Read (or Reads); there is no internal buffering.
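A tiny illustration of that blocking behavior (a toy example, unrelated to the extension's code):

```go
package main

import (
	"fmt"
	"io"
)

func main() {
	pr, pw := io.Pipe()
	// The Write must run in a goroutine: it blocks until the data is
	// consumed by a Read on the other end of the pipe. Doing it on the
	// main goroutine here would deadlock.
	go func() {
		pw.Write([]byte("batch of agent data"))
		pw.Close()
	}()
	buf := make([]byte, 32)
	n, _ := pr.Read(buf)
	fmt.Println(string(buf[:n]))
}
```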
Happy to approve once we have logging for the error case on `rawBytes, _ := ioutil.ReadAll(r.Body)`.
* Adding local lambdas and execution script
* Change current transaction name instead of creating a new one. Also added an env variable to toggle output, and modified the test flow to make sure that the right permissions are always set for the java agent layer
* Parallelize Lambda execution
* Replace Elasticsearch by a mock APM server
* Cleanup env variable checks and go.mod
* Change test name and write Lambda paths as variables
* Make the Java APM Agent version an env. variable
* Remove test files and fix folder detection
* Improve request response decoding (Based on PR #72)
* Refactor channel use to avoid test block by a single lambda
* Make the tests single-language and fix Gradle
* Set the default config values
* Fix default values
* Add timer defer and add units to doc
* Add tolerance for Uppercase, but set the documented language value to lowercase.
* Add supported languages in Panic message
* Replace "node" by "nodejs"
* Print the UUID
* Variable/Function name refactor
* Return empty string upon timeout and add new line to server log
This PR resolves #71.

The changes include:
- … the `AgentData` struct
- … the `getDecompressedBytesFromRequest` function, as we won't need it anymore
- … the `getDecompressedBytesFromRequest` function