diff --git a/beater/pub.go b/beater/pub.go index 4425b27360..c5c4bbe2cc 100644 --- a/beater/pub.go +++ b/beater/pub.go @@ -43,6 +43,8 @@ func newPublisher(pipeline beat.Pipeline, N int) (*publisher, error) { // TODO: We want to wait for events in pipeline on shutdown? // If set >0 `Close` will block for the duration or until pipeline is empty WaitClose: 0, + + SkipNormalization: true, }) if err != nil { return nil, err diff --git a/docs/data/elasticsearch/transaction.json b/docs/data/elasticsearch/transaction.json index 22137fc49c..3ef57b0521 100644 --- a/docs/data/elasticsearch/transaction.json +++ b/docs/data/elasticsearch/transaction.json @@ -20,7 +20,13 @@ "title": "node" }, "request": { - "body": "Hello World", + "body": { + "additional": { + "bar": 123, + "req": "additional information" + }, + "str": "hello world" + }, "cookies": { "c1": "v1", "c2": "v2" @@ -111,10 +117,9 @@ "id": "945254c5-67a5-417e-8a4e-aa29efcbfb79", "marks": { "navigationTiming": { - "appBeforeBootstrap": 608.9300000000001, + "appBeforeBootstrap": 608.930000, "navigationStart": -21 - }, - "performance": {} + } }, "name": "GET /api/types", "result": "success", diff --git a/docs/data/intake-api/generated/transaction/payload.json b/docs/data/intake-api/generated/transaction/payload.json index 2e378452a1..0daa78c6fb 100644 --- a/docs/data/intake-api/generated/transaction/payload.json +++ b/docs/data/intake-api/generated/transaction/payload.json @@ -85,7 +85,14 @@ "SERVER_SOFTWARE": "nginx", "GATEWAY_INTERFACE": "CGI/1.1" }, - "body": "Hello World" + "body": { + "str": "hello world", + "additional": { + "foo": {}, + "bar": 123, + "req": "additional information" + } + } }, "response": { "status_code": 200, diff --git a/model/context.go b/model/context.go new file mode 100644 index 0000000000..4931517682 --- /dev/null +++ b/model/context.go @@ -0,0 +1,38 @@ +package model + +import ( + "github.com/elastic/apm-server/utility" + "github.com/elastic/beats/libbeat/common" +) + +type Context struct { + service common.MapStr + process common.MapStr + system common.MapStr + user common.MapStr +} + +func NewContext(service *Service, process *Process, system *System, user common.MapStr) *Context { + return &Context{ + service: service.Transform(), + process: process.Transform(), + system: system.Transform(), + user: user, + } +} + +func (c *Context) Transform(m common.MapStr) common.MapStr { + if m == nil { + m = common.MapStr{} + } else { + for k, v := range m { + // normalize map entries by calling utility.Add + utility.Add(m, k, v) + } + } + utility.Add(m, "service", c.service) + utility.Add(m, "process", c.process) + utility.Add(m, "system", c.system) + utility.MergeAdd(m, "user", c.user) + return m +} diff --git a/model/context_test.go b/model/context_test.go new file mode 100644 index 0000000000..974962e616 --- /dev/null +++ b/model/context_test.go @@ -0,0 +1,103 @@ +package model + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" +) + +var ip = "127.0.0.1" + +func TestContext(t *testing.T) { + tests := []struct { + process *Process + system *System + service *Service + user common.MapStr + context *Context + }{ + { + process: nil, + system: nil, + service: nil, + user: nil, + context: &Context{}, + }, + { + process: &Process{}, + system: &System{}, + service: &Service{}, + user: common.MapStr{}, + context: &Context{ + process: common.MapStr{"pid": 0}, + service: common.MapStr{"name": "", "agent": common.MapStr{"version": "", "name": ""}}, + system: common.MapStr{}, + user: common.MapStr{}, + }, + }, + { + process: &Process{Pid: 123}, + system: &System{IP: &ip}, + service: &Service{Name: "service"}, + user: common.MapStr{"id": 456}, + context: &Context{ + process: common.MapStr{"pid": 123}, + system: common.MapStr{"ip": ip}, + service: common.MapStr{"name": "service", "agent": common.MapStr{"version": "", "name": ""}}, + user: common.MapStr{"id": 456}, + }, + }, + } + + for idx, te := range tests { + ctx := NewContext(te.service, te.process, te.system, te.user) + assert.Equal(t, te.context, ctx, + fmt.Sprintf("<%v> Expected: %v, Actual: %v", idx, te.context, ctx)) + } + +} + +func TestContextTransform(t *testing.T) { + + tests := []struct { + context *Context + m common.MapStr + out common.MapStr + }{ + { + context: &Context{}, + m: common.MapStr{}, + out: common.MapStr{}, + }, + { + context: &Context{}, + m: common.MapStr{"user": common.MapStr{"id": 123}}, + out: common.MapStr{"user": common.MapStr{"id": 123}}, + }, + { + context: &Context{ + process: common.MapStr{"pid": 123}, + system: common.MapStr{"ip": ip}, + service: common.MapStr{"name": "service", "agent": common.MapStr{"version": "", "name": ""}}, + user: common.MapStr{"id": 456}, + }, + m: common.MapStr{"foo": "bar", "user": common.MapStr{"id": 123, "username": "foo"}}, + out: common.MapStr{ + "foo": "bar", + "user": common.MapStr{"id": 456, "username": "foo"}, + "process": common.MapStr{"pid": 123}, + "system": common.MapStr{"ip": ip}, + "service": common.MapStr{"name": "service", "agent": common.MapStr{"version": "", "name": ""}}, + }, + }, + } + + for idx, te := range tests { + out := te.context.Transform(te.m) + assert.Equal(t, te.out, out, + fmt.Sprintf("<%v> Expected: %v, Actual: %v", idx, te.out, out)) + } +} diff --git a/model/process.go b/model/process.go index fa4b7e3188..f084a60766 100644 --- a/model/process.go +++ b/model/process.go @@ -12,8 +12,6 @@ type Process struct { Argv []string } -type TransformProcess func(a *Process) common.MapStr - func (p *Process) Transform() common.MapStr { if p == nil { return nil diff --git a/model/process_test.go b/model/process_test.go index faba26ed35..b54829f52d 100644 --- a/model/process_test.go +++ b/model/process_test.go @@ -8,12 +8,6 @@ import ( "github.com/elastic/beats/libbeat/common" ) -func TestProcessTransformDefinition(t *testing.T) { - myfn := func(fn TransformProcess) string { return "ok" } - res := myfn((*Process).Transform) - assert.Equal(t, "ok", res) -} - func TestProcessTransform(t *testing.T) { pid := 1234 processTitle := "node" diff --git a/model/service.go b/model/service.go index 6dcb1733bb..f5d19279d6 100644 --- a/model/service.go +++ b/model/service.go @@ -32,20 +32,22 @@ type Agent struct { Version string } -type TransformService func(a *Service) common.MapStr - func (s *Service) MinimalTransform() common.MapStr { - svc := common.MapStr{ - "name": s.Name, - "agent": common.MapStr{ - "name": s.Agent.Name, - "version": s.Agent.Version, - }, + if s == nil { + return nil } + svc := common.MapStr{"name": s.Name} + agent := common.MapStr{} + utility.Add(agent, "name", s.Agent.Name) + utility.Add(agent, "version", s.Agent.Version) + utility.Add(svc, "agent", agent) return svc } func (s *Service) Transform() common.MapStr { + if s == nil { + return nil + } svc := s.MinimalTransform() utility.Add(svc, "version", s.Version) utility.Add(svc, "environment", s.Environment) diff --git a/model/service_test.go b/model/service_test.go index 628702e67d..80ab962eec 100644 --- a/model/service_test.go +++ b/model/service_test.go @@ -8,12 +8,6 @@ import ( "github.com/elastic/beats/libbeat/common" ) -func TestServiceTransformDefinition(t *testing.T) { - myfn := func(fn TransformService) string { return "ok" } - res := myfn((*Service).Transform) - assert.Equal(t, "ok", res) -} - func TestServiceTransform(t *testing.T) { version := "5.1.3" diff --git a/model/system.go b/model/system.go index 6e33676245..f48902401c 100644 --- a/model/system.go +++ b/model/system.go @@ -21,6 +21,5 @@ func (s *System) Transform() common.MapStr { utility.Add(system, "architecture", s.Architecture) utility.Add(system, "platform", s.Platform) utility.Add(system, "ip", s.IP) - return system } diff --git a/processor/error/event.go b/processor/error/event.go index a6ed9d3b8a..5e1819f659 100644 --- a/processor/error/event.go +++ b/processor/error/event.go @@ -62,15 +62,6 @@ func (e *Event) Transform(config *pr.Config, service m.Service) common.MapStr { return e.data } -// This updates the event in place -func (e *Event) contextTransform(pa *payload) common.MapStr { - if e.Context == nil { - e.Context = make(map[string]interface{}) - } - utility.InsertInMap(e.Context, "user", pa.User) - return e.Context -} - func (e *Event) updateCulprit(config *pr.Config) { if config == nil || config.SmapMapper == nil { return diff --git a/processor/error/package_tests/TestProcessErrorNullValues.approved.json b/processor/error/package_tests/TestProcessErrorNullValues.approved.json index 49db3dcb0c..5412dce823 100644 --- a/processor/error/package_tests/TestProcessErrorNullValues.approved.json +++ b/processor/error/package_tests/TestProcessErrorNullValues.approved.json @@ -42,22 +42,7 @@ "some_other_value": "foo bar" }, "request": { - "headers": { - "content-type": null, - "cookie": null, - "user-agent": null - }, - "method": "POST", - "socket": { - "encrypted": null, - "remote_address": null - }, - "url": {} - }, - "response": { - "headers": { - "content-type": null - } + "method": "POST" }, "service": { "agent": { @@ -87,31 +72,8 @@ { "@timestamp": "2017-05-09T15:04:05Z", "context": { - "custom": null, "request": { - "body": null, - "cookies": null, - "env": null, - "headers": null, - "http_version": null, - "method": "POST", - "socket": null, - "url": { - "full": null, - "hash": null, - "hostname": null, - "pathname": null, - "port": null, - "protocol": null, - "raw": null, - "search": null - } - }, - "response": { - "finished": null, - "headers": null, - "headers_sent": null, - "status_code": null + "method": "POST" }, "service": { "agent": { @@ -122,11 +84,6 @@ "name": "ruby" }, "name": "1234_service-12a3" - }, - "user": { - "email": null, - "id": null, - "username": null } }, "error": { @@ -143,9 +100,6 @@ { "@timestamp": "2017-05-09T15:04:05.999Z", "context": { - "custom": null, - "request": null, - "response": null, "service": { "agent": { "name": "ruby", @@ -155,9 +109,7 @@ "name": "ruby" }, "name": "1234_service-12a3" - }, - "tags": null, - "user": null + } }, "error": { "grouping_key": "d6b3f958dfea98dc9ed2b57d5f0c48bb", diff --git a/processor/error/payload.go b/processor/error/payload.go index 2d229ab54a..5bccdcbf1f 100644 --- a/processor/error/payload.go +++ b/processor/error/payload.go @@ -3,7 +3,6 @@ package error import ( m "github.com/elastic/apm-server/model" pr "github.com/elastic/apm-server/processor" - "github.com/elastic/apm-server/utility" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -19,28 +18,20 @@ type payload struct { Service m.Service System *m.System Process *m.Process - Events []Event `mapstructure:"errors"` - User map[string]interface{} + User common.MapStr + + Events []Event `mapstructure:"errors"` } func (pa *payload) transform(config *pr.Config) []beat.Event { var events []beat.Event - service := pa.Service.Transform() - system := pa.System.Transform() - process := pa.Process.Transform() + context := m.NewContext(&pa.Service, pa.Process, pa.System, pa.User) logp.NewLogger("transform").Debugf("Transform error events: events=%d, service=%s, agent=%s:%s", len(pa.Events), pa.Service.Name, pa.Service.Agent.Name, pa.Service.Agent.Version) errorCounter.Add(int64(len(pa.Events))) for _, event := range pa.Events { - context := event.contextTransform(pa) - if context == nil { - context = common.MapStr{} - } - utility.Add(context, "service", service) - utility.Add(context, "system", system) - utility.Add(context, "process", process) - + context := context.Transform(event.Context) ev := beat.Event{ Fields: common.MapStr{ "processor": processorEntry, diff --git a/processor/transaction/event.go b/processor/transaction/event.go index 2090c1fbff..33d8190e67 100644 --- a/processor/transaction/event.go +++ b/processor/transaction/event.go @@ -51,12 +51,3 @@ func (t *Event) Transform() common.MapStr { } return tx } - -// This updates the event in place -func (t *Event) contextTransform(pa *payload) common.MapStr { - if t.Context == nil { - t.Context = make(map[string]interface{}) - } - utility.InsertInMap(t.Context, "user", pa.User) - return t.Context -} diff --git a/processor/transaction/package_tests/TestProcessTransactionFrontend.approved.json b/processor/transaction/package_tests/TestProcessTransactionFrontend.approved.json index 966622cfbd..a17cc34e4a 100644 --- a/processor/transaction/package_tests/TestProcessTransactionFrontend.approved.json +++ b/processor/transaction/package_tests/TestProcessTransactionFrontend.approved.json @@ -3,7 +3,6 @@ { "@timestamp": "2017-12-08T12:52:53.681Z", "context": { - "_debug": {}, "_metrics": { "connectEnd": 14, "connectStart": 14, diff --git a/processor/transaction/package_tests/TestProcessTransactionFull.approved.json b/processor/transaction/package_tests/TestProcessTransactionFull.approved.json index e91bdbf9f6..9d8e0aa3b2 100644 --- a/processor/transaction/package_tests/TestProcessTransactionFull.approved.json +++ b/processor/transaction/package_tests/TestProcessTransactionFull.approved.json @@ -23,7 +23,13 @@ "title": "node" }, "request": { - "body": "Hello World", + "body": { + "additional": { + "bar": 123, + "req": "additional information" + }, + "str": "hello world" + }, "cookies": { "c1": "v1", "c2": "v2" @@ -114,10 +120,9 @@ "id": "945254c5-67a5-417e-8a4e-aa29efcbfb79", "marks": { "navigationTiming": { - "appBeforeBootstrap": 608.9300000000001, + "appBeforeBootstrap": 608.93, "navigationStart": -21 - }, - "performance": {} + } }, "name": "GET /api/types", "result": "success", diff --git a/processor/transaction/package_tests/TestProcessTransactionNullValues.approved.json b/processor/transaction/package_tests/TestProcessTransactionNullValues.approved.json index 66c4409d9e..36c945e4b3 100644 --- a/processor/transaction/package_tests/TestProcessTransactionNullValues.approved.json +++ b/processor/transaction/package_tests/TestProcessTransactionNullValues.approved.json @@ -31,9 +31,6 @@ { "@timestamp": "2017-05-30T18:53:42Z", "context": { - "custom": null, - "request": null, - "response": null, "service": { "agent": { "name": "ruby", @@ -43,9 +40,7 @@ "name": "ruby" }, "name": "1234_service-12a3" - }, - "tags": null, - "user": null + } }, "processor": { "event": "transaction", @@ -63,31 +58,8 @@ { "@timestamp": "2017-05-30T18:53:42.281999Z", "context": { - "custom": null, "request": { - "body": null, - "cookies": null, - "env": null, - "headers": null, - "http_version": null, - "method": "POST", - "socket": null, - "url": { - "full": null, - "hash": null, - "hostname": null, - "pathname": null, - "port": null, - "protocol": null, - "raw": null, - "search": null - } - }, - "response": { - "finished": null, - "headers": null, - "headers_sent": null, - "status_code": null + "method": "POST" }, "service": { "agent": { @@ -98,11 +70,6 @@ "name": "ruby" }, "name": "1234_service-12a3" - }, - "user": { - "email": null, - "id": null, - "username": null } }, "processor": { @@ -132,22 +99,7 @@ "some_other_value": "foo bar" }, "request": { - "headers": { - "content-type": null, - "cookie": null, - "user-agent": null - }, - "method": "POST", - "socket": { - "encrypted": null, - "remote_address": null - }, - "url": {} - }, - "response": { - "headers": { - "content-type": null - } + "method": "POST" }, "service": { "agent": { @@ -208,7 +160,6 @@ { "@timestamp": "2017-05-30T18:53:42.281Z", "context": { - "db": null, "service": { "agent": { "name": "ruby", @@ -247,12 +198,6 @@ { "@timestamp": "2017-05-30T18:53:42.281Z", "context": { - "db": { - "instance": null, - "statement": null, - "type": null, - "user": null - }, "service": { "agent": { "name": "ruby", diff --git a/processor/transaction/package_tests/json_schema_test.go b/processor/transaction/package_tests/json_schema_test.go index bb37f42ac9..012c74db22 100644 --- a/processor/transaction/package_tests/json_schema_test.go +++ b/processor/transaction/package_tests/json_schema_test.go @@ -20,6 +20,11 @@ func TestPayloadAttributesInSchema(t *testing.T) { "transactions.context.request.env.SERVER_SOFTWARE", "transactions.context.request.env.GATEWAY_INTERFACE", "transactions.context.request.body", + "transactions.context.request.body.str", + "transactions.context.request.body.additional", + "transactions.context.request.body.additional.foo", + "transactions.context.request.body.additional.bar", + "transactions.context.request.body.additional.req", "transactions.context.request.cookies.c1", "transactions.context.request.cookies.c2", "transactions.context.custom", diff --git a/processor/transaction/payload.go b/processor/transaction/payload.go index a2fed2e1e2..200a86fd23 100644 --- a/processor/transaction/payload.go +++ b/processor/transaction/payload.go @@ -3,7 +3,6 @@ package transaction import ( m "github.com/elastic/apm-server/model" pr "github.com/elastic/apm-server/processor" - "github.com/elastic/apm-server/utility" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -22,33 +21,25 @@ type payload struct { System *m.System Process *m.Process Events []Event `mapstructure:"transactions"` - User map[string]interface{} + Context common.MapStr + User common.MapStr } func (pa *payload) transform(config *pr.Config) []beat.Event { var events []beat.Event - spanService := pa.Service.MinimalTransform() - service := pa.Service.Transform() - system := pa.System.Transform() - process := pa.Process.Transform() + context := m.NewContext(&pa.Service, pa.Process, pa.System, pa.User) + spanContext := NewSpanContext(&pa.Service) logp.NewLogger("transaction").Debugf("Transform transaction events: events=%d, service=%s, agent=%s:%s", len(pa.Events), pa.Service.Name, pa.Service.Agent.Name, pa.Service.Agent.Version) transactionCounter.Add(int64(len(pa.Events))) for _, event := range pa.Events { - context := event.contextTransform(pa) - if context == nil { - context = common.MapStr{} - } - utility.Add(context, "service", service) - utility.Add(context, "system", system) - utility.Add(context, "process", process) ev := beat.Event{ Fields: common.MapStr{ "processor": processorTransEntry, transactionDocType: event.Transform(), - "context": context, + "context": context.Transform(event.Context), }, Timestamp: event.Timestamp, } @@ -57,18 +48,12 @@ func (pa *payload) transform(config *pr.Config) []beat.Event { trId := common.MapStr{"id": event.Id} spanCounter.Add(int64(len(event.Spans))) for _, sp := range event.Spans { - c := sp.Context - if c == nil && spanService != nil { - c = common.MapStr{} - } - utility.Add(c, "service", spanService) - ev := beat.Event{ Fields: common.MapStr{ "processor": processorSpanEntry, spanDocType: sp.Transform(config, pa.Service), "transaction": trId, - "context": c, + "context": spanContext.Transform(sp.Context), }, Timestamp: event.Timestamp, } diff --git a/processor/transaction/span_context.go b/processor/transaction/span_context.go new file mode 100644 index 0000000000..5bf6302124 --- /dev/null +++ b/processor/transaction/span_context.go @@ -0,0 +1,28 @@ +package transaction + +import ( + m "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/utility" + "github.com/elastic/beats/libbeat/common" +) + +type SpanContext struct { + service common.MapStr +} + +func NewSpanContext(service *m.Service) *SpanContext { + return &SpanContext{service: service.MinimalTransform()} +} + +func (c *SpanContext) Transform(m common.MapStr) common.MapStr { + if m == nil { + m = common.MapStr{} + } else { + for k, v := range m { + // normalize map entries by calling utility.Add + utility.Add(m, k, v) + } + } + utility.Add(m, "service", c.service) + return m +} diff --git a/processor/transaction/span_context_test.go b/processor/transaction/span_context_test.go new file mode 100644 index 0000000000..401b3cec97 --- /dev/null +++ b/processor/transaction/span_context_test.go @@ -0,0 +1,78 @@ +package transaction + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/apm-server/model" + "github.com/elastic/beats/libbeat/common" +) + +func TestSpanContext(t *testing.T) { + tests := []struct { + service *model.Service + context *SpanContext + }{ + { + service: nil, + context: &SpanContext{}, + }, + { + service: &model.Service{}, + context: &SpanContext{ + service: common.MapStr{"name": "", "agent": common.MapStr{"version": "", "name": ""}}, + }, + }, + { + service: &model.Service{Name: "service"}, + context: &SpanContext{ + service: common.MapStr{"name": "service", "agent": common.MapStr{"version": "", "name": ""}}, + }, + }, + } + + for idx, te := range tests { + ctx := NewSpanContext(te.service) + assert.Equal(t, te.context, ctx, + fmt.Sprintf("<%v> Expected: %v, Actual: %v", idx, te.context, ctx)) + } +} + +func TestSpanContextTransform(t *testing.T) { + + tests := []struct { + context *SpanContext + m common.MapStr + out common.MapStr + }{ + { + context: &SpanContext{}, + m: common.MapStr{}, + out: common.MapStr{}, + }, + { + context: &SpanContext{}, + m: common.MapStr{"user": common.MapStr{"id": 123}}, + out: common.MapStr{"user": common.MapStr{"id": 123}}, + }, + { + context: &SpanContext{ + service: common.MapStr{"name": "service", "agent": common.MapStr{"version": "", "name": ""}}, + }, + m: common.MapStr{"foo": "bar", "user": common.MapStr{"id": 123, "username": "foo"}}, + out: common.MapStr{ + "foo": "bar", + "user": common.MapStr{"id": 123, "username": "foo"}, + "service": common.MapStr{"name": "service", "agent": common.MapStr{"version": "", "name": ""}}, + }, + }, + } + + for idx, te := range tests { + out := te.context.Transform(te.m) + assert.Equal(t, te.out, out, + fmt.Sprintf("<%v> Expected: %v, Actual: %v", idx, te.out, out)) + } +} diff --git a/tests/data/valid/transaction/payload.json b/tests/data/valid/transaction/payload.json index 2e378452a1..0daa78c6fb 100644 --- a/tests/data/valid/transaction/payload.json +++ b/tests/data/valid/transaction/payload.json @@ -85,7 +85,14 @@ "SERVER_SOFTWARE": "nginx", "GATEWAY_INTERFACE": "CGI/1.1" }, - "body": "Hello World" + "body": { + "str": "hello world", + "additional": { + "foo": {}, + "bar": 123, + "req": "additional information" + } + } }, "response": { "status_code": 200, diff --git a/tests/system/transaction.approved.json b/tests/system/transaction.approved.json index 324d61d3c7..9fe095efde 100644 --- a/tests/system/transaction.approved.json +++ b/tests/system/transaction.approved.json @@ -92,7 +92,13 @@ "remote_address": "12.53.12.1", "encrypted": true }, - "body": "Hello World", + "body": { + "additional": { + "req": "additional information", + "bar": 123 + }, + "str": "hello world" + }, "env": { "GATEWAY_INTERFACE": "CGI/1.1", "SERVER_SOFTWARE": "nginx" @@ -127,8 +133,7 @@ "navigationTiming": { "navigationStart": -21, "appBeforeBootstrap": 608.9300000000001 - }, - "performance": {} + } }, "id": "945254c5-67a5-417e-8a4e-aa29efcbfb79", "name": "GET /api/types", diff --git a/utility/map_str_enhancer.go b/utility/map_str_enhancer.go index 37c90e2703..19debbeb7a 100644 --- a/utility/map_str_enhancer.go +++ b/utility/map_str_enhancer.go @@ -1,51 +1,126 @@ package utility import ( + "reflect" + "github.com/elastic/beats/libbeat/common" ) -func AddStrWithDefault(m common.MapStr, key string, val *string, defaultVal string) { - if val != nil { - m[key] = *val - } else if defaultVal != "" { - m[key] = defaultVal - } -} - func Add(m common.MapStr, key string, val interface{}) { + if m == nil || key == "" { + return + } + if val == nil { + delete(m, key) + return + } switch val.(type) { case *bool: if newVal := val.(*bool); newVal != nil { m[key] = *newVal + } else { + delete(m, key) } case *int: if newVal := val.(*int); newVal != nil { m[key] = *newVal + } else { + delete(m, key) } case *string: if newVal := val.(*string); newVal != nil { m[key] = *newVal + } else { + delete(m, key) } case common.MapStr: if valMap := val.(common.MapStr); len(valMap) > 0 { - m[key] = valMap + for k, v := range valMap { + Add(valMap, k, v) + } + if len(valMap) > 0 { + m[key] = valMap + } else { + delete(m, key) + } + } else { + delete(m, key) + } + case map[string]interface{}: + if valMap := val.(map[string]interface{}); len(valMap) > 0 { + for k, v := range valMap { + Add(valMap, k, v) + } + if len(valMap) > 0 { + m[key] = valMap + } else { + delete(m, key) + } + } else { + delete(m, key) } - case []string: - if valArr := val.([]string); len(valArr) > 0 { - m[key] = valArr + case float64: + floatVal := val.(float64) + if floatVal == float64(int64(floatVal)) { + m[key] = int64(floatVal) + } else { + m[key] = common.Float(floatVal) } - case []common.MapStr: - if valArr := val.([]common.MapStr); len(valArr) > 0 { - m[key] = valArr + case float32: + floatVal := val.(float32) + if floatVal == float32(int32(floatVal)) { + m[key] = int32(floatVal) + } else { + m[key] = common.Float(floatVal) } + case string, bool, complex64, complex128: + m[key] = val + case int, int8, int16, int32, int64, uint, uint8, uint32, uint64: + m[key] = val default: - if val != nil { + v := reflect.ValueOf(val) + switch v.Type().Kind() { + case reflect.Slice, reflect.Array: + if v.Len() == 0 { + delete(m, key) + } else { + m[key] = val + } + + // do not store values of following type + // has been rejected so far by the libbeat normalization + case reflect.Interface, reflect.Chan, reflect.Func, reflect.UnsafePointer, reflect.Uintptr: + + default: m[key] = val } } } +// MergeAdd modifies `m` *in place*, inserting `valu` at the given `key`. +// If `key` doesn't exist in m(at the top level), it gets created. +// If the value under `key` is not a map, MergeAdd does nothing. +func MergeAdd(m common.MapStr, key string, val common.MapStr) { + if m == nil || key == "" || val == nil || len(val) == 0 { + return + } + + if _, ok := m[key]; !ok { + m[key] = common.MapStr{} + } + + if nested, ok := m[key].(common.MapStr); ok { + for k, v := range val { + Add(nested, k, v) + } + } else if nested, ok := m[key].(map[string]interface{}); ok { + for k, v := range val { + Add(nested, k, v) + } + } +} + func MillisAsMicros(ms float64) common.MapStr { m := common.MapStr{} m["us"] = int(ms * 1000) diff --git a/utility/map_str_enhancer_test.go b/utility/map_str_enhancer_test.go index 38fbc1b034..6dd7314336 100644 --- a/utility/map_str_enhancer_test.go +++ b/utility/map_str_enhancer_test.go @@ -1,7 +1,9 @@ package utility import ( + "fmt" "testing" + "unsafe" "github.com/stretchr/testify/assert" @@ -10,52 +12,298 @@ import ( const addKey = "added" +func TestAddGeneral(t *testing.T) { + var m common.MapStr + Add(m, "s", "s") + assert.Nil(t, m) + + m = common.MapStr{} + Add(m, "", "") + assert.Equal(t, common.MapStr{}, m) +} + +func TestEmptyCollections(t *testing.T) { + m := common.MapStr{"foo": "bar", "user": common.MapStr{"id": "1", "name": "bar"}} + add := common.MapStr{} + Add(m, "user", add) + assert.Equal(t, common.MapStr{"foo": "bar"}, m) + + m = common.MapStr{"foo": "bar", "user": common.MapStr{"id": "1", "name": "bar"}} + add = common.MapStr{"id": nil, "email": nil, "info": common.MapStr{"a": nil}} + Add(m, "user", add) + assert.Equal(t, common.MapStr{"foo": "bar"}, m) + + m = common.MapStr{"foo": "bar", "user": common.MapStr{"id": "1", "name": "bar"}} + add = map[string]interface{}{"id": nil, "email": nil, "info": map[string]interface{}{"a": nil}} + Add(m, "user", add) + assert.Equal(t, common.MapStr{"foo": "bar"}, m) + + m = common.MapStr{"foo": "bar", "user": common.MapStr{"id": "1", "name": "bar"}} + add = map[string]interface{}{} + Add(m, "user", add) + assert.Equal(t, common.MapStr{"foo": "bar"}, m) +} + +func TestIgnoredTypes(t *testing.T) { + m := common.MapStr{} + + Add(m, "foo", make(chan int)) + assert.Equal(t, common.MapStr{}, m) + + Add(m, "foo", func() {}) + assert.Equal(t, common.MapStr{}, m) + + uintPtr := uintptr(8) + Add(m, "foo", uintPtr) + assert.Equal(t, common.MapStr{}, m) + + a := []int{} + Add(m, "foo", unsafe.Pointer(&a)) + assert.Equal(t, common.MapStr{}, m) +} + func TestAdd(t *testing.T) { - base := common.MapStr{"existing": "foo"} - addTrue, updateTrue, expectedTrue := true, false, common.MapStr{"existing": "foo", addKey: true} - addFalse, updateFalse, expectedFalse := false, true, common.MapStr{"existing": "foo", addKey: false} - addInt, updateInt, expectedInt := 1, 2, common.MapStr{"existing": "foo", addKey: 1} - addStr, updateStr, expectedStr := "foo", "bar", common.MapStr{"existing": "foo", addKey: "foo"} - addCommonMapStr, updateCommonMapStr, expectedCommonMapStr := common.MapStr{"foo": "bar"}, common.MapStr{"john": "doe"}, common.MapStr{"existing": "foo", addKey: common.MapStr{"foo": "bar"}} - addCommonMapStrEmpty, updateCommonMapStrEmpty, expectedCommonMapStrEmpty := common.MapStr{}, common.MapStr{}, base - - var addBoolNil *bool - var addIntNil *int - var addStrNil *string - var addStrArrNil []string - testData := [][]interface{}{ - {&addTrue, &updateTrue, expectedTrue}, - {&addFalse, &updateFalse, expectedFalse}, - {&addInt, &updateInt, expectedInt}, - {&addStr, &updateStr, expectedStr}, - {addCommonMapStr, updateCommonMapStr, expectedCommonMapStr}, - {addCommonMapStrEmpty, updateCommonMapStrEmpty, expectedCommonMapStrEmpty}, - {addBoolNil, addBoolNil, base}, - {addIntNil, addIntNil, base}, - {addStrNil, addStrNil, base}, - {addStrArrNil, []string{"something"}, base}, + existing := "foo" + newArrMapStr := []common.MapStr{{"b": "bar"}} + var nilArrMapStr []common.MapStr + + newArrStr := []string{"bar"} + var nilArrStr []string + + newMap := map[string]interface{}{"b": "bar"} + var nilMap map[string]interface{} + + newMapStr := common.MapStr{"b": "bar"} + var nilMapStr common.MapStr + + newStr := "bar" + var nilStr *string + + newInt := 123 + var nilInt *int + + newBool := true + var nilBool *bool + + tests := []struct { + v interface{} + expV interface{} + nilV interface{} + }{ + { + v: "some string", + expV: "some string", + nilV: nil, + }, + { + v: &newBool, + expV: newBool, + nilV: nilBool, + }, + { + v: &newInt, + expV: newInt, + nilV: nilInt, + }, + { + v: &newStr, + expV: newStr, + nilV: nilStr, + }, + { + v: newMapStr, + expV: newMapStr, + nilV: nilMapStr, + }, + { + v: newMap, + expV: newMap, + nilV: nilMap, + }, + { + v: newArrStr, + expV: newArrStr, + nilV: nilArrStr, + }, + { + v: newArrMapStr, + expV: newArrMapStr, + nilV: nilArrMapStr, + }, + { + v: float64(5.98), + expV: common.Float(5.980000), + nilV: nil, + }, + { + v: float32(5.987654321), + expV: common.Float(float32(5.987654321)), + nilV: nil, + }, + { + v: float64(5), + expV: int64(5), + nilV: nil, + }, + { + v: float32(5), + expV: int32(5), + nilV: nil, + }, + } + + for idx, te := range tests { + // add new value + m := common.MapStr{"existing": existing} + Add(m, addKey, te.v) + expected := common.MapStr{"existing": existing, addKey: te.expV} + assert.Equal(t, expected, m, + fmt.Sprintf("<%v>: Add new value - Expected: %v, Actual: %v", idx, expected, m)) + + // replace existing value + m = common.MapStr{addKey: existing} + Add(m, addKey, te.v) + expected = common.MapStr{addKey: te.expV} + assert.Equal(t, expected, m, + fmt.Sprintf("<%v>: Replace existing value - Expected: %v, Actual: %v", idx, expected, m)) + + // remove empty value + m = common.MapStr{addKey: existing} + Add(m, addKey, te.nilV) + expected = common.MapStr{} + assert.Equal(t, expected, m, + fmt.Sprintf("<%v>: Remove empty value - Expected: %v, Actual: %v", idx, expected, m)) } - for _, testDataRow := range testData { - base := baseMapStr() +} - Add(base, addKey, testDataRow[0]) - assert.Equal(t, testDataRow[2], base) +func TestMergeAddCommonMapStr(t *testing.T) { + type M = common.MapStr + testData := []struct { + data M + key string + values M + result M + }{ + { + //map is nil + nil, + "a", + M{"a": 1}, + nil, + }, + { + //key is nil + M{"a": 1}, + "", + M{"a": 2}, + M{"a": 1}, + }, + { + //val is nil + M{"a": 1}, + "a", + nil, + M{"a": 1}, + }, + { + //val is empty + M{"a": 1}, + "a", + M{}, + M{"a": 1}, + }, + { + M{"a": 1}, + "b", + M{"c": 2}, + M{"a": 1, "b": M{"c": 2}}, + }, + { + M{"a": 1}, + "a", + M{"b": 2}, + M{"a": 1}, + }, + { + M{"a": M{"b": 1}}, + "a", + M{"c": 2}, + M{"a": M{"b": 1, "c": 2}}, + }, + } + for idx, test := range testData { + MergeAdd(test.data, test.key, test.values) + assert.Equal(t, test.result, test.data, + fmt.Sprintf("At (%v): Expected %s, got %s", idx, test.result, test.data)) } } -func TestStringWithDefault(t *testing.T) { - base := baseMapStr() - add := "foo" - newMap := common.MapStr{"existing": "foo", "added": "foo"} - AddStrWithDefault(base, addKey, &add, "bar") - assert.Equal(t, newMap, base) - - base = baseMapStr() - AddStrWithDefault(base, addKey, nil, "bar") - newMap = common.MapStr{"existing": "foo", "added": "bar"} - assert.Equal(t, newMap, base) +func TestMergeAddMap(t *testing.T) { + type M = map[string]interface{} + type CM = common.MapStr + testData := []struct { + data M + key string + values CM + result M + }{ + { + //map is nil + nil, + "a", + M{"a": 1}, + nil, + }, + { + //key is nil + M{"a": 1}, + "", + M{"a": 2}, + M{"a": 1}, + }, + { + //val is nil + M{"a": 1}, + "a", + nil, + M{"a": 1}, + }, + { + //val is empty + M{"a": 1}, + "a", + CM{}, + M{"a": 1}, + }, + { + M{"a": 1}, + "b", + CM{"c": 2}, + M{"a": 1, "b": CM{"c": 2}}, + }, + { + M{"a": 1}, + "a", + CM{"b": 2}, + M{"a": 1}, + }, + { + M{"a": M{"b": 1}}, + "a", + CM{"c": 2}, + M{"a": M{"b": 1, "c": 2}}, + }, + } + for idx, test := range testData { + MergeAdd(test.data, test.key, test.values) + assert.Equal(t, test.result, test.data, + fmt.Sprintf("At (%v): Expected %s, got %s", idx, test.result, test.data)) + } } -func baseMapStr() common.MapStr { - return common.MapStr{"existing": "foo"} +func TestMillisAsMicros(t *testing.T) { + ms := 4.5 + m := MillisAsMicros(ms) + expectedMap := common.MapStr{"us": 4500} + assert.Equal(t, expectedMap, m) } diff --git a/utility/rfc3399decoder.go b/utility/rfc3399decoder.go index f761c6b8d9..b351f85192 100644 --- a/utility/rfc3399decoder.go +++ b/utility/rfc3399decoder.go @@ -8,7 +8,11 @@ import ( func RFC3339DecoderHook(f reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { // stolen from https://github.com/mitchellh/mapstructure/issues/41 if t == reflect.TypeOf(time.Time{}) && f == reflect.TypeOf("") { - return time.Parse(time.RFC3339, data.(string)) + parsed, err := time.Parse(time.RFC3339, data.(string)) + if err != nil { + return nil, err + } + return time.Time(parsed.UTC()), nil } return data, nil }