Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 102 additions & 69 deletions component/attr_cache/attr_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,6 @@ func (ac *AttrCache) deleteDirectory(path string, deletedAt time.Time) error {
return nil
}

// does the cache show this path as existing?
func (ac *AttrCache) pathExistsInCache(path string) bool {
item, found := ac.cache.get(path)
if !found {
return false
}
return item.exists()
}

// returns the parent directory (without a trailing slash)
func getParentDir(childPath string) string {
parentDir := path.Dir(internal.TruncateDirName(childPath))
Expand Down Expand Up @@ -466,33 +457,43 @@ func (ac *AttrCache) CreateDir(options internal.CreateDirOptions) error {
ac.cacheLock.Lock()
defer ac.cacheLock.Unlock()
// does the directory already exist?
oldDirAttrCacheItem, found := ac.cache.get(options.Name)
directoryAlreadyExists := found && oldDirAttrCacheItem.exists()
dirAttrCacheItem, found := ac.cache.get(options.Name)
directoryAlreadyExists := found && dirAttrCacheItem.exists()
// if the attribute cache tracks directory existence
// then prevent redundant directory creation
if ac.cacheDirs && directoryAlreadyExists {
return os.ErrExist
}
// invalidate existing directory entry (this is redundant but readable)
if found {
oldDirAttrCacheItem.invalidate()
}
// add (or replace) the directory entry
newDirAttr := internal.CreateObjAttrDir(options.Name)
newDirAttrCacheItem := ac.cache.insert(insertOptions{
attr: newDirAttr,
exists: true,
cachedAt: currentTime,
})
if newDirAttrCacheItem != nil {
newDirAttrCacheItem.setMode(options.Mode)
}
// update flags for tracking directory existence
if ac.cacheDirs && newDirAttrCacheItem != nil {
newDirAttrCacheItem.markInCloud(false)
if directoryAlreadyExists {
if ac.cacheDirs {
return os.ErrExist
}
} else {
// invalidate existing directory entry (this is redundant but readable)
if found {
dirAttrCacheItem.invalidate()
}
// add (or replace) the directory entry
newDirAttr := internal.CreateObjAttrDir(options.Name)
dirAttrCacheItem = ac.cache.insert(insertOptions{
attr: newDirAttr,
exists: true,
cachedAt: currentTime,
})
// insert returns nil when entries are maxed out
if dirAttrCacheItem != nil {
// update flag for tracking directory existence
if ac.cacheDirs {
dirAttrCacheItem.markInCloud(false)
}
// this is a new directory, so we have a complete (empty) listing for it
dirAttrCacheItem.listingComplete = true
}
// if this is a new entry, update the parent directory timestamps
if err == nil {
ac.touchParentDirTimes(options.Name, currentTime, ac.cacheDirs)
}
}
if err == nil && !directoryAlreadyExists {
ac.touchParentDirTimes(options.Name, currentTime, ac.cacheDirs)
// if returning success, update the mode
if err == nil && dirAttrCacheItem != nil {
dirAttrCacheItem.setMode(options.Mode)
}
}
return err
Expand Down Expand Up @@ -579,15 +580,6 @@ func (ac *AttrCache) StreamDir(
}
}
}
// add cached items in
if len(cachedPathList) > 0 {
log.Info(
"AttrCache::StreamDir : %s merging in %d list cache entries...",
options.Name,
len(cachedPathList),
)
pathList = append(pathList, cachedPathList...)
}
// values should be returned in ascending order by key, without duplicates
// sort
slices.SortFunc[[]*internal.ObjAttr, *internal.ObjAttr](
Expand All @@ -603,7 +595,18 @@ func (ac *AttrCache) StreamDir(
return a.Path == b.Path
},
)
ac.cacheListSegment(pathList, options.Name, options.Token, nextToken)
// cache the listing (if there was no error)
if err == nil {
// record when the directory was listed, an up to what token
// this will allow us to serve directory listings from this cache
ac.cacheListSegment(pathList, options.Name, options.Token, nextToken)
// if the listing is complete, record the fact that we have a complete listing
if nextToken == "" {
ac.markListingComplete(options.Name)
}
} else {
log.Err("AttrCache::StreamDir : %s encountered error [%v]", options.Name, err)
}
log.Trace("AttrCache::StreamDir : %s returning %d entries", options.Name, len(pathList))
return pathList, nextToken, err
}
Expand All @@ -616,9 +619,8 @@ func (ac *AttrCache) fetchCachedDirList(
path string,
token string,
) ([]*internal.ObjAttr, string, error) {
var pathList []*internal.ObjAttr
if !ac.cacheOnList {
return pathList, "", fmt.Errorf("cache on list is disabled")
return nil, "", fmt.Errorf("cache on list is disabled")
}
// start accessing the cache
ac.cacheLock.RLock()
Expand All @@ -627,25 +629,22 @@ func (ac *AttrCache) fetchCachedDirList(
listDirCache, found := ac.cache.get(path)
if !found {
log.Warn("AttrCache::fetchCachedDirList : %s directory not found in cache", path)
return pathList, "", fmt.Errorf("%s directory not found in cache", path)
return nil, "", fmt.Errorf("%s directory not found in cache", path)
}
// is the requested data cached?
if listDirCache.listCache == nil {
listDirCache.listCache = make(map[string]listCacheSegment)
}
cachedListSegment, found := listDirCache.listCache[token]
if !found {
// the data for this token is not in the cache
// don't provide cached data when new (uncached) data is being requested
log.Info("AttrCache::fetchCachedDirList : %s listing segment %s not cached", path, token)
return pathList, "", fmt.Errorf("%s directory listing segment %s not cached", path, token)
return nil, "", fmt.Errorf("%s directory listing segment %s not cached", path, token)
}
// check timeout
if time.Since(cachedListSegment.cachedAt).Seconds() >= float64(ac.cacheTimeout) {
log.Info("AttrCache::fetchCachedDirList : %s listing segment %s cache expired", path, token)
// drop the invalid segment from the list cache
delete(listDirCache.listCache, token)
return pathList, "", fmt.Errorf(
return nil, "", fmt.Errorf(
"%s directory listing segment %s cache expired",
path,
token,
Expand Down Expand Up @@ -727,6 +726,15 @@ func (ac *AttrCache) cacheListSegment(
listDirPath, token, nextToken, len(pathList))
}

func (ac *AttrCache) markListingComplete(listDirPath string) {
ac.cacheLock.Lock()
defer ac.cacheLock.Unlock()
listDirItem, found := ac.cache.get(listDirPath)
if found {
listDirItem.listingComplete = true
}
}

// IsDirEmpty: Whether or not the directory is empty
func (ac *AttrCache) IsDirEmpty(options internal.IsDirEmptyOptions) bool {
log.Trace("AttrCache::IsDirEmpty : %s", options.Name)
Expand All @@ -737,14 +745,15 @@ func (ac *AttrCache) IsDirEmpty(options internal.IsDirEmptyOptions) bool {
"AttrCache::IsDirEmpty : %s Dir cache is disabled. Checking with container",
options.Name,
)
// when offline, this will return false
return ac.NextComponent().IsDirEmpty(options)
}
// Is the directory in our cache?
ac.cacheLock.RLock()
pathInCache := ac.pathExistsInCache(options.Name)
ac.cacheLock.RUnlock()
defer ac.cacheLock.RUnlock()
item, found := ac.cache.get(options.Name)
// If the directory does not exist in the attribute cache then let the next component answer
if !pathInCache {
if !found || !item.exists() {
log.Debug(
"AttrCache::IsDirEmpty : %s not found in attr_cache. Checking with container",
options.Name,
Expand All @@ -753,10 +762,15 @@ func (ac *AttrCache) IsDirEmpty(options internal.IsDirEmptyOptions) bool {
}
log.Debug("AttrCache::IsDirEmpty : %s found in attr_cache", options.Name)
// Check if the cached directory is empty or not
if ac.anyContentsInCache(options.Name) {
if item.hasExistingChildren() {
log.Debug("AttrCache::IsDirEmpty : %s has a subpath in attr_cache", options.Name)
return false
}
// do we have a complete listing?
if item.listingComplete {
// we know the directory is empty
return true
}
// Dir is in cache but no contents are, so check with container
log.Debug(
"AttrCache::IsDirEmpty : %s children not found in cache. Checking with container",
Expand All @@ -765,16 +779,10 @@ func (ac *AttrCache) IsDirEmpty(options internal.IsDirEmptyOptions) bool {
return ac.NextComponent().IsDirEmpty(options)
}

func (ac *AttrCache) anyContentsInCache(prefix string) bool {
ac.cacheLock.RLock()
defer ac.cacheLock.RUnlock()

directory, found := ac.cache.get(prefix)
if found && directory.exists() {
for _, chldItem := range directory.children {
if chldItem.exists() {
return true
}
func (value *attrCacheItem) hasExistingChildren() bool {
for _, childItem := range value.children {
if childItem.exists() {
return true
}
}
return false
Expand All @@ -796,7 +804,7 @@ func (ac *AttrCache) RenameDir(options internal.RenameDirOptions) error {
if ac.cacheDirs {
// if attr_cache is tracking directories, validate this rename
// First, check if the destination directory already exists
if ac.pathExistsInCache(options.Dst) {
if item, found := ac.cache.get(options.Dst); found && item.exists() {
return os.ErrExist
}
} else {
Expand Down Expand Up @@ -1159,17 +1167,42 @@ func (ac *AttrCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr

ac.cacheLock.RLock()
value, found := ac.cache.get(options.Name)
ac.cacheLock.RUnlock()
if found && value.valid() && time.Since(value.cachedAt).Seconds() < float64(ac.cacheTimeout) {
// Try to serve the request from the attribute cache
// Is the entry marked deleted?
ac.cacheLock.RUnlock()
// Serve the request from the attribute cache
if !value.exists() {
log.Debug("AttrCache::GetAttr : %s (ENOENT) served from cache", options.Name)
return nil, syscall.ENOENT
} else {
return value.attr, nil
}
}
if ac.cacheDirs {
// drill up for the nearest valid parent directory attribute cache
foundCachedParent := false
for parent := getParentDir(options.Name); ; parent = getParentDir(parent) {
value, found = ac.cache.get(parent)
// skip invalid data
if !found || !value.valid() {
if parent == "" {
// no valid parent found
break
}
continue
}
// don't trust expired entries
if time.Since(value.cachedAt).Seconds() > float64(ac.cacheTimeout) {
break
}
// found the nearest cached parent
// if it does not exist, or has a complete listing, then our target does not exist
foundCachedParent = !value.exists() || value.listingComplete
}
ac.cacheLock.RUnlock()
if foundCachedParent {
return nil, syscall.ENOENT
}
}

// Get the attributes from next component and cache them
pathAttr, err := ac.NextComponent().GetAttr(options)
Expand Down
2 changes: 2 additions & 0 deletions component/attr_cache/cacheMap.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type attrCacheItem struct {
attrFlag common.BitMap64
children map[string]*attrCacheItem
parent *attrCacheItem

listingComplete bool
}

// all cache entries are organized into this structure
Expand Down
Loading