Skip to content

Commit 542e674

Browse files
committed
certsync+installer: Write object dirs atomically
Use atomicdir.Sync to write target secret/configmap directories to be synchronized with the relevant objects. Added unit tests, but the coverage is not complete. Particularly filesystem operations failing are not being tested.
1 parent 235f9f1 commit 542e674

File tree

9 files changed

+1129
-235
lines changed

9 files changed

+1129
-235
lines changed

pkg/operator/staticpod/certsyncpod/certsync_controller.go

Lines changed: 77 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ package certsyncpod
22

33
import (
44
"context"
5+
"fmt"
56
"os"
67
"path/filepath"
78
"reflect"
9+
"strings"
810

11+
"github.com/openshift/library-go/pkg/operator/staticpod/internal/atomicdir/types"
912
apierrors "k8s.io/apimachinery/pkg/api/errors"
1013
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1114
utilerrors "k8s.io/apimachinery/pkg/util/errors"
@@ -17,17 +20,19 @@ import (
1720

1821
"github.com/openshift/library-go/pkg/controller/factory"
1922
"github.com/openshift/library-go/pkg/operator/events"
20-
"github.com/openshift/library-go/pkg/operator/staticpod"
2123
"github.com/openshift/library-go/pkg/operator/staticpod/controller/installer"
24+
"github.com/openshift/library-go/pkg/operator/staticpod/internal/atomicdir"
2225
)
2326

27+
const stagingDirUID = "cert-sync"
28+
2429
type CertSyncController struct {
2530
destinationDir string
2631
namespace string
2732
configMaps []installer.UnrevisionedResource
2833
secrets []installer.UnrevisionedResource
2934

30-
configmapGetter corev1interface.ConfigMapInterface
35+
configMapGetter corev1interface.ConfigMapInterface
3136
configMapLister v1.ConfigMapLister
3237
secretGetter corev1interface.SecretInterface
3338
secretLister v1.SecretLister
@@ -42,10 +47,10 @@ func NewCertSyncController(targetDir, targetNamespace string, configmaps, secret
4247
secrets: secrets,
4348
eventRecorder: eventRecorder.WithComponentSuffix("cert-sync-controller"),
4449

45-
configmapGetter: kubeClient.CoreV1().ConfigMaps(targetNamespace),
50+
configMapGetter: kubeClient.CoreV1().ConfigMaps(targetNamespace),
4651
configMapLister: informers.Core().V1().ConfigMaps().Lister(),
47-
secretLister: informers.Core().V1().Secrets().Lister(),
4852
secretGetter: kubeClient.CoreV1().Secrets(targetNamespace),
53+
secretLister: informers.Core().V1().Secrets().Lister(),
4954
}
5055

5156
return factory.New().
@@ -60,15 +65,12 @@ func NewCertSyncController(targetDir, targetNamespace string, configmaps, secret
6065
)
6166
}
6267

63-
func getConfigMapDir(targetDir, configMapName string) string {
64-
return filepath.Join(targetDir, "configmaps", configMapName)
65-
}
66-
67-
func getSecretDir(targetDir, secretName string) string {
68-
return filepath.Join(targetDir, "secrets", secretName)
69-
}
70-
7168
func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
69+
if err := os.RemoveAll(getStagingDir(c.destinationDir)); err != nil {
70+
c.eventRecorder.Warningf("CertificateUpdateFailed", fmt.Sprintf("Failed to prune staging directory: %v", err))
71+
return err
72+
}
73+
7274
errors := []error{}
7375

7476
klog.Infof("Syncing configmaps: %v", c.configMaps)
@@ -80,15 +82,15 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
8082
continue
8183

8284
case apierrors.IsNotFound(err) && cm.Optional:
83-
configMapFile := getConfigMapDir(c.destinationDir, cm.Name)
85+
configMapFile := getConfigMapTargetDir(c.destinationDir, cm.Name)
8486
if _, err := os.Stat(configMapFile); os.IsNotExist(err) {
8587
// if the configmap file does not exist, there is no work to do, so skip making any live check and just return.
8688
// if the configmap actually exists in the API, we'll eventually see it on the watch.
8789
continue
8890
}
8991

9092
// Check with the live call it is really missing
91-
configMap, err = c.configmapGetter.Get(ctx, cm.Name, metav1.GetOptions{})
93+
configMap, err = c.configMapGetter.Get(ctx, cm.Name, metav1.GetOptions{})
9294
if err == nil {
9395
klog.Infof("Caches are stale. They don't see configmap '%s/%s', yet it is present", configMap.Namespace, configMap.Name)
9496
// We will get re-queued when we observe the change
@@ -113,9 +115,10 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
113115
continue
114116
}
115117

116-
contentDir := getConfigMapDir(c.destinationDir, cm.Name)
118+
contentDir := getConfigMapTargetDir(c.destinationDir, cm.Name)
119+
stagingDir := getConfigMapStagingDir(c.destinationDir, cm.Name)
117120

118-
data := map[string]string{}
121+
data := make(map[string]string, len(configMap.Data))
119122
for filename := range configMap.Data {
120123
fullFilename := filepath.Join(contentDir, filename)
121124

@@ -138,7 +141,7 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
138141
klog.V(2).Infof("Syncing updated configmap '%s/%s'.", configMap.Namespace, configMap.Name)
139142

140143
// We need to do a live get here so we don't overwrite a newer file with one from a stale cache
141-
configMap, err = c.configmapGetter.Get(ctx, configMap.Name, metav1.GetOptions{})
144+
configMap, err = c.configMapGetter.Get(ctx, configMap.Name, metav1.GetOptions{})
142145
if err != nil {
143146
// Even if the error is not exists we will act on it when caches catch up
144147
c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed getting configmap: %s/%s: %v", c.namespace, cm.Name, err)
@@ -152,27 +155,11 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
152155
continue
153156
}
154157

155-
klog.Infof("Creating directory %q ...", contentDir)
156-
if err := os.MkdirAll(contentDir, 0755); err != nil && !os.IsExist(err) {
157-
c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed creating directory for configmap: %s/%s: %v", configMap.Namespace, configMap.Name, err)
158-
errors = append(errors, err)
159-
continue
158+
files := make(map[string][]byte, len(configMap.Data))
159+
for k, v := range configMap.Data {
160+
files[k] = []byte(v)
160161
}
161-
for filename, content := range configMap.Data {
162-
fullFilename := filepath.Join(contentDir, filename)
163-
// if the existing is the same, do nothing
164-
if reflect.DeepEqual(data[fullFilename], content) {
165-
continue
166-
}
167-
168-
klog.Infof("Writing configmap manifest %q ...", fullFilename)
169-
if err := staticpod.WriteFileAtomic([]byte(content), 0644, fullFilename); err != nil {
170-
c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed writing file for configmap: %s/%s: %v", configMap.Namespace, configMap.Name, err)
171-
errors = append(errors, err)
172-
continue
173-
}
174-
}
175-
c.eventRecorder.Eventf("CertificateUpdated", "Wrote updated configmap: %s/%s", configMap.Namespace, configMap.Name)
162+
errors = append(errors, syncDirectory(c.eventRecorder, "configmap", configMap.ObjectMeta, contentDir, 0755, stagingDir, files, 0644))
176163
}
177164

178165
klog.Infof("Syncing secrets: %v", c.secrets)
@@ -184,7 +171,7 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
184171
continue
185172

186173
case apierrors.IsNotFound(err) && s.Optional:
187-
secretFile := getSecretDir(c.destinationDir, s.Name)
174+
secretFile := getSecretTargetDir(c.destinationDir, s.Name)
188175
if _, err := os.Stat(secretFile); os.IsNotExist(err) {
189176
// if the secret file does not exist, there is no work to do, so skip making any live check and just return.
190177
// if the secret actually exists in the API, we'll eventually see it on the watch.
@@ -218,9 +205,10 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
218205
continue
219206
}
220207

221-
contentDir := getSecretDir(c.destinationDir, s.Name)
208+
contentDir := getSecretTargetDir(c.destinationDir, s.Name)
209+
stagingDir := getSecretStagingDir(c.destinationDir, s.Name)
222210

223-
data := map[string][]byte{}
211+
data := make(map[string][]byte, len(secret.Data))
224212
for filename := range secret.Data {
225213
fullFilename := filepath.Join(contentDir, filename)
226214

@@ -257,29 +245,57 @@ func (c *CertSyncController) sync(ctx context.Context, syncCtx factory.SyncConte
257245
continue
258246
}
259247

260-
klog.Infof("Creating directory %q ...", contentDir)
261-
if err := os.MkdirAll(contentDir, 0755); err != nil && !os.IsExist(err) {
262-
c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed creating directory for secret: %s/%s: %v", secret.Namespace, secret.Name, err)
263-
errors = append(errors, err)
264-
continue
265-
}
266-
for filename, content := range secret.Data {
267-
// TODO fix permissions
268-
fullFilename := filepath.Join(contentDir, filename)
269-
// if the existing is the same, do nothing
270-
if reflect.DeepEqual(data[fullFilename], content) {
271-
continue
272-
}
248+
errors = append(errors, syncDirectory(c.eventRecorder, "secret", secret.ObjectMeta, contentDir, 0755, stagingDir, secret.Data, 0600))
249+
}
250+
return utilerrors.NewAggregate(errors)
251+
}
273252

274-
klog.Infof("Writing secret manifest %q ...", fullFilename)
275-
if err := staticpod.WriteFileAtomic(content, 0600, fullFilename); err != nil {
276-
c.eventRecorder.Warningf("CertificateUpdateFailed", "Failed writing file for secret: %s/%s: %v", secret.Namespace, secret.Name, err)
277-
errors = append(errors, err)
278-
continue
279-
}
253+
func syncDirectory(
254+
eventRecorder events.Recorder,
255+
typeName string, o metav1.ObjectMeta,
256+
targetDir string, targetDirPerm os.FileMode, stagingDir string,
257+
fileContents map[string][]byte, defaultFilePerm os.FileMode,
258+
) error {
259+
files := make(map[string]types.File, len(fileContents))
260+
for filename, content := range fileContents {
261+
files[filename] = types.File{
262+
Content: content,
263+
Perm: getFilePermissions(filename, defaultFilePerm),
280264
}
281-
c.eventRecorder.Eventf("CertificateUpdated", "Wrote updated secret: %s/%s", secret.Namespace, secret.Name)
282265
}
283266

284-
return utilerrors.NewAggregate(errors)
267+
if err := atomicdir.Sync(targetDir, targetDirPerm, stagingDir, files); err != nil {
268+
err = fmt.Errorf("failed to sync %s %s/%s (directory %q): %w", typeName, o.Name, o.Namespace, targetDir, err)
269+
eventRecorder.Warning("CertificateUpdateFailed", err.Error())
270+
return err
271+
}
272+
eventRecorder.Eventf("CertificateUpdated", "Wrote updated %s: %s/%s", typeName, o.Namespace, o.Name)
273+
return nil
274+
}
275+
276+
func getStagingDir(targetDir string) string {
277+
return filepath.Join(targetDir, "staging", stagingDirUID)
278+
}
279+
280+
func getConfigMapTargetDir(targetDir, configMapName string) string {
281+
return filepath.Join(targetDir, "configmaps", configMapName)
282+
}
283+
284+
func getConfigMapStagingDir(targetDir, secretName string) string {
285+
return filepath.Join(getStagingDir(targetDir), "configmaps", secretName)
286+
}
287+
288+
func getSecretTargetDir(targetDir, secretName string) string {
289+
return filepath.Join(targetDir, "secrets", secretName)
290+
}
291+
292+
func getSecretStagingDir(targetDir, secretName string) string {
293+
return filepath.Join(getStagingDir(targetDir), "secrets", secretName)
294+
}
295+
296+
func getFilePermissions(filename string, defaults os.FileMode) os.FileMode {
297+
if strings.HasSuffix(filename, ".sh") {
298+
defaults |= 0111
299+
}
300+
return defaults
285301
}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
//go:build linux
2+
3+
package certsyncpod
4+
5+
import (
6+
"bytes"
7+
"context"
8+
"crypto/ecdsa"
9+
"crypto/elliptic"
10+
"crypto/rand"
11+
"crypto/x509"
12+
"crypto/x509/pkix"
13+
"encoding/pem"
14+
"math/big"
15+
"os"
16+
"path/filepath"
17+
"sync"
18+
"testing"
19+
"time"
20+
21+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22+
"k8s.io/apimachinery/pkg/util/wait"
23+
"k8s.io/apiserver/pkg/server/dynamiccertificates"
24+
25+
"github.com/openshift/library-go/pkg/operator/staticpod/internal/atomicdir"
26+
"github.com/openshift/library-go/pkg/operator/staticpod/internal/atomicdir/types"
27+
)
28+
29+
// TestDynamicCertificates makes sure the receiving side of certificate synchronization works as expected.
30+
// It reads and watches the certificates being synchronized in the same way as e.g. kube-apiserver,
31+
// the very same libraries are being used.
32+
func TestDynamicCertificates(t *testing.T) {
33+
const typeName = "secret"
34+
om := metav1.ObjectMeta{
35+
Namespace: "openshift-kube-apiserver",
36+
Name: "s1",
37+
}
38+
39+
// Generate all necessary keypairs.
40+
tlsCert, tlsKey := generateKeypair(t)
41+
tlsCertUpdated, tlsKeyUpdated := generateKeypair(t)
42+
43+
// Write the keypair into a secret directory.
44+
secretDir := filepath.Join(t.TempDir(), "secrets", om.Name)
45+
stagingDir := filepath.Join(t.TempDir(), "staging", stagingDirUID, "secrets", om.Name)
46+
certFile := filepath.Join(secretDir, "tls.crt")
47+
keyFile := filepath.Join(secretDir, "tls.key")
48+
49+
if err := os.MkdirAll(secretDir, 0700); err != nil {
50+
t.Fatalf("Failed to create secret directory %q: %v", secretDir, err)
51+
}
52+
if err := os.WriteFile(certFile, tlsCert, 0600); err != nil {
53+
t.Fatalf("Failed to write TLS certificate into %q: %v", certFile, err)
54+
}
55+
if err := os.WriteFile(keyFile, tlsKey, 0600); err != nil {
56+
t.Fatalf("Failed to write TLS key into %q: %v", keyFile, err)
57+
}
58+
59+
// Start the watcher.
60+
// This reads the keypair synchronously so the initial state is loaded here.
61+
dc, err := dynamiccertificates.NewDynamicServingContentFromFiles("localhost TLS", certFile, keyFile)
62+
if err != nil {
63+
t.Fatalf("Failed to init dynamic certificate: %v", err)
64+
}
65+
66+
// Check the initial keypair is loaded.
67+
cert, key := dc.CurrentCertKeyContent()
68+
if !bytes.Equal(cert, tlsCert) || !bytes.Equal(key, tlsKey) {
69+
t.Fatal("Unexpected initial keypair loaded")
70+
}
71+
72+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
73+
var wg sync.WaitGroup
74+
wg.Add(1)
75+
go func() {
76+
defer wg.Done()
77+
dc.Run(ctx, 1)
78+
}()
79+
defer wg.Wait()
80+
defer cancel()
81+
82+
// Poll until update detected.
83+
files := map[string]types.File{
84+
"tls.crt": {Content: tlsCertUpdated, Perm: 0600},
85+
"tls.key": {Content: tlsKeyUpdated, Perm: 0600},
86+
}
87+
err = wait.PollUntilContextCancel(ctx, 250*time.Millisecond, true, func(ctx context.Context) (bool, error) {
88+
// Replace the secret directory.
89+
if err := atomicdir.Sync(secretDir, 0700, stagingDir, files); err != nil {
90+
t.Errorf("Failed to write files: %v", err)
91+
return false, err
92+
}
93+
94+
// Check the loaded content matches.
95+
// This is most probably updated based on write in a previous Poll invocation.
96+
cert, key := dc.CurrentCertKeyContent()
97+
return bytes.Equal(cert, tlsCertUpdated) && bytes.Equal(key, tlsKeyUpdated), nil
98+
})
99+
if err != nil {
100+
t.Fatalf("Failed to wait for dynamic certificate: %v", err)
101+
}
102+
}
103+
104+
// generateKeypair returns (cert, key).
105+
func generateKeypair(t *testing.T) ([]byte, []byte) {
106+
t.Helper()
107+
108+
privateKey, err := ecdsa.GenerateKey(elliptic.P224(), rand.Reader)
109+
if err != nil {
110+
t.Fatalf("Failed to generate TLS key: %v", err)
111+
}
112+
113+
notBefore := time.Now()
114+
notAfter := notBefore.Add(1 * time.Hour)
115+
116+
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128)
117+
serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
118+
if err != nil {
119+
t.Fatalf("Failed to generate serial number for TLS keypair: %v", err)
120+
}
121+
122+
template := x509.Certificate{
123+
SerialNumber: serialNumber,
124+
Subject: pkix.Name{
125+
Organization: []string{"Example Org"},
126+
},
127+
NotBefore: notBefore,
128+
NotAfter: notAfter,
129+
KeyUsage: x509.KeyUsageDigitalSignature,
130+
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
131+
BasicConstraintsValid: true,
132+
DNSNames: []string{"example.com"},
133+
}
134+
135+
publicKeyBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey)
136+
if err != nil {
137+
t.Fatalf("Failed to create TLS certificate: %v", err)
138+
}
139+
140+
var certOut bytes.Buffer
141+
if err := pem.Encode(&certOut, &pem.Block{Type: "CERTIFICATE", Bytes: publicKeyBytes}); err != nil {
142+
t.Fatalf("Failed to write certificate PEM: %v", err)
143+
}
144+
145+
privateKeyBytes, err := x509.MarshalPKCS8PrivateKey(privateKey)
146+
if err != nil {
147+
t.Fatalf("Unable to marshal private key: %v", err)
148+
}
149+
150+
var keyOut bytes.Buffer
151+
if err := pem.Encode(&keyOut, &pem.Block{Type: "PRIVATE KEY", Bytes: privateKeyBytes}); err != nil {
152+
t.Fatalf("Failed to write certificate PEM: %v", err)
153+
}
154+
155+
return certOut.Bytes(), keyOut.Bytes()
156+
}

0 commit comments

Comments
 (0)