From 28beefad33931723788f5af105d963c7d3871dc1 Mon Sep 17 00:00:00 2001 From: Carlos Motta Date: Tue, 7 Jun 2022 16:57:54 -0300 Subject: [PATCH 1/2] feat(observability): add eventID to exposed metrics --- cmd/node-termination-handler.go | 30 ++++++++++++++++-------------- pkg/observability/opentelemetry.go | 5 +++-- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go index 93604e95..d936c8a0 100644 --- a/cmd/node-termination-handler.go +++ b/cmd/node-termination-handler.go @@ -287,6 +287,7 @@ func watchForCancellationEvents(cancelChan <-chan monitor.InterruptionEvent, int for { interruptionEvent := <-cancelChan nodeName := interruptionEvent.NodeName + eventID := interruptionEvent.EventID interruptionEventStore.CancelInterruptionEvent(interruptionEvent.EventID) if interruptionEventStore.ShouldUncordonNode(nodeName) { log.Info().Msg("Uncordoning the node due to a cancellation event") @@ -297,7 +298,7 @@ func watchForCancellationEvents(cancelChan <-chan monitor.InterruptionEvent, int } else { recorder.Emit(nodeName, observability.Normal, observability.UncordonReason, observability.UncordonMsg) } - metrics.NodeActionsInc("uncordon", nodeName, err) + metrics.NodeActionsInc("uncordon", nodeName, eventID, err) err = node.RemoveNTHLabels(nodeName) if err != nil { @@ -317,6 +318,7 @@ func watchForCancellationEvents(cancelChan <-chan monitor.InterruptionEvent, int func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Store, drainEvent *monitor.InterruptionEvent, node node.Node, nthConfig config.Config, nodeMetadata ec2metadata.NodeMetadata, metrics observability.Metrics, recorder observability.K8sEventRecorder, wg *sync.WaitGroup) { defer wg.Done() nodeName := drainEvent.NodeName + eventID := drainEvent.EventID if nthConfig.UseProviderId { newNodeName, err := node.GetNodeNameFromProviderID(drainEvent.ProviderID) @@ -334,7 +336,7 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto } drainEvent.NodeLabels = nodeLabels if drainEvent.PreDrainTask != nil { - runPreDrainTask(node, nodeName, drainEvent, metrics, recorder) + runPreDrainTask(node, nodeName, eventID, drainEvent, metrics, recorder) } podNameList, err := node.FetchPodNameList(nodeName) @@ -348,9 +350,9 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto } if nthConfig.CordonOnly || (!nthConfig.EnableSQSTerminationDraining && drainEvent.IsRebalanceRecommendation() && !nthConfig.EnableRebalanceDraining) { - err = cordonNode(node, nodeName, drainEvent, metrics, recorder) + err = cordonNode(node, nodeName, eventID, drainEvent, metrics, recorder) } else { - err = cordonAndDrainNode(node, nodeName, drainEvent, metrics, recorder, nthConfig.EnableSQSTerminationDraining) + err = cordonAndDrainNode(node, nodeName, eventID, drainEvent, metrics, recorder, nthConfig.EnableSQSTerminationDraining) } if nthConfig.WebhookURL != "" { @@ -363,14 +365,14 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto } else { interruptionEventStore.MarkAllAsProcessed(nodeName) if drainEvent.PostDrainTask != nil { - runPostDrainTask(node, nodeName, drainEvent, metrics, recorder) + runPostDrainTask(node, nodeName, eventID, drainEvent, metrics, recorder) } <-interruptionEventStore.Workers } } -func runPreDrainTask(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) { +func runPreDrainTask(node node.Node, nodeName string, eventID string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) { err := drainEvent.PreDrainTask(*drainEvent, node) if err != nil { log.Err(err).Msg("There was a problem executing the pre-drain task") @@ -378,10 +380,10 @@ func runPreDrainTask(node node.Node, nodeName string, drainEvent *monitor.Interr } else { recorder.Emit(nodeName, observability.Normal, observability.PreDrainReason, observability.PreDrainMsg) } - metrics.NodeActionsInc("pre-drain", nodeName, err) + metrics.NodeActionsInc("pre-drain", nodeName, eventID, err) } -func cordonNode(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) error { +func cordonNode(node node.Node, nodeName string, eventID string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) error { err := node.Cordon(nodeName, drainEvent.Description) if err != nil { if errors.IsNotFound(err) { @@ -393,32 +395,32 @@ func cordonNode(node node.Node, nodeName string, drainEvent *monitor.Interruptio return err } else { log.Info().Str("node_name", nodeName).Str("reason", drainEvent.Description).Msg("Node successfully cordoned") - metrics.NodeActionsInc("cordon", nodeName, err) + metrics.NodeActionsInc("cordon", nodeName, eventID, err) recorder.Emit(nodeName, observability.Normal, observability.CordonReason, observability.CordonMsg) } return nil } -func cordonAndDrainNode(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder, sqsTerminationDraining bool) error { +func cordonAndDrainNode(node node.Node, nodeName string, eventID string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder, sqsTerminationDraining bool) error { err := node.CordonAndDrain(nodeName, drainEvent.Description) if err != nil { if errors.IsNotFound(err) { log.Err(err).Msgf("node '%s' not found in the cluster", nodeName) } else { log.Err(err).Msg("There was a problem while trying to cordon and drain the node") - metrics.NodeActionsInc("cordon-and-drain", nodeName, err) + metrics.NodeActionsInc("cordon-and-drain", nodeName, eventID, err) recorder.Emit(nodeName, observability.Warning, observability.CordonAndDrainErrReason, observability.CordonAndDrainErrMsgFmt, err.Error()) } return err } else { log.Info().Str("node_name", nodeName).Str("reason", drainEvent.Description).Msg("Node successfully cordoned and drained") - metrics.NodeActionsInc("cordon-and-drain", nodeName, err) + metrics.NodeActionsInc("cordon-and-drain", nodeName, eventID, err) recorder.Emit(nodeName, observability.Normal, observability.CordonAndDrainReason, observability.CordonAndDrainMsg) } return nil } -func runPostDrainTask(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) { +func runPostDrainTask(node node.Node, nodeName string, eventID string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) { err := drainEvent.PostDrainTask(*drainEvent, node) if err != nil { log.Err(err).Msg("There was a problem executing the post-drain task") @@ -426,7 +428,7 @@ func runPostDrainTask(node node.Node, nodeName string, drainEvent *monitor.Inter } else { recorder.Emit(nodeName, observability.Normal, observability.PostDrainReason, observability.PostDrainMsg) } - metrics.NodeActionsInc("post-drain", nodeName, err) + metrics.NodeActionsInc("post-drain", nodeName, eventID, err) } func getRegionFromQueueURL(queueURL string) string { diff --git a/pkg/observability/opentelemetry.go b/pkg/observability/opentelemetry.go index af0c71c9..4993be96 100644 --- a/pkg/observability/opentelemetry.go +++ b/pkg/observability/opentelemetry.go @@ -32,6 +32,7 @@ var ( labelNodeActionKey = attribute.Key("node/action") labelNodeStatusKey = attribute.Key("node/status") labelNodeNameKey = attribute.Key("node/name") + labelEventIDKey = attribute.Key("node/event-id") ) // Metrics represents the stats for observability @@ -88,12 +89,12 @@ func (m Metrics) ErrorEventsInc(where string) { } // NodeActionsInc will increment one for the node stats counter, partitioned by action, nodeName and status, and only if metrics are enabled. -func (m Metrics) NodeActionsInc(action, nodeName string, err error) { +func (m Metrics) NodeActionsInc(action, nodeName string, eventID string, err error) { if !m.enabled { return } - labels := []attribute.KeyValue{labelNodeActionKey.String(action), labelNodeNameKey.String(nodeName)} + labels := []attribute.KeyValue{labelNodeActionKey.String(action), labelNodeNameKey.String(nodeName), labelEventIDKey.String(eventID)} if err != nil { labels = append(labels, labelNodeStatusKey.String("error")) } else { From bcfecb33204f097cc4d69849518b844eb270e494 Mon Sep 17 00:00:00 2001 From: Carlos Motta Date: Wed, 15 Jun 2022 15:12:01 -0300 Subject: [PATCH 2/2] chore(observability): using drainEvent parameter instead of rewriting function signature --- cmd/node-termination-handler.go | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go index d936c8a0..76cad7bc 100644 --- a/cmd/node-termination-handler.go +++ b/cmd/node-termination-handler.go @@ -318,7 +318,6 @@ func watchForCancellationEvents(cancelChan <-chan monitor.InterruptionEvent, int func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Store, drainEvent *monitor.InterruptionEvent, node node.Node, nthConfig config.Config, nodeMetadata ec2metadata.NodeMetadata, metrics observability.Metrics, recorder observability.K8sEventRecorder, wg *sync.WaitGroup) { defer wg.Done() nodeName := drainEvent.NodeName - eventID := drainEvent.EventID if nthConfig.UseProviderId { newNodeName, err := node.GetNodeNameFromProviderID(drainEvent.ProviderID) @@ -336,7 +335,7 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto } drainEvent.NodeLabels = nodeLabels if drainEvent.PreDrainTask != nil { - runPreDrainTask(node, nodeName, eventID, drainEvent, metrics, recorder) + runPreDrainTask(node, nodeName, drainEvent, metrics, recorder) } podNameList, err := node.FetchPodNameList(nodeName) @@ -350,9 +349,9 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto } if nthConfig.CordonOnly || (!nthConfig.EnableSQSTerminationDraining && drainEvent.IsRebalanceRecommendation() && !nthConfig.EnableRebalanceDraining) { - err = cordonNode(node, nodeName, eventID, drainEvent, metrics, recorder) + err = cordonNode(node, nodeName, drainEvent, metrics, recorder) } else { - err = cordonAndDrainNode(node, nodeName, eventID, drainEvent, metrics, recorder, nthConfig.EnableSQSTerminationDraining) + err = cordonAndDrainNode(node, nodeName, drainEvent, metrics, recorder, nthConfig.EnableSQSTerminationDraining) } if nthConfig.WebhookURL != "" { @@ -365,14 +364,14 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto } else { interruptionEventStore.MarkAllAsProcessed(nodeName) if drainEvent.PostDrainTask != nil { - runPostDrainTask(node, nodeName, eventID, drainEvent, metrics, recorder) + runPostDrainTask(node, nodeName, drainEvent, metrics, recorder) } <-interruptionEventStore.Workers } } -func runPreDrainTask(node node.Node, nodeName string, eventID string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) { +func runPreDrainTask(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) { err := drainEvent.PreDrainTask(*drainEvent, node) if err != nil { log.Err(err).Msg("There was a problem executing the pre-drain task") @@ -380,10 +379,10 @@ func runPreDrainTask(node node.Node, nodeName string, eventID string, drainEvent } else { recorder.Emit(nodeName, observability.Normal, observability.PreDrainReason, observability.PreDrainMsg) } - metrics.NodeActionsInc("pre-drain", nodeName, eventID, err) + metrics.NodeActionsInc("pre-drain", nodeName, drainEvent.EventID, err) } -func cordonNode(node node.Node, nodeName string, eventID string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) error { +func cordonNode(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) error { err := node.Cordon(nodeName, drainEvent.Description) if err != nil { if errors.IsNotFound(err) { @@ -395,32 +394,32 @@ func cordonNode(node node.Node, nodeName string, eventID string, drainEvent *mon return err } else { log.Info().Str("node_name", nodeName).Str("reason", drainEvent.Description).Msg("Node successfully cordoned") - metrics.NodeActionsInc("cordon", nodeName, eventID, err) + metrics.NodeActionsInc("cordon", nodeName, drainEvent.EventID, err) recorder.Emit(nodeName, observability.Normal, observability.CordonReason, observability.CordonMsg) } return nil } -func cordonAndDrainNode(node node.Node, nodeName string, eventID string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder, sqsTerminationDraining bool) error { +func cordonAndDrainNode(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder, sqsTerminationDraining bool) error { err := node.CordonAndDrain(nodeName, drainEvent.Description) if err != nil { if errors.IsNotFound(err) { log.Err(err).Msgf("node '%s' not found in the cluster", nodeName) } else { log.Err(err).Msg("There was a problem while trying to cordon and drain the node") - metrics.NodeActionsInc("cordon-and-drain", nodeName, eventID, err) + metrics.NodeActionsInc("cordon-and-drain", nodeName, drainEvent.EventID, err) recorder.Emit(nodeName, observability.Warning, observability.CordonAndDrainErrReason, observability.CordonAndDrainErrMsgFmt, err.Error()) } return err } else { log.Info().Str("node_name", nodeName).Str("reason", drainEvent.Description).Msg("Node successfully cordoned and drained") - metrics.NodeActionsInc("cordon-and-drain", nodeName, eventID, err) + metrics.NodeActionsInc("cordon-and-drain", nodeName, drainEvent.EventID, err) recorder.Emit(nodeName, observability.Normal, observability.CordonAndDrainReason, observability.CordonAndDrainMsg) } return nil } -func runPostDrainTask(node node.Node, nodeName string, eventID string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) { +func runPostDrainTask(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) { err := drainEvent.PostDrainTask(*drainEvent, node) if err != nil { log.Err(err).Msg("There was a problem executing the post-drain task") @@ -428,7 +427,7 @@ func runPostDrainTask(node node.Node, nodeName string, eventID string, drainEven } else { recorder.Emit(nodeName, observability.Normal, observability.PostDrainReason, observability.PostDrainMsg) } - metrics.NodeActionsInc("post-drain", nodeName, eventID, err) + metrics.NodeActionsInc("post-drain", nodeName, drainEvent.EventID, err) } func getRegionFromQueueURL(queueURL string) string {