Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 163 additions & 111 deletions cadetrdm/batch_running/case.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
from __future__ import annotations

import os
import traceback
import warnings
from pathlib import Path
import subprocess
from typing import Dict
from typing import Any

# from cadetrdm.container.containerAdapter import ContainerAdapter
from cadetrdm.batch_running import Study
from cadetrdm.repositories import ProjectRepo
from cadetrdm import Options
from cadetrdm.environment import Environment
from cadetrdm.logging import LogEntry


class Case:
Expand Down Expand Up @@ -40,15 +41,26 @@ def __init__(
project_repo = ProjectRepo(project_repo)
self.project_repo = project_repo
self.options = options
self.environment = environment
self._options_hash = options.get_hash()
self.results_branch = None

self._current_environment = None
self.environment = environment

self.run_method = run_method

self._results_branch = None
self._results_path = None

def __str__(self):
return self.name

@property
def options_hash(self):
return self.options.get_hash()

@property
def output_repo(self):
return self.project_repo.output_repo

@property
def status_file(self):
return Path(self.project_repo.path).parent / (Path(self.project_repo.path).name + ".status")
Expand Down Expand Up @@ -103,89 +115,19 @@ def is_running(self, ):

return False

@property
def has_results_for_this_run(self):
self.results_branch = self._get_results_branch()
if self.results_branch is None:
return False
else:
return True

def _get_results_branch(self):
"""
Search the output log for an entry (i.e. a results branch) with matching study commit hash and options hash.
If environment is given, the environment is also enforced.

:return:
str: name of the results branch or None.
"""
output_log: Dict[str, LogEntry] = self.project_repo.output_log.entries
options_hash = self.options.get_hash()
study_hash = self.project_repo.current_commit_hash

found_results_with_incorrect_study_hash = False
found_results_with_incorrect_options = False
found_results_with_incorrect_environment = False

semi_correct_hits = []
for output_branch, log_entry in reversed(output_log.items()):
matches_study_hash = log_entry.matches_study_hash(study_hash)
matches_options_hash = log_entry.matches_options_hash(options_hash)
matches_environment = log_entry.fulfils_environment(self.environment)

if matches_study_hash and matches_options_hash and matches_environment:
return log_entry.output_repo_branch

elif matches_study_hash and not matches_options_hash and matches_environment:
found_results_with_incorrect_options = True
semi_correct_hits.append(
f"Found matching study commit hash {study_hash[:7]}, but incorrect options hash "
f"(needs: {options_hash[:7]}, has: {log_entry.options_hash[:7]})"
)

elif not matches_study_hash and matches_options_hash and matches_environment:
found_results_with_incorrect_study_hash = True
semi_correct_hits.append(
f"Found matching options hash {options_hash[:7]}, but incorrect study commit hash "
f"(needs: {study_hash[:7]}, has: {log_entry.project_repo_commit_hash[:7]})"
)
elif matches_study_hash and matches_options_hash and not matches_environment:
found_results_with_incorrect_environment = True
semi_correct_hits.append(
f"Found matching options hash {options_hash[:7]}, matching study commit hash "
f"{study_hash[:7]}, but wrong environment specification."
)

if found_results_with_incorrect_study_hash:
[print(line) for line in semi_correct_hits]
print(
"No matching results were found for this study version, but results with these options were found for "
"other study versions. Did you recently update the study?"
)
elif found_results_with_incorrect_options:
[print(line) for line in semi_correct_hits]
print(
"No matching results were found for these options, but results with other options were found for "
"this study versions. Did you recently change the options?"
)
elif found_results_with_incorrect_environment:
[print(line) for line in semi_correct_hits]
print(
"No matching results were found for this environment, but results with other environments were "
"found for this study versions."
)

else:
print("No matching results were found for these options and study version.")
return None

def run_study(self, force=False, container_adapter: "ContainerAdapter" = None, command: str = None) -> bool:
def run_study(
self,
force: bool = False,
container_adapter: "ContainerAdapter" | None = None,
command: str | None = None,
**load_kwargs: Any,
) -> Path | None:
"""
Run specified study commands in the given repository.

:returns
boolean indicating if the results for this case are available, either pre-computed or computed now.

Return path to results for this case if available (either
pre-computed or newly computed), else return None.
"""
if not force and self.is_running:
print(f"{self.project_repo.name} is currently running. Skipping...")
Expand All @@ -197,14 +139,16 @@ def run_study(self, force=False, container_adapter: "ContainerAdapter" = None, c
else:
print("WARNING: Not updating the repositories while in debug mode.")

if self.has_results_for_this_run and not force:
results_path = self.load(**load_kwargs)

if results_path and not force:
print(f"{self.project_repo.path} has already been computed with these options. Skipping...")
return True
return results_path

if container_adapter is None and self.can_run_study is False:
print(f"Current environment does not match required environment. Skipping...")
self.status = 'failed'
return False
return

try:
self.status = 'running'
Expand All @@ -213,55 +157,163 @@ def run_study(self, force=False, container_adapter: "ContainerAdapter" = None, c
log, return_code = container_adapter.run_case(self, command=command)
if return_code != 0:
self.status = "failed"
return False
return
else:
module = self.project_repo.module
run_method = getattr(module, self.run_method)
run_method(self.options, str(self.project_repo.path))

print("Command execution successful.")
self.status = 'finished'
return True
results_path = self.load()
return results_path

except (KeyboardInterrupt, Exception) as e:
traceback.print_exc()
self.status = 'failed'
return False
return

@property
def can_run_study(self) -> bool:
return self.environment is None or self._check_execution_environment()

def _check_execution_environment(self):
return (
self.environment is None
or
self.current_environment.fulfils_environment(self.environment)
)

@property
def current_environment(self):
if self._current_environment is None:
existing_environment = subprocess.check_output(f"conda env export", shell=True).decode()
self._current_environment = Environment.from_yml_string(existing_environment)
existing_environment = subprocess.check_output(
f"conda env export", shell=True
).decode()
self._current_environment = Environment.from_yml_string(
existing_environment
)

return self._current_environment.fulfils_environment(self.environment)
return self._current_environment

@property
def _results_path(self):
def has_results_for_this_run(self):
if self.results_branch is None:
return None
return False
else:
return self.project_repo.cache_folder_for_branch(self.results_branch)
return True

def load(self, ):
if self.results_branch is None or self.options.get_hash() != self._options_hash:
self.results_branch = self._get_results_branch()
self._options_hash = self.options.get_hash()
@property
def results_branch(self) -> str | None:
return self._get_results_branch()

if self.results_branch is None:
print(f"No results available for Case({self.project_repo.path, self.options.get_hash()[:7]})")
return None
def _get_results_branch(
self,
allow_commit_hash_mismatch: bool = False,
allow_options_hash_mismatch: bool = False,
allow_environment_mismatch: bool = False,
) -> str | None:
"""
Return the output branch matching the current study and options.

if self._results_path.exists():
return
Args:
allow_commit_hash_mismatch: If True, allow mismatched study commit hash.
allow_options_hash_mismatch: If True, allow mismatched options hash.
allow_environment_mismatch: If True, allow mismatched environment.

self.project_repo.copy_data_to_cache(self.results_branch)
Returns:
str | None: Name of the results branch, or None if no match found.
"""
options_hash = self.options_hash
commit_hash = self.project_repo.current_commit_hash
log_entries = self.output_repo.output_log.entries

for output_repo_branch, entry in reversed(log_entries.items()):
environment_ok = entry.fulfils_environment(self.environment)

# Exact match (all properties match)
if (
entry.options_hash == options_hash
and entry.project_repo_commit_hash == commit_hash
and environment_ok
):
return output_repo_branch

# Check environment
if not allow_environment_mismatch and not environment_ok:
continue

# Check study hash
if entry.project_repo_commit_hash != commit_hash and not allow_commit_hash_mismatch:
continue

# Check options hash
if entry.options_hash != options_hash and not allow_options_hash_mismatch:
continue

# Semi-correct match (at least one mismatch, but allowed)
msg_parts = []
if entry.options_hash != options_hash:
msg_parts.append(
"mismatched options hash "
f"(needs: {options_hash[:7]}, "
f"has: {entry.options_hash[:7]})"
)
if entry.project_repo_commit_hash != commit_hash:
msg_parts.append(
"mismatched project repo hash "
f"(needs: {commit_hash[:7]}, "
f"has: {entry.project_repo_commit_hash[:7]})"
)
if not entry.fulfils_environment(self.environment):
msg_parts.append("mismatched environment")

if msg_parts:
msg = "Found matching entry with: " + ", ".join(msg_parts)
print(f"{msg}. Using results from branch: {output_repo_branch}")
return output_repo_branch

print("No matching results found.")

@property
def results_path(self):
self.load()
def results_path(self) -> Path | None:
return self.load()

def load(
self,
allow_commit_hash_mismatch: bool = False,
allow_options_hash_mismatch: bool = False,
allow_environment_mismatch: bool = False,
) -> Path | None:
"""
Load results for the current case.

Args:
allow_commit_hash_mismatch: If True, allow loading results with mismatched study commit hash.
allow_options_hash_mismatch: If True, allow loading results with mismatched options hash.
allow_environment_mismatch: If True, allow loading results with mismatched environment.

Returns:
Path to results.
"""
results_branch = self._get_results_branch(
allow_commit_hash_mismatch=allow_commit_hash_mismatch,
allow_options_hash_mismatch=allow_options_hash_mismatch,
allow_environment_mismatch=allow_environment_mismatch,
)

# Early exit if no results are available
if results_branch is None:
print(
f"No results available for Case("
f"project_repo={self.project_repo.path}, "
f"options_hash={self.options_hash[:7]}"
f")"
)
return

# Load results if the path exists, otherwise fetch them
results_path = self.project_repo.copy_data_to_cache(results_branch)
if not results_path.exists():
print("Failed to fetch results.")
return

return self._results_path
return results_path
Loading