From ebb045880076c01ceb289b64bf49dc6bdb12a558 Mon Sep 17 00:00:00 2001 From: Alexander McKinlay Date: Wed, 18 Jun 2025 14:22:08 +1000 Subject: [PATCH 01/13] handle array body in SQSSend --- proxy/sqs.go | 68 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 58 insertions(+), 10 deletions(-) diff --git a/proxy/sqs.go b/proxy/sqs.go index b7b5c01..9ed2225 100644 --- a/proxy/sqs.go +++ b/proxy/sqs.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/hex" + "encoding/json" "fmt" "io" "io/ioutil" @@ -18,6 +19,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" awsconfig "github.com/aws/aws-sdk-go-v2/config" sqs "github.com/aws/aws-sdk-go-v2/service/sqs" + sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types" ) var sqsARNPat = regexp.MustCompile(`^arn:aws:sqs:([^:]+):([^:]+):(.+)$`) @@ -168,17 +170,63 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) { } sqsCl := sqs.NewFromConfig(c) - if _, err := sqsCl.SendMessage(context.Background(), &sqs.SendMessageInput{ - MessageBody: aws.String(string(body)), - QueueUrl: aws.String(qURL), - MessageGroupId: groupID, - }); err != nil { - log.Printf("error sending SQS message: %v", err) - http.Error(w, fmt.Sprintf("Error sending SQS message: %v", err), http.StatusInternalServerError) - return - } + // Check if body starts with '[' to detect JSON array (multiple messages) + if len(body) > 0 && body[0] == '[' { + // Multiple messages - parse and use batch send + var messages []json.RawMessage + if err := json.Unmarshal(body, &messages); err != nil { + http.Error(w, "Invalid JSON array", http.StatusBadRequest) + return + } + + if len(messages) == 0 { + http.Error(w, "Empty message array", http.StatusBadRequest) + return + } + + // Build entries for batch send (split into groups of 10) + var allEntries []sqstypes.SendMessageBatchRequestEntry + for i, msg := range messages { + allEntries = append(allEntries, sqstypes.SendMessageBatchRequestEntry{ + Id: aws.String(fmt.Sprintf("msg-%d", i)), + MessageBody: aws.String(string(msg)), + MessageGroupId: groupID, + }) + } + + // Send in batches of 10 (SQS limit) + for i := 0; i < len(allEntries); i += 10 { + end := i + 10 + if end > len(allEntries) { + end = len(allEntries) + } + batch := allEntries[i:end] + + if _, err := sqsCl.SendMessageBatch(context.Background(), &sqs.SendMessageBatchInput{ + QueueUrl: aws.String(qURL), + Entries: batch, + }); err != nil { + log.Printf("error sending SQS message batch: %v", err) + http.Error(w, fmt.Sprintf("Error sending SQS message batch: %v", err), http.StatusInternalServerError) + return + } + } - log.Printf("sent an SQS message to '%s'", qURL) + log.Printf("sent %d SQS messages to '%s'", len(messages), qURL) + } else { + // Single message - use regular send + if _, err := sqsCl.SendMessage(context.Background(), &sqs.SendMessageInput{ + MessageBody: aws.String(string(body)), + QueueUrl: aws.String(qURL), + MessageGroupId: groupID, + }); err != nil { + log.Printf("error sending SQS message: %v", err) + http.Error(w, fmt.Sprintf("Error sending SQS message: %v", err), http.StatusInternalServerError) + return + } + + log.Printf("sent an SQS message to '%s'", qURL) + } } From 5d333e2a99931969bbb2669451fb0868b251658d Mon Sep 17 00:00:00 2001 From: Alexander McKinlay Date: Wed, 18 Jun 2025 14:22:42 +1000 Subject: [PATCH 02/13] update example-spec for array body in handleSQSSend --- example-spec.yaml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/example-spec.yaml b/example-spec.yaml index 4bd3a55..11c8424 100644 --- a/example-spec.yaml +++ b/example-spec.yaml @@ -56,9 +56,11 @@ role: generate # the starenv derefers, lambdafy adds the following derefers: # # - lambdafy_sqs_send: This derefer will be replaced with a URL which when POSTed -# to will send a message to the SQS queue whose ARN is specified. The body -# of the POST will be sent as the SQS message body. If header -# 'Lambdafy-SQS-Group-Id' is set, it will be used as Group ID for the +# to will send a message to the SQS queue whose ARN is specified. This accepts +# either a JSON array of messages or a single message. If an array, the body +# of the POST will be split into batches and sent as entries in SQS send message batch. +# Otherwise, if a single messsage, the body of the POST will be sent as the SQS message body. +# If header 'Lambdafy-SQS-Group-Id' is set, it will be used as Group ID for the # message. A 2xx/3xx response is considered a success, otherwise a fail. See # the example below for usage. # Note: The necessary IAM role permissions to send SQS messages are added From 86ed16f33dbdcd67fc2101dd327ae02d35b09b11 Mon Sep 17 00:00:00 2001 From: AlexEmSiKay Date: Mon, 23 Jun 2025 15:24:13 +1000 Subject: [PATCH 03/13] use Header to detect batch messagea and rigorously parse Content-Type --- proxy/sqs.go | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/proxy/sqs.go b/proxy/sqs.go index 9ed2225..2864f1c 100644 --- a/proxy/sqs.go +++ b/proxy/sqs.go @@ -9,6 +9,7 @@ import ( "io/ioutil" "log" "math/rand" + "mime" "net/http" "net/url" "regexp" @@ -130,10 +131,13 @@ func (d sqsSendDerefer) Deref(arn string) (string, error) { var sqsIDToQueueURL = sqsSendDerefer{} const sqsGroupIDHeader = "Lambdafy-SQS-Group-Id" +const batchMessageHeader = "Lambdafy-SQS-Batch-Message" // handleSQSSend handles HTTP POST requests and translates them to SQS send // message. // Lambdafy-SQS-Group-Id header is used to set the message group ID. +// Lambdafy-SQS-Batch-Message header is used to indicate that the request body +// contains a JSON array of messages to be sent in a batch. func handleSQSSend(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) @@ -162,6 +166,24 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) { groupID = &g } + isBatchMessage := r.Header.Get(batchMessageHeader) != "" + // Check if the Content-Type media type is application/json + // instead of direct string equality check, as it may contain additional parameters. + if isBatchMessage { + contentType := r.Header.Get("Content-Type") + mediaType, _, err := mime.ParseMediaType(contentType) + + if err != nil { + log.Printf("error parsing Content-Type header: %v", err) + http.Error(w, fmt.Sprintf("Error parsing Content-Type header: %v", err), http.StatusBadRequest) + return + } + if mediaType != "application/json" { + http.Error(w, "Content-Type must be application/json for batch messages", http.StatusBadRequest) + return + } + } + c, err := awsconfig.LoadDefaultConfig(context.Background()) if err != nil { log.Printf("error loading AWS config: %v", err) @@ -170,9 +192,7 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) { } sqsCl := sqs.NewFromConfig(c) - // Check if body starts with '[' to detect JSON array (multiple messages) - if len(body) > 0 && body[0] == '[' { - // Multiple messages - parse and use batch send + if len(body) > 0 && isBatchMessage { var messages []json.RawMessage if err := json.Unmarshal(body, &messages); err != nil { http.Error(w, "Invalid JSON array", http.StatusBadRequest) From 867c53f64b66e1f6dde9de5921f375b15641becb Mon Sep 17 00:00:00 2001 From: AlexEmSiKay Date: Mon, 23 Jun 2025 15:26:36 +1000 Subject: [PATCH 04/13] Expect a slice of strings from unmarshalling the batched messages --- proxy/sqs.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/proxy/sqs.go b/proxy/sqs.go index 2864f1c..ab1670f 100644 --- a/proxy/sqs.go +++ b/proxy/sqs.go @@ -193,7 +193,7 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) { sqsCl := sqs.NewFromConfig(c) if len(body) > 0 && isBatchMessage { - var messages []json.RawMessage + var messages []string if err := json.Unmarshal(body, &messages); err != nil { http.Error(w, "Invalid JSON array", http.StatusBadRequest) return @@ -209,7 +209,7 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) { for i, msg := range messages { allEntries = append(allEntries, sqstypes.SendMessageBatchRequestEntry{ Id: aws.String(fmt.Sprintf("msg-%d", i)), - MessageBody: aws.String(string(msg)), + MessageBody: aws.String(msg), MessageGroupId: groupID, }) } From 6f4af4f87101988f03cef3963877d586ada0445f Mon Sep 17 00:00:00 2001 From: AlexEmSiKay Date: Mon, 23 Jun 2025 15:28:54 +1000 Subject: [PATCH 05/13] just use range index for message ids --- proxy/sqs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/sqs.go b/proxy/sqs.go index ab1670f..39f6cbf 100644 --- a/proxy/sqs.go +++ b/proxy/sqs.go @@ -208,7 +208,7 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) { var allEntries []sqstypes.SendMessageBatchRequestEntry for i, msg := range messages { allEntries = append(allEntries, sqstypes.SendMessageBatchRequestEntry{ - Id: aws.String(fmt.Sprintf("msg-%d", i)), + Id: aws.String(fmt.Sprintf("%d", i)), MessageBody: aws.String(msg), MessageGroupId: groupID, }) From 2580855274d18bb11540a4f486ccccc183cab19f Mon Sep 17 00:00:00 2001 From: AlexEmSiKay Date: Mon, 23 Jun 2025 15:30:25 +1000 Subject: [PATCH 06/13] define maxSQSBatchSize const --- proxy/sqs.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/proxy/sqs.go b/proxy/sqs.go index 39f6cbf..1809d19 100644 --- a/proxy/sqs.go +++ b/proxy/sqs.go @@ -23,6 +23,8 @@ import ( sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types" ) +const maxSQSBatchSize = 10 // SQS allows a maximum of 10 messages per batch + var sqsARNPat = regexp.MustCompile(`^arn:aws:sqs:([^:]+):([^:]+):(.+)$`) // getSQSQueueURL returns the URL of the SQS queue given its ARN. @@ -214,9 +216,8 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) { }) } - // Send in batches of 10 (SQS limit) - for i := 0; i < len(allEntries); i += 10 { - end := i + 10 + for i := 0; i < len(allEntries); i += maxSQSBatchSize { + end := i + maxSQSBatchSize if end > len(allEntries) { end = len(allEntries) } From 1a175c49302fa1125d1b3dbf4794450f7020dc7a Mon Sep 17 00:00:00 2001 From: AlexEmSiKay Date: Mon, 23 Jun 2025 15:47:18 +1000 Subject: [PATCH 07/13] construct entries in loop to save memory --- proxy/sqs.go | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/proxy/sqs.go b/proxy/sqs.go index 1809d19..6dda990 100644 --- a/proxy/sqs.go +++ b/proxy/sqs.go @@ -206,26 +206,24 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) { return } - // Build entries for batch send (split into groups of 10) - var allEntries []sqstypes.SendMessageBatchRequestEntry - for i, msg := range messages { - allEntries = append(allEntries, sqstypes.SendMessageBatchRequestEntry{ - Id: aws.String(fmt.Sprintf("%d", i)), - MessageBody: aws.String(msg), - MessageGroupId: groupID, - }) - } - - for i := 0; i < len(allEntries); i += maxSQSBatchSize { + for i := 0; i < len(messages); i += maxSQSBatchSize { end := i + maxSQSBatchSize - if end > len(allEntries) { - end = len(allEntries) + if end > len(messages) { + end = len(messages) + } + messageBatch := messages[i:end] + entryBatch := make([]sqstypes.SendMessageBatchRequestEntry, len(messageBatch)) + for j, msg := range messageBatch { + entryBatch[j] = sqstypes.SendMessageBatchRequestEntry{ + Id: aws.String(fmt.Sprintf("%d", j)), + MessageBody: aws.String(msg), + MessageGroupId: groupID, + } } - batch := allEntries[i:end] if _, err := sqsCl.SendMessageBatch(context.Background(), &sqs.SendMessageBatchInput{ QueueUrl: aws.String(qURL), - Entries: batch, + Entries: entryBatch, }); err != nil { log.Printf("error sending SQS message batch: %v", err) http.Error(w, fmt.Sprintf("Error sending SQS message batch: %v", err), http.StatusInternalServerError) From 3271606e6765fcdb7c0c217d61ffe29ea07b6f84 Mon Sep 17 00:00:00 2001 From: AlexEmSiKay Date: Mon, 23 Jun 2025 15:59:57 +1000 Subject: [PATCH 08/13] use an early return on single message send instead of if/else --- proxy/sqs.go | 117 ++++++++++++++++++++++++++------------------------- 1 file changed, 59 insertions(+), 58 deletions(-) diff --git a/proxy/sqs.go b/proxy/sqs.go index 6dda990..ae21b3c 100644 --- a/proxy/sqs.go +++ b/proxy/sqs.go @@ -168,24 +168,6 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) { groupID = &g } - isBatchMessage := r.Header.Get(batchMessageHeader) != "" - // Check if the Content-Type media type is application/json - // instead of direct string equality check, as it may contain additional parameters. - if isBatchMessage { - contentType := r.Header.Get("Content-Type") - mediaType, _, err := mime.ParseMediaType(contentType) - - if err != nil { - log.Printf("error parsing Content-Type header: %v", err) - http.Error(w, fmt.Sprintf("Error parsing Content-Type header: %v", err), http.StatusBadRequest) - return - } - if mediaType != "application/json" { - http.Error(w, "Content-Type must be application/json for batch messages", http.StatusBadRequest) - return - } - } - c, err := awsconfig.LoadDefaultConfig(context.Background()) if err != nil { log.Printf("error loading AWS config: %v", err) @@ -194,46 +176,9 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) { } sqsCl := sqs.NewFromConfig(c) - if len(body) > 0 && isBatchMessage { - var messages []string - if err := json.Unmarshal(body, &messages); err != nil { - http.Error(w, "Invalid JSON array", http.StatusBadRequest) - return - } - - if len(messages) == 0 { - http.Error(w, "Empty message array", http.StatusBadRequest) - return - } - - for i := 0; i < len(messages); i += maxSQSBatchSize { - end := i + maxSQSBatchSize - if end > len(messages) { - end = len(messages) - } - messageBatch := messages[i:end] - entryBatch := make([]sqstypes.SendMessageBatchRequestEntry, len(messageBatch)) - for j, msg := range messageBatch { - entryBatch[j] = sqstypes.SendMessageBatchRequestEntry{ - Id: aws.String(fmt.Sprintf("%d", j)), - MessageBody: aws.String(msg), - MessageGroupId: groupID, - } - } - - if _, err := sqsCl.SendMessageBatch(context.Background(), &sqs.SendMessageBatchInput{ - QueueUrl: aws.String(qURL), - Entries: entryBatch, - }); err != nil { - log.Printf("error sending SQS message batch: %v", err) - http.Error(w, fmt.Sprintf("Error sending SQS message batch: %v", err), http.StatusInternalServerError) - return - } - } - - log.Printf("sent %d SQS messages to '%s'", len(messages), qURL) - } else { - // Single message - use regular send + isBatchMessage := r.Header.Get(batchMessageHeader) != "" + // Single message - use regular send + if !isBatchMessage { if _, err := sqsCl.SendMessage(context.Background(), &sqs.SendMessageInput{ MessageBody: aws.String(string(body)), QueueUrl: aws.String(qURL), @@ -245,8 +190,64 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) { } log.Printf("sent an SQS message to '%s'", qURL) + return + } + + // Batch send message - expect the correct Content-Type and + // a JSON array of string messages in the request body + + // Check if the Content-Type media type is application/json + // instead of direct string equality check, as it may contain additional parameters. + contentType := r.Header.Get("Content-Type") + mediaType, _, err := mime.ParseMediaType(contentType) + + if err != nil { + log.Printf("error parsing Content-Type header: %v", err) + http.Error(w, fmt.Sprintf("Error parsing Content-Type header: %v", err), http.StatusBadRequest) + return + } + if mediaType != "application/json" { + http.Error(w, "Content-Type must be application/json for batch messages", http.StatusBadRequest) + return + } + + var messages []string + if err := json.Unmarshal(body, &messages); err != nil { + http.Error(w, "Invalid JSON array", http.StatusBadRequest) + return + } + + if len(messages) == 0 { + http.Error(w, "Empty message array", http.StatusBadRequest) + return + } + + for i := 0; i < len(messages); i += maxSQSBatchSize { + end := i + maxSQSBatchSize + if end > len(messages) { + end = len(messages) + } + messageBatch := messages[i:end] + entryBatch := make([]sqstypes.SendMessageBatchRequestEntry, len(messageBatch)) + for j, msg := range messageBatch { + entryBatch[j] = sqstypes.SendMessageBatchRequestEntry{ + Id: aws.String(fmt.Sprintf("%d", j)), + MessageBody: aws.String(msg), + MessageGroupId: groupID, + } + } + + if _, err := sqsCl.SendMessageBatch(context.Background(), &sqs.SendMessageBatchInput{ + QueueUrl: aws.String(qURL), + Entries: entryBatch, + }); err != nil { + log.Printf("error sending SQS message batch: %v", err) + http.Error(w, fmt.Sprintf("Error sending SQS message batch: %v", err), http.StatusInternalServerError) + return + } } + log.Printf("sent %d SQS messages to '%s'", len(messages), qURL) } const sendSQSStarenvTag = "lambdafy_sqs_send" From b71fcfddda10eff25fc2a6b066dd92cd5073bf99 Mon Sep 17 00:00:00 2001 From: AlexEmSiKay Date: Tue, 24 Jun 2025 11:51:20 +1000 Subject: [PATCH 09/13] limit sqs batch call to max 10 messages --- proxy/sqs.go | 39 ++++++++++++++++++--------------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/proxy/sqs.go b/proxy/sqs.go index ae21b3c..ea1cdfb 100644 --- a/proxy/sqs.go +++ b/proxy/sqs.go @@ -221,30 +221,27 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) { http.Error(w, "Empty message array", http.StatusBadRequest) return } + if len(messages) > maxSQSBatchSize { + http.Error(w, fmt.Sprintf("Too many messages in batch, maximum is %d", maxSQSBatchSize), http.StatusBadRequest) + return + } - for i := 0; i < len(messages); i += maxSQSBatchSize { - end := i + maxSQSBatchSize - if end > len(messages) { - end = len(messages) - } - messageBatch := messages[i:end] - entryBatch := make([]sqstypes.SendMessageBatchRequestEntry, len(messageBatch)) - for j, msg := range messageBatch { - entryBatch[j] = sqstypes.SendMessageBatchRequestEntry{ - Id: aws.String(fmt.Sprintf("%d", j)), - MessageBody: aws.String(msg), - MessageGroupId: groupID, - } + entries := make([]sqstypes.SendMessageBatchRequestEntry, len(messages)) + for j, msg := range messages { + entries[j] = sqstypes.SendMessageBatchRequestEntry{ + Id: aws.String(fmt.Sprintf("%d", j)), + MessageBody: aws.String(msg), + MessageGroupId: groupID, } + } - if _, err := sqsCl.SendMessageBatch(context.Background(), &sqs.SendMessageBatchInput{ - QueueUrl: aws.String(qURL), - Entries: entryBatch, - }); err != nil { - log.Printf("error sending SQS message batch: %v", err) - http.Error(w, fmt.Sprintf("Error sending SQS message batch: %v", err), http.StatusInternalServerError) - return - } + if _, err := sqsCl.SendMessageBatch(context.Background(), &sqs.SendMessageBatchInput{ + QueueUrl: aws.String(qURL), + Entries: entries, + }); err != nil { + log.Printf("error sending SQS message batch: %v", err) + http.Error(w, fmt.Sprintf("Error sending SQS message batch: %v", err), http.StatusInternalServerError) + return } log.Printf("sent %d SQS messages to '%s'", len(messages), qURL) From 0895201bd3b27c7eb71d1c7f4e48355814ece14f Mon Sep 17 00:00:00 2001 From: AlexEmSiKay Date: Tue, 24 Jun 2025 12:14:42 +1000 Subject: [PATCH 10/13] retries and logging for partial sqs batch send message failures --- proxy/sqs.go | 47 +++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/proxy/sqs.go b/proxy/sqs.go index ea1cdfb..856f2c5 100644 --- a/proxy/sqs.go +++ b/proxy/sqs.go @@ -235,12 +235,47 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) { } } - if _, err := sqsCl.SendMessageBatch(context.Background(), &sqs.SendMessageBatchInput{ - QueueUrl: aws.String(qURL), - Entries: entries, - }); err != nil { - log.Printf("error sending SQS message batch: %v", err) - http.Error(w, fmt.Sprintf("Error sending SQS message batch: %v", err), http.StatusInternalServerError) + var attempts int = 0 + var retryable_entries []sqstypes.SendMessageBatchRequestEntry = entries + var nonRetryableEntries []sqstypes.SendMessageBatchRequestEntry = nil + + for (attempts == 0 || len(retryable_entries) > 0) && attempts < 5 { + attempts++ + if output, err := sqsCl.SendMessageBatch(context.Background(), &sqs.SendMessageBatchInput{ + QueueUrl: aws.String(qURL), + Entries: retryable_entries, + }); err != nil { + log.Printf("error sending SQS message batch: %v", err) + http.Error(w, fmt.Sprintf("Error sending SQS message batch: %v", err), http.StatusInternalServerError) + return + } else if len(output.Failed) > 0 { + retryable_entries = nil // Reset retryable entries for the next attempt + log.Printf("failed to send %d SQS messages in batch", len(output.Failed)) + for _, f := range output.Failed { + fmt.Printf( + "failed to send SQS message %s: %s (SenderFault: %t, Code: %s)\n", + *f.Id, *f.Message, f.SenderFault, *f.Code, + ) + id, err := strconv.Atoi(*f.Id) + if err != nil { + log.Printf("error parsing SQS message ID '%s': %v", *f.Id, err) + http.Error(w, fmt.Sprintf("Error parsing SQS message ID '%s': %v", *f.Id, err), http.StatusInternalServerError) + return + } + if f.SenderFault { + // Non-retryable error + nonRetryableEntries = append(nonRetryableEntries, entries[id]) + } else { + // Retryable error + retryable_entries = append(retryable_entries, entries[id]) + } + } + } + } + + if len(retryable_entries)+len(nonRetryableEntries) > 0 { + log.Printf("%d of %d SQS messages in batch failed", len(retryable_entries)+len(nonRetryableEntries), len(entries)) + http.Error(w, fmt.Sprintf("%d of %d SQS messages in batch failed", len(retryable_entries)+len(nonRetryableEntries), len(entries)), http.StatusInternalServerError) return } From a05a5acee90d45ab334dfe625cd1282f972b0116 Mon Sep 17 00:00:00 2001 From: AlexEmSiKay Date: Wed, 25 Jun 2025 10:04:29 +1000 Subject: [PATCH 11/13] fix retryable entries reset --- proxy/sqs.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/proxy/sqs.go b/proxy/sqs.go index 856f2c5..d3347e9 100644 --- a/proxy/sqs.go +++ b/proxy/sqs.go @@ -241,15 +241,17 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) { for (attempts == 0 || len(retryable_entries) > 0) && attempts < 5 { attempts++ - if output, err := sqsCl.SendMessageBatch(context.Background(), &sqs.SendMessageBatchInput{ + output, err := sqsCl.SendMessageBatch(context.Background(), &sqs.SendMessageBatchInput{ QueueUrl: aws.String(qURL), Entries: retryable_entries, - }); err != nil { + }) + if err != nil { log.Printf("error sending SQS message batch: %v", err) http.Error(w, fmt.Sprintf("Error sending SQS message batch: %v", err), http.StatusInternalServerError) return - } else if len(output.Failed) > 0 { - retryable_entries = nil // Reset retryable entries for the next attempt + } + retryable_entries = nil // Reset retryable entries for the next attempt + if len(output.Failed) > 0 { log.Printf("failed to send %d SQS messages in batch", len(output.Failed)) for _, f := range output.Failed { fmt.Printf( From 32fe950913f58f35820326377378f77e4ded6403 Mon Sep 17 00:00:00 2001 From: AlexEmSiKay Date: Wed, 25 Jun 2025 10:04:58 +1000 Subject: [PATCH 12/13] log send message batch bad request errors --- proxy/sqs.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/proxy/sqs.go b/proxy/sqs.go index d3347e9..0e3fdb6 100644 --- a/proxy/sqs.go +++ b/proxy/sqs.go @@ -213,15 +213,18 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) { var messages []string if err := json.Unmarshal(body, &messages); err != nil { + log.Printf("Send message batch failure - Invalid JSON array: %v", err) http.Error(w, "Invalid JSON array", http.StatusBadRequest) return } if len(messages) == 0 { + log.Printf("Send message batch failure - Empty message array") http.Error(w, "Empty message array", http.StatusBadRequest) return } if len(messages) > maxSQSBatchSize { + log.Printf("Send message batch failure - Too many messages in batch, maximum is %d", maxSQSBatchSize) http.Error(w, fmt.Sprintf("Too many messages in batch, maximum is %d", maxSQSBatchSize), http.StatusBadRequest) return } From 607e387bc2fb9f1927bb9a275182b5f8c01b4a3d Mon Sep 17 00:00:00 2001 From: AlexEmSiKay Date: Wed, 25 Jun 2025 10:12:01 +1000 Subject: [PATCH 13/13] implement exponential backoff for SQS send message retries --- proxy/sqs.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/proxy/sqs.go b/proxy/sqs.go index 0e3fdb6..6f87ec5 100644 --- a/proxy/sqs.go +++ b/proxy/sqs.go @@ -15,6 +15,7 @@ import ( "regexp" "strconv" "strings" + "time" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-sdk-go-v2/aws" @@ -243,6 +244,13 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) { var nonRetryableEntries []sqstypes.SendMessageBatchRequestEntry = nil for (attempts == 0 || len(retryable_entries) > 0) && attempts < 5 { + // Sleep for exponential backoff on retry + if attempts > 0 { + // bit shift to calculate the sleep duration -> 500ms, 1s, 2s, 4s, 8s + sleepDuration := (1 << attempts) * 500 // Exponential backoff in milliseconds + time.Sleep(time.Duration(sleepDuration) * time.Millisecond) + } + attempts++ output, err := sqsCl.SendMessageBatch(context.Background(), &sqs.SendMessageBatchInput{ QueueUrl: aws.String(qURL),