Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 27 additions & 90 deletions component/s3storage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,85 +449,23 @@ func (cl *Client) DeleteFile(name string) error {
return nil
}

// DeleteDirectory : Recursively delete all objects with the given prefix.
// DeleteDirectory : Delete the directory marker in the container
// If name is given without a trailing slash, a slash will be added.
// If the directory does not exist, no error will be returned.
// If the directory marker does not exist, no error will be returned.
// Deletion is carried out regardless of the enableDirMarker flag
func (cl *Client) DeleteDirectory(name string) error {
log.Trace("Client::DeleteDirectory : name %s", name)

// Delete the current directory
// make sure name has a trailing slash
name = internal.ExtendDirName(name)

done := false
var marker *string
var err error
for !done {
// list all objects with the prefix
objects, marker, err := cl.List(name, marker, 0)
if err != nil {
log.Warn(
"Client::DeleteDirectory : Failed to list object with prefix %s. Here's why: %v",
name,
err,
)
return err
}

// we have no way of indicating empty folders in the bucket
// so if there are no objects with this prefix we can either:
// 1. return an error when the user tries to delete an empty directory, or
// 2. fail to return an error when trying to delete a non-existent directory
// the second one seems much less risky, so we don't check for an empty list here

// List only returns the objects and prefixes up to the next "/" character after the prefix
// This is because List is setting the Delimiter field to "/"
// This means that recursive directory deletion actually needs to be recursive.
// Delete all found objects *and prefixes ("directories")*.
// For improved performance, we'll use one call to delete all objects in this directory.
// To make one call, we need to make a list of objects to delete first.
var objectsToDelete []*internal.ObjAttr
for _, object := range objects {
if object.IsDir() {
err = cl.DeleteDirectory(object.Path)
if err != nil {
log.Err(
"Client::DeleteDirectory : Failed to delete directory %s. Here's why: %v",
object.Path,
err,
)
}
} else {
objectsToDelete = append(
objectsToDelete,
object,
) //consider just object instead of object.path to pass down attributes that come from list.
}
}
// Delete the collected files
err = cl.deleteObjects(objectsToDelete)
if err != nil {
log.Err(
"Client::DeleteDirectory : deleteObjects() failed when called with %d objects. Here's why: %v",
len(objectsToDelete),
err,
)
}

if marker == nil {
done = true
}
}

// Delete the current directory
if cl.Config.enableDirMarker {
err = cl.deleteObject(name, false, true)
if err != nil {
log.Err(
"Client::DeleteDirectory : Failed to delete directory %s. Here's why: %v",
name,
err,
)
}
err := cl.deleteObject(name, false, true)
if err != nil {
log.Err(
"Client::DeleteDirectory : Failed to delete directory %s. Here's why: %v",
name,
err,
)
}

return err
Expand Down Expand Up @@ -667,23 +605,8 @@ func (cl *Client) getDirectoryAttr(
) (*internal.ObjAttr, error) {
log.Trace("Client::getDirectoryAttr : name %s", dirName)

objects, _, listErr := cl.List(dirName, nil, 1)

// Otherwise, the cloud does not support directory markers, or there is no
// marker, so look for an object in the directory.
if listErr != nil {
log.Err("Client::getDirectoryAttr : List(%s) failed. Here's why: %v", dirName, listErr)
return nil, listErr
}
if len(objects) > 0 {
// create and return an objAttr for the directory
attr := internal.CreateObjAttrDir(dirName)
return attr, nil
}

// Only check for explicit empty directory markers when needed.
// For file-like names, this saves one extra HeadObject
// call on miss-heavy paths that are not directories.
// When directory markers are enabled, check for the marker first via
// HeadObject (cheap, single-key lookup) before falling back to a List.
if cl.Config.enableDirMarker && shouldProbeDirMarker(dirName, explicitDirLookup) {
headAttr, headErr := cl.headObject(dirName, false, true)
if headErr == nil {
Expand All @@ -699,6 +622,20 @@ func (cl *Client) getDirectoryAttr(
}
}

// Either directory markers are disabled, there is no marker, or the name
// was skipped by shouldProbeDirMarker. Fall back to listing objects under
// the prefix to detect a non-empty directory.
objects, _, listErr := cl.List(dirName, nil, 1)
if listErr != nil {
log.Err("Client::getDirectoryAttr : List(%s) failed. Here's why: %v", dirName, listErr)
return nil, listErr
}
if len(objects) > 0 {
// create and return an objAttr for the directory
attr := internal.CreateObjAttrDir(dirName)
return attr, nil
}

// directory not found in bucket
log.Err("Client::getDirectoryAttr : not found: %s", dirName)
return nil, syscall.ENOENT
Expand Down
4 changes: 2 additions & 2 deletions component/s3storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,13 +826,13 @@ func (s *clientTestSuite) TestDeleteDirectory() {
err = s.client.DeleteDirectory(dirName)
s.assert.NoError(err)

// file in directory should no longer be there
// file in directory should still be there (only marker is deleted, not contents)
_, err = s.awsS3Client.GetObject(context.Background(), &s3.GetObjectInput{
Bucket: aws.String(s.client.Config.AuthConfig.BucketName),
Key: aws.String(path.Join(dirName, fileName)),
ChecksumMode: types.ChecksumModeEnabled,
})
s.assert.Error(err)
s.assert.NoError(err)
}
func (s *clientTestSuite) TestRenameFile() {
defer s.cleanupTest()
Expand Down
40 changes: 24 additions & 16 deletions component/s3storage/s3storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (s *s3StorageTestSuite) TestDeleteDir() {
)
s.assert.NoError(err)

// Directory should be in the account
// Directory marker should be in the account
key := internal.ExtendDirName(
common.JoinUnixFilepath(s.s3Storage.stConfig.prefixPath, obj_path),
)
Expand All @@ -515,7 +515,7 @@ func (s *s3StorageTestSuite) TestDeleteDir() {
s.assert.NoError(err)

s.assert.NoError(err)
// Directory should not be in the account
// Directory marker should not be in the account
key = internal.ExtendDirName(
common.JoinUnixFilepath(s.s3Storage.stConfig.prefixPath, obj_path),
)
Expand All @@ -528,9 +528,9 @@ func (s *s3StorageTestSuite) TestDeleteDir() {
})
s.assert.Error(err)

// Directory be empty
// Directory is not empty (file still exists)
dirEmpty := s.s3Storage.IsDirEmpty(internal.IsDirEmptyOptions{Name: obj_path})
s.assert.True(dirEmpty)
s.assert.False(dirEmpty)
})
}
}
Expand Down Expand Up @@ -626,13 +626,24 @@ func (s *s3StorageTestSuite) TestDeleteDirHierarchy() {

s.assert.NoError(err)

/// a paths should be deleted
// Only the directory marker for base is deleted, all nested paths still exist
// Check that the directory marker is removed
baseMarkerKey := internal.ExtendDirName(
common.JoinUnixFilepath(s.s3Storage.stConfig.prefixPath, base),
)
_, err = s.awsS3Client.GetObject(context.Background(), &s3.GetObjectInput{
Bucket: aws.String(
s.s3Storage.Storage.(*Client).Config.AuthConfig.BucketName,
),
Key: aws.String(baseMarkerKey),
ChecksumMode: types.ChecksumModeEnabled,
})
s.assert.Error(err)

// But all nested paths should still exist
a.PushBackList(ab)
a.PushBackList(ac)
for p := a.Front(); p != nil; p = p.Next() {
_, err = s.s3Storage.GetAttr(internal.GetAttrOptions{Name: p.Value.(string)})
s.assert.Error(err)
}
ab.PushBackList(ac) // ab and ac paths should exist
for p := ab.Front(); p != nil; p = p.Next() {
_, err = s.s3Storage.GetAttr(internal.GetAttrOptions{Name: p.Value.(string)})
s.assert.NoError(err)
}
Expand All @@ -659,14 +670,11 @@ func (s *s3StorageTestSuite) TestDeleteSubDirPrefixPath() {

err = s.s3Storage.Storage.SetPrefixPath(s.s3Storage.stConfig.prefixPath)
s.assert.NoError(err)
// a paths under c1 should be deleted
// Only the c1 directory marker is deleted, nested paths under c1 still exist
for p := a.Front(); p != nil; p = p.Next() {
_, err = s.s3Storage.GetAttr(internal.GetAttrOptions{Name: p.Value.(string)})
if strings.HasPrefix(p.Value.(string), base+"/c1") {
s.assert.Error(err)
} else {
s.assert.NoError(err)
}
// All nested paths should still exist (marker deletion doesn't affect children)
s.assert.NoError(err)
}
ab.PushBackList(ac) // ab and ac paths should exist
for p := ab.Front(); p != nil; p = p.Next() {
Expand Down
41 changes: 0 additions & 41 deletions component/s3storage/s3wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,47 +208,6 @@ func (cl *Client) deleteObject(name string, isSymLink bool, isDir bool) error {
return parseS3Err(err, attemptedAction)
}

// Wrapper for awsS3Client.DeleteObjects.
// names is a list of paths to the objects.
func (cl *Client) deleteObjects(objects []*internal.ObjAttr) error {
if objects == nil {
return nil
}
log.Trace("Client::deleteObjects : deleting %d objects", len(objects))
// build list to send to DeleteObjects
keyList := make([]types.ObjectIdentifier, len(objects))
for i, object := range objects {
key := cl.getKey(object.Path, object.IsSymlink(), object.IsDir())
keyList[i] = types.ObjectIdentifier{
Key: &key,
}
}
// send keyList for deletion
result, err := cl.AwsS3Client.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{
Bucket: &cl.Config.AuthConfig.BucketName,
Delete: &types.Delete{
Objects: keyList,
Quiet: aws.Bool(true),
},
})
if err != nil {
log.Err(
"Client::DeleteDirectory : Failed to delete %d files. Here's why: %v",
len(objects),
err,
)
for i := 0; i < len(result.Errors); i++ {
log.Err(
"Client::DeleteDirectory : Failed to delete key %s. Here's why: %s",
*result.Errors[i].Key,
*result.Errors[i].Message,
)
}
}

return err
}

// Wrapper for awsS3Client.HeadObject.
// HeadObject() acts just like GetObject, except no contents are returned.
// So this is used to get metadata / attributes for an object.
Expand Down
Loading