Skip to content

Commit 6b52148

Browse files
committed
certsync+installer: Write object dirs atomically
Use atomicdir.Sync to write target secret/configmap directories to be synchronized with the relevant objects. Added unit tests, but the coverage is not complete. Particularly filesystem operations failing are not being tested.
1 parent 6c2d3d0 commit 6b52148

File tree

7 files changed

+1007
-174
lines changed

7 files changed

+1007
-174
lines changed

pkg/operator/staticpod/certsyncpod/certsync_controller.go

Lines changed: 52 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package certsyncpod
22

33
import (
44
"context"
5+
"fmt"
56
"os"
67
"path/filepath"
78
"reflect"
@@ -17,17 +18,20 @@ import (
1718

1819
"github.com/openshift/library-go/pkg/controller/factory"
1920
"github.com/openshift/library-go/pkg/operator/events"
20-
"github.com/openshift/library-go/pkg/operator/staticpod"
2121
"github.com/openshift/library-go/pkg/operator/staticpod/controller/installer"
22+
"github.com/openshift/library-go/pkg/operator/staticpod/internal/atomicdir"
23+
"github.com/openshift/library-go/pkg/operator/staticpod/internal/dirutils"
2224
)
2325

26+
const stagingDirUID = "cert-sync"
27+
2428
type CertSyncController struct {
2529
destinationDir string
2630
namespace string
2731
configMaps []installer.UnrevisionedResource
2832
secrets []installer.UnrevisionedResource
2933

30-
configmapGetter corev1interface.ConfigMapInterface
34+
configMapGetter corev1interface.ConfigMapInterface
3135
configMapLister v1.ConfigMapLister
3236
secretGetter corev1interface.SecretInterface
3337
secretLister v1.SecretLister
@@ -42,10 +46,10 @@ func NewCertSyncController(targetDir, targetNamespace string, configmaps, secret
4246
secrets: secrets,
4347
eventRecorder: eventRecorder.WithComponentSuffix("cert-sync-controller"),
4448

45-
configmapGetter: kubeClient.CoreV1().ConfigMaps(targetNamespace),
49+
configMapGetter: kubeClient.CoreV1().ConfigMaps(targetNamespace),
4650
configMapLister: informers.Core().V1().ConfigMaps().Lister(),
47-
secretLister: informers.Core().V1().Secrets().Lister(),
4851
secretGetter: kubeClient.CoreV1().Secrets(targetNamespace),
52+
secretLister: informers.Core().V1().Secrets().Lister(),
4953
}
5054

5155
return factory.New().
@@ -60,15 +64,32 @@ func NewCertSyncController(targetDir, targetNamespace string, configmaps, secret
6064
)
6165
}
6266

67+
func getStagingDir(targetDir string) string {
68+
return filepath.Join(targetDir, "staging", stagingDirUID)
69+
}
70+
6371
func getConfigMapDir(targetDir, configMapName string) string {
6472
return filepath.Join(targetDir, "configmaps", configMapName)
6573
}
6674

75+
func getConfigMapStagingDir(targetDir, secretName string) string {
76+
return filepath.Join(getStagingDir(targetDir), "configmaps", secretName)
77+
}
78+
6779
func getSecretDir(targetDir, secretName string) string {
6880
return filepath.Join(targetDir, "secrets", secretName)
6981
}
7082

83+
func getSecretStagingDir(targetDir, secretName string) string {
84+
return filepath.Join(getStagingDir(targetDir), "secrets", secretName)
85+
}
86+
7187
func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
88+
if err := dirutils.RemoveContent(getStagingDir(c.destinationDir)); err != nil {
89+
c.eventRecorder.Warningf("CertificateUpdateFailed", fmt.Sprintf("Failed to prune staging directory: %v", err))
90+
return err
91+
}
92+
7293
errors := []error{}
7394

7495
klog.Infof("Syncing configmaps: %v", c.configMaps)
@@ -88,7 +109,7 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
88109
}
89110

90111
// Check with the live call it is really missing
91-
configMap, err = c.configmapGetter.Get(ctx, cm.Name, metav1.GetOptions{})
112+
configMap, err = c.configMapGetter.Get(ctx, cm.Name, metav1.GetOptions{})
92113
if err == nil {
93114
klog.Infof("Caches are stale. They don't see configmap '%s/%s', yet it is present", configMap.Namespace, configMap.Name)
94115
// We will get re-queued when we observe the change
@@ -114,8 +135,9 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
114135
}
115136

116137
contentDir := getConfigMapDir(c.destinationDir, cm.Name)
138+
stagingDir := getConfigMapStagingDir(c.destinationDir, cm.Name)
117139

118-
data := map[string]string{}
140+
data := make(map[string]string, len(configMap.Data))
119141
for filename := range configMap.Data {
120142
fullFilename := filepath.Join(contentDir, filename)
121143

@@ -138,7 +160,7 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
138160
klog.V(2).Infof("Syncing updated configmap '%s/%s'.", configMap.Namespace, configMap.Name)
139161

140162
// We need to do a live get here so we don't overwrite a newer file with one from a stale cache
141-
configMap, err = c.configmapGetter.Get(ctx, configMap.Name, metav1.GetOptions{})
163+
configMap, err = c.configMapGetter.Get(ctx, configMap.Name, metav1.GetOptions{})
142164
if err != nil {
143165
// Even if the error is not exists we will act on it when caches catch up
144166
c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed getting configmap: %s/%s: %v", c.namespace, cm.Name, err)
@@ -152,27 +174,12 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
152174
continue
153175
}
154176

155-
klog.Infof("Creating directory %q ...", contentDir)
156-
if err := os.MkdirAll(contentDir, 0755); err != nil && !os.IsExist(err) {
157-
c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed creating directory for configmap: %s/%s: %v", configMap.Namespace, configMap.Name, err)
158-
errors = append(errors, err)
159-
continue
160-
}
161-
for filename, content := range configMap.Data {
162-
fullFilename := filepath.Join(contentDir, filename)
163-
// if the existing is the same, do nothing
164-
if reflect.DeepEqual(data[fullFilename], content) {
165-
continue
166-
}
167-
168-
klog.Infof("Writing configmap manifest %q ...", fullFilename)
169-
if err := staticpod.WriteFileAtomic([]byte(content), 0644, fullFilename); err != nil {
170-
c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed writing file for configmap: %s/%s: %v", configMap.Namespace, configMap.Name, err)
171-
errors = append(errors, err)
172-
continue
173-
}
177+
files := make(map[string][]byte, len(configMap.Data))
178+
for k, v := range configMap.Data {
179+
files[k] = []byte(v)
174180
}
175-
c.eventRecorder.Eventf("CertificateUpdated", "Wrote updated configmap: %s/%s", configMap.Namespace, configMap.Name)
181+
// XXX: Are these permissions correct?
182+
errors = append(errors, syncDirectory(c.eventRecorder, "configmap", configMap.ObjectMeta, contentDir, 0755, stagingDir, files, 0644))
176183
}
177184

178185
klog.Infof("Syncing secrets: %v", c.secrets)
@@ -219,8 +226,9 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
219226
}
220227

221228
contentDir := getSecretDir(c.destinationDir, s.Name)
229+
stagingDir := getSecretStagingDir(c.destinationDir, s.Name)
222230

223-
data := map[string][]byte{}
231+
data := make(map[string][]byte, len(secret.Data))
224232
for filename := range secret.Data {
225233
fullFilename := filepath.Join(contentDir, filename)
226234

@@ -257,29 +265,22 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
257265
continue
258266
}
259267

260-
klog.Infof("Creating directory %q ...", contentDir)
261-
if err := os.MkdirAll(contentDir, 0755); err != nil && !os.IsExist(err) {
262-
c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed creating directory for secret: %s/%s: %v", secret.Namespace, secret.Name, err)
263-
errors = append(errors, err)
264-
continue
265-
}
266-
for filename, content := range secret.Data {
267-
// TODO fix permissions
268-
fullFilename := filepath.Join(contentDir, filename)
269-
// if the existing is the same, do nothing
270-
if reflect.DeepEqual(data[fullFilename], content) {
271-
continue
272-
}
273-
274-
klog.Infof("Writing secret manifest %q ...", fullFilename)
275-
if err := staticpod.WriteFileAtomic(content, 0600, fullFilename); err != nil {
276-
c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed writing file for secret: %s/%s: %v", secret.Namespace, secret.Name, err)
277-
errors = append(errors, err)
278-
continue
279-
}
280-
}
281-
c.eventRecorder.Eventf("CertificateUpdated", "Wrote updated secret: %s/%s", secret.Namespace, secret.Name)
268+
errors = append(errors, syncDirectory(c.eventRecorder, "secret", secret.ObjectMeta, contentDir, 0700, stagingDir, secret.Data, 0600))
282269
}
283-
284270
return utilerrors.NewAggregate(errors)
285271
}
272+
273+
func syncDirectory(
274+
eventRecorder events.Recorder,
275+
typeName string, o metav1.ObjectMeta,
276+
targetDir string, targetDirPerm os.FileMode, stagingDir string,
277+
files map[string][]byte, filePerm os.FileMode,
278+
) error {
279+
if err := atomicdir.Sync(targetDir, targetDirPerm, stagingDir, files, filePerm); err != nil {
280+
err = fmt.Errorf("failed to sync %s %s/%s (directory %q): %w", typeName, o.Name, o.Namespace, targetDir, err)
281+
eventRecorder.Warning("CertificateUpdateFailed", err.Error())
282+
return err
283+
}
284+
eventRecorder.Eventf("CertificateUpdated", "Wrote updated %s: %s/%s", typeName, o.Namespace, o.Name)
285+
return nil
286+
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
//go:build linux
2+
3+
package certsyncpod
4+
5+
import (
6+
"bytes"
7+
"context"
8+
"crypto/ecdsa"
9+
"crypto/elliptic"
10+
"crypto/rand"
11+
"crypto/x509"
12+
"crypto/x509/pkix"
13+
"encoding/pem"
14+
"math/big"
15+
"os"
16+
"path/filepath"
17+
"sync"
18+
"testing"
19+
"time"
20+
21+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22+
"k8s.io/apimachinery/pkg/util/wait"
23+
"k8s.io/apiserver/pkg/server/dynamiccertificates"
24+
25+
"github.com/openshift/library-go/pkg/operator/staticpod/internal/atomicdir"
26+
)
27+
28+
// TestDynamicCertificates makes sure the receiving side of certificate synchronization works as expected.
29+
// It reads and watches the certificates being synchronized in the same way as e.g. kube-apiserver,
30+
// the very same libraries are being used.
31+
func TestDynamicCertificates(t *testing.T) {
32+
const typeName = "secret"
33+
om := metav1.ObjectMeta{
34+
Namespace: "openshift-kube-apiserver",
35+
Name: "s1",
36+
}
37+
38+
// Generate all necessary keypairs.
39+
tlsCert, tlsKey := generateKeypair(t)
40+
tlsCertUpdated, tlsKeyUpdated := generateKeypair(t)
41+
42+
// Write the keypair into a secret directory.
43+
secretDir := filepath.Join(t.TempDir(), "secrets", om.Name)
44+
stagingDir := filepath.Join(t.TempDir(), "staging", stagingDirUID, "secrets", om.Name)
45+
certFile := filepath.Join(secretDir, "tls.crt")
46+
keyFile := filepath.Join(secretDir, "tls.key")
47+
48+
if err := os.MkdirAll(secretDir, 0700); err != nil {
49+
t.Fatalf("Failed to create secret directory %q: %v", secretDir, err)
50+
}
51+
if err := os.WriteFile(certFile, tlsCert, 0600); err != nil {
52+
t.Fatalf("Failed to write TLS certificate into %q: %v", certFile, err)
53+
}
54+
if err := os.WriteFile(keyFile, tlsKey, 0600); err != nil {
55+
t.Fatalf("Failed to write TLS key into %q: %v", keyFile, err)
56+
}
57+
58+
// Start the watcher.
59+
// This reads the keypair synchronously so the initial state is loaded here.
60+
dc, err := dynamiccertificates.NewDynamicServingContentFromFiles("localhost TLS", certFile, keyFile)
61+
if err != nil {
62+
t.Fatalf("Failed to init dynamic certificate: %v", err)
63+
}
64+
65+
// Check the initial keypair is loaded.
66+
cert, key := dc.CurrentCertKeyContent()
67+
if !bytes.Equal(cert, tlsCert) || !bytes.Equal(key, tlsKey) {
68+
t.Fatal("Unexpected initial keypair loaded")
69+
}
70+
71+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
72+
var wg sync.WaitGroup
73+
wg.Add(1)
74+
go func() {
75+
defer wg.Done()
76+
dc.Run(ctx, 1)
77+
}()
78+
defer wg.Wait()
79+
defer cancel()
80+
81+
// Poll until update detected.
82+
files := map[string][]byte{
83+
"tls.crt": tlsCertUpdated,
84+
"tls.key": tlsKeyUpdated,
85+
}
86+
err = wait.PollUntilContextCancel(ctx, 250*time.Millisecond, true, func(ctx context.Context) (bool, error) {
87+
// Replace the secret directory.
88+
if err := atomicdir.Sync(secretDir, 0700, stagingDir, files, 0600); err != nil {
89+
t.Errorf("Failed to write files: %v", err)
90+
return false, err
91+
}
92+
93+
// Check the loaded content matches.
94+
// This is most probably updated based on write in a previous Poll invocation.
95+
cert, key := dc.CurrentCertKeyContent()
96+
return bytes.Equal(cert, tlsCertUpdated) && bytes.Equal(key, tlsKeyUpdated), nil
97+
})
98+
if err != nil {
99+
t.Fatalf("Failed to wait for dynamic certificate: %v", err)
100+
}
101+
}
102+
103+
// generateKeypair returns (cert, key).
104+
func generateKeypair(t *testing.T) ([]byte, []byte) {
105+
t.Helper()
106+
107+
privateKey, err := ecdsa.GenerateKey(elliptic.P224(), rand.Reader)
108+
if err != nil {
109+
t.Fatalf("Failed to generate TLS key: %v", err)
110+
}
111+
112+
notBefore := time.Now()
113+
notAfter := notBefore.Add(1 * time.Hour)
114+
115+
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128)
116+
serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
117+
if err != nil {
118+
t.Fatalf("Failed to generate serial number for TLS keypair: %v", err)
119+
}
120+
121+
template := x509.Certificate{
122+
SerialNumber: serialNumber,
123+
Subject: pkix.Name{
124+
Organization: []string{"Example Org"},
125+
},
126+
NotBefore: notBefore,
127+
NotAfter: notAfter,
128+
KeyUsage: x509.KeyUsageDigitalSignature,
129+
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
130+
BasicConstraintsValid: true,
131+
DNSNames: []string{"example.com"},
132+
}
133+
134+
publicKeyBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey)
135+
if err != nil {
136+
t.Fatalf("Failed to create TLS certificate: %v", err)
137+
}
138+
139+
var certOut bytes.Buffer
140+
if err := pem.Encode(&certOut, &pem.Block{Type: "CERTIFICATE", Bytes: publicKeyBytes}); err != nil {
141+
t.Fatalf("Failed to write certificate PEM: %v", err)
142+
}
143+
144+
privateKeyBytes, err := x509.MarshalPKCS8PrivateKey(privateKey)
145+
if err != nil {
146+
t.Fatalf("Unable to marshal private key: %v", err)
147+
}
148+
149+
var keyOut bytes.Buffer
150+
if err := pem.Encode(&keyOut, &pem.Block{Type: "PRIVATE KEY", Bytes: privateKeyBytes}); err != nil {
151+
t.Fatalf("Failed to write certificate PEM: %v", err)
152+
}
153+
154+
return certOut.Bytes(), keyOut.Bytes()
155+
}

0 commit comments

Comments
 (0)