diff --git a/pyproject.toml b/pyproject.toml index c6203ad..aaa0397 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,9 +12,12 @@ lxml = "^4.9.3" bagit = "^1.8.1" [tool.poetry.scripts] -report_ftk_extents = 'digarch_scripts.report_ftk_extents:main' -report_hdd_extents = 'digarch_scripts.report_hdd_extents:main' -package_cloud = 'ipres_package_cloud.package_cloud:main' +report_ftk_extents = 'digarch_scripts.report.report_ftk_extents:main' +report_hdd_extents = 'digarch_scripts.report.report_hdd_extents:main' +package_cloud = 'digarch_scripts.package.package_cloud:main' +package_images = 'digarch_scripts.package.package_images:main' +package_filetransfer = 'digarch_scripts.package.package_filetransfer:main' +transfer_rsync = 'digarch_scripts.transfer.transfer_rsync:main' [tool.poetry.group.dev.dependencies] nox = "^2023.4.22" diff --git a/rsync.log b/rsync.log new file mode 100644 index 0000000..1a4094f --- /dev/null +++ b/rsync.log @@ -0,0 +1,3 @@ +2024/05/29 15:07:45 [46235] building file list +2024/05/29 15:07:45 [46235] , tests/fixtures/rsync/rsync_files/file.01, 3072, f075a8d6d4df7509d39a3140bbae9fcd +2024/05/29 15:07:45 [46235] sent 3206 bytes received 41 bytes total size 3072 diff --git a/src/digarch_scripts/lint/lint_ft.py b/src/digarch_scripts/lint/lint_ft.py index 258adbc..334d6c1 100644 --- a/src/digarch_scripts/lint/lint_ft.py +++ b/src/digarch_scripts/lint/lint_ft.py @@ -7,6 +7,7 @@ LOGGER = logging.getLogger(__name__) + def _configure_logging(log_folder: Path): log_fn = datetime.now().strftime("lint_%Y_%m_%d_%H_%M.log") log_fpath = log_folder / log_fn @@ -21,15 +22,14 @@ def _configure_logging(log_folder: Path): encoding="utf-8", ) + def parse_args() -> argparse.Namespace: """Validate and return command-line args""" def extant_dir(p): path = Path(p) if not path.is_dir(): - raise argparse.ArgumentTypeError( - f'{path} does not exist' - ) + raise argparse.ArgumentTypeError(f"{path} does not exist") return path def list_of_paths(p): @@ -43,28 +43,21 
@@ def list_of_paths(p): parser = argparse.ArgumentParser() parser.add_argument( - '--package', - type=extant_dir, - nargs='+', - dest='packages', - action='extend' + "--package", type=extant_dir, nargs="+", dest="packages", action="extend" ) parser.add_argument( - '--directory', - type=list_of_paths, - dest='packages', - action='extend' + "--directory", type=list_of_paths, dest="packages", action="extend" ) parser.add_argument( - '--log_folder', - help='''Optional. Designate where to save the log file, - or it will be saved in current directory''', - default='.' + "--log_folder", + help="""Optional. Designate where to save the log file, + or it will be saved in current directory""", + default=".", ) - return parser.parse_args() + def package_has_valid_name(package: Path) -> bool: """Top level folder name has to conform to ACQ_####_######""" folder_name = package.name @@ -76,15 +69,17 @@ def package_has_valid_name(package: Path) -> bool: LOGGER.error(f"{folder_name} does not conform to ACQ_####_######") return False + def package_has_two_subfolders(package: Path) -> bool: """There must be two subfolders in the package""" - pkg_folders = [ x for x in package.iterdir() if x.is_dir() ] + pkg_folders = [x for x in package.iterdir() if x.is_dir()] if len(pkg_folders) == 2: return True else: LOGGER.error(f"{package} does not have exactly two subfolders") return False + def package_has_valid_subfolder_names(package: Path) -> bool: """Second level folders must be objects and metadata folder""" expected = set(["objects", "metadata"]) @@ -98,6 +93,7 @@ def package_has_valid_subfolder_names(package: Path) -> bool: ) return False + def package_has_no_hidden_file(package: Path) -> bool: """The package should not have any hidden file""" hidden_ls = [ @@ -111,10 +107,11 @@ def package_has_no_hidden_file(package: Path) -> bool: else: return True + def package_has_no_zero_bytes_file(package: Path) -> bool: """The package should not have any zero bytes file""" - all_file = [ f for f 
in package.rglob("*") if f.is_file() ] - zero_bytes_ls = [ f for f in all_file if f.stat().st_size == 0 ] + all_file = [f for f in package.rglob("*") if f.is_file()] + zero_bytes_ls = [f for f in all_file if f.stat().st_size == 0] if zero_bytes_ls: LOGGER.warning(f"{package.name} has zero bytes file {zero_bytes_ls}") @@ -122,6 +119,7 @@ def package_has_no_zero_bytes_file(package: Path) -> bool: else: return True + def metadata_folder_is_flat(package: Path) -> bool: """The metadata folder should not have folder structure""" metadata_path = package / "metadata" @@ -132,40 +130,49 @@ def metadata_folder_is_flat(package: Path) -> bool: else: return True + def metadata_folder_has_files(package: Path) -> bool: """The metadata folder should have one or more file""" metadata_path = package / "metadata" - md_files_ls = [ x for x in metadata_path.rglob("*") if x.is_file() ] + md_files_ls = [x for x in metadata_path.rglob("*") if x.is_file()] if md_files_ls: return True else: LOGGER.warning(f"{package.name} metadata folder does not have any files") return False + def metadata_has_correct_naming_convention(package: Path) -> bool: """The metadata file name should be in the accepted list""" metadata_path = package / "metadata" accepted_fn = ["rclone.log"] - md_files_ls = [ x for x in metadata_path.rglob("*") if x.is_file() ] + md_files_ls = [x for x in metadata_path.rglob("*") if x.is_file()] nonconforming = [] for file in md_files_ls: if not file.name in accepted_fn: nonconforming.append(file) if nonconforming: - LOGGER.error(f"""{package.name} has nonconforming metadata file(s): - {nonconforming}""") + LOGGER.error( + f"""{package.name} has nonconforming metadata file(s): + {nonconforming}""" + ) return False else: return True + def objects_folder_correct_structure(package: Path) -> bool: """objects folder should have a data folder, which includes four files: bag-info.txt, bagit.txt, manifest-md5.txt and tagmanifest-md5.txt""" expected_paths = [] - expected_files = 
["bag-info.txt", "bagit.txt", - "manifest-md5.txt", "tagmanifest-md5.txt"] + expected_files = [ + "bag-info.txt", + "bagit.txt", + "manifest-md5.txt", + "tagmanifest-md5.txt", + ] missing = [] data_folder = package / "objects" / "data" @@ -180,16 +187,19 @@ def objects_folder_correct_structure(package: Path) -> bool: missing.append(fp.name) if missing: - LOGGER.error(f"""{package.name} has incorrect structure. - missing {missing}""") + LOGGER.error( + f"""{package.name} has incorrect structure. + missing {missing}""" + ) return False else: return True + def objects_folder_has_no_empty_folder(package: Path) -> bool: """The objects folder should not have any empty folders""" objects_path = package / "objects" - folder_in_obj = [ x for x in objects_path.rglob("*") if x.is_dir() ] + folder_in_obj = [x for x in objects_path.rglob("*") if x.is_dir()] empty = [] for folder in folder_in_obj: @@ -202,6 +212,7 @@ def objects_folder_has_no_empty_folder(package: Path) -> bool: else: return True + def lint_package(package: Path) -> Literal["valide", "invalide", "needs review"]: """Run all linting tests against a package""" result = "valid" @@ -209,7 +220,7 @@ def lint_package(package: Path) -> Literal["valide", "invalide", "needs review"] less_strict_tests = [ package_has_no_hidden_file, package_has_no_zero_bytes_file, - metadata_folder_has_files + metadata_folder_has_files, ] for test in less_strict_tests: @@ -223,7 +234,7 @@ def lint_package(package: Path) -> Literal["valide", "invalide", "needs review"] metadata_folder_is_flat, metadata_has_correct_naming_convention, objects_folder_correct_structure, - objects_folder_has_no_empty_folder + objects_folder_has_no_empty_folder, ] for test in strict_tests: @@ -232,6 +243,7 @@ def lint_package(package: Path) -> Literal["valide", "invalide", "needs review"] return result + def main(): args = parse_args() _configure_logging(args.log_folder) @@ -266,7 +278,9 @@ def main(): print( f""" The following {len(needs_review)} packages need 
review. - They may be passed without change after review: {needs_review}""") + They may be passed without change after review: {needs_review}""" + ) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/digarch_scripts/package/package_base.py b/src/digarch_scripts/package/package_base.py new file mode 100644 index 0000000..ffb564d --- /dev/null +++ b/src/digarch_scripts/package/package_base.py @@ -0,0 +1,408 @@ +import argparse +import logging +import os +import re +from datetime import date +from pathlib import Path + +import bagit + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.INFO) + + +class TransferParser(argparse.ArgumentParser): + def extant_path(self, p: str) -> Path: + path = Path(p) + if not path.exists(): + raise argparse.ArgumentTypeError(f"{path} does not exist") + return path + + def acq_id(self, id: str) -> Path: + pattern = r"ACQ_\d{4}" + old_pattern = r"M\d{4-6}" + if not re.match(pattern, id): + if not re.match(old_pattern, id): + raise argparse.ArgumentTypeError( + f"{id} does not match the expected {type} pattern, {pattern}" + ) + return id + + def carrier_id(self, id: str) -> Path: + pattern = r"ACQ_\d{4}_\d{6,7}" + old_pattern = r"M\d{4-6}_\d{6,7}" + if not re.match(pattern, id): + if not re.match(old_pattern, id): + raise argparse.ArgumentTypeError( + f"{id} does not match the expected {type} pattern, {pattern}" + ) + return id + + def add_acqid(self) -> None: + self.add_argument( + "--acqid", "--id", required=True, type=self.acq_id, help="ACQ_####" + ) + + def add_carrierid(self) -> None: + self.add_argument( + "--carrierid", required=True, type=self.carrier_id, help="ACQ_####_#######" + ) + + def add_source(self) -> None: + self.add_argument( + "--source", + required=True, + type=self.extant_path, + help="Path to mount carrier", + ) + + def add_payload(self) -> None: + self.add_argument( + "--payload", + required=True, + type=self.extant_path, + help="Path to files transferred from 
single carrier", + ) + + def add_objects_folder(self) -> None: + self.add_argument( + "--objects-folder", + required=True, + type=self.extant_path, + help="Path to working folder with file transfers from all transfers", + ) + + def add_md5(self) -> None: + self.add_argument( + "--md5", + required=True, + type=self.extant_path, + help="Path to a log with md5 checksums, e.g. rclone or rsync log", + ) + + def add_images_folder(self) -> None: + self.add_argument( + "--images_folder", + required=True, + type=self.extant_path, + help="Path to working images folder", + ) + + def add_log(self) -> None: + self.add_argument( + "--log", + required=True, + type=self.extant_path, + help="Path to a log file from the transfer process", + ) + + def add_logs_folder(self) -> None: + self.add_argument( + "--logs_folder", + required=False, + type=self.extant_path, + help="Path to working folder with logs from all transfers", + ) + + def add_streams_folder(self) -> None: + self.add_argument( + "--streams_folder", + required=False, + type=self.extant_path, + help="Path to working folder with streams from all transfers", + ) + + def add_dest(self) -> None: + self.add_argument("--dest", required=True, type=self.extant_path) + + def add_transfers(self) -> None: + self.add_argument( + "--transfers", + required=True, + type=self.extant_path, + help="Path to the directory containing all transfers", + ) + + def add_quiet(self, **kwargs) -> None: + self.add_argument("-q", "--quiet", action="store_true", **kwargs) + + +def find_category_of_carrier_files( + carrier_files: dict, acq_id: str, source_dir: Path, exts: list, category: str +) -> dict: + for path in source_dir.iterdir(): + if not path.suffix in exts: + continue + carrier_id_match = re.search(rf"{acq_id}_\d\d\d\d\d\d+", path.name) + if not carrier_id_match: + continue + carrier_id = carrier_id_match.group(0) + + if not carrier_id in carrier_files: + carrier_files[carrier_id] = {category: []} + elif not category in 
carrier_files[carrier_id]: + carrier_files[carrier_id][category] = [] + + carrier_files[carrier_id][category].append(path) + + return carrier_files + + +def create_acq_dir(dest: Path, acq_id: str) -> Path: + acq_dir = dest / acq_id + if acq_dir.exists(): + LOGGER.info(f"Acquisition directory already exits: {acq_dir}") + return acq_dir + + try: + acq_dir.mkdir(parents=True) + except PermissionError: + raise PermissionError(f"{dest} is not writable") + return acq_dir + + +def create_package_dir(dest: Path, id: str) -> Path: + acq_id = id.rsplit("_", 1)[0] + package_base = dest / acq_id / id + if package_base.exists(): + raise FileExistsError( + f"{package_base} already exists. Make sure you are using the correct ID" + ) + + try: + package_base.mkdir(parents=True) + except PermissionError: + raise PermissionError(f"{dest} is not writable") + return package_base + + +def move_file(file_path: Path, pkg_dir: Path, dest: str) -> None: + dest_dir = pkg_dir / dest + if not dest_dir.exists(): + dest_dir.mkdir(parents=True) + + new_file_path = dest_dir / file_path.name + if new_file_path.exists(): + raise FileExistsError( + f"{new_file_path} already exists in {dest} folder. Not moving." 
+ ) + + file_path.rename(new_file_path) + return None + + +def move_files(file_paths: list[Path], pkg_dir: Path, dest: str) -> None: + for file_path in file_paths: + try: + move_file(file_path, pkg_dir, dest) + except FileExistsError as e: + raise Warning( + f"{e} One or more files may have already been moved to the {dest} folder" + ) + return None + + +def move_metadata_file(md_path: Path, pkg_dir: Path) -> None: + return move_file(md_path, pkg_dir, "metadata") + + +def move_metadata_files(md_paths: list[Path], pkg_dir: Path) -> None: + return move_files(md_paths, pkg_dir, "metadata") + + +def move_data_files(data_paths: list[Path], pkg_dir: Path) -> None: + return move_files(data_paths, pkg_dir, "data") + + +def create_bag_in_dir( + paths: list[Path], + pkg_dir: Path, + type: str, + manifest_source: Path = None, + source: str = None, +) -> None: + bag_dir = pkg_dir / type + bag_dir.mkdir() + + if len(paths) == 1 and paths[0].is_dir(): + paths = list(paths[0].iterdir()) + + if source == "rclone": + convert_rclone_md5_to_bagit_manifest(manifest_source, bag_dir) + elif source == "rsync": + convert_rsync_log_to_bagit_manifest(manifest_source, bag_dir) + else: + create_bagit_manifest(paths, bag_dir) + + move_data_files(paths, bag_dir) + create_bag_tag_files(bag_dir) + + +def create_bag_in_images(image_paths: list[Path], pkg_dir: Path) -> None: + create_bag_in_dir(image_paths, pkg_dir, "images") + + return None + + +def create_bag_in_streams(stream_path: Path, pkg_dir: Path) -> None: + create_bag_in_dir([stream_path], pkg_dir, "streams") + if not list(stream_path.iterdir()): + stream_path.rmdir() + + return None + + +def create_bag_in_objects( + objects_path: Path, + pkg_dir: Path, + manifest_source: Path = None, + manifest_type: str = None, +) -> None: + create_bag_in_dir( + [objects_path], pkg_dir, "objects", manifest_source, manifest_type + ) + if not list(objects_path.iterdir()): + objects_path.rmdir() + + return None + + +def create_bagit_manifest(paths: 
list[Path], bag_dir: Path) -> None: + # paths must be files + manifest_lines = [] + for path in paths: + if path.is_dir(): + raise IsADirectoryError(f"{path} is a directory, skipping") + md5_hash = bagit.generate_manifest_lines(str(path), ["md5"])[0][1] + manifest_lines.append([md5_hash, Path("data") / path.name]) + + with open(bag_dir / "manifest-md5.txt", "w") as f: + for line in manifest_lines: + f.write(f"{line[0]} {line[1]}\n") + + return None + + +def convert_rclone_md5_to_bagit_manifest(md5_path: Path, bag_dir: Path) -> None: + # check for manifest + new_md5_path = bag_dir / "manifest-md5.txt" + if new_md5_path.exists(): + raise FileExistsError("manifest-md5.txt already exists, review package") + + with open(md5_path, "r") as f: + manifest_data = f.readlines() + + updated_manifest = [line.replace(" ", " data/") for line in manifest_data] + # re-writes the manifest lines + with open(md5_path, "w") as f: + f.writelines(updated_manifest) + # move md5 file to manifest-md5.txt in bag + md5_path.rename(new_md5_path) + + return None + + +def convert_rsync_log_to_bagit_manifest( + rsync_log: Path, bag_dir: Path, prefix: Path = None +) -> None: + # check for manifest + new_md5_path = bag_dir / "manifest-md5.txt" + if new_md5_path.exists(): + raise FileExistsError("manifest-md5.txt already exists, review package") + + with open(rsync_log, "r") as f: + log_data = f.readlines() + + if not prefix: + prefix = os.path.commonprefix( + [ + os.path.dirname(line.split(",", 4)[3]) + for line in log_data + if len(line.split(",")) > 1 + ] + ) + else: + prefix = str(prefix) + + manifest_data = [] + + for line in log_data: + parts = line.strip().split(",", 3) + if not len(parts) == 4: + continue + + poss_rel_path = parts[3].strip().replace(prefix[1:], "data") + + poss_md5_hash = parts[2].strip().lower() + if not poss_md5_hash: + continue + elif not re.match(r"[0-9a-f]{32}", poss_md5_hash): + LOGGER.warning( + f"{str(rsync_log.name)} should be formatted with md5 hash in the 3rd 
comma-separated fields. Skipping this line: {line}" + ) + continue + + manifest_data.append(f"{poss_md5_hash} {poss_rel_path}\n") + + # write the manifest lines + with open(new_md5_path, "w") as f: + f.writelines(manifest_data) + + return None + + +def create_bag_tag_files(bag_dir: Path) -> None: + txt = """BagIt-Version: 0.97\nTag-File-Character-Encoding: UTF-8\n""" + with open(bag_dir / "bagit.txt", "w") as bagit_file: + bagit_file.write(txt) + + bag_info = {} + bag_info["ACQ-Object-ID"] = bag_dir.parent.name + bag_info["Bagging-Date"] = date.strftime(date.today(), "%Y-%m-%d") + bag_info["Bag-Software-Agent"] = "digarch_scripts" + total_bytes, total_files = get_oxum(bag_dir / "data") + bag_info["Payload-Oxum"] = f"{total_bytes}.{total_files}" + bagit._make_tag_file(bag_dir / "bag-info.txt", bag_info) + + return None + + +def get_oxum(payload_dir: Path) -> tuple[int, int]: + total_bytes = 0 + total_files = 0 + + for payload_file in payload_dir.rglob("*"): + if payload_file.is_file(): + total_files += 1 + total_bytes += os.stat(payload_file).st_size + + return total_bytes, total_files + + +def validate_bag(pkg_dir: Path, subfolder: str) -> None: + bag_dir = pkg_dir / subfolder + bag = bagit.Bag(str(bag_dir)) + try: + bag.validate(completeness_only=True) + LOGGER.info(f"{bag.path} is valid.") + except bagit.BagValidationError: + LOGGER.warning(f"{bag.path} is not valid. 
Check the bag manifest and oxum.") + return None + + +def validate_objects_bag(pkg_dir: Path) -> None: + validate_bag(pkg_dir, "objects") + + return None + + +def validate_images_bag(pkg_dir: Path) -> None: + validate_bag(pkg_dir, "images") + + return None + + +def validate_streams_bag(pkg_dir: Path) -> None: + validate_bag(pkg_dir, "streams") + + return None diff --git a/src/digarch_scripts/package/package_cloud.py b/src/digarch_scripts/package/package_cloud.py index 23e033a..e930370 100644 --- a/src/digarch_scripts/package/package_cloud.py +++ b/src/digarch_scripts/package/package_cloud.py @@ -1,156 +1,39 @@ import argparse -from datetime import date import logging import os -from pathlib import Path import re +from datetime import date +from pathlib import Path import bagit +import digarch_scripts.package.package_base as pb + LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.INFO) def parse_args() -> argparse.Namespace: - def extant_path(p: str) -> Path: - path = Path(p) - if not path.exists(): - raise argparse.ArgumentTypeError(f"{path} does not exist") - return path - - def digital_carrier_label(id: str) -> Path: - pattern = r"ACQ_\d{4}_\d{6}" - if not re.match(r"ACQ_\d{4}_\d{6}", id): - raise argparse.ArgumentTypeError( - f"{id} does not match the expected {type} pattern, {pattern}" - ) - return id - - parser = argparse.ArgumentParser(description="test") - parser.add_argument("--payload", required=True, type=extant_path) - parser.add_argument("--log", required=True, type=extant_path) - parser.add_argument("--md5", required=True, type=extant_path) - parser.add_argument("--dest", required=True, type=extant_path) - parser.add_argument("--id", required=True, type=digital_carrier_label) + parser = pb.TransferParser( + description="Create package for single cloud-based file-transfer." 
+ ) + parser.add_carrierid() + parser.add_payload() + parser.add_log() + parser.add_md5() + parser.add_dest() return parser.parse_args() -def create_base_dir(dest: Path, id: str) -> Path: - acq_id = id.rsplit("_", 1)[0] - package_base = dest / acq_id / id - if package_base.exists(): - raise FileExistsError( - f"{package_base} already exists. Make sure you are using the correct ID" - ) - - try: - package_base.mkdir(parents=True) - except PermissionError: - raise PermissionError(f"{dest} is not writable") - return package_base - -def move_metadata_file(md_path: Path, pkg_dir: Path) -> None: - md_dir = pkg_dir / "metadata" - if not md_dir.exists(): - md_dir.mkdir() - - new_md_path = md_dir / md_path.name - if new_md_path.exists(): - raise FileExistsError(f"{new_md_path} already exists. Not moving.") - - md_path.rename(new_md_path) - return None - -def create_bag_in_objects(payload_path: Path, md5_path: Path, pkg_dir: Path) -> None: - bag_dir = pkg_dir / "objects" - bag_dir.mkdir() - move_payload(payload_path, bag_dir) - convert_to_bagit_manifest(md5_path, bag_dir) - # generate baginfo.txt and bagit.txt (copying code snippet from bagit) - create_bag_tag_files(bag_dir) - return None - -def move_payload(payload_path: Path, bag_dir: Path) -> None: - #instantiate a var for objects dir - payload_dir = bag_dir / "data" - #if the object folder does not exist create it - if not payload_dir.exists(): - payload_dir.mkdir(parents=True) - else: - raise FileExistsError(f"{payload_dir} already exists. Not moving files.") - - for a_file in payload_path.iterdir(): - new_ob_path = payload_dir / a_file.name - #if a payload file is already in the object directory do not move, raise error - if new_ob_path.exists(): - raise FileExistsError(f"{new_ob_path} already exists. 
Not moving.") - - a_file.rename(new_ob_path) - return None - -def convert_to_bagit_manifest(md5_path: Path, bag_dir: Path) -> None: - #check for manifest - new_md5_path = bag_dir / "manifest-md5.txt" - if new_md5_path.exists(): - raise FileExistsError("manifest-md5.txt already exists, review package") - - with open(md5_path, "r") as f: - manifest_data = f.readlines() - - updated_manifest = [ - line.replace(" ", " data/") for line in manifest_data - ] - #re-writes the manifest lines - with open(md5_path, "w") as f: - f.writelines(updated_manifest) - #move md5 file to manifest-md5.txt in bag - md5_path.rename(new_md5_path) - - return None - -def create_bag_tag_files(bag_dir: Path): - txt = """BagIt-Version: 0.97\nTag-File-Character-Encoding: UTF-8\n""" - with open(bag_dir / "bagit.txt", "w") as bagit_file: - bagit_file.write(txt) - - bag_info = {} - bag_info["Bagging-Date"] = date.strftime(date.today(), "%Y-%m-%d") - bag_info["Bag-Software-Agent"] = "package_cloud.py" - total_bytes, total_files = get_oxum(bag_dir / "data") - bag_info["Payload-Oxum"] = f"{total_bytes}.{total_files}" - bagit._make_tag_file(bag_dir / "bag-info.txt", bag_info) - - -def get_oxum(payload_dir: Path) -> (int, int): - total_bytes = 0 - total_files = 0 - - for payload_file in payload_dir.rglob('*'): - if payload_file.is_file(): - total_files += 1 - total_bytes += os.stat(payload_file).st_size - - return total_bytes, total_files - - -def validate_bag_in_payload(pkg_dir: Path) -> None: - bag_dir = pkg_dir / "objects" - bag = bagit.Bag(str(bag_dir)) - try: - bag.validate(completeness_only=True) - LOGGER.info(f"{bag.path} is valid.") - except bagit.BagValidationError: - LOGGER.warn(f"{bag.path} is not valid. 
Check the bag manifest and oxum.") - return None - def main(): args = parse_args() - base_dir = create_base_dir(args.dest, args.id) - move_metadata_file(args.log, base_dir) - create_bag_in_objects(args.payload, args.md5, base_dir) - validate_bag_in_payload(base_dir) + base_dir = pb.create_package_dir(args.dest, args.carrierid) + pb.move_metadata_file(args.log, base_dir) + pb.create_bag_in_objects(args.payload, base_dir, args.md5, "rclone") + pb.validate_objects_bag(base_dir) + if __name__ == "__main__": main() diff --git a/src/digarch_scripts/package/package_filetransfer.py b/src/digarch_scripts/package/package_filetransfer.py new file mode 100644 index 0000000..b7e7bf0 --- /dev/null +++ b/src/digarch_scripts/package/package_filetransfer.py @@ -0,0 +1,34 @@ +import argparse +import logging +import re +from pathlib import Path + +import digarch_scripts.package.package_base as pb + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.INFO) + + +def parse_args() -> argparse.Namespace: + parser = pb.TransferParser( + description="Create packages for all file transfer files for a single acquisition." 
+ ) + parser.add_carrierid() + parser.add_payload() + parser.add_log() + parser.add_dest() + + return parser.parse_args() + + +def main(): + args = parse_args() + + base_dir = pb.create_package_dir(args.dest, args.carrierid) + pb.create_bag_in_objects(args.payload, base_dir, args.log, "rsync") + pb.validate_objects_bag(base_dir) + pb.move_metadata_file(args.log, base_dir) + + +if __name__ == "__main__": + main() diff --git a/src/digarch_scripts/package/package_images.py b/src/digarch_scripts/package/package_images.py new file mode 100644 index 0000000..7bab879 --- /dev/null +++ b/src/digarch_scripts/package/package_images.py @@ -0,0 +1,155 @@ +import argparse +import logging +import re +from pathlib import Path + +import digarch_scripts.package.package_base as pb + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.INFO) + +IMG_EXTS = [".001", ".img", ".dsk"] +LOG_EXTS = [".log"] +STREAM_EXTS = [""] + + +def parse_args() -> argparse.Namespace: + """ + Parse command line arguments. + :return: The parsed arguments. + """ + parser = pb.TransferParser( + description="Create packages for all disk imaging files for a single acquisition." + ) + parser.add_acqid() + parser.add_source() + parser.add_dest() + + return parser.parse_args() + + +def find_carriers_image_files( + acq_id: str, source_dir: Path, log_dir: Path = None, stream_dir: Path = None +) -> dict: + """ + Find all carrier files for a given acquisition ID in the source directory. 
+ """ + + # Optional args kept in case process changes back to multiple source folders + if not log_dir: + log_dir = source_dir + if not stream_dir: + stream_dir = source_dir + + carrier_files = pb.find_category_of_carrier_files( + {}, acq_id, source_dir, IMG_EXTS, "images" + ) + carrier_files = pb.find_category_of_carrier_files( + carrier_files, acq_id, log_dir, LOG_EXTS, "logs" + ) + carrier_files = pb.find_category_of_carrier_files( + carrier_files, acq_id, stream_dir, STREAM_EXTS, "streams" + ) + + if not carrier_files: + raise Warning(f"No files found with the acquisition ID {acq_id} in filename") + + return carrier_files + + +def validate_carriers_image_files(carrier_files: dict) -> bool: + """ + Validate that all required files are present for each carrier. + """ + result = True + for carrier_name in carrier_files: + carrier = carrier_files[carrier_name] + + missing = [] + for key in ["images", "logs"]: + if not key in carrier.keys(): + missing.append(key) + + if missing: + LOGGER.warning( + f'The following required categories of files were not found for {carrier_name}: {", ".join(missing)} ' + ) + result = False + + if "images" in carrier: + if len(carrier["images"]) > 1: + two_sided = True + for image in carrier["images"]: + if not re.match(r"s\d\.001", image.name[-6:]): + two_sided = False + if not two_sided: + LOGGER.warning( + f'Multiple image files found for {carrier_name}. Only 1 allowed. If carrier has 2 disk formats, file names must end with s0.001 or s1.001: {carrier["images"]}' + ) + result = False + + for image_file in carrier["images"]: + if image_file.stat().st_size == 0: + LOGGER.warning(f"The following image file is 0-bytes: {image_file}") + result = False + + if "streams" in carrier: + if not len(carrier["streams"]) == 1: + LOGGER.warning( + f'Multiple folders of streams found for {carrier_name}. 
Only 1 allowed: {carrier["streams"]}' + ) + result = False + if not list(carrier["streams"][0].iterdir()): + LOGGER.warning( + f'Streams folder for {carrier_name} appears to be empty: {carrier["streams"][0]}' + ) + result = False + for child in carrier["streams"][0].iterdir(): + if child.is_dir(): + LOGGER.warning( + f"Folders found with streams folder for {carrier_name}. None allowed: {child}" + ) + result = False + + return result + + +def package_carriers_image_files(carrier_files: dict, acq_dir: Path) -> None: + """ + Create packages for all carriers in the carrier_files dictionary. + """ + for carrier, files in carrier_files.items(): + try: + base_dir = pb.create_package_dir(acq_dir, carrier) + pb.move_metadata_files(files["logs"], base_dir) + pb.create_bag_in_images(files["images"], base_dir) + pb.create_bag_in_streams(files["streams"][0], base_dir) + except Exception as e: + LOGGER.error( + f"Packaging incomplete for {carrier}. Address warnings manually.\n{e}" + ) + finally: + pb.validate_images_bag(base_dir) + pb.validate_streams_bag(base_dir) + + return None + + +def main(): + """ + Main function for packaging images. + """ + args = parse_args() + + carrier_files = find_carriers_image_files(args.acqid, args.source) + + if validate_carriers_image_files(carrier_files): + package_carriers_image_files(carrier_files, args.dest) + else: + LOGGER.error( + "1 or more errors with files for a carrier. 
Please address warnings and re-run" + ) + + +if __name__ == "__main__": + main() diff --git a/src/digarch_scripts/report/report_ftk_extents.py b/src/digarch_scripts/report/report_ftk_extents.py index f001f36..e6ab6e3 100644 --- a/src/digarch_scripts/report/report_ftk_extents.py +++ b/src/digarch_scripts/report/report_ftk_extents.py @@ -1,84 +1,76 @@ -from lxml import etree -import json -import re import argparse +import json +import logging import os import pathlib -import logging +import re + +from lxml import etree LOGGER = logging.getLogger(__name__) # Namespace for the FTK output XML -FO_NAMESPACE = {'fo': 'http://www.w3.org/1999/XSL/Format'} +FO_NAMESPACE = {"fo": "http://www.w3.org/1999/XSL/Format"} def _make_parser(): - def validate_file_input(f) -> pathlib.Path: - ''' + """ Ensure the input file exists - ''' + """ path = pathlib.Path(f) if not path.exists(): - raise argparse.ArgumentTypeError( - f'Directory or file does not exist: {f}' - ) + raise argparse.ArgumentTypeError(f"Directory or file does not exist: {f}") - if not path.suffix.lower() in ['.xml', '.fo']: + if not path.suffix.lower() in [".xml", ".fo"]: raise argparse.ArgumentTypeError( - 'Not a valid file type. Expect .xml or .fo' + "Not a valid file type. 
Expect .xml or .fo" ) return path def validate_output_dir(f) -> pathlib.Path: - path = pathlib.Path(f) if not path.exists(): - raise argparse.ArgumentTypeError( - f'Output directory does not exist: {f}' - ) + raise argparse.ArgumentTypeError(f"Output directory does not exist: {f}") return path - parser = argparse.ArgumentParser( - description='Create a JSON report from XML' - ) + parser = argparse.ArgumentParser(description="Create a JSON report from XML") parser.add_argument( - '-f', '--file', + "-f", + "--file", help="path to FTK XML report", type=validate_file_input, - required=True + required=True, ) parser.add_argument( - '-o', '--output', + "-o", + "--output", help="destination directory", type=validate_output_dir, - required=True + required=True, ) return parser.parse_args() -def create_er_list( - tree: etree.ElementTree -) -> list[list[list[str], str, str]]: - - ''' +def create_er_list(tree: etree.ElementTree) -> list[list[list[str], str, str]]: + """ This transforms the table of contents into a list of lists where each list item has the hierarchy of titles and a reference-id. This list is the intermediate data structure used to build the nested dict. The function returns the entire list. 
- ''' + """ tree = tree.xpath( '/fo:root/fo:page-sequence[@master-reference="TOC"]/fo:flow', - namespaces=FO_NAMESPACE + namespaces=FO_NAMESPACE, )[0] ers = [] @@ -89,27 +81,23 @@ def create_er_list( continue indent = int(child.get("start-indent").split(sep="pt")[0]) - level = (indent//12) - 2 + level = (indent // 12) - 2 if level >= 0: # build a list of parents based on level if level <= len(hierarchy) - 1: hierarchy = hierarchy[:level] elif level > len(hierarchy) + 1: - raise ValueError( - f'Unexpected jump in hierarchy at {child.text}' - ) + raise ValueError(f"Unexpected jump in hierarchy at {child.text}") hierarchy.append(child.text) # only record if entry is an ER possible_ref = child.xpath( - 'fo:basic-link/fo:page-number-citation', namespaces=FO_NAMESPACE + "fo:basic-link/fo:page-number-citation", namespaces=FO_NAMESPACE ) - if possible_ref and hierarchy[-1].startswith('ER'): - refid = possible_ref[0].get('ref-id') - ers.append( - [hierarchy.copy(), refid, hierarchy[-1]] - ) + if possible_ref and hierarchy[-1].startswith("ER"): + refid = possible_ref[0].get("ref-id") + ers.append([hierarchy.copy(), refid, hierarchy[-1]]) audit_ers(ers) @@ -119,11 +107,11 @@ def create_er_list( def audit_ers(ers: list[list[list[str], str, str]]) -> None: er_numbers_used = {} for er in ers: - number = re.match(r'ER (\d+):', er[2]) + number = re.match(r"ER (\d+):", er[2]) if not number: LOGGER.warning( - f'ER is missing a number: {er[2]}. Review the ERs with the processing archivist' + f"ER is missing a number: {er[2]}. Review the ERs with the processing archivist" ) er_number = 0 else: @@ -140,7 +128,7 @@ def audit_ers(ers: list[list[list[str], str, str]]) -> None: for i in range(er_min, er_max): if i not in er_numbers_used.keys(): LOGGER.warning( - f'Collection uses ER {er_min} to ER {er_max}. ER {i} is skipped. Review the ERs with the processing archivist' + f"Collection uses ER {er_min} to ER {er_max}. ER {i} is skipped. 
Review the ERs with the processing archivist" ) # test for duplicate ers @@ -153,51 +141,46 @@ def audit_ers(ers: list[list[list[str], str, str]]) -> None: return None -def transform_bookmark_tables( - tree: etree.ElementTree -) -> list[dict]: - - ''' +def transform_bookmark_tables(tree: etree.ElementTree) -> list[dict]: + """ transforms each row in the 'bookmarksPage' table into a string. this string contains all the extent information that will be summarized later. the return is a list of lists where the first item is the id with the prefix bk and the second item is a string serialized from the XML. - ''' + """ extent_tree = tree.xpath( '/fo:root/fo:page-sequence[@master-reference="bookmarksPage"]/fo:flow/fo:table[@id]', - namespaces=FO_NAMESPACE + namespaces=FO_NAMESPACE, ) bookmark_contents = [] for row in extent_tree: # row is an /fo:row in /fo:table[@id] file_table = row.xpath( - './fo:table-body/fo:table-row/fo:table-cell/fo:block', - namespaces=FO_NAMESPACE + "./fo:table-body/fo:table-row/fo:table-cell/fo:block", + namespaces=FO_NAMESPACE, ) file_dict = { file_table[i].text: file_table[i + 1].text for i in range(0, len(file_table), 2) } - file_dict['file_id'] = row.get('id') - file_dict['bookmark_id'] = row.get('id').split('_')[0] + file_dict["file_id"] = row.get("id") + file_dict["bookmark_id"] = row.get("id").split("_")[0] bookmark_contents.append(file_dict) return bookmark_contents def add_extents_to_ers( - er_list: list[list[list[str], str, str]], - bookmark_tables: list[dict] + er_list: list[list[list[str], str, str]], bookmark_tables: list[dict] ) -> list[list[str, int, int]]: - - ''' + """ summarizes the extent for each ER by correlating the table of contents with the bookmark tables. Returns list of lists with hierarchal ER string, file size, and file count. - ''' + """ ers_with_extents = [] @@ -208,11 +191,13 @@ def add_extents_to_ers( if count == 0: LOGGER.warning( - f'{er_name} does not contain any files. 
It will be omitted from the report.') + f"{er_name} does not contain any files. It will be omitted from the report." + ) continue if size == 0: LOGGER.warning( - f'{er_name} contains no files with bytes. This ER is omitted from report. Review this ER with the processing archivist.') + f"{er_name} contains no files with bytes. This ER is omitted from report. Review this ER with the processing archivist." + ) continue ers_with_extents.append([er[0], size, count]) @@ -221,34 +206,31 @@ def add_extents_to_ers( def get_er_report( - er_files: list[dict], - bookmark_id: str, - er_name: str + er_files: list[dict], bookmark_id: str, er_name: str ) -> tuple[int, int]: - - ''' + """ extract the total file size and file count for a given bookmark ID Returns a tuple with the file size and file count. - ''' + """ size = 0 count = 0 - prefix = bookmark_id.replace('k', 'f') + prefix = bookmark_id.replace("k", "f") for entry in er_files: - if entry['bookmark_id'] == prefix: - - byte_string = entry['Logical Size'] - bytes = re.findall(r'(\d+)\sB', byte_string) + if entry["bookmark_id"] == prefix: + byte_string = entry["Logical Size"] + bytes = re.findall(r"(\d+)\sB", byte_string) if bytes: count += 1 file_size = int(bytes[0]) if file_size == 0: - file_name = entry['Name'] - #extract file name, might have to parse file table better + file_name = entry["Name"] + # extract file name, might have to parse file table better LOGGER.warning( - f'{er_name} contains the following 0-byte file: {file_name}. Review this file with the processing archivist.') + f"{er_name} contains the following 0-byte file: {file_name}. Review this file with the processing archivist." 
+ ) size += file_size else: @@ -257,93 +239,86 @@ def get_er_report( return size, count -def create_report( - input: list[list[str], int, int], - report: dict -) -> dict: - - ''' +def create_report(input: list[list[str], int, int], report: dict) -> dict: + """ recursive function to insert a given bookmark into a nested dictionary based on the hierarchy of component titles. Returns a nested dictionary - ''' + """ if len(input[0]) == 1: - number, name = input[0][0].split(':', maxsplit=1) - report['children'].append({ - 'title': input[0][0], - 'er_number': number, - 'er_name': name.strip(), - 'file_size': input[1], - 'file_count': input[2] - }) + number, name = input[0][0].split(":", maxsplit=1) + report["children"].append( + { + "title": input[0][0], + "er_number": number, + "er_name": name.strip(), + "file_size": input[1], + "file_count": input[2], + } + ) else: parent, child = input[0][0], input[0][1:] input[0] = child - for item in report['children']: - if item['title'] == parent: + for item in report["children"]: + if item["title"] == parent: item = create_report(input, item) return report - report['children'].append( - create_report(input, {'title': parent, 'children': []}) + report["children"].append( + create_report(input, {"title": parent, "children": []}) ) return report -def extract_collection_title( - tree: etree.ElementTree - ) -> str: +def extract_collection_title(tree: etree.ElementTree) -> str: case_info = tree.xpath( - '/fo:root/fo:page-sequence[@master-reference="caseInfoPage"]/fo:flow/fo:table'\ - '/fo:table-body/fo:table-row/fo:table-cell/fo:block/text()', - namespaces=FO_NAMESPACE + '/fo:root/fo:page-sequence[@master-reference="caseInfoPage"]/fo:flow/fo:table' + "/fo:table-body/fo:table-row/fo:table-cell/fo:block/text()", + namespaces=FO_NAMESPACE, ) for i, txt in enumerate(case_info): if txt == "Case Name": - collname = case_info[i+1] + collname = case_info[i + 1] return collname -def make_json( - destination: pathlib.Path, - report: dict, - 
collname -) -> None: - ''' +def make_json(destination: pathlib.Path, report: dict, collname) -> None: + """ creates a json file with the name of the collection as the file name destination is the file path from args parse and report is the collection style dict - ''' + """ name = collname name = name.replace(" ", "_") - with open(os.path.join(destination, f'{name}.json'), 'w') as file: + with open(os.path.join(destination, f"{name}.json"), "w") as file: json.dump(report, file) def main() -> None: args = _make_parser() - print('Parsing XML ...') + print("Parsing XML ...") tree = etree.parse(args.file) - print('Creating report ...') + print("Creating report ...") ers = create_er_list(tree) bookmark_tables = transform_bookmark_tables(tree) ers_with_extents = add_extents_to_ers(ers, bookmark_tables) colltitle = extract_collection_title(tree) - dct = {'title': colltitle, 'children': []} + dct = {"title": colltitle, "children": []} for er in ers_with_extents: dct = create_report(er, dct) print("Writing report ...") make_json(args.output, dct, colltitle) -if __name__ == '__main__': + +if __name__ == "__main__": main() diff --git a/src/digarch_scripts/report/report_hdd_extents.py b/src/digarch_scripts/report/report_hdd_extents.py index 4223ebd..5f1d66f 100644 --- a/src/digarch_scripts/report/report_hdd_extents.py +++ b/src/digarch_scripts/report/report_hdd_extents.py @@ -1,63 +1,56 @@ import argparse -import os import json -import pathlib import logging +import os +import pathlib import re + LOGGER = logging.getLogger(__name__) + def parse_args(): parser = argparse.ArgumentParser() - def validate_dir( - d: str - ) -> pathlib.Path: + def validate_dir(d: str) -> pathlib.Path: path = pathlib.Path(d) if not path.exists(): - raise argparse.ArgumentTypeError( - f'Specified directory does not exist: {d}' - ) + raise argparse.ArgumentTypeError(f"Specified directory does not exist: {d}") if not path.is_dir(): - raise argparse.ArgumentTypeError( - f'Specified path is not a 
directory: {d}' - ) + raise argparse.ArgumentTypeError(f"Specified path is not a directory: {d}") return path def validate_output_dir(f) -> pathlib.Path: - path = pathlib.Path(f) if not path.exists(): - raise argparse.ArgumentTypeError( - f'Output directory does not exist: {f}' - ) + raise argparse.ArgumentTypeError(f"Output directory does not exist: {f}") return path parser.add_argument( - "-d", "--dir", + "-d", + "--dir", type=validate_dir, help="Path to the parent directory, e.g. M###_FAComponents", - required = True + required=True, ) parser.add_argument( - '-o', '--output', + "-o", + "--output", help="report destination directory", type=validate_output_dir, - required=True + required=True, ) return parser.parse_args() -def get_ers( - facomponent_dir: pathlib.Path -) -> list[str, int, int, str]: +def get_ers(facomponent_dir: pathlib.Path) -> list[str, int, int, str]: ers = [] - for possible_er in facomponent_dir.glob('**/ER *'): - objects_dir = possible_er.joinpath('objects') + for possible_er in facomponent_dir.glob("**/ER *"): + objects_dir = possible_er.joinpath("objects") if possible_er.is_dir(): if objects_dir.is_dir(): er = possible_er.relative_to(facomponent_dir) @@ -69,41 +62,47 @@ def get_ers( fp = os.path.join(path, f) if os.path.getsize(fp) == 0: LOGGER.warning( - f'{possible_er.name} contains the following 0-byte file: {f}. Review this file with the processing archivist.') + f"{possible_er.name} contains the following 0-byte file: {f}. Review this file with the processing archivist." + ) size += os.path.getsize(fp) else: LOGGER.warning( - f'{possible_er.name} does not contain an object folder. It will be omitted from the report.') + f"{possible_er.name} does not contain an object folder. It will be omitted from the report." + ) continue if count == 0: LOGGER.warning( - f'{possible_er.name} does not contain any files. It will be omitted from the report.') + f"{possible_er.name} does not contain any files. It will be omitted from the report." 
+ ) continue if size == 0: LOGGER.warning( - f'{possible_er.name} contains no files with bytes. This ER is omitted from report. Review this ER with the processing archivist.') + f"{possible_er.name} contains no files with bytes. This ER is omitted from report. Review this ER with the processing archivist." + ) continue ers.append([str(er), size, count, possible_er.name]) return ers + def extract_collection_title(hdd_dir: pathlib.Path) -> str: for item in hdd_dir.iterdir(): - if re.match(r'M\d+\_FAcomponents', item.name): + if re.match(r"M\d+\_FAcomponents", item.name): return item.name else: LOGGER.warning( - 'Cannot find CollectionID_FAcomponents directory. Please use CollectionID_FAcomponents naming convention for the directory containing all ERs.' + "Cannot find CollectionID_FAcomponents directory. Please use CollectionID_FAcomponents naming convention for the directory containing all ERs." ) + def audit_ers(ers: list[list[str, str, str]]) -> None: er_numbers_used = {} for er in ers: - number = re.match(r'ER (\d+)', er[3]) + number = re.match(r"ER (\d+)", er[3]) if not number: LOGGER.warning( - f'ER is missing a number: {er[3]}. Review the ERs with the processing archivist' + f"ER is missing a number: {er[3]}. Review the ERs with the processing archivist" ) er_number = 0 else: @@ -120,7 +119,7 @@ def audit_ers(ers: list[list[str, str, str]]) -> None: for i in range(er_min, er_max): if i not in er_numbers_used.keys(): LOGGER.warning( - f'Collection uses ER {er_min} to ER {er_max}. ER {i} is skipped. Review the ERs with the processing archivist' + f"Collection uses ER {er_min} to ER {er_max}. ER {i} is skipped. 
Review the ERs with the processing archivist" ) # test for duplicate ers @@ -133,66 +132,60 @@ def audit_ers(ers: list[list[str, str, str]]) -> None: return None -def create_report( - input: list[list[str, int, int]], - report: dict -) -> dict: +def create_report(input: list[list[str, int, int]], report: dict) -> dict: for er in input: report = process_item(er, report) return report -def process_item( - input: list[str, int, int], - report: dict -) -> dict: - if not '/' in input[0]: - parts = re.match(r'(ER \d+)\s(.*)', input[0]) - report['children'].append({ - 'title': input[0], - 'er_number': parts.group(1), - 'er_name': parts.group(2), - 'file_size': input[1], - 'file_count': input[2] - }) +def process_item(input: list[str, int, int], report: dict) -> dict: + if not "/" in input[0]: + parts = re.match(r"(ER \d+)\s(.*)", input[0]) + report["children"].append( + { + "title": input[0], + "er_number": parts.group(1), + "er_name": parts.group(2), + "file_size": input[1], + "file_count": input[2], + } + ) else: - parent, child = input[0].split('/', maxsplit=1) + parent, child = input[0].split("/", maxsplit=1) input[0] = child - for item in report['children']: - if item['title'] == parent: + for item in report["children"]: + if item["title"] == parent: item = process_item(input, item) return report - report['children'].append( - process_item(input, {'title': parent, 'children': []}) + report["children"].append( + process_item(input, {"title": parent, "children": []}) ) return report -def write_report( - report: dict, - dest: pathlib.Path -) -> None: - with open(dest, 'w') as f: + +def write_report(report: dict, dest: pathlib.Path) -> None: + with open(dest, "w") as f: json.dump(report, f) + def main(): args = parse_args() - LOGGER.info('retrieving ER folder paths') + LOGGER.info("retrieving ER folder paths") ers = get_ers(args.dir) - LOGGER.info('creating report') + LOGGER.info("creating report") colltitle = extract_collection_title(args.dir) - stub_report = {'title': 
colltitle, 'children': []} + stub_report = {"title": colltitle, "children": []} full_report = create_report(ers, stub_report) - - LOGGER.info('writing report') - report_file = args.output.joinpath(f'{colltitle}.json') + LOGGER.info("writing report") + report_file = args.output.joinpath(f"{colltitle}.json") write_report(full_report, report_file) -if __name__=="__main__": +if __name__ == "__main__": main() diff --git a/src/digarch_scripts/report/report_transfers.py b/src/digarch_scripts/report/report_transfers.py new file mode 100644 index 0000000..5fd817f --- /dev/null +++ b/src/digarch_scripts/report/report_transfers.py @@ -0,0 +1,158 @@ +import csv +import logging +from datetime import date +from pathlib import Path + +from digarch_scripts.package import package_base + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.INFO) + + +def parse_args(): + """ + Parse command line arguments. + + :return: The parsed arguments. + """ + parser = package_base.TransferParser( + description="Report on the transfers in a directory." + ) + parser.add_acqid() + parser.add_transfers() + parser.add_dest() + + return parser.parse_args() + + +def collect_stats(transfer_path: Path) -> list[date, int, int, int, int]: + """ + Collects statistics about the transfers in the given directory. + + :param path: Path to the directory containing the transfer files. + :return: A tuple containing the date of transfer, number of image files, + cumulative size of image files, number of object files, and + cumulative size of object files. + """ + + # initialize the image and object statistics + image_date = None + image_stats = [] + object_date = None + object_stats = [] + + # Iterate over the files in the directory. + for path in transfer_path.iterdir(): + # Skip directories. 
+ if path.name == "images": + image_date, image_stats = collect_bag_stats(path) + elif path.name == "objects": + object_date, object_stats = collect_bag_stats(path) + else: + continue + + # Return the statistics + stats_stub = transfer_path.name.rsplit("_", 1) + + if not object_stats and not image_stats: + LOGGER.info(f"No images or objects found for {transfer_path}.") + return None + else: + if image_date: + stats_stub.append(image_date) + else: + stats_stub.append(object_date) + stats_stub.extend(image_stats if image_stats else [0, 0]) + stats_stub.extend(object_stats if object_stats else [0, 0]) + + return stats_stub + + +def collect_bag_stats(bag_path: Path) -> tuple[date, list[int, int]]: + """ + Collects statistics from a bag in the given directory. + + :param path: Path to the directory containing the object transfer files. + :return: A tuple containing the date of the transfers and a list of the + number of files and cumulative size of files. + """ + + # Initialize the statistics + bagdate = None + size = 0 + files = 0 + + # Check that image_path is a bag + possible_bag_info = bag_path / "bag-info.txt" + if not possible_bag_info.exists(): + LOGGER.warning(f"Directory should be formatted as a bag: {bag_path}") + return None + + else: + with open(possible_bag_info, "r") as bag_info: + for line in bag_info: + if line.startswith("Bagging-Date:"): + bagdate = date.fromisoformat(line.split(":")[1].strip()) + elif line.startswith("Payload-Oxum:"): + size, files = line.split(":")[1].strip().split(".") + + if not bagdate: + LOGGER.warning(f"Bagging date not found in {possible_bag_info}") + return None + + if not size or not files: + LOGGER.warning(f"Bagging size or files not found in {possible_bag_info}") + return None + + return bagdate, [int(files), int(size)] + + +def write_stats(stats: list, dest: Path, acqid: str) -> None: + """ + Write the statistics to a report file. 
+ + :param stats: A list of lists containing the date of transfer, number of image files, + cumulative size of image files, number of object files, and cumulative size of object files. + :param dest: The destination directory for the report. + :param acqid: The acquisition ID. + """ + with open(dest / f"{acqid}_transfer_report.txt", "w") as report: + writer = csv.writer(report) + writer.writerow( + ["acquisition_id", "object_id", "date", "image_files", "image_size", "object_files", "object_size"] + ) + writer.writerows(stats) + + return None + + +def main(): + """ + Main function for reporting on transfers. + + Collects statistics on the transfers in the given directory and writes them to a report file. + """ + args = parse_args() + + acq_folder = args.transfers / args.acqid + + if not acq_folder.exists(): + LOGGER.error(f"Transfer folder not found: {acq_folder}") + return + else: + all_stats = [] + for transfer in acq_folder.iterdir(): + stats = collect_stats(transfer) + if stats: + LOGGER.info(stats) + all_stats.append(stats) + else: + LOGGER.warning(f"No stats found for {transfer}") + + write_stats(all_stats, args.dest, args.acqid) + + return None + + +if __name__ == "__main__": + main() diff --git a/src/digarch_scripts/transfer/transfer_rsync.py b/src/digarch_scripts/transfer/transfer_rsync.py new file mode 100644 index 0000000..9cccc95 --- /dev/null +++ b/src/digarch_scripts/transfer/transfer_rsync.py @@ -0,0 +1,113 @@ +import argparse +import logging +import re +import subprocess +from pathlib import Path + +import digarch_scripts.package.package_base as pb + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.INFO) + + +def parse_args() -> argparse.Namespace: + parser = pb.TransferParser( + description="Create packages for all file transfer files for a single acquisition." 
+ ) + parser.add_carrierid() + parser.add_source() + parser.add_dest() + parser.add_quiet(help="Suppresses progress bar from rsync") + + return parser.parse_args() + + +def run_rsync(source: Path, dest: Path, quiet: bool = None) -> None: + log_folder = dest / "metadata" + log_folder.mkdir() + log_file = log_folder / f"{dest.name}_rsync.log" + objects_folder = dest / "objects" + objects_folder.mkdir() + + cmd = [ + "rsync", + "-arP", + f"{source}/", + objects_folder / "data", + "--checksum-choice=md5", + f"--log-file={log_file}", + "--log-file-format=, %l, %C, %f", + ] + + if quiet: + cmd.append("-q") + + process = subprocess.run(cmd) + + if process.returncode != 0: + LOGGER.warning( + "Transfer did not complete successfully. Delete transferred files and re-run" + ) + + return + + +def create_bag_files_in_objects(base_dir: Path, rsync_log: Path, source: Path): + objects_dir = base_dir / "objects" + pb.create_bag_tag_files(objects_dir) + pb.convert_rsync_log_to_bagit_manifest(rsync_log, objects_dir, source) + + +def run_disktype(source: Path, dest: Path) -> None: + # determine device to unmount and run disktype on + if not source.is_mount(): + LOGGER.info(f"Disktype log cannot be generated for a folder. Skipping") + return + + output = subprocess.check_output(["df", source]).decode("utf8") + device = re.search(r"(/dev/[a-z0-9]+)", output).group(0) + parent_device = re.search(r"(/dev/[a-z]+\d)", device).group(0) + + LOGGER.info( + f"Dismounting device {device} in order to run disktype, may require password for sudo" + ) + process = subprocess.run(["diskutil", "unmount", device]) + + if process.returncode != 0: + LOGGER.warning( + f"Unable to dismount {source}. Disktype report not generated. 
Create manually" + ) + return + + output = subprocess.check_output(["sudo", "disktype", parent_device]).decode("utf8") + + LOGGER.info(f"Output from disktype: {output}") + metadata_folder = dest / "metadata" + if not metadata_folder.exists(): + metadata_folder.mkdir() + with open(dest / "metadata" / f"{dest.name}_disktype.log", "w") as f: + f.write(output) + + # remount + subprocess.run(["diskutil", "mount", device]) + LOGGER.info("Device remounted") + + return + + +def main(): + args = parse_args() + + base_dir = pb.create_package_dir(args.dest, args.carrierid) + + run_rsync(args.source, base_dir, args.quiet) + rsync_log = base_dir / "metadata" / f"{base_dir.name}_rsync.log" + create_bag_files_in_objects(base_dir, rsync_log, args.source) + + run_disktype(args.source, base_dir) + + pb.validate_objects_bag(base_dir) + + +if __name__ == "__main__": + main() diff --git a/tests/fixtures/image/ACQ_1234_123456.img b/tests/fixtures/image/ACQ_1234_123456.img new file mode 100644 index 0000000..2e65efe --- /dev/null +++ b/tests/fixtures/image/ACQ_1234_123456.img @@ -0,0 +1 @@ +a \ No newline at end of file diff --git a/tests/fixtures/image/ACQ_1234_123456/ACQ_1234_123456.001 b/tests/fixtures/image/ACQ_1234_123456/ACQ_1234_123456.001 new file mode 100644 index 0000000..e69de29 diff --git a/tests/fixtures/image/ACQ_1234_123456/ACQ_1234_123456.002 b/tests/fixtures/image/ACQ_1234_123456/ACQ_1234_123456.002 new file mode 100644 index 0000000..e69de29 diff --git a/tests/fixtures/image/ACQ_1234_123456_process1.log b/tests/fixtures/image/ACQ_1234_123456_process1.log new file mode 100644 index 0000000..2e65efe --- /dev/null +++ b/tests/fixtures/image/ACQ_1234_123456_process1.log @@ -0,0 +1 @@ +a \ No newline at end of file diff --git a/tests/fixtures/image/ACQ_1234_123456_process2.log b/tests/fixtures/image/ACQ_1234_123456_process2.log new file mode 100644 index 0000000..63d8dbd --- /dev/null +++ b/tests/fixtures/image/ACQ_1234_123456_process2.log @@ -0,0 +1 @@ +b \ No newline at 
end of file diff --git a/tests/fixtures/image/ACQ_1234_123457.img b/tests/fixtures/image/ACQ_1234_123457.img new file mode 100644 index 0000000..63d8dbd --- /dev/null +++ b/tests/fixtures/image/ACQ_1234_123457.img @@ -0,0 +1 @@ +b \ No newline at end of file diff --git a/tests/fixtures/image/ACQ_1234_123457/ACQ_1234_123457.001 b/tests/fixtures/image/ACQ_1234_123457/ACQ_1234_123457.001 new file mode 100644 index 0000000..e69de29 diff --git a/tests/fixtures/image/ACQ_1234_123457_process21.log b/tests/fixtures/image/ACQ_1234_123457_process21.log new file mode 100644 index 0000000..63d8dbd --- /dev/null +++ b/tests/fixtures/image/ACQ_1234_123457_process21.log @@ -0,0 +1 @@ +b \ No newline at end of file diff --git a/tests/fixtures/rsync/rsync.log b/tests/fixtures/rsync/rsync.log new file mode 100644 index 0000000..38e62fa --- /dev/null +++ b/tests/fixtures/rsync/rsync.log @@ -0,0 +1,5 @@ +2024/05/30 16:20:36 [59347] building file list +2024/05/30 16:20:36 [59347] , 3072, d5116a5a40aab468780a3c03b417a8ac, Users/fortitude/dev/digarch-scripts-poetry/tests/fixtures/rsync/rsync_files/file.01 +2024/05/30 16:20:36 [59347] , 96, , Users/fortitude/dev/digarch-scripts-poetry/tests/fixtures/rsync/rsync_files/folder +2024/05/30 16:20:36 [59347] , 7168, 379bc3d1e529f9645bab5482bbd4ac98, Users/fortitude/dev/digarch-scripts-poetry/tests/fixtures/rsync/rsync_files/folder/file.02 +2024/05/30 16:20:37 [59347] sent 10516 bytes received 75 bytes total size 10240 diff --git a/tests/fixtures/rsync/rsync_files.dmg b/tests/fixtures/rsync/rsync_files.dmg new file mode 100644 index 0000000..58e8d82 Binary files /dev/null and b/tests/fixtures/rsync/rsync_files.dmg differ diff --git a/tests/fixtures/rsync/rsync_files/file.01 b/tests/fixtures/rsync/rsync_files/file.01 new file mode 100644 index 0000000..b70f455 Binary files /dev/null and b/tests/fixtures/rsync/rsync_files/file.01 differ diff --git a/tests/fixtures/rsync/rsync_files/folder/file.02 b/tests/fixtures/rsync/rsync_files/folder/file.02 
new file mode 100644 index 0000000..3ccaf24 Binary files /dev/null and b/tests/fixtures/rsync/rsync_files/folder/file.02 differ diff --git a/tests/test_lint_ft.py b/tests/test_lint_ft.py index 8d9eb9b..0b536e2 100644 --- a/tests/test_lint_ft.py +++ b/tests/test_lint_ft.py @@ -4,6 +4,7 @@ import digarch_scripts.lint.lint_ft as lint_ft + # Unit tests # Argument tests def test_package_argument(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): @@ -30,6 +31,7 @@ def test_directory_argument(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): assert child_dir in args.packages + # linting tests @pytest.fixture def good_package(tmp_path: Path): @@ -37,8 +39,7 @@ def good_package(tmp_path: Path): f_object_data = pkg / "objects" / "data" f_object_data.mkdir(parents=True) - bag_files = ["bag-info.txt", "bagit.txt", - "manifest-md5.txt", "tagmanifest-md5.txt"] + bag_files = ["bag-info.txt", "bagit.txt", "manifest-md5.txt", "tagmanifest-md5.txt"] for f in bag_files: filepath = pkg / "objects" / f filepath.touch() @@ -64,6 +65,7 @@ def good_package(tmp_path: Path): return pkg + def test_top_folder_valid_name(good_package): """Top level folder name has to conform to ACQ_####_######""" result = lint_ft.package_has_valid_name(good_package) @@ -81,12 +83,14 @@ def test_top_folder_invalid_name(good_package): assert not result + def test_package_has_two_subfolders(good_package): """Second level folders must be two""" result = lint_ft.package_has_two_subfolders(good_package) assert result + def test_package_does_not_have_two_subfolders(good_package): """Test that package fails function when second level folders are not the correct number, i.e. 
2""" @@ -98,6 +102,7 @@ def test_package_does_not_have_two_subfolders(good_package): assert not result + def test_sec_level_folder_valid_names(good_package): """Second level folders must only have objects and metadata folder""" result = lint_ft.package_has_valid_subfolder_names(good_package) @@ -116,12 +121,14 @@ def test_sec_level_folder_invalid_names(good_package): assert not result + def test_package_has_no_hidden_file(good_package): """The package should not have any hidden file""" result = lint_ft.package_has_no_hidden_file(good_package) assert result + def test_package_has_hidden_file(good_package): """Test that package fails function when there is any hidden file""" bad_package = good_package @@ -134,12 +141,14 @@ def test_package_has_hidden_file(good_package): assert not result + def test_package_has_no_zero_bytes_file(good_package): """The package should not have any zero bytes file""" result = lint_ft.package_has_no_zero_bytes_file(good_package) assert result + def test_package_has_zero_bytes_file(good_package): """Test that package fails function when there is any zero bytes file""" bad_package = good_package @@ -150,6 +159,7 @@ def test_package_has_zero_bytes_file(good_package): assert not result + def test_metadata_folder_is_flat(good_package): """The metadata folder should not have folder structure""" result = lint_ft.metadata_folder_is_flat(good_package) @@ -168,12 +178,14 @@ def test_metadata_folder_has_random_folder(good_package): assert not result + def test_metadata_folder_has_files(good_package): """The metadata folder should have one or more file""" result = lint_ft.metadata_folder_has_files(good_package) assert result + def test_metadata_folder_empty(good_package): """Test that package fails function when the metadata does not have any files""" @@ -185,12 +197,14 @@ def test_metadata_folder_empty(good_package): assert not result + def test_metadata_has_correct_naming_convention(good_package): """The metadata file name should be in the accepted 
list""" result = lint_ft.metadata_has_correct_naming_convention(good_package) assert result + def test_metadata_has_incorrect_naming_convention(good_package): """Test that package fails function when metadata file(s) has incorrect naming conventions""" @@ -202,6 +216,7 @@ def test_metadata_has_incorrect_naming_convention(good_package): assert not result + def test_objects_folder_correct_structure(good_package): """objects folder should have a data folder, which includes four files: bag-info.txt, bagit.txt, manifest-md5.txt and tagmanifest-md5.txt""" @@ -209,6 +224,7 @@ def test_objects_folder_correct_structure(good_package): assert result + def test_objects_folder_incorrect_structure(good_package): """Test that package fails function if it does not have the data folder, or missing any of the four files: bag-info.txt, bagit.txt, manifest-md5.txt @@ -221,12 +237,14 @@ def test_objects_folder_incorrect_structure(good_package): assert not result + def test_objects_folder_has_no_empty_folder(good_package): """The objects folder should not have any empty folders""" result = lint_ft.objects_folder_has_no_empty_folder(good_package) assert result + def test_objects_folder_has_empty_folder(good_package): """Test that package fails function if its objects folder has empty folder(s)""" bad_package = good_package @@ -238,12 +256,14 @@ def test_objects_folder_has_empty_folder(good_package): assert not result + def test_valid_package(good_package): """Test that package returns 'valid' when all tests are passed""" result = lint_ft.lint_package(good_package) assert result == "valid" + def test_invalid_package(good_package): """Test that package returns 'invalid' when failing some tests""" bad_package = good_package @@ -255,6 +275,7 @@ def test_invalid_package(good_package): assert result == "invalid" + def test_unclear_package(good_package): """Test that package returns 'needs review' when failing some tests""" bad_package = good_package diff --git a/tests/test_package_base.py 
b/tests/test_package_base.py new file mode 100644 index 0000000..b3fcbbe --- /dev/null +++ b/tests/test_package_base.py @@ -0,0 +1,446 @@ +import os +import shutil +from pathlib import Path + +import bagit +import pytest + +import digarch_scripts.package.package_base as pb + + +@pytest.fixture +def transfer_files(tmp_path: Path, request): + fixture_data = Path(request.module.__file__).parent / "fixtures" / "cloud" + shutil.copytree(fixture_data, tmp_path, dirs_exist_ok=True) + return tmp_path + + +@pytest.fixture +def rclone_payload(transfer_files): + return transfer_files / "rclone_files" + + +@pytest.fixture +def rclone_md5_manifest(transfer_files): + return transfer_files / "rclone.md5" + + +@pytest.fixture +def rclone_log(transfer_files): + return transfer_files / "rclone.log" + + +@pytest.fixture +def image_files(tmp_path: Path, request): + fixture_data = Path(request.module.__file__).parent / "fixtures" / "image" + shutil.copytree(fixture_data, tmp_path, dirs_exist_ok=True) + return tmp_path + + +@pytest.fixture +def rsync_files(tmp_path: Path, request): + fixture_data = Path(request.module.__file__).parent / "fixtures" / "rsync" + shutil.copytree(fixture_data, tmp_path, dirs_exist_ok=True) + return tmp_path + + +@pytest.fixture +def rsync_payload(rsync_files): + return rsync_files / "rsync_files" + + +@pytest.fixture +def rsync_log(rsync_files): + return rsync_files / "rsync.log" + + +@pytest.fixture +def acqid(): + return "ACQ_1234_123456" + + +def args(transfer_files): + args = [ + transfer_files / "rclone.md5", + transfer_files / "rclone.log", + transfer_files, + ] + return args + + +CREATE_DIR = [ + (pb.create_acq_dir, "ACQ_1234"), + (pb.create_package_dir, "ACQ_1234_123456"), +] + + +def test_file_found(image_files): + acq_id = "ACQ_1234" + + carrier_files = {} + carrier_files = pb.find_category_of_carrier_files( + carrier_files, acq_id, image_files, [".img"], "images" + ) + + assert ( + image_files / "ACQ_1234_123456.img" + in 
carrier_files[f"{acq_id}_123456"]["images"] + ) + + +def test_ignore_unknown_extension_for_category(image_files): + acq_id = "ACQ_1234" + + carrier_files = {} + carrier_files = pb.find_category_of_carrier_files( + carrier_files, acq_id, image_files, [".001"], "images" + ) + + assert f"{acq_id}_123456" not in carrier_files + + +def test_multiple_files_found(image_files): + acq_id = "ACQ_1234" + + carrier_files = {} + carrier_files = pb.find_category_of_carrier_files( + carrier_files, acq_id, image_files, [".log"], "logs" + ) + + assert len(carrier_files[f"{acq_id}_123456"]["logs"]) == 2 + + +@pytest.mark.parametrize("tested_function,id", CREATE_DIR) +def test_create_dir_exc_on_readonly(tmp_path: Path, id: str, tested_function): + """Test that package folder maker reports permission error""" + + # make folder read-only + os.chmod(tmp_path, 0o500) + + with pytest.raises(PermissionError) as exc: + tested_function(tmp_path, id) + + # change back to allow clean-up (might not be necessary) + os.chmod(tmp_path, 0o777) + assert f"{str(tmp_path)} is not writable" in str(exc.value) + + +def test_create_acq_dir(tmp_path: Path): + """Test that package folder maker makes ACQ and Carrier folders""" + + id = "ACQ_1234" + base_dir = pb.create_acq_dir(tmp_path, id) + + assert base_dir.name == id + assert base_dir.parent.name == tmp_path.name + + +def test_create_pkg_dir(tmp_path: Path, acqid: str): + """Test that package folder maker makes ACQ and Carrier folders""" + + base_dir = pb.create_package_dir(tmp_path, acqid) + + assert base_dir.name == acqid + assert base_dir.parent.name == acqid[:-7] + + +def test_create_package_basedir_with_existing_acq_dir(tmp_path: Path, acqid: str): + """Test that package folder maker respect existing ACQ folder""" + + (tmp_path / acqid[:-7]).mkdir() + base_dir = pb.create_package_dir(tmp_path, acqid) + + assert base_dir.name == acqid + assert base_dir.parent.name == acqid[:-7] + + +def test_error_on_existing_package_dir(tmp_path: Path, acqid: str): 
+ """Test that package folder maker errors if carrier folder exists""" + + base_dir = tmp_path / acqid[:-7] / acqid + base_dir.mkdir(parents=True) + + with pytest.raises(FileExistsError) as exc: + pb.create_package_dir(tmp_path, acqid) + + assert f"{base_dir} already exists. Make sure you are using the correct ID" in str( + exc.value + ) + + +@pytest.fixture +def package_base_dir(tmp_path: Path, acqid: str): + return pb.create_package_dir(tmp_path, acqid) + + +MOVE_FILE = [ + (pb.move_metadata_file, "metadata"), +] + + +@pytest.mark.parametrize("test_function,dest", MOVE_FILE) +def test_move_file(package_base_dir: Path, rclone_log: Path, test_function, dest: str): + """Test that metadata folder and log file are moved successfully""" + + test_function(rclone_log, package_base_dir) + + assert not rclone_log.exists() + assert (package_base_dir / dest / "rclone.log").exists() + + +@pytest.mark.parametrize("test_function,dest", MOVE_FILE) +def test_do_not_overwrite_file( + package_base_dir: Path, rclone_log: Path, test_function, dest: str +): + """Test that log file is not moved if a same name file exists in dest""" + + rclone_log = package_base_dir / dest / rclone_log.name + rclone_log.parent.mkdir() + rclone_log.touch() + + with pytest.raises(FileExistsError) as exc: + test_function(rclone_log, package_base_dir) + + assert rclone_log.exists() + assert f"{rclone_log} already exists in {dest} folder. Not moving." 
in str( + exc.value + ) + + +MOVE_FILES = [ + (pb.move_metadata_files, "metadata"), + (pb.move_data_files, "data"), +] + + +@pytest.mark.parametrize("test_function,dest", MOVE_FILES) +def test_move_multiple_file( + package_base_dir: Path, + rclone_log: Path, + rclone_md5_manifest: Path, + test_function, + dest: str, +): + """Test that multiple files are moved successfully""" + parts = dest.split("/") + + md_files = [rclone_log, rclone_md5_manifest] + test_function(md_files, package_base_dir) + + for md_file in md_files: + assert not md_file.exists() + assert (package_base_dir / dest / md_file.name).exists() + + +@pytest.mark.parametrize("test_function,dest", MOVE_FILES) +def test_partial_halt_multiple_files( + package_base_dir: Path, + rclone_log: Path, + rclone_md5_manifest: Path, + test_function, + dest: str, +): + """Test that warning is issued for multiple move if a single metadata move fails""" + + rclone_log = package_base_dir / dest / rclone_log.name + rclone_log.parent.mkdir() + rclone_log.touch() + + md_files = [rclone_log, rclone_md5_manifest] + + with pytest.raises(Warning) as exc: + test_function(md_files, package_base_dir) + + assert rclone_log.exists() + assert ( + f"already exists in {dest} folder. Not moving. 
One or more files may have already been moved to the {dest} folder" + in str(exc.value) + ) + + +@pytest.fixture +def bag_payload(package_base_dir: Path, rclone_payload: Path): + pb.move_data_files(list(rclone_payload.iterdir()), package_base_dir) + bag_payload = package_base_dir / "data" + + return bag_payload + + +def test_convert_rclone_md5(bag_payload: Path, rclone_md5_manifest: Path): + pb.convert_rclone_md5_to_bagit_manifest(rclone_md5_manifest, bag_payload.parent) + bag_md5 = bag_payload.parent / "manifest-md5.txt" + + # Get path to correct payload in data + # read md5 and extract filepaths + with open(bag_md5) as m: + md5_paths = [line.strip().split(" ")[-1] for line in m.readlines()] + + payload_files = [ + str(path.relative_to(bag_payload.parent)) for path in bag_payload.rglob("*") + ] + + for a_file in md5_paths: + assert a_file in payload_files + + +@pytest.fixture +def rsync_bag_payload(package_base_dir: Path, rsync_payload: Path): + pb.move_data_files(list(rsync_payload.iterdir()), package_base_dir) + bag_payload = package_base_dir / "data" + + return bag_payload + + +def test_convert_rsync_log(rsync_bag_payload: Path, rsync_log: Path, rsync_files): + pb.convert_rsync_log_to_bagit_manifest(rsync_log, rsync_bag_payload.parent) + bag_md5 = rsync_bag_payload.parent / "manifest-md5.txt" + + # Get path to correct payload in data + # read md5 and extract filepaths + with open(bag_md5) as m: + md5_paths = [line.strip().split(" ")[-1] for line in m.readlines()] + + payload_files = [ + str(path.relative_to(rsync_bag_payload.parent)) + for path in rsync_bag_payload.rglob("*") + ] + + for a_file in md5_paths: + assert a_file in payload_files + + +def test_convert_rsync_log_replaces_prefix_with_data( + rsync_bag_payload: Path, rsync_log: Path +): + prefix = ( + "/Users/fortitude/dev/digarch-scripts-poetry/tests/fixtures/rsync/rsync_files" + ) + pb.convert_rsync_log_to_bagit_manifest(rsync_log, rsync_bag_payload.parent, prefix) + bag_md5 = rsync_bag_payload.parent 
/ "manifest-md5.txt" + + # extract paths from manifest + with open(bag_md5) as m: + md5_paths = [line.strip().split(" ")[-1] for line in m.readlines()] + + # extract paths from log + rsync_paths = [] + with open(rsync_log) as m: + lines = m.readlines() + for line in lines: + parts = line.strip().split(", ") + if len(parts) > 3 and parts[2].strip(): + rsync_paths.append( + line.strip().split(", ")[-1].replace(prefix[1:], "data") + ) + + # assert difference + assert set(md5_paths) == set(rsync_paths) + + +def test_convert_rsync_log_requires_specific_format( + rsync_bag_payload: Path, rsync_log: Path, caplog +): + rsync_log.write_text("time, size, not a hash, good/path") + pb.convert_rsync_log_to_bagit_manifest(rsync_log, rsync_bag_payload.parent) + + assert ( + f"{str(rsync_log.name)} should be formatted with md5 hash in the 3rd comma-separated fields" + in caplog.text + ) + + +def test_create_objects_bag( + package_base_dir: Path, rclone_payload: Path, rclone_md5_manifest: Path +): + """Test that all tag files are created and rclone md5sums are correctly converted""" + + bag_path = package_base_dir / "objects" + + # might need further testing of the oxum and manifest converter functions + pb.create_bag_in_objects( + rclone_payload, package_base_dir, rclone_md5_manifest, "rclone" + ) + + assert bagit.Bag(str(bag_path)).validate(completeness_only=True) + assert not rclone_payload.exists() + + +def test_create_streams_bag(package_base_dir: Path, image_files: Path): + """Test that all tag files are created and new md5s are correctly created""" + + streams_path = image_files / "ACQ_1234_123456" + bag_path = package_base_dir / "streams" + + pb.create_bag_in_streams(streams_path, package_base_dir) + + assert bagit.Bag(str(bag_path)).validate(completeness_only=True) + assert not streams_path.exists() + + +def test_error_on_folder_when_creating_streams_bag( + package_base_dir: Path, image_files: Path +): + """Test that Exception is raised when streams folder contains a child 
directory""" + + streams_path = image_files / "ACQ_1234_123456" + subdir = streams_path / "subdir" + subdir.mkdir() + streams_contents = set(streams_path.iterdir()) + bag_path = package_base_dir / "streams" + + with pytest.raises(IsADirectoryError) as exc: + pb.create_bag_in_streams(streams_path, package_base_dir) + + assert f"{str(subdir)} is a directory, skipping" in str(exc.value) + assert set(streams_path.iterdir()) == streams_contents + assert not list(bag_path.iterdir()) + + +def test_generate_valid_oxum(transfer_files: Path): + """Test that script generates oxum correctly""" + # test with entire fixture to test folder recursion + + total_bytes, total_files = pb.get_oxum(transfer_files) + + assert total_bytes == 59286 + assert total_files == 12 + + +VALIDATE_BAGS = [ + (pb.validate_objects_bag, "objects"), + (pb.validate_images_bag, "images"), + (pb.validate_streams_bag, "streams"), +] + + +@pytest.mark.parametrize("test_function,type", VALIDATE_BAGS) +def test_validate_valid_bag(transfer_files: Path, test_function, type: str, caplog): + """Test the log message""" + + # create tiny bag for testing + sub_dir = transfer_files / type + sub_dir.mkdir() + (transfer_files / "rclone.md5").rename(sub_dir / "rlcone.md5") + test_bag = bagit.make_bag(sub_dir) + + test_function(transfer_files) + + assert f"{test_bag.path} is valid." in caplog.text + + +@pytest.mark.parametrize("test_function,type", VALIDATE_BAGS) +def test_validate_invalid_bag(transfer_files, test_function, type: str, caplog): + """Test the log message if the bag isn't valid for some reason""" + + sub_dir = transfer_files / type + sub_dir.mkdir() + (transfer_files / "rclone.md5").rename(sub_dir / "rlcone.md5") + + test_bag = bagit.make_bag(sub_dir) + print(list(Path(test_bag.path).iterdir())) + (Path(test_bag.path) / "bag-info.txt").unlink() + test_function(transfer_files) + + assert ( + f"{test_bag.path} is not valid. Check the bag manifest and oxum." 
in caplog.text + ) diff --git a/tests/test_package_cloud.py b/tests/test_package_cloud.py index 55e5561..0a12184 100644 --- a/tests/test_package_cloud.py +++ b/tests/test_package_cloud.py @@ -1,12 +1,12 @@ -import digarch_scripts.package.package_cloud as pc - import argparse import os -from pathlib import Path -import pytest import shutil +from pathlib import Path import bagit +import pytest + +import digarch_scripts.package.package_cloud as pc @pytest.fixture @@ -28,7 +28,7 @@ def args(transfer_files): str(transfer_files / "rclone.log"), "--dest", str(transfer_files), - "--id", + "--carrierid", "ACQ_1234_123456", ] return args @@ -86,200 +86,6 @@ def test_id_arg_must_match_pattern( assert f"bad_id does not match" in stderr -def test_create_package_basedir_exc_on_readonly(tmp_path: Path, args: list): - """Test that package folder maker reports permission error""" - - id = args[-1] - # make folder read-only - os.chmod(tmp_path, 0o500) - - with pytest.raises(PermissionError) as exc: - pc.create_base_dir(tmp_path, id) - - # change back to allow clean-up (might not be necessary) - os.chmod(tmp_path, 0o777) - assert f"{str(tmp_path)} is not writable" in str(exc.value) - - -def test_create_package_basedir(tmp_path: Path, args: list): - """Test that package folder maker makes ACQ and Carrier folders""" - - id = args[-1] - base_dir = pc.create_base_dir(tmp_path, args[-1]) - - assert base_dir.name == id - assert base_dir.parent.name == id[:-7] - - -def test_create_package_basedir_with_existing_acq_dir(tmp_path: Path, args: list): - """Test that package folder maker respect existing ACQ folder""" - - id = args[-1] - (tmp_path / id[:-7]).mkdir() - base_dir = pc.create_base_dir(tmp_path, args[-1]) - - assert base_dir.name == id - assert base_dir.parent.name == id[:-7] - - -def test_error_on_existing_package_dir(tmp_path: Path, args: list): - """Test that package folder maker errors if carrier folder exists""" - - id = args[-1] - base_dir = tmp_path / id[:-7] / id - 
base_dir.mkdir(parents=True) - - with pytest.raises(FileExistsError) as exc: - pc.create_base_dir(tmp_path, id) - - assert f"{base_dir} already exists. Make sure you are using the correct ID" in str( - exc.value - ) - - -@pytest.fixture -def package_base_dir(tmp_path: Path, args: list): - return pc.create_base_dir(tmp_path, args[-1]) - - -def test_move_metadata(transfer_files: Path, package_base_dir: Path): - """Test that metadata folder and log file are moved successfully""" - - source_log = transfer_files / "rclone.log" - pc.move_metadata_file(source_log, package_base_dir) - - assert not source_log.exists() - assert (package_base_dir / "metadata" / "rclone.log").exists() - - -def test_do_not_overwrite_metadata(transfer_files: Path, package_base_dir: Path): - """Test that log file is not moved if a same name file exists in dest""" - - source_log = transfer_files / "rclone.log" - rclone_log = package_base_dir / "metadata" / "rclone.log" - rclone_log.parent.mkdir() - rclone_log.touch() - - with pytest.raises(FileExistsError) as exc: - pc.move_metadata_file(source_log, package_base_dir) - - assert source_log.exists() - assert f"{rclone_log} already exists. Not moving." 
in str(exc.value) - -def test_move_payload(transfer_files: Path, package_base_dir: Path): - """Test that entirety of payload is moved and hierarchy is preserved""" - - source_payload = transfer_files / "rclone_files" - source_contents = [ - file.relative_to(source_payload) for file in source_payload.rglob("*") - ] - - data_path = package_base_dir / "objects" / "data" - pc.move_payload(source_payload, package_base_dir / "objects") - - # check that source is empty - assert not any(source_payload.iterdir()) - - assert data_path.exists() - - # compare contents of data and former source - data_contents = [file.relative_to(data_path) for file in data_path.rglob("*")] - assert source_contents == data_contents - - -def test_do_not_overwrite_payload(transfer_files: Path, package_base_dir: Path): - """Test that no payload file is moved if /data exists""" - - source_payload = transfer_files / "rclone_files" - source_contents = [file for file in source_payload.rglob("*")] - - bag_payload = package_base_dir / "objects" / "data" - bag_payload.mkdir(parents=True) - - with pytest.raises(FileExistsError) as exc: - pc.move_payload(source_payload, package_base_dir / "objects") - - # check source has not changed - assert source_contents == [file for file in source_payload.rglob("*")] - assert f"{bag_payload} already exists. Not moving files." 
in str(exc.value) - -@pytest.fixture -def bag_payload(transfer_files: Path, package_base_dir: Path): - pc.move_payload(transfer_files / "rclone_files", package_base_dir) - bag_payload = package_base_dir / "data" - - return bag_payload - -def test_convert_md5(bag_payload: Path, transfer_files: Path): - rclone_md5 = transfer_files / "rclone.md5" - pc.convert_to_bagit_manifest(rclone_md5, bag_payload.parent) - bag_md5 = bag_payload.parent / "manifest-md5.txt" - - # Get path to correct payload in data - # read md5 and extract filepaths - with open(bag_md5) as m: - md5_paths = [line.strip().split(' ')[-1] for line in m.readlines()] - - payload_files = [ - str(path.relative_to(bag_payload.parent)) for path in bag_payload.rglob('*') - ] - for a_file in md5_paths: - assert a_file in payload_files - - -def test_create_bag(transfer_files: Path, package_base_dir: Path): - """Test that all tag files are created and rclone md5sums are correctly converted""" - - md5_path = transfer_files / "rclone.md5" - bag_path = package_base_dir / "objects" - - # might need further testing of the oxum and manifest converter functions - pc.create_bag_in_objects( - transfer_files / "rclone_files", md5_path, package_base_dir - ) - - assert bagit.Bag(str(bag_path)).validate(completeness_only=True) - - -def test_generate_valid_oxum(transfer_files: Path): - """Test that script generates oxum correctly""" - - total_bytes, total_files = pc.get_oxum(transfer_files) - - assert total_bytes == 59286 - assert total_files == 12 - - -def test_validate_valid_bag(transfer_files: Path, caplog): - """Test the log message""" - - object_dir = transfer_files / "objects" - object_dir.mkdir() - (transfer_files / "rclone.md5").rename(object_dir / "rlcone.md5") - - test_bag = bagit.make_bag(object_dir) - - pc.validate_bag_in_payload(transfer_files) - - assert f"{test_bag.path} is valid." 
in caplog.text - - -def test_validate_invalid_bag(transfer_files, caplog): - """Test the log message if the bag isn't valid for some reason""" - - object_dir = transfer_files / "objects" - object_dir.mkdir() - (transfer_files / "rclone.md5").rename(object_dir / "rlcone.md5") - - test_bag = bagit.make_bag(object_dir) - print(list(Path(test_bag.path).iterdir())) - (Path(test_bag.path) / 'bag-info.txt').unlink() - pc.validate_bag_in_payload(transfer_files) - - - assert f"{test_bag.path} is not valid. Check the bag manifest and oxum." in caplog.text - - def test_full_run( monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture, args: list ): @@ -290,6 +96,6 @@ def test_full_run( pkg_dir = Path(args[-3]) / args[-1][:-7] / args[-1] assert pkg_dir.exists() - assert bagit.Bag(str(pkg_dir / 'objects')).validate() + assert bagit.Bag(str(pkg_dir / "objects")).validate() - assert 'rclone.log' in [x.name for x in (pkg_dir / 'metadata').iterdir()] + assert "rclone.log" in [x.name for x in (pkg_dir / "metadata").iterdir()] diff --git a/tests/test_package_filetransfer.py b/tests/test_package_filetransfer.py new file mode 100644 index 0000000..1f79bb2 --- /dev/null +++ b/tests/test_package_filetransfer.py @@ -0,0 +1,99 @@ +import argparse +import os +import shutil +from pathlib import Path + +import bagit +import pytest + +import digarch_scripts.package.package_filetransfer as pf + + +@pytest.fixture +def transfer_files(tmp_path: Path, request): + fixture_data = Path(request.module.__file__).parent / "fixtures" / "rsync" + shutil.copytree(fixture_data, tmp_path, dirs_exist_ok=True) + return tmp_path + + +@pytest.fixture +def args(transfer_files): + args = [ + "script_name", + "--payload", + str(transfer_files / "rsync_files"), + "--log", + str(transfer_files / "rsync.log"), + "--dest", + str(transfer_files), + "--carrierid", + "ACQ_1234_123456", + ] + return args + + +def test_requires_args( + monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture, args: list +): + 
"""Test that script requires all five args""" + + for i in range(0, 4): + # remove a pair of list items (arg and value) for each test + part_args = args[0 : 2 * i + 1] + args[2 * i + 3 :] + + monkeypatch.setattr("sys.argv", part_args) + + with pytest.raises(SystemExit): + args = pf.parse_args() + + stderr = capsys.readouterr().err + + assert f"required: {args[2*i+1]}" in stderr + + +def test_arg_paths_must_exist( + monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture, args: list +): + """Test that script errors if path argument doesn't exist""" + + for i in range(1, 4): + bad_args = args + bad_path = "nonexistant" + bad_args[2 * i] = bad_path + + monkeypatch.setattr("sys.argv", bad_args) + with pytest.raises(SystemExit): + args = pf.parse_args() + + stderr = capsys.readouterr().err + + assert f"{bad_path} does not exist" in stderr + + +def test_id_arg_must_match_pattern( + monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture, args: list +): + """Test that script errors if id argument doesn't match ACQ_####_######""" + args[-1] = "bad_id" + monkeypatch.setattr("sys.argv", args) + with pytest.raises(SystemExit): + args = pf.parse_args() + + stderr = capsys.readouterr().err + + assert f"bad_id does not match" in stderr + + +def test_full_run( + monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture, args: list +): + """Test end to end successful run""" + + monkeypatch.setattr("sys.argv", args) + pf.main() + + pkg_dir = Path(args[-3]) / args[-1][:-7] / args[-1] + assert pkg_dir.exists() + assert bagit.Bag(str(pkg_dir / "objects")).validate() + + assert "rsync.log" in [x.name for x in (pkg_dir / "metadata").iterdir()] diff --git a/tests/test_package_images.py b/tests/test_package_images.py new file mode 100644 index 0000000..1f93d0f --- /dev/null +++ b/tests/test_package_images.py @@ -0,0 +1,280 @@ +import shutil +from pathlib import Path + +import bagit +import pytest + +import digarch_scripts.package.package_images as pi + + 
+@pytest.fixture +def transfer_files(tmp_path: Path, request): + fixture_data = Path(request.module.__file__).parent / "fixtures" / "image" + shutil.copytree(fixture_data, tmp_path, dirs_exist_ok=True) + return tmp_path + + +# Test command-line arguments +@pytest.fixture +def args(transfer_files): + args = [ + "script_name", + "--source", + str(transfer_files), + "--dest", + str(transfer_files), + "--acqid", + "ACQ_1234", + ] + return args + + +def test_requires_args( + monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture, args: list +): + """Test that script requires image, dest, and id (first 3 args)""" + + for i in range(0, 2): + # remove a pair of list items (arg and value) for each test + part_args = args[0 : 2 * i + 1] + args[2 * i + 3 :] + + monkeypatch.setattr("sys.argv", part_args) + + with pytest.raises(SystemExit): + args = pi.parse_args() + + stderr = capsys.readouterr().err + + assert f"required: {args[2*i+1]}" in stderr + + +def test_arg_paths_must_exist( + monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture, args: list +): + """Test that script errors if a path argument doesn't exist""" + + for i in [1, 2]: + bad_args = args + bad_path = "nonexistant" + bad_args[2 * i] = bad_path + + monkeypatch.setattr("sys.argv", bad_args) + with pytest.raises(SystemExit): + args = pi.parse_args() + + stderr = capsys.readouterr().err + + assert f"{bad_path} does not exist" in stderr + + +def test_id_arg_must_match_pattern( + monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture, args: list +): + """Test that script errors if id argument doesn't match ACQ_####_######""" + args[6] = "bad_id" + monkeypatch.setattr("sys.argv", args) + with pytest.raises(SystemExit): + args = pi.parse_args() + + stderr = capsys.readouterr().err + + assert f"bad_id does not match" in stderr + + +def test_carrier_files_found(transfer_files): + acq_id = "ACQ_1234" + + carrier_files = pi.find_carriers_image_files( + acq_id, + transfer_files, + transfer_files, + 
transfer_files, + ) + + carrier1 = f"{acq_id}_123456" + assert carrier1 in carrier_files + for key in ["images", "logs", "streams"]: + assert key in carrier_files[carrier1] + for key in carrier_files[carrier1]: + for item in carrier_files[carrier1][key]: + assert isinstance(item, Path) + + +def test_acqid_not_found(transfer_files): + acq_id = "ACQ_1111" + + with pytest.raises(Warning) as exc: + pi.find_carriers_image_files( + acq_id, + transfer_files, + ) + + assert f"No files found with the acquisition ID {acq_id} in filename" in str( + exc.value + ) + + +@pytest.fixture +def carrier_files(transfer_files): + acq_id = "ACQ_1234" + + carrier_files = pi.find_carriers_image_files( + acq_id, + transfer_files, + ) + return carrier_files + + +def test_good_validate_carrier(carrier_files, caplog): + result = pi.validate_carriers_image_files(carrier_files) + + assert not caplog.text + assert result + + +@pytest.mark.parametrize("key", ["images", "logs"]) +def test_warn_carrier_with_one_missing_category(carrier_files, key, caplog): + carrier_files["ACQ_1234_123456"].pop(key) + + result = pi.validate_carriers_image_files(carrier_files) + + assert ( + f"The following required categories of files were not found for ACQ_1234_123456: {key}" + in caplog.text + ) + assert not result + + +def test_warn_more_than_one_image(carrier_files, caplog): + carrier = "ACQ_1234_123457" + second_image = carrier_files[carrier]["images"][0].with_suffix(".img2") + second_image.write_text("0") + carrier_files[carrier]["images"].append(second_image) + + result = pi.validate_carriers_image_files(carrier_files) + + assert f"Multiple image files found for {carrier}. 
Only 1 allowed" in caplog.text + assert not result + + +def test_accept_two_sided_images(carrier_files): + carrier = "ACQ_1234_123457" + + image_name = carrier_files[carrier]["images"][0].name + first_image = carrier_files[carrier]["images"][0].parent / image_name.replace( + ".img", "s0.001" + ) + second_image = carrier_files[carrier]["images"][0].parent / image_name.replace( + ".img", "s1.001" + ) + second_image.write_text("0") + + carrier_files[carrier]["images"][0].rename(first_image) + carrier_files[carrier]["images"] = [first_image, second_image] + + result = pi.validate_carriers_image_files(carrier_files) + + assert result + + +def test_warn_on_malformed_two_sided_image_filename(carrier_files, caplog): + carrier = "ACQ_1234_123457" + + image_name = carrier_files[carrier]["images"][0].name + first_image = carrier_files[carrier]["images"][0].parent / image_name.replace( + ".img", "side0.001" + ) + second_image = carrier_files[carrier]["images"][0].parent / image_name.replace( + ".img", "side1.001" + ) + second_image.write_text("0") + + carrier_files[carrier]["images"][0].rename(first_image) + carrier_files[carrier]["images"] = [first_image, second_image] + + result = pi.validate_carriers_image_files(carrier_files) + + assert ( + "If carrier has 2 disk formats, file names must end with s0.001 or s1.001" + in caplog.text + ) + + assert not result + + +def test_warn_and_skip_0_length_image(carrier_files, caplog): + carrier = "ACQ_1234_123457" + carrier_files[carrier]["images"][0].unlink() + carrier_files[carrier]["images"][0].touch() + + result = pi.validate_carriers_image_files(carrier_files) + + assert ( + f'The following image file is 0-bytes: {str(carrier_files[carrier]["images"][0])}' + in caplog.text + ) + assert not result + + +def test_warn_streams_folder_empty(carrier_files, caplog): + carrier = "ACQ_1234_123457" + for file in carrier_files[carrier]["streams"][0].iterdir(): + file.unlink() + + result = pi.validate_carriers_image_files(carrier_files) + + 
assert f"Streams folder for {carrier} appears to be empty" in caplog.text + assert not result + + +def test_warn_only_one_stream_folder_allowed(carrier_files, caplog): + carrier = "ACQ_1234_123457" + carrier_files[carrier]["streams"].append("ACQ_1234_123457_2") + result = pi.validate_carriers_image_files(carrier_files) + + assert ( + f"Multiple folders of streams found for {carrier}. Only 1 allowed" + in caplog.text + ) + assert not result + + +def test_warn_stream_folder_contains_folders(carrier_files, caplog): + carrier = "ACQ_1234_123457" + (carrier_files[carrier]["streams"][0] / "subfolder").mkdir() + result = pi.validate_carriers_image_files(carrier_files) + + assert ( + f"Folders found with streams folder for {carrier}. None allowed" in caplog.text + ) + assert not result + + +def test_good_packaging(carrier_files, tmp_path: Path): + pi.package_carriers_image_files(carrier_files, tmp_path) + + for carrier in carrier_files: + assert carrier in [x.name for x in (tmp_path / "ACQ_1234").iterdir()] + + +def test_full_run( + monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture, args: list +): + """Test end to end successful run""" + + monkeypatch.setattr("sys.argv", args) + pi.main() + + acq_dir = Path(args[4]) / args[6] + assert acq_dir.exists() + + carrier = "ACQ_1234_123456" + + assert carrier in [x.name for x in acq_dir.iterdir()] + + for x in ["streams", "images"]: + component = acq_dir / carrier / x + assert component.exists() + assert bagit.Bag(str(component)).validate() + + assert (acq_dir / carrier / "metadata").exists() diff --git a/tests/test_report_ftk_extents.py b/tests/test_report_ftk_extents.py index 3831089..55dd4a5 100644 --- a/tests/test_report_ftk_extents.py +++ b/tests/test_report_ftk_extents.py @@ -1,6 +1,9 @@ -import src.digarch_scripts.report.report_ftk_extents as rfe -import pytest import json + +import pytest + +import src.digarch_scripts.report.report_ftk_extents as rfe + try: from lxml import etree except ImportError: @@ -9,17 
+12,19 @@ @pytest.fixture def parsed_report(): - return etree.parse('tests/fixtures/report/Report.xml') + return etree.parse("tests/fixtures/report/Report.xml") + def test_identify_all_ers(parsed_report): """Function should list every bookmark starting with ER""" ers = rfe.create_er_list(parsed_report) - just_ers = [er[0][-1].split(':')[0] for er in ers] + just_ers = [er[0][-1].split(":")[0] for er in ers] for i in range(1, 12): - assert f'ER {i}' in just_ers - assert 'ER 23' in just_ers + assert f"ER {i}" in just_ers + assert "ER 23" in just_ers + def test_hierarchy_nests_down_correctly(parsed_report): """Function should include organization hierarchy. @@ -27,45 +32,91 @@ def test_hierarchy_nests_down_correctly(parsed_report): ers = rfe.create_er_list(parsed_report) just_titles = [er[0] for er in ers] - assert ['Extents Test papers', 'Series 1', 'Subseries(1)', 'ER 1: Text, 2023'] in just_titles - assert ['Extents Test papers', 'Series 1', 'Subseries(1)', 'Subsubseries(2)', 'ER 2: File 15, 2023'] in just_titles + assert [ + "Extents Test papers", + "Series 1", + "Subseries(1)", + "ER 1: Text, 2023", + ] in just_titles + assert [ + "Extents Test papers", + "Series 1", + "Subseries(1)", + "Subsubseries(2)", + "ER 2: File 15, 2023", + ] in just_titles + def test_hierarchy_nests_empty_subseries(parsed_report): """Function should include organization hierarchy including empty levels""" ers = rfe.create_er_list(parsed_report) just_titles = [er[0] for er in ers] - assert ['Extents Test papers', 'Series 1', 'Subseries(1)', 'Subsubseries(2)', 'Subsubsubseries(3)', 'Subsubsubsubseries(4)', 'ER 10: Folder 2, 2023'] in just_titles + assert [ + "Extents Test papers", + "Series 1", + "Subseries(1)", + "Subsubseries(2)", + "Subsubsubseries(3)", + "Subsubsubsubseries(4)", + "ER 10: Folder 2, 2023", + ] in just_titles + def test_hierarchy_nests_up_correctly(parsed_report): """Function should be able to step down in hierarchy""" ers = rfe.create_er_list(parsed_report) just_titles = 
[er[0] for er in ers] - assert ['Extents Test papers', 'Series 1', 'Subseries(1)', 'Subsubseries(2) the second', 'ER 23: File 17, 2023'] in just_titles - assert ['Extents Test papers', 'Series 1', 'Subseries(1) the second', 'ER 4: File 18, 2023'] in just_titles + assert [ + "Extents Test papers", + "Series 1", + "Subseries(1)", + "Subsubseries(2) the second", + "ER 23: File 17, 2023", + ] in just_titles + assert [ + "Extents Test papers", + "Series 1", + "Subseries(1) the second", + "ER 4: File 18, 2023", + ] in just_titles + def test_hierarchy_nests_reverse_order_bookmarks(parsed_report): """Function should parse bottom-up hierarchy""" ers = rfe.create_er_list(parsed_report) just_titles = [er[0] for er in ers] - assert ['Extents Test papers', 'Series 2', 'ER 9: File 20,2023'] in just_titles - assert ['Extents Test papers', 'Series 2', 'Subseries(1) of Series 2', 'ER 8: File 2, 2023'] in just_titles - assert ['Extents Test papers', 'Series 2', 'Subseries(1) of Series 2', 'Subsubseries(2) of Series 2', 'ER 7: File 19, 2023'] in just_titles + assert ["Extents Test papers", "Series 2", "ER 9: File 20,2023"] in just_titles + assert [ + "Extents Test papers", + "Series 2", + "Subseries(1) of Series 2", + "ER 8: File 2, 2023", + ] in just_titles + assert [ + "Extents Test papers", + "Series 2", + "Subseries(1) of Series 2", + "Subsubseries(2) of Series 2", + "ER 7: File 19, 2023", + ] in just_titles + def test_er_outside_of_series(parsed_report): """Function should include capture ERs even if they're not in a series""" ers = rfe.create_er_list(parsed_report) just_titles = [er[0] for er in ers] - assert ['Extents Test papers', 'ER 10: File 21,2023'] in just_titles + assert ["Extents Test papers", "ER 10: File 21,2023"] in just_titles + def test_correct_report_many_files(parsed_report): """Test if file count and byte count is completed correctly""" bookmark_tables = rfe.transform_bookmark_tables(parsed_report) - er_with_many_files = [['ER 1', 'bk6001']] + 
er_with_many_files = [["ER 1", "bk6001"]] extents = rfe.add_extents_to_ers(er_with_many_files, bookmark_tables) # bytes @@ -73,12 +124,13 @@ def test_correct_report_many_files(parsed_report): # files assert extents[0][2] == 7 + def test_correct_report_on_er_with_folder_bookmarked(parsed_report): """Test if file count and byte count is completed correctly when bookmark includes a folder that is bookmarked""" bookmark_tables = rfe.transform_bookmark_tables(parsed_report) - er_with_folder = [['ER 10', 'bk12001']] + er_with_folder = [["ER 10", "bk12001"]] extents = rfe.add_extents_to_ers(er_with_folder, bookmark_tables) # bytes @@ -86,12 +138,13 @@ def test_correct_report_on_er_with_folder_bookmarked(parsed_report): # files assert extents[0][2] == 5 + def test_correct_report_on_er_with_folder_not_bookmarked(parsed_report): """Test if file count and byte count is completed correctly when bookmark includes a folder that isn't bookmarked""" bookmark_tables = rfe.transform_bookmark_tables(parsed_report) - er_with_folder = [['ER 3', 'bk11001']] + er_with_folder = [["ER 3", "bk11001"]] extents = rfe.add_extents_to_ers(er_with_folder, bookmark_tables) # bytes @@ -99,11 +152,12 @@ def test_correct_report_on_er_with_folder_not_bookmarked(parsed_report): # files assert extents[0][2] == 5 + def test_correct_report_1_file(parsed_report): """Test if file count and byte count is completed correctly for one file""" bookmark_tables = rfe.transform_bookmark_tables(parsed_report) - er_with_one_file = [['ER 2', 'bk9001']] + er_with_one_file = [["ER 2", "bk9001"]] extents = rfe.add_extents_to_ers(er_with_one_file, bookmark_tables) # bytes @@ -111,38 +165,45 @@ def test_correct_report_1_file(parsed_report): # files assert extents[0][2] == 1 + def test_warn_on_no_files_in_er(parsed_report, caplog): """Test if warning is logged for empty bookmarks and ER is omitted from report""" bookmark_tables = rfe.transform_bookmark_tables(parsed_report) - er_with_no_files = [[['hier', 'archy', 'list'], 
'bk27001', 'ER 5: No Files, 2023']] + er_with_no_files = [[["hier", "archy", "list"], "bk27001", "ER 5: No Files, 2023"]] extents = rfe.add_extents_to_ers(er_with_no_files, bookmark_tables) assert extents == [] - log_msg = f'{er_with_no_files[0][-1]} does not contain any files. It will be omitted from the report.' + log_msg = f"{er_with_no_files[0][-1]} does not contain any files. It will be omitted from the report." assert log_msg in caplog.text + def test_warn_on_a_no_byte_file_in_er(parsed_report, caplog): """Test if warning is logged for empty files in an ER""" bookmark_tables = rfe.transform_bookmark_tables(parsed_report) - er_with_no_bytes = [[['hier', 'archy', 'list'], 'bk28001', 'ER 6: Zero Length, 2023']] + er_with_no_bytes = [ + [["hier", "archy", "list"], "bk28001", "ER 6: Zero Length, 2023"] + ] rfe.add_extents_to_ers(er_with_no_bytes, bookmark_tables) - log_msg = f'{er_with_no_bytes[0][-1]} contains the following 0-byte file: file00.txt. Review this file with the processing archivist.' + log_msg = f"{er_with_no_bytes[0][-1]} contains the following 0-byte file: file00.txt. Review this file with the processing archivist." assert log_msg in caplog.text + def test_warn_on_no_bytes_in_er(parsed_report, caplog): """Test if warning is logged for bookmarks with 0 bytes total and ER is omitted from report""" bookmark_tables = rfe.transform_bookmark_tables(parsed_report) - er_with_no_bytes = [[['hier', 'archy', 'list'], 'bk28001', 'ER 6: Zero Length, 2023']] + er_with_no_bytes = [ + [["hier", "archy", "list"], "bk28001", "ER 6: Zero Length, 2023"] + ] extents = rfe.add_extents_to_ers(er_with_no_bytes, bookmark_tables) assert extents == [] - log_msg = f'{er_with_no_bytes[0][-1]} contains no files with bytes. This ER is omitted from report. Review this ER with the processing archivist.' + log_msg = f"{er_with_no_bytes[0][-1]} contains no files with bytes. This ER is omitted from report. Review this ER with the processing archivist." 
assert log_msg in caplog.text @@ -150,7 +211,8 @@ def test_extract_collection_name_from_report(parsed_report): """Test if collection name is taken from XML""" coll_name = rfe.extract_collection_title(parsed_report) - assert coll_name == 'M12345 Extents Test' + assert coll_name == "M12345 Extents Test" + @pytest.fixture def ers_with_extents_list(parsed_report): @@ -160,40 +222,45 @@ def ers_with_extents_list(parsed_report): return ers_with_extents + def test_json_objects_contains_expected_fields(ers_with_extents_list): """Test if final report aligns with expectations for ASpace import""" - full_dict = {'title': 'slug', 'children': []} + full_dict = {"title": "slug", "children": []} for er in ers_with_extents_list: rfe.create_report(er, full_dict) def recursive_validator(er_dict): for key, value in er_dict.items(): - if key == 'title': + if key == "title": assert type(value) is str - elif key == 'children': + elif key == "children": assert type(value) is list for child in value: recursive_validator(child) - elif key == 'er_number': + elif key == "er_number": assert type(value) is str - elif key == 'er_name': + elif key == "er_name": assert type(value) is str - elif key == 'file_size': + elif key == "file_size": assert type(value) is int - elif key == 'file_count': + elif key == "file_count": assert type(value) is int else: assert False recursive_validator(full_dict) + def test_skipped_ER_number_behavior(parsed_report, caplog): """Test if script flags when ER numbering is not sequential""" ers = rfe.create_er_list(parsed_report) for i in range(13, 23): - assert f'Collection uses ER 1 to ER 23. ER {i} is skipped. Review the ERs with the processing archivist' in caplog.text + assert ( + f"Collection uses ER 1 to ER 23. ER {i} is skipped. 
Review the ERs with the processing archivist" + in caplog.text + ) def test_ER_missing_number_behavior(parsed_report, caplog): @@ -203,7 +270,7 @@ def test_ER_missing_number_behavior(parsed_report, caplog): rfe.audit_ers(ers) - log_msg = f'ER is missing a number: ER ?: File 21,2023. Review the ERs with the processing archivist' + log_msg = f"ER is missing a number: ER ?: File 21,2023. Review the ERs with the processing archivist" assert log_msg in caplog.text @@ -213,18 +280,20 @@ def test_repeated_ER_number_behavior(parsed_report, caplog): rfe.audit_ers(ers) - log_msg = f'ER 10 is used multiple times: ER 10: File 21,2023, ER 10: Folder 2, 2023. Review the ERs with the processing archivist' + log_msg = f"ER 10 is used multiple times: ER 10: File 21,2023, ER 10: Folder 2, 2023. Review the ERs with the processing archivist" assert log_msg in caplog.text + @pytest.fixture def expected_json(): - with open('tests/fixtures/report/report.json') as f: + with open("tests/fixtures/report/report.json") as f: report = json.load(f) return report + def test_create_correct_json(ers_with_extents_list, expected_json): """Test that final report matches total expectations""" - dct = {'title': 'coll', 'children': []} + dct = {"title": "coll", "children": []} for er in ers_with_extents_list: dct = rfe.create_report(er, dct) diff --git a/tests/test_report_hdd_extents.py b/tests/test_report_hdd_extents.py index ff7cef8..d68b915 100644 --- a/tests/test_report_hdd_extents.py +++ b/tests/test_report_hdd_extents.py @@ -1,27 +1,32 @@ -import src.digarch_scripts.report.report_hdd_extents as rhe -import pytest -import shutil -import re -import pathlib import json +import pathlib +import re +import shutil + +import pytest + +import src.digarch_scripts.report.report_hdd_extents as rhe + @pytest.fixture() def arranged_collection(tmp_path: pathlib.Path): - path = tmp_path.joinpath('hdd') - shutil.copytree('tests/fixtures/report', path) + path = tmp_path.joinpath("hdd") + 
shutil.copytree("tests/fixtures/report", path) return path + def test_identify_all_ers(arranged_collection): """Function should list every folder starting with ER""" ers = rhe.get_ers(arranged_collection) print(ers) - just_ers = [re.search(r'ER\s\d+', er[0]).group() for er in ers] + just_ers = [re.search(r"ER\s\d+", er[0]).group() for er in ers] for i in range(1, 4): - assert f'ER {i}' in just_ers + assert f"ER {i}" in just_ers for i in range(7, 12): - assert f'ER {i}' in just_ers - assert 'ER 23' in just_ers + assert f"ER {i}" in just_ers + assert "ER 23" in just_ers + def test_hierarchy_nests_down_correctly(arranged_collection): """Function should include organization hierarchy. @@ -30,28 +35,37 @@ def test_hierarchy_nests_down_correctly(arranged_collection): just_titles = [er[0] for er in ers] print(just_titles) - assert 'M12345_FAcomponents/Series 1/Subseries(1)/ER 1 Text, 2023' in just_titles - assert 'M12345_FAcomponents/Series 1/Subseries(1)/Subsubseries(2)/ER 2 File 15, 2023' in just_titles + assert "M12345_FAcomponents/Series 1/Subseries(1)/ER 1 Text, 2023" in just_titles + assert ( + "M12345_FAcomponents/Series 1/Subseries(1)/Subsubseries(2)/ER 2 File 15, 2023" + in just_titles + ) + def test_hierarchy_nests_empty_subseries(arranged_collection): """Function should include organization hierarchy including empty levels""" ers = rhe.get_ers(arranged_collection) just_titles = [er[0] for er in ers] - assert 'M12345_FAcomponents/Series 1/Subseries(1)/Subsubseries(2)/Subsubsubseries(3)/Subsubsubsubseries(4)/ER 10 Folder 2, 2023' in just_titles + assert ( + "M12345_FAcomponents/Series 1/Subseries(1)/Subsubseries(2)/Subsubsubseries(3)/Subsubsubsubseries(4)/ER 10 Folder 2, 2023" + in just_titles + ) + def test_er_outside_of_series(arranged_collection): """Function should include capture ERs even if they're not in a series""" ers = rhe.get_ers(arranged_collection) just_titles = [er[0] for er in ers] - assert 'M12345_FAcomponents/ER 10 File 21,2023' in just_titles + 
assert "M12345_FAcomponents/ER 10 File 21,2023" in just_titles + def test_correct_report_many_files(arranged_collection): """Test if file count and byte count is completed correctly""" ers = rhe.get_ers(arranged_collection) - er_with_many_files = 'ER 1 Text, 2023' + er_with_many_files = "ER 1 Text, 2023" for er in ers: if er[3] == er_with_many_files: bytes, files = er[1:3] @@ -62,12 +76,13 @@ def test_correct_report_many_files(arranged_collection): # files assert files == 7 + def test_correct_report_on_er_with_folder_included(arranged_collection): """Test if file count and byte count is completed correctly when bookmark includes a folder that is bookmarked""" ers = rhe.get_ers(arranged_collection) - er_with_folder = 'ER 10 Folder 2, 2023' + er_with_folder = "ER 10 Folder 2, 2023" for er in ers: if er[3] == er_with_folder: bytes, files = er[1:3] @@ -82,7 +97,7 @@ def test_correct_report_1_file(arranged_collection): """Test if file count and byte count is completed correctly for one file""" ers = rhe.get_ers(arranged_collection) - er_with_one_file = 'ER 2 File 15, 2023' + er_with_one_file = "ER 2 File 15, 2023" for er in ers: if er[3] == er_with_one_file: bytes, files = er[1:3] @@ -97,9 +112,9 @@ def test_warn_on_no_files_in_er(arranged_collection, caplog): """Test if warning is logged for empty bookmarks and ER is omitted from report""" ers = rhe.get_ers(arranged_collection) - er_with_no_files = 'ER 5 No Files, 2023' + er_with_no_files = "ER 5 No Files, 2023" - log_msg = f'{er_with_no_files} does not contain any files. It will be omitted from the report.' + log_msg = f"{er_with_no_files} does not contain any files. It will be omitted from the report." 
assert log_msg in caplog.text @@ -107,11 +122,11 @@ def test_warn_on_a_no_byte_file_in_er(arranged_collection, caplog): """Test if warning is logged for empty files in an ER""" ers = rhe.get_ers(arranged_collection) - er_with_no_bytes = 'ER 6 Zero Length, 2023' + er_with_no_bytes = "ER 6 Zero Length, 2023" # rfe.add_extents_to_ers(er_with_no_bytes, bookmark_tables) # log warning, script should continue running # 'ER xxx: Title contain zero byte files.' - log_msg = f'{er_with_no_bytes} contains the following 0-byte file: file00.txt. Review this file with the processing archivist.' + log_msg = f"{er_with_no_bytes} contains the following 0-byte file: file00.txt. Review this file with the processing archivist." assert log_msg in caplog.text @@ -119,11 +134,11 @@ def test_warn_on_no_bytes_in_er(arranged_collection, caplog): """Test if warning is logged for bookmarks with 0 bytes total and ER is omitted from report""" ers = rhe.get_ers(arranged_collection) - er_with_no_bytes = 'ER 6 Zero Length, 2023' + er_with_no_bytes = "ER 6 Zero Length, 2023" # rfe.add_extents_to_ers(er_with_no_bytes, bookmark_tables) # log warning, script should continue running # 'ER xxx: Title does not contain any bytes. It will be omitted from the report' - log_msg = f'{er_with_no_bytes} contains no files with bytes. This ER is omitted from report. Review this ER with the processing archivist.' + log_msg = f"{er_with_no_bytes} contains no files with bytes. This ER is omitted from report. Review this ER with the processing archivist." assert log_msg in caplog.text @@ -131,9 +146,9 @@ def test_warn_on_no_objects_in_er(arranged_collection, caplog): """Test if warning is logged for empty bookmarks and ER is omitted from report""" ers = rhe.get_ers(arranged_collection) - er_with_no_files = 'ER 13 No objects, 2023' + er_with_no_files = "ER 13 No objects, 2023" - log_msg = f'{er_with_no_files} does not contain an object folder. It will be omitted from the report.' 
+ log_msg = f"{er_with_no_files} does not contain an object folder. It will be omitted from the report." assert log_msg in caplog.text @@ -141,16 +156,18 @@ def test_extract_collection_name(arranged_collection): """Test if collection name is taken from XML""" coll_name = rhe.extract_collection_title(arranged_collection) - assert coll_name == 'M12345_FAcomponents' + assert coll_name == "M12345_FAcomponents" + def test_warn_on_bad_collection_name(arranged_collection, caplog): """Test if collection name is taken from XML""" - coll_name_folder = arranged_collection / 'M12345_FAcomponents' - coll_name_folder.rename(arranged_collection / 'Test_Coll') + coll_name_folder = arranged_collection / "M12345_FAcomponents" + coll_name_folder.rename(arranged_collection / "Test_Coll") coll_name = rhe.extract_collection_title(arranged_collection) - log_msg = 'Cannot find CollectionID_FAcomponents directory. Please use CollectionID_FAcomponents naming convention for the directory containing all ERs.' + log_msg = "Cannot find CollectionID_FAcomponents directory. Please use CollectionID_FAcomponents naming convention for the directory containing all ERs." assert log_msg in caplog.text + def test_ER_missing_number_behavior(arranged_collection, caplog): """Test if script flags when ER number is reused""" ers = rhe.get_ers(arranged_collection) @@ -158,51 +175,55 @@ def test_ER_missing_number_behavior(arranged_collection, caplog): rhe.audit_ers(ers) - log_msg = f'ER is missing a number: ER ? File 21,2023. Review the ERs with the processing archivist' + log_msg = f"ER is missing a number: ER ? File 21,2023. Review the ERs with the processing archivist" assert log_msg in caplog.text + def test_skipped_ER_number_behavior(arranged_collection, caplog): ers = rhe.get_ers(arranged_collection) rhe.audit_ers(ers) # log warning, but continue operation - for number in range(13,22): - log_msg = f'Collection uses ER 1 to ER 23. ER {number} is skipped. 
Review the ERs with the processing archivist' + for number in range(13, 22): + log_msg = f"Collection uses ER 1 to ER 23. ER {number} is skipped. Review the ERs with the processing archivist" assert log_msg in caplog.text + def test_repeated_ER_number_behavior(arranged_collection, caplog): ers = rhe.get_ers(arranged_collection) rhe.audit_ers(ers) - log_msg = 'ER 10 is used multiple times' + log_msg = "ER 10 is used multiple times" assert log_msg in caplog.text + @pytest.fixture def extracted_ers(arranged_collection): return rhe.get_ers(arranged_collection) + def test_json_objects_contains_expected_fields(extracted_ers): """Test if final report aligns with expectations for ASpace import""" - full_dict = rhe.create_report(extracted_ers, {'title': 'test', 'children': []}) + full_dict = rhe.create_report(extracted_ers, {"title": "test", "children": []}) def recursive_validator(er_dict): for key, value in er_dict.items(): - if key == 'title': + if key == "title": assert type(value) is str - elif key == 'children': + elif key == "children": assert type(value) is list for child in value: recursive_validator(child) - elif key == 'er_number': + elif key == "er_number": assert type(value) is str - elif key == 'er_name': + elif key == "er_name": assert type(value) is str - elif key == 'file_size': + elif key == "file_size": assert type(value) is int - elif key == 'file_count': + elif key == "file_count": assert type(value) is int else: assert False @@ -212,19 +233,19 @@ def recursive_validator(er_dict): @pytest.fixture def expected_json(): - with open('tests/fixtures/report/report.json') as f: + with open("tests/fixtures/report/report.json") as f: raw = f.read() - #adjust fixture for hdd conventions + # adjust fixture for hdd conventions colons_removed = re.sub(r"(ER \d+):", r"\1", raw) report = json.loads(colons_removed) - report['children'][0]['title'] = 'M12345_FAcomponents' - + report["children"][0]["title"] = "M12345_FAcomponents" return report + def 
test_create_correct_json(extracted_ers, expected_json): """Test that final report matches total expectations""" - dct = rhe.create_report(extracted_ers, {'title': 'coll', 'children': []}) + dct = rhe.create_report(extracted_ers, {"title": "coll", "children": []}) assert dct == expected_json diff --git a/tests/test_report_transfers.py b/tests/test_report_transfers.py new file mode 100644 index 0000000..848ccee --- /dev/null +++ b/tests/test_report_transfers.py @@ -0,0 +1,53 @@ +from pathlib import Path +import pytest +import shutil + +import digarch_scripts.package.package_images as pi +import digarch_scripts.report.report_transfers as rt + + +@pytest.fixture +def transfer_dir(tmp_path: Path, request): + fixture_data = Path(request.module.__file__).parent / "fixtures" / "image" + shutil.copytree(fixture_data, tmp_path, dirs_exist_ok=True) + acq_id = "ACQ_1234" + carrier_files = pi.find_carriers_image_files( + acq_id, + tmp_path, + ) + pi.package_carriers_image_files(carrier_files, tmp_path) + return tmp_path / acq_id + + + +def test_parse_args(transfer_dir): + assert rt.parse_args() == rt.parse_args() + + +def test_collect_stats(transfer_dir): + assert rt.collect_stats(Path("test")) == rt.collect_stats(Path("test")) + + +def test_collect_bag_stats(transfer_dir): + assert rt.collect_bag_stats(Path("test")) == rt.collect_bag_stats(Path("test")) + + +def test_warn_on_invalid_bag(image_bag, caplog): + (image_bag / "bagit.txt").unlink() + rt.collect_bag_stats(image_bag) + + assert "Directory should be formatted as a bag" in caplog.text + + +def test_warn_on_missing_date_in_bag(image_bag, caplog): + (image_bag / "bag-info.txt").write_text("Bag-Size: 1234") + rt.collect_bag_stats(image_bag) + + assert "Directory should be formatted as a bag" in caplog.text + + +def test_warn_on_missing_size_in_bag(image_bag, caplog): + (image_bag / "bagit.txt").unlink() + rt.collect_bag_stats(image_bag) + + assert "Directory should be formatted as a bag" in caplog.text diff --git
a/tests/test_transfer_rsync.py b/tests/test_transfer_rsync.py new file mode 100644 index 0000000..d94d997 --- /dev/null +++ b/tests/test_transfer_rsync.py @@ -0,0 +1,139 @@ +import shutil +import subprocess +from pathlib import Path + +import bagit +import pytest + +import digarch_scripts.transfer.transfer_rsync as tr + + +@pytest.fixture +def transfer_files(tmp_path: Path, request): + fixture_data = Path(request.module.__file__).parent / "fixtures" / "rsync" + shutil.copytree(fixture_data, tmp_path, dirs_exist_ok=True) + return tmp_path + + +@pytest.fixture +def args(transfer_files): + args = [ + "script_name", + "--source", + str(transfer_files / "rsync_files"), + "--dest", + str(transfer_files), + "--carrierid", + "ACQ_1234_123456", + ] + return args + + +def test_requires_args( + monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture, args: list +): + """Test that script requires all five args""" + + for i in range(0, 3): + # remove a pair of list items (arg and value) for each test + part_args = args[0 : 2 * i + 1] + args[2 * i + 3 :] + + monkeypatch.setattr("sys.argv", part_args) + + with pytest.raises(SystemExit): + args = tr.parse_args() + + stderr = capsys.readouterr().err + + assert f"required: {args[2*i+1]}" in stderr + + +def test_arg_paths_must_exist( + monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture, args: list +): + """Test that script errors if path argument doesn't exist""" + + for i in range(1, 3): + bad_args = args.copy() + bad_path = "nonexistant" + bad_args[2 * i] = bad_path + + monkeypatch.setattr("sys.argv", bad_args) + with pytest.raises(SystemExit): + args = tr.parse_args() + + stderr = capsys.readouterr().err + + assert f"{bad_path} does not exist" in stderr + + +def test_id_arg_must_match_pattern( + monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture, args: list +): + """Test that script errors if id argument doesn't match ACQ_####_######""" + args[-1] = "bad_id" + monkeypatch.setattr("sys.argv", args) + with
pytest.raises(SystemExit): + args = tr.parse_args() + + stderr = capsys.readouterr().err + + assert f"bad_id does not match" in stderr + + +def test_rsync_completes_successfully(transfer_files): + id = "ACQ_1234_123456" + source = transfer_files / "rsync_files" + dest = transfer_files / id + dest.mkdir() + tr.run_rsync(source, dest) + + assert (dest / "metadata" / f"{id}_rsync.log").exists() + assert (dest / "objects" / "data").is_dir() + assert True + + +def test_rsync_fails_gracefully(transfer_files, monkeypatch, caplog): + tr.run_rsync("/nonexistant", transfer_files) + + assert ( + "Transfer did not complete successfully. Delete transferred files and re-run" + in caplog.text + ) + + +@pytest.fixture +def mounted_image(transfer_files): + image = transfer_files / "rsync_files.dmg" + mount_point = transfer_files / "new" + mount_point.mkdir() + process = subprocess.run(["hdiutil", "attach", image, "-mountpoint", mount_point]) + + return mount_point + + +def test_disktype_completes_successfully(mounted_image, transfer_files): + # source make and mount tiny disk image + dest = transfer_files + tr.run_disktype(mounted_image, dest) + assert (dest / "metadata" / f"{dest.name}_disktype.log").exists() + + +def test_disktype_skips_folders(transfer_files, caplog): + source = transfer_files / "rsync_files" + tr.run_disktype(source, transfer_files) + + assert "Disktype log cannot be generated for a folder. Skipping" in caplog.text + + +def test_full_run( + monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture, args: list +): + """Test end to end successful run""" + + monkeypatch.setattr("sys.argv", args) + tr.main() + + pkg_dir = Path(args[-3]) / args[-1][:-7] / args[-1] + assert pkg_dir.exists() + assert bagit.Bag(str(pkg_dir / "objects")).validate()