Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions apm-lambda-extension/extension/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package extension

import (
"context"
"net"
"net/http"
"time"
Expand All @@ -26,9 +27,9 @@ import (
var agentDataServer *http.Server

// StartHttpServer starts the server listening for APM agent data.
func StartHttpServer(agentDataChan chan AgentData, config *extensionConfig) (err error) {
func StartHttpServer(ctx context.Context, agentDataChan chan AgentData, config *extensionConfig) (err error) {
mux := http.NewServeMux()
mux.HandleFunc("/", handleInfoRequest(config.apmServerUrl))
mux.HandleFunc("/", handleInfoRequest(ctx, config.apmServerUrl, config))
mux.HandleFunc("/intake/v2/events", handleIntakeV2Events(agentDataChan))
timeout := time.Duration(config.dataReceiverTimeoutSeconds) * time.Second
agentDataServer = &http.Server{
Expand All @@ -47,8 +48,11 @@ func StartHttpServer(agentDataChan chan AgentData, config *extensionConfig) (err
go func() {
Log.Infof("Extension listening for apm data on %s", agentDataServer.Addr)
if err = agentDataServer.Serve(ln); err != nil {
Log.Errorf("Error upon APM data server start : %v", err)
return
if err.Error() == "http: Server closed" {
Log.Debug(err)
} else {
Log.Errorf("Error upon APM data server start : %v", err)
}
}
}()
return nil
Expand Down
13 changes: 7 additions & 6 deletions apm-lambda-extension/extension/http_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package extension

import (
"bytes"
"context"
"errors"
"io/ioutil"
"net"
Expand Down Expand Up @@ -59,7 +60,7 @@ func TestInfoProxy(t *testing.T) {
dataReceiverTimeoutSeconds: 15,
}

if err := StartHttpServer(dataChannel, &config); err != nil {
if err := StartHttpServer(context.Background(), dataChannel, &config); err != nil {
t.Fail()
return
}
Expand Down Expand Up @@ -108,7 +109,7 @@ func TestInfoProxyErrorStatusCode(t *testing.T) {
dataReceiverTimeoutSeconds: 15,
}

if err := StartHttpServer(dataChannel, &config); err != nil {
if err := StartHttpServer(context.Background(), dataChannel, &config); err != nil {
t.Fail()
return
}
Expand Down Expand Up @@ -152,7 +153,7 @@ func Test_handleInfoRequest(t *testing.T) {
}

// Start extension server
if err := StartHttpServer(dataChannel, &config); err != nil {
if err := StartHttpServer(context.Background(), dataChannel, &config); err != nil {
t.Fail()
return
}
Expand Down Expand Up @@ -217,7 +218,7 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) {
dataReceiverTimeoutSeconds: 15,
}

if err := StartHttpServer(dataChannel, &config); err != nil {
if err := StartHttpServer(context.Background(), dataChannel, &config); err != nil {
t.Fail()
return
}
Expand Down Expand Up @@ -269,7 +270,7 @@ func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) {
dataReceiverTimeoutSeconds: 15,
}

if err := StartHttpServer(dataChannel, &config); err != nil {
if err := StartHttpServer(context.Background(), dataChannel, &config); err != nil {
t.Fail()
return
}
Expand Down Expand Up @@ -313,7 +314,7 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) {
dataReceiverTimeoutSeconds: 15,
}

if err := StartHttpServer(dataChannel, &config); err != nil {
if err := StartHttpServer(context.Background(), dataChannel, &config); err != nil {
t.Fail()
return
}
Expand Down
23 changes: 21 additions & 2 deletions apm-lambda-extension/extension/route_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package extension

import (
"context"
"io/ioutil"
"net/http"
"net/http/httputil"
"net/url"
"time"
)

type AgentData struct {
Expand All @@ -30,19 +32,30 @@ type AgentData struct {
}

var AgentDoneSignal chan struct{}
var mainExtensionContext context.Context

// URL: http://server/
func handleInfoRequest(apmServerUrl string) func(w http.ResponseWriter, r *http.Request) {
func handleInfoRequest(ctx context.Context, apmServerUrl string, config *extensionConfig) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {

Log.Debug("Handling APM Server Info Request")
mainExtensionContext = ctx

// Init reverse proxy
parsedApmServerUrl, err := url.Parse(apmServerUrl)
if err != nil {
Log.Errorf("could not parse APM server URL: %v", err)
return
}

reverseProxy := httputil.NewSingleHostReverseProxy(parsedApmServerUrl)

reverseProxyTimeout := time.Duration(config.DataForwarderTimeoutSeconds) * time.Second
reverseProxy.Transport = http.DefaultTransport
Copy link
Contributor

@simitt simitt Apr 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caller of httputil.NewSingleHostReverseProxy(parsedApmServerUrl) should not need to have any knowledge about internas of how the reverseProxy is built. With this change the whole transport of the reverseProxy is still switched out; if any other settings (TLS, further timeouts etc) were set in the proxy they would be lost. What I originally meant, was that only the timeout should be customized here, or even better, be passed into the abstracting httputil.NewSingleHostReverseProxy method.

reverseProxy.Transport.(*http.Transport).ResponseHeaderTimeout = reverseProxyTimeout

reverseProxy.ErrorHandler = reverseProxyErrorHandler

// Process request (the Golang doc suggests removing any pre-existing X-Forwarded-For header coming
// from the client or an untrusted proxy to prevent IP spoofing : https://pkg.go.dev/net/http/httputil#ReverseProxy
r.Header.Del("X-Forwarded-For")
Expand All @@ -58,10 +71,16 @@ func handleInfoRequest(apmServerUrl string) func(w http.ResponseWriter, r *http.
}
}

func reverseProxyErrorHandler(res http.ResponseWriter, req *http.Request, err error) {
SetApmServerTransportState(Failing, mainExtensionContext)
Log.Errorf("Error querying version from the APM Server: %v", err)
}

// URL: http://server/intake/v2/events
func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWriter, r *http.Request) {

return func(w http.ResponseWriter, r *http.Request) {

Log.Debug("Handling APM Data Intake")
rawBytes, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion apm-lambda-extension/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func main() {
agentDataChannel := make(chan extension.AgentData, 100)

// Start http server to receive data from agent
if err = extension.StartHttpServer(agentDataChannel, config); err != nil {
if err = extension.StartHttpServer(ctx, agentDataChannel, config); err != nil {
extension.Log.Errorf("Could not start APM data receiver : %v", err)
}

Expand Down
Loading