From 071776a2600ffe9080f47114cbf23f597dd3afaf Mon Sep 17 00:00:00 2001 From: Jerad C Date: Mon, 18 Apr 2022 15:33:43 -0500 Subject: [PATCH 1/6] add test specs --- src/test/reconciliation_test.go | 470 +++++++++++++++++++++++++++++++- 1 file changed, 469 insertions(+), 1 deletion(-) diff --git a/src/test/reconciliation_test.go b/src/test/reconciliation_test.go index 64e273a6..339e0064 100644 --- a/src/test/reconciliation_test.go +++ b/src/test/reconciliation_test.go @@ -485,7 +485,7 @@ var _ = Describe("Reconciliation", func() { }) }) - When("the SQS queue contains a State Change (stopping) Notification", func() { + When("the SQS queue contains a State Change Notification", func() { When("the state is stopping", func() { BeforeEach(func() { resizeCluster(3) @@ -906,6 +906,474 @@ var _ = Describe("Reconciliation", func() { }) }) + When("the terminator has event configuration", func() { + When("Cordon on ASG Termination v1", func() { + BeforeEach(func() { + resizeCluster(3) + + terminator := terminators[terminatorNamespaceName] + terminator.Spec.Events.AutoScalingTermination = "Cordon" + + sqsQueues[queueURL] = append(sqsQueues[queueURL], &sqs.Message{ + ReceiptHandle: aws.String("msg-1"), + Body: aws.String(fmt.Sprintf(`{ + "source": "aws.autoscaling", + "detail-type": "EC2 Instance-terminate Lifecycle Action", + "version": "1", + "detail": { + "EC2InstanceId": "%s", + "LifecycleTransition": "autoscaling:EC2_INSTANCE_TERMINATING" + } + }`, instanceIDs[1])), + }) + + createPendingASGLifecycleAction(instanceIDs[1]) + }) + + It("returns success and requeues the request with the reconciler's configured interval", func() { + Expect(result, err).To(HaveField("RequeueAfter", Equal(reconciler.RequeueInterval))) + }) + + It("cordons the targeted node", func() { + Expect(cordonedNodes).To(And(HaveKey(nodeNames[1]), HaveLen(1))) + }) + + It("does not drain the targeted node", func() { + Expect(drainedNodes).To(BeEmpty()) + }) + + It("completes the ASG lifecycle action", func() { + Expect(asgLifecycleActions).To(And(HaveKeyWithValue(instanceIDs[1], Equal(StateComplete)), HaveLen(1))) + }) + + It("deletes the message from the SQS queue", func() { + Expect(sqsQueues[queueURL]).To(BeEmpty()) + }) + }) + + When("\"No Action\" on ASG Termination v1", func() { + BeforeEach(func() { + resizeCluster(3) + + terminator := terminators[terminatorNamespaceName] + terminator.Spec.Events.AutoScalingTermination = "NoAction" + + sqsQueues[queueURL] = append(sqsQueues[queueURL], &sqs.Message{ + ReceiptHandle: aws.String("msg-1"), + Body: aws.String(fmt.Sprintf(`{ + "source": "aws.autoscaling", + "detail-type": "EC2 Instance-terminate Lifecycle Action", + "version": "1", + "detail": { + "EC2InstanceId": "%s", + "LifecycleTransition": "autoscaling:EC2_INSTANCE_TERMINATING" + } + }`, instanceIDs[1])), + }) + + createPendingASGLifecycleAction(instanceIDs[1]) + }) + + It("returns success and requeues the request with the reconciler's configured interval", func() { + Expect(result, err).To(HaveField("RequeueAfter", Equal(reconciler.RequeueInterval))) + }) + + It("does not cordon or drain the targeted node", func() { + Expect(cordonedNodes).To(BeEmpty()) + Expect(drainedNodes).To(BeEmpty()) + }) + + It("completes the ASG lifecycle action", func() { + Expect(asgLifecycleActions).To(And(HaveKeyWithValue(instanceIDs[1], Equal(StateComplete)), HaveLen(1))) + }) + + It("deletes the message from the SQS queue", func() { + Expect(sqsQueues[queueURL]).To(BeEmpty()) + }) + }) + + When("Cordon on ASG Termination v2", func() { + BeforeEach(func() { + resizeCluster(3) + + terminator := terminators[terminatorNamespaceName] + terminator.Spec.Events.AutoScalingTermination = "Cordon" + + sqsQueues[queueURL] = append(sqsQueues[queueURL], &sqs.Message{ + ReceiptHandle: aws.String("msg-1"), + Body: aws.String(fmt.Sprintf(`{ + "source": "aws.autoscaling", + "detail-type": "EC2 Instance-terminate Lifecycle Action", + "version": "2", + "detail": { + "EC2InstanceId": "%s", + "LifecycleTransition": "autoscaling:EC2_INSTANCE_TERMINATING" + } + }`, instanceIDs[1])), + }) + + createPendingASGLifecycleAction(instanceIDs[1]) + }) + + It("returns success and requeues the request with the reconciler's configured interval", func() { + Expect(result, err).To(HaveField("RequeueAfter", Equal(reconciler.RequeueInterval))) + }) + + It("cordons the targeted node", func() { + Expect(cordonedNodes).To(And(HaveKey(nodeNames[1]), HaveLen(1))) + }) + + It("does not drain the targeted node", func() { + Expect(drainedNodes).To(BeEmpty()) + }) + + It("completes the ASG lifecycle action", func() { + Expect(asgLifecycleActions).To(And(HaveKeyWithValue(instanceIDs[1], Equal(StateComplete)), HaveLen(1))) + }) + + It("deletes the message from the SQS queue", func() { + Expect(sqsQueues[queueURL]).To(BeEmpty()) + }) + }) + + When("\"No Action\" on ASG Termination v2", func() { + BeforeEach(func() { + resizeCluster(3) + + terminator := terminators[terminatorNamespaceName] + terminator.Spec.Events.AutoScalingTermination = "NoAction" + + sqsQueues[queueURL] = append(sqsQueues[queueURL], &sqs.Message{ + ReceiptHandle: aws.String("msg-1"), + Body: aws.String(fmt.Sprintf(`{ + "source": "aws.autoscaling", + "detail-type": "EC2 Instance-terminate Lifecycle Action", + "version": "2", + "detail": { + "EC2InstanceId": "%s", + "LifecycleTransition": "autoscaling:EC2_INSTANCE_TERMINATING" + } + }`, instanceIDs[1])), + }) + + createPendingASGLifecycleAction(instanceIDs[1]) + }) + + It("returns success and requeues the request with the reconciler's configured interval", func() { + Expect(result, err).To(HaveField("RequeueAfter", Equal(reconciler.RequeueInterval))) + }) + + It("does not cordon or drain the targeted node", func() { + Expect(cordonedNodes).To(BeEmpty()) + Expect(drainedNodes).To(BeEmpty()) + }) + + It("completes the ASG lifecycle action", func() { + Expect(asgLifecycleActions).To(And(HaveKeyWithValue(instanceIDs[1], Equal(StateComplete)), HaveLen(1))) + }) + + It("deletes the message from the SQS queue", func() { + Expect(sqsQueues[queueURL]).To(BeEmpty()) + }) + }) + + When("Cordon on Rebalance Recommendation", func() { + BeforeEach(func() { + resizeCluster(3) + + terminator := terminators[terminatorNamespaceName] + terminator.Spec.Events.RebalanceRecommendation = "Cordon" + + sqsQueues[queueURL] = append(sqsQueues[queueURL], &sqs.Message{ + ReceiptHandle: aws.String("msg-1"), + Body: aws.String(fmt.Sprintf(`{ + "source": "aws.ec2", + "detail-type": "EC2 Instance Rebalance Recommendation", + "version": "0", + "detail": { + "instance-id": "%s" + } + }`, instanceIDs[1])), + }) + }) + + It("returns success and requeues the request with the reconciler's configured interval", func() { + Expect(result, err).To(HaveField("RequeueAfter", Equal(reconciler.RequeueInterval))) + }) + + It("cordons the targeted node", func() { + Expect(cordonedNodes).To(And(HaveKey(nodeNames[1]), HaveLen(1))) + }) + + It("does not drain the targeted node", func() { + Expect(drainedNodes).To(BeEmpty()) + }) + + It("deletes the message from the SQS queue", func() { + Expect(sqsQueues[queueURL]).To(BeEmpty()) + }) + }) + + When("\"No Action\" on Rebalance Recommendation", func() { + BeforeEach(func() { + resizeCluster(3) + + terminator := terminators[terminatorNamespaceName] + terminator.Spec.Events.RebalanceRecommendation = "NoAction" + + sqsQueues[queueURL] = append(sqsQueues[queueURL], &sqs.Message{ + ReceiptHandle: aws.String("msg-1"), + Body: aws.String(fmt.Sprintf(`{ + "source": "aws.ec2", + "detail-type": "EC2 Instance Rebalance Recommendation", + "version": "0", + "detail": { + "instance-id": "%s" + } + }`, instanceIDs[1])), + }) + }) + + It("returns success and requeues the request with the reconciler's configured interval", func() { + Expect(result, err).To(HaveField("RequeueAfter", Equal(reconciler.RequeueInterval))) + }) + + It("does not cordon or drain the targeted node", func() { + Expect(cordonedNodes).To(BeEmpty()) + Expect(drainedNodes).To(BeEmpty()) + }) + + It("deletes the message from the SQS queue", func() { + Expect(sqsQueues[queueURL]).To(BeEmpty()) + }) + }) + + When("Cordon on Scheduled Change", func() { + BeforeEach(func() { + resizeCluster(4) + + terminator := terminators[terminatorNamespaceName] + terminator.Spec.Events.ScheduledChange = "Cordon" + + sqsQueues[queueURL] = append(sqsQueues[queueURL], &sqs.Message{ + ReceiptHandle: aws.String("msg-1"), + Body: aws.String(fmt.Sprintf(`{ + "source": "aws.health", + "detail-type": "AWS Health Event", + "version": "1", + "detail": { + "service": "EC2", + "eventTypeCategory": "scheduledChange", + "affectedEntities": [ + {"entityValue": "%s"}, + {"entityValue": "%s"} + ] + } + }`, instanceIDs[1], instanceIDs[2])), + }) + }) + + It("returns success and requeues the request with the reconciler's configured interval", func() { + Expect(result, err).To(HaveField("RequeueAfter", Equal(reconciler.RequeueInterval))) + }) + + It("cordons the targeted node", func() { + Expect(cordonedNodes).To(And(HaveKey(nodeNames[1]), HaveKey(nodeNames[2]), HaveLen(2))) + }) + + It("does not drain the targeted node", func() { + Expect(drainedNodes).To(BeEmpty()) + }) + + It("deletes the message from the SQS queue", func() { + Expect(sqsQueues[queueURL]).To(BeEmpty()) + }) + }) + + When("\"No Action\" on Scheduled Change", func() { + BeforeEach(func() { + resizeCluster(4) + + terminator := terminators[terminatorNamespaceName] + terminator.Spec.Events.ScheduledChange = "NoAction" + + sqsQueues[queueURL] = append(sqsQueues[queueURL], &sqs.Message{ + ReceiptHandle: aws.String("msg-1"), + Body: aws.String(fmt.Sprintf(`{ + "source": "aws.health", + "detail-type": "AWS Health Event", + "version": "1", + "detail": { + "service": "EC2", + "eventTypeCategory": "scheduledChange", + "affectedEntities": [ + {"entityValue": "%s"}, + {"entityValue": "%s"} + ] + } + }`, instanceIDs[1], instanceIDs[2])), + }) + }) + + It("returns success and requeues the request with the reconciler's configured interval", func() { + Expect(result, err).To(HaveField("RequeueAfter", Equal(reconciler.RequeueInterval))) + }) + + It("does not cordon or drain the targeted node", func() { + Expect(cordonedNodes).To(BeEmpty()) + Expect(drainedNodes).To(BeEmpty()) + }) + + It("deletes the message from the SQS queue", func() { + Expect(sqsQueues[queueURL]).To(BeEmpty()) + }) + }) + + When("Cordon on Spot Interruption", func() { + BeforeEach(func() { + resizeCluster(3) + + terminator := terminators[terminatorNamespaceName] + terminator.Spec.Events.SpotInterruption = "Cordon" + + sqsQueues[queueURL] = append(sqsQueues[queueURL], &sqs.Message{ + ReceiptHandle: aws.String("msg-1"), + Body: aws.String(fmt.Sprintf(`{ + "source": "aws.ec2", + "detail-type": "EC2 Spot Instance Interruption Warning", + "version": "1", + "detail": { + "instance-id": "%s" + } + }`, instanceIDs[1])), + }) + }) + + It("returns success and requeues the request with the reconciler's configured interval", func() { + Expect(result, err).To(HaveField("RequeueAfter", Equal(reconciler.RequeueInterval))) + }) + + It("cordons the targeted node", func() { + Expect(cordonedNodes).To(And(HaveKey(nodeNames[1]), HaveLen(1))) + }) + + It("does not drain the targeted node", func() { + Expect(drainedNodes).To(BeEmpty()) + }) + + It("deletes the message from the SQS queue", func() { + Expect(sqsQueues[queueURL]).To(BeEmpty()) + }) + }) + + When("\"No Action\" on Spot Interruption", func() { + BeforeEach(func() { + resizeCluster(3) + + terminator := terminators[terminatorNamespaceName] + terminator.Spec.Events.SpotInterruption = "NoAction" + + sqsQueues[queueURL] = append(sqsQueues[queueURL], &sqs.Message{ + ReceiptHandle: aws.String("msg-1"), + Body: aws.String(fmt.Sprintf(`{ + "source": "aws.ec2", + "detail-type": "EC2 Spot Instance Interruption Warning", + "version": "1", + "detail": { + "instance-id": "%s" + } + }`, instanceIDs[1])), + }) + }) + + It("returns success and requeues the request with the reconciler's configured interval", func() { + Expect(result, err).To(HaveField("RequeueAfter", Equal(reconciler.RequeueInterval))) + }) + + It("does not cordon or drain the targeted node", func() { + Expect(cordonedNodes).To(BeEmpty()) + Expect(drainedNodes).To(BeEmpty()) + }) + + It("deletes the message from the SQS queue", func() { + Expect(sqsQueues[queueURL]).To(BeEmpty()) + }) + }) + + When("Cordon on State Change", func() { + BeforeEach(func() { + resizeCluster(3) + + terminator := terminators[terminatorNamespaceName] + terminator.Spec.Events.StateChange = "Cordon" + + sqsQueues[queueURL] = append(sqsQueues[queueURL], &sqs.Message{ + ReceiptHandle: aws.String("msg-1"), + Body: aws.String(fmt.Sprintf(`{ + "source": "aws.ec2", + "detail-type": "EC2 Instance State-change Notification", + "version": "1", + "detail": { + "instance-id": "%s", + "state": "stopping" + } + }`, instanceIDs[1])), + }) + }) + + It("returns success and requeues the request with the reconciler's configured interval", func() { + Expect(result, err).To(HaveField("RequeueAfter", Equal(reconciler.RequeueInterval))) + }) + + It("cordons the targeted node", func() { + Expect(cordonedNodes).To(And(HaveKey(nodeNames[1]), HaveLen(1))) + }) + + It("does not drain the targeted node", func() { + Expect(drainedNodes).To(BeEmpty()) + }) + + It("deletes the message from the SQS queue", func() { + Expect(sqsQueues[queueURL]).To(BeEmpty()) + }) + }) + + When("\"No Action\" on State Change", func() { + BeforeEach(func() { + resizeCluster(3) + + terminator := terminators[terminatorNamespaceName] + terminator.Spec.Events.StateChange = "NoAction" + + sqsQueues[queueURL] = append(sqsQueues[queueURL], &sqs.Message{ + ReceiptHandle: aws.String("msg-1"), + Body: aws.String(fmt.Sprintf(`{ + "source": "aws.ec2", + "detail-type": "EC2 Instance State-change Notification", + "version": "1", + "detail": { + "instance-id": "%s", + "state": "stopping" + } + }`, instanceIDs[1])), + }) + }) + + It("returns success and requeues the request with the reconciler's configured interval", func() { + Expect(result, err).To(HaveField("RequeueAfter", Equal(reconciler.RequeueInterval))) + }) + + It("does not cordon or drain the targeted node", func() { + Expect(cordonedNodes).To(BeEmpty()) + Expect(drainedNodes).To(BeEmpty()) + }) + + It("deletes the message from the SQS queue", func() { + Expect(sqsQueues[queueURL]).To(BeEmpty()) + }) + }) + }) + When("the terminator cannot be found", func() { BeforeEach(func() { delete(terminators, terminatorNamespaceName) From 9fd1d90ce72ade39b38bd08137bf28bce1c69b3d Mon Sep 17 00:00:00 2001 From: Jerad C Date: Mon, 18 Apr 2022 16:11:01 -0500 Subject: [PATCH 2/6] add events to Terminator spec --- src/api/v1alpha1/terminator_logging.go | 10 +++++ src/api/v1alpha1/terminator_types.go | 22 +++++++++++ src/api/v1alpha1/terminator_validation.go | 27 ++++++++++--- .../templates/node.k8s.aws_terminators.yaml | 39 +++++++++++++++++++ 4 files changed, 93 insertions(+), 5 deletions(-) diff --git a/src/api/v1alpha1/terminator_logging.go b/src/api/v1alpha1/terminator_logging.go index 06df4785..95797d21 100644 --- a/src/api/v1alpha1/terminator_logging.go +++ b/src/api/v1alpha1/terminator_logging.go @@ -31,6 +31,7 @@ func (t *TerminatorSpec) MarshalLogObject(enc zapcore.ObjectEncoder) error { } enc.AddObject("sqs", t.SQS) enc.AddObject("drain", t.Drain) + enc.AddObject("events", t.Events) return nil } @@ -47,3 +48,12 @@ func (d DrainSpec) MarshalLogObject(enc zapcore.ObjectEncoder) error { enc.AddInt("timeoutSeconds", d.TimeoutSeconds) return nil } + +func (e EventsSpec) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddString("autoScalingTermination", e.AutoScalingTermination) + enc.AddString("rebalanceRecommendation", e.RebalanceRecommendation) + enc.AddString("scheduledChange", e.ScheduledChange) + enc.AddString("spotInterruption", e.SpotInterruption) + enc.AddString("stateChange", e.StateChange) + return nil +} diff --git a/src/api/v1alpha1/terminator_types.go b/src/api/v1alpha1/terminator_types.go index 0853c130..6919ef1a 100644 --- a/src/api/v1alpha1/terminator_types.go +++ b/src/api/v1alpha1/terminator_types.go @@ -33,6 +33,7 @@ type TerminatorSpec struct { MatchLabels map[string]string `json:"matchLabels,omitempty"` SQS SQSSpec `json:"sqs,omitempty"` Drain DrainSpec `json:"drain,omitempty"` + Events EventsSpec `json:"events,omitempty"` } // SQSSpec defines inputs to SQS "receive messages" requests. @@ -51,6 +52,27 @@ type DrainSpec struct { TimeoutSeconds int `json:"timeoutSeconds,omitempty"` } +type Action = string + +var Actions = struct { + CordonAndDrain, + Cordon, + NoAction Action +}{ + CordonAndDrain: "CordonAndDrain", + Cordon: "Cordon", + NoAction: "NoAction", +} + +// EventsSpec defines the action(s) that should be performed in response to a particular event. +type EventsSpec struct { + AutoScalingTermination Action `json:"autoScalingTermination,omitempty"` + RebalanceRecommendation Action `json:"rebalanceRecommendation,omitempty"` + ScheduledChange Action `json:"scheduledChange,omitempty"` + SpotInterruption Action `json:"spotInterruption,omitempty"` + StateChange Action `json:"stateChange,omitempty"` +} + // TerminatorStatus defines the observed state of Terminator type TerminatorStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster diff --git a/src/api/v1alpha1/terminator_validation.go b/src/api/v1alpha1/terminator_validation.go index 9a6648d9..b4e0642a 100644 --- a/src/api/v1alpha1/terminator_validation.go +++ b/src/api/v1alpha1/terminator_validation.go @@ -18,18 +18,18 @@ package v1alpha1 import ( "context" + "fmt" "net/url" - "github.com/aws/aws-sdk-go/service/sqs" - "k8s.io/apimachinery/pkg/util/sets" "knative.dev/pkg/apis" ) -var ( - // https://github.com/aws/aws-sdk-go/blob/v1.38.55/service/sqs/api.go#L3966-L3994 - knownSQSAttributeNames = sets.NewString(sqs.MessageSystemAttributeName_Values()...) +var knownActions = sets.NewString( + Actions.CordonAndDrain, + Actions.Cordon, + Actions.NoAction, ) func (t *Terminator) Validate(_ context.Context) (errs *apis.FieldError) { @@ -43,6 +43,7 @@ func (t *TerminatorSpec) validate() (errs *apis.FieldError) { return errs.Also( t.validateMatchLabels().ViaField("matchLabels"), t.SQS.validate().ViaField("sqs"), + t.Events.validate().ViaField("events"), ) } @@ -61,3 +62,19 @@ func (s *SQSSpec) validate() (errs *apis.FieldError) { } return errs } + +func (e *EventsSpec) validate() (errs *apis.FieldError) { + errMsg := fmt.Sprintf("must be one of %s", knownActions.List()) + for name, value := range map[string]string{ + "autoScalingTermination": e.AutoScalingTermination, + "rebalanceRecommendation": e.RebalanceRecommendation, + "scheduledChange": e.ScheduledChange, + "spotInterruption": e.SpotInterruption, + "stateChange": e.StateChange, + } { + if !knownActions.Has(value) { + errs = errs.Also(apis.ErrInvalidValue(value, name, errMsg)) + } + } + return errs +} diff --git a/src/charts/aws-node-termination-handler-2/templates/node.k8s.aws_terminators.yaml b/src/charts/aws-node-termination-handler-2/templates/node.k8s.aws_terminators.yaml index c2cd0852..aa57064f 100644 --- a/src/charts/aws-node-termination-handler-2/templates/node.k8s.aws_terminators.yaml +++ b/src/charts/aws-node-termination-handler-2/templates/node.k8s.aws_terminators.yaml @@ -97,6 +97,45 @@ spec: {{- with .Values.terminator.defaults.drain.timeoutSeconds }} default: {{ . }} {{- end }} + events: + description: Specify what action should be taken when a particular message type is received. + type: object + properties: + autoScalingTermination: + type: string + enum: + - CordonAndDrain + - Cordon + - NoAction + default: CordonAndDrain + rebalanceRecommendation: + type: string + enum: + - CordonAndDrain + - Cordon + - NoAction + default: CordonAndDrain + scheduledChange: + type: string + enum: + - CordonAndDrain + - Cordon + - NoAction + default: CordonAndDrain + spotInterruption: + type: string + enum: + - CordonAndDrain + - Cordon + - NoAction + default: CordonAndDrain + stateChange: + type: string + enum: + - CordonAndDrain + - Cordon + - NoAction + default: CordonAndDrain status: description: TerminatorStatus defines the observed state of Terminator type: object From c7c4dc756eab9e75664092364ef9f68d468b157b Mon Sep 17 00:00:00 2001 From: Jerad C Date: Wed, 20 Apr 2022 11:36:46 -0500 Subject: [PATCH 3/6] add EventKind --- src/pkg/event/asgterminate/v1/handler.go | 5 +++ src/pkg/event/asgterminate/v2/handler.go | 5 +++ src/pkg/event/noop.go | 10 ++++- .../rebalancerecommendation/v0/handler.go | 8 +++- src/pkg/event/scheduledchange/v1/handler.go | 8 +++- src/pkg/event/spotinterruption/v1/handler.go | 8 +++- src/pkg/event/statechange/v1/handler.go | 8 +++- src/pkg/terminator/eventkind.go | 39 +++++++++++++++++++ src/pkg/terminator/reconciler.go | 1 + 9 files changed, 86 insertions(+), 6 deletions(-) create mode 100644 src/pkg/terminator/eventkind.go diff --git a/src/pkg/event/asgterminate/v1/handler.go b/src/pkg/event/asgterminate/v1/handler.go index c3c543c9..f386b736 100644 --- a/src/pkg/event/asgterminate/v1/handler.go +++ b/src/pkg/event/asgterminate/v1/handler.go @@ -20,6 +20,7 @@ import ( "context" "github.com/aws/aws-node-termination-handler/pkg/event/asgterminate/lifecycleaction" + "github.com/aws/aws-node-termination-handler/pkg/terminator" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -30,6 +31,10 @@ type EC2InstanceTerminateLifecycleAction struct { AWSEvent } +func (EC2InstanceTerminateLifecycleAction) Kind() terminator.EventKind { + return terminator.EventKinds.AutoScalingTermination +} + func (e EC2InstanceTerminateLifecycleAction) EC2InstanceIDs() []string { return []string{e.Detail.EC2InstanceID} } diff --git a/src/pkg/event/asgterminate/v2/handler.go b/src/pkg/event/asgterminate/v2/handler.go index 26c727a7..01550074 100644 --- a/src/pkg/event/asgterminate/v2/handler.go +++ b/src/pkg/event/asgterminate/v2/handler.go @@ -20,6 +20,7 @@ import ( "context" "github.com/aws/aws-node-termination-handler/pkg/event/asgterminate/lifecycleaction" + "github.com/aws/aws-node-termination-handler/pkg/terminator" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -30,6 +31,10 @@ type EC2InstanceTerminateLifecycleAction struct { AWSEvent } +func (EC2InstanceTerminateLifecycleAction) Kind() terminator.EventKind { + return terminator.EventKinds.AutoScalingTermination +} + func (e EC2InstanceTerminateLifecycleAction) EC2InstanceIDs() []string { return []string{e.Detail.EC2InstanceID} } diff --git a/src/pkg/event/noop.go b/src/pkg/event/noop.go index 8d67e49e..2f70ab78 100644 --- a/src/pkg/event/noop.go +++ b/src/pkg/event/noop.go @@ -19,17 +19,23 @@ package event import ( "context" + "github.com/aws/aws-node-termination-handler/pkg/terminator" + "go.uber.org/zap" "go.uber.org/zap/zapcore" ) type noop AWSMetadata -func (n noop) EC2InstanceIDs() []string { +func (noop) Kind() terminator.EventKind { + return terminator.EventKinds.Noop +} + +func (noop) EC2InstanceIDs() []string { return []string{} } -func (n noop) Done(_ context.Context) (bool, error) { +func (noop) Done(_ context.Context) (bool, error) { return true, nil } diff --git a/src/pkg/event/rebalancerecommendation/v0/handler.go b/src/pkg/event/rebalancerecommendation/v0/handler.go index 8e873f80..7b8dbf37 100644 --- a/src/pkg/event/rebalancerecommendation/v0/handler.go +++ b/src/pkg/event/rebalancerecommendation/v0/handler.go @@ -19,17 +19,23 @@ package v0 import ( "context" + "github.com/aws/aws-node-termination-handler/pkg/terminator" + "go.uber.org/zap" "go.uber.org/zap/zapcore" ) type EC2InstanceRebalanceRecommendation AWSEvent +func (EC2InstanceRebalanceRecommendation) Kind() terminator.EventKind { + return terminator.EventKinds.RebalanceRecommendation +} + func (e EC2InstanceRebalanceRecommendation) EC2InstanceIDs() []string { return []string{e.Detail.InstanceID} } -func (e EC2InstanceRebalanceRecommendation) Done(_ context.Context) (bool, error) { +func (EC2InstanceRebalanceRecommendation) Done(_ context.Context) (bool, error) { return false, nil } diff --git a/src/pkg/event/scheduledchange/v1/handler.go b/src/pkg/event/scheduledchange/v1/handler.go index e1f3ccf9..9e93ffbd 100644 --- a/src/pkg/event/scheduledchange/v1/handler.go +++ b/src/pkg/event/scheduledchange/v1/handler.go @@ -19,12 +19,18 @@ package v1 import ( "context" + "github.com/aws/aws-node-termination-handler/pkg/terminator" + "go.uber.org/zap" "go.uber.org/zap/zapcore" ) type AWSHealthEvent AWSEvent +func (AWSHealthEvent) Kind() terminator.EventKind { + return terminator.EventKinds.ScheduledChange +} + func (e AWSHealthEvent) EC2InstanceIDs() []string { ids := make([]string, len(e.Detail.AffectedEntities)) for i, entity := range e.Detail.AffectedEntities { @@ -33,7 +39,7 @@ func (e AWSHealthEvent) EC2InstanceIDs() []string { return ids } -func (e AWSHealthEvent) Done(_ context.Context) (bool, error) { +func (AWSHealthEvent) Done(_ context.Context) (bool, error) { return false, nil } diff --git a/src/pkg/event/spotinterruption/v1/handler.go b/src/pkg/event/spotinterruption/v1/handler.go index 42c8b1e2..d8da27bf 100644 --- a/src/pkg/event/spotinterruption/v1/handler.go +++ b/src/pkg/event/spotinterruption/v1/handler.go @@ -19,17 +19,23 @@ package v1 import ( "context" + "github.com/aws/aws-node-termination-handler/pkg/terminator" + "go.uber.org/zap" "go.uber.org/zap/zapcore" ) type EC2SpotInstanceInterruptionWarning AWSEvent +func (EC2SpotInstanceInterruptionWarning) Kind() terminator.EventKind { + return terminator.EventKinds.SpotInterruption +} + func (e EC2SpotInstanceInterruptionWarning) EC2InstanceIDs() []string { return []string{e.Detail.InstanceID} } -func (e EC2SpotInstanceInterruptionWarning) Done(_ context.Context) (bool, error) { +func (EC2SpotInstanceInterruptionWarning) Done(_ context.Context) (bool, error) { return false, nil } diff --git a/src/pkg/event/statechange/v1/handler.go b/src/pkg/event/statechange/v1/handler.go index 0527f2a1..02781679 100644 --- a/src/pkg/event/statechange/v1/handler.go +++ b/src/pkg/event/statechange/v1/handler.go @@ -19,17 +19,23 @@ package v1 import ( "context" + "github.com/aws/aws-node-termination-handler/pkg/terminator" + "go.uber.org/zap" "go.uber.org/zap/zapcore" ) type EC2InstanceStateChangeNotification AWSEvent +func (EC2InstanceStateChangeNotification) Kind() terminator.EventKind { + return terminator.EventKinds.StateChange +} + func (e EC2InstanceStateChangeNotification) EC2InstanceIDs() []string { return []string{e.Detail.InstanceID} } -func (e EC2InstanceStateChangeNotification) Done(_ context.Context) (bool, error) { +func (EC2InstanceStateChangeNotification) Done(_ context.Context) (bool, error) { return false, nil } diff --git a/src/pkg/terminator/eventkind.go b/src/pkg/terminator/eventkind.go new file mode 100644 index 00000000..c4546b32 --- /dev/null +++ b/src/pkg/terminator/eventkind.go @@ -0,0 +1,39 @@ +/* +Copyright 2022 Amazon.com, Inc. or its affiliates. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package terminator + +type EventKind string + +var EventKinds = struct { + AutoScalingTermination, + RebalanceRecommendation, + ScheduledChange, + SpotInterruption, + StateChange, + Noop EventKind +}{ + AutoScalingTermination: EventKind("autoScalingTermination"), + RebalanceRecommendation: EventKind("rebalanceRecommendation"), + ScheduledChange: EventKind("scheduledChange"), + SpotInterruption: EventKind("spotInterruption"), + StateChange: EventKind("stateChange"), + Noop: EventKind("noop"), +} + +func (e EventKind) String() string { + return string(e) +} diff --git a/src/pkg/terminator/reconciler.go b/src/pkg/terminator/reconciler.go index b1cbcfa8..27bac409 100644 --- a/src/pkg/terminator/reconciler.go +++ b/src/pkg/terminator/reconciler.go @@ -51,6 +51,7 @@ type ( Done(context.Context) (tryAgain bool, err error) EC2InstanceIDs() []string + Kind() EventKind } Getter interface { From c1a002ec3e57f2efa90f081f5727de650d7b698e Mon Sep 17 00:00:00 2001 From: Jerad C Date: Wed, 20 Apr 2022 11:48:44 -0500 Subject: [PATCH 4/6] update reconciliation --- src/pkg/terminator/reconciler.go | 36 +++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/src/pkg/terminator/reconciler.go b/src/pkg/terminator/reconciler.go index 27bac409..0985acdb 100644 --- a/src/pkg/terminator/reconciler.go +++ b/src/pkg/terminator/reconciler.go @@ -133,9 +133,15 @@ func (r Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (recon evt := r.Parse(ctx, msg) ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("event", evt)) + evtAction := actionForEvent(evt, terminator) + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("action", evtAction)) + allInstancesHandled := true + ec2InstanceIDs := evt.EC2InstanceIDs() savedCtx := ctx - for _, ec2InstanceID := range evt.EC2InstanceIDs() { + for i := 0; i < len(ec2InstanceIDs) && evtAction != v1alpha1.Actions.NoAction; i++ { + ec2InstanceID := ec2InstanceIDs[i] + ctx = logging.WithLogger(savedCtx, logging.FromContext(savedCtx). With("ec2InstanceID", ec2InstanceID), ) @@ -165,6 +171,10 @@ func (r Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (recon continue } + if evtAction == v1alpha1.Actions.Cordon { + continue + } + if e = cordondrainer.Drain(ctx, node); e != nil { err = multierr.Append(err, e) continue @@ -201,3 +211,27 @@ func (r Reconciler) BuildController(builder *builder.Builder) error { For(&v1alpha1.Terminator{}). Complete(r) } + +func actionForEvent(evt Event, terminator *v1alpha1.Terminator) v1alpha1.Action { + events := terminator.Spec.Events + + switch evt.Kind() { + case EventKinds.AutoScalingTermination: + return events.AutoScalingTermination + + case EventKinds.RebalanceRecommendation: + return events.RebalanceRecommendation + + case EventKinds.ScheduledChange: + return events.ScheduledChange + + case EventKinds.SpotInterruption: + return events.SpotInterruption + + case EventKinds.StateChange: + return events.StateChange + + default: + return v1alpha1.Actions.NoAction + } +} From 2adee9b443af93316cd3f9447839f8f9cc2165eb Mon Sep 17 00:00:00 2001 From: Jerad C Date: Wed, 27 Apr 2022 08:05:53 -0500 Subject: [PATCH 5/6] clarify enum definition --- src/api/v1alpha1/terminator_types.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/api/v1alpha1/terminator_types.go b/src/api/v1alpha1/terminator_types.go index 6919ef1a..70edb4f6 100644 --- a/src/api/v1alpha1/terminator_types.go +++ b/src/api/v1alpha1/terminator_types.go @@ -59,9 +59,9 @@ var Actions = struct { Cordon, NoAction Action }{ - CordonAndDrain: "CordonAndDrain", - Cordon: "Cordon", - NoAction: "NoAction", + CordonAndDrain: Action("CordonAndDrain"), + Cordon: Action("Cordon"), + NoAction: Action("NoAction"), } // EventsSpec defines the action(s) that should be performed in response to a particular event. From 54b697702a536f9085fd5901d41d5b20964e97fe Mon Sep 17 00:00:00 2001 From: Jerad C Date: Wed, 27 Apr 2022 10:13:33 -0500 Subject: [PATCH 6/6] refactor reconcile method --- src/pkg/terminator/reconciler.go | 129 +++++++++++++++---------------- 1 file changed, 64 insertions(+), 65 deletions(-) diff --git a/src/pkg/terminator/reconciler.go b/src/pkg/terminator/reconciler.go index 0985acdb..cb35aebc 100644 --- a/src/pkg/terminator/reconciler.go +++ b/src/pkg/terminator/reconciler.go @@ -124,81 +124,80 @@ func (r Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (recon return reconcile.Result{}, err } - origCtx := ctx for _, msg := range sqsMessages { - ctx = logging.WithLogger(origCtx, logging.FromContext(origCtx). - With("sqsMessage", logging.NewMessageMarshaler(msg)), - ) - - evt := r.Parse(ctx, msg) - ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("event", evt)) - - evtAction := actionForEvent(evt, terminator) - ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("action", evtAction)) - - allInstancesHandled := true - ec2InstanceIDs := evt.EC2InstanceIDs() - savedCtx := ctx - for i := 0; i < len(ec2InstanceIDs) && evtAction != v1alpha1.Actions.NoAction; i++ { - ec2InstanceID := ec2InstanceIDs[i] - - ctx = logging.WithLogger(savedCtx, logging.FromContext(savedCtx). - With("ec2InstanceID", ec2InstanceID), - ) - - nodeName, e := r.GetNodeName(ctx, ec2InstanceID) - if e != nil { - err = multierr.Append(err, e) - allInstancesHandled = false - continue - } - - ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("node", nodeName)) - - node, e := nodeGetter.GetNode(ctx, nodeName) - if node == nil { - logger := logging.FromContext(ctx) - if e != nil { - logger = logger.With("error", e) - } - logger.Warn("no matching node found") - allInstancesHandled = false - continue - } - - if e = cordondrainer.Cordon(ctx, node); e != nil { - err = multierr.Append(err, e) - continue - } - - if evtAction == v1alpha1.Actions.Cordon { - continue - } - - if e = cordondrainer.Drain(ctx, node); e != nil { - err = multierr.Append(err, e) - continue - } - } - ctx = savedCtx + e := r.handleMessage(ctx, msg, terminator, nodeGetter, cordondrainer, sqsClient) + err = multierr.Append(err, e) + } + + if err != nil { + return reconcile.Result{}, err + } + return reconcile.Result{RequeueAfter: r.RequeueInterval}, nil +} - tryAgain, e := evt.Done(ctx) - if e != nil { +func (r Reconciler) handleMessage(ctx context.Context, msg *sqs.Message, terminator *v1alpha1.Terminator, nodeGetter NodeGetter, cordondrainer CordonDrainer, sqsClient SQSClient) (err error) { + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("sqsMessage", logging.NewMessageMarshaler(msg))) + + evt := r.Parse(ctx, msg) + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("event", evt)) + + evtAction := actionForEvent(evt, terminator) + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("action", evtAction)) + + allInstancesHandled := true + if evtAction != v1alpha1.Actions.NoAction { + for _, ec2InstanceID := range evt.EC2InstanceIDs() { + instanceHandled, e := r.handleInstance(ctx, ec2InstanceID, evtAction, nodeGetter, cordondrainer) err = multierr.Append(err, e) + allInstancesHandled = allInstancesHandled && instanceHandled } + } - if tryAgain || !allInstancesHandled { - continue - } + tryAgain, e := evt.Done(ctx) + if e != nil { + err = multierr.Append(err, e) + } - err = multierr.Append(err, sqsClient.DeleteSQSMessage(ctx, msg)) + if tryAgain || !allInstancesHandled { + return err } - ctx = origCtx + return multierr.Append(err, sqsClient.DeleteSQSMessage(ctx, msg)) +} + +func (r Reconciler) handleInstance(ctx context.Context, ec2InstanceID string, evtAction v1alpha1.Action, nodeGetter NodeGetter, cordondrainer CordonDrainer) (bool, error) { + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("ec2InstanceID", ec2InstanceID)) + + nodeName, err := r.GetNodeName(ctx, ec2InstanceID) if err != nil { - return reconcile.Result{}, err + return false, err } - return reconcile.Result{RequeueAfter: r.RequeueInterval}, nil + + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("node", nodeName)) + + node, err := nodeGetter.GetNode(ctx, nodeName) + if node == nil { + logger := logging.FromContext(ctx) + if err != nil { + logger = logger.With("error", err) + } + logger.Warn("no matching node found") + return false, nil + } + + if err = cordondrainer.Cordon(ctx, node); err != nil { + return true, err + } + + if evtAction == v1alpha1.Actions.Cordon { + return true, nil + } + + if err = cordondrainer.Drain(ctx, node); err != nil { + return true, err + } + + return true, nil } func (r Reconciler) BuildController(builder *builder.Builder) error {