Skip to content

Commit b312b87

Browse files
committed
Add AWS Health event support for QP mode
1 parent 6a22485 commit b312b87

File tree

5 files changed

+421
-34
lines changed

5 files changed

+421
-34
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ require (
1313
go.opentelemetry.io/otel v0.20.0
1414
go.opentelemetry.io/otel/exporters/metric/prometheus v0.20.0
1515
go.opentelemetry.io/otel/metric v0.20.0
16+
go.uber.org/multierr v1.7.0
1617
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a // indirect
1718
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
1819
golang.org/x/sys v0.0.0-20210608053332-aa57babbf139

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -608,8 +608,12 @@ go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5/go.mod h1:nmDLcffg48OtT/PSW0H
608608
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
609609
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
610610
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
611+
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
612+
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
611613
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
612614
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
615+
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
616+
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
613617
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
614618
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
615619
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
@@ -916,6 +920,8 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
916920
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
917921
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
918922
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
923+
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
924+
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
919925
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
920926
gotest.tools/v3 v3.0.3 h1:4AuOwCGf4lLR9u3YOe2awrHygurzhO/HeQ6laiA6Sx0=
921927
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"). You may
4+
// not use this file except in compliance with the License. A copy of the
5+
// License is located at
6+
//
7+
// http://aws.amazon.com/apache2.0/
8+
//
9+
// or in the "license" file accompanying this file. This file is distributed
10+
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
// express or implied. See the License for the specific language governing
12+
// permissions and limitations under the License.
13+
14+
package sqsevent
15+
16+
import (
17+
"encoding/json"
18+
"fmt"
19+
"strings"
20+
21+
"github.com/aws/aws-node-termination-handler/pkg/monitor"
22+
"github.com/aws/aws-node-termination-handler/pkg/node"
23+
"github.com/aws/aws-sdk-go/service/sqs"
24+
"github.com/rs/zerolog/log"
25+
)
26+
27+
/* Example AWS Health Scheduled Maintenance EC2 Event:
28+
{
29+
"version": "0",
30+
"id": "7fb65329-1628-4cf3-a740-95fg457h1402",
31+
"detail-type": "AWS Health Event",
32+
"source": "aws.health",
33+
"account": "account id",
34+
"time": "2016-06-05T06:27:57Z",
35+
"region": "us-east-1",
36+
"resources": ["i-12345678"],
37+
"detail": {
38+
"eventArn": "arn:aws:health:region::event/id",
39+
"service": "EC2",
40+
"eventTypeCode": "AWS_EC2_DEDICATED_HOST_NETWORK_MAINTENANCE_SCHEDULED",
41+
"eventTypeCategory": "scheduledChange",
42+
"startTime": "Sat, 05 Jun 2016 15:10:09 GMT",
43+
"eventDescription": [{
44+
"language": "en_US",
45+
"latestDescription": "A description of the event will be provided here"
46+
}],
47+
"affectedEntities": [{
48+
"entityValue": "i-12345678",
49+
"tags": {
50+
"stage": "prod",
51+
"app": "my-app"
52+
}
53+
}]
54+
}
55+
}
56+
*/
57+
58+
// AffectedEntity holds information about an entity that is affected by a Health event
59+
type AffectedEntity struct {
60+
EntityValue string `json:"entityValue"`
61+
}
62+
63+
// ScheduledMaintenanceDetail holds the event details for AWS Health scheduled EC2 change events from Amazon EventBridge
64+
type ScheduledMaintenanceDetail struct {
65+
EventTypeCategory string `json:"eventTypeCategory"`
66+
Service string `json:"service"`
67+
AffectedEntities []AffectedEntity `json:"affectedEntities"`
68+
}
69+
70+
const supportedEventCategoryTypes = "scheduledChange"
71+
72+
func (m SQSMonitor) maintenanceNoticeToInterruptionEvents(event EventBridgeEvent, message *sqs.Message) ([]InterruptionEventWrapper, error) {
73+
scheduledMaintenanceDetail := &ScheduledMaintenanceDetail{}
74+
err := json.Unmarshal(event.Detail, scheduledMaintenanceDetail)
75+
if err != nil {
76+
return nil, err
77+
}
78+
79+
if scheduledMaintenanceDetail.Service != "EC2" {
80+
return nil, fmt.Errorf("Amazon EventBridge events for service (%s) are not supported", scheduledMaintenanceDetail.Service)
81+
}
82+
83+
if !strings.Contains(supportedEventCategoryTypes, scheduledMaintenanceDetail.EventTypeCategory) {
84+
return nil, fmt.Errorf("Amazon EventBridge events with EventTypeCategory (%s) are not supported", scheduledMaintenanceDetail.EventTypeCategory)
85+
}
86+
87+
interruptionEventWrappers := make([]InterruptionEventWrapper, len(event.Resources))
88+
89+
for i, affectedEntity := range scheduledMaintenanceDetail.AffectedEntities {
90+
nodeName, err := m.retrieveNodeName(affectedEntity.EntityValue)
91+
if err != nil {
92+
interruptionEventWrappers[i] = InterruptionEventWrapper{nil, err}
93+
continue
94+
}
95+
asgName, _ := m.retrieveAutoScalingGroupName(affectedEntity.EntityValue)
96+
interruptionEvent := monitor.InterruptionEvent{
97+
EventID: fmt.Sprintf("aws-health-maintenance-event-%x", event.ID),
98+
Kind: SQSTerminateKind,
99+
AutoScalingGroupName: asgName,
100+
StartTime: event.getTime(),
101+
NodeName: nodeName,
102+
InstanceID: affectedEntity.EntityValue,
103+
Description: fmt.Sprintf("AWS Health maintenance event received. Instance %s will be interrupted at %s \n", affectedEntity.EntityValue, event.getTime()),
104+
}
105+
interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
106+
errs := m.deleteMessages([]*sqs.Message{message})
107+
if errs != nil {
108+
return errs[0]
109+
}
110+
return nil
111+
}
112+
interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
113+
err := n.TaintSpotItn(interruptionEvent.NodeName, interruptionEvent.EventID)
114+
if err != nil {
115+
log.Err(err).Msgf("Unable to taint node with taint %s:%s", node.SpotInterruptionTaint, interruptionEvent.EventID)
116+
}
117+
return nil
118+
}
119+
120+
interruptionEventWrappers[i] = InterruptionEventWrapper{&interruptionEvent, nil}
121+
}
122+
123+
return interruptionEventWrappers, nil
124+
}

pkg/monitor/sqsevent/sqs-monitor.go

Lines changed: 75 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ type SQSMonitor struct {
5050
ManagedAsgTag string
5151
}
5252

53+
// Convenience wrapper for handling a pair of an interruption event and a related error
54+
type InterruptionEventWrapper struct {
55+
InterruptionEvent *monitor.InterruptionEvent
56+
Err error
57+
}
58+
5359
// Kind denotes the kind of event that is processed
5460
func (m SQSMonitor) Kind() string {
5561
return SQSTerminateKind
@@ -63,53 +69,70 @@ func (m SQSMonitor) Monitor() error {
6369
return err
6470
}
6571

66-
failedEvents := 0
72+
failedQueueEventsCount := 0
6773
for _, message := range messages {
68-
interruptionEvent, err := m.processSQSMessage(message)
69-
switch {
70-
case errors.Is(err, ErrNodeStateNotRunning):
71-
// If the node is no longer running, just log and delete the message. If message deletion fails, count it as an error.
72-
log.Warn().Err(err).Msg("dropping event for an already terminated node")
73-
errs := m.deleteMessages([]*sqs.Message{message})
74-
if len(errs) > 0 {
75-
log.Err(errs[0]).Msg("error deleting event for already terminated node")
76-
failedEvents++
77-
}
74+
interruptionEventWrappers, err := m.processSQSMessage(message)
75+
if err != nil {
76+
log.Err(err).Msg("ignoring SQS message due to error while processing")
77+
failedQueueEventsCount++
78+
continue
79+
}
80+
failedInterruptionEventsCount := 0
81+
for i, eventWrapper := range interruptionEventWrappers {
82+
switch {
83+
case errors.Is(eventWrapper.Err, ErrNodeStateNotRunning):
84+
// If the node is no longer running, just log and delete the message. If message deletion fails, count it as an error.
85+
log.Warn().Err(eventWrapper.Err).Msg("dropping interruption event for an already terminated node")
86+
if (i == len(interruptionEventWrappers)-1) && (failedInterruptionEventsCount == len(interruptionEventWrappers)-1) {
87+
// Log that all events failed, and delete the message from the queue
88+
log.Warn().Err(eventWrapper.Err).Msg("all interruption events failed drop, moving to delete message from queue")
89+
deletionErrs := m.deleteMessages([]*sqs.Message{message})
90+
if len(deletionErrs) > 0 {
91+
log.Err(deletionErrs[0]).Msg("error deleting queue event for already terminated node(s)")
92+
failedInterruptionEventsCount++
93+
}
94+
}
7895

79-
case err != nil:
80-
// Log errors and record as failed events
81-
log.Err(err).Msg("ignoring event due to error")
82-
failedEvents++
96+
case eventWrapper.Err != nil:
97+
// Log errors and record as failed events
98+
log.Err(eventWrapper.Err).Msg("ignoring interruption event due to error")
99+
failedInterruptionEventsCount++
83100

84-
case err == nil && interruptionEvent != nil && interruptionEvent.Kind == SQSTerminateKind:
85-
// Successfully processed SQS message into a SQSTerminateKind interruption event
86-
log.Debug().Msgf("Sending %s interruption event to the interruption channel", SQSTerminateKind)
87-
m.InterruptionChan <- *interruptionEvent
101+
case eventWrapper.Err == nil && eventWrapper.InterruptionEvent != nil && eventWrapper.InterruptionEvent.Kind == SQSTerminateKind:
102+
// Successfully processed SQS message into a SQSTerminateKind interruption event
103+
log.Debug().Msgf("Sending %s interruption event to the interruption channel", SQSTerminateKind)
104+
m.InterruptionChan <- *eventWrapper.InterruptionEvent
105+
}
106+
}
107+
if failedInterruptionEventsCount == len(interruptionEventWrappers) {
108+
failedQueueEventsCount++
88109
}
89110
}
90111

91-
if len(messages) > 0 && failedEvents == len(messages) {
112+
if len(messages) > 0 && failedQueueEventsCount == len(messages) {
92113
return fmt.Errorf("none of the waiting queue events could be processed")
93114
}
94115

95116
return nil
96117
}
97118

98119
// processSQSMessage parses a single SQS message and returns the interruption events derived from it
99-
func (m SQSMonitor) processSQSMessage(message *sqs.Message) (*monitor.InterruptionEvent, error) {
120+
func (m SQSMonitor) processSQSMessage(message *sqs.Message) ([]InterruptionEventWrapper, error) {
100121
event := EventBridgeEvent{}
101122
err := json.Unmarshal([]byte(*message.Body), &event)
102123
if err != nil {
103124
return nil, err
104125
}
105126

127+
interruptionEventWrappers := []InterruptionEventWrapper{}
106128
interruptionEvent := monitor.InterruptionEvent{}
107129

108130
switch event.Source {
109131
case "aws.autoscaling":
110132
interruptionEvent, err = m.asgTerminationToInterruptionEvent(event, message)
111133
if err != nil {
112-
return nil, err
134+
return append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, err}), nil
135+
113136
}
114137
case "aws.ec2":
115138
if event.DetailType == "EC2 Instance State-change Notification" {
@@ -120,28 +143,46 @@ func (m SQSMonitor) processSQSMessage(message *sqs.Message) (*monitor.Interrupti
120143
interruptionEvent, err = m.rebalanceRecommendationToInterruptionEvent(event, message)
121144
}
122145
if err != nil {
123-
return nil, err
146+
return append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, err}), nil
147+
}
148+
case "aws.health":
149+
if event.DetailType == "AWS Health Event" {
150+
interruptionEventWrappers, err = m.maintenanceNoticeToInterruptionEvents(event, message)
151+
}
152+
if err != nil {
153+
return append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, err}), nil
124154
}
125155
default:
126-
return nil, fmt.Errorf("Event source (%s) is not supported", event.Source)
156+
return nil, fmt.Errorf("event source (%s) is not supported", event.Source)
127157
}
128158

129-
// Bail if empty event is returned after parsing
130-
if interruptionEvent.EventID == "" {
131-
return nil, nil
159+
if interruptionEvent.EventID != "" {
160+
interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, err})
132161
}
133162

134-
if m.CheckIfManaged {
135-
isManaged, err := m.isInstanceManaged(interruptionEvent.InstanceID)
136-
if err != nil {
137-
return &interruptionEvent, err
163+
// override problematic events
164+
for i := range interruptionEventWrappers {
165+
// Bail if empty event is returned after parsing
166+
if interruptionEventWrappers[i].InterruptionEvent.EventID == "" {
167+
interruptionEventWrappers[i].InterruptionEvent = nil
168+
interruptionEventWrappers[i].Err = nil
169+
continue
138170
}
139-
if !isManaged {
140-
return nil, nil
171+
172+
if m.CheckIfManaged {
173+
isManaged, err := m.isInstanceManaged(interruptionEventWrappers[i].InterruptionEvent.InstanceID)
174+
if err != nil {
175+
interruptionEventWrappers[i].Err = err
176+
continue
177+
}
178+
if !isManaged {
179+
interruptionEventWrappers[i].InterruptionEvent = nil
180+
interruptionEventWrappers[i].Err = nil
181+
}
141182
}
142183
}
143184

144-
return &interruptionEvent, err
185+
return interruptionEventWrappers, err
145186
}
146187

147188
// receiveQueueMessages checks the configured SQS queue for new messages

0 commit comments

Comments
 (0)