From b113a549b19114716be1cc1411ce42391cf3a174 Mon Sep 17 00:00:00 2001 From: Austin Siu Date: Thu, 14 Oct 2021 01:40:25 -0500 Subject: [PATCH 01/10] Add AWS Health event support for QP mode --- README.md | 10 +- go.mod | 1 + go.sum | 6 + pkg/monitor/sqsevent/asg-lifecycle-event.go | 2 +- .../sqsevent/ec2-state-change-event.go | 2 +- .../rebalance-recommendation-event.go | 2 +- pkg/monitor/sqsevent/scheduled-event.go | 130 +++++++++++ pkg/monitor/sqsevent/spot-itn-event.go | 2 +- pkg/monitor/sqsevent/sqs-monitor.go | 153 ++++++++----- test/e2e/scheduled-change-event-sqs-test | 215 ++++++++++++++++++ 10 files changed, 466 insertions(+), 57 deletions(-) create mode 100644 pkg/monitor/sqsevent/scheduled-event.go create mode 100755 test/e2e/scheduled-change-event-sqs-test diff --git a/README.md b/README.md index 1608100e..f230fb6e 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,7 @@ You can run the termination handler on any Kubernetes cluster running on AWS, in - EC2 Instance Rebalance Recommendation - EC2 Auto-Scaling Group Termination Lifecycle Hooks to take care of ASG Scale-In, AZ-Rebalance, Unhealthy Instances, and more! - EC2 Status Change Events + - EC2 Scheduled Change events from AWS Health - Helm installation and event configuration support - Webhook feature to send shutdown or restart notification messages - Unit & Integration Tests @@ -265,7 +266,7 @@ $ aws sqs create-queue --queue-name "${SQS_QUEUE_NAME}" --attributes file:///tmp #### 4. Create Amazon EventBridge Rules -Here are AWS CLI commands to create Amazon EventBridge rules so that ASG termination events, Spot Interruptions, Instance state changes and Rebalance Recommendations are sent to the SQS queue created in the previous step. This should really be configured via your favorite infrastructure-as-code tool like CloudFormation or Terraform: +Here are AWS CLI commands to create Amazon EventBridge rules so that ASG termination events, Spot Interruptions, Instance state changes, Rebalance Recommendations, and AWS Health Scheduled Changes are sent to the SQS queue created in the previous step. This should really be configured via your favorite infrastructure-as-code tool like CloudFormation or Terraform: ``` $ aws events put-rule \ @@ -295,6 +296,13 @@ $ aws events put-rule \ $ aws events put-targets --rule MyK8sInstanceStateChangeRule \ --targets "Id"="1","Arn"="arn:aws:sqs:us-east-1:123456789012:MyK8sTermQueue" + +$ aws events put-rule \ + --name MyK8sScheduledChangeRule \ + --event-pattern "{\"source\": [\"aws.health\"],\"detail-type\": [\"AWS Health Event\"]}" + +$ aws events put-targets --rule MyK8sScheduledChangeRule \ + --targets "Id"="1","Arn"="arn:aws:sqs:us-east-1:123456789012:MyK8sTermQueue" ``` #### 5. Create an IAM Role for the Pods diff --git a/go.mod b/go.mod index f80bc078..0b522a20 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( go.opentelemetry.io/otel v0.20.0 go.opentelemetry.io/otel/exporters/metric/prometheus v0.20.0 go.opentelemetry.io/otel/metric v0.20.0 + go.uber.org/multierr v1.7.0 golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a // indirect golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect golang.org/x/sys v0.0.0-20210608053332-aa57babbf139 diff --git a/go.sum b/go.sum index 13b17fed..83f67433 100644 --- a/go.sum +++ b/go.sum @@ -608,8 +608,12 @@ go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5/go.mod h1:nmDLcffg48OtT/PSW0H go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= +go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= +go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= @@ -916,6 +920,8 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.0.3 h1:4AuOwCGf4lLR9u3YOe2awrHygurzhO/HeQ6laiA6Sx0= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= diff --git a/pkg/monitor/sqsevent/asg-lifecycle-event.go b/pkg/monitor/sqsevent/asg-lifecycle-event.go index 1feda2aa..efe092b2 100644 --- a/pkg/monitor/sqsevent/asg-lifecycle-event.go +++ b/pkg/monitor/sqsevent/asg-lifecycle-event.go @@ -57,7 +57,7 @@ type LifecycleDetail struct { LifecycleTransition string `json:"LifecycleTransition"` } -func (m SQSMonitor) asgTerminationToInterruptionEvent(event EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) { +func (m SQSMonitor) asgTerminationToInterruptionEvent(event *EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) { lifecycleDetail := &LifecycleDetail{} err := json.Unmarshal(event.Detail, lifecycleDetail) if err != nil { diff --git a/pkg/monitor/sqsevent/ec2-state-change-event.go b/pkg/monitor/sqsevent/ec2-state-change-event.go index dd5d6ace..59725a19 100644 --- a/pkg/monitor/sqsevent/ec2-state-change-event.go +++ b/pkg/monitor/sqsevent/ec2-state-change-event.go @@ -50,7 +50,7 @@ type EC2StateChangeDetail struct { const instanceStatesToDrain = "stopping,stopped,shutting-down,terminated" -func (m SQSMonitor) ec2StateChangeToInterruptionEvent(event EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) { +func (m SQSMonitor) ec2StateChangeToInterruptionEvent(event *EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) { ec2StateChangeDetail := &EC2StateChangeDetail{} err := json.Unmarshal(event.Detail, ec2StateChangeDetail) if err != nil { diff --git a/pkg/monitor/sqsevent/rebalance-recommendation-event.go b/pkg/monitor/sqsevent/rebalance-recommendation-event.go index d1368935..8bf882b1 100644 --- a/pkg/monitor/sqsevent/rebalance-recommendation-event.go +++ b/pkg/monitor/sqsevent/rebalance-recommendation-event.go @@ -46,7 +46,7 @@ type RebalanceRecommendationDetail struct { InstanceID string `json:"instance-id"` } -func (m SQSMonitor) rebalanceRecommendationToInterruptionEvent(event EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) { +func (m SQSMonitor) rebalanceRecommendationToInterruptionEvent(event *EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) { rebalanceRecDetail := &RebalanceRecommendationDetail{} err := json.Unmarshal(event.Detail, rebalanceRecDetail) if err != nil { diff --git a/pkg/monitor/sqsevent/scheduled-event.go b/pkg/monitor/sqsevent/scheduled-event.go new file mode 100644 index 00000000..fb9caa63 --- /dev/null +++ b/pkg/monitor/sqsevent/scheduled-event.go @@ -0,0 +1,130 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package sqsevent + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/aws/aws-node-termination-handler/pkg/monitor" + "github.com/aws/aws-node-termination-handler/pkg/node" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/rs/zerolog/log" +) + +/* Example AWS Health Scheduled Change EC2 Event: +{ + "version": "0", + "id": "7fb65329-1628-4cf3-a740-95fg457h1402", + "detail-type": "AWS Health Event", + "source": "aws.health", + "account": "account id", + "time": "2016-06-05T06:27:57Z", + "region": "us-east-1", + "resources": ["i-12345678"], + "detail": { + "eventArn": "arn:aws:health:region::event/id", + "service": "EC2", + "eventTypeCode": "AWS_EC2_DEDICATED_HOST_NETWORK_MAINTENANCE_SCHEDULED", + "eventTypeCategory": "scheduledChange", + "startTime": "Sat, 05 Jun 2016 15:10:09 GMT", + "eventDescription": [{ + "language": "en_US", + "latestDescription": "A description of the event will be provided here" + }], + "affectedEntities": [{ + "entityValue": "i-12345678", + "tags": { + "stage": "prod", + "app": "my-app" + } + }] + } +} +*/ + +// AffectedEntity holds information about an entity that is affected by a Health event +type AffectedEntity struct { + EntityValue string `json:"entityValue"` +} + +// ScheduledEventDetail holds the event details for AWS Health scheduled EC2 change events from Amazon EventBridge +type ScheduledEventDetail struct { + EventTypeCategory string `json:"eventTypeCategory"` + Service string `json:"service"` + AffectedEntities []AffectedEntity `json:"affectedEntities"` +} + +const supportedEventCategoryTypes = "scheduledChange" + +// func (m SQSMonitor) scheduledEventToInterruptionEvents(event EventBridgeEvent, message *sqs.Message) ([]InterruptionEventWrapper, error) { +func (m SQSMonitor) scheduledEventToInterruptionEvents(event *EventBridgeEvent, message *sqs.Message) []InterruptionEventWrapper { + scheduledEventDetail := &ScheduledEventDetail{} + + if err := json.Unmarshal(event.Detail, scheduledEventDetail); err != nil { + return []InterruptionEventWrapper{InterruptionEventWrapper{nil, err}} + } + + if scheduledEventDetail.Service != "EC2" { + return []InterruptionEventWrapper{InterruptionEventWrapper{nil, fmt.Errorf("events from Amazon EventBridge for service (%s) are not supported", scheduledEventDetail.Service)}} + } + + if !strings.Contains(supportedEventCategoryTypes, scheduledEventDetail.EventTypeCategory) { + return nil, fmt.Errorf("events from Amazon EventBridge with EventTypeCategory (%s) are not supported", scheduledEventDetail.EventTypeCategory) + } + + // interruptionEventWrappers := make([]InterruptionEventWrapper, len(event.Resources)) + interruptionEventWrappers := []InterruptionEventWrapper{} + + for _, affectedEntity := range scheduledEventDetail.AffectedEntities { + nodeName, err := m.retrieveNodeName(affectedEntity.EntityValue) + if err != nil { + // interruptionEventWrappers[i] = InterruptionEventWrapper{nil, err} + interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) + continue + } + asgName, _ := m.retrieveAutoScalingGroupName(affectedEntity.EntityValue) + interruptionEvent := monitor.InterruptionEvent{ + EventID: fmt.Sprintf("aws-health-maintenance-event-%x", event.ID), + Kind: SQSTerminateKind, + AutoScalingGroupName: asgName, + StartTime: time.Now(), + NodeName: nodeName, + InstanceID: affectedEntity.EntityValue, + Description: fmt.Sprintf("AWS Health maintenance event received. Instance %s will be interrupted at %s \n", affectedEntity.EntityValue, event.getTime()), + } + interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { + errs := m.deleteMessages([]*sqs.Message{message}) + if errs != nil { + return errs[0] + } + return nil + } + interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { + err := n.TaintScheduledMaintenance(interruptionEvent.NodeName, interruptionEvent.EventID) + if err != nil { + log.Err(err).Msgf("Unable to taint node with taint %s:%s", node.ScheduledMaintenanceTaint, interruptionEvent.EventID) + } + return nil + } + + // interruptionEventWrappers[i] = InterruptionEventWrapper{&interruptionEvent, nil} + interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, nil}) + + } + + return interruptionEventWrappers, nil +} diff --git a/pkg/monitor/sqsevent/spot-itn-event.go b/pkg/monitor/sqsevent/spot-itn-event.go index 6b5d56c5..87fcffad 100644 --- a/pkg/monitor/sqsevent/spot-itn-event.go +++ b/pkg/monitor/sqsevent/spot-itn-event.go @@ -48,7 +48,7 @@ type SpotInterruptionDetail struct { InstanceAction string `json:"instance-action"` } -func (m SQSMonitor) spotITNTerminationToInterruptionEvent(event EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) { +func (m SQSMonitor) spotITNTerminationToInterruptionEvent(event *EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) { spotInterruptionDetail := &SpotInterruptionDetail{} err := json.Unmarshal(event.Detail, spotInterruptionDetail) if err != nil { diff --git a/pkg/monitor/sqsevent/sqs-monitor.go b/pkg/monitor/sqsevent/sqs-monitor.go index 5f25723b..c0da1552 100644 --- a/pkg/monitor/sqsevent/sqs-monitor.go +++ b/pkg/monitor/sqsevent/sqs-monitor.go @@ -53,6 +53,12 @@ type SQSMonitor struct { ManagedAsgTag string } +// Convenience wrapper for handling a pair of an interruption event and a related error +type InterruptionEventWrapper struct { + InterruptionEvent *monitor.InterruptionEvent + Err error +} + // Kind denotes the kind of event that is processed func (m SQSMonitor) Kind() string { return SQSTerminateKind @@ -66,51 +72,24 @@ func (m SQSMonitor) Monitor() error { return err } - failedEvents := 0 + failedEventBridgeEvents := 0 for _, message := range messages { - interruptionEvent, err := m.processSQSMessage(message) - dropMessage := false - switch { - case errors.Is(err, ErrNodeStateNotRunning): - // If the node is no longer running, just log and delete the message. If message deletion fails, count it as an error. - log.Warn().Err(err).Msg("dropping event for an already terminated node") - dropMessage = true - - case err != nil: - // Log errors and record as failed events - log.Err(err).Msg("ignoring event due to error") - failedEvents++ - - case interruptionEvent == nil: - log.Debug().Msg("dropping non-actionable event") - dropMessage = true - - case m.CheckIfManaged && !interruptionEvent.IsManaged: - // This event isn't for an instance that is managed by this process - log.Debug().Str("instance-id", interruptionEvent.InstanceID).Msg("dropping event for unmanaged node") - dropMessage = true - - case interruptionEvent.Kind == SQSTerminateKind: - // Successfully processed SQS message into a SQSTerminateKind interruption event - log.Debug().Msgf("Sending %s interruption event to the interruption channel", SQSTerminateKind) - m.InterruptionChan <- *interruptionEvent - - default: - eventJSON, _ := json.MarshalIndent(interruptionEvent, " ", " ") - log.Warn().Msgf("dropping event of an unrecognized kind: %s", eventJSON) - dropMessage = true + eventBridgeEvent, err := m.processSQSMessage(message) + if err != nil { + log.Err(err).Msg("error processing SQS message") + failedEventBridgeEvents++ } - if dropMessage { - errs := m.deleteMessages([]*sqs.Message{message}) - if len(errs) > 0 { - log.Err(errs[0]).Msg("Error deleting message from SQS") - failedEvents++ - } + interruptionEventWrappers := m.processEventBridgeEvent(eventBridgeEvent, message) + + err = m.processInterruptionEvents(interruptionEventWrappers, message) + if err != nil { + log.Err(err).Msg("error processing interruption events") + failedEventBridgeEvents++ } } - if len(messages) > 0 && failedEvents == len(messages) { + if len(messages) > 0 && failedEventBridgeEvents == len(messages) { return fmt.Errorf("none of the waiting queue events could be processed") } @@ -118,27 +97,97 @@ func (m SQSMonitor) Monitor() error { } // processSQSMessage checks sqs for new messages and returns interruption events -func (m SQSMonitor) processSQSMessage(message *sqs.Message) (*monitor.InterruptionEvent, error) { +// func (m SQSMonitor) processSQSMessage(message *sqs.Message) (*monitor.InterruptionEvent, error) { +func (m SQSMonitor) processSQSMessage(message *sqs.Message) (*EventBridgeEvent, error) { event := EventBridgeEvent{} err := json.Unmarshal([]byte(*message.Body), &event) - if err != nil { - return nil, err - } - switch event.Source { + return &event, err +} + +// +func (m SQSMonitor) processEventBridgeEvent(eventBridgeEvent *EventBridgeEvent, message *sqs.Message) []InterruptionEventWrapper { + interruptionEventWrappers := []InterruptionEventWrapper{} + interruptionEvent := &monitor.InterruptionEvent{} + var err error + + switch eventBridgeEvent.Source { case "aws.autoscaling": - return m.asgTerminationToInterruptionEvent(event, message) + interruptionEvent, err = m.asgTerminationToInterruptionEvent(eventBridgeEvent, message) case "aws.ec2": - if event.DetailType == "EC2 Instance State-change Notification" { - return m.ec2StateChangeToInterruptionEvent(event, message) - } else if event.DetailType == "EC2 Spot Instance Interruption Warning" { - return m.spotITNTerminationToInterruptionEvent(event, message) - } else if event.DetailType == "EC2 Instance Rebalance Recommendation" { - return m.rebalanceRecommendationToInterruptionEvent(event, message) + if eventBridgeEvent.DetailType == "EC2 Instance State-change Notification" { + interruptionEvent, err = m.ec2StateChangeToInterruptionEvent(eventBridgeEvent, message) + } else if eventBridgeEvent.DetailType == "EC2 Spot Instance Interruption Warning" { + interruptionEvent, err = m.spotITNTerminationToInterruptionEvent(eventBridgeEvent, message) + } else if eventBridgeEvent.DetailType == "EC2 Instance Rebalance Recommendation" { + interruptionEvent, err = m.rebalanceRecommendationToInterruptionEvent(eventBridgeEvent, message) + } + + case "aws.health": + if eventBridgeEvent.DetailType == "AWS Health Event" { + interruptionEventWrappers = m.scheduledEventToInterruptionEvents(eventBridgeEvent, message) + } + + default: + return append(interruptionEventWrappers, InterruptionEventWrapper{nil, fmt.Errorf("Event source (%s) is not supported", eventBridgeEvent.Source)}) + } + + return append(interruptionEventWrappers, InterruptionEventWrapper{interruptionEvent, err}) +} + +// +func (m SQSMonitor) processInterruptionEvents(interruptionEventWrappers []InterruptionEventWrapper, message *sqs.Message) error { + dropMessageSuggestionCount := 0 + failedInterruptionEventsCount := 0 + + for _, eventWrapper := range interruptionEventWrappers { + switch { + case errors.Is(eventWrapper.Err, ErrNodeStateNotRunning): + // If the node is no longer running, just log and delete the message + log.Warn().Err(eventWrapper.Err).Msg("dropping interruption event for an already terminated node") + dropMessageSuggestionCount++ + + case eventWrapper.Err != nil: + // Log errors and record as failed events. Don't delete the message in order to allow retries + log.Err(eventWrapper.Err).Msg("ignoring interruption event due to error") + failedInterruptionEventsCount++ // seems useless + + case eventWrapper.InterruptionEvent == nil: + log.Debug().Msg("dropping non-actionable interruption event") + dropMessageSuggestionCount++ + + case m.CheckIfManaged && !eventWrapper.InterruptionEvent.IsManaged: + // This event isn't for an instance that is managed by this process + log.Debug().Str("instance-id", eventWrapper.InterruptionEvent.InstanceID).Msg("dropping interruption event for unmanaged node") + dropMessageSuggestionCount++ + + case eventWrapper.InterruptionEvent.Kind == SQSTerminateKind: + // Successfully processed SQS message into a SQSTerminateKind interruption event + log.Debug().Msgf("Sending %s interruption event to the interruption channel", SQSTerminateKind) + m.InterruptionChan <- *eventWrapper.InterruptionEvent + + default: + eventJSON, _ := json.MarshalIndent(eventWrapper.InterruptionEvent, " ", " ") + log.Warn().Msgf("dropping interruption event of an unrecognized kind: %s", eventJSON) + dropMessageSuggestionCount++ + } + } + + if dropMessageSuggestionCount == len(interruptionEventWrappers) { + // All interruption events weren't actionable, just delete the message. If message deletion fails, count it as an error + errs := m.deleteMessages([]*sqs.Message{message}) + if len(errs) > 0 { + log.Err(errs[0]).Msg("Error deleting message from SQS") + // failedInterruptionEventsCount++ // wront count? } } - return nil, fmt.Errorf("Event source (%s) is not supported", event.Source) + + if failedInterruptionEventsCount == 0 { + return nil + } else { + return fmt.Errorf("%b of %b interruption events for message Id %b could not be processed", failedInterruptionEventsCount, len(interruptionEventWrappers), message.MessageId) + } } // receiveQueueMessages checks the configured SQS queue for new messages diff --git a/test/e2e/scheduled-change-event-sqs-test b/test/e2e/scheduled-change-event-sqs-test new file mode 100755 index 00000000..9db41955 --- /dev/null +++ b/test/e2e/scheduled-change-event-sqs-test @@ -0,0 +1,215 @@ +#!/bin/bash +set -euo pipefail + +# Available env vars: +# $TMP_DIR +# $CLUSTER_NAME +# $KUBECONFIG +# $NODE_TERMINATION_HANDLER_DOCKER_REPO +# $NODE_TERMINATION_HANDLER_DOCKER_TAG +# $WEBHOOK_DOCKER_REPO +# $WEBHOOK_DOCKER_TAG +# $AEMM_URL +# $AEMM_VERSION + + +function fail_and_exit { + echo "❌ AWS Health Maintenance SQS Test failed $CLUSTER_NAME ❌" + exit "${1:-1}" +} + +echo "Starting AWS Health Maintenance SQS Test for Node Termination Handler" +START_TIME=$(date -u +"%Y-%m-%dT%TZ") + +SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )" + +common_helm_args=() + +localstack_helm_args=( + upgrade + --install + "$CLUSTER_NAME-localstack" + "$SCRIPTPATH/../../config/helm/localstack/" + --wait + --namespace default + --set nodeSelector."$NTH_CONTROL_LABEL" + --set defaultRegion="${AWS_REGION}" +) + +set -x +helm "${localstack_helm_args[@]}" +set +x + +sleep 10 + +RUN_INSTANCE_CMD="awslocal ec2 run-instances --private-ip-address ${WORKER_IP} --region ${AWS_REGION}" +localstack_pod=$(kubectl get pods --selector app=localstack --field-selector="status.phase=Running" \ + -o go-template --template '{{range .items}}{{.metadata.name}} {{.metadata.creationTimestamp}}{{"\n"}}{{end}}' \ + | awk '$2 >= "'"${START_TIME//+0000/Z}"'" { print $1 }') +echo "🥑 Using localstack pod ${localstack_pod}" +run_instances_resp=$(kubectl exec -i "${localstack_pod}" -- bash -c "${RUN_INSTANCE_CMD}") +private_dns_name=$(echo "${run_instances_resp}" | jq -r '.Instances[] .PrivateDnsName') +instance_id=$(echo "${run_instances_resp}" | jq -r '.Instances[] .InstanceId') +echo "🥑 Started mock EC2 instance (${instance_id}) w/ private DNS name: ${private_dns_name}" + +CREATE_SQS_CMD="awslocal sqs create-queue --queue-name "${CLUSTER_NAME}-queue" --attributes MessageRetentionPeriod=300 --region ${AWS_REGION}" +queue_url=$(kubectl exec -i "${localstack_pod}" -- bash -c "${CREATE_SQS_CMD}" | jq -r .QueueUrl) + +echo "🥑 Created SQS Queue ${queue_url}" + +anth_helm_args=( + upgrade + --install + "$CLUSTER_NAME-acth" + "$SCRIPTPATH/../../config/helm/aws-node-termination-handler/" + --namespace kube-system + --set image.repository="$NODE_TERMINATION_HANDLER_DOCKER_REPO" + --set image.tag="$NODE_TERMINATION_HANDLER_DOCKER_TAG" + --set awsAccessKeyID=foo + --set awsSecretAccessKey=bar + --set awsRegion="${AWS_REGION}" + --set awsEndpoint="http://localstack.default" + --set checkASGTagBeforeDraining=false + --set enableSqsTerminationDraining=true + --set enableScheduledEventDraining=false + --set enableSpotInterruptionDraining=false + --set enableRebalanceMonitoring=false + --set taintNode="true" + --set nodeSelector."$NTH_CONTROL_LABEL" + --set "queueURL=${queue_url}" +) +[[ -n "${NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY-}" ]] && + anth_helm_args+=(--set image.pullPolicy="$NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY") +[[ ${#common_helm_args[@]} -gt 0 ]] && + anth_helm_args+=("${common_helm_args[@]}") + +set -x +helm "${anth_helm_args[@]}" +set +x + +emtp_helm_args=( + upgrade + --install + "$CLUSTER_NAME-emtp" + "$SCRIPTPATH/../../config/helm/webhook-test-proxy/" + --namespace default + --set webhookTestProxy.image.repository="$WEBHOOK_DOCKER_REPO" + --set webhookTestProxy.image.tag="$WEBHOOK_DOCKER_TAG" +) +[[ -n "${WEBHOOK_DOCKER_PULL_POLICY-}" ]] && + emtp_helm_args+=(--set webhookTestProxy.image.pullPolicy="$WEBHOOK_DOCKER_PULL_POLICY") +[[ ${#common_helm_args[@]} -gt 0 ]] && + emtp_helm_args+=("${common_helm_args[@]}") + +set -x +helm "${emtp_helm_args[@]}" +set +x + +TAINT_CHECK_CYCLES=15 +TAINT_CHECK_SLEEP=15 + +DEPLOYED=0 + +for i in $(seq 1 $TAINT_CHECK_CYCLES); do + if [[ $(kubectl get deployments regular-pod-test -o jsonpath='{.status.unavailableReplicas}') -eq 0 ]]; then + echo "✅ Verified regular-pod-test pod was scheduled and started!" + DEPLOYED=1 + break + fi + echo "Setup Loop $i/$TAINT_CHECK_CYCLES, sleeping for $TAINT_CHECK_SLEEP seconds" + sleep $TAINT_CHECK_SLEEP +done + +if [[ $DEPLOYED -eq 0 ]]; then + echo "❌ regular-pod-test pod deployment failed" + fail_and_exit 2 +fi + +AWS_HEALTH_MAINTENANCE_EVENT=$(cat < /dev/null; then + echo "✅ Verified the worker node was cordoned!" + cordoned=1 + fi + + if [[ $cordoned -eq 1 ]] && kubectl get nodes "${test_node}" -o json | grep "aws-node-termination-handler/scheduled-maintenance" >/dev/null; then + echo "✅ Verified the worker node was tainted!" + tainted=1 + fi + + if [[ $tainted -eq 1 && $(kubectl get deployments regular-pod-test -o=jsonpath='{.status.unavailableReplicas}') -eq 1 ]]; then + echo "✅ Verified the regular-pod-test pod was evicted!" + evicted=1 + fi + + if [[ ${evicted} -eq 1 && $(kubectl exec -i "${localstack_pod}" -- bash -c "${GET_ATTRS_SQS_CMD}" | jq '(.Attributes.ApproximateNumberOfMessagesNotVisible|tonumber) + (.Attributes.ApproximateNumberOfMessages|tonumber)' ) -eq 0 ]]; then + kubectl exec -i "${localstack_pod}" -- bash -c "${GET_ATTRS_SQS_CMD}" + echo "✅ Verified the message was deleted from the queue after processing!" + message_deleted=1 + echo "✅ AWS Health Maintenance SQS Test Passed $CLUSTER_NAME! ✅" + exit 0 + fi + + echo "Assertion Loop $i/$TAINT_CHECK_CYCLES, sleeping for $TAINT_CHECK_SLEEP seconds" + sleep $TAINT_CHECK_SLEEP +done + +if [[ $cordoned -eq 0 ]]; then + echo "❌ Worker node was not cordoned" + fail_and_exit 3 +elif [[ $tainted -eq 0 ]]; then + echo "❌ Worker node was not tainted" + fail_and_exit 3 +elif [[ $evicted -eq 0 ]]; then + echo "❌ regular-pod-test was NOT evicted" + fail_and_exit 3 +elif [[ $message_deleted -eq 0 ]]; then + echo "❌ message was not removed from the queue after processing" + fail_and_exit 3 +fi + +echo "❌ AWS Health Maintenance SQS Test Failed $CLUSTER_NAME ❌" +fail_and_exit 1 \ No newline at end of file From a3ee4769228367598ffc0629681c5ea33dd57cee Mon Sep 17 00:00:00 2001 From: Austin Siu Date: Wed, 20 Oct 2021 00:20:16 -0500 Subject: [PATCH 02/10] Update QP Scheduled Change event drain time, README, taint type --- ...led-event.go => scheduled-change-event.go} | 50 ++++++++----------- pkg/monitor/sqsevent/sqs-monitor.go | 12 ++--- 2 files changed, 27 insertions(+), 35 deletions(-) rename pkg/monitor/sqsevent/{scheduled-event.go => scheduled-change-event.go} (65%) diff --git a/pkg/monitor/sqsevent/scheduled-event.go b/pkg/monitor/sqsevent/scheduled-change-event.go similarity index 65% rename from pkg/monitor/sqsevent/scheduled-event.go rename to pkg/monitor/sqsevent/scheduled-change-event.go index fb9caa63..154c409e 100644 --- a/pkg/monitor/sqsevent/scheduled-event.go +++ b/pkg/monitor/sqsevent/scheduled-change-event.go @@ -16,7 +16,6 @@ package sqsevent import ( "encoding/json" "fmt" - "strings" "time" "github.com/aws/aws-node-termination-handler/pkg/monitor" @@ -61,50 +60,47 @@ type AffectedEntity struct { EntityValue string `json:"entityValue"` } -// ScheduledEventDetail holds the event details for AWS Health scheduled EC2 change events from Amazon EventBridge -type ScheduledEventDetail struct { +// ScheduledChangeEventDetail holds the event details for AWS Health scheduled EC2 change events from Amazon EventBridge +type ScheduledChangeEventDetail struct { EventTypeCategory string `json:"eventTypeCategory"` Service string `json:"service"` AffectedEntities []AffectedEntity `json:"affectedEntities"` } -const supportedEventCategoryTypes = "scheduledChange" - // func (m SQSMonitor) scheduledEventToInterruptionEvents(event EventBridgeEvent, message *sqs.Message) ([]InterruptionEventWrapper, error) { func (m SQSMonitor) scheduledEventToInterruptionEvents(event *EventBridgeEvent, message *sqs.Message) []InterruptionEventWrapper { - scheduledEventDetail := &ScheduledEventDetail{} + scheduledChangeEventDetail := &ScheduledChangeEventDetail{} + interruptionEventWrappers := []InterruptionEventWrapper{} + var err error - if err := json.Unmarshal(event.Detail, scheduledEventDetail); err != nil { - return []InterruptionEventWrapper{InterruptionEventWrapper{nil, err}} + if err = json.Unmarshal(event.Detail, scheduledChangeEventDetail); err != nil { + return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) } - if scheduledEventDetail.Service != "EC2" { - return []InterruptionEventWrapper{InterruptionEventWrapper{nil, fmt.Errorf("events from Amazon EventBridge for service (%s) are not supported", scheduledEventDetail.Service)}} + if scheduledChangeEventDetail.Service != "EC2" { + err = fmt.Errorf("events from Amazon EventBridge for service (%s) are not supported", scheduledChangeEventDetail.Service) + return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) } - if !strings.Contains(supportedEventCategoryTypes, scheduledEventDetail.EventTypeCategory) { - return nil, fmt.Errorf("events from Amazon EventBridge with EventTypeCategory (%s) are not supported", scheduledEventDetail.EventTypeCategory) + if scheduledChangeEventDetail.EventTypeCategory != "scheduledChange" { + err = fmt.Errorf("events from Amazon EventBridge with EventTypeCategory (%s) are not supported", scheduledChangeEventDetail.EventTypeCategory) + return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) } - // interruptionEventWrappers := make([]InterruptionEventWrapper, len(event.Resources)) - interruptionEventWrappers := []InterruptionEventWrapper{} - - for _, affectedEntity := range scheduledEventDetail.AffectedEntities { - nodeName, err := m.retrieveNodeName(affectedEntity.EntityValue) + for _, affectedEntity := range scheduledChangeEventDetail.AffectedEntities { + nodeInfo, err := m.getNodeInfo(affectedEntity.EntityValue) if err != nil { - // interruptionEventWrappers[i] = InterruptionEventWrapper{nil, err} interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) continue } - asgName, _ := m.retrieveAutoScalingGroupName(affectedEntity.EntityValue) interruptionEvent := monitor.InterruptionEvent{ - EventID: fmt.Sprintf("aws-health-maintenance-event-%x", event.ID), + EventID: fmt.Sprintf("aws-health-scheduled-change-event-%x", event.ID), Kind: SQSTerminateKind, - AutoScalingGroupName: asgName, - StartTime: time.Now(), - NodeName: nodeName, - InstanceID: affectedEntity.EntityValue, - Description: fmt.Sprintf("AWS Health maintenance event received. Instance %s will be interrupted at %s \n", affectedEntity.EntityValue, event.getTime()), + AutoScalingGroupName: nodeInfo.AsgName, + StartTime: time.Now(), // interrupt immediately rather than waiting + NodeName: nodeInfo.Name, + InstanceID: nodeInfo.InstanceID, + Description: fmt.Sprintf("AWS Health scheduled change event received. Instance %s will be interrupted at %s \n", nodeInfo.InstanceID, event.getTime()), } interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { errs := m.deleteMessages([]*sqs.Message{message}) @@ -121,10 +117,8 @@ func (m SQSMonitor) scheduledEventToInterruptionEvents(event *EventBridgeEvent, return nil } - // interruptionEventWrappers[i] = InterruptionEventWrapper{&interruptionEvent, nil} interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, nil}) - } - return interruptionEventWrappers, nil + return interruptionEventWrappers } diff --git a/pkg/monitor/sqsevent/sqs-monitor.go b/pkg/monitor/sqsevent/sqs-monitor.go index c0da1552..cd79f3dc 100644 --- a/pkg/monitor/sqsevent/sqs-monitor.go +++ b/pkg/monitor/sqsevent/sqs-monitor.go @@ -64,7 +64,7 @@ func (m SQSMonitor) Kind() string { return SQSTerminateKind } -// Monitor continuously monitors SQS for events and sends interruption events to the passed in channel +// Monitor continuously monitors SQS for events and coordinates processing of the events func (m SQSMonitor) Monitor() error { log.Debug().Msg("Checking for queue messages") messages, err := m.receiveQueueMessages(m.QueueURL) @@ -96,8 +96,7 @@ func (m SQSMonitor) Monitor() error { return nil } -// processSQSMessage checks sqs for new messages and returns interruption events -// func (m SQSMonitor) processSQSMessage(message *sqs.Message) (*monitor.InterruptionEvent, error) { +// processSQSMessage interprets an SQS message and returns an EventBridge event func (m SQSMonitor) processSQSMessage(message *sqs.Message) (*EventBridgeEvent, error) { event := EventBridgeEvent{} err := json.Unmarshal([]byte(*message.Body), &event) @@ -105,7 +104,7 @@ func (m SQSMonitor) processSQSMessage(message *sqs.Message) (*EventBridgeEvent, return &event, err } -// +// processEventBridgeEvent processes an EventBridge event and returns interruption event wrappers func (m SQSMonitor) processEventBridgeEvent(eventBridgeEvent *EventBridgeEvent, message *sqs.Message) []InterruptionEventWrapper { interruptionEventWrappers := []InterruptionEventWrapper{} interruptionEvent := &monitor.InterruptionEvent{} @@ -136,7 +135,7 @@ func (m SQSMonitor) processEventBridgeEvent(eventBridgeEvent *EventBridgeEvent, return append(interruptionEventWrappers, InterruptionEventWrapper{interruptionEvent, err}) } -// +// processInterruptionEvents takes interruption event wrappers and sends interruption events to the passed-in channel func (m SQSMonitor) processInterruptionEvents(interruptionEventWrappers []InterruptionEventWrapper, message *sqs.Message) error { dropMessageSuggestionCount := 0 failedInterruptionEventsCount := 0 @@ -179,12 +178,11 @@ func (m SQSMonitor) processInterruptionEvents(interruptionEventWrappers []Interr errs := m.deleteMessages([]*sqs.Message{message}) if len(errs) > 0 { log.Err(errs[0]).Msg("Error deleting message from SQS") - // failedInterruptionEventsCount++ // wront count? } } if failedInterruptionEventsCount == 0 { - return nil + return nil // revisit, don't like that this can happen if err goes when dropping message } else { return fmt.Errorf("%b of %b interruption events for message Id %b could not be processed", failedInterruptionEventsCount, len(interruptionEventWrappers), message.MessageId) } From 8efb322c54257e282f77dc4fb31385be7f76e773 Mon Sep 17 00:00:00 2001 From: Austin Siu Date: Thu, 14 Oct 2021 01:40:25 -0500 Subject: [PATCH 03/10] Add AWS Health event support for QP mode --- pkg/monitor/sqsevent/scheduled-event.go | 129 ++++++++++++++++++++++++ pkg/monitor/sqsevent/sqs-monitor.go | 6 ++ 2 files changed, 135 insertions(+) create mode 100644 pkg/monitor/sqsevent/scheduled-event.go diff --git a/pkg/monitor/sqsevent/scheduled-event.go b/pkg/monitor/sqsevent/scheduled-event.go new file mode 100644 index 00000000..f8fd73f9 --- /dev/null +++ b/pkg/monitor/sqsevent/scheduled-event.go @@ -0,0 +1,129 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package sqsevent + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/aws/aws-node-termination-handler/pkg/monitor" + "github.com/aws/aws-node-termination-handler/pkg/node" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/rs/zerolog/log" +) + +/* Example AWS Health Scheduled Change EC2 Event: +{ + "version": "0", + "id": "7fb65329-1628-4cf3-a740-95fg457h1402", + "detail-type": "AWS Health Event", + "source": "aws.health", + "account": "account id", + "time": "2016-06-05T06:27:57Z", + "region": "us-east-1", + "resources": ["i-12345678"], + "detail": { + "eventArn": "arn:aws:health:region::event/id", + "service": "EC2", + "eventTypeCode": "AWS_EC2_DEDICATED_HOST_NETWORK_MAINTENANCE_SCHEDULED", + "eventTypeCategory": "scheduledChange", + "startTime": "Sat, 05 Jun 2016 15:10:09 GMT", + "eventDescription": [{ + "language": "en_US", + "latestDescription": "A description of the event will be provided here" + }], + "affectedEntities": [{ + "entityValue": "i-12345678", + "tags": { + "stage": "prod", + "app": "my-app" + } + }] + } +} +*/ + +// AffectedEntity holds information about an entity that is affected by a Health event +type AffectedEntity struct { + EntityValue string `json:"entityValue"` +} + +// ScheduledEventDetail holds the event details for AWS Health scheduled EC2 change events from Amazon EventBridge +type ScheduledEventDetail struct { + EventTypeCategory string `json:"eventTypeCategory"` + Service string `json:"service"` + AffectedEntities []AffectedEntity `json:"affectedEntities"` +} + +const supportedEventCategoryTypes = "scheduledChange" + +func (m SQSMonitor) scheduledEventToInterruptionEvents(event EventBridgeEvent, message *sqs.Message) ([]InterruptionEventWrapper, error) { + scheduledEventDetail := &ScheduledEventDetail{} + + if err := json.Unmarshal(event.Detail, scheduledEventDetail); err != nil { + return nil, err + } + + if scheduledEventDetail.Service != "EC2" { + return nil, fmt.Errorf("events from Amazon EventBridge for service (%s) are not supported", scheduledEventDetail.Service) + } + + if !strings.Contains(supportedEventCategoryTypes, scheduledEventDetail.EventTypeCategory) { + return nil, fmt.Errorf("events from Amazon EventBridge with EventTypeCategory (%s) are not supported", scheduledEventDetail.EventTypeCategory) + } + + // interruptionEventWrappers := make([]InterruptionEventWrapper, len(event.Resources)) + interruptionEventWrappers := []InterruptionEventWrapper{} + + for _, affectedEntity := range scheduledEventDetail.AffectedEntities { + nodeName, err := m.retrieveNodeName(affectedEntity.EntityValue) + if err != nil { + // interruptionEventWrappers[i] = InterruptionEventWrapper{nil, err} + interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) + continue + } + asgName, _ := m.retrieveAutoScalingGroupName(affectedEntity.EntityValue) + interruptionEvent := monitor.InterruptionEvent{ + EventID: fmt.Sprintf("aws-health-maintenance-event-%x", event.ID), + Kind: SQSTerminateKind, + AutoScalingGroupName: asgName, + StartTime: time.Now(), + NodeName: nodeName, + InstanceID: affectedEntity.EntityValue, + Description: fmt.Sprintf("AWS Health maintenance event received. Instance %s will be interrupted at %s \n", affectedEntity.EntityValue, event.getTime()), + } + interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { + errs := m.deleteMessages([]*sqs.Message{message}) + if errs != nil { + return errs[0] + } + return nil + } + interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { + err := n.TaintScheduledMaintenance(interruptionEvent.NodeName, interruptionEvent.EventID) + if err != nil { + log.Err(err).Msgf("Unable to taint node with taint %s:%s", node.ScheduledMaintenanceTaint, interruptionEvent.EventID) + } + return nil + } + + // interruptionEventWrappers[i] = InterruptionEventWrapper{&interruptionEvent, nil} + interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, nil}) + + } + + return interruptionEventWrappers, nil +} diff --git a/pkg/monitor/sqsevent/sqs-monitor.go b/pkg/monitor/sqsevent/sqs-monitor.go index cd79f3dc..924b9f65 100644 --- a/pkg/monitor/sqsevent/sqs-monitor.go +++ b/pkg/monitor/sqsevent/sqs-monitor.go @@ -59,6 +59,12 @@ type InterruptionEventWrapper struct { Err error } +// Convenience wrapper for handling a pair of an interruption event and a related error +type InterruptionEventWrapper struct { + InterruptionEvent *monitor.InterruptionEvent + Err error +} + // Kind denotes the kind of event that is processed func (m SQSMonitor) Kind() string { return SQSTerminateKind From 814a35a2606b74330b7baba938ac528e731023e0 Mon Sep 17 00:00:00 2001 From: Austin Siu Date: Wed, 20 Oct 2021 00:20:16 -0500 Subject: [PATCH 04/10] Update QP Scheduled Change event drain time, README, taint type --- pkg/monitor/sqsevent/scheduled-event.go | 129 ------------------------ 1 file changed, 129 deletions(-) delete mode 100644 pkg/monitor/sqsevent/scheduled-event.go diff --git a/pkg/monitor/sqsevent/scheduled-event.go b/pkg/monitor/sqsevent/scheduled-event.go deleted file mode 100644 index f8fd73f9..00000000 --- a/pkg/monitor/sqsevent/scheduled-event.go +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -package sqsevent - -import ( - "encoding/json" - "fmt" - "strings" - "time" - - "github.com/aws/aws-node-termination-handler/pkg/monitor" - "github.com/aws/aws-node-termination-handler/pkg/node" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/rs/zerolog/log" -) - -/* Example AWS Health Scheduled Change EC2 Event: -{ - "version": "0", - "id": "7fb65329-1628-4cf3-a740-95fg457h1402", - "detail-type": "AWS Health Event", - "source": "aws.health", - "account": "account id", - "time": "2016-06-05T06:27:57Z", - "region": "us-east-1", - "resources": ["i-12345678"], - "detail": { - "eventArn": "arn:aws:health:region::event/id", - "service": "EC2", - "eventTypeCode": "AWS_EC2_DEDICATED_HOST_NETWORK_MAINTENANCE_SCHEDULED", - "eventTypeCategory": "scheduledChange", - "startTime": "Sat, 05 Jun 2016 15:10:09 GMT", - "eventDescription": [{ - "language": "en_US", - "latestDescription": "A description of the event will be provided here" - }], - "affectedEntities": [{ - "entityValue": "i-12345678", - "tags": { - "stage": "prod", - "app": "my-app" - } - }] - } -} -*/ - -// AffectedEntity holds information about an entity that is affected by a Health event -type AffectedEntity struct { - EntityValue string `json:"entityValue"` -} - -// ScheduledEventDetail holds the event details for AWS Health scheduled EC2 change events from Amazon EventBridge -type ScheduledEventDetail struct { - EventTypeCategory string `json:"eventTypeCategory"` - Service string `json:"service"` - AffectedEntities []AffectedEntity `json:"affectedEntities"` -} - -const supportedEventCategoryTypes = "scheduledChange" - -func (m SQSMonitor) scheduledEventToInterruptionEvents(event EventBridgeEvent, message *sqs.Message) ([]InterruptionEventWrapper, error) { - scheduledEventDetail := &ScheduledEventDetail{} - - if err := json.Unmarshal(event.Detail, scheduledEventDetail); err != nil { - return nil, err - } - - if scheduledEventDetail.Service != "EC2" { - return nil, fmt.Errorf("events from Amazon EventBridge for service (%s) are not supported", scheduledEventDetail.Service) - } - - if !strings.Contains(supportedEventCategoryTypes, scheduledEventDetail.EventTypeCategory) { - return nil, fmt.Errorf("events from Amazon EventBridge with EventTypeCategory (%s) are not supported", scheduledEventDetail.EventTypeCategory) - } - - // interruptionEventWrappers := make([]InterruptionEventWrapper, len(event.Resources)) - interruptionEventWrappers := []InterruptionEventWrapper{} - - for _, affectedEntity := range scheduledEventDetail.AffectedEntities { - nodeName, err := m.retrieveNodeName(affectedEntity.EntityValue) - if err != nil { - // interruptionEventWrappers[i] = InterruptionEventWrapper{nil, err} - interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) - continue - } - asgName, _ := m.retrieveAutoScalingGroupName(affectedEntity.EntityValue) - interruptionEvent := monitor.InterruptionEvent{ - EventID: fmt.Sprintf("aws-health-maintenance-event-%x", event.ID), - Kind: SQSTerminateKind, - AutoScalingGroupName: asgName, - StartTime: time.Now(), - NodeName: nodeName, - InstanceID: affectedEntity.EntityValue, - Description: fmt.Sprintf("AWS Health maintenance event received. Instance %s will be interrupted at %s \n", affectedEntity.EntityValue, event.getTime()), - } - interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { - errs := m.deleteMessages([]*sqs.Message{message}) - if errs != nil { - return errs[0] - } - return nil - } - interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { - err := n.TaintScheduledMaintenance(interruptionEvent.NodeName, interruptionEvent.EventID) - if err != nil { - log.Err(err).Msgf("Unable to taint node with taint %s:%s", node.ScheduledMaintenanceTaint, interruptionEvent.EventID) - } - return nil - } - - // interruptionEventWrappers[i] = InterruptionEventWrapper{&interruptionEvent, nil} - interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, nil}) - - } - - return interruptionEventWrappers, nil -} From d621abdef658d643e4c7d549d5350c118ef26ce3 Mon Sep 17 00:00:00 2001 From: Austin Siu Date: Tue, 26 Oct 2021 01:14:36 -0500 Subject: [PATCH 05/10] Update AWS Health event handler names, update e2e run command with ASG system tag --- .../sqsevent/scheduled-change-event.go | 2 +- pkg/monitor/sqsevent/sqs-monitor.go | 23 ++++++++----------- test/e2e/scheduled-change-event-sqs-test | 18 +++++++-------- 3 files changed, 20 insertions(+), 23 deletions(-) diff --git a/pkg/monitor/sqsevent/scheduled-change-event.go b/pkg/monitor/sqsevent/scheduled-change-event.go index 154c409e..740a0e13 100644 --- a/pkg/monitor/sqsevent/scheduled-change-event.go +++ b/pkg/monitor/sqsevent/scheduled-change-event.go @@ -97,7 +97,7 @@ func (m SQSMonitor) scheduledEventToInterruptionEvents(event *EventBridgeEvent, EventID: fmt.Sprintf("aws-health-scheduled-change-event-%x", event.ID), Kind: SQSTerminateKind, AutoScalingGroupName: nodeInfo.AsgName, - StartTime: time.Now(), // interrupt immediately rather than waiting + StartTime: time.Now(), // begin draining immediately after the notification is processed NodeName: nodeInfo.Name, InstanceID: nodeInfo.InstanceID, Description: fmt.Sprintf("AWS Health scheduled change event received. Instance %s will be interrupted at %s \n", nodeInfo.InstanceID, event.getTime()), diff --git a/pkg/monitor/sqsevent/sqs-monitor.go b/pkg/monitor/sqsevent/sqs-monitor.go index 924b9f65..b00571b4 100644 --- a/pkg/monitor/sqsevent/sqs-monitor.go +++ b/pkg/monitor/sqsevent/sqs-monitor.go @@ -59,12 +59,6 @@ type InterruptionEventWrapper struct { Err error } -// Convenience wrapper for handling a pair of an interruption event and a related error -type InterruptionEventWrapper struct { - InterruptionEvent *monitor.InterruptionEvent - Err error -} - // Kind denotes the kind of event that is processed func (m SQSMonitor) Kind() string { return SQSTerminateKind @@ -84,6 +78,7 @@ func (m SQSMonitor) Monitor() error { if err != nil { log.Err(err).Msg("error processing SQS message") failedEventBridgeEvents++ + continue } interruptionEventWrappers := m.processEventBridgeEvent(eventBridgeEvent, message) @@ -119,6 +114,7 @@ func (m SQSMonitor) processEventBridgeEvent(eventBridgeEvent *EventBridgeEvent, switch eventBridgeEvent.Source { case "aws.autoscaling": interruptionEvent, err = m.asgTerminationToInterruptionEvent(eventBridgeEvent, message) + return append(interruptionEventWrappers, InterruptionEventWrapper{interruptionEvent, err}) case "aws.ec2": if eventBridgeEvent.DetailType == "EC2 Instance State-change Notification" { @@ -128,17 +124,17 @@ func (m SQSMonitor) processEventBridgeEvent(eventBridgeEvent *EventBridgeEvent, } else if eventBridgeEvent.DetailType == "EC2 Instance Rebalance Recommendation" { interruptionEvent, err = m.rebalanceRecommendationToInterruptionEvent(eventBridgeEvent, message) } + return append(interruptionEventWrappers, InterruptionEventWrapper{interruptionEvent, err}) case "aws.health": if eventBridgeEvent.DetailType == "AWS Health Event" { interruptionEventWrappers = m.scheduledEventToInterruptionEvents(eventBridgeEvent, message) + return interruptionEventWrappers } - - default: - return append(interruptionEventWrappers, InterruptionEventWrapper{nil, fmt.Errorf("Event source (%s) is not supported", eventBridgeEvent.Source)}) } - return append(interruptionEventWrappers, InterruptionEventWrapper{interruptionEvent, err}) + err = fmt.Errorf("event source (%s) is not supported", eventBridgeEvent.Source) + return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) } // processInterruptionEvents takes interruption event wrappers and sends interruption events to the passed-in channel @@ -184,13 +180,14 @@ func (m SQSMonitor) processInterruptionEvents(interruptionEventWrappers []Interr errs := m.deleteMessages([]*sqs.Message{message}) if len(errs) > 0 { log.Err(errs[0]).Msg("Error deleting message from SQS") + failedInterruptionEventsCount++ } } - if failedInterruptionEventsCount == 0 { - return nil // revisit, don't like that this can happen if err goes when dropping message + if failedInterruptionEventsCount != 0 { + return fmt.Errorf("some interruption events for message Id %b could not be processed", message.MessageId) } else { - return fmt.Errorf("%b of %b interruption events for message Id %b could not be processed", failedInterruptionEventsCount, len(interruptionEventWrappers), message.MessageId) + return nil } } diff --git a/test/e2e/scheduled-change-event-sqs-test b/test/e2e/scheduled-change-event-sqs-test index 9db41955..0fc33b9a 100755 --- a/test/e2e/scheduled-change-event-sqs-test +++ b/test/e2e/scheduled-change-event-sqs-test @@ -14,11 +14,11 @@ set -euo pipefail function fail_and_exit { - echo "❌ AWS Health Maintenance SQS Test failed $CLUSTER_NAME ❌" + echo "❌ AWS Scheduled Change SQS Test failed $CLUSTER_NAME ❌" exit "${1:-1}" } -echo "Starting AWS Health Maintenance SQS Test for Node Termination Handler" +echo "Starting AWS Scheduled Change SQS Test for Node Termination Handler" START_TIME=$(date -u +"%Y-%m-%dT%TZ") SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )" @@ -42,7 +42,7 @@ set +x sleep 10 -RUN_INSTANCE_CMD="awslocal ec2 run-instances --private-ip-address ${WORKER_IP} --region ${AWS_REGION}" +RUN_INSTANCE_CMD="awslocal ec2 run-instances --private-ip-address ${WORKER_IP} --region ${AWS_REGION} --tag-specifications 'ResourceType=instance,Tags=[{Key=aws:autoscaling:groupName,Value=nth-integ-test}]'" localstack_pod=$(kubectl get pods --selector app=localstack --field-selector="status.phase=Running" \ -o go-template --template '{{range .items}}{{.metadata.name}} {{.metadata.creationTimestamp}}{{"\n"}}{{end}}' \ | awk '$2 >= "'"${START_TIME//+0000/Z}"'" { print $1 }') @@ -125,7 +125,7 @@ if [[ $DEPLOYED -eq 0 ]]; then fail_and_exit 2 fi -AWS_HEALTH_MAINTENANCE_EVENT=$(cat < Date: Thu, 14 Oct 2021 01:40:25 -0500 Subject: [PATCH 06/10] Add AWS Health event support for QP mode --- pkg/monitor/sqsevent/scheduled-event.go | 129 ++++++++++++++++++++++++ pkg/monitor/sqsevent/sqs-monitor.go | 6 ++ 2 files changed, 135 insertions(+) create mode 100644 pkg/monitor/sqsevent/scheduled-event.go diff --git a/pkg/monitor/sqsevent/scheduled-event.go b/pkg/monitor/sqsevent/scheduled-event.go new file mode 100644 index 00000000..f8fd73f9 --- /dev/null +++ b/pkg/monitor/sqsevent/scheduled-event.go @@ -0,0 +1,129 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package sqsevent + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/aws/aws-node-termination-handler/pkg/monitor" + "github.com/aws/aws-node-termination-handler/pkg/node" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/rs/zerolog/log" +) + +/* Example AWS Health Scheduled Change EC2 Event: +{ + "version": "0", + "id": "7fb65329-1628-4cf3-a740-95fg457h1402", + "detail-type": "AWS Health Event", + "source": "aws.health", + "account": "account id", + "time": "2016-06-05T06:27:57Z", + "region": "us-east-1", + "resources": ["i-12345678"], + "detail": { + "eventArn": "arn:aws:health:region::event/id", + "service": "EC2", + "eventTypeCode": "AWS_EC2_DEDICATED_HOST_NETWORK_MAINTENANCE_SCHEDULED", + "eventTypeCategory": "scheduledChange", + "startTime": "Sat, 05 Jun 2016 15:10:09 GMT", + "eventDescription": [{ + "language": "en_US", + "latestDescription": "A description of the event will be provided here" + }], + "affectedEntities": [{ + "entityValue": "i-12345678", + "tags": { + "stage": "prod", + "app": "my-app" + } + }] + } +} +*/ + +// AffectedEntity holds information about an entity that is affected by a Health event +type AffectedEntity struct { + EntityValue string `json:"entityValue"` +} + +// ScheduledEventDetail holds the event details for AWS Health scheduled EC2 change events from Amazon EventBridge +type ScheduledEventDetail struct { + EventTypeCategory string `json:"eventTypeCategory"` + Service string `json:"service"` + AffectedEntities []AffectedEntity `json:"affectedEntities"` +} + +const supportedEventCategoryTypes = "scheduledChange" + +func (m SQSMonitor) scheduledEventToInterruptionEvents(event EventBridgeEvent, message *sqs.Message) ([]InterruptionEventWrapper, error) { + scheduledEventDetail := &ScheduledEventDetail{} + + if err := json.Unmarshal(event.Detail, scheduledEventDetail); err != nil { + return nil, err + } + + if scheduledEventDetail.Service != "EC2" { + return nil, fmt.Errorf("events from Amazon EventBridge for service (%s) are not supported", scheduledEventDetail.Service) + } + + if !strings.Contains(supportedEventCategoryTypes, scheduledEventDetail.EventTypeCategory) { + return nil, fmt.Errorf("events from Amazon EventBridge with EventTypeCategory (%s) are not supported", scheduledEventDetail.EventTypeCategory) + } + + // interruptionEventWrappers := make([]InterruptionEventWrapper, len(event.Resources)) + interruptionEventWrappers := []InterruptionEventWrapper{} + + for _, affectedEntity := range scheduledEventDetail.AffectedEntities { + nodeName, err := m.retrieveNodeName(affectedEntity.EntityValue) + if err != nil { + // interruptionEventWrappers[i] = InterruptionEventWrapper{nil, err} + interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) + continue + } + asgName, _ := m.retrieveAutoScalingGroupName(affectedEntity.EntityValue) + interruptionEvent := monitor.InterruptionEvent{ + EventID: fmt.Sprintf("aws-health-maintenance-event-%x", event.ID), + Kind: SQSTerminateKind, + AutoScalingGroupName: asgName, + StartTime: time.Now(), + NodeName: nodeName, + InstanceID: affectedEntity.EntityValue, + Description: fmt.Sprintf("AWS Health maintenance event received. Instance %s will be interrupted at %s \n", affectedEntity.EntityValue, event.getTime()), + } + interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { + errs := m.deleteMessages([]*sqs.Message{message}) + if errs != nil { + return errs[0] + } + return nil + } + interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { + err := n.TaintScheduledMaintenance(interruptionEvent.NodeName, interruptionEvent.EventID) + if err != nil { + log.Err(err).Msgf("Unable to taint node with taint %s:%s", node.ScheduledMaintenanceTaint, interruptionEvent.EventID) + } + return nil + } + + // interruptionEventWrappers[i] = InterruptionEventWrapper{&interruptionEvent, nil} + interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, nil}) + + } + + return interruptionEventWrappers, nil +} diff --git a/pkg/monitor/sqsevent/sqs-monitor.go b/pkg/monitor/sqsevent/sqs-monitor.go index b00571b4..a71c4fdd 100644 --- a/pkg/monitor/sqsevent/sqs-monitor.go +++ b/pkg/monitor/sqsevent/sqs-monitor.go @@ -59,6 +59,12 @@ type InterruptionEventWrapper struct { Err error } +// Convenience wrapper for handling a pair of an interruption event and a related error +type InterruptionEventWrapper struct { + InterruptionEvent *monitor.InterruptionEvent + Err error +} + // Kind denotes the kind of event that is processed func (m SQSMonitor) Kind() string { return SQSTerminateKind From 9df853bc59dd1e06045705455fbf1812ac61af8d Mon Sep 17 00:00:00 2001 From: Austin Siu Date: Wed, 20 Oct 2021 00:20:16 -0500 Subject: [PATCH 07/10] Update QP Scheduled Change event drain time, README, taint type --- pkg/monitor/sqsevent/scheduled-event.go | 129 ------------------------ 1 file changed, 129 deletions(-) delete mode 100644 pkg/monitor/sqsevent/scheduled-event.go diff --git a/pkg/monitor/sqsevent/scheduled-event.go b/pkg/monitor/sqsevent/scheduled-event.go deleted file mode 100644 index f8fd73f9..00000000 --- a/pkg/monitor/sqsevent/scheduled-event.go +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -package sqsevent - -import ( - "encoding/json" - "fmt" - "strings" - "time" - - "github.com/aws/aws-node-termination-handler/pkg/monitor" - "github.com/aws/aws-node-termination-handler/pkg/node" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/rs/zerolog/log" -) - -/* Example AWS Health Scheduled Change EC2 Event: -{ - "version": "0", - "id": "7fb65329-1628-4cf3-a740-95fg457h1402", - "detail-type": "AWS Health Event", - "source": "aws.health", - "account": "account id", - "time": "2016-06-05T06:27:57Z", - "region": "us-east-1", - "resources": ["i-12345678"], - "detail": { - "eventArn": "arn:aws:health:region::event/id", - "service": "EC2", - "eventTypeCode": "AWS_EC2_DEDICATED_HOST_NETWORK_MAINTENANCE_SCHEDULED", - "eventTypeCategory": "scheduledChange", - "startTime": "Sat, 05 Jun 2016 15:10:09 GMT", - "eventDescription": [{ - "language": "en_US", - "latestDescription": "A description of the event will be provided here" - }], - "affectedEntities": [{ - "entityValue": "i-12345678", - "tags": { - "stage": "prod", - "app": "my-app" - } - }] - } -} -*/ - -// AffectedEntity holds information about an entity that is affected by a Health event -type AffectedEntity struct { - EntityValue string `json:"entityValue"` -} - -// ScheduledEventDetail holds the event details for AWS Health scheduled EC2 change events from Amazon EventBridge -type ScheduledEventDetail struct { - EventTypeCategory string `json:"eventTypeCategory"` - Service string `json:"service"` - AffectedEntities []AffectedEntity `json:"affectedEntities"` -} - -const supportedEventCategoryTypes = "scheduledChange" - -func (m SQSMonitor) scheduledEventToInterruptionEvents(event EventBridgeEvent, message *sqs.Message) ([]InterruptionEventWrapper, error) { - scheduledEventDetail := &ScheduledEventDetail{} - - if err := json.Unmarshal(event.Detail, scheduledEventDetail); err != nil { - return nil, err - } - - if scheduledEventDetail.Service != "EC2" { - return nil, fmt.Errorf("events from Amazon EventBridge for service (%s) are not supported", scheduledEventDetail.Service) - } - - if !strings.Contains(supportedEventCategoryTypes, scheduledEventDetail.EventTypeCategory) { - return nil, fmt.Errorf("events from Amazon EventBridge with EventTypeCategory (%s) are not supported", scheduledEventDetail.EventTypeCategory) - } - - // interruptionEventWrappers := make([]InterruptionEventWrapper, len(event.Resources)) - interruptionEventWrappers := []InterruptionEventWrapper{} - - for _, affectedEntity := range scheduledEventDetail.AffectedEntities { - nodeName, err := m.retrieveNodeName(affectedEntity.EntityValue) - if err != nil { - // interruptionEventWrappers[i] = InterruptionEventWrapper{nil, err} - interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) - continue - } - asgName, _ := m.retrieveAutoScalingGroupName(affectedEntity.EntityValue) - interruptionEvent := monitor.InterruptionEvent{ - EventID: fmt.Sprintf("aws-health-maintenance-event-%x", event.ID), - Kind: SQSTerminateKind, - AutoScalingGroupName: asgName, - StartTime: time.Now(), - NodeName: nodeName, - InstanceID: affectedEntity.EntityValue, - Description: fmt.Sprintf("AWS Health maintenance event received. Instance %s will be interrupted at %s \n", affectedEntity.EntityValue, event.getTime()), - } - interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { - errs := m.deleteMessages([]*sqs.Message{message}) - if errs != nil { - return errs[0] - } - return nil - } - interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { - err := n.TaintScheduledMaintenance(interruptionEvent.NodeName, interruptionEvent.EventID) - if err != nil { - log.Err(err).Msgf("Unable to taint node with taint %s:%s", node.ScheduledMaintenanceTaint, interruptionEvent.EventID) - } - return nil - } - - // interruptionEventWrappers[i] = InterruptionEventWrapper{&interruptionEvent, nil} - interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, nil}) - - } - - return interruptionEventWrappers, nil -} From acfa0bbe5e8311e237499f2c4526eeade104e9dc Mon Sep 17 00:00:00 2001 From: Austin Siu Date: Tue, 26 Oct 2021 01:14:36 -0500 Subject: [PATCH 08/10] Update AWS Health event handler names, update e2e run command with ASG system tag --- pkg/monitor/sqsevent/sqs-monitor.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/monitor/sqsevent/sqs-monitor.go b/pkg/monitor/sqsevent/sqs-monitor.go index a71c4fdd..b00571b4 100644 --- a/pkg/monitor/sqsevent/sqs-monitor.go +++ b/pkg/monitor/sqsevent/sqs-monitor.go @@ -59,12 +59,6 @@ type InterruptionEventWrapper struct { Err error } -// Convenience wrapper for handling a pair of an interruption event and a related error -type InterruptionEventWrapper struct { - InterruptionEvent *monitor.InterruptionEvent - Err error -} - // Kind denotes the kind of event that is processed func (m SQSMonitor) Kind() string { return SQSTerminateKind From 23a7c6f4e362e94c504252cd94abb3451e5ae2ab Mon Sep 17 00:00:00 2001 From: Austin Siu Date: Tue, 26 Oct 2021 11:52:45 -0500 Subject: [PATCH 09/10] Improve comments related to Health events and scheduled changes --- pkg/monitor/sqsevent/scheduled-change-event.go | 5 +++-- pkg/monitor/sqsevent/sqs-monitor.go | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/monitor/sqsevent/scheduled-change-event.go b/pkg/monitor/sqsevent/scheduled-change-event.go index 740a0e13..664f59df 100644 --- a/pkg/monitor/sqsevent/scheduled-change-event.go +++ b/pkg/monitor/sqsevent/scheduled-change-event.go @@ -67,7 +67,6 @@ type ScheduledChangeEventDetail struct { AffectedEntities []AffectedEntity `json:"affectedEntities"` } -// func (m SQSMonitor) scheduledEventToInterruptionEvents(event EventBridgeEvent, message *sqs.Message) ([]InterruptionEventWrapper, error) { func (m SQSMonitor) scheduledEventToInterruptionEvents(event *EventBridgeEvent, message *sqs.Message) []InterruptionEventWrapper { scheduledChangeEventDetail := &ScheduledChangeEventDetail{} interruptionEventWrappers := []InterruptionEventWrapper{} @@ -93,11 +92,13 @@ func (m SQSMonitor) scheduledEventToInterruptionEvents(event *EventBridgeEvent, interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) continue } + + // Begin drain immediately for scheduled change events to avoid disruptions in cases such as degraded hardware interruptionEvent := monitor.InterruptionEvent{ EventID: fmt.Sprintf("aws-health-scheduled-change-event-%x", event.ID), Kind: SQSTerminateKind, AutoScalingGroupName: nodeInfo.AsgName, - StartTime: time.Now(), // begin draining immediately after the notification is processed + StartTime: time.Now(), NodeName: nodeInfo.Name, InstanceID: nodeInfo.InstanceID, Description: fmt.Sprintf("AWS Health scheduled change event received. Instance %s will be interrupted at %s \n", nodeInfo.InstanceID, event.getTime()), diff --git a/pkg/monitor/sqsevent/sqs-monitor.go b/pkg/monitor/sqsevent/sqs-monitor.go index b00571b4..75972c96 100644 --- a/pkg/monitor/sqsevent/sqs-monitor.go +++ b/pkg/monitor/sqsevent/sqs-monitor.go @@ -53,7 +53,7 @@ type SQSMonitor struct { ManagedAsgTag string } -// Convenience wrapper for handling a pair of an interruption event and a related error +// InterruptionEventWrapper is a convenience wrapper for associating an interruption event with its error, if any type InterruptionEventWrapper struct { InterruptionEvent *monitor.InterruptionEvent Err error @@ -137,7 +137,7 @@ func (m SQSMonitor) processEventBridgeEvent(eventBridgeEvent *EventBridgeEvent, return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) } -// processInterruptionEvents takes interruption event wrappers and sends interruption events to the passed-in channel +// processInterruptionEvents takes interruption event wrappers and sends events to the interruption channel func (m SQSMonitor) processInterruptionEvents(interruptionEventWrappers []InterruptionEventWrapper, message *sqs.Message) error { dropMessageSuggestionCount := 0 failedInterruptionEventsCount := 0 From 5ad7ffc1a5fd031cb2e292496287dff7345dd33e Mon Sep 17 00:00:00 2001 From: Austin Siu Date: Wed, 27 Oct 2021 10:26:48 -0500 Subject: [PATCH 10/10] Reduce scope of several variables --- pkg/monitor/sqsevent/scheduled-change-event.go | 13 +++++-------- pkg/monitor/sqsevent/sqs-monitor.go | 10 ++++------ 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/pkg/monitor/sqsevent/scheduled-change-event.go b/pkg/monitor/sqsevent/scheduled-change-event.go index 664f59df..caca3c20 100644 --- a/pkg/monitor/sqsevent/scheduled-change-event.go +++ b/pkg/monitor/sqsevent/scheduled-change-event.go @@ -70,19 +70,18 @@ type ScheduledChangeEventDetail struct { func (m SQSMonitor) scheduledEventToInterruptionEvents(event *EventBridgeEvent, message *sqs.Message) []InterruptionEventWrapper { scheduledChangeEventDetail := &ScheduledChangeEventDetail{} interruptionEventWrappers := []InterruptionEventWrapper{} - var err error - if err = json.Unmarshal(event.Detail, scheduledChangeEventDetail); err != nil { + if err := json.Unmarshal(event.Detail, scheduledChangeEventDetail); err != nil { return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) } if scheduledChangeEventDetail.Service != "EC2" { - err = fmt.Errorf("events from Amazon EventBridge for service (%s) are not supported", scheduledChangeEventDetail.Service) + err := fmt.Errorf("events from Amazon EventBridge for service (%s) are not supported", scheduledChangeEventDetail.Service) return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) } if scheduledChangeEventDetail.EventTypeCategory != "scheduledChange" { - err = fmt.Errorf("events from Amazon EventBridge with EventTypeCategory (%s) are not supported", scheduledChangeEventDetail.EventTypeCategory) + err := fmt.Errorf("events from Amazon EventBridge with EventTypeCategory (%s) are not supported", scheduledChangeEventDetail.EventTypeCategory) return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err}) } @@ -104,15 +103,13 @@ func (m SQSMonitor) scheduledEventToInterruptionEvents(event *EventBridgeEvent, Description: fmt.Sprintf("AWS Health scheduled change event received. Instance %s will be interrupted at %s \n", nodeInfo.InstanceID, event.getTime()), } interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { - errs := m.deleteMessages([]*sqs.Message{message}) - if errs != nil { + if errs := m.deleteMessages([]*sqs.Message{message}); errs != nil { return errs[0] } return nil } interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { - err := n.TaintScheduledMaintenance(interruptionEvent.NodeName, interruptionEvent.EventID) - if err != nil { + if err := n.TaintScheduledMaintenance(interruptionEvent.NodeName, interruptionEvent.EventID); err != nil { log.Err(err).Msgf("Unable to taint node with taint %s:%s", node.ScheduledMaintenanceTaint, interruptionEvent.EventID) } return nil diff --git a/pkg/monitor/sqsevent/sqs-monitor.go b/pkg/monitor/sqsevent/sqs-monitor.go index 75972c96..5cc3f6fd 100644 --- a/pkg/monitor/sqsevent/sqs-monitor.go +++ b/pkg/monitor/sqsevent/sqs-monitor.go @@ -83,8 +83,7 @@ func (m SQSMonitor) Monitor() error { interruptionEventWrappers := m.processEventBridgeEvent(eventBridgeEvent, message) - err = m.processInterruptionEvents(interruptionEventWrappers, message) - if err != nil { + if err = m.processInterruptionEvents(interruptionEventWrappers, message); err != nil { log.Err(err).Msg("error processing interruption events") failedEventBridgeEvents++ } @@ -128,8 +127,7 @@ func (m SQSMonitor) processEventBridgeEvent(eventBridgeEvent *EventBridgeEvent, case "aws.health": if eventBridgeEvent.DetailType == "AWS Health Event" { - interruptionEventWrappers = m.scheduledEventToInterruptionEvents(eventBridgeEvent, message) - return interruptionEventWrappers + return m.scheduledEventToInterruptionEvents(eventBridgeEvent, message) } } @@ -186,9 +184,9 @@ func (m SQSMonitor) processInterruptionEvents(interruptionEventWrappers []Interr if failedInterruptionEventsCount != 0 { return fmt.Errorf("some interruption events for message Id %b could not be processed", message.MessageId) - } else { - return nil } + + return nil } // receiveQueueMessages checks the configured SQS queue for new messages