Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions internal/controller/vrg_volgrouprep.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,42 @@ func (v *VRGInstance) isVGRandVGRCArchivedAlready(vgr *volrep.VolumeGroupReplica
return false
}

// Both annotations are present and correct. Now check if S3 data actually exists.
// If S3 data is missing (e.g., due to S3 profile change), we should return false to
// trigger re-upload.
if !v.isVGRAndVGRCDataInS3(vgr, log) {
log.Info("VGR/VGRC data not found in S3 stores, will trigger re-upload", "vgr", vgr.Name)

return false
}

return true
}

// isVGRAndVGRCDataInS3 checks if VGR and VGRC data exists in any of the configured S3 stores.
// This is used to detect when S3 profile has changed and data needs to be re-uploaded.
// Returns true only if data is found in at least one S3 store.
func (v *VRGInstance) isVGRAndVGRCDataInS3(vgr *volrep.VolumeGroupReplication, log logr.Logger) bool {
vgrc, err := v.getVGRCFromVGR(vgr)
if err != nil {
log.Error(err, "Failed to get VGRC for checking S3 data existence")

return false
}

keyPrefix := v.s3KeyPrefix()
vgrNamespacedName := types.NamespacedName{Namespace: vgr.Namespace, Name: vgr.Name}
vgrNamespacedNameString := vgrNamespacedName.String()

// Keys that should exist if data has been uploaded
expectedKeys := []string{
TypedObjectKey(keyPrefix, vgrc.Name, volrep.VolumeGroupReplicationContent{}),
TypedObjectKey(keyPrefix, vgrNamespacedNameString, volrep.VolumeGroupReplication{}),
}

return v.isDataInS3Stores(keyPrefix, expectedKeys, log, "VGR/VGRC")
}

// Upload VGRCs and VGRs to the list of S3 stores in the VRG spec
func (v *VRGInstance) uploadVGRandVGRCtoS3Stores(vrNamespacedName types.NamespacedName, log logr.Logger) error {
vgr := &volrep.VolumeGroupReplication{}
Expand Down
93 changes: 93 additions & 0 deletions internal/controller/vrg_volrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,9 +631,102 @@ func (v *VRGInstance) isArchivedAlready(pvc *corev1.PersistentVolumeClaim, log l
return false
}

// Both annotations are present and correct. Now check if S3 data actually exists.
// If S3 data is missing (e.g., due to S3 profile change), we should return false to
// trigger re-upload.
if !v.isPVAndPVCDataInS3(pvc, log) {
log.Info("PV/PVC data not found in S3 stores, will trigger re-upload", "pvc", pvc.Name)

return false
}

return true
}

// isDataInS3Stores checks if any of the expected keys exist in any configured S3 stores.
// This is a generic helper for detecting S3 data existence across profiles.
// Returns true if any expected key is found in any S3 store.
func (v *VRGInstance) isDataInS3Stores(keyPrefix string, expectedKeys []string, log logr.Logger, dataType string) bool {
// Check if data exists in any of the S3 profiles
for _, s3ProfileName := range v.instance.Spec.S3Profiles {
if s3ProfileName == NoS3StoreAvailable {
continue
}

if v.checkDataInProfile(s3ProfileName, keyPrefix, expectedKeys, log, dataType) {
return true
}
}

log.Info("No "+dataType+" data found in any S3 profile for VRG", "vrgNamespacedName", v.namespacedName)

return false
}

// checkDataInProfile checks if any expected key exists in a specific S3 profile.
// Returns true if any expected key is found, false otherwise.
func (v *VRGInstance) checkDataInProfile(
s3ProfileName string,
keyPrefix string,
expectedKeys []string,
log logr.Logger,
dataType string,
) bool {
objectStore, err := v.getObjectStorer(s3ProfileName)
if err != nil {
log.Info("Failed to get object store for S3 profile, will check next profile",
"profile", s3ProfileName, "error", err)

return false
}

// List keys in this store to see if any of the expected keys exist
keys, err := objectStore.ListKeys(keyPrefix)
if err != nil {
log.Info("Failed to list keys in S3 profile, will check next profile",
"profile", s3ProfileName, "error", err)

return false
}

// Check if any of the expected keys exist in this profile
for _, expectedKey := range expectedKeys {
for _, existingKey := range keys {
if existingKey == expectedKey {
log.Info("Found "+dataType+" data in S3 profile", "profile", s3ProfileName, "key", expectedKey)

return true
}
}
}

return false
}

// isPVAndPVCDataInS3 checks if PV and PVC data exists in any of the configured S3 stores.
// This is used to detect when S3 profile has changed and data needs to be re-uploaded.
// Returns true only if data is found in at least one S3 store.
func (v *VRGInstance) isPVAndPVCDataInS3(pvc *corev1.PersistentVolumeClaim, log logr.Logger) bool {
pv, err := v.getPVFromPVC(pvc)
if err != nil {
log.Error(err, "Failed to get PV for checking S3 data existence")

return false
}

keyPrefix := v.s3KeyPrefix()
pvcNamespacedName := types.NamespacedName{Namespace: pvc.Namespace, Name: pvc.Name}
pvcNamespacedNameString := pvcNamespacedName.String()

// Keys that should exist if data has been uploaded
expectedKeys := []string{
TypedObjectKey(keyPrefix, pv.Name, corev1.PersistentVolume{}),
TypedObjectKey(keyPrefix, pvcNamespacedNameString, corev1.PersistentVolumeClaim{}),
}

return v.isDataInS3Stores(keyPrefix, expectedKeys, log, "PV/PVC")
}

func (v *VRGInstance) isPVCResizeCompleted(pvc *corev1.PersistentVolumeClaim) bool {
requested := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
actual := pvc.Status.Capacity[corev1.ResourceStorage]
Expand Down
204 changes: 204 additions & 0 deletions internal/controller/vrg_volrep_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3449,3 +3449,207 @@ func genVGRCLabels(replicationID, storageID, protectionKey string) map[string]st

return vrcLabel
}

var _ = Describe("VRG S3 Profile Change Detection - isPVAndPVCDataInS3", func() {
var fakeStorer *fakeObjectStorer

BeforeEach(func() {
fakeStorer = &fakeObjectStorer{
name: "test-storer",
bucketName: "test-bucket",
objects: make(map[string]interface{}),
}
})

Context("FakeObjectStorer ListKeys", func() {
When("PV/PVC data exists in S3", func() {
It("should find keys matching the prefix", func() {
s3KeyPrefix := "test-ns/test-vrg"
pvKey := s3KeyPrefix + "/PersistentVolume.test-pv"
pvcKey := s3KeyPrefix + "/PersistentVolumeClaim.test-ns.test-pvc"

// Store dummy data
fakeStorer.objects[pvKey] = "dummy-pv"
fakeStorer.objects[pvcKey] = "dummy-pvc"

keys, err := fakeStorer.ListKeys(s3KeyPrefix)
Expect(err).NotTo(HaveOccurred())
Expect(keys).To(ContainElement(pvKey))
Expect(keys).To(ContainElement(pvcKey))
Expect(keys).To(HaveLen(2))
})
})

When("no S3 data exists under prefix", func() {
It("should return empty list", func() {
s3KeyPrefix := "test-ns/test-vrg"

keys, err := fakeStorer.ListKeys(s3KeyPrefix)
Expect(err).NotTo(HaveOccurred())
Expect(keys).To(BeEmpty())
})
})

When("multiple prefixes exist but only matching prefix requested", func() {
It("should only return keys for requested prefix", func() {
prefix1 := "ns1/vrg1"
prefix2 := "ns2/vrg2"

// Store in different prefixes
fakeStorer.objects[prefix1+"/pv1"] = "data1"
fakeStorer.objects[prefix2+"/pv2"] = "data2"

// List only prefix1
keys, err := fakeStorer.ListKeys(prefix1)
Expect(err).NotTo(HaveOccurred())
Expect(keys).To(ConsistOf(prefix1 + "/pv1"))
Expect(keys).NotTo(ContainElement(prefix2 + "/pv2"))
})
})
})

Context("S3 Profile Change Detection - Scenario", func() {
It("should detect when data missing after profile change", func() {
s3KeyPrefix := "test-ns/test-vrg"
pvKey := s3KeyPrefix + "/PersistentVolume.test-pv"
pvcKey := s3KeyPrefix + "/PersistentVolumeClaim.test-ns.test-pvc"

// Step 1: Data exists in original S3 profile
fakeStorer.objects[pvKey] = "pv-data"
fakeStorer.objects[pvcKey] = "pvc-data"

keys, err := fakeStorer.ListKeys(s3KeyPrefix)
Expect(err).NotTo(HaveOccurred())
Expect(keys).To(HaveLen(2))

// Step 2: User changes S3 profile in config map
// New objectStorer is created (empty)
newStorer := &fakeObjectStorer{
name: "new-storer",
bucketName: "new-bucket",
objects: make(map[string]interface{}),
}

// Step 3: Check if data exists in new profile
keys, err = newStorer.ListKeys(s3KeyPrefix)
Expect(err).NotTo(HaveOccurred())
Expect(keys).To(BeEmpty()) // Data missing in new profile

// This missing data detection would trigger isPVAndPVCDataInS3()
// to return false, causing isArchivedAlready() to return false,
// which triggers re-upload of PV/PVC on next reconcile
})
})
})

var _ = Describe("VRG S3 Profile Change Detection - isVGRAndVGRCDataInS3", func() {
var fakeStorer *fakeObjectStorer

BeforeEach(func() {
fakeStorer = &fakeObjectStorer{
name: "test-storer",
bucketName: "test-bucket",
objects: make(map[string]interface{}),
}
})

Context("VGR/VGRC data existence checks", func() {
When("VGR and VGRC data exists in S3", func() {
It("should find keys matching the VGR/VGRC prefix", func() {
s3KeyPrefix := "test-ns/test-vrg"
vgrKey := s3KeyPrefix + "/VolumeGroupReplication.test-vgr"
vgrcKey := s3KeyPrefix + "/VolumeGroupReplicationContent.test-vgrc"

// Store VGR/VGRC data
fakeStorer.objects[vgrKey] = "dummy-vgr"
fakeStorer.objects[vgrcKey] = "dummy-vgrc"

keys, err := fakeStorer.ListKeys(s3KeyPrefix)
Expect(err).NotTo(HaveOccurred())
Expect(keys).To(ContainElement(vgrKey))
Expect(keys).To(ContainElement(vgrcKey))
Expect(keys).To(HaveLen(2))
})
})

When("VGR/VGRC data does not exist in S3", func() {
It("should return empty list", func() {
s3KeyPrefix := "test-ns/test-vrg"

keys, err := fakeStorer.ListKeys(s3KeyPrefix)
Expect(err).NotTo(HaveOccurred())
Expect(keys).To(BeEmpty())
})
})

When("mixed PV/PVC and VGR/VGRC data exists", func() {
It("should retrieve all objects under the prefix", func() {
s3KeyPrefix := "test-ns/test-vrg"

// Store mixed data
pvKey := s3KeyPrefix + "/PersistentVolume.test-pv"
pvcKey := s3KeyPrefix + "/PersistentVolumeClaim.test-ns.test-pvc"
vgrKey := s3KeyPrefix + "/VolumeGroupReplication.test-vgr"
vgrcKey := s3KeyPrefix + "/VolumeGroupReplicationContent.test-vgrc"

fakeStorer.objects[pvKey] = "pv-data"
fakeStorer.objects[pvcKey] = "pvc-data"
fakeStorer.objects[vgrKey] = "vgr-data"
fakeStorer.objects[vgrcKey] = "vgrc-data"

keys, err := fakeStorer.ListKeys(s3KeyPrefix)
Expect(err).NotTo(HaveOccurred())
Expect(keys).To(HaveLen(4))
Expect(keys).To(ContainElement(pvKey))
Expect(keys).To(ContainElement(pvcKey))
Expect(keys).To(ContainElement(vgrKey))
Expect(keys).To(ContainElement(vgrcKey))
})
})
})

Context("VGR/VGRC S3 Profile Change Detection", func() {
It("should detect when VGR/VGRC data missing after profile change", func() {
s3KeyPrefix := "test-ns/test-vrg"
vgrKey := s3KeyPrefix + "/VolumeGroupReplication.test-vgr"
vgrcKey := s3KeyPrefix + "/VolumeGroupReplicationContent.test-vgrc"

// Step 1: Data exists in original S3 profile
fakeStorer.objects[vgrKey] = "vgr-data"
fakeStorer.objects[vgrcKey] = "vgrc-data"

keys, err := fakeStorer.ListKeys(s3KeyPrefix)
Expect(err).NotTo(HaveOccurred())
Expect(keys).To(HaveLen(2))

// Step 2: User changes S3 profile in config map
newStorer := &fakeObjectStorer{
name: "new-storer",
bucketName: "new-bucket",
objects: make(map[string]interface{}),
}

// Step 3: Check if data exists in new profile
keys, err = newStorer.ListKeys(s3KeyPrefix)
Expect(err).NotTo(HaveOccurred())
Expect(keys).To(BeEmpty()) // Data missing in new profile

// This would trigger isVGRAndVGRCDataInS3() to return false,
// causing isVGRandVGRCArchivedAlready() to return false,
// which triggers re-upload of VGR/VGRC on next reconcile
})

It("should handle partial data (one component missing)", func() {
s3KeyPrefix := "test-ns/test-vrg"
vgrKey := s3KeyPrefix + "/VolumeGroupReplication.test-vgr"
// Only VGR exists, VGRC is missing

fakeStorer.objects[vgrKey] = "vgr-data"

keys, err := fakeStorer.ListKeys(s3KeyPrefix)
Expect(err).NotTo(HaveOccurred())
Expect(keys).To(HaveLen(1))
Expect(keys).To(ContainElement(vgrKey))
})
})
})