Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions audb/core/define.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
TIMEOUT = 86400 # 24 h
CACHED_VERSIONS_TIMEOUT = 10 # Timeout to acquire access to cached versions
LOCK_FILE = ".lock"
COMPLETE_FILE = ".complete"
TIMEOUT_MSG = "Lock could not be acquired. Timeout exceeded."


Expand Down
285 changes: 166 additions & 119 deletions audb/core/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,21 @@ def check() -> bool:
)
audeer.rmdir(db_root_tmp)

# Create .complete file to signal completion
complete_file = os.path.join(db_root, define.COMPLETE_FILE)
audeer.touch(complete_file)
Comment on lines +196 to +198
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Consider atomic file creation for the .complete file.

audeer.touch may not guarantee atomicity, risking race conditions if multiple processes run concurrently. Use an atomic method, like writing to a temp file and renaming, to ensure .complete is reliably created after completion.

Suggested change
# Create .complete file to signal completion
complete_file = os.path.join(db_root, define.COMPLETE_FILE)
audeer.touch(complete_file)
# Atomically create .complete file to signal completion
complete_file = os.path.join(db_root, define.COMPLETE_FILE)
tmp_complete_file = complete_file + ".tmp"
with open(tmp_complete_file, "w") as f:
pass # create empty file
os.replace(tmp_complete_file, complete_file)



def _database_is_complete(
db: audformat.Database,
) -> bool:
# First check for .complete file
if "audb" in db.meta and "root" in db.meta["audb"]:
complete_file = os.path.join(db.meta["audb"]["root"], define.COMPLETE_FILE)
if os.path.exists(complete_file):
Comment on lines +205 to +207
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Checking for the .complete file before metadata may mask incomplete states.

A stale or incorrectly created .complete file could cause a false positive. Consider adding validation to ensure the .complete file reliably indicates database completeness.

return True

# Fallback to checking metadata
complete = False
if "audb" in db.meta:
if "complete" in db.meta["audb"]:
Expand Down Expand Up @@ -1152,64 +1163,48 @@ def load(
)

try:
with FolderLock(db_root, timeout=timeout):
# Start with database header without tables
# Check if database is already complete by looking for .complete file
complete_file = os.path.join(db_root, define.COMPLETE_FILE)
if os.path.exists(complete_file):
# Database is complete, no need to lock
db, backend_interface = load_header_to(
Comment on lines 1163 to 1170
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Bypassing the FolderLock when .complete exists could allow concurrent modifications.

This approach may introduce race conditions if .complete is created before all data is written. Evaluate whether the lock should still be used for reads or if other safeguards are necessary.

db_root,
name,
version,
flavor=flavor,
add_audb_meta=True,
)

db_is_complete = _database_is_complete(db)

# load attachments
if not db_is_complete and not only_metadata:
# filter attachments
requested_attachments = filter_deps(
attachments,
db.attachments,
"attachment",
)

cached_versions = _load_attachments(
requested_attachments,
backend_interface,
db_is_complete = True
else:
# Database is not complete, need to lock
with FolderLock(db_root, timeout=timeout):
# Start with database header without tables
db, backend_interface = load_header_to(
db_root,
db,
name,
version,
cached_versions,
deps,
flavor,
cache_root,
num_workers,
verbose,
flavor=flavor,
add_audb_meta=True,
)

# filter tables (convert regexp pattern to list of tables)
requested_tables = filter_deps(tables, list(db), "table")

# add/split into misc tables used in a scheme
# and all other (misc) tables
requested_misc_tables = _misc_tables_used_in_scheme(db)
requested_tables = [
table
for table in requested_tables
if table not in requested_misc_tables
]

# load missing tables
if not db_is_complete:
for _tables in [
requested_misc_tables,
requested_tables,
]:
# need to load misc tables used in a scheme first
# as loading is done in parallel
cached_versions = _load_files(
_tables,
"table",
# Double-check completion status after acquiring lock
complete_file = os.path.join(db_root, define.COMPLETE_FILE)
if os.path.exists(complete_file):
db_is_complete = True
else:
Comment on lines +1190 to +1194
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Double-checking .complete after acquiring the lock may not be sufficient if file creation is not atomic.

Using non-atomic file creation for .complete can lead to race conditions where other processes detect the file before the database is fully ready. Use atomic file creation or a more reliable signaling method to prevent this issue.

Suggested change
# Double-check completion status after acquiring lock
complete_file = os.path.join(db_root, define.COMPLETE_FILE)
if os.path.exists(complete_file):
db_is_complete = True
else:
# Double-check completion status after acquiring lock using atomic file creation
complete_file = os.path.join(db_root, define.COMPLETE_FILE)
try:
# Try to atomically create the .complete file
fd = os.open(complete_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
with os.fdopen(fd, "w") as f:
f.write("complete\n")
db_is_complete = False # We are the first to create it, so not yet complete
except FileExistsError:
# If file exists, another process has completed the database
db_is_complete = True

db_is_complete = _database_is_complete(db)

# load attachments
if not db_is_complete and not only_metadata:
# filter attachments
requested_attachments = filter_deps(
attachments,
db.attachments,
"attachment",
)

cached_versions = _load_attachments(
requested_attachments,
backend_interface,
db_root,
db,
Expand All @@ -1218,34 +1213,31 @@ def load(
deps,
flavor,
cache_root,
pickle_tables,
num_workers,
verbose,
)
requested_tables = requested_misc_tables + requested_tables

# filter tables
if tables is not None:
db.pick_tables(requested_tables)

# load tables
for table in requested_tables:
db[table].load(os.path.join(db_root, f"db.{table}"))

# filter media
requested_media = filter_deps(
media,
db.files,
"media",
name,
version,
)

# load missing media
if not db_is_complete and not only_metadata:
# filter tables (convert regexp pattern to list of tables)
requested_tables = filter_deps(tables, list(db), "table")

# add/split into misc tables used in a scheme
# and all other (misc) tables
requested_misc_tables = _misc_tables_used_in_scheme(db)
requested_tables = [
table for table in requested_tables if table not in requested_misc_tables
]

# load missing tables
if not db_is_complete:
for _tables in [
requested_misc_tables,
requested_tables,
]:
# need to load misc tables used in a scheme first
# as loading is done in parallel
cached_versions = _load_files(
requested_media,
"media",
_tables,
"table",
backend_interface,
db_root,
db,
Expand All @@ -1254,45 +1246,81 @@ def load(
deps,
flavor,
cache_root,
False,
pickle_tables,
num_workers,
verbose,
)
requested_tables = requested_misc_tables + requested_tables

# filter media
if media is not None or tables is not None:
db.pick_files(requested_media)
# filter tables
Comment on lines 1246 to +1255
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Potential for incomplete state if .complete file is created before all data is written.

Ensure the .complete file is only created after all data and metadata are fully written and flushed to disk to prevent incorrect assumptions about database completeness.

if tables is not None:
db.pick_tables(requested_tables)

if not removed_media:
_remove_media(db, deps, num_workers, verbose)
# load tables
for table in requested_tables:
db[table].load(os.path.join(db_root, f"db.{table}"))

# Adjust full paths and file extensions in tables
_update_path(
db,
# filter media
requested_media = filter_deps(
media,
db.files,
"media",
name,
version,
)

# load missing media
if not db_is_complete and not only_metadata:
cached_versions = _load_files(
requested_media,
"media",
backend_interface,
db_root,
full_path,
flavor.format,
db,
version,
cached_versions,
deps,
flavor,
cache_root,
False,
num_workers,
verbose,
)

# set file durations
_files_duration(
# filter media
if media is not None or tables is not None:
db.pick_files(requested_media)

if not removed_media:
_remove_media(db, deps, num_workers, verbose)

# Adjust full paths and file extensions in tables
_update_path(
db,
db_root,
full_path,
flavor.format,
num_workers,
verbose,
)

# set file durations
_files_duration(
db,
deps,
requested_media,
flavor.format,
)

# check if database is now complete
if not db_is_complete:
_database_check_complete(
db,
db_root,
flavor,
deps,
requested_media,
flavor.format,
)

# check if database is now complete
if not db_is_complete:
_database_check_complete(
db,
db_root,
flavor,
deps,
)

except filelock.Timeout:
utils.timeout_warning()

Expand Down Expand Up @@ -1592,42 +1620,61 @@ def load_media(
raise ValueError(msg)

try:
with FolderLock(db_root, timeout=timeout):
# Start with database header without tables
# Check if database is already complete by looking for .complete file
complete_file = os.path.join(db_root, define.COMPLETE_FILE)
if os.path.exists(complete_file):
# Database is complete, no need to lock
db, backend_interface = load_header_to(
db_root,
name,
version,
flavor=flavor,
add_audb_meta=True,
)

db_is_complete = _database_is_complete(db)

# load missing media
if not db_is_complete:
_load_files(
media,
"media",
backend_interface,
db_is_complete = True
else:
# Database is not complete, need to lock
with FolderLock(db_root, timeout=timeout):
# Start with database header without tables
db, backend_interface = load_header_to(
db_root,
db,
name,
version,
None,
deps,
flavor,
cache_root,
False,
num_workers,
verbose,
flavor=flavor,
add_audb_meta=True,
)

if format is not None:
media = [audeer.replace_file_extension(m, format) for m in media]
files = [
os.path.join(db_root, os.path.normpath(file)) # convert "/" to os.sep
for file in media
]
# Double-check completion status after acquiring lock
complete_file = os.path.join(db_root, define.COMPLETE_FILE)
if os.path.exists(complete_file):
db_is_complete = True
else:
db_is_complete = _database_is_complete(db)

# load missing media
if not db_is_complete:
_load_files(
media,
"media",
backend_interface,
db_root,
db,
version,
None,
deps,
flavor,
cache_root,
False,
num_workers,
verbose,
)

if format is not None:
media = [audeer.replace_file_extension(m, format) for m in media]
files = [
os.path.join(db_root, os.path.normpath(file)) # convert "/" to os.sep
for file in media
]

except filelock.Timeout:
utils.timeout_warning()
Expand Down
Loading
Loading