Skip to content
51 changes: 14 additions & 37 deletions apm-lambda-extension/extension/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,50 +18,27 @@
package extension

import (
"log"
"net"
"net/http"
"time"
)

type serverHandler struct {
data chan AgentData
config *extensionConfig
}

func (handler *serverHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/intake/v2/events" {
handleIntakeV2Events(handler, w, r)
return
}

if r.URL.Path == "/" {
handleInfoRequest(handler, w, r)
return
}

// if we have not yet returned, 404
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("404"))

}
var agentDataServer *http.Server

func NewHttpServer(dataChannel chan AgentData, config *extensionConfig) *http.Server {
var handler = serverHandler{data: dataChannel, config: config}
timeout := time.Duration(config.dataReceiverTimeoutSeconds) * time.Second
s := &http.Server{
Addr: config.dataReceiverServerPort,
Handler: &handler,
ReadTimeout: timeout,
WriteTimeout: timeout,
MaxHeaderBytes: 1 << 20,
}
func StartHttpServer(agentDataChan chan AgentData, config *extensionConfig) (err error) {
mux := http.NewServeMux()
mux.HandleFunc("/", handleInfoRequest(config.apmServerUrl))
mux.HandleFunc("/intake/v2/events", handleIntakeV2Events(agentDataChan))
agentDataServer = &http.Server{Addr: config.dataReceiverServerPort, Handler: mux}

addr := s.Addr
ln, err := net.Listen("tcp", addr)
ln, err := net.Listen("tcp", agentDataServer.Addr)
if err != nil {
return s
return
}
go s.Serve(ln)

return s
go func() {
log.Printf("Extension listening for apm data on %s", agentDataServer.Addr)
agentDataServer.Serve(ln)
}()
return nil
}
16 changes: 10 additions & 6 deletions apm-lambda-extension/extension/http_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package extension

import (
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"testing"
Expand Down Expand Up @@ -46,15 +47,17 @@ func TestInfoProxy(t *testing.T) {
apmServerUrl: apmServer.URL,
apmServerSecretToken: "foo",
apmServerApiKey: "bar",
dataReceiverServerPort: "127.0.0.1:1234",
dataReceiverServerPort: ":1234",
dataReceiverTimeoutSeconds: 15,
}
extensionServer := NewHttpServer(dataChannel, &config)
defer extensionServer.Close()

StartHttpServer(dataChannel, &config)
defer agentDataServer.Close()

hosts, _ := net.LookupHost("localhost")
url := "http://" + hosts[0] + ":1234"

// Create a request to send to the extension
client := &http.Client{}
url := "http://" + extensionServer.Addr
req, err := http.NewRequest("GET", url, nil)
if err != nil {
t.Logf("Could not create request")
Expand All @@ -64,9 +67,10 @@ func TestInfoProxy(t *testing.T) {
}

// Send the request to the extension
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
t.Logf("Error fetching %s, [%v]", extensionServer.Addr, err)
t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err)
t.Fail()
} else {
body, _ := ioutil.ReadAll(resp.Body)
Expand Down
1 change: 1 addition & 0 deletions apm-lambda-extension/extension/process_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
func ProcessShutdown() {
log.Println("Received SHUTDOWN event")
log.Println("Exiting")
agentDataServer.Close()
}

func FlushAPMData(client *http.Client, dataChannel chan AgentData, config *extensionConfig) {
Expand Down
105 changes: 55 additions & 50 deletions apm-lambda-extension/extension/route_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,68 +29,73 @@ type AgentData struct {
}

// URL: http://server/
func handleInfoRequest(handler *serverHandler, w http.ResponseWriter, r *http.Request) {
client := &http.Client{}
func handleInfoRequest(apmServerUrl string) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
client := &http.Client{}

req, err := http.NewRequest(r.Method, handler.config.apmServerUrl, nil)
//forward every header received
for name, values := range r.Header {
// Loop over all values for the name.
for _, value := range values {
req.Header.Set(name, value)
req, err := http.NewRequest(r.Method, apmServerUrl, nil)
//forward every header received
for name, values := range r.Header {
// Loop over all values for the name.
for _, value := range values {
req.Header.Set(name, value)
}
}
if err != nil {
log.Printf("could not create request object for %s:%s: %v", r.Method, apmServerUrl, err)
return
}
}
if err != nil {
log.Printf("could not create request object for %s:%s: %v", r.Method, handler.config.apmServerUrl, err)
return
}

resp, err := client.Do(req)
if err != nil {
log.Printf("error forwarding info request (`/`) to APM Server: %v", err)
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("could not read info request response to APM Server: %v", err)
return
}
resp, err := client.Do(req)
if err != nil {
log.Printf("error forwarding info request (`/`) to APM Server: %v", err)
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The body shouldn't be massive in size, but a good habit is to operate on the resp.Body as an io.Reader, and use something like io.Copy to keep memory allocation down.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, I'll change that 👍🏼

if err != nil {
log.Printf("could not read info request response to APM Server: %v", err)
return
}

// send status code
w.WriteHeader(resp.StatusCode)
// send status code
w.WriteHeader(resp.StatusCode)

// send every header received
for name, values := range resp.Header {
// Loop over all values for the name.
for _, value := range values {
w.Header().Add(name, value)
// send every header received
for name, values := range resp.Header {
// Loop over all values for the name.
for _, value := range values {
w.Header().Add(name, value)
}
}
// send body
w.Write([]byte(body))
}
// send body
w.Write([]byte(body))
}

// URL: http://server/intake/v2/events
func handleIntakeV2Events(handler *serverHandler, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusAccepted)
w.Write([]byte("ok"))
func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWriter, r *http.Request) {

if r.Body == nil {
log.Println("Could not get bytes from agent request body")
return
}
return func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusAccepted)
w.Write([]byte("ok"))

rawBytes, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println("Could not read bytes from agent request body")
return
}
if r.Body == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you encounter the body being nil? I took a look at the docs because I was checking to see if you need to close the request body, and it says:

	// For server requests, the Request Body is always non-nil
	// but will return EOF immediately when no body is present.
	// The Server will close the request body. The ServeHTTP
	// Handler does not need to.

so I'm not sure if this nil check is required. I suppose it doesn't hurt, though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the docs say it will never be nil, it doesn't seem necessary. I'll remove it :)

log.Println("No body in agent request")
return
}

agentData := AgentData{
Data: rawBytes,
ContentEncoding: r.Header.Get("Content-Encoding"),
rawBytes, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println("Could not read bytes from agent request body")
return
}

agentData := AgentData{
Data: rawBytes,
ContentEncoding: r.Header.Get("Content-Encoding"),
}
log.Println("Adding agent data to buffer to be sent to apm server")
agentDataChan <- agentData
}
log.Println("Adding agent data to buffer to be sent to apm server")
handler.data <- agentData
}
2 changes: 1 addition & 1 deletion apm-lambda-extension/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func main() {
// and get a channel to listen for that data
agentDataChannel := make(chan extension.AgentData, 100)

extension.NewHttpServer(agentDataChannel, config)
extension.StartHttpServer(agentDataChannel, config)

// Make channel for collecting logs and create a HTTP server to listen for them
logsChannel := make(chan logsapi.LogEvent)
Expand Down