Skip to content
221 changes: 207 additions & 14 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package manager

import (
"context"
"crypto"
"crypto/x509"
"encoding/pem"
"errors"
Expand All @@ -40,6 +41,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/clock"

internalapi "github.com/cert-manager/csi-lib/internal/api"
Expand Down Expand Up @@ -157,14 +159,73 @@ func NewManager(opts Options) (*Manager, error) {
return nil, fmt.Errorf("building node name label selector: %w", err)
}

// construct the requestToPrivateKeyMap so we can use event handlers below to manage it
var requestToPrivateKeyLock sync.Mutex
requestToPrivateKeyMap := make(map[types.UID]crypto.PrivateKey)
// Create an informer factory
informerFactory := cminformers.NewSharedInformerFactoryWithOptions(opts.Client, 0, cminformers.WithTweakListOptions(func(opts *metav1.ListOptions) {
opts.LabelSelector = labels.NewSelector().Add(*nodeNameReq).String()
}))
// Fetch the lister before calling Start() to ensure this informer is
// registered with the factory
lister := informerFactory.Certmanager().V1().CertificateRequests().Lister()
informerFactory.Certmanager().V1().CertificateRequests().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
requestToPrivateKeyLock.Lock()
defer requestToPrivateKeyLock.Unlock()
key, ok := obj.(string)
if !ok {
return
}
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return
}
req, err := lister.CertificateRequests(namespace).Get(name)
if err != nil {
// we no longer have a copy of this request, so we can't work out its UID.
// instead the associated pending request entry for this request will be cleaned up by the periodic 'janitor' task.
return
}

if _, ok := requestToPrivateKeyMap[req.UID]; ok {
delete(requestToPrivateKeyMap, req.UID)
}
},
})

// create a stop channel that manages all sub goroutines
stopCh := make(chan struct{})
// begin a background routine which periodically checks to ensure all members of the pending request map actually
// have corresponding CertificateRequest objects in the apiserver.
// This avoids leaking memory if we don't observe a request being deleted, or we observe it after the lister has purged
// the request data from its cache.
// this routine must be careful to not delete entries from this map that have JUST been added to the map, but haven't
// been observed by the lister yet (else it may purge data we want to keep, causing a whole new request cycle).
// for now, to avoid this case, we only run the routine every 5 minutes. It would be better if we recorded the time we
// added the entry to the map instead, and only purged items from the map that are older than N duration (TBD).
Comment on lines +199 to +206
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious when will the informer lost the delete event from api server ? I thought the informerFactory will guarantee to resync in a period of time to ensure it captures all the events for eventual consistency ?

As mentioned in your comment, I'm not sure how to prevent a newly added entry from being deleted while the lister is not yet in sync. If the request happens right at the 5-minute edge, it will be deleted immediately because the lister does not have it in its cache yet.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep with a resync period, we will definitely see the fact that the CR has been deleted. However, it's not guaranteed that the lister will still have a copy of the object stored - without the object, we can't convert the namespace/name to a known UID to look up in the requestToPrivateKeyMap.

Hence, if we don't have access to the UID once we have observed the delete, we won't be able to de-register/remove it from our own internal map.

janitorLogger := opts.Log.WithName("pending_request_janitor")
go wait.Until(func() {
requestToPrivateKeyLock.Lock()
defer requestToPrivateKeyLock.Unlock()
reqs, err := lister.List(labels.Everything())
if err != nil {
janitorLogger.Error(err, "failed listing existing requests")
return
}

existsMap := make(map[types.UID]struct{})
for _, req := range reqs {
existsMap[req.UID] = struct{}{}
}

for uid := range requestToPrivateKeyMap {
if _, ok := existsMap[uid]; !ok {
// purge the item from the map as it does not exist in the apiserver
delete(requestToPrivateKeyMap, uid)
}
}
}, time.Minute*5, stopCh)
// Begin watching the API
informerFactory.Start(stopCh)
informerFactory.WaitForCacheSync(stopCh)
Expand All @@ -173,9 +234,13 @@ func NewManager(opts Options) (*Manager, error) {
client: opts.Client,
clientForMetadata: opts.ClientForMetadata,
lister: lister,
metadataReader: opts.MetadataReader,
clock: opts.Clock,
log: *opts.Log,
// we pass a pointer to the mutex as the janitor routine above also uses this lock,
// so we want to avoid making a copy of it
requestToPrivateKeyLock: &requestToPrivateKeyLock,
requestToPrivateKeyMap: requestToPrivateKeyMap,
metadataReader: opts.MetadataReader,
clock: opts.Clock,
log: *opts.Log,

generatePrivateKey: opts.GeneratePrivateKey,
generateRequest: opts.GenerateRequest,
Expand Down Expand Up @@ -260,6 +325,10 @@ type Manager struct {
// lister is used as a read-only cache of CertificateRequest resources
lister cmlisters.CertificateRequestLister

// A map that associates a CertificateRequest's UID with its private key.
requestToPrivateKeyLock *sync.Mutex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using sync.RWMutex to improve the performance a little bit on read ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't expect to have many concurrent readers so it seemed like a very negligible performance gain (and keeps things a little simpler to use a regular mutex for future readers)

requestToPrivateKeyMap map[types.UID]crypto.PrivateKey

// used to read metadata from the store
metadataReader storage.MetadataReader

Expand Down Expand Up @@ -298,10 +367,23 @@ type Manager struct {
// requestNameGenerator generates a new random name for a certificaterequest object
// Defaults to uuid.NewUUID() from k8s.io/apimachinery/pkg/util/uuid.
requestNameGenerator func() string

// doNotUse_CallOnEachIssue is a field used SOLELY for testing, and cannot be configured by external package consumers.
// It is used to perform some action (e.g. counting) each time issue() is called.
// It will be removed as soon as we have actual metrics support in csi-lib, which will allow us to measure
// things like the number of times issue() is called.
// No thread safety is added around this field, and it MUST NOT be used for any implementation logic.
// It should not be used full-stop :).
doNotUse_CallOnEachIssue func()
}

// issue will step through the entire issuance flow for a volume.
func (m *Manager) issue(ctx context.Context, volumeID string) error {
// TODO: remove this code and replace with actual metrics support
if m.doNotUse_CallOnEachIssue != nil {
m.doNotUse_CallOnEachIssue()
}

log := m.log.WithValues("volume_id", volumeID)
log.Info("Processing issuance")

Expand All @@ -315,10 +397,32 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error {
}
log.V(2).Info("Read metadata", "metadata", meta)

// check if there is already a pending request in-flight for this volume.
// if there is, and we still have a copy of its private key in memory, we can resume the existing request and
// avoid creating additional CertificateRequest objects.
existingReq, err := m.findPendingRequest(meta)
if err != nil {
return fmt.Errorf("failed when checking if an existing request exists: %w", err)
}
// if there is an existing in-flight request, attempt to 'resume' it (i.e. re-check to see if it is ready)
if existingReq != nil {
// we can only resume a request if we still have a reference to its private key, so look that up in our pending
// requests map
if key, ok := m.readPendingRequestPrivateKey(existingReq.UID); ok {
log.V(4).Info("Re-using existing certificaterequest")
return m.handleRequest(ctx, volumeID, meta, key, existingReq)
}

// if we don't have a copy of the associated private key, delete the currently in-flight request
log.V(2).Info("Found existing request that we don't have corresponding private key for - restarting request process")
if err := m.client.CertmanagerV1().CertificateRequests(existingReq.Namespace).Delete(ctx, existingReq.Name, metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("failed to delete existing in-flight request: %w", err)
}
}

if ready, reason := m.readyToRequest(meta); !ready {
return fmt.Errorf("driver is not ready to request a certificate for this volume: %v", reason)
}

key, err := m.generatePrivateKey(meta)
if err != nil {
return fmt.Errorf("generating private key: %w", err)
Expand All @@ -343,11 +447,78 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error {
}
log.Info("Created new CertificateRequest resource")

// Poll every 1s for the CertificateRequest to be ready
// persist the reference to the private key in memory so we can resume this request in future if it doesn't complete
// the first time.
m.writePendingRequestPrivateKey(req.UID, key)
return m.handleRequest(ctx, volumeID, meta, key, req)
}

// readPendingRequestPrivateKey looks up the in-memory private key recorded
// for the CertificateRequest with the given UID. The boolean return reports
// whether an entry was found. Safe for concurrent use.
func (m *Manager) readPendingRequestPrivateKey(uid types.UID) (crypto.PrivateKey, bool) {
	m.requestToPrivateKeyLock.Lock()
	defer m.requestToPrivateKeyLock.Unlock()
	pk, found := m.requestToPrivateKeyMap[uid]
	return pk, found
}

// writePendingRequestPrivateKey records the private key associated with the
// CertificateRequest identified by uid so that the in-flight request can be
// resumed later. Safe for concurrent use.
func (m *Manager) writePendingRequestPrivateKey(uid types.UID, key crypto.PrivateKey) {
	m.requestToPrivateKeyLock.Lock()
	defer m.requestToPrivateKeyLock.Unlock()

	m.requestToPrivateKeyMap[uid] = key
}

// deletePendingRequestPrivateKey removes the private key entry (if any) for
// the CertificateRequest identified by uid. Deleting a missing key is a
// no-op. Safe for concurrent use.
func (m *Manager) deletePendingRequestPrivateKey(uid types.UID) {
	m.requestToPrivateKeyLock.Lock()
	defer m.requestToPrivateKeyLock.Unlock()

	delete(m.requestToPrivateKeyMap, uid)
}

func (m *Manager) findPendingRequest(meta metadata.Metadata) (*cmapi.CertificateRequest, error) {
reqs, err := m.listAllRequestsForVolume(meta.VolumeID)
if err != nil {
return nil, err
}

if len(reqs) == 0 {
return nil, nil
}

// only consider the newest request - we will never resume an older request
req := reqs[0]
if !certificateRequestCanBeResumed(req) {
return nil, nil
}

// TODO: check if this request is still actually valid for the input metadata
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've not done this just yet, as I don't think it's quite as important as we may think (as volumeAttributes on a pod are not mutable).

The only case where this could be problematic is if a drivers implementation of generateRequest is non-deterministic/can change between calls. To properly handle the wide-range of weird setups users may have, we may actually need to push this comparison function to the driver implementers interface...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider check privateKey against CSR's public key here ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the internal cache is in memory, UIDs are guaranteed to be unique, and the CertificateRequest resource is immutable, I don't think it's actually essential for us to implement the privatekey<>public key check...

I think I am also going to leave this TODO for now, as it isn't really something we handle at all at the moment (and does raise questions around timing, e.g. what if the driver is random, when can we ever really stop?). I'd like us to expand on our expectations around how drivers implement generateRequest before over-complicating this code path :)

return req, nil
}

// certificateRequestCanBeResumed reports whether req is still progressing and
// may therefore be re-used, rather than having reached a terminal failed
// state. A request with no Ready condition is treated as still pending.
func certificateRequestCanBeResumed(req *cmapi.CertificateRequest) bool {
	for _, cond := range req.Status.Conditions {
		if cond.Type != cmapi.CertificateRequestConditionReady {
			continue
		}
		switch cond.Reason {
		case cmapi.CertificateRequestReasonPending, cmapi.CertificateRequestReasonIssued, "":
			// an explicit Pending, Issued or empty reason is considered resumable
			return true
		}
		// any other reason is a terminal state - the request has failed
		return false
	}
	// no Ready condition yet means the request has not been processed
	return true
}

func (m *Manager) handleRequest(ctx context.Context, volumeID string, meta metadata.Metadata, key crypto.PrivateKey, req *cmapi.CertificateRequest) error {
log := m.log.WithValues("volume_id", volumeID)

// Poll every 200ms for the CertificateRequest to be ready
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to choose 200ms now? This looks like a typical round tripper time for remote data center query.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had reduced it here as this whole block only ever reads from a local in-memory cache anyway, and it reduced test flakes (as there were a few awkward timing issues where we had timeouts of 2s, but 1s sleeps in between each 'loop' here)

lastFailureReason := ""
if err := wait.PollUntilWithContext(ctx, time.Second, func(ctx context.Context) (done bool, err error) {
if err := wait.PollUntilWithContext(ctx, time.Millisecond*200, func(ctx context.Context) (done bool, err error) {
log.V(4).Info("Reading CertificateRequest from lister cache")
updatedReq, err := m.lister.CertificateRequests(req.Namespace).Get(req.Name)
if apierrors.IsNotFound(err) {
log.V(4).Info("Failed to read CertificateRequest from lister cache", "error", err)
// A NotFound error implies something deleted the resource - fail
// early to allow a retry to occur at a later time if needed.
return false, err
Expand All @@ -371,6 +542,7 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error {

isApproved := apiutil.CertificateRequestIsApproved(updatedReq)
if !isApproved {
log.V(4).Info("CertificateRequest is not explicitly approved - continuing to check if the request has been issued anyway")
lastFailureReason = fmt.Sprintf("request %q has not yet been approved by approval plugin", updatedReq.Name)
// we don't stop execution here, as some versions of cert-manager (and some external issuer plugins)
// may not be aware/utilise approval.
Expand All @@ -380,6 +552,7 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error {

readyCondition := apiutil.GetCertificateRequestCondition(updatedReq, cmapi.CertificateRequestConditionReady)
if readyCondition == nil {
log.V(4).Info("Ready condition not found - will recheck...")
// only overwrite the approval failure message if the request is actually approved
// otherwise we may hide more useful information from the user by accident.
if isApproved {
Expand All @@ -390,10 +563,12 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error {

switch readyCondition.Reason {
case cmapi.CertificateRequestReasonIssued:
log.V(4).Info("CertificateRequest has been issued!")
break
case cmapi.CertificateRequestReasonFailed:
return false, fmt.Errorf("request %q has failed: %s", updatedReq.Name, readyCondition.Message)
Comment on lines 568 to 569
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we consider call m.deletePendingRequestPrivateKey(req.UID) when the CertificateRequest is in Failed condition ? It likely to fail again with the same private key. Create a new CR might help resolving the problem if it is due to key reuse issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll automatically use a new private key the next time anyway - if a request is failed, it is terminal so will never be returned again by findPendingRequest (i.e. it won't be re-used). This will in turn trigger a new CR to be created and new private key generated (or at least, a new call to generatePrivateKey).

The item will be deleted from the map once the CR is deleted, although yep perhaps a future optimisation could be to delete terminal failed items from the map a bit early just to save on memory.. but it shouldn't have any functional difference :)

case cmapi.CertificateRequestReasonPending:
log.V(4).Info("CertificateRequest is still pending...")
if isApproved {
lastFailureReason = fmt.Sprintf("request %q is pending: %v", updatedReq.Name, readyCondition.Message)
}
Expand Down Expand Up @@ -425,37 +600,55 @@ func (m *Manager) issue(ctx context.Context, volumeID string) error {
return fmt.Errorf("calculating next issuance time: %w", err)
}
meta.NextIssuanceTime = &renewalPoint
log.V(4).Info("Persisting next issuance time to metadata store", "next_issuance_time", renewalPoint)

if err := m.writeKeypair(meta, key, req.Status.Certificate, req.Status.CA); err != nil {
return fmt.Errorf("writing keypair: %w", err)
}
log.V(2).Info("Wrote new keypair to storage")

// We must explicitly delete the private key from the pending requests map so that the existing Completed
// request will not be re-used upon renewal.
// Without this, the renewal would pick up the existing issued certificate and re-issue, rather than requesting
// a new certificate.
m.deletePendingRequestPrivateKey(req.UID)
log.V(4).Info("Removed pending request private key from internal cache")

return nil
}

func (m *Manager) cleanupStaleRequests(ctx context.Context, log logr.Logger, volumeID string) error {
// returns a list of all pending certificaterequest objects for the given volumeID.
// the returned slice will be ordered with the most recent request FIRST.
func (m *Manager) listAllRequestsForVolume(volumeID string) ([]*cmapi.CertificateRequest, error) {
sel, err := m.labelSelectorForVolume(volumeID)
if err != nil {
return fmt.Errorf("internal error building label selector - this is a bug, please open an issue: %w", err)
return nil, fmt.Errorf("internal error building label selector - this is a bug, please open an issue: %w", err)
}

reqs, err := m.lister.List(sel)
if err != nil {
return fmt.Errorf("listing certificaterequests: %w", err)
}

if len(reqs) < m.maxRequestsPerVolume {
return nil
return nil, fmt.Errorf("listing certificaterequests: %w", err)
}

// sort newest first to oldest last
sort.Slice(reqs, func(i, j int) bool {
return reqs[i].CreationTimestamp.After(reqs[j].CreationTimestamp.Time)
})

return reqs, nil
}

func (m *Manager) cleanupStaleRequests(ctx context.Context, log logr.Logger, volumeID string) error {
reqs, err := m.listAllRequestsForVolume(volumeID)
if err != nil {
return err
}
if len(reqs) <= m.maxRequestsPerVolume {
return nil
}

// start at the end of the slice and work back to maxRequestsPerVolume
for i := len(reqs) - 1; i >= m.maxRequestsPerVolume-1; i-- {
for i := len(reqs) - 1; i > m.maxRequestsPerVolume-1; i-- {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why has this been changed? Is it because we can now recover the private key between syncs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah exactly - this function was previously doing some ✨ weird ✨ counting logic, which is now fixed (and you can see the behaviour change by taking a look at how the unit tests have changed too)

toDelete := reqs[i]
if err := m.client.CertmanagerV1().CertificateRequests(toDelete.Namespace).Delete(ctx, toDelete.Name, metav1.DeleteOptions{}); err != nil {
if apierrors.IsNotFound(err) {
Expand Down
Loading