From 5ea084b9b5fab58539d91612763a3386e2176def Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Fri, 18 Aug 2023 14:48:52 -0400 Subject: [PATCH 1/6] simplify cache --- .../queuejob/queuejob_controller_ex.go | 61 ++++++++++++++++--- 1 file changed, 51 insertions(+), 10 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index a7c61b1d..88e6abd4 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -108,6 +108,7 @@ type XController struct { // QJ queue that needs to be allocated qjqueue SchedulingQueue + //TODO: Do we need this local cache? // our own local cache, used for computing total amount of resources cache clusterstatecache.Cache @@ -154,6 +155,46 @@ func GetQueueJobKey(obj interface{}) (string, error) { return fmt.Sprintf("%s/%s", qj.Namespace, qj.Name), nil } +//allocatableCapacity calculates the capacity available on each node by substracting resources +//consumed by existing pods. +//For a large cluster with thousands of nodes and hundreds of thousands of pods this +//method could be a performance bottleneck +//We can then move this method to a seperate thread that basically runs every X interval and +//provides resources available to the next AW that needs to be dispatched. +//Obviously the thread would need locking and timer to expire cache. +//May be move to controller runtime can help. +func (qjm *XController) allocatableCapacity() *clusterstateapi.Resource { + capacity := clusterstateapi.EmptyResource() + nodes, _ := qjm.clients.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + startTime := time.Now() + for _, node := range nodes.Items { + // skip unschedulable nodes + if node.Spec.Unschedulable { + continue + } + nodeResource := clusterstateapi.NewResource(node.Status.Allocatable) + capacity.Add(nodeResource) + var specNodeName = "spec.nodeName" + labelSelector := fmt.Sprintf("%s=%s", specNodeName, node.Name) + podList, err := qjm.clients.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{FieldSelector: labelSelector}) + //TODO: when no pods are listed, do we send entire node capacity as available + //this will cause false positive dispatch. + if err != nil { + klog.Errorf("[allocatableCapacity] Error listing pods %v", err) + } + for _, pod := range podList.Items { + if _, ok := pod.GetLabels()["appwrappers.mcad.ibm.com"]; !ok && pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded { + for _, container := range pod.Spec.Containers { + usedResource := clusterstateapi.NewResource(container.Resources.Requests) + capacity.Sub(usedResource) + } + } + } + } + klog.Info("[allocatableCapacity] The avaible capacity to dispatch appwrapper is %v and time took to calculate is %v", capacity, time.Now().Sub(startTime)) + return capacity +} + // NewJobController create new AppWrapper Controller func NewJobController(config *rest.Config, serverOption *options.ServerOption) *XController { cc := &XController{ @@ -166,8 +207,9 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) * initQueue: cache.NewFIFO(GetQueueJobKey), updateQueue: cache.NewFIFO(GetQueueJobKey), qjqueue: NewSchedulingQueue(), - cache: clusterstatecache.New(config), - schedulingAW: nil, + //TODO: do we still need cache to be initialized? + cache: clusterstatecache.New(config), + schedulingAW: nil, } //TODO: work on enabling metrics adapter for correct MCAD mode //metrics adapter is implemented through dynamic client which looks at all the @@ -1098,19 +1140,16 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { if qjm.serverOption.DynamicPriority { priorityindex = -math.MaxFloat64 } - //cache.go updatestate method fails resulting in empty resource object - //cache upate failure costly, as it will put current AW in backoff queue plus take another dispatch cycle - //In worst case the cache update could fail for subsequent dispatche cycles causing test cases to fail or AW never getting dispatched - //To avoid non-determinism below code is workaround. this should be issue should be fixed: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/550 + //cache now is a method inside the controller. + //The reimplementation should fix issue : https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/550 var unallocatedResources = clusterstateapi.EmptyResource() - unallocatedResources = qjm.cache.GetUnallocatedResources() + unallocatedResources = qjm.allocatableCapacity() for unallocatedResources.IsEmpty() { - unallocatedResources.Add(qjm.cache.GetUnallocatedResources()) + unallocatedResources.Add(qjm.allocatableCapacity()) if !unallocatedResources.IsEmpty() { break } } - resources, proposedPreemptions := qjm.getAggregatedAvailableResourcesPriority( unallocatedResources, priorityindex, qj, "") klog.Infof("[ScheduleNext] [Agent Mode] Appwrapper '%s/%s' with resources %v to be scheduled on aggregated idle resources %v", qj.Namespace, qj.Name, aggqj, resources) @@ -1118,6 +1157,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { // Assume preemption will remove low priroity AWs in the system, optimistically dispatch such AWs if aggqj.LessEqual(resources) { + //TODO: should we turn-off histograms? unallocatedHistogramMap := qjm.cache.GetUnallocatedHistograms() if !qjm.nodeChecks(unallocatedHistogramMap, qj) { klog.Infof("[ScheduleNext] [Agent Mode] Optimistic dispatch for AW '%s/%s' requesting aggregated resources %v histogram for point in-time fragmented resources are available in the cluster %s", @@ -1424,8 +1464,9 @@ func (cc *XController) Run(stopCh <-chan struct{}) { cache.WaitForCacheSync(stopCh, cc.appWrapperSynced) + //TODO: do we still need to run cache every second? // update snapshot of ClientStateCache every second - cc.cache.Run(stopCh) + //cc.cache.Run(stopCh) // start preempt thread is used to preempt AWs that have partial pods or have reached dispatch duration go wait.Until(cc.PreemptQueueJobs, 60*time.Second, stopCh) From 7c4acf00f3fb520d1b8764fa82304b53a84b8215 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Fri, 18 Aug 2023 18:57:05 -0400 Subject: [PATCH 2/6] fix test --- test/e2e/queue.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/e2e/queue.go b/test/e2e/queue.go index cf8bbbca..31b54107 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -120,7 +120,6 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should fill up the worker node and most of the master node aw := createDeploymentAWwith550CPU(context, appendRandomString("aw-deployment-2-550cpu")) appwrappers = append(appwrappers, aw) - time.Sleep(1 * time.Minute) err := waitAWPodsReady(context, aw) Expect(err).NotTo(HaveOccurred()) @@ -128,7 +127,8 @@ var _ = Describe("AppWrapper E2E Test", func() { aw2 := createDeploymentAWwith426CPU(context, appendRandomString("aw-deployment-2-426cpu")) appwrappers = append(appwrappers, aw2) err = waitAWAnyPodsExists(context, aw2) - Expect(err).NotTo(HaveOccurred()) + //With improved accounting, no pods will be spawned + Expect(err).To(HaveOccurred()) // This should fit on cluster, initially queued because of aw2 above but should eventually // run after prevention of aw2 above. From f90b8ae6631f93dafa97311417c7766af75246e5 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Fri, 18 Aug 2023 20:00:29 -0400 Subject: [PATCH 3/6] fix test-1 --- test/e2e/util.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/e2e/util.go b/test/e2e/util.go index d7abc009..78f94edf 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -1687,7 +1687,6 @@ func createGenericDeploymentAWWithMultipleItems(context *context, name string) * GenericTemplate: runtime.RawExtension{ Raw: rb, }, - CompletionStatus: "Progressing", }, { DesiredAvailable: 1, From f63f2359d8323376a65dbba02ed18dcc9165831a Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Fri, 18 Aug 2023 21:23:36 -0400 Subject: [PATCH 4/6] add polling logic --- test/e2e/queue.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/test/e2e/queue.go b/test/e2e/queue.go index 31b54107..bdce4343 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -713,8 +713,22 @@ var _ = Describe("AppWrapper E2E Test", func() { Expect(err1).NotTo(HaveOccurred(), "Expecting pods to be ready for app wrapper: aw-deployment-rhc") aw1, err := context.karclient.McadV1beta1().AppWrappers(aw.Namespace).Get(context.ctx, aw.Name, metav1.GetOptions{}) Expect(err).NotTo(HaveOccurred(), "Expecting to get app wrapper status") + pass := false + for true { + aw1, err := context.karclient.McadV1beta1().AppWrappers(aw.Namespace).Get(context.ctx, aw.Name, metav1.GetOptions{}) + if err != nil { + fmt.Fprint(GinkgoWriter, "Error getting status") + } + fmt.Fprintf(GinkgoWriter, "[e2e] status of AW %v.\n", aw1.Status.State) + if aw1.Status.State == arbv1.AppWrapperStateRunningHoldCompletion { + pass = true + } + if pass { + break + } + } fmt.Fprintf(GinkgoWriter, "[e2e] status of AW %v.\n", aw1.Status.State) - Expect(aw1.Status.State).To(Equal(arbv1.AppWrapperStateRunningHoldCompletion)) + Expect(pass).To(BeTrue()) fmt.Fprintf(os.Stdout, "[e2e] MCAD Deployment RuningHoldCompletion Test - Completed. Awaiting app wrapper cleanup.\n") }) From 92ac11a5b2f6146e0540b83532a4383142ca56af Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Fri, 18 Aug 2023 22:39:59 -0400 Subject: [PATCH 5/6] address review --- test/e2e/queue.go | 2 -- test/e2e/util.go | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/test/e2e/queue.go b/test/e2e/queue.go index bdce4343..49375546 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -722,8 +722,6 @@ var _ = Describe("AppWrapper E2E Test", func() { fmt.Fprintf(GinkgoWriter, "[e2e] status of AW %v.\n", aw1.Status.State) if aw1.Status.State == arbv1.AppWrapperStateRunningHoldCompletion { pass = true - } - if pass { break } } diff --git a/test/e2e/util.go b/test/e2e/util.go index 78f94edf..d7abc009 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -1687,6 +1687,7 @@ func createGenericDeploymentAWWithMultipleItems(context *context, name string) * GenericTemplate: runtime.RawExtension{ Raw: rb, }, + CompletionStatus: "Progressing", }, { DesiredAvailable: 1, From 414be46e1fbdebd695fc6307195c5ee251675c7e Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Sat, 19 Aug 2023 00:51:37 -0400 Subject: [PATCH 6/6] preempt queue job runs at 60 sec interval --- test/e2e/queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/queue.go b/test/e2e/queue.go index 49375546..a103a821 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -174,7 +174,7 @@ var _ = Describe("AppWrapper E2E Test", func() { aw := createJobAWWithInitContainer(context, "aw-job-3-init-container", 1, "none", 3) appwrappers = append(appwrappers, aw) - err := waitAWPodsCompleted(context, aw, 300*time.Second) + err := waitAWPodsCompleted(context, aw, 200*time.Second) Expect(err).To(HaveOccurred()) })