From 04c43bf2e55ce989a6c3b0759c13061a72e436a3 Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Tue, 8 Oct 2024 16:28:07 -0400 Subject: [PATCH 1/5] Add ``/sns/produce` endpoint --- .../build/docker/golang/app/net-http/main.go | 131 ++++++++++++++++++ 1 file changed, 131 insertions(+) diff --git a/utils/build/docker/golang/app/net-http/main.go b/utils/build/docker/golang/app/net-http/main.go index a506c8e2945..503ba78cf8a 100644 --- a/utils/build/docker/golang/app/net-http/main.go +++ b/utils/build/docker/golang/app/net-http/main.go @@ -5,6 +5,9 @@ import ( "encoding/json" "errors" "fmt" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" "io" "log" "math/rand" @@ -19,6 +22,8 @@ import ( "weblog/internal/rasp" "github.com/Shopify/sarama" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/sns" saramatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama" "gopkg.in/DataDog/dd-trace-go.v1/datastreams" @@ -514,6 +519,132 @@ func main() { mux.HandleFunc("/rasp/ssrf", rasp.SSRF) mux.HandleFunc("/rasp/sqli", rasp.SQLi) + mux.HandleFunc("/sns/produce", func(w http.ResponseWriter, r *http.Request) { + queue := r.URL.Query().Get("queue") + if queue == "" { + queue = "DistributedTracing SNS" + } + topic := r.URL.Query().Get("topic") + if topic == "" { + topic = "DistributedTracing SNS Topic" + } + message := r.URL.Query().Get("message") + if message == "" { + message = "Hello from Go SNS -> SQS" + } + + cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1")) + if err != nil { + log.Printf("[SNS->SQS] Error loading AWS configuration: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + snsClient := sns.NewFromConfig(cfg) + sqsClient := sqs.NewFromConfig(cfg) + + var topicArn string + var queueUrl string + var queueArn string + + // Create SNS topic and SQS queue + topicResult, err := snsClient.CreateTopic(context.TODO(), &sns.CreateTopicInput{ + Name: aws.String(topic), + }) + if err != nil { + log.Printf("[SNS->SQS] Error during Go SNS create topic: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + topicArn = *topicResult.TopicArn + + queueResult, err := sqsClient.CreateQueue(context.TODO(), &sqs.CreateQueueInput{ + QueueName: aws.String(queue), + }) + if err != nil { + log.Printf("[SNS->SQS] Error during Go SQS create queue: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + queueUrl = *queueResult.QueueUrl + + // Get queue ARN + queueAttrs, err := sqsClient.GetQueueAttributes(context.TODO(), &sqs.GetQueueAttributesInput{ + QueueUrl: aws.String(queueUrl), + AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameQueueArn}, + }) + if err != nil { + log.Printf("[SNS->SQS] Error getting queue attributes: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + queueArn = queueAttrs.Attributes[string(types.QueueAttributeNameQueueArn)] + + // Set queue policy + policy := map[string]interface{}{ + "Version": "2012-10-17", + "Id": fmt.Sprintf("%s/SQSDefaultPolicy", queueArn), + "Statement": []map[string]interface{}{ + { + "Sid": "Allow-SNS-SendMessage", + "Effect": "Allow", + "Principal": map[string]string{ + "Service": "sns.amazonaws.com", + }, + "Action": "sqs:SendMessage", + "Resource": queueArn, + "Condition": map[string]interface{}{ + "ArnEquals": map[string]string{ + "aws:SourceArn": topicArn, + }, + }, + }, + }, + } + policyJSON, _ := json.Marshal(policy) + _, err = sqsClient.SetQueueAttributes(context.TODO(), &sqs.SetQueueAttributesInput{ + QueueUrl: aws.String(queueUrl), + Attributes: map[string]string{ + "Policy": string(policyJSON), + }, + }) + if err != nil { + log.Printf("[SNS->SQS] Error setting queue policy: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + // Subscribe SQS to SNS + _, err = snsClient.Subscribe(context.TODO(), &sns.SubscribeInput{ + TopicArn: aws.String(topicArn), + Protocol: aws.String("sqs"), + Endpoint: aws.String(queueArn), + Attributes: map[string]string{"RawMessageDelivery": "true"}, + }) + if err != nil { + log.Printf("[SNS->SQS] Error subscribing SQS to SNS: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + log.Printf("[SNS->SQS] Created SNS Topic: %s and SQS Queue: %s", topic, queue) + + // Publish message to SNS topic + _, err = snsClient.Publish(context.TODO(), &sns.PublishInput{ + Message: aws.String(message), + TopicArn: aws.String(topicArn), + }) + if err != nil { + log.Printf("[SNS->SQS] Error during Go SNS publish message: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + log.Printf("[SNS->SQS] Go SNS message published successfully") + w.WriteHeader(http.StatusOK) + w.Write([]byte("SNS Produce ok")) + }) + common.InitDatadog() go grpc.ListenAndServe() http.ListenAndServe(":7777", mux) From f48646f0c7bbbb1a84307e94b030f02b368686a3 Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Wed, 9 Oct 2024 10:32:25 -0400 Subject: [PATCH 2/5] add `/sns/consume` endpoint and refactor --- .../docker/golang/app/net-http/aws/sns.go | 150 ++++++++++++++++++ .../build/docker/golang/app/net-http/main.go | 125 +++------------ 2 files changed, 174 insertions(+), 101 deletions(-) create mode 100644 utils/build/docker/golang/app/net-http/aws/sns.go diff --git a/utils/build/docker/golang/app/net-http/aws/sns.go b/utils/build/docker/golang/app/net-http/aws/sns.go new file mode 100644 index 00000000000..22dc05bc58e --- /dev/null +++ b/utils/build/docker/golang/app/net-http/aws/sns.go @@ -0,0 +1,150 @@ +package aws + +import ( + "context" + "encoding/json" + "fmt" + "log" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/sns" + "github.com/aws/aws-sdk-go-v2/service/sqs" +) + +func SnsProduce(queue, topic, message string) (string, error) { + cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1")) + if err != nil { + return "", fmt.Errorf("[SNS->SQS] Error loading AWS configuration: %v", err) + } + + snsClient := sns.NewFromConfig(cfg) + sqsClient := sqs.NewFromConfig(cfg) + + // Create SNS topic + topicResult, err := snsClient.CreateTopic(context.TODO(), &sns.CreateTopicInput{ + Name: aws.String(topic), + }) + if err != nil { + return "", fmt.Errorf("[SNS->SQS] Error during Go SNS create topic: %v", err) + } + topicArn := *topicResult.TopicArn + + // Create SQS queue + queueResult, err := sqsClient.CreateQueue(context.TODO(), &sqs.CreateQueueInput{ + QueueName: aws.String(queue), + }) + if err != nil { + return "", fmt.Errorf("[SNS->SQS] Error during Go SQS create queue: %v", err) + } + queueUrl := *queueResult.QueueUrl + + // Get queue ARN + urlParts := strings.Split(queueUrl, "/") + queueArn := fmt.Sprintf("arn:aws:sqs:%s:%s:%s", "us-east-1", urlParts[len(urlParts)-2], urlParts[len(urlParts)-1]) + + // Set queue policy + policy := map[string]interface{}{ + "Version": "2012-10-17", + "Id": fmt.Sprintf("%s/SQSDefaultPolicy", queueArn), + "Statement": []map[string]interface{}{ + { + "Sid": "Allow-SNS-SendMessage", + "Effect": "Allow", + "Principal": map[string]string{ + "Service": "sns.amazonaws.com", + }, + "Action": "sqs:SendMessage", + "Resource": queueArn, + "Condition": map[string]interface{}{ + "ArnEquals": map[string]string{ + "aws:SourceArn": topicArn, + }, + }, + }, + }, + } + policyJSON, _ := json.Marshal(policy) + _, err = sqsClient.SetQueueAttributes(context.TODO(), &sqs.SetQueueAttributesInput{ + QueueUrl: aws.String(queueUrl), + Attributes: map[string]string{ + "Policy": string(policyJSON), + }, + }) + if err != nil { + return "", fmt.Errorf("[SNS->SQS] Error setting queue policy: %v", err) + } + + // Subscribe SQS to SNS + _, err = snsClient.Subscribe(context.TODO(), &sns.SubscribeInput{ + TopicArn: aws.String(topicArn), + Protocol: aws.String("sqs"), + Endpoint: aws.String(queueArn), + Attributes: map[string]string{"RawMessageDelivery": "true"}, + }) + if err != nil { + return "", fmt.Errorf("[SNS->SQS] Error subscribing SQS to SNS: %v", err) + } + + log.Printf("[SNS->SQS] Created SNS Topic: %s and SQS Queue: %s", topic, queue) + + // Publish message to SNS topic + _, err = snsClient.Publish(context.TODO(), &sns.PublishInput{ + Message: aws.String(message), + TopicArn: aws.String(topicArn), + }) + if err != nil { + log.Printf("[SNS->SQS] Error during Go SNS publish message: %v", err) + return "", fmt.Errorf("[SNS->SQS] Error during Go SNS publish message: %v", err) + } + + log.Printf("[SNS->SQS] Go SNS message published successfully") + return "SNS Produce ok", nil +} + +func SnsConsume(queue, expectedMessage string, timeout int) (map[string]string, error) { + cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1")) + if err != nil { + return nil, fmt.Errorf("[SNS->SQS] Error loading AWS configuration: %v", err) + } + + sqsClient := sqs.NewFromConfig(cfg) + + queueURL := fmt.Sprintf("https://sqs.us-east-1.amazonaws.com/601427279990/%s", queue) + startTime := time.Now() + + for time.Since(startTime) < time.Duration(timeout)*time.Second { + output, err := sqsClient.ReceiveMessage(context.TODO(), &sqs.ReceiveMessageInput{ + QueueUrl: aws.String(queueURL), + }) + + if err != nil { + log.Printf("[SNS->SQS] %v", err) + continue + } + + for _, message := range output.Messages { + log.Printf("[SNS->SQS] Consumed: %+v", message) + + if *message.Body == expectedMessage { + log.Printf("[SNS->SQS] Success. Found the following message: %s", *message.Body) + return map[string]string{"message": *message.Body}, nil + } + + log.Printf("[SNS->SQS] Trying to decode raw message: %s", *message.Body) + var messageJSON map[string]interface{} + if err := json.Unmarshal([]byte(*message.Body), &messageJSON); err == nil { + if msg, ok := messageJSON["Message"].(string); ok && msg == expectedMessage { + log.Printf("[SNS->SQS] Success. Found the following message: %s", msg) + return map[string]string{"message": msg}, nil + } + } + } + + time.Sleep(time.Second) + } + + return map[string]string{"error": "[SNS->SQS] No messages to consume"}, nil +} diff --git a/utils/build/docker/golang/app/net-http/main.go b/utils/build/docker/golang/app/net-http/main.go index 503ba78cf8a..8806d63cb08 100644 --- a/utils/build/docker/golang/app/net-http/main.go +++ b/utils/build/docker/golang/app/net-http/main.go @@ -5,9 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/sqs" - "github.com/aws/aws-sdk-go-v2/service/sqs/types" "io" "log" "math/rand" @@ -16,15 +13,13 @@ import ( "os" "strconv" "time" + awsHelpers "weblog/net-http/aws" "weblog/internal/common" "weblog/internal/grpc" "weblog/internal/rasp" "github.com/Shopify/sarama" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/service/sns" - saramatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama" "gopkg.in/DataDog/dd-trace-go.v1/datastreams" @@ -533,116 +528,44 @@ func main() { message = "Hello from Go SNS -> SQS" } - cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1")) + result, err := awsHelpers.SnsProduce(queue, topic, message) if err != nil { - log.Printf("[SNS->SQS] Error loading AWS configuration: %v", err) - w.WriteHeader(http.StatusInternalServerError) + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) return } - snsClient := sns.NewFromConfig(cfg) - sqsClient := sqs.NewFromConfig(cfg) - - var topicArn string - var queueUrl string - var queueArn string - - // Create SNS topic and SQS queue - topicResult, err := snsClient.CreateTopic(context.TODO(), &sns.CreateTopicInput{ - Name: aws.String(topic), - }) - if err != nil { - log.Printf("[SNS->SQS] Error during Go SNS create topic: %v", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - topicArn = *topicResult.TopicArn + w.WriteHeader(http.StatusOK) + w.Write([]byte(result)) + }) - queueResult, err := sqsClient.CreateQueue(context.TODO(), &sqs.CreateQueueInput{ - QueueName: aws.String(queue), - }) - if err != nil { - log.Printf("[SNS->SQS] Error during Go SQS create queue: %v", err) - w.WriteHeader(http.StatusInternalServerError) - return + mux.HandleFunc("/sns/consume", func(w http.ResponseWriter, r *http.Request) { + queue := r.URL.Query().Get("queue") + if queue == "" { + queue = "DistributedTracing SNS" } - queueUrl = *queueResult.QueueUrl - - // Get queue ARN - queueAttrs, err := sqsClient.GetQueueAttributes(context.TODO(), &sqs.GetQueueAttributesInput{ - QueueUrl: aws.String(queueUrl), - AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameQueueArn}, - }) - if err != nil { - log.Printf("[SNS->SQS] Error getting queue attributes: %v", err) - w.WriteHeader(http.StatusInternalServerError) - return + timeout, err := strconv.Atoi(r.URL.Query().Get("timeout")) + if err != nil || timeout <= 0 { + timeout = 60 // Default timeout } - queueArn = queueAttrs.Attributes[string(types.QueueAttributeNameQueueArn)] - - // Set queue policy - policy := map[string]interface{}{ - "Version": "2012-10-17", - "Id": fmt.Sprintf("%s/SQSDefaultPolicy", queueArn), - "Statement": []map[string]interface{}{ - { - "Sid": "Allow-SNS-SendMessage", - "Effect": "Allow", - "Principal": map[string]string{ - "Service": "sns.amazonaws.com", - }, - "Action": "sqs:SendMessage", - "Resource": queueArn, - "Condition": map[string]interface{}{ - "ArnEquals": map[string]string{ - "aws:SourceArn": topicArn, - }, - }, - }, - }, - } - policyJSON, _ := json.Marshal(policy) - _, err = sqsClient.SetQueueAttributes(context.TODO(), &sqs.SetQueueAttributesInput{ - QueueUrl: aws.String(queueUrl), - Attributes: map[string]string{ - "Policy": string(policyJSON), - }, - }) - if err != nil { - log.Printf("[SNS->SQS] Error setting queue policy: %v", err) - w.WriteHeader(http.StatusInternalServerError) - return + expectedMessage := r.URL.Query().Get("message") + if expectedMessage == "" { + expectedMessage = "Hello from Go SNS -> SQS" } - // Subscribe SQS to SNS - _, err = snsClient.Subscribe(context.TODO(), &sns.SubscribeInput{ - TopicArn: aws.String(topicArn), - Protocol: aws.String("sqs"), - Endpoint: aws.String(queueArn), - Attributes: map[string]string{"RawMessageDelivery": "true"}, - }) + result, err := awsHelpers.SnsConsume(queue, expectedMessage, timeout) if err != nil { - log.Printf("[SNS->SQS] Error subscribing SQS to SNS: %v", err) w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) return } - log.Printf("[SNS->SQS] Created SNS Topic: %s and SQS Queue: %s", topic, queue) - - // Publish message to SNS topic - _, err = snsClient.Publish(context.TODO(), &sns.PublishInput{ - Message: aws.String(message), - TopicArn: aws.String(topicArn), - }) - if err != nil { - log.Printf("[SNS->SQS] Error during Go SNS publish message: %v", err) - w.WriteHeader(http.StatusInternalServerError) - return + if _, hasError := result["error"]; hasError { + w.WriteHeader(http.StatusBadRequest) + } else { + w.WriteHeader(http.StatusOK) } - - log.Printf("[SNS->SQS] Go SNS message published successfully") - w.WriteHeader(http.StatusOK) - w.Write([]byte("SNS Produce ok")) + json.NewEncoder(w).Encode(result) }) common.InitDatadog() From f81687d93f42dda5bdf722bb3672f7c208739907 Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Wed, 9 Oct 2024 10:39:51 -0400 Subject: [PATCH 3/5] add `/sqs/produce` endpoint --- .../docker/golang/app/net-http/aws/sqs.go | 43 +++++++++++++++++++ .../build/docker/golang/app/net-http/main.go | 21 +++++++++ 2 files changed, 64 insertions(+) create mode 100644 utils/build/docker/golang/app/net-http/aws/sqs.go diff --git a/utils/build/docker/golang/app/net-http/aws/sqs.go b/utils/build/docker/golang/app/net-http/aws/sqs.go new file mode 100644 index 00000000000..1a0cc933a36 --- /dev/null +++ b/utils/build/docker/golang/app/net-http/aws/sqs.go @@ -0,0 +1,43 @@ +package aws + +import ( + "context" + "fmt" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "log" +) + +func SqsProduce(queue, message string) (string, error) { + cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1")) + if err != nil { + return "", fmt.Errorf("[SQS] Error loading AWS configuration: %v", err) + } + + sqsClient := sqs.NewFromConfig(cfg) + + // Create SQS queue + _, err = sqsClient.CreateQueue(context.TODO(), &sqs.CreateQueueInput{ + QueueName: aws.String(queue), + }) + if err != nil { + log.Printf("[SQS] Error during Go SQS create queue: %v", err) + } else { + log.Printf("[SQS] Created SQS Queue with name: %s", queue) + } + + // Send message to SQS queue + queueURL := fmt.Sprintf("https://sqs.us-east-1.amazonaws.com/601427279990/%s", queue) + _, err = sqsClient.SendMessage(context.TODO(), &sqs.SendMessageInput{ + QueueUrl: aws.String(queueURL), + MessageBody: aws.String(message), + }) + if err != nil { + log.Printf("[SQS] Error during Go SQS send message: %v", err) + return "", fmt.Errorf("[SQS] Error during Go SQS send message: %v", err) + } + + log.Printf("[SQS] Go SQS message sent successfully") + return "SQS Produce ok", nil +} diff --git a/utils/build/docker/golang/app/net-http/main.go b/utils/build/docker/golang/app/net-http/main.go index 8806d63cb08..d85a23d8a80 100644 --- a/utils/build/docker/golang/app/net-http/main.go +++ b/utils/build/docker/golang/app/net-http/main.go @@ -568,6 +568,27 @@ func main() { json.NewEncoder(w).Encode(result) }) + mux.HandleFunc("/sqs/produce", func(w http.ResponseWriter, r *http.Request) { + queue := r.URL.Query().Get("queue") + if queue == "" { + queue = "DistributedTracing" + } + message := r.URL.Query().Get("message") + if message == "" { + message = "Hello from Go SQS" + } + + result, err := awsHelpers.SqsProduce(queue, message) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) + return + } + + w.WriteHeader(http.StatusOK) + w.Write([]byte(result)) + }) + common.InitDatadog() go grpc.ListenAndServe() http.ListenAndServe(":7777", mux) From a52a549b1cad5ab7051f19f82f269aeb58e3cde7 Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Wed, 9 Oct 2024 10:44:12 -0400 Subject: [PATCH 4/5] add `/sqs/consume` endpoint; add comments --- .../docker/golang/app/net-http/aws/sns.go | 2 + .../docker/golang/app/net-http/aws/sqs.go | 38 +++++++++++++++++++ .../build/docker/golang/app/net-http/main.go | 32 +++++++++++++++- 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/utils/build/docker/golang/app/net-http/aws/sns.go b/utils/build/docker/golang/app/net-http/aws/sns.go index 22dc05bc58e..4e6c467c1cf 100644 --- a/utils/build/docker/golang/app/net-http/aws/sns.go +++ b/utils/build/docker/golang/app/net-http/aws/sns.go @@ -14,6 +14,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/sqs" ) +// SnsProduce The goal of this function is to trigger sns producer calls func SnsProduce(queue, topic, message string) (string, error) { cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1")) if err != nil { @@ -104,6 +105,7 @@ func SnsProduce(queue, topic, message string) (string, error) { return "SNS Produce ok", nil } +// SnsConsume The goal of this function is to trigger sns consumer calls func SnsConsume(queue, expectedMessage string, timeout int) (map[string]string, error) { cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1")) if err != nil { diff --git a/utils/build/docker/golang/app/net-http/aws/sqs.go b/utils/build/docker/golang/app/net-http/aws/sqs.go index 1a0cc933a36..a0b94fe4fc0 100644 --- a/utils/build/docker/golang/app/net-http/aws/sqs.go +++ b/utils/build/docker/golang/app/net-http/aws/sqs.go @@ -7,8 +7,10 @@ import ( "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sqs" "log" + "time" ) +// SqsProduce The goal of this function is to trigger sqs producer calls func SqsProduce(queue, message string) (string, error) { cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1")) if err != nil { @@ -41,3 +43,39 @@ func SqsProduce(queue, message string) (string, error) { log.Printf("[SQS] Go SQS message sent successfully") return "SQS Produce ok", nil } + +// SqsConsume The goal of this function is to trigger sqs consumer calls +func SqsConsume(queue, expectedMessage string, timeout int) (map[string]string, error) { + cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1")) + if err != nil { + return nil, fmt.Errorf("[SQS] Error loading AWS configuration: %v", err) + } + + sqsClient := sqs.NewFromConfig(cfg) + + queueURL := fmt.Sprintf("https://sqs.us-east-1.amazonaws.com/601427279990/%s", queue) + startTime := time.Now() + + for time.Since(startTime) < time.Duration(timeout)*time.Second { + output, err := sqsClient.ReceiveMessage(context.TODO(), &sqs.ReceiveMessageInput{ + QueueUrl: aws.String(queueURL), + }) + + if err != nil { + log.Printf("[SQS] Error receiving message: %v", err) + continue + } + + for _, message := range output.Messages { + if *message.Body == expectedMessage { + log.Printf("Consumed the following SQS message with params: %+v", message) + log.Printf("Consumed the following SQS message: %s", *message.Body) + return map[string]string{"message": *message.Body}, nil + } + } + + time.Sleep(time.Second) + } + + return map[string]string{"error": "No messages to consume"}, nil +} diff --git a/utils/build/docker/golang/app/net-http/main.go b/utils/build/docker/golang/app/net-http/main.go index d85a23d8a80..ce012474eaf 100644 --- a/utils/build/docker/golang/app/net-http/main.go +++ b/utils/build/docker/golang/app/net-http/main.go @@ -13,13 +13,14 @@ import ( "os" "strconv" "time" - awsHelpers "weblog/net-http/aws" "weblog/internal/common" "weblog/internal/grpc" "weblog/internal/rasp" + awsHelpers "weblog/net-http/aws" "github.com/Shopify/sarama" + saramatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama" "gopkg.in/DataDog/dd-trace-go.v1/datastreams" @@ -589,6 +590,35 @@ func main() { w.Write([]byte(result)) }) + mux.HandleFunc("/sqs/consume", func(w http.ResponseWriter, r *http.Request) { + queue := r.URL.Query().Get("queue") + if queue == "" { + queue = "DistributedTracing" + } + timeout, err := strconv.Atoi(r.URL.Query().Get("timeout")) + if err != nil || timeout <= 0 { + timeout = 60 // Default timeout + } + expectedMessage := r.URL.Query().Get("message") + if expectedMessage == "" { + expectedMessage = "Hello from Go SQS" + } + + result, err := awsHelpers.SqsConsume(queue, expectedMessage, timeout) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) + return + } + + if _, hasError := result["error"]; hasError { + w.WriteHeader(http.StatusBadRequest) + } else { + w.WriteHeader(http.StatusOK) + } + json.NewEncoder(w).Encode(result) + }) + common.InitDatadog() go grpc.ListenAndServe() http.ListenAndServe(":7777", mux) From a246709b8a31cc30afda31aa0c21d373285bc368 Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Fri, 11 Oct 2024 10:48:07 -0400 Subject: [PATCH 5/5] Enable `produce` tests for sqs and sns_to_sqs --- manifests/golang.yml | 6 ++--- .../crossed_integrations/test_sns_to_sqs.py | 3 +-- .../crossed_integrations/test_sqs.py | 6 ++--- .../docker/golang/app/net-http/aws/sns.go | 13 ++++++---- .../docker/golang/app/net-http/aws/sqs.go | 11 +++++---- .../build/docker/golang/app/net-http/main.go | 24 +++++++------------ 6 files changed, 29 insertions(+), 34 deletions(-) diff --git a/manifests/golang.yml b/manifests/golang.yml index 9062f5cefc7..f1f2285fb1b 100644 --- a/manifests/golang.yml +++ b/manifests/golang.yml @@ -392,14 +392,14 @@ tests/: test_sns_to_sqs.py: Test_SNS_Propagation: "*": irrelevant - net-http: missing_feature + net-http: v0.1 # real version not known test_sqs.py: Test_SQS_PROPAGATION_VIA_AWS_XRAY_HEADERS: "*": irrelevant - net-http: missing_feature (Endpoint not implemented) + net-http: v0.1 # real version not known Test_SQS_PROPAGATION_VIA_MESSAGE_ATTRIBUTES: "*": irrelevant - net-http: missing_feature (Endpoint not implemented) + net-http: v0.1 # real version not known test_db_integrations_sql.py: Test_MsSql: missing_feature Test_MySql: missing_feature diff --git a/tests/integrations/crossed_integrations/test_sns_to_sqs.py b/tests/integrations/crossed_integrations/test_sns_to_sqs.py index 3ff737631aa..89fb045b6ff 100644 --- a/tests/integrations/crossed_integrations/test_sns_to_sqs.py +++ b/tests/integrations/crossed_integrations/test_sns_to_sqs.py @@ -139,7 +139,6 @@ def test_produce(self): topic=self.WEBLOG_TO_BUDDY_TOPIC, ) - @missing_feature(library="golang", reason="Expected to fail, Golang does not propagate context") @missing_feature(library="ruby", reason="Expected to fail, Ruby does not propagate context") def test_produce_trace_equality(self): """This test relies on the setup for produce, it currently cannot be run on its own""" @@ -205,7 +204,7 @@ def test_consume(self): topic=self.BUDDY_TO_WEBLOG_TOPIC, ) - @missing_feature(library="golang", reason="Expected to fail, Golang does not propagate context") + @missing_feature(library="golang", reason="Expected to fail, Golang does not extract context") @missing_feature(library="ruby", reason="Expected to fail, Ruby does not propagate context") def test_consume_trace_equality(self): """This test relies on the setup for consume, it currently cannot be run on its own""" diff --git a/tests/integrations/crossed_integrations/test_sqs.py b/tests/integrations/crossed_integrations/test_sqs.py index a8ad101cfe7..23c4c6d0ae7 100644 --- a/tests/integrations/crossed_integrations/test_sqs.py +++ b/tests/integrations/crossed_integrations/test_sqs.py @@ -121,7 +121,6 @@ def test_produce(self): queue=self.WEBLOG_TO_BUDDY_QUEUE, ) - @missing_feature(library="golang", reason="Expected to fail, Golang does not propagate context") @missing_feature(library="ruby", reason="Expected to fail, Ruby does not propagate context") @missing_feature( library="java", @@ -188,7 +187,7 @@ def test_consume(self): queue=self.BUDDY_TO_WEBLOG_QUEUE, ) - @missing_feature(library="golang", reason="Expected to fail, Golang does not propagate context") + @missing_feature(library="golang", reason="Expected to fail, Golang does not extract context") @missing_feature(library="ruby", reason="Expected to fail, Ruby does not propagate context") @missing_feature(library="dotnet", reason="Expected to fail, Dotnet does not propagate context") def test_consume_trace_equality(self): @@ -267,12 +266,11 @@ class Test_SQS_PROPAGATION_VIA_AWS_XRAY_HEADERS(_Test_SQS): def test_consume(self): super().test_consume() - @missing_feature(library="golang", reason="Expected to fail, Golang does not propagate context") @missing_feature(library="ruby", reason="Expected to fail, Ruby does not propagate context") def test_produce_trace_equality(self): super().test_produce_trace_equality() - @missing_feature(library="golang", reason="Expected to fail, Golang does not propagate context") + @missing_feature(library="golang", reason="Expected to fail, Golang does not extract context") @missing_feature(library="ruby", reason="Expected to fail, Ruby does not propagate context") @missing_feature(library="python", reason="Expected to fail, Python does not propagate context") @missing_feature(library="nodejs", reason="Expected to fail, Nodejs does not propagate context") diff --git a/utils/build/docker/golang/app/net-http/aws/sns.go b/utils/build/docker/golang/app/net-http/aws/sns.go index 4e6c467c1cf..f31d0ed81e8 100644 --- a/utils/build/docker/golang/app/net-http/aws/sns.go +++ b/utils/build/docker/golang/app/net-http/aws/sns.go @@ -12,6 +12,7 @@ import ( "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sqs" + awstrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/aws/aws-sdk-go-v2/aws" ) // SnsProduce The goal of this function is to trigger sns producer calls @@ -21,6 +22,7 @@ func SnsProduce(queue, topic, message string) (string, error) { return "", fmt.Errorf("[SNS->SQS] Error loading AWS configuration: %v", err) } + awstrace.AppendMiddleware(&cfg) snsClient := sns.NewFromConfig(cfg) sqsClient := sqs.NewFromConfig(cfg) @@ -106,12 +108,13 @@ func SnsProduce(queue, topic, message string) (string, error) { } // SnsConsume The goal of this function is to trigger sns consumer calls -func SnsConsume(queue, expectedMessage string, timeout int) (map[string]string, error) { +func SnsConsume(queue, expectedMessage string, timeout int) (string, error) { cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1")) if err != nil { - return nil, fmt.Errorf("[SNS->SQS] Error loading AWS configuration: %v", err) + return "", fmt.Errorf("[SNS->SQS] Error loading AWS configuration: %v", err) } + awstrace.AppendMiddleware(&cfg) sqsClient := sqs.NewFromConfig(cfg) queueURL := fmt.Sprintf("https://sqs.us-east-1.amazonaws.com/601427279990/%s", queue) @@ -132,7 +135,7 @@ func SnsConsume(queue, expectedMessage string, timeout int) (map[string]string, if *message.Body == expectedMessage { log.Printf("[SNS->SQS] Success. Found the following message: %s", *message.Body) - return map[string]string{"message": *message.Body}, nil + return *message.Body, nil } log.Printf("[SNS->SQS] Trying to decode raw message: %s", *message.Body) @@ -140,7 +143,7 @@ func SnsConsume(queue, expectedMessage string, timeout int) (map[string]string, if err := json.Unmarshal([]byte(*message.Body), &messageJSON); err == nil { if msg, ok := messageJSON["Message"].(string); ok && msg == expectedMessage { log.Printf("[SNS->SQS] Success. Found the following message: %s", msg) - return map[string]string{"message": msg}, nil + return msg, nil } } } @@ -148,5 +151,5 @@ func SnsConsume(queue, expectedMessage string, timeout int) (map[string]string, time.Sleep(time.Second) } - return map[string]string{"error": "[SNS->SQS] No messages to consume"}, nil + return "", fmt.Errorf("[SNS->SQS] No messages to consume") } diff --git a/utils/build/docker/golang/app/net-http/aws/sqs.go b/utils/build/docker/golang/app/net-http/aws/sqs.go index a0b94fe4fc0..3bf69ed61fc 100644 --- a/utils/build/docker/golang/app/net-http/aws/sqs.go +++ b/utils/build/docker/golang/app/net-http/aws/sqs.go @@ -6,6 +6,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sqs" + awstrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/aws/aws-sdk-go-v2/aws" "log" "time" ) @@ -17,6 +18,7 @@ func SqsProduce(queue, message string) (string, error) { return "", fmt.Errorf("[SQS] Error loading AWS configuration: %v", err) } + awstrace.AppendMiddleware(&cfg) sqsClient := sqs.NewFromConfig(cfg) // Create SQS queue @@ -45,12 +47,13 @@ func SqsProduce(queue, message string) (string, error) { } // SqsConsume The goal of this function is to trigger sqs consumer calls -func SqsConsume(queue, expectedMessage string, timeout int) (map[string]string, error) { +func SqsConsume(queue, expectedMessage string, timeout int) (string, error) { cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1")) if err != nil { - return nil, fmt.Errorf("[SQS] Error loading AWS configuration: %v", err) + return "", fmt.Errorf("[SQS] Error loading AWS configuration: %v", err) } + awstrace.AppendMiddleware(&cfg) sqsClient := sqs.NewFromConfig(cfg) queueURL := fmt.Sprintf("https://sqs.us-east-1.amazonaws.com/601427279990/%s", queue) @@ -70,12 +73,12 @@ func SqsConsume(queue, expectedMessage string, timeout int) (map[string]string, if *message.Body == expectedMessage { log.Printf("Consumed the following SQS message with params: %+v", message) log.Printf("Consumed the following SQS message: %s", *message.Body) - return map[string]string{"message": *message.Body}, nil + return *message.Body, nil } } time.Sleep(time.Second) } - return map[string]string{"error": "No messages to consume"}, nil + return "", fmt.Errorf("[SQS] No messages to consume") } diff --git a/utils/build/docker/golang/app/net-http/main.go b/utils/build/docker/golang/app/net-http/main.go index ce012474eaf..c3055e0dd0e 100644 --- a/utils/build/docker/golang/app/net-http/main.go +++ b/utils/build/docker/golang/app/net-http/main.go @@ -532,7 +532,7 @@ func main() { result, err := awsHelpers.SnsProduce(queue, topic, message) if err != nil { w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) + w.Write([]byte(fmt.Sprintf(`{"error": "%s"}`, err.Error()))) return } @@ -556,17 +556,13 @@ func main() { result, err := awsHelpers.SnsConsume(queue, expectedMessage, timeout) if err != nil { - w.WriteHeader(http.StatusInternalServerError) + w.WriteHeader(http.StatusBadRequest) json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) return } - if _, hasError := result["error"]; hasError { - w.WriteHeader(http.StatusBadRequest) - } else { - w.WriteHeader(http.StatusOK) - } - json.NewEncoder(w).Encode(result) + w.WriteHeader(http.StatusOK) + w.Write([]byte(fmt.Sprintf(`{"message": "%s"}`, result))) }) mux.HandleFunc("/sqs/produce", func(w http.ResponseWriter, r *http.Request) { @@ -582,7 +578,7 @@ func main() { result, err := awsHelpers.SqsProduce(queue, message) if err != nil { w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) + w.Write([]byte(fmt.Sprintf(`{"error": "%s"}`, err.Error()))) return } @@ -607,16 +603,12 @@ func main() { result, err := awsHelpers.SqsConsume(queue, expectedMessage, timeout) if err != nil { w.WriteHeader(http.StatusBadRequest) - json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) + w.Write([]byte(fmt.Sprintf(`{"error": "%s"}`, result))) return } - if _, hasError := result["error"]; hasError { - w.WriteHeader(http.StatusBadRequest) - } else { - w.WriteHeader(http.StatusOK) - } - json.NewEncoder(w).Encode(result) + w.WriteHeader(http.StatusOK) + w.Write([]byte(fmt.Sprintf(`{"message": "%s"}`, result))) }) common.InitDatadog()