diff --git a/apm-lambda-extension/extension/http_server.go b/apm-lambda-extension/extension/http_server.go index 30bd45a8..3df5d9e3 100644 --- a/apm-lambda-extension/extension/http_server.go +++ b/apm-lambda-extension/extension/http_server.go @@ -18,50 +18,27 @@ package extension import ( + "log" "net" "net/http" - "time" ) -type serverHandler struct { - data chan AgentData - config *extensionConfig -} - -func (handler *serverHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/intake/v2/events" { - handleIntakeV2Events(handler, w, r) - return - } - - if r.URL.Path == "/" { - handleInfoRequest(handler, w, r) - return - } - - // if we have not yet returned, 404 - w.WriteHeader(http.StatusNotFound) - w.Write([]byte("404")) - -} +var agentDataServer *http.Server -func NewHttpServer(dataChannel chan AgentData, config *extensionConfig) *http.Server { - var handler = serverHandler{data: dataChannel, config: config} - timeout := time.Duration(config.dataReceiverTimeoutSeconds) * time.Second - s := &http.Server{ - Addr: config.dataReceiverServerPort, - Handler: &handler, - ReadTimeout: timeout, - WriteTimeout: timeout, - MaxHeaderBytes: 1 << 20, - } +func StartHttpServer(agentDataChan chan AgentData, config *extensionConfig) (err error) { + mux := http.NewServeMux() + mux.HandleFunc("/", handleInfoRequest(config.apmServerUrl)) + mux.HandleFunc("/intake/v2/events", handleIntakeV2Events(agentDataChan)) + agentDataServer = &http.Server{Addr: config.dataReceiverServerPort, Handler: mux} - addr := s.Addr - ln, err := net.Listen("tcp", addr) + ln, err := net.Listen("tcp", agentDataServer.Addr) if err != nil { - return s + return } - go s.Serve(ln) - return s + go func() { + log.Printf("Extension listening for apm data on %s", agentDataServer.Addr) + agentDataServer.Serve(ln) + }() + return nil } diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go index f765d875..3a914cd8 100644 --- a/apm-lambda-extension/extension/http_server_test.go +++ b/apm-lambda-extension/extension/http_server_test.go @@ -19,6 +19,7 @@ package extension import ( "io/ioutil" + "net" "net/http" "net/http/httptest" "testing" @@ -46,15 +47,17 @@ func TestInfoProxy(t *testing.T) { apmServerUrl: apmServer.URL, apmServerSecretToken: "foo", apmServerApiKey: "bar", - dataReceiverServerPort: "127.0.0.1:1234", + dataReceiverServerPort: ":1234", dataReceiverTimeoutSeconds: 15, } - extensionServer := NewHttpServer(dataChannel, &config) - defer extensionServer.Close() + + StartHttpServer(dataChannel, &config) + defer agentDataServer.Close() + + hosts, _ := net.LookupHost("localhost") + url := "http://" + hosts[0] + ":1234" // Create a request to send to the extension - client := &http.Client{} - url := "http://" + extensionServer.Addr req, err := http.NewRequest("GET", url, nil) if err != nil { t.Logf("Could not create request") @@ -64,9 +67,10 @@ func TestInfoProxy(t *testing.T) { } // Send the request to the extension + client := &http.Client{} resp, err := client.Do(req) if err != nil { - t.Logf("Error fetching %s, [%v]", extensionServer.Addr, err) + t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err) t.Fail() } else { body, _ := ioutil.ReadAll(resp.Body) diff --git a/apm-lambda-extension/extension/process_events.go b/apm-lambda-extension/extension/process_events.go index 1debb836..e1585bf1 100644 --- a/apm-lambda-extension/extension/process_events.go +++ b/apm-lambda-extension/extension/process_events.go @@ -26,6 +26,7 @@ import ( func ProcessShutdown() { log.Println("Received SHUTDOWN event") log.Println("Exiting") + agentDataServer.Close() } func FlushAPMData(client *http.Client, dataChannel chan AgentData, config *extensionConfig) { diff --git a/apm-lambda-extension/extension/route_handlers.go b/apm-lambda-extension/extension/route_handlers.go index 60d4373c..865fe1d2 100644 --- a/apm-lambda-extension/extension/route_handlers.go +++ b/apm-lambda-extension/extension/route_handlers.go @@ -29,68 +29,73 @@ type AgentData struct { } // URL: http://server/ -func handleInfoRequest(handler *serverHandler, w http.ResponseWriter, r *http.Request) { - client := &http.Client{} +func handleInfoRequest(apmServerUrl string) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + client := &http.Client{} - req, err := http.NewRequest(r.Method, handler.config.apmServerUrl, nil) - //forward every header received - for name, values := range r.Header { - // Loop over all values for the name. - for _, value := range values { - req.Header.Set(name, value) + req, err := http.NewRequest(r.Method, apmServerUrl, nil) + //forward every header received + for name, values := range r.Header { + // Loop over all values for the name. + for _, value := range values { + req.Header.Set(name, value) + } + } + if err != nil { + log.Printf("could not create request object for %s:%s: %v", r.Method, apmServerUrl, err) + return } - } - if err != nil { - log.Printf("could not create request object for %s:%s: %v", r.Method, handler.config.apmServerUrl, err) - return - } - resp, err := client.Do(req) - if err != nil { - log.Printf("error forwarding info request (`/`) to APM Server: %v", err) - return - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - log.Printf("could not read info request response to APM Server: %v", err) - return - } + resp, err := client.Do(req) + if err != nil { + log.Printf("error forwarding info request (`/`) to APM Server: %v", err) + return + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Printf("could not read info request response to APM Server: %v", err) + return + } - // send status code - w.WriteHeader(resp.StatusCode) + // send status code + w.WriteHeader(resp.StatusCode) - // send every header received - for name, values := range resp.Header { - // Loop over all values for the name. - for _, value := range values { - w.Header().Add(name, value) + // send every header received + for name, values := range resp.Header { + // Loop over all values for the name. + for _, value := range values { + w.Header().Add(name, value) + } } + // send body + w.Write([]byte(body)) } - // send body - w.Write([]byte(body)) } // URL: http://server/intake/v2/events -func handleIntakeV2Events(handler *serverHandler, w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusAccepted) - w.Write([]byte("ok")) +func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWriter, r *http.Request) { - if r.Body == nil { - log.Println("Could not get bytes from agent request body") - return - } + return func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("ok")) - rawBytes, err := ioutil.ReadAll(r.Body) - if err != nil { - log.Println("Could not read bytes from agent request body") - return - } + if r.Body == nil { + log.Println("No body in agent request") + return + } - agentData := AgentData{ - Data: rawBytes, - ContentEncoding: r.Header.Get("Content-Encoding"), + rawBytes, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Println("Could not read bytes from agent request body") + return + } + + agentData := AgentData{ + Data: rawBytes, + ContentEncoding: r.Header.Get("Content-Encoding"), + } + log.Println("Adding agent data to buffer to be sent to apm server") + agentDataChan <- agentData } - log.Println("Adding agent data to buffer to be sent to apm server") - handler.data <- agentData } diff --git a/apm-lambda-extension/main.go b/apm-lambda-extension/main.go index 421d3231..42d1cc45 100644 --- a/apm-lambda-extension/main.go +++ b/apm-lambda-extension/main.go @@ -64,7 +64,7 @@ func main() { // and get a channel to listen for that data agentDataChannel := make(chan extension.AgentData, 100) - extension.NewHttpServer(agentDataChannel, config) + extension.StartHttpServer(agentDataChannel, config) // Make channel for collecting logs and create a HTTP server to listen for them logsChannel := make(chan logsapi.LogEvent)