diff --git a/README.md b/README.md index 1608100e..f230fb6e 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,7 @@ You can run the termination handler on any Kubernetes cluster running on AWS, in - EC2 Instance Rebalance Recommendation - EC2 Auto-Scaling Group Termination Lifecycle Hooks to take care of ASG Scale-In, AZ-Rebalance, Unhealthy Instances, and more! - EC2 Status Change Events + - EC2 Scheduled Change events from AWS Health - Helm installation and event configuration support - Webhook feature to send shutdown or restart notification messages - Unit & Integration Tests @@ -265,7 +266,7 @@ $ aws sqs create-queue --queue-name "${SQS_QUEUE_NAME}" --attributes file:///tmp #### 4. Create Amazon EventBridge Rules -Here are AWS CLI commands to create Amazon EventBridge rules so that ASG termination events, Spot Interruptions, Instance state changes and Rebalance Recommendations are sent to the SQS queue created in the previous step. This should really be configured via your favorite infrastructure-as-code tool like CloudFormation or Terraform: +Here are AWS CLI commands to create Amazon EventBridge rules so that ASG termination events, Spot Interruptions, Instance state changes, Rebalance Recommendations, and AWS Health Scheduled Changes are sent to the SQS queue created in the previous step. This should really be configured via your favorite infrastructure-as-code tool like CloudFormation or Terraform: ``` $ aws events put-rule \ @@ -295,6 +296,13 @@ $ aws events put-rule \ $ aws events put-targets --rule MyK8sInstanceStateChangeRule \ --targets "Id"="1","Arn"="arn:aws:sqs:us-east-1:123456789012:MyK8sTermQueue" + +$ aws events put-rule \ + --name MyK8sScheduledChangeRule \ + --event-pattern "{\"source\": [\"aws.health\"],\"detail-type\": [\"AWS Health Event\"]}" + +$ aws events put-targets --rule MyK8sScheduledChangeRule \ + --targets "Id"="1","Arn"="arn:aws:sqs:us-east-1:123456789012:MyK8sTermQueue" ``` #### 5. Create an IAM Role for the Pods diff --git a/go.mod b/go.mod index f80bc078..0b522a20 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( go.opentelemetry.io/otel v0.20.0 go.opentelemetry.io/otel/exporters/metric/prometheus v0.20.0 go.opentelemetry.io/otel/metric v0.20.0 + go.uber.org/multierr v1.7.0 golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a // indirect golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect golang.org/x/sys v0.0.0-20210608053332-aa57babbf139 diff --git a/go.sum b/go.sum index 13b17fed..83f67433 100644 --- a/go.sum +++ b/go.sum @@ -608,8 +608,12 @@ go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5/go.mod h1:nmDLcffg48OtT/PSW0H go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= +go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= +go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= @@ -916,6 +920,8 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.0.3 h1:4AuOwCGf4lLR9u3YOe2awrHygurzhO/HeQ6laiA6Sx0= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= diff --git a/pkg/monitor/sqsevent/asg-lifecycle-event.go b/pkg/monitor/sqsevent/asg-lifecycle-event.go index 1feda2aa..efe092b2 100644 --- a/pkg/monitor/sqsevent/asg-lifecycle-event.go +++ b/pkg/monitor/sqsevent/asg-lifecycle-event.go @@ -57,7 +57,7 @@ type LifecycleDetail struct { LifecycleTransition string `json:"LifecycleTransition"` } -func (m SQSMonitor) asgTerminationToInterruptionEvent(event EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) { +func (m SQSMonitor) asgTerminationToInterruptionEvent(event *EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) { lifecycleDetail := &LifecycleDetail{} err := json.Unmarshal(event.Detail, lifecycleDetail) if err != nil { diff --git a/pkg/monitor/sqsevent/ec2-state-change-event.go b/pkg/monitor/sqsevent/ec2-state-change-event.go index dd5d6ace..59725a19 100644 --- a/pkg/monitor/sqsevent/ec2-state-change-event.go +++ b/pkg/monitor/sqsevent/ec2-state-change-event.go @@ -50,7 +50,7 @@ type EC2StateChangeDetail struct { const instanceStatesToDrain = "stopping,stopped,shutting-down,terminated" -func (m SQSMonitor) ec2StateChangeToInterruptionEvent(event EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) { +func (m SQSMonitor) ec2StateChangeToInterruptionEvent(event *EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) { ec2StateChangeDetail := &EC2StateChangeDetail{} err := json.Unmarshal(event.Detail, ec2StateChangeDetail) if err != nil { diff --git a/pkg/monitor/sqsevent/rebalance-recommendation-event.go b/pkg/monitor/sqsevent/rebalance-recommendation-event.go index d1368935..8bf882b1 100644 --- a/pkg/monitor/sqsevent/rebalance-recommendation-event.go +++ b/pkg/monitor/sqsevent/rebalance-recommendation-event.go @@ -46,7 +46,7 @@ type RebalanceRecommendationDetail struct { InstanceID string `json:"instance-id"` } -func (m SQSMonitor) rebalanceRecommendationToInterruptionEvent(event EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) { +func (m SQSMonitor) rebalanceRecommendationToInterruptionEvent(event *EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) { rebalanceRecDetail := &RebalanceRecommendationDetail{} err := json.Unmarshal(event.Detail, rebalanceRecDetail) if err != nil { diff --git a/pkg/monitor/sqsevent/scheduled-change-event.go b/pkg/monitor/sqsevent/scheduled-change-event.go new file mode 100644 index 00000000..caca3c20 --- /dev/null +++ b/pkg/monitor/sqsevent/scheduled-change-event.go @@ -0,0 +1,122 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package sqsevent + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/aws/aws-node-termination-handler/pkg/monitor" + "github.com/aws/aws-node-termination-handler/pkg/node" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/rs/zerolog/log" +) + +/* Example AWS Health Scheduled Change EC2 Event: +{ + "version": "0", + "id": "7fb65329-1628-4cf3-a740-95fg457h1402", + "detail-type": "AWS Health Event", + "source": "aws.health", + "account": "account id", + "time": "2016-06-05T06:27:57Z", + "region": "us-east-1", + "resources": ["i-12345678"], + "detail": { + "eventArn": "arn:aws:health:region::event/id", + "service": "EC2", + "eventTypeCode": "AWS_EC2_DEDICATED_HOST_NETWORK_MAINTENANCE_SCHEDULED", + "eventTypeCategory": "scheduledChange", + "startTime": "Sat, 05 Jun 2016 15:10:09 GMT", + "eventDescription": [{ + "language": "en_US", + "latestDescription": "A description of the event will be provided here" + }], + "affectedEntities": [{ + "entityValue": "i-12345678", + "tags": { + "stage": "prod", + "app": "my-app" + } + }] + } +} +*/ + +// AffectedEntity holds information about an entity that is affected by a Health event +type AffectedEntity struct { + EntityValue string `json:"entityValue"` +} + +// ScheduledChangeEventDetail holds the event details for AWS Health scheduled EC2 change events from Amazon EventBridge +type ScheduledChangeEventDetail struct { + EventTypeCategory string `json:"eventTypeCategory"` + Service string `json:"service"` + AffectedEntities []AffectedEntity `json:"affectedEntities"` +} + +func (m SQSMonitor) scheduledEventToInterruptionEvents(event *EventBridgeEvent, message *sqs.Message) []InterruptionEventWrapper { + scheduledChangeEventDetail := &ScheduledChangeEventDetail{} + interruptionEventWrappers := []InterruptionEventWrapper{} + + if err := json.Unmarshal(event.Detail, scheduledChangeEventDetail); err != nil { + return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) + } + + if scheduledChangeEventDetail.Service != "EC2" { + err := fmt.Errorf("events from Amazon EventBridge for service (%s) are not supported", scheduledChangeEventDetail.Service) + return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) + } + + if scheduledChangeEventDetail.EventTypeCategory != "scheduledChange" { + err := fmt.Errorf("events from Amazon EventBridge with EventTypeCategory (%s) are not supported", scheduledChangeEventDetail.EventTypeCategory) + return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) + } + + for _, affectedEntity := range scheduledChangeEventDetail.AffectedEntities { + nodeInfo, err := m.getNodeInfo(affectedEntity.EntityValue) + if err != nil { + interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) + continue + } + + // Begin drain immediately for scheduled change events to avoid disruptions in cases such as degraded hardware + interruptionEvent := monitor.InterruptionEvent{ + EventID: fmt.Sprintf("aws-health-scheduled-change-event-%x", event.ID), + Kind: SQSTerminateKind, + AutoScalingGroupName: nodeInfo.AsgName, + StartTime: time.Now(), + NodeName: nodeInfo.Name, + InstanceID: nodeInfo.InstanceID, + Description: fmt.Sprintf("AWS Health scheduled change event received. Instance %s will be interrupted at %s \n", nodeInfo.InstanceID, event.getTime()), + } + interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { + if errs := m.deleteMessages([]*sqs.Message{message}); errs != nil { + return errs[0] + } + return nil + } + interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { + if err := n.TaintScheduledMaintenance(interruptionEvent.NodeName, interruptionEvent.EventID); err != nil { + log.Err(err).Msgf("Unable to taint node with taint %s:%s", node.ScheduledMaintenanceTaint, interruptionEvent.EventID) + } + return nil + } + + interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, nil}) + } + + return interruptionEventWrappers +} diff --git a/pkg/monitor/sqsevent/spot-itn-event.go b/pkg/monitor/sqsevent/spot-itn-event.go index 6b5d56c5..87fcffad 100644 --- a/pkg/monitor/sqsevent/spot-itn-event.go +++ b/pkg/monitor/sqsevent/spot-itn-event.go @@ -48,7 +48,7 @@ type SpotInterruptionDetail struct { InstanceAction string `json:"instance-action"` } -func (m SQSMonitor) spotITNTerminationToInterruptionEvent(event EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) { +func (m SQSMonitor) spotITNTerminationToInterruptionEvent(event *EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) { spotInterruptionDetail := &SpotInterruptionDetail{} err := json.Unmarshal(event.Detail, spotInterruptionDetail) if err != nil { diff --git a/pkg/monitor/sqsevent/sqs-monitor.go b/pkg/monitor/sqsevent/sqs-monitor.go index 5f25723b..5cc3f6fd 100644 --- a/pkg/monitor/sqsevent/sqs-monitor.go +++ b/pkg/monitor/sqsevent/sqs-monitor.go @@ -53,12 +53,18 @@ type SQSMonitor struct { ManagedAsgTag string } +// InterruptionEventWrapper is a convenience wrapper for associating an interruption event with its error, if any +type InterruptionEventWrapper struct { + InterruptionEvent *monitor.InterruptionEvent + Err error +} + // Kind denotes the kind of event that is processed func (m SQSMonitor) Kind() string { return SQSTerminateKind } -// Monitor continuously monitors SQS for events and sends interruption events to the passed in channel +// Monitor continuously monitors SQS for events and coordinates processing of the events func (m SQSMonitor) Monitor() error { log.Debug().Msg("Checking for queue messages") messages, err := m.receiveQueueMessages(m.QueueURL) @@ -66,79 +72,121 @@ func (m SQSMonitor) Monitor() error { return err } - failedEvents := 0 + failedEventBridgeEvents := 0 for _, message := range messages { - interruptionEvent, err := m.processSQSMessage(message) - dropMessage := false - switch { - case errors.Is(err, ErrNodeStateNotRunning): - // If the node is no longer running, just log and delete the message. If message deletion fails, count it as an error. - log.Warn().Err(err).Msg("dropping event for an already terminated node") - dropMessage = true - - case err != nil: - // Log errors and record as failed events - log.Err(err).Msg("ignoring event due to error") - failedEvents++ - - case interruptionEvent == nil: - log.Debug().Msg("dropping non-actionable event") - dropMessage = true - - case m.CheckIfManaged && !interruptionEvent.IsManaged: - // This event isn't for an instance that is managed by this process - log.Debug().Str("instance-id", interruptionEvent.InstanceID).Msg("dropping event for unmanaged node") - dropMessage = true - - case interruptionEvent.Kind == SQSTerminateKind: - // Successfully processed SQS message into a SQSTerminateKind interruption event - log.Debug().Msgf("Sending %s interruption event to the interruption channel", SQSTerminateKind) - m.InterruptionChan <- *interruptionEvent - - default: - eventJSON, _ := json.MarshalIndent(interruptionEvent, " ", " ") - log.Warn().Msgf("dropping event of an unrecognized kind: %s", eventJSON) - dropMessage = true + eventBridgeEvent, err := m.processSQSMessage(message) + if err != nil { + log.Err(err).Msg("error processing SQS message") + failedEventBridgeEvents++ + continue } - if dropMessage { - errs := m.deleteMessages([]*sqs.Message{message}) - if len(errs) > 0 { - log.Err(errs[0]).Msg("Error deleting message from SQS") - failedEvents++ - } + interruptionEventWrappers := m.processEventBridgeEvent(eventBridgeEvent, message) + + if err = m.processInterruptionEvents(interruptionEventWrappers, message); err != nil { + log.Err(err).Msg("error processing interruption events") + failedEventBridgeEvents++ } } - if len(messages) > 0 && failedEvents == len(messages) { + if len(messages) > 0 && failedEventBridgeEvents == len(messages) { return fmt.Errorf("none of the waiting queue events could be processed") } return nil } -// processSQSMessage checks sqs for new messages and returns interruption events -func (m SQSMonitor) processSQSMessage(message *sqs.Message) (*monitor.InterruptionEvent, error) { +// processSQSMessage interprets an SQS message and returns an EventBridge event +func (m SQSMonitor) processSQSMessage(message *sqs.Message) (*EventBridgeEvent, error) { event := EventBridgeEvent{} err := json.Unmarshal([]byte(*message.Body), &event) - if err != nil { - return nil, err - } - switch event.Source { + return &event, err +} + +// processEventBridgeEvent processes an EventBridge event and returns interruption event wrappers +func (m SQSMonitor) processEventBridgeEvent(eventBridgeEvent *EventBridgeEvent, message *sqs.Message) []InterruptionEventWrapper { + interruptionEventWrappers := []InterruptionEventWrapper{} + interruptionEvent := &monitor.InterruptionEvent{} + var err error + + switch eventBridgeEvent.Source { case "aws.autoscaling": - return m.asgTerminationToInterruptionEvent(event, message) + interruptionEvent, err = m.asgTerminationToInterruptionEvent(eventBridgeEvent, message) + return append(interruptionEventWrappers, InterruptionEventWrapper{interruptionEvent, err}) case "aws.ec2": - if event.DetailType == "EC2 Instance State-change Notification" { - return m.ec2StateChangeToInterruptionEvent(event, message) - } else if event.DetailType == "EC2 Spot Instance Interruption Warning" { - return m.spotITNTerminationToInterruptionEvent(event, message) - } else if event.DetailType == "EC2 Instance Rebalance Recommendation" { - return m.rebalanceRecommendationToInterruptionEvent(event, message) + if eventBridgeEvent.DetailType == "EC2 Instance State-change Notification" { + interruptionEvent, err = m.ec2StateChangeToInterruptionEvent(eventBridgeEvent, message) + } else if eventBridgeEvent.DetailType == "EC2 Spot Instance Interruption Warning" { + interruptionEvent, err = m.spotITNTerminationToInterruptionEvent(eventBridgeEvent, message) + } else if eventBridgeEvent.DetailType == "EC2 Instance Rebalance Recommendation" { + interruptionEvent, err = m.rebalanceRecommendationToInterruptionEvent(eventBridgeEvent, message) + } + return append(interruptionEventWrappers, InterruptionEventWrapper{interruptionEvent, err}) + + case "aws.health": + if eventBridgeEvent.DetailType == "AWS Health Event" { + return m.scheduledEventToInterruptionEvents(eventBridgeEvent, message) } } - return nil, fmt.Errorf("Event source (%s) is not supported", event.Source) + + err = fmt.Errorf("event source (%s) is not supported", eventBridgeEvent.Source) + return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) +} + +// processInterruptionEvents takes interruption event wrappers and sends events to the interruption channel +func (m SQSMonitor) processInterruptionEvents(interruptionEventWrappers []InterruptionEventWrapper, message *sqs.Message) error { + dropMessageSuggestionCount := 0 + failedInterruptionEventsCount := 0 + + for _, eventWrapper := range interruptionEventWrappers { + switch { + case errors.Is(eventWrapper.Err, ErrNodeStateNotRunning): + // If the node is no longer running, just log and delete the message + log.Warn().Err(eventWrapper.Err).Msg("dropping interruption event for an already terminated node") + dropMessageSuggestionCount++ + + case eventWrapper.Err != nil: + // Log errors and record as failed events. Don't delete the message in order to allow retries + log.Err(eventWrapper.Err).Msg("ignoring interruption event due to error") + failedInterruptionEventsCount++ // seems useless + + case eventWrapper.InterruptionEvent == nil: + log.Debug().Msg("dropping non-actionable interruption event") + dropMessageSuggestionCount++ + + case m.CheckIfManaged && !eventWrapper.InterruptionEvent.IsManaged: + // This event isn't for an instance that is managed by this process + log.Debug().Str("instance-id", eventWrapper.InterruptionEvent.InstanceID).Msg("dropping interruption event for unmanaged node") + dropMessageSuggestionCount++ + + case eventWrapper.InterruptionEvent.Kind == SQSTerminateKind: + // Successfully processed SQS message into a SQSTerminateKind interruption event + log.Debug().Msgf("Sending %s interruption event to the interruption channel", SQSTerminateKind) + m.InterruptionChan <- *eventWrapper.InterruptionEvent + + default: + eventJSON, _ := json.MarshalIndent(eventWrapper.InterruptionEvent, " ", " ") + log.Warn().Msgf("dropping interruption event of an unrecognized kind: %s", eventJSON) + dropMessageSuggestionCount++ + } + } + + if dropMessageSuggestionCount == len(interruptionEventWrappers) { + // All interruption events weren't actionable, just delete the message. If message deletion fails, count it as an error + errs := m.deleteMessages([]*sqs.Message{message}) + if len(errs) > 0 { + log.Err(errs[0]).Msg("Error deleting message from SQS") + failedInterruptionEventsCount++ + } + } + + if failedInterruptionEventsCount != 0 { + return fmt.Errorf("some interruption events for message Id %b could not be processed", message.MessageId) + } + + return nil } // receiveQueueMessages checks the configured SQS queue for new messages diff --git a/test/e2e/scheduled-change-event-sqs-test b/test/e2e/scheduled-change-event-sqs-test new file mode 100755 index 00000000..0fc33b9a --- /dev/null +++ b/test/e2e/scheduled-change-event-sqs-test @@ -0,0 +1,215 @@ +#!/bin/bash +set -euo pipefail + +# Available env vars: +# $TMP_DIR +# $CLUSTER_NAME +# $KUBECONFIG +# $NODE_TERMINATION_HANDLER_DOCKER_REPO +# $NODE_TERMINATION_HANDLER_DOCKER_TAG +# $WEBHOOK_DOCKER_REPO +# $WEBHOOK_DOCKER_TAG +# $AEMM_URL +# $AEMM_VERSION + + +function fail_and_exit { + echo "❌ AWS Scheduled Change SQS Test failed $CLUSTER_NAME ❌" + exit "${1:-1}" +} + +echo "Starting AWS Scheduled Change SQS Test for Node Termination Handler" +START_TIME=$(date -u +"%Y-%m-%dT%TZ") + +SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )" + +common_helm_args=() + +localstack_helm_args=( + upgrade + --install + "$CLUSTER_NAME-localstack" + "$SCRIPTPATH/../../config/helm/localstack/" + --wait + --namespace default + --set nodeSelector."$NTH_CONTROL_LABEL" + --set defaultRegion="${AWS_REGION}" +) + +set -x +helm "${localstack_helm_args[@]}" +set +x + +sleep 10 + +RUN_INSTANCE_CMD="awslocal ec2 run-instances --private-ip-address ${WORKER_IP} --region ${AWS_REGION} --tag-specifications 'ResourceType=instance,Tags=[{Key=aws:autoscaling:groupName,Value=nth-integ-test}]'" +localstack_pod=$(kubectl get pods --selector app=localstack --field-selector="status.phase=Running" \ + -o go-template --template '{{range .items}}{{.metadata.name}} {{.metadata.creationTimestamp}}{{"\n"}}{{end}}' \ + | awk '$2 >= "'"${START_TIME//+0000/Z}"'" { print $1 }') +echo "🥑 Using localstack pod ${localstack_pod}" +run_instances_resp=$(kubectl exec -i "${localstack_pod}" -- bash -c "${RUN_INSTANCE_CMD}") +private_dns_name=$(echo "${run_instances_resp}" | jq -r '.Instances[] .PrivateDnsName') +instance_id=$(echo "${run_instances_resp}" | jq -r '.Instances[] .InstanceId') +echo "🥑 Started mock EC2 instance (${instance_id}) w/ private DNS name: ${private_dns_name}" + +CREATE_SQS_CMD="awslocal sqs create-queue --queue-name "${CLUSTER_NAME}-queue" --attributes MessageRetentionPeriod=300 --region ${AWS_REGION}" +queue_url=$(kubectl exec -i "${localstack_pod}" -- bash -c "${CREATE_SQS_CMD}" | jq -r .QueueUrl) + +echo "🥑 Created SQS Queue ${queue_url}" + +anth_helm_args=( + upgrade + --install + "$CLUSTER_NAME-acth" + "$SCRIPTPATH/../../config/helm/aws-node-termination-handler/" + --namespace kube-system + --set image.repository="$NODE_TERMINATION_HANDLER_DOCKER_REPO" + --set image.tag="$NODE_TERMINATION_HANDLER_DOCKER_TAG" + --set awsAccessKeyID=foo + --set awsSecretAccessKey=bar + --set awsRegion="${AWS_REGION}" + --set awsEndpoint="http://localstack.default" + --set checkASGTagBeforeDraining=false + --set enableSqsTerminationDraining=true + --set enableScheduledEventDraining=false + --set enableSpotInterruptionDraining=false + --set enableRebalanceMonitoring=false + --set taintNode="true" + --set nodeSelector."$NTH_CONTROL_LABEL" + --set "queueURL=${queue_url}" +) +[[ -n "${NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY-}" ]] && + anth_helm_args+=(--set image.pullPolicy="$NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY") +[[ ${#common_helm_args[@]} -gt 0 ]] && + anth_helm_args+=("${common_helm_args[@]}") + +set -x +helm "${anth_helm_args[@]}" +set +x + +emtp_helm_args=( + upgrade + --install + "$CLUSTER_NAME-emtp" + "$SCRIPTPATH/../../config/helm/webhook-test-proxy/" + --namespace default + --set webhookTestProxy.image.repository="$WEBHOOK_DOCKER_REPO" + --set webhookTestProxy.image.tag="$WEBHOOK_DOCKER_TAG" +) +[[ -n "${WEBHOOK_DOCKER_PULL_POLICY-}" ]] && + emtp_helm_args+=(--set webhookTestProxy.image.pullPolicy="$WEBHOOK_DOCKER_PULL_POLICY") +[[ ${#common_helm_args[@]} -gt 0 ]] && + emtp_helm_args+=("${common_helm_args[@]}") + +set -x +helm "${emtp_helm_args[@]}" +set +x + +TAINT_CHECK_CYCLES=15 +TAINT_CHECK_SLEEP=15 + +DEPLOYED=0 + +for i in $(seq 1 $TAINT_CHECK_CYCLES); do + if [[ $(kubectl get deployments regular-pod-test -o jsonpath='{.status.unavailableReplicas}') -eq 0 ]]; then + echo "✅ Verified regular-pod-test pod was scheduled and started!" + DEPLOYED=1 + break + fi + echo "Setup Loop $i/$TAINT_CHECK_CYCLES, sleeping for $TAINT_CHECK_SLEEP seconds" + sleep $TAINT_CHECK_SLEEP +done + +if [[ $DEPLOYED -eq 0 ]]; then + echo "❌ regular-pod-test pod deployment failed" + fail_and_exit 2 +fi + +AWS_SCHEDULED_CHANGE_EVENT=$(cat < /dev/null; then + echo "✅ Verified the worker node was cordoned!" + cordoned=1 + fi + + if [[ $cordoned -eq 1 ]] && kubectl get nodes "${test_node}" -o json | grep "aws-node-termination-handler/scheduled-maintenance" >/dev/null; then + echo "✅ Verified the worker node was tainted!" + tainted=1 + fi + + if [[ $tainted -eq 1 && $(kubectl get deployments regular-pod-test -o=jsonpath='{.status.unavailableReplicas}') -eq 1 ]]; then + echo "✅ Verified the regular-pod-test pod was evicted!" + evicted=1 + fi + + if [[ ${evicted} -eq 1 && $(kubectl exec -i "${localstack_pod}" -- bash -c "${GET_ATTRS_SQS_CMD}" | jq '(.Attributes.ApproximateNumberOfMessagesNotVisible|tonumber) + (.Attributes.ApproximateNumberOfMessages|tonumber)' ) -eq 0 ]]; then + kubectl exec -i "${localstack_pod}" -- bash -c "${GET_ATTRS_SQS_CMD}" + echo "✅ Verified the message was deleted from the queue after processing!" + message_deleted=1 + echo "✅ AWS Scheduled Change SQS Test Passed $CLUSTER_NAME! ✅" + exit 0 + fi + + echo "Assertion Loop $i/$TAINT_CHECK_CYCLES, sleeping for $TAINT_CHECK_SLEEP seconds" + sleep $TAINT_CHECK_SLEEP +done + +if [[ $cordoned -eq 0 ]]; then + echo "❌ Worker node was not cordoned" + fail_and_exit 3 +elif [[ $tainted -eq 0 ]]; then + echo "❌ Worker node was not tainted" + fail_and_exit 3 +elif [[ $evicted -eq 0 ]]; then + echo "❌ regular-pod-test was NOT evicted" + fail_and_exit 3 +elif [[ $message_deleted -eq 0 ]]; then + echo "❌ message was not removed from the queue after processing" + fail_and_exit 3 +fi + +echo "❌ AWS Scheduled Change SQS Test Failed $CLUSTER_NAME ❌" +fail_and_exit 1 \ No newline at end of file