diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go index 76f7fe49..a35e0f48 100644 --- a/cmd/node-termination-handler.go +++ b/cmd/node-termination-handler.go @@ -401,7 +401,7 @@ func cordonNode(node node.Node, nodeName string, drainEvent *monitor.Interruptio } func cordonAndDrainNode(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder, sqsTerminationDraining bool) error { - err := node.CordonAndDrain(nodeName, drainEvent.Description) + err := node.CordonAndDrain(nodeName, drainEvent.Description, recorder.EventRecorder) if err != nil { if errors.IsNotFound(err) { log.Err(err).Msgf("node '%s' not found in the cluster", nodeName) diff --git a/pkg/node/node.go b/pkg/node/node.go index 789e4716..0004f371 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -64,6 +65,13 @@ const ( maxTaintValueLength = 63 ) +const ( + // PodEvictReason is the event reason emitted for Pod evictions during node drain + PodEvictReason = "PodEviction" + // PodEvictMsg is the event message emitted for Pod evictions during node drain + PodEvictMsg = "Pod evicted due to node drain" +) + var ( maxRetryDeadline time.Duration = 5 * time.Second conflictRetryInterval time.Duration = 750 * time.Millisecond @@ -95,7 +103,7 @@ func NewWithValues(nthConfig config.Config, drainHelper *drain.Helper, uptime up } // CordonAndDrain will cordon the node and evict pods based on the config -func (n Node) CordonAndDrain(nodeName string, reason string) error { +func (n Node) CordonAndDrain(nodeName string, reason string, recorder recorderInterface) error { if n.nthConfig.DryRun { log.Info().Str("node_name", nodeName).Str("reason", reason).Msg("Node would have been cordoned and drained, but dry-run flag was set.") return nil @@ -114,6 +122,25 @@ func (n Node) CordonAndDrain(nodeName string, reason string) error { if err != nil { return err } + // Emit events for all pods that will be evicted + if recorder != nil { + pods, err := n.fetchAllPods(nodeName) + if err == nil { + for _, pod := range pods.Items { + podRef := &corev1.ObjectReference{ + Kind: "Pod", + Name: pod.Name, + Namespace: pod.Namespace, + } + annotations := make(map[string]string) + annotations["node"] = nodeName + for k, v := range pod.GetLabels() { + annotations[k] = v + } + recorder.AnnotatedEventf(podRef, annotations, "Normal", PodEvictReason, PodEvictMsg) + } + } + } err = drain.RunNodeDrain(n.drainHelper, node.Name) if err != nil { return err @@ -800,3 +827,7 @@ func filterPodForDeletion(podName string) func(pod corev1.Pod) drain.PodDeleteSt return drain.MakePodDeleteStatusOkay() } } + +type recorderInterface interface { + AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) +} diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go index 3fd723cc..9d73db07 100644 --- a/pkg/node/node_test.go +++ b/pkg/node/node_test.go @@ -16,6 +16,7 @@ package node_test import ( "context" "strconv" + "strings" "testing" "time" @@ -28,6 +29,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" "k8s.io/kubectl/pkg/drain" ) @@ -61,7 +63,8 @@ func TestDryRun(t *testing.T) { tNode, err := node.New(config.Config{DryRun: true}) h.Ok(t, err) - err = tNode.CordonAndDrain(nodeName, "cordonReason") + fakeRecorder := record.NewFakeRecorder(100) + err = tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder) h.Ok(t, err) err = tNode.Cordon(nodeName, "cordonReason") @@ -98,6 +101,7 @@ func TestNewFailure(t *testing.T) { } func TestDrainSuccess(t *testing.T) { + controllerBool := true client := fake.NewSimpleClientset() _, err := client.CoreV1().Nodes().Create( context.Background(), @@ -106,14 +110,47 @@ func TestDrainSuccess(t *testing.T) { }, metav1.CreateOptions{}) h.Ok(t, err) + + _, err = client.CoreV1().Pods("default").Create( + context.Background(), + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "cool-app-pod-", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Name: "cool-app", + Kind: "ReplicaSet", + Controller: &controllerBool, + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: nodeName, + }, + }, + metav1.CreateOptions{}) + h.Ok(t, err) + + fakeRecorder := record.NewFakeRecorder(100) + tNode := getNode(t, getDrainHelper(client)) - err = tNode.CordonAndDrain(nodeName, "cordonReason") + err = tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder) h.Ok(t, err) + close(fakeRecorder.Events) + expectedEventArrived := false + for event := range fakeRecorder.Events { + if strings.Contains(event, "Normal PodEviction Pod evicted due to node drain") { + expectedEventArrived = true + } + } + h.Assert(t, expectedEventArrived, "PodEvicted event was not emitted") } func TestDrainCordonNodeFailure(t *testing.T) { + fakeRecorder := record.NewFakeRecorder(100) tNode := getNode(t, getDrainHelper(fake.NewSimpleClientset())) - err := tNode.CordonAndDrain(nodeName, "cordonReason") + err := tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder) h.Assert(t, true, "Failed to return error on CordonAndDrain failing to cordon node", err != nil) } diff --git a/pkg/observability/k8s-events.go b/pkg/observability/k8s-events.go index 60f8cba9..4a06eeb4 100644 --- a/pkg/observability/k8s-events.go +++ b/pkg/observability/k8s-events.go @@ -113,7 +113,7 @@ func InitK8sEventRecorder(enabled bool, nodeName string, sqsMode bool, nodeMetad } broadcaster := record.NewBroadcaster() - broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("default")}) + broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")}) return K8sEventRecorder{ annotations: annotations,