diff --git a/component/s3storage/client.go b/component/s3storage/client.go index dc56789d9..66693b18b 100644 --- a/component/s3storage/client.go +++ b/component/s3storage/client.go @@ -449,85 +449,23 @@ func (cl *Client) DeleteFile(name string) error { return nil } -// DeleteDirectory : Recursively delete all objects with the given prefix. +// DeleteDirectory : Delete the directory marker in the container // If name is given without a trailing slash, a slash will be added. -// If the directory does not exist, no error will be returned. +// If the directory marker does not exist, no error will be returned. +// Deletion is carried out regardless of the enableDirMarker flag func (cl *Client) DeleteDirectory(name string) error { log.Trace("Client::DeleteDirectory : name %s", name) + // Delete the current directory // make sure name has a trailing slash name = internal.ExtendDirName(name) - - done := false - var marker *string - var err error - for !done { - // list all objects with the prefix - objects, marker, err := cl.List(name, marker, 0) - if err != nil { - log.Warn( - "Client::DeleteDirectory : Failed to list object with prefix %s. Here's why: %v", - name, - err, - ) - return err - } - - // we have no way of indicating empty folders in the bucket - // so if there are no objects with this prefix we can either: - // 1. return an error when the user tries to delete an empty directory, or - // 2. fail to return an error when trying to delete a non-existent directory - // the second one seems much less risky, so we don't check for an empty list here - - // List only returns the objects and prefixes up to the next "/" character after the prefix - // This is because List is setting the Delimiter field to "/" - // This means that recursive directory deletion actually needs to be recursive. - // Delete all found objects *and prefixes ("directories")*. - // For improved performance, we'll use one call to delete all objects in this directory. - // To make one call, we need to make a list of objects to delete first. - var objectsToDelete []*internal.ObjAttr - for _, object := range objects { - if object.IsDir() { - err = cl.DeleteDirectory(object.Path) - if err != nil { - log.Err( - "Client::DeleteDirectory : Failed to delete directory %s. Here's why: %v", - object.Path, - err, - ) - } - } else { - objectsToDelete = append( - objectsToDelete, - object, - ) //consider just object instead of object.path to pass down attributes that come from list. - } - } - // Delete the collected files - err = cl.deleteObjects(objectsToDelete) - if err != nil { - log.Err( - "Client::DeleteDirectory : deleteObjects() failed when called with %d objects. Here's why: %v", - len(objectsToDelete), - err, - ) - } - - if marker == nil { - done = true - } - } - - // Delete the current directory - if cl.Config.enableDirMarker { - err = cl.deleteObject(name, false, true) - if err != nil { - log.Err( - "Client::DeleteDirectory : Failed to delete directory %s. Here's why: %v", - name, - err, - ) - } + err := cl.deleteObject(name, false, true) + if err != nil { + log.Err( + "Client::DeleteDirectory : Failed to delete directory %s. Here's why: %v", + name, + err, + ) } return err @@ -667,23 +605,8 @@ func (cl *Client) getDirectoryAttr( ) (*internal.ObjAttr, error) { log.Trace("Client::getDirectoryAttr : name %s", dirName) - objects, _, listErr := cl.List(dirName, nil, 1) - - // Otherwise, the cloud does not support directory markers, or there is no - // marker, so look for an object in the directory. - if listErr != nil { - log.Err("Client::getDirectoryAttr : List(%s) failed. Here's why: %v", dirName, listErr) - return nil, listErr - } - if len(objects) > 0 { - // create and return an objAttr for the directory - attr := internal.CreateObjAttrDir(dirName) - return attr, nil - } - - // Only check for explicit empty directory markers when needed. - // For file-like names, this saves one extra HeadObject - // call on miss-heavy paths that are not directories. + // When directory markers are enabled, check for the marker first via + // HeadObject (cheap, single-key lookup) before falling back to a List. if cl.Config.enableDirMarker && shouldProbeDirMarker(dirName, explicitDirLookup) { headAttr, headErr := cl.headObject(dirName, false, true) if headErr == nil { @@ -699,6 +622,20 @@ func (cl *Client) getDirectoryAttr( } } + // Either directory markers are disabled, there is no marker, or the name + // was skipped by shouldProbeDirMarker. Fall back to listing objects under + // the prefix to detect a non-empty directory. + objects, _, listErr := cl.List(dirName, nil, 1) + if listErr != nil { + log.Err("Client::getDirectoryAttr : List(%s) failed. Here's why: %v", dirName, listErr) + return nil, listErr + } + if len(objects) > 0 { + // create and return an objAttr for the directory + attr := internal.CreateObjAttrDir(dirName) + return attr, nil + } + // directory not found in bucket log.Err("Client::getDirectoryAttr : not found: %s", dirName) return nil, syscall.ENOENT diff --git a/component/s3storage/client_test.go b/component/s3storage/client_test.go index 97f46b308..dea2cb9b9 100644 --- a/component/s3storage/client_test.go +++ b/component/s3storage/client_test.go @@ -826,13 +826,13 @@ func (s *clientTestSuite) TestDeleteDirectory() { err = s.client.DeleteDirectory(dirName) s.assert.NoError(err) - // file in directory should no longer be there + // file in directory should still be there (only marker is deleted, not contents) _, err = s.awsS3Client.GetObject(context.Background(), &s3.GetObjectInput{ Bucket: aws.String(s.client.Config.AuthConfig.BucketName), Key: aws.String(path.Join(dirName, fileName)), ChecksumMode: types.ChecksumModeEnabled, }) - s.assert.Error(err) + s.assert.NoError(err) } func (s *clientTestSuite) TestRenameFile() { defer s.cleanupTest() diff --git a/component/s3storage/s3storage_test.go b/component/s3storage/s3storage_test.go index 65eb1b624..73a976cfe 100644 --- a/component/s3storage/s3storage_test.go +++ b/component/s3storage/s3storage_test.go @@ -499,7 +499,7 @@ func (s *s3StorageTestSuite) TestDeleteDir() { ) s.assert.NoError(err) - // Directory should be in the account + // Directory marker should be in the account key := internal.ExtendDirName( common.JoinUnixFilepath(s.s3Storage.stConfig.prefixPath, obj_path), ) @@ -515,7 +515,7 @@ func (s *s3StorageTestSuite) TestDeleteDir() { s.assert.NoError(err) s.assert.NoError(err) - // Directory should not be in the account + // Directory marker should not be in the account key = internal.ExtendDirName( common.JoinUnixFilepath(s.s3Storage.stConfig.prefixPath, obj_path), ) @@ -528,9 +528,9 @@ func (s *s3StorageTestSuite) TestDeleteDir() { }) s.assert.Error(err) - // Directory be empty + // Directory is not empty (file still exists) dirEmpty := s.s3Storage.IsDirEmpty(internal.IsDirEmptyOptions{Name: obj_path}) - s.assert.True(dirEmpty) + s.assert.False(dirEmpty) }) } } @@ -626,13 +626,24 @@ func (s *s3StorageTestSuite) TestDeleteDirHierarchy() { s.assert.NoError(err) - /// a paths should be deleted + // Only the directory marker for base is deleted, all nested paths still exist + // Check that the directory marker is removed + baseMarkerKey := internal.ExtendDirName( + common.JoinUnixFilepath(s.s3Storage.stConfig.prefixPath, base), + ) + _, err = s.awsS3Client.GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String( + s.s3Storage.Storage.(*Client).Config.AuthConfig.BucketName, + ), + Key: aws.String(baseMarkerKey), + ChecksumMode: types.ChecksumModeEnabled, + }) + s.assert.Error(err) + + // But all nested paths should still exist + a.PushBackList(ab) + a.PushBackList(ac) for p := a.Front(); p != nil; p = p.Next() { - _, err = s.s3Storage.GetAttr(internal.GetAttrOptions{Name: p.Value.(string)}) - s.assert.Error(err) - } - ab.PushBackList(ac) // ab and ac paths should exist - for p := ab.Front(); p != nil; p = p.Next() { _, err = s.s3Storage.GetAttr(internal.GetAttrOptions{Name: p.Value.(string)}) s.assert.NoError(err) } @@ -659,14 +670,11 @@ func (s *s3StorageTestSuite) TestDeleteSubDirPrefixPath() { err = s.s3Storage.Storage.SetPrefixPath(s.s3Storage.stConfig.prefixPath) s.assert.NoError(err) - // a paths under c1 should be deleted + // Only the c1 directory marker is deleted, nested paths under c1 still exist for p := a.Front(); p != nil; p = p.Next() { _, err = s.s3Storage.GetAttr(internal.GetAttrOptions{Name: p.Value.(string)}) - if strings.HasPrefix(p.Value.(string), base+"/c1") { - s.assert.Error(err) - } else { - s.assert.NoError(err) - } + // All nested paths should still exist (marker deletion doesn't affect children) + s.assert.NoError(err) } ab.PushBackList(ac) // ab and ac paths should exist for p := ab.Front(); p != nil; p = p.Next() { diff --git a/component/s3storage/s3wrappers.go b/component/s3storage/s3wrappers.go index 810272815..ff3712b9d 100644 --- a/component/s3storage/s3wrappers.go +++ b/component/s3storage/s3wrappers.go @@ -208,47 +208,6 @@ func (cl *Client) deleteObject(name string, isSymLink bool, isDir bool) error { return parseS3Err(err, attemptedAction) } -// Wrapper for awsS3Client.DeleteObjects. -// names is a list of paths to the objects. -func (cl *Client) deleteObjects(objects []*internal.ObjAttr) error { - if objects == nil { - return nil - } - log.Trace("Client::deleteObjects : deleting %d objects", len(objects)) - // build list to send to DeleteObjects - keyList := make([]types.ObjectIdentifier, len(objects)) - for i, object := range objects { - key := cl.getKey(object.Path, object.IsSymlink(), object.IsDir()) - keyList[i] = types.ObjectIdentifier{ - Key: &key, - } - } - // send keyList for deletion - result, err := cl.AwsS3Client.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{ - Bucket: &cl.Config.AuthConfig.BucketName, - Delete: &types.Delete{ - Objects: keyList, - Quiet: aws.Bool(true), - }, - }) - if err != nil { - log.Err( - "Client::DeleteDirectory : Failed to delete %d files. Here's why: %v", - len(objects), - err, - ) - for i := 0; i < len(result.Errors); i++ { - log.Err( - "Client::DeleteDirectory : Failed to delete key %s. Here's why: %s", - *result.Errors[i].Key, - *result.Errors[i].Message, - ) - } - } - - return err -} - // Wrapper for awsS3Client.HeadObject. // HeadObject() acts just like GetObject, except no contents are returned. // So this is used to get metadata / attributes for an object.