From 5b5a5bc987eab7a0457b92ee7459949b405caa6e Mon Sep 17 00:00:00 2001 From: Dominik Schubert Date: Tue, 21 Mar 2023 14:21:43 +0100 Subject: [PATCH 1/2] require opt-in for xray telemetry --- cmd/localstack/main.go | 35 +++++++++++++++++++---------------- cmd/localstack/xraydaemon.go | 27 ++++++++++++++++++--------- 2 files changed, 37 insertions(+), 25 deletions(-) diff --git a/cmd/localstack/main.go b/cmd/localstack/main.go index 8a4187a..baafc37 100644 --- a/cmd/localstack/main.go +++ b/cmd/localstack/main.go @@ -15,17 +15,18 @@ import ( ) type LsOpts struct { - InteropPort string - RuntimeEndpoint string - RuntimeId string - InitTracingPort string - User string - CodeArchives string - HotReloadingPaths []string - EnableDnsServer string - LocalstackIP string - InitLogLevel string - EdgePort string + InteropPort string + RuntimeEndpoint string + RuntimeId string + InitTracingPort string + User string + CodeArchives string + HotReloadingPaths []string + EnableDnsServer string + LocalstackIP string + InitLogLevel string + EdgePort string + EnableXRayTelemetry string } func GetEnvOrDie(env string) string { @@ -48,10 +49,11 @@ func InitLsOpts() *LsOpts { InitLogLevel: GetenvWithDefault("LOCALSTACK_INIT_LOG_LEVEL", "debug"), EdgePort: GetenvWithDefault("EDGE_PORT", "4566"), // optional or empty - CodeArchives: os.Getenv("LOCALSTACK_CODE_ARCHIVES"), - HotReloadingPaths: strings.Split(GetenvWithDefault("LOCALSTACK_HOT_RELOADING_PATHS", ""), ","), - EnableDnsServer: os.Getenv("LOCALSTACK_ENABLE_DNS_SERVER"), - LocalstackIP: os.Getenv("LOCALSTACK_HOSTNAME"), + CodeArchives: os.Getenv("LOCALSTACK_CODE_ARCHIVES"), + HotReloadingPaths: strings.Split(GetenvWithDefault("LOCALSTACK_HOT_RELOADING_PATHS", ""), ","), + EnableDnsServer: os.Getenv("LOCALSTACK_ENABLE_DNS_SERVER"), + EnableXRayTelemetry: os.Getenv("LOCALSTACK_ENABLE_XRAY_TELEMETRY"), + LocalstackIP: os.Getenv("LOCALSTACK_HOSTNAME"), } } @@ -67,6 +69,7 @@ func UnsetLsEnvs() { "LOCALSTACK_CODE_ARCHIVES", "LOCALSTACK_HOT_RELOADING_PATHS", "LOCALSTACK_ENABLE_DNS_SERVER", + "LOCALSTACK_ENABLE_XRAY_TELEMETRY", "LOCALSTACK_INIT_LOG_LEVEL", // Docker container ID "HOSTNAME", @@ -146,7 +149,7 @@ func main() { // xray daemon xrayConfig := initConfig("http://" + lsOpts.LocalstackIP + ":" + lsOpts.EdgePort) - d := initDaemon(xrayConfig) + d := initDaemon(xrayConfig, lsOpts.EnableXRayTelemetry == "1") sandbox.AddShutdownFunc(func() { log.Debugln("Shutting down xray daemon") d.stop() diff --git a/cmd/localstack/xraydaemon.go b/cmd/localstack/xraydaemon.go index 8d2b768..6f95692 100644 --- a/cmd/localstack/xraydaemon.go +++ b/cmd/localstack/xraydaemon.go @@ -85,7 +85,7 @@ func initConfig(endpoint string) *cfg.Config { return xrayConfig } -func initDaemon(config *cfg.Config) *Daemon { +func initDaemon(config *cfg.Config, enableTelemetry bool) *Daemon { if logFile != "" { var fileWriter io.Writer if *config.Logging.LogRotation { @@ -133,8 +133,9 @@ func initDaemon(config *cfg.Config) *Daemon { awsConfig, session := conn.GetAWSConfigSession(&conn.Conn{}, config, config.RoleARN, config.Region, noMetadata) log.Infof("Using region: %v", aws.StringValue(awsConfig.Region)) - log.Debugf("ARN of the AWS resource running the daemon: %v", config.ResourceARN) - telemetry.Init(awsConfig, session, config.ResourceARN, noMetadata) + if enableTelemetry { + telemetry.Init(awsConfig, session, config.ResourceARN, noMetadata) + } // If calculated number of buffer is lower than our default, use calculated one. Otherwise, use default value. parameterConfig.Processor.BatchSize = util.GetMinIntValue(parameterConfig.Processor.BatchSize, buffers) @@ -179,10 +180,14 @@ func (d *Daemon) close() { // Signal routines to finish // This will push telemetry and customer segments in parallel d.std.Close() - telemetry.T.Quit <- true + if telemetry.T != nil { + telemetry.T.Quit <- true + } <-d.processor.Done - <-telemetry.T.Done + if telemetry.T != nil { + <-telemetry.T.Done + } log.Debugf("Trace segment: received: %d, truncated: %d, processed: %d", atomic.LoadUint64(&d.count), d.std.TruncatedCount(), d.processor.ProcessedCount()) log.Debugf("Shutdown finished. Current epoch in nanoseconds: %v", time.Now().UnixNano()) @@ -226,7 +231,7 @@ func (d *Daemon) poll() { fallbackPointerUsed = true } rlen := d.read(bufPointer) - if rlen > 0 { + if rlen > 0 && telemetry.T != nil { telemetry.T.SegmentReceived(1) } if rlen == 0 { @@ -235,7 +240,7 @@ func (d *Daemon) poll() { } continue } - if fallbackPointerUsed { + if fallbackPointerUsed && telemetry.T != nil { log.Warn("Segment dropped. Consider increasing memory limit") telemetry.T.SegmentSpillover(1) continue @@ -250,7 +255,9 @@ func (d *Daemon) poll() { if len(slices[1]) == 0 { log.Warnf("Missing header or segment: %s", string(slices[0])) d.pool.Return(bufPointer) - telemetry.T.SegmentRejected(1) + if telemetry.T != nil { + telemetry.T.SegmentRejected(1) + } continue } @@ -264,7 +271,9 @@ func (d *Daemon) poll() { default: log.Warnf("Invalid header: %s", string(header)) d.pool.Return(bufPointer) - telemetry.T.SegmentRejected(1) + if telemetry.T != nil { + telemetry.T.SegmentRejected(1) + } continue } From 0478d760cff759be5e65e2fd3718ecbc2bf5088c Mon Sep 17 00:00:00 2001 From: Dominik Schubert Date: Tue, 21 Mar 2023 14:35:42 +0100 Subject: [PATCH 2/2] only skip telemetry call if disabled --- cmd/localstack/xraydaemon.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd/localstack/xraydaemon.go b/cmd/localstack/xraydaemon.go index 6f95692..686c2a6 100644 --- a/cmd/localstack/xraydaemon.go +++ b/cmd/localstack/xraydaemon.go @@ -240,9 +240,11 @@ func (d *Daemon) poll() { } continue } - if fallbackPointerUsed && telemetry.T != nil { + if fallbackPointerUsed { log.Warn("Segment dropped. Consider increasing memory limit") - telemetry.T.SegmentSpillover(1) + if telemetry.T != nil { + telemetry.T.SegmentSpillover(1) + } continue } else if rlen == -1 { return