@@ -50,6 +50,12 @@ type SQSMonitor struct {
5050 ManagedAsgTag string
5151}
5252
53+ // Convenience wrapper for handling a pair of an interruption event and a related error
54+ type InterruptionEventWrapper struct {
55+ InterruptionEvent * monitor.InterruptionEvent
56+ Err error
57+ }
58+
5359// Kind denotes the kind of event that is processed
5460func (m SQSMonitor ) Kind () string {
5561 return SQSTerminateKind
@@ -63,53 +69,70 @@ func (m SQSMonitor) Monitor() error {
6369 return err
6470 }
6571
66- failedEvents := 0
72+ failedQueueEventsCount := 0
6773 for _ , message := range messages {
68- interruptionEvent , err := m .processSQSMessage (message )
69- switch {
70- case errors .Is (err , ErrNodeStateNotRunning ):
71- // If the node is no longer running, just log and delete the message. If message deletion fails, count it as an error.
72- log .Warn ().Err (err ).Msg ("dropping event for an already terminated node" )
73- errs := m .deleteMessages ([]* sqs.Message {message })
74- if len (errs ) > 0 {
75- log .Err (errs [0 ]).Msg ("error deleting event for already terminated node" )
76- failedEvents ++
77- }
74+ interruptionEventWrappers , err := m .processSQSMessage (message )
75+ if err != nil {
76+ log .Err (err ).Msg ("ignoring SQS message due to error while processing" )
77+ failedQueueEventsCount ++
78+ continue
79+ }
80+ failedInterruptionEventsCount := 0
81+ for i , eventWrapper := range interruptionEventWrappers {
82+ switch {
83+ case errors .Is (eventWrapper .Err , ErrNodeStateNotRunning ):
84+ // If the node is no longer running, just log and delete the message. If message deletion fails, count it as an error.
85+ log .Warn ().Err (eventWrapper .Err ).Msg ("dropping interruption event for an already terminated node" )
86+ if (i == len (interruptionEventWrappers )- 1 ) && (failedInterruptionEventsCount == len (interruptionEventWrappers )- 1 ) {
87+ // Log that all events failed, and delete the message from the queue
88+ log .Warn ().Err (eventWrapper .Err ).Msg ("all interruption events failed, moving to delete message from queue" )
89+ deletionErrs := m .deleteMessages ([]* sqs.Message {message })
90+ if len (deletionErrs ) > 0 {
91+ log .Err (deletionErrs [0 ]).Msg ("error deleting queue event for already terminated node(s)" )
92+ failedInterruptionEventsCount ++
93+ }
94+ }
7895
79- case err != nil :
80- // Log errors and record as failed events
81- log .Err (err ).Msg ("ignoring event due to error" )
82- failedEvents ++
96+ case eventWrapper . Err != nil :
97+ // Log errors and record as failed events
98+ log .Warn (). Err (eventWrapper . Err ).Msg ("ignoring interruption event due to error" ) // is this what I want? Or break out specific unsupported error state?
99+ failedInterruptionEventsCount ++
83100
84- case err == nil && interruptionEvent != nil && interruptionEvent .Kind == SQSTerminateKind :
85- // Successfully processed SQS message into a SQSTerminateKind interruption event
86- log .Debug ().Msgf ("Sending %s interruption event to the interruption channel" , SQSTerminateKind )
87- m .InterruptionChan <- * interruptionEvent
101+ case eventWrapper .Err == nil && eventWrapper .InterruptionEvent != nil && eventWrapper .InterruptionEvent .Kind == SQSTerminateKind :
102+ // Successfully processed SQS message into an SQSTerminateKind interruption event
103+ log .Debug ().Msgf ("Sending %s interruption event to the interruption channel" , SQSTerminateKind )
104+ m .InterruptionChan <- * eventWrapper .InterruptionEvent
105+ }
106+ }
107+ if failedInterruptionEventsCount == len (interruptionEventWrappers ) {
108+ failedQueueEventsCount ++
88109 }
89110 }
90111
91- if len (messages ) > 0 && failedEvents == len (messages ) {
112+ if len (messages ) > 0 && failedQueueEventsCount == len (messages ) {
92113 return fmt .Errorf ("none of the waiting queue events could be processed" )
93114 }
94115
95116 return nil
96117}
97118
98119// processSQSMessage checks sqs for new messages and returns interruption events
99- func (m SQSMonitor ) processSQSMessage (message * sqs.Message ) (* monitor. InterruptionEvent , error ) {
120+ func (m SQSMonitor ) processSQSMessage (message * sqs.Message ) ([] InterruptionEventWrapper , error ) {
100121 event := EventBridgeEvent {}
101122 err := json .Unmarshal ([]byte (* message .Body ), & event )
102123 if err != nil {
103124 return nil , err
104125 }
105126
127+ interruptionEventWrappers := []InterruptionEventWrapper {}
106128 interruptionEvent := monitor.InterruptionEvent {}
107129
108130 switch event .Source {
109131 case "aws.autoscaling" :
110132 interruptionEvent , err = m .asgTerminationToInterruptionEvent (event , message )
111133 if err != nil {
112- return nil , err
134+ return append (interruptionEventWrappers , InterruptionEventWrapper {& interruptionEvent , err }), nil
135+
113136 }
114137 case "aws.ec2" :
115138 if event .DetailType == "EC2 Instance State-change Notification" {
@@ -120,28 +143,46 @@ func (m SQSMonitor) processSQSMessage(message *sqs.Message) (*monitor.Interrupti
120143 interruptionEvent , err = m .rebalanceRecommendationToInterruptionEvent (event , message )
121144 }
122145 if err != nil {
123- return nil , err
146+ return append (interruptionEventWrappers , InterruptionEventWrapper {& interruptionEvent , err }), nil
147+ }
148+ case "aws.health" :
149+ if event .DetailType == "AWS Health Event" {
150+ interruptionEventWrappers , err = m .scheduledEventToInterruptionEvents (event , message )
151+ }
152+ if err != nil {
153+ return append (interruptionEventWrappers , InterruptionEventWrapper {& interruptionEvent , err }), nil
124154 }
125155 default :
126- return nil , fmt .Errorf ("Event source (%s) is not supported" , event .Source )
156+ return nil , fmt .Errorf ("event source (%s) is not supported" , event .Source )
127157 }
128158
129- // Bail if empty event is returned after parsing
130- if interruptionEvent .EventID == "" {
131- return nil , nil
159+ if interruptionEvent .EventID != "" {
160+ interruptionEventWrappers = append (interruptionEventWrappers , InterruptionEventWrapper {& interruptionEvent , err })
132161 }
133162
134- if m .CheckIfManaged {
135- isManaged , err := m .isInstanceManaged (interruptionEvent .InstanceID )
136- if err != nil {
137- return & interruptionEvent , err
163+ // override problematic events
164+ for i := range interruptionEventWrappers {
165+ // Bail if empty event is returned after parsing
166+ if interruptionEventWrappers [i ].InterruptionEvent .EventID == "" {
167+ interruptionEventWrappers [i ].InterruptionEvent = nil
168+ interruptionEventWrappers [i ].Err = nil
169+ continue
138170 }
139- if ! isManaged {
140- return nil , nil
171+
172+ if m .CheckIfManaged {
173+ isManaged , err := m .isInstanceManaged (interruptionEventWrappers [i ].InterruptionEvent .InstanceID )
174+ if err != nil {
175+ interruptionEventWrappers [i ].Err = err
176+ continue
177+ }
178+ if ! isManaged {
179+ interruptionEventWrappers [i ].InterruptionEvent = nil
180+ interruptionEventWrappers [i ].Err = nil
181+ }
141182 }
142183 }
143184
144- return & interruptionEvent , err
185+ return interruptionEventWrappers , err
145186}
146187
147188// receiveQueueMessages checks the configured SQS queue for new messages
0 commit comments