Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions manifests/golang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -396,14 +396,14 @@ tests/:
test_sns_to_sqs.py:
Test_SNS_Propagation:
"*": irrelevant
net-http: missing_feature
net-http: v0.1 # real version not known
test_sqs.py:
Test_SQS_PROPAGATION_VIA_AWS_XRAY_HEADERS:
"*": irrelevant
net-http: missing_feature (Endpoint not implemented)
net-http: v0.1 # real version not known
Test_SQS_PROPAGATION_VIA_MESSAGE_ATTRIBUTES:
"*": irrelevant
net-http: missing_feature (Endpoint not implemented)
net-http: v0.1 # real version not known
test_db_integrations_sql.py:
Test_MsSql: missing_feature
Test_MySql: missing_feature
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ def test_produce(self):
topic=self.WEBLOG_TO_BUDDY_TOPIC,
)

@missing_feature(library="golang", reason="Expected to fail, Golang does not propagate context")
@missing_feature(library="ruby", reason="Expected to fail, Ruby does not propagate context")
def test_produce_trace_equality(self):
"""This test relies on the setup for produce, it currently cannot be run on its own"""
Expand Down Expand Up @@ -205,7 +204,7 @@ def test_consume(self):
topic=self.BUDDY_TO_WEBLOG_TOPIC,
)

@missing_feature(library="golang", reason="Expected to fail, Golang does not propagate context")
@missing_feature(library="golang", reason="Expected to fail, Golang does not extract context")
@missing_feature(library="ruby", reason="Expected to fail, Ruby does not propagate context")
def test_consume_trace_equality(self):
"""This test relies on the setup for consume, it currently cannot be run on its own"""
Expand Down
6 changes: 2 additions & 4 deletions tests/integrations/crossed_integrations/test_sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ def test_produce(self):
queue=self.WEBLOG_TO_BUDDY_QUEUE,
)

@missing_feature(library="golang", reason="Expected to fail, Golang does not propagate context")
@missing_feature(library="ruby", reason="Expected to fail, Ruby does not propagate context")
@missing_feature(
library="java",
Expand Down Expand Up @@ -188,7 +187,7 @@ def test_consume(self):
queue=self.BUDDY_TO_WEBLOG_QUEUE,
)

@missing_feature(library="golang", reason="Expected to fail, Golang does not propagate context")
@missing_feature(library="golang", reason="Expected to fail, Golang does not extract context")
@missing_feature(library="ruby", reason="Expected to fail, Ruby does not propagate context")
@missing_feature(library="dotnet", reason="Expected to fail, Dotnet does not propagate context")
def test_consume_trace_equality(self):
Expand Down Expand Up @@ -267,12 +266,11 @@ class Test_SQS_PROPAGATION_VIA_AWS_XRAY_HEADERS(_Test_SQS):
def test_consume(self):
super().test_consume()

@missing_feature(library="golang", reason="Expected to fail, Golang does not propagate context")
@missing_feature(library="ruby", reason="Expected to fail, Ruby does not propagate context")
def test_produce_trace_equality(self):
super().test_produce_trace_equality()

@missing_feature(library="golang", reason="Expected to fail, Golang does not propagate context")
@missing_feature(library="golang", reason="Expected to fail, Golang does not extract context")
@missing_feature(library="ruby", reason="Expected to fail, Ruby does not propagate context")
@missing_feature(library="python", reason="Expected to fail, Python does not propagate context")
@missing_feature(library="nodejs", reason="Expected to fail, Nodejs does not propagate context")
Expand Down
155 changes: 155 additions & 0 deletions utils/build/docker/golang/app/net-http/aws/sns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package aws

import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sqs"
awstrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/aws/aws-sdk-go-v2/aws"
)

// SnsProduce The goal of this function is to trigger sns producer calls
func SnsProduce(queue, topic, message string) (string, error) {
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
if err != nil {
return "", fmt.Errorf("[SNS->SQS] Error loading AWS configuration: %v", err)
}

awstrace.AppendMiddleware(&cfg)
snsClient := sns.NewFromConfig(cfg)
sqsClient := sqs.NewFromConfig(cfg)

// Create SNS topic
topicResult, err := snsClient.CreateTopic(context.TODO(), &sns.CreateTopicInput{
Name: aws.String(topic),
})
if err != nil {
return "", fmt.Errorf("[SNS->SQS] Error during Go SNS create topic: %v", err)
}
topicArn := *topicResult.TopicArn

// Create SQS queue
queueResult, err := sqsClient.CreateQueue(context.TODO(), &sqs.CreateQueueInput{
QueueName: aws.String(queue),
})
if err != nil {
return "", fmt.Errorf("[SNS->SQS] Error during Go SQS create queue: %v", err)
}
queueUrl := *queueResult.QueueUrl

// Get queue ARN
urlParts := strings.Split(queueUrl, "/")
queueArn := fmt.Sprintf("arn:aws:sqs:%s:%s:%s", "us-east-1", urlParts[len(urlParts)-2], urlParts[len(urlParts)-1])

// Set queue policy
policy := map[string]interface{}{
"Version": "2012-10-17",
"Id": fmt.Sprintf("%s/SQSDefaultPolicy", queueArn),
"Statement": []map[string]interface{}{
{
"Sid": "Allow-SNS-SendMessage",
"Effect": "Allow",
"Principal": map[string]string{
"Service": "sns.amazonaws.com",
},
"Action": "sqs:SendMessage",
"Resource": queueArn,
"Condition": map[string]interface{}{
"ArnEquals": map[string]string{
"aws:SourceArn": topicArn,
},
},
},
},
}
policyJSON, _ := json.Marshal(policy)
_, err = sqsClient.SetQueueAttributes(context.TODO(), &sqs.SetQueueAttributesInput{
QueueUrl: aws.String(queueUrl),
Attributes: map[string]string{
"Policy": string(policyJSON),
},
})
if err != nil {
return "", fmt.Errorf("[SNS->SQS] Error setting queue policy: %v", err)
}

// Subscribe SQS to SNS
_, err = snsClient.Subscribe(context.TODO(), &sns.SubscribeInput{
TopicArn: aws.String(topicArn),
Protocol: aws.String("sqs"),
Endpoint: aws.String(queueArn),
Attributes: map[string]string{"RawMessageDelivery": "true"},
})
if err != nil {
return "", fmt.Errorf("[SNS->SQS] Error subscribing SQS to SNS: %v", err)
}

log.Printf("[SNS->SQS] Created SNS Topic: %s and SQS Queue: %s", topic, queue)

// Publish message to SNS topic
_, err = snsClient.Publish(context.TODO(), &sns.PublishInput{
Message: aws.String(message),
TopicArn: aws.String(topicArn),
})
if err != nil {
log.Printf("[SNS->SQS] Error during Go SNS publish message: %v", err)
return "", fmt.Errorf("[SNS->SQS] Error during Go SNS publish message: %v", err)
}

log.Printf("[SNS->SQS] Go SNS message published successfully")
return "SNS Produce ok", nil
}

// SnsConsume The goal of this function is to trigger sns consumer calls
func SnsConsume(queue, expectedMessage string, timeout int) (string, error) {
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
if err != nil {
return "", fmt.Errorf("[SNS->SQS] Error loading AWS configuration: %v", err)
}

awstrace.AppendMiddleware(&cfg)
sqsClient := sqs.NewFromConfig(cfg)

queueURL := fmt.Sprintf("https://sqs.us-east-1.amazonaws.com/601427279990/%s", queue)
startTime := time.Now()

for time.Since(startTime) < time.Duration(timeout)*time.Second {
output, err := sqsClient.ReceiveMessage(context.TODO(), &sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
})

if err != nil {
log.Printf("[SNS->SQS] %v", err)
continue
}

for _, message := range output.Messages {
log.Printf("[SNS->SQS] Consumed: %+v", message)

if *message.Body == expectedMessage {
log.Printf("[SNS->SQS] Success. Found the following message: %s", *message.Body)
return *message.Body, nil
}

log.Printf("[SNS->SQS] Trying to decode raw message: %s", *message.Body)
var messageJSON map[string]interface{}
if err := json.Unmarshal([]byte(*message.Body), &messageJSON); err == nil {
if msg, ok := messageJSON["Message"].(string); ok && msg == expectedMessage {
log.Printf("[SNS->SQS] Success. Found the following message: %s", msg)
return msg, nil
}
}
}

time.Sleep(time.Second)
}

return "", fmt.Errorf("[SNS->SQS] No messages to consume")
}
84 changes: 84 additions & 0 deletions utils/build/docker/golang/app/net-http/aws/sqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package aws

import (
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
awstrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/aws/aws-sdk-go-v2/aws"
"log"
"time"
)

// SqsProduce The goal of this function is to trigger sqs producer calls
func SqsProduce(queue, message string) (string, error) {
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
if err != nil {
return "", fmt.Errorf("[SQS] Error loading AWS configuration: %v", err)
}

awstrace.AppendMiddleware(&cfg)
sqsClient := sqs.NewFromConfig(cfg)

// Create SQS queue
_, err = sqsClient.CreateQueue(context.TODO(), &sqs.CreateQueueInput{
QueueName: aws.String(queue),
})
if err != nil {
log.Printf("[SQS] Error during Go SQS create queue: %v", err)
} else {
log.Printf("[SQS] Created SQS Queue with name: %s", queue)
}

// Send message to SQS queue
queueURL := fmt.Sprintf("https://sqs.us-east-1.amazonaws.com/601427279990/%s", queue)
_, err = sqsClient.SendMessage(context.TODO(), &sqs.SendMessageInput{
QueueUrl: aws.String(queueURL),
MessageBody: aws.String(message),
})
if err != nil {
log.Printf("[SQS] Error during Go SQS send message: %v", err)
return "", fmt.Errorf("[SQS] Error during Go SQS send message: %v", err)
}

log.Printf("[SQS] Go SQS message sent successfully")
return "SQS Produce ok", nil
}

// SqsConsume The goal of this function is to trigger sqs consumer calls
func SqsConsume(queue, expectedMessage string, timeout int) (string, error) {
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
if err != nil {
return "", fmt.Errorf("[SQS] Error loading AWS configuration: %v", err)
}

awstrace.AppendMiddleware(&cfg)
sqsClient := sqs.NewFromConfig(cfg)

queueURL := fmt.Sprintf("https://sqs.us-east-1.amazonaws.com/601427279990/%s", queue)
startTime := time.Now()

for time.Since(startTime) < time.Duration(timeout)*time.Second {
output, err := sqsClient.ReceiveMessage(context.TODO(), &sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
})

if err != nil {
log.Printf("[SQS] Error receiving message: %v", err)
continue
}

for _, message := range output.Messages {
if *message.Body == expectedMessage {
log.Printf("Consumed the following SQS message with params: %+v", message)
log.Printf("Consumed the following SQS message: %s", *message.Body)
return *message.Body, nil
}
}

time.Sleep(time.Second)
}

return "", fmt.Errorf("[SQS] No messages to consume")
}
Loading
Loading