From a58c86c5cbdf1d0869ddd92a36f3d23d74df4996 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Thu, 31 Aug 2023 15:18:50 +0100 Subject: [PATCH 01/10] add pending request cache to allow for resuming in-flight requests that take longer than a single issuance cycle Signed-off-by: James Munnelly --- manager/manager.go | 198 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 184 insertions(+), 14 deletions(-) diff --git a/manager/manager.go b/manager/manager.go index 29d8577..6a96e6d 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -18,6 +18,7 @@ package manager import ( "context" + "crypto" "crypto/x509" "encoding/pem" "errors" @@ -40,6 +41,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" "k8s.io/utils/clock" internalapi "github.com/cert-manager/csi-lib/internal/api" @@ -157,6 +159,9 @@ func NewManager(opts Options) (*Manager, error) { return nil, fmt.Errorf("building node name label selector: %w", err) } + // construct the requestToPrivateKeyMap so we can use event handlers below to manage it + var requestToPrivateKeyLock sync.Mutex + requestToPrivateKeyMap := make(map[types.UID]crypto.PrivateKey) // Create an informer factory informerFactory := cminformers.NewSharedInformerFactoryWithOptions(opts.Client, 0, cminformers.WithTweakListOptions(func(opts *metav1.ListOptions) { opts.LabelSelector = labels.NewSelector().Add(*nodeNameReq).String() @@ -164,18 +169,75 @@ func NewManager(opts Options) (*Manager, error) { // Fetch the lister before calling Start() to ensure this informer is // registered with the factory lister := informerFactory.Certmanager().V1().CertificateRequests().Lister() + informerFactory.Certmanager().V1().CertificateRequests().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + key, ok := obj.(string) + if !ok { + return + } + namespace, name, err := 
cache.SplitMetaNamespaceKey(key) + if err != nil { + return + } + req, err := lister.CertificateRequests(namespace).Get(name) + if err != nil { + // we no longer have a copy of this request, so we can't work out its UID. + // instead the associated pending request entry for this request will be cleaned up by the periodic 'janitor' task. + return + } + + requestToPrivateKeyLock.Lock() + defer requestToPrivateKeyLock.Unlock() + if _, ok := requestToPrivateKeyMap[req.UID]; ok { + delete(requestToPrivateKeyMap, req.UID) + } + }, + }) + + // create a stop channel that manages all sub goroutines stopCh := make(chan struct{}) + // begin a background routine which periodically checks to ensure all members of the pending request map actually + // have corresponding CertificateRequest objects in the apiserver. + // This avoids leaking memory if we don't observe a request being deleted, or we observe it after the lister has purged + // the request data from its cache. + // this routine must be careful to not delete entries from this map that have JUST been added to the map, but haven't + // been observed by the lister yet (else it may purge data we want to keep, causing a whole new request cycle). + // for now, to avoid this case, we only run the routine every 5 minutes. It would be better if we recorded the time we + // added the entry to the map instead, and only purged items from the map that are older that N duration (TBD). 
+ janitorLogger := opts.Log.WithName("pending_request_janitor") + go wait.Until(func() { + reqs, err := lister.List(labels.Everything()) + if err != nil { + janitorLogger.Error(err, "failed listing existing requests") + } + + existsMap := make(map[types.UID]struct{}) + for _, req := range reqs { + existsMap[req.UID] = struct{}{} + } + + requestToPrivateKeyLock.Lock() + defer requestToPrivateKeyLock.Unlock() + for uid := range requestToPrivateKeyMap { + if _, ok := existsMap[uid]; !ok { + // purge the item from the map as it does not exist in the apiserver + delete(requestToPrivateKeyMap, uid) + } + } + }, time.Minute*5, stopCh) // Begin watching the API informerFactory.Start(stopCh) informerFactory.WaitForCacheSync(stopCh) m := &Manager{ - client: opts.Client, - clientForMetadata: opts.ClientForMetadata, - lister: lister, - metadataReader: opts.MetadataReader, - clock: opts.Clock, - log: *opts.Log, + client: opts.Client, + clientForMetadata: opts.ClientForMetadata, + lister: lister, + requestToPrivateKeyLock: &requestToPrivateKeyLock, + requestToPrivateKeyMap: requestToPrivateKeyMap, + metadataReader: opts.MetadataReader, + clock: opts.Clock, + log: *opts.Log, generatePrivateKey: opts.GeneratePrivateKey, generateRequest: opts.GenerateRequest, @@ -260,6 +322,10 @@ type Manager struct { // lister is used as a read-only cache of CertificateRequest resources lister cmlisters.CertificateRequestLister + // A map that associates a CertificateRequest's UID with its private key. + requestToPrivateKeyLock *sync.Mutex + requestToPrivateKeyMap map[types.UID]crypto.PrivateKey + // used to read metadata from the store metadataReader storage.MetadataReader @@ -315,10 +381,32 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error { } log.V(2).Info("Read metadata", "metadata", meta) + // check if there is already a pending request in-flight for this volume. 
+ // if there is, and we still have a copy of its private key in memory, we can resume the existing request and + // avoid creating additional CertificateRequest objects. + existingReq, err := m.findPendingRequest(meta) + if err != nil { + return fmt.Errorf("failed when checking if an existing request exists: %w", err) + } + // if there is an existing in-flight request, attempt to 'resume' it (i.e. re-check to see if it is ready) + if existingReq != nil { + // we can only resume a request if we still have a reference to its private key, so look that up in our pending + // requests map + if key, ok := m.readPendingRequestPrivateKey(existingReq.UID); ok { + log.V(4).Info("Re-using existing certificaterequest") + return m.handleRequest(ctx, volumeID, meta, key, existingReq) + } + + // if we don't have a copy of the associated private key, delete the currently in-flight request + log.V(2).Info("Found existing request that we don't have corresponding private key for - restarting request process") + if err := m.client.CertmanagerV1().CertificateRequests(existingReq.Namespace).Delete(ctx, existingReq.Name, metav1.DeleteOptions{}); err != nil { + return fmt.Errorf("failed to delete existing in-flight request: %w", err) + } + } + if ready, reason := m.readyToRequest(meta); !ready { return fmt.Errorf("driver is not ready to request a certificate for this volume: %v", reason) } - key, err := m.generatePrivateKey(meta) if err != nil { return fmt.Errorf("generating private key: %w", err) @@ -343,6 +431,71 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error { } log.Info("Created new CertificateRequest resource") + // persist the reference to the private key in memory so we can resume this request in future if it doesn't complete + // the first time. 
+ m.writePendingRequestPrivateKey(req.UID, key) + return m.handleRequest(ctx, volumeID, meta, key, req) +} + +func (m *Manager) readPendingRequestPrivateKey(uid types.UID) (crypto.PrivateKey, bool) { + m.requestToPrivateKeyLock.Lock() + defer m.requestToPrivateKeyLock.Unlock() + key, ok := m.requestToPrivateKeyMap[uid] + return key, ok +} + +func (m *Manager) writePendingRequestPrivateKey(uid types.UID, key crypto.PrivateKey) { + m.requestToPrivateKeyLock.Lock() + defer m.requestToPrivateKeyLock.Unlock() + m.requestToPrivateKeyMap[uid] = key +} + +func (m *Manager) deletePendingRequestPrivateKey(uid types.UID) { + m.requestToPrivateKeyLock.Lock() + defer m.requestToPrivateKeyLock.Unlock() + delete(m.requestToPrivateKeyMap, uid) +} + +func (m *Manager) findPendingRequest(meta metadata.Metadata) (*cmapi.CertificateRequest, error) { + reqs, err := m.listAllRequestsForVolume(meta.VolumeID) + if err != nil { + return nil, err + } + + if len(reqs) == 0 { + return nil, nil + } + + // only consider the newest request - we will never resume an older request + req := reqs[0] + if !certificateRequestCanBeResumed(req) { + return nil, nil + } + + // TODO: check if this request is still actually valid for the input metadata + return req, nil +} + +func certificateRequestCanBeResumed(req *cmapi.CertificateRequest) bool { + for _, cond := range req.Status.Conditions { + if cond.Type == cmapi.CertificateRequestConditionReady { + switch cond.Reason { + case cmapi.CertificateRequestReasonPending, cmapi.CertificateRequestReasonIssued, "": + // either explicit Pending, Issued or empty is considered re-sumable + return true + default: + // any other state is a terminal failed state and means the request has failed + return false + } + } + } + // if there is no Ready condition, the request is still pending processing + return true +} + +func (m *Manager) handleRequest(ctx context.Context, volumeID string, meta metadata.Metadata, key crypto.PrivateKey, req *cmapi.CertificateRequest) error 
{ + log := m.log.WithValues("volume_id", volumeID) + // Poll every 1s for the CertificateRequest to be ready lastFailureReason := "" if err := wait.PollUntilWithContext(ctx, time.Second, func(ctx context.Context) (done bool, err error) { @@ -431,22 +584,26 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error { } log.V(2).Info("Wrote new keypair to storage") + // We must explicitly delete the private key from the pending requests map so that the existing Completed + // request will not be re-used upon renewal. + // Without this, the renewal would pick up the existing issued certificate and re-issue, rather than requesting + // a new certificate. + m.deletePendingRequestPrivateKey(req.UID) + return nil } -func (m *Manager) cleanupStaleRequests(ctx context.Context, log logr.Logger, volumeID string) error { +// returns a list of all pending certificaterequest objects for the given volumeID. +// the returned slice will be ordered with the most recent request FIRST. +func (m *Manager) listAllRequestsForVolume(volumeID string) ([]*cmapi.CertificateRequest, error) { sel, err := m.labelSelectorForVolume(volumeID) if err != nil { - return fmt.Errorf("internal error building label selector - this is a bug, please open an issue: %w", err) + return nil, fmt.Errorf("internal error building label selector - this is a bug, please open an issue: %w", err) } reqs, err := m.lister.List(sel) if err != nil { - return fmt.Errorf("listing certificaterequests: %w", err) - } - - if len(reqs) < m.maxRequestsPerVolume { - return nil + return nil, fmt.Errorf("listing certificaterequests: %w", err) } // sort newest first to oldest last @@ -454,6 +611,19 @@ func (m *Manager) cleanupStaleRequests(ctx context.Context, log logr.Logger, vol return reqs[i].CreationTimestamp.After(reqs[j].CreationTimestamp.Time) }) + return reqs, nil +} + +func (m *Manager) cleanupStaleRequests(ctx context.Context, log logr.Logger, volumeID string) error { + reqs, err := 
m.listAllRequestsForVolume(volumeID) + if err != nil { + return err + } + + if len(reqs) < m.maxRequestsPerVolume { + return nil + } + // start at the end of the slice and work back to maxRequestsPerVolume for i := len(reqs) - 1; i >= m.maxRequestsPerVolume-1; i-- { toDelete := reqs[i] From cf77048ff14c9701d975f879b155ad4fcbafa8d9 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Fri, 1 Sep 2023 13:03:46 +0100 Subject: [PATCH 02/10] reduce issuance poll interval to 200ms Signed-off-by: James Munnelly --- manager/manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/manager/manager.go b/manager/manager.go index 6a96e6d..224181d 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -496,9 +496,9 @@ func certificateRequestCanBeResumed(req *cmapi.CertificateRequest) bool { func (m *Manager) handleRequest(ctx context.Context, volumeID string, meta metadata.Metadata, key crypto.PrivateKey, req *cmapi.CertificateRequest) error { log := m.log.WithValues("volume_id", volumeID) - // Poll every 1s for the CertificateRequest to be ready + // Poll every 200ms for the CertificateRequest to be ready lastFailureReason := "" - if err := wait.PollUntilWithContext(ctx, time.Second, func(ctx context.Context) (done bool, err error) { + if err := wait.PollUntilWithContext(ctx, time.Millisecond*200, func(ctx context.Context) (done bool, err error) { updatedReq, err := m.lister.CertificateRequests(req.Namespace).Get(req.Name) if apierrors.IsNotFound(err) { // A NotFound error implies something deleted the resource - fail From 2e36109c8c2e6deee6df0de86327dd43df770a78 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Fri, 1 Sep 2023 13:04:09 +0100 Subject: [PATCH 03/10] add issuance resumption integration tests Signed-off-by: James Munnelly --- test/integration/resume_request_test.go | 183 ++++++++++++++++++++++++ 1 file changed, 183 insertions(+) create mode 100644 test/integration/resume_request_test.go diff --git 
a/test/integration/resume_request_test.go b/test/integration/resume_request_test.go new file mode 100644 index 0000000..17d30df --- /dev/null +++ b/test/integration/resume_request_test.go @@ -0,0 +1,183 @@ +/* +Copyright 2023 The cert-manager Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "context" + "crypto" + "crypto/x509" + "os" + "reflect" + "testing" + "time" + + cmclient "github.com/cert-manager/cert-manager/pkg/client/clientset/versioned" + "github.com/container-storage-interface/spec/lib/go/csi" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + fakeclock "k8s.io/utils/clock/testing" + + "github.com/cert-manager/csi-lib/manager" + "github.com/cert-manager/csi-lib/metadata" + "github.com/cert-manager/csi-lib/storage" + testdriver "github.com/cert-manager/csi-lib/test/driver" + testutil "github.com/cert-manager/csi-lib/test/util" +) + +func testResumesExistingRequest(t *testing.T, issueBeforeCall bool) { + store := storage.NewMemoryFS() + ns := "certificaterequest-namespace" + clock := fakeclock.NewFakeClock(time.Now()) + opts, cl, stop := testdriver.Run(t, testdriver.Options{ + Store: store, + Clock: clock, + GeneratePrivateKey: func(meta metadata.Metadata) (crypto.PrivateKey, error) { + return nil, nil + }, + GenerateRequest: func(meta metadata.Metadata) (*manager.CertificateRequestBundle, error) { + return &manager.CertificateRequestBundle{ + Namespace: ns, + }, nil + }, + SignRequest: func(meta 
metadata.Metadata, key crypto.PrivateKey, request *x509.CertificateRequest) (csr []byte, err error) { + return []byte{}, nil + }, + WriteKeypair: func(meta metadata.Metadata, key crypto.PrivateKey, chain []byte, ca []byte) error { + store.WriteFiles(meta, map[string][]byte{ + "ca": ca, + "cert": chain, + }) + nextIssuanceTime := clock.Now().Add(time.Hour) + meta.NextIssuanceTime = &nextIssuanceTime + return store.WriteMetadata(meta.VolumeID, meta) + }, + }) + defer stop() + + tmpDir, err := os.MkdirTemp("", "*") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + // create a root, non-expiring context + ctx := context.Background() + + // We are going to submit this request multiple times, so lets just write it out once + nodePublishVolumeRequest := &csi.NodePublishVolumeRequest{ + VolumeId: "test-vol", + VolumeContext: map[string]string{ + "csi.storage.k8s.io/ephemeral": "true", + "csi.storage.k8s.io/pod.name": "the-pod-name", + "csi.storage.k8s.io/pod.namespace": ns, + }, + TargetPath: tmpDir, + Readonly: true, + } + + // create a context that expires in 2s (enough time for at least a single call of `issue`) + twoSecondCtx, cancel := context.WithTimeout(ctx, time.Second*2) + defer cancel() + _, err = cl.NodePublishVolume(twoSecondCtx, nodePublishVolumeRequest) + // assert that an error has been returned - we don't mind what kind of error, as due to the async nature of + // de-registering metadata from the metadata store upon failures, there is a slim chance that a metadata read error + // can be returned instead of a deadline exceeded error. + if err == nil { + t.Errorf("expected error but got nil") + } + + // ensure a single CertificateRequest exists, and fetch its UID so we can compare it later + existingRequestUID := ensureOneRequestExists(ctx, t, opts.Client, ns, "") + + // run NodePublishVolume once again, with a short timeout. + // here we want to ensure that no second request is completed, and the timeout is reached again. 
+ // we still won't actually complete issuance here. + twoSecondCtx, cancel = context.WithTimeout(ctx, time.Second*2) + defer cancel() + _, err = cl.NodePublishVolume(twoSecondCtx, nodePublishVolumeRequest) + // assert that an error has been returned - we don't mind what kind of error, as due to the async nature of + // de-registering metadata from the metadata store upon failures, there is a slim chance that a metadata read error + // can be returned instead of a deadline exceeded error. + if err == nil { + t.Errorf("expected error but got nil") + } + // ensure the same certificaterequest object still exists + ensureOneRequestExists(ctx, t, opts.Client, ns, existingRequestUID) + + stopCh := make(chan struct{}) + defer close(stopCh) + if issueBeforeCall { + // we don't run this in a goroutine so we can be sure the certificaterequest is completed BEFORE the issue loop is entered + testutil.IssueOneRequest(t, opts.Client, "certificaterequest-namespace", stopCh, selfSignedExampleCertificate, []byte("ca bytes")) + } else { + go func() { + // allow 500ms before actually issuing the request so we can be sure we're within the issue() function call + // when the certificaterequest is finally completed + time.Sleep(time.Millisecond * 500) + testutil.IssueOneRequest(t, opts.Client, "certificaterequest-namespace", stopCh, selfSignedExampleCertificate, []byte("ca bytes")) + }() + } + + // call NodePublishVolume again. this time, we expect NodePublishVolume to return without an error and actually issue + // the certificate using the existing request data. + // We don't use an explicit timeout here to avoid any weird race conditions caused by shorter test timeouts. 
+ _, err = cl.NodePublishVolume(ctx, nodePublishVolumeRequest) + if err != nil { + t.Errorf("expected no error but got: %v", err) + } + // ensure the same certificaterequest object still exists + ensureOneRequestExists(ctx, t, opts.Client, ns, existingRequestUID) + + files, err := store.ReadFiles("test-vol") + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(files["ca"], []byte("ca bytes")) { + t.Errorf("unexpected CA data: %v", files["ca"]) + } + if !reflect.DeepEqual(files["cert"], selfSignedExampleCertificate) { + t.Errorf("unexpected certificate data: %v", files["cert"]) + } +} + +func TestResumesExistingRequest_IssuedBetweenPublishCalls(t *testing.T) { + testResumesExistingRequest(t, true) +} + +func TestResumesExistingRequest_IssuedDuringPublishCall(t *testing.T) { + testResumesExistingRequest(t, false) +} + +// ensureOneRequestExists will fail the test if more than a single CertificateRequest exists. +// If permittedUID is non-empty and a request DOES exist, it will also ensure that the existing request has +// the given UID. +// It will return the UID of the existing request. 
+func ensureOneRequestExists(ctx context.Context, t *testing.T, client cmclient.Interface, namespace string, permittedUID types.UID) types.UID { + // assert a single CertificateRequest object exists + reqs, err := client.CertmanagerV1().CertificateRequests(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + t.Fatal(err) + } + if len(reqs.Items) != 1 { + t.Fatalf("expected to find one existing CertificateRequest but got %d", len(reqs.Items)) + } + req := reqs.Items[0] + if string(permittedUID) != "" && req.UID != permittedUID { + t.Fatalf("existing request does not have expected UID of %q - this means the request has probably been deleted and re-created", permittedUID) + } + return req.UID +} From 29ed31bbe6bc92d27b22bba1001bee5133992984 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Fri, 1 Sep 2023 13:15:40 +0100 Subject: [PATCH 04/10] add additional debug logging Signed-off-by: James Munnelly --- manager/manager.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/manager/manager.go b/manager/manager.go index 224181d..3764a54 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -499,8 +499,10 @@ func (m *Manager) handleRequest(ctx context.Context, volumeID string, meta metad // Poll every 200ms for the CertificateRequest to be ready lastFailureReason := "" if err := wait.PollUntilWithContext(ctx, time.Millisecond*200, func(ctx context.Context) (done bool, err error) { + log.V(4).Info("Reading CertificateRequest from lister cache") updatedReq, err := m.lister.CertificateRequests(req.Namespace).Get(req.Name) if apierrors.IsNotFound(err) { + log.V(4).Info("Failed to read CertificateRequest from lister cache", "error", err) // A NotFound error implies something deleted the resource - fail // early to allow a retry to occur at a later time if needed. 
return false, err @@ -524,6 +526,7 @@ func (m *Manager) handleRequest(ctx context.Context, volumeID string, meta metad isApproved := apiutil.CertificateRequestIsApproved(updatedReq) if !isApproved { + log.V(4).Info("CertificateRequest is not explicitly approved - continuing to check if the request has been issued anyway") lastFailureReason = fmt.Sprintf("request %q has not yet been approved by approval plugin", updatedReq.Name) // we don't stop execution here, as some versions of cert-manager (and some external issuer plugins) // may not be aware/utilise approval. @@ -533,6 +536,7 @@ func (m *Manager) handleRequest(ctx context.Context, volumeID string, meta metad readyCondition := apiutil.GetCertificateRequestCondition(updatedReq, cmapi.CertificateRequestConditionReady) if readyCondition == nil { + log.V(4).Info("Ready condition not found - will recheck...") // only overwrite the approval failure message if the request is actually approved // otherwise we may hide more useful information from the user by accident. 
if isApproved { @@ -543,10 +547,12 @@ func (m *Manager) handleRequest(ctx context.Context, volumeID string, meta metad switch readyCondition.Reason { case cmapi.CertificateRequestReasonIssued: + log.V(4).Info("CertificateRequest has been issued!") break case cmapi.CertificateRequestReasonFailed: return false, fmt.Errorf("request %q has failed: %s", updatedReq.Name, readyCondition.Message) case cmapi.CertificateRequestReasonPending: + log.V(4).Info("CertificateRequest is still pending...") if isApproved { lastFailureReason = fmt.Sprintf("request %q is pending: %v", updatedReq.Name, readyCondition.Message) } @@ -578,6 +584,7 @@ func (m *Manager) handleRequest(ctx context.Context, volumeID string, meta metad return fmt.Errorf("calculating next issuance time: %w", err) } meta.NextIssuanceTime = &renewalPoint + log.V(4).Info("Persisting next issuance time to metadata store", "next_issuance_time", renewalPoint) if err := m.writeKeypair(meta, key, req.Status.Certificate, req.Status.CA); err != nil { return fmt.Errorf("writing keypair: %w", err) @@ -589,6 +596,7 @@ func (m *Manager) handleRequest(ctx context.Context, volumeID string, meta metad // Without this, the renewal would pick up the existing issued certificate and re-issue, rather than requesting // a new certificate. 
m.deletePendingRequestPrivateKey(req.UID) + log.V(4).Info("Removed pending request private key from internal cache") return nil } From 4c00ad31f9fbf52a8f959702001e5a0202102070 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Fri, 1 Sep 2023 13:15:50 +0100 Subject: [PATCH 05/10] fixup cleanupStaleRequests Signed-off-by: James Munnelly --- manager/manager.go | 5 ++--- manager/manager_test.go | 7 +++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/manager/manager.go b/manager/manager.go index 3764a54..38c89d0 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -627,13 +627,12 @@ func (m *Manager) cleanupStaleRequests(ctx context.Context, log logr.Logger, vol if err != nil { return err } - - if len(reqs) < m.maxRequestsPerVolume { + if len(reqs) <= m.maxRequestsPerVolume { return nil } // start at the end of the slice and work back to maxRequestsPerVolume - for i := len(reqs) - 1; i >= m.maxRequestsPerVolume-1; i-- { + for i := len(reqs) - 1; i > m.maxRequestsPerVolume-1; i-- { toDelete := reqs[i] if err := m.client.CertmanagerV1().CertificateRequests(toDelete.Namespace).Delete(ctx, toDelete.Name, metav1.DeleteOptions{}); err != nil { if apierrors.IsNotFound(err) { diff --git a/manager/manager_test.go b/manager/manager_test.go index a226d29..94c00f1 100644 --- a/manager/manager_test.go +++ b/manager/manager_test.go @@ -364,12 +364,12 @@ func TestManager_cleanupStaleRequests(t *testing.T) { wantErr bool }{ { - name: "maxRequestsPerVolume=1: all stale CSRs should be deleted", + name: "maxRequestsPerVolume=1: one stale CSR should be left", objects: []*cmapi.CertificateRequest{ cr("cr-1", defaultTestNamespace, "nodeID-1", "volumeID-1"), cr("cr-2", defaultTestNamespace, "nodeID-1", "volumeID-1"), }, - toBeDeleted: []string{"cr-2", "cr-1"}, + toBeDeleted: []string{"cr-2"}, fields: fields{ nodeID: "nodeID-1", maxRequestsPerVolume: 1, @@ -377,12 +377,11 @@ func TestManager_cleanupStaleRequests(t *testing.T) { wantErr: false, }, { - name: 
"maxRequestsPerVolume=2: 1 stale CSRs should be left", + name: "maxRequestsPerVolume=2: 2 stale CSRs should be left", objects: []*cmapi.CertificateRequest{ cr("cr-1", defaultTestNamespace, "nodeID-1", "volumeID-1"), cr("cr-2", defaultTestNamespace, "nodeID-1", "volumeID-1"), }, - toBeDeleted: []string{"cr-2"}, fields: fields{ nodeID: "nodeID-1", maxRequestsPerVolume: 2, From a71341ecd261937c23b37a0171ff2642a9b20b4a Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Fri, 1 Sep 2023 13:16:05 +0100 Subject: [PATCH 06/10] testing: set logger verbosity to 999999 Signed-off-by: James Munnelly --- test/driver/driver_testing.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/driver/driver_testing.go b/test/driver/driver_testing.go index c223d52..bbd4252 100644 --- a/test/driver/driver_testing.go +++ b/test/driver/driver_testing.go @@ -27,7 +27,7 @@ import ( fakeclient "github.com/cert-manager/cert-manager/pkg/client/clientset/versioned/fake" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/go-logr/logr" - logrtesting "github.com/go-logr/logr/testing" + "github.com/go-logr/logr/testr" "google.golang.org/grpc" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/mount-utils" @@ -59,7 +59,7 @@ type Options struct { func Run(t *testing.T, opts Options) (Options, csi.NodeClient, func()) { if opts.Log == nil { - logger := logrtesting.NewTestLogger(t) + logger := testr.NewWithOptions(t, testr.Options{Verbosity: 999999}) opts.Log = &logger } if opts.Clock == nil { From 6b92c3b46cab5c00cf18c9c274bbcbba35b5cf57 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Fri, 1 Sep 2023 13:25:56 +0100 Subject: [PATCH 07/10] return early if we fail to list requests from lister in janitor job Signed-off-by: James Munnelly --- manager/manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/manager/manager.go b/manager/manager.go index 38c89d0..f72a269 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -209,6 +209,7 @@ func NewManager(opts 
Options) (*Manager, error) { reqs, err := lister.List(labels.Everything()) if err != nil { janitorLogger.Error(err, "failed listing existing requests") + return } existsMap := make(map[types.UID]struct{}) From 188534229d0dea32ec233793f0ffaedfc004ce83 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Thu, 28 Sep 2023 13:46:49 +0100 Subject: [PATCH 08/10] address review feedback Signed-off-by: James Munnelly --- manager/manager.go | 12 +++++++----- test/integration/resume_request_test.go | 26 +++++++++++++------------ 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/manager/manager.go b/manager/manager.go index f72a269..8c779a2 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -206,6 +206,8 @@ func NewManager(opts Options) (*Manager, error) { // added the entry to the map instead, and only purged items from the map that are older that N duration (TBD). janitorLogger := opts.Log.WithName("pending_request_janitor") go wait.Until(func() { + requestToPrivateKeyLock.Lock() + defer requestToPrivateKeyLock.Unlock() reqs, err := lister.List(labels.Everything()) if err != nil { janitorLogger.Error(err, "failed listing existing requests") @@ -217,8 +219,6 @@ func NewManager(opts Options) (*Manager, error) { existsMap[req.UID] = struct{}{} } - requestToPrivateKeyLock.Lock() - defer requestToPrivateKeyLock.Unlock() for uid := range requestToPrivateKeyMap { if _, ok := existsMap[uid]; !ok { // purge the item from the map as it does not exist in the apiserver @@ -231,9 +231,11 @@ func NewManager(opts Options) (*Manager, error) { informerFactory.WaitForCacheSync(stopCh) m := &Manager{ - client: opts.Client, - clientForMetadata: opts.ClientForMetadata, - lister: lister, + client: opts.Client, + clientForMetadata: opts.ClientForMetadata, + lister: lister, + // we pass a pointer to the mutex as the janitor routine above also uses this lock, + // so we want to avoid making a copy of it requestToPrivateKeyLock: &requestToPrivateKeyLock, requestToPrivateKeyMap: 
requestToPrivateKeyMap, metadataReader: opts.MetadataReader, diff --git a/test/integration/resume_request_test.go b/test/integration/resume_request_test.go index 17d30df..f76e721 100644 --- a/test/integration/resume_request_test.go +++ b/test/integration/resume_request_test.go @@ -20,7 +20,6 @@ import ( "context" "crypto" "crypto/x509" - "os" "reflect" "testing" "time" @@ -38,7 +37,14 @@ import ( testutil "github.com/cert-manager/csi-lib/test/util" ) -func testResumesExistingRequest(t *testing.T, issueBeforeCall bool) { +type WhenToCallIssue bool + +const ( + CallIssueDuringPublish = false + CallIssueBetweenPublish = true +) + +func testResumesExistingRequest(t *testing.T, issueBeforeCall WhenToCallIssue) { store := storage.NewMemoryFS() ns := "certificaterequest-namespace" clock := fakeclock.NewFakeClock(time.Now()) @@ -66,13 +72,9 @@ func testResumesExistingRequest(t *testing.T, issueBeforeCall bool) { return store.WriteMetadata(meta.VolumeID, meta) }, }) - defer stop() + t.Cleanup(stop) - tmpDir, err := os.MkdirTemp("", "*") - if err != nil { - t.Fatal(err) - } - defer os.RemoveAll(tmpDir) + tmpDir := t.TempDir() // create a root, non-expiring context ctx := context.Background() @@ -91,8 +93,8 @@ func testResumesExistingRequest(t *testing.T, issueBeforeCall bool) { // create a context that expires in 2s (enough time for at least a single call of `issue`) twoSecondCtx, cancel := context.WithTimeout(ctx, time.Second*2) - defer cancel() - _, err = cl.NodePublishVolume(twoSecondCtx, nodePublishVolumeRequest) + t.Cleanup(cancel) + _, err := cl.NodePublishVolume(twoSecondCtx, nodePublishVolumeRequest) // assert that an error has been returned - we don't mind what kind of error, as due to the async nature of // de-registering metadata from the metadata store upon failures, there is a slim chance that a metadata read error // can be returned instead of a deadline exceeded error. 
@@ -155,11 +157,11 @@ func testResumesExistingRequest(t *testing.T, issueBeforeCall bool) { } func TestResumesExistingRequest_IssuedBetweenPublishCalls(t *testing.T) { - testResumesExistingRequest(t, true) + testResumesExistingRequest(t, CallIssueBetweenPublish) } func TestResumesExistingRequest_IssuedDuringPublishCall(t *testing.T) { - testResumesExistingRequest(t, false) + testResumesExistingRequest(t, CallIssueDuringPublish) } // ensureOneRequestExists will fail the test if more than a single CertificateRequest exists. From 60a2d8789bad7ddcabb46b0444c82f4c558836c7 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Fri, 13 Oct 2023 09:58:27 +0100 Subject: [PATCH 09/10] acquire requestToPrivateKey lock at the start of the event handler Signed-off-by: James Munnelly --- manager/manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/manager/manager.go b/manager/manager.go index 8c779a2..8566f84 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -171,6 +171,8 @@ func NewManager(opts Options) (*Manager, error) { lister := informerFactory.Certmanager().V1().CertificateRequests().Lister() informerFactory.Certmanager().V1().CertificateRequests().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: func(obj interface{}) { + requestToPrivateKeyLock.Lock() + defer requestToPrivateKeyLock.Unlock() key, ok := obj.(string) if !ok { return @@ -186,8 +188,6 @@ func NewManager(opts Options) (*Manager, error) { return } - requestToPrivateKeyLock.Lock() - defer requestToPrivateKeyLock.Unlock() if _, ok := requestToPrivateKeyMap[req.UID]; ok { delete(requestToPrivateKeyMap, req.UID) } From 0ce8db06c0a256330cf8203b39bd9bbdf084eaa4 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Thu, 30 Nov 2023 14:23:29 +0000 Subject: [PATCH 10/10] fix exponential backoff test handling Signed-off-by: James Munnelly --- manager/manager.go | 13 +++++++++++++ manager/manager_test.go | 15 +++++++-------- 2 files changed, 20 insertions(+), 8 
deletions(-) diff --git a/manager/manager.go b/manager/manager.go index 8566f84..ad9e5a7 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -367,10 +367,23 @@ type Manager struct { // requestNameGenerator generates a new random name for a certificaterequest object // Defaults to uuid.NewUUID() from k8s.io/apimachinery/pkg/util/uuid. requestNameGenerator func() string + + // doNotUse_CallOnEachIssue is a field used SOLELY for testing, and cannot be configured by external package consumers. + // It is used to perform some action (e.g. counting) each time issue() is called. + // It will be removed as soon as we have actual metrics support in csi-lib, which will allow us to measure + // things like the number of times issue() is called. + // No thread safety is added around this field, and it MUST NOT be used for any implementation logic. + // It should not be used full-stop :). + doNotUse_CallOnEachIssue func() } // issue will step through the entire issuance flow for a volume. func (m *Manager) issue(ctx context.Context, volumeID string) error { + // TODO: remove this code and replace with actual metrics support + if m.doNotUse_CallOnEachIssue != nil { + m.doNotUse_CallOnEachIssue() + } + log := m.log.WithValues("volume_id", volumeID) log.Info("Processing issuance") diff --git a/manager/manager_test.go b/manager/manager_test.go index 94c00f1..af0c953 100644 --- a/manager/manager_test.go +++ b/manager/manager_test.go @@ -304,7 +304,6 @@ func TestManager_ManageVolume_exponentialBackOffRetryOnIssueErrors(t *testing.T) expectGlobalTimeout := 2 * time.Second var numOfRetries int32 = 0 // init - opts := newDefaultTestOptions(t) opts.RenewalBackoffConfig = &wait.Backoff{ Duration: expBackOffDuration, @@ -313,16 +312,16 @@ func TestManager_ManageVolume_exponentialBackOffRetryOnIssueErrors(t *testing.T) Jitter: expBackOffJitter, Steps: expBackOffSteps, } - opts.ReadyToRequest = func(meta metadata.Metadata) (bool, string) { - // ReadyToRequest will be called by 
issue() - atomic.AddInt32(&numOfRetries, 1) // run in a goroutine, thus increment it atomically - return true, "" // AlwaysReadyToRequest - } m, err := NewManager(opts) - m.issueRenewalTimeout = issueRenewalTimeout if err != nil { t.Fatal(err) } + m.issueRenewalTimeout = issueRenewalTimeout + // Increment the 'numOfRetries' counter whenever issue() is called. + // TODO: replace usages of this function with reading from metrics. + m.doNotUse_CallOnEachIssue = func() { + atomic.AddInt32(&numOfRetries, 1) // run in a goroutine, thus increment it atomically + } // Register a new volume with the metadata store store := opts.MetadataReader.(storage.Interface) @@ -347,7 +346,7 @@ func TestManager_ManageVolume_exponentialBackOffRetryOnIssueErrors(t *testing.T) actualNumOfRetries := atomic.LoadInt32(&numOfRetries) // read atomically if actualNumOfRetries != expectNumOfRetries { - t.Errorf("expect %d of retires, but got %d", expectNumOfRetries, actualNumOfRetries) + t.Errorf("expect %d retries, but got %d", expectNumOfRetries, actualNumOfRetries) } }