From 9344e4b0d3c1b3597e48a3a9d3420470db99946a Mon Sep 17 00:00:00 2001 From: Andrey170170 Date: Sun, 16 Feb 2025 16:08:03 -0500 Subject: [PATCH 1/5] Added lila_separation_multilable_filtering tool --- .../__init__.py | 0 .../classes.py | 131 ++++++++++++++++++ .../metadata_separation.py | 25 ++++ 3 files changed, 156 insertions(+) create mode 100644 src/DD_tools/lila_separation_multilable_filtering/__init__.py create mode 100644 src/DD_tools/lila_separation_multilable_filtering/classes.py create mode 100644 src/DD_tools/lila_separation_multilable_filtering/metadata_separation.py diff --git a/src/DD_tools/lila_separation_multilable_filtering/__init__.py b/src/DD_tools/lila_separation_multilable_filtering/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/DD_tools/lila_separation_multilable_filtering/classes.py b/src/DD_tools/lila_separation_multilable_filtering/classes.py new file mode 100644 index 0000000..a3e0410 --- /dev/null +++ b/src/DD_tools/lila_separation_multilable_filtering/classes.py @@ -0,0 +1,131 @@ +import os +import shutil +from typing import List + +import pandas as pd + +from DD_tools.main.config import Config +from DD_tools.main.filters import PythonFilterToolBase, FilterRegister +from DD_tools.main.runners import MPIRunnerTool, RunnerRegister +from DD_tools.main.schedulers import DefaultScheduler, SchedulerRegister + + +@FilterRegister("lila_separation") +class LilaSeparationFilter(PythonFilterToolBase): + def __init__(self, cfg: Config): + super().__init__(cfg) + + self.filter_name: str = "lila_separation" + self.data_path = "/users/PAS2119/andreykopanev/gbif/data/lila_separation_table/part-00000-6e425202-ecec-426d-9631-f2f52fd45c51-c000.csv" + + def run(self): + filter_table_folder = os.path.join( + self.tools_path, self.filter_name, "filter_table" + ) + os.makedirs(filter_table_folder, exist_ok=True) + filter_table_folder += "/table.csv" + + shutil.copyfile(self.data_path, filter_table_folder) + + +@SchedulerRegister("lila_separation") +class LilaSeparationScheduleCreation(DefaultScheduler): + def __init__(self, cfg: Config): + super().__init__(cfg) + + self.filter_name: str = "lila_separation" + + +@RunnerRegister("lila_separation") +class LilaSeparationRunner(MPIRunnerTool): + def __init__(self, cfg: Config): + super().__init__(cfg) + self.filter_name: str = "lila_separation" + + self.data_scheme: List[str] = [ + "uuid", + "source_id", + "uuid_main", + "source_id_main", + "server_name", + "old_partition_id", + "partition_id", + ] + self.verification_scheme: List[str] = ["server_name", "partition_id"] + self.new_path = "/fs/scratch/PAS2136/gbif/processed/lilabc/separated_multilabel_data/downloaded_image" + self.total_time = 600 + + def apply_filter( + self, filtering_df: pd.DataFrame, server_name: str, partition_id: str + ) -> int: + self.is_enough_time() + + # self.downloaded_images_path + filtering_df_grouped = filtering_df.groupby(["server_name", "old_partition_id"]) + separated_dict = [] + for name, group in filtering_df_grouped: + parquet_path = os.path.join( + self.downloaded_images_path, + f"server_name={name[0]}", + f"partition_id={name[1]}", + "successes.parquet", + ) + if not os.path.exists(parquet_path): + self.logger.info(f"Path doesn't exists: {server_name}/{partition_id}") + continue + + partial_df = pd.read_parquet( + parquet_path, filters=[("uuid", "in", group["uuid_main"])] + ) + partial_merged_df = pd.merge( + partial_df, + group, + left_on="uuid", + right_on="uuid_main", + suffixes=("_x", "_y"), + sort=False, + how="right", + ) 
+ + partial_merged_df = partial_merged_df[ + [ + "uuid_y", + "source_id_y", + "identifier", + "is_license_full", + "license", + "source", + "title", + "hashsum_original", + "hashsum_resized", + "original_size", + "resized_size", + "image", + ] + ] + separated_dict.extend( + partial_merged_df.rename( + {"uuid_y": "uuid", "source_id_y": "source_id"}, inplace=True + ).to_dict("records") + ) + + merged_df = pd.DataFrame.from_records(separated_dict) + + self.is_enough_time() + + save_path = os.path.join( + self.new_path, f"server_name={server_name}", f"partition_id={partition_id}" + ) + os.makedirs(save_path, exist_ok=True) + + if len(merged_df) == 0: + self.logger.info(f"Empty: {server_name}/{partition_id}") + + merged_df.to_parquet( + save_path + "/successes.parquet", + index=False, + compression="zstd", + compression_level=3, + ) + + return len(merged_df) diff --git a/src/DD_tools/lila_separation_multilable_filtering/metadata_separation.py b/src/DD_tools/lila_separation_multilable_filtering/metadata_separation.py new file mode 100644 index 0000000..7ee6634 --- /dev/null +++ b/src/DD_tools/lila_separation_multilable_filtering/metadata_separation.py @@ -0,0 +1,25 @@ +from pyspark.sql import SparkSession + +base_path = "/fs/scratch/PAS2136/gbif/processed/lilabc/merged_data/servers_batched" +filter_path = "/users/PAS2119/andreykopanev/gbif/data/lila_separation_table/part-00000-6e425202-ecec-426d-9631-f2f52fd45c51-c000.csv" +save_path = "/fs/scratch/PAS2136/gbif/processed/lilabc/separated_multilabel_data/servers_batched" + +if __name__ == "__main__": + spark = SparkSession.builder.appName("Multimedia prep").getOrCreate() + spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED") + spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED") + + metadata_df = spark.read.parquet(base_path).drop("partition_id") + filter_df = spark.read.csv(filter_path, header=True).select("uuid", "partition_id") + + df = metadata_df.join(filter_df, on="uuid", how="inner") + + (df + .repartition("server_name", "partition_id") + .write + .partitionBy("server_name", "partition_id") + .mode("overwrite") + .format("parquet") + .save(save_path)) + + spark.stop() From f41119261d8b8b6a8a7da29ce0cfbe37b604b389 Mon Sep 17 00:00:00 2001 From: Andrey170170 Date: Mon, 12 May 2025 02:29:05 -0400 Subject: [PATCH 2/5] Rename project from 'DD_tools' to 'TreeOfLife_toolbox'. Updated package structure, filenames, and references to reflect the new name. Adjusted `pyproject.toml` to rename the project, update dependencies, and modify supported Python versions. These changes ensure consistency and alignment with the new project branding. 
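The rename is mechanical, but it breaks every `DD_tools.*` absolute import; PATCH 3 below rewrites those across the repo. For downstream code the migration is a one-line change per import (module paths taken verbatim from the diffs that follow):

```python
# Before this patch series (old package name):
# from DD_tools.main.config import Config
# from DD_tools.main.registry import ToolsRegistryBase

# After the rename:
from TreeOfLife_toolbox.main.config import Config
from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
```

Any environment still carrying `DD_tools` can migrate in one pass, e.g. a project-wide search-and-replace of `DD_tools.` with `TreeOfLife_toolbox.`.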
--- pyproject.toml | 28 ++++--------------- .../__init__.py | 0 .../main/__about__.py | 0 .../main/checkpoint.py | 0 .../main/config.py | 0 .../main/config_templates/tools.yaml | 0 .../main/filter.py | 0 .../main/filters.py | 0 .../main/main.py | 0 .../main/registry.py | 0 .../main/runner.py | 0 .../main/runners.py | 0 .../main/scheduler.py | 0 .../main/schedulers.py | 0 .../main/utils.py | 0 .../main/verification.py | 0 16 files changed, 5 insertions(+), 23 deletions(-) rename src/{DD_tools => TreeOfLife_toolbox}/__init__.py (100%) rename src/{DD_tools => TreeOfLife_toolbox}/main/__about__.py (100%) rename src/{DD_tools => TreeOfLife_toolbox}/main/checkpoint.py (100%) rename src/{DD_tools => TreeOfLife_toolbox}/main/config.py (100%) rename src/{DD_tools => TreeOfLife_toolbox}/main/config_templates/tools.yaml (100%) rename src/{DD_tools => TreeOfLife_toolbox}/main/filter.py (100%) rename src/{DD_tools => TreeOfLife_toolbox}/main/filters.py (100%) rename src/{DD_tools => TreeOfLife_toolbox}/main/main.py (100%) rename src/{DD_tools => TreeOfLife_toolbox}/main/registry.py (100%) rename src/{DD_tools => TreeOfLife_toolbox}/main/runner.py (100%) rename src/{DD_tools => TreeOfLife_toolbox}/main/runners.py (100%) rename src/{DD_tools => TreeOfLife_toolbox}/main/scheduler.py (100%) rename src/{DD_tools => TreeOfLife_toolbox}/main/schedulers.py (100%) rename src/{DD_tools => TreeOfLife_toolbox}/main/utils.py (100%) rename src/{DD_tools => TreeOfLife_toolbox}/main/verification.py (100%) diff --git a/pyproject.toml b/pyproject.toml index 17be78f..cb76174 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,10 +3,10 @@ requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] -packages = ["src/DD_tools"] +packages = ["src/TreeOfLife_toolbox"] [project] -name = "DD_tools" +name = "TreeOfLife_toolbox" dynamic = ["version"] authors = [ { name = "Andrey Kopanev", email = "kopanev.1@osu.edu" }, @@ -15,7 +15,7 @@ authors = [ ] description = "A tool for downloading files from a list of URLs in parallel." 
readme = "README.md" -requires-python = ">=3.8" +requires-python = ">=3.10, <3.12" classifiers = [ "Programming Language :: Python :: 3", "License :: OSI Approved :: MIT License", @@ -24,48 +24,30 @@ classifiers = [ dependencies = [ "attrs", "brotli", - "certifi", - "charset-normalizer", "cramjam", "cython", - "exceptiongroup", "fsspec", - "hatchling", - "idna", "inflate64", - "iniconfig", - "mpi4py < 4", + "mpi4py", "multivolumefile", - "numpy", "opencv-python", - "packaging", "pandas", "pathspec", "pillow", - "pip", - "pluggy", "psutil", - "py4j", "pyarrow", "pybcj", "pycryptodomex", "pyppmd", "pyspark", - "pytest", - "python-dateutil", "python-dotenv", - "pytz", "pyyaml", "pyzstd", "requests", "setuptools", - "six", "texttable", - "tomli", "trove-classifiers", "typing-extensions", - "tzdata", - "urllib3", "wheel" ] @@ -85,4 +67,4 @@ Repository = "https://github.com/Imageomics/distributed-downloader.git" "Bug Tracker" = "https://github.com/Imageomics/distributed-downloader/issues" [tool.hatch.version] -path = "src/DD_tools/main/__about__.py" +path = "src/TreeOfLife_toolbox/main/__about__.py" diff --git a/src/DD_tools/__init__.py b/src/TreeOfLife_toolbox/__init__.py similarity index 100% rename from src/DD_tools/__init__.py rename to src/TreeOfLife_toolbox/__init__.py diff --git a/src/DD_tools/main/__about__.py b/src/TreeOfLife_toolbox/main/__about__.py similarity index 100% rename from src/DD_tools/main/__about__.py rename to src/TreeOfLife_toolbox/main/__about__.py diff --git a/src/DD_tools/main/checkpoint.py b/src/TreeOfLife_toolbox/main/checkpoint.py similarity index 100% rename from src/DD_tools/main/checkpoint.py rename to src/TreeOfLife_toolbox/main/checkpoint.py diff --git a/src/DD_tools/main/config.py b/src/TreeOfLife_toolbox/main/config.py similarity index 100% rename from src/DD_tools/main/config.py rename to src/TreeOfLife_toolbox/main/config.py diff --git a/src/DD_tools/main/config_templates/tools.yaml b/src/TreeOfLife_toolbox/main/config_templates/tools.yaml similarity index 100% rename from src/DD_tools/main/config_templates/tools.yaml rename to src/TreeOfLife_toolbox/main/config_templates/tools.yaml diff --git a/src/DD_tools/main/filter.py b/src/TreeOfLife_toolbox/main/filter.py similarity index 100% rename from src/DD_tools/main/filter.py rename to src/TreeOfLife_toolbox/main/filter.py diff --git a/src/DD_tools/main/filters.py b/src/TreeOfLife_toolbox/main/filters.py similarity index 100% rename from src/DD_tools/main/filters.py rename to src/TreeOfLife_toolbox/main/filters.py diff --git a/src/DD_tools/main/main.py b/src/TreeOfLife_toolbox/main/main.py similarity index 100% rename from src/DD_tools/main/main.py rename to src/TreeOfLife_toolbox/main/main.py diff --git a/src/DD_tools/main/registry.py b/src/TreeOfLife_toolbox/main/registry.py similarity index 100% rename from src/DD_tools/main/registry.py rename to src/TreeOfLife_toolbox/main/registry.py diff --git a/src/DD_tools/main/runner.py b/src/TreeOfLife_toolbox/main/runner.py similarity index 100% rename from src/DD_tools/main/runner.py rename to src/TreeOfLife_toolbox/main/runner.py diff --git a/src/DD_tools/main/runners.py b/src/TreeOfLife_toolbox/main/runners.py similarity index 100% rename from src/DD_tools/main/runners.py rename to src/TreeOfLife_toolbox/main/runners.py diff --git a/src/DD_tools/main/scheduler.py b/src/TreeOfLife_toolbox/main/scheduler.py similarity index 100% rename from src/DD_tools/main/scheduler.py rename to src/TreeOfLife_toolbox/main/scheduler.py diff --git 
a/src/DD_tools/main/schedulers.py b/src/TreeOfLife_toolbox/main/schedulers.py similarity index 100% rename from src/DD_tools/main/schedulers.py rename to src/TreeOfLife_toolbox/main/schedulers.py diff --git a/src/DD_tools/main/utils.py b/src/TreeOfLife_toolbox/main/utils.py similarity index 100% rename from src/DD_tools/main/utils.py rename to src/TreeOfLife_toolbox/main/utils.py diff --git a/src/DD_tools/main/verification.py b/src/TreeOfLife_toolbox/main/verification.py similarity index 100% rename from src/DD_tools/main/verification.py rename to src/TreeOfLife_toolbox/main/verification.py From 4a982dd82b03ab2bd8e9a8fc1a7ea270e51b6f53 Mon Sep 17 00:00:00 2001 From: Andrey170170 Date: Mon, 12 May 2025 02:34:30 -0400 Subject: [PATCH 3/5] Refactor import paths to use TreeOfLife_toolbox module. Updated all import statements to reference TreeOfLife_toolbox instead of DD_tools for consistency and clarity. Adjusted slurm scripts to align with the new module structure and standardized environment variables for toolbox path configuration. --- scripts/tools_filter.slurm | 3 +-- scripts/tools_scheduler.slurm | 3 +-- scripts/tools_verifier.slurm | 3 +-- scripts/tools_worker.slurm | 3 +-- src/TreeOfLife_toolbox/main/filter.py | 8 ++++---- src/TreeOfLife_toolbox/main/filters.py | 8 ++++---- src/TreeOfLife_toolbox/main/main.py | 10 ++++++---- src/TreeOfLife_toolbox/main/registry.py | 4 ++-- src/TreeOfLife_toolbox/main/runner.py | 8 ++++---- src/TreeOfLife_toolbox/main/runners.py | 4 ++-- src/TreeOfLife_toolbox/main/scheduler.py | 8 ++++---- src/TreeOfLife_toolbox/main/schedulers.py | 4 ++-- src/TreeOfLife_toolbox/main/verification.py | 10 +++++----- 13 files changed, 37 insertions(+), 39 deletions(-) diff --git a/scripts/tools_filter.slurm b/scripts/tools_filter.slurm index 4642e34..6aee3f6 100644 --- a/scripts/tools_filter.slurm +++ b/scripts/tools_filter.slurm @@ -19,11 +19,10 @@ executor_memory="64G" module load spark/3.4.1 module load miniconda3/23.3.1-py310 source "${REPO_ROOT}/.venv/bin/activate" -export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader" pbs-spark-submit \ --driver-memory $driver_memory \ --executor-memory $executor_memory \ - "${REPO_ROOT}/src/distributed_downloader/tools/filter.py" \ + "${TOOLBOX_PATH}/main/filter.py" \ "${tool_name}" \ > "${logs_dir}/tool_filter.log" diff --git a/scripts/tools_scheduler.slurm b/scripts/tools_scheduler.slurm index e4fb6a2..ea35a32 100644 --- a/scripts/tools_scheduler.slurm +++ b/scripts/tools_scheduler.slurm @@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310 source "${REPO_ROOT}/.venv/bin/activate" export PYARROW_IGNORE_TIMEZONE=1 export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0 -export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader" srun \ --mpi=pmi2 \ @@ -28,4 +27,4 @@ srun \ --cpus-per-task=1 \ --mem=0 \ --output="${logs_dir}/tool_scheduler.log" \ - python "${REPO_ROOT}/src/distributed_downloader/tools/scheduler.py" "${tool_name}" + python "${TOOLBOX_PATH}/main/scheduler.py" "${tool_name}" diff --git a/scripts/tools_verifier.slurm b/scripts/tools_verifier.slurm index 98ca024..6a3b75e 100644 --- a/scripts/tools_verifier.slurm +++ b/scripts/tools_verifier.slurm @@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310 source "${REPO_ROOT}/.venv/bin/activate" export PYARROW_IGNORE_TIMEZONE=1 export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0 -export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader" srun \ --mpi=pmi2 \ @@ -28,4 +27,4 @@ srun \ --cpus-per-task=1 
\ --mem=0 \ --output="${logs_dir}/tool_verifier.log" \ - python "${REPO_ROOT}/src/distributed_downloader/tools/verification.py" "${tool_name}" + python "${TOOLBOX_PATH}/main/verification.py" "${tool_name}" diff --git a/scripts/tools_worker.slurm b/scripts/tools_worker.slurm index 2ee2662..4856e62 100644 --- a/scripts/tools_worker.slurm +++ b/scripts/tools_worker.slurm @@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310 source "${REPO_ROOT}/.venv/bin/activate" export PYARROW_IGNORE_TIMEZONE=1 export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0 -export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader" srun \ --mpi=pmi2 \ @@ -28,4 +27,4 @@ srun \ --cpus-per-task="$TOOLS_CPU_PER_WORKER" \ --mem=0 \ --output="${logs_dir}/tool_worker-%2t.log" \ - python "${REPO_ROOT}/src/distributed_downloader/tools/runner.py" "${tool_name}" + python "${TOOLBOX_PATH}/main/runner.py" "${tool_name}" diff --git a/src/TreeOfLife_toolbox/main/filter.py b/src/TreeOfLife_toolbox/main/filter.py index 080e1a2..ed526c5 100644 --- a/src/TreeOfLife_toolbox/main/filter.py +++ b/src/TreeOfLife_toolbox/main/filter.py @@ -1,10 +1,10 @@ import argparse import os -from DD_tools.main.checkpoint import Checkpoint -from DD_tools.main.config import Config -from DD_tools.main.registry import ToolsRegistryBase -from DD_tools.main.utils import init_logger +from TreeOfLife_toolbox.main.checkpoint import Checkpoint +from TreeOfLife_toolbox.main.config import Config +from TreeOfLife_toolbox.main.registry import ToolsRegistryBase +from TreeOfLife_toolbox.main.utils import init_logger if __name__ == "__main__": config_path = os.environ.get("CONFIG_PATH") diff --git a/src/TreeOfLife_toolbox/main/filters.py b/src/TreeOfLife_toolbox/main/filters.py index 11c9426..385f18e 100644 --- a/src/TreeOfLife_toolbox/main/filters.py +++ b/src/TreeOfLife_toolbox/main/filters.py @@ -7,10 +7,10 @@ from pyspark.sql import SparkSession from pyspark.sql.types import StructType -from DD_tools.main.config import Config -from DD_tools.main.registry import ToolsBase -from DD_tools.main.registry import ToolsRegistryBase -from DD_tools.main.utils import SuccessEntry +from TreeOfLife_toolbox.main.config import Config +from TreeOfLife_toolbox.main.registry import ToolsBase +from TreeOfLife_toolbox.main.registry import ToolsRegistryBase +from TreeOfLife_toolbox.main.utils import SuccessEntry FilterRegister = partial(ToolsRegistryBase.register, "filter") diff --git a/src/TreeOfLife_toolbox/main/main.py b/src/TreeOfLife_toolbox/main/main.py index b3d5732..5272354 100644 --- a/src/TreeOfLife_toolbox/main/main.py +++ b/src/TreeOfLife_toolbox/main/main.py @@ -1,15 +1,16 @@ import argparse import os from logging import Logger +from pathlib import Path from typing import Dict, List, Optional, TextIO, Tuple import pandas as pd from attr import Factory, define, field -from DD_tools.main.checkpoint import Checkpoint -from DD_tools.main.config import Config -from DD_tools.main.registry import ToolsRegistryBase -from DD_tools.main.utils import ( +from TreeOfLife_toolbox.main.checkpoint import Checkpoint +from TreeOfLife_toolbox.main.config import Config +from TreeOfLife_toolbox.main.registry import ToolsRegistryBase +from TreeOfLife_toolbox.main.utils import ( init_logger, ensure_created, truncate_paths, @@ -78,6 +79,7 @@ def __attrs_post_init__(self): def __init_environment(self) -> None: os.environ["CONFIG_PATH"] = self.config.config_path + os.environ["TOOLBOX_PATH"] = str(Path(__file__).parent.parent.resolve()) os.environ["ACCOUNT"] = 
self.config["account"] os.environ["PATH_TO_INPUT"] = self.config["path_to_input"] diff --git a/src/TreeOfLife_toolbox/main/registry.py b/src/TreeOfLife_toolbox/main/registry.py index 12774dd..03cf9d6 100644 --- a/src/TreeOfLife_toolbox/main/registry.py +++ b/src/TreeOfLife_toolbox/main/registry.py @@ -1,7 +1,7 @@ from typing import Dict, Type, Optional -from DD_tools.main.config import Config -from DD_tools.main.utils import init_logger +from TreeOfLife_toolbox.main.config import Config +from TreeOfLife_toolbox.main.utils import init_logger class ToolsRegistryBase(type): diff --git a/src/TreeOfLife_toolbox/main/runner.py b/src/TreeOfLife_toolbox/main/runner.py index 214237e..77dcefa 100644 --- a/src/TreeOfLife_toolbox/main/runner.py +++ b/src/TreeOfLife_toolbox/main/runner.py @@ -1,10 +1,10 @@ import argparse import os -from DD_tools.main.checkpoint import Checkpoint -from DD_tools.main.config import Config -from DD_tools.main.registry import ToolsRegistryBase -from DD_tools.main.utils import init_logger +from TreeOfLife_toolbox.main.checkpoint import Checkpoint +from TreeOfLife_toolbox.main.config import Config +from TreeOfLife_toolbox.main.registry import ToolsRegistryBase +from TreeOfLife_toolbox.main.utils import init_logger if __name__ == "__main__": config_path = os.environ.get("CONFIG_PATH") diff --git a/src/TreeOfLife_toolbox/main/runners.py b/src/TreeOfLife_toolbox/main/runners.py index cd875d3..bfb5d5e 100644 --- a/src/TreeOfLife_toolbox/main/runners.py +++ b/src/TreeOfLife_toolbox/main/runners.py @@ -6,8 +6,8 @@ import pandas as pd -from DD_tools.main.config import Config -from DD_tools.main.registry import ToolsBase, ToolsRegistryBase +from TreeOfLife_toolbox.main.config import Config +from TreeOfLife_toolbox.main.registry import ToolsBase, ToolsRegistryBase RunnerRegister = partial(ToolsRegistryBase.register, "runner") diff --git a/src/TreeOfLife_toolbox/main/scheduler.py b/src/TreeOfLife_toolbox/main/scheduler.py index 707b656..d686ae6 100644 --- a/src/TreeOfLife_toolbox/main/scheduler.py +++ b/src/TreeOfLife_toolbox/main/scheduler.py @@ -1,10 +1,10 @@ import argparse import os -from DD_tools.main.checkpoint import Checkpoint -from DD_tools.main.config import Config -from DD_tools.main.registry import ToolsRegistryBase -from DD_tools.main.utils import init_logger +from TreeOfLife_toolbox.main.checkpoint import Checkpoint +from TreeOfLife_toolbox.main.config import Config +from TreeOfLife_toolbox.main.registry import ToolsRegistryBase +from TreeOfLife_toolbox.main.utils import init_logger if __name__ == "__main__": config_path = os.environ.get("CONFIG_PATH") diff --git a/src/TreeOfLife_toolbox/main/schedulers.py b/src/TreeOfLife_toolbox/main/schedulers.py index ed70a9c..6b2c6e2 100644 --- a/src/TreeOfLife_toolbox/main/schedulers.py +++ b/src/TreeOfLife_toolbox/main/schedulers.py @@ -5,8 +5,8 @@ import pandas as pd -from DD_tools.main.config import Config -from DD_tools.main.registry import ToolsBase, ToolsRegistryBase +from TreeOfLife_toolbox.main.config import Config +from TreeOfLife_toolbox.main.registry import ToolsBase, ToolsRegistryBase SchedulerRegister = partial(ToolsRegistryBase.register, "scheduler") diff --git a/src/TreeOfLife_toolbox/main/verification.py b/src/TreeOfLife_toolbox/main/verification.py index 742bb86..31d2561 100644 --- a/src/TreeOfLife_toolbox/main/verification.py +++ b/src/TreeOfLife_toolbox/main/verification.py @@ -3,11 +3,11 @@ import pandas as pd -from DD_tools.main.checkpoint import Checkpoint -from DD_tools.main.config import Config -from 
DD_tools.main.registry import ToolsRegistryBase
-from DD_tools.main.runners import MPIRunnerTool
-from DD_tools.main.utils import init_logger
+from TreeOfLife_toolbox.main.checkpoint import Checkpoint
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
+from TreeOfLife_toolbox.main.runners import MPIRunnerTool
+from TreeOfLife_toolbox.main.utils import init_logger
 
 if __name__ == "__main__":
     config_path = os.environ.get("CONFIG_PATH")

From 99ca3b016b14c27482d7152bba47d6db71b46334 Mon Sep 17 00:00:00 2001
From: Andrey170170
Date: Mon, 12 May 2025 02:39:08 -0400
Subject: [PATCH 4/5] Update metadata and dependencies in pyproject.toml

Revised the project description, added programming language classifiers, and
added 'ruff' to the optional dev dependencies. Introduced new keywords and
added a script entry point for better usability.

---
 pyproject.toml | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index cb76174..b3e3a5c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -13,11 +13,14 @@ authors = [
     { name = "Elizabeth G. Campolongo", email = "e.campolongo479@gmail.com" },
     { name = "Matthew J. Thompson", email = "thompson.m.j@outlook.com" },
 ]
-description = "A tool for downloading files from a list of URLs in parallel."
+description = "A tool for processing datasets that were downloaded using the distributed-downloader package."
 readme = "README.md"
 requires-python = ">=3.10, <3.12"
 classifiers = [
+    "Development Status :: 4 - Beta",
     "Programming Language :: Python :: 3",
+    "Programming Language :: Python :: 3.10",
+    "Programming Language :: Python :: 3.11",
     "License :: OSI Approved :: MIT License",
     "Operating System :: OS Independent",
 ]
@@ -52,13 +55,17 @@ dependencies = [
 ]
 
 [project.optional-dependencies]
-dev = ["pytest"]
+dev = [
+    "pytest",
+    "ruff"
+]
 
 keywords = [
     "parallel",
     "distributed",
-    "download",
     "url",
+    "mpi-applications",
+    "dataset-generation",
 ]
 
 [project.urls]
 Homepage = "https://github.com/Imageomics/distributed-downloader"
 Repository = "https://github.com/Imageomics/distributed-downloader.git"
 "Bug Tracker" = "https://github.com/Imageomics/distributed-downloader/issues"
 
+[project.scripts]
+tree_of_life_toolbox = "TreeOfLife_toolbox.main.main:main"
+
 [tool.hatch.version]
 path = "src/TreeOfLife_toolbox/main/__about__.py"

From 0d8bc3e76d6d46a934ca969617734661c9ae949d Mon Sep 17 00:00:00 2001
From: Andrey170170
Date: Wed, 14 May 2025 03:00:16 -0400
Subject: [PATCH 5/5] Refactor and relocate LILA filtering tool.

The LILA multi-label separation tool was refactored for improved modularity
and moved from `DD_tools` to `TreeOfLife_toolbox`. Updated class structure,
documentation, and configurations to align with the new package and enhance
clarity and maintainability.
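The relocated tool hooks into the toolbox through `FilterRegister` / `SchedulerRegister` / `RunnerRegister`, which (per `filters.py`, `schedulers.py`, and `runners.py`) are partials of `ToolsRegistryBase.register` keyed by component type. `ToolsRegistryBase` itself is not shown in these diffs, so the sketch below illustrates the pattern under that assumption rather than the actual implementation:

```python
from functools import partial
from typing import Callable, Dict, Type


class ToolsRegistryDemo:
    """Illustrative stand-in for ToolsRegistryBase (real internals not in this diff)."""

    TOOLS_REGISTRY: Dict[str, Dict[str, Type]] = {}

    @classmethod
    def register(cls, component: str, tool_name: str) -> Callable[[Type], Type]:
        # Returns a class decorator that files the class under (tool_name, component).
        def decorator(tool_cls: Type) -> Type:
            cls.TOOLS_REGISTRY.setdefault(tool_name, {})[component] = tool_cls
            return tool_cls

        return decorator


# Mirrors `FilterRegister = partial(ToolsRegistryBase.register, "filter")` in filters.py
FilterRegisterDemo = partial(ToolsRegistryDemo.register, "filter")


@FilterRegisterDemo("lila_separation_multilable_filtering")
class DemoFilter:
    pass


# main.py / filter.py can now look a tool up by the name passed on the command line:
assert ToolsRegistryDemo.TOOLS_REGISTRY["lila_separation_multilable_filtering"]["filter"] is DemoFilter
```

Because registration happens at import time, the one-line change to `src/TreeOfLife_toolbox/__init__.py` in this patch (importing the tool package) is what makes the tool discoverable by name.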
--- .../__init__.py | 0 .../classes.py | 131 --------- .../metadata_separation.py | 25 -- src/TreeOfLife_toolbox/__init__.py | 1 + .../README.md | 46 ++++ .../__init__.py | 5 + .../classes.py | 252 ++++++++++++++++++ 7 files changed, 304 insertions(+), 156 deletions(-) delete mode 100644 src/DD_tools/lila_separation_multilable_filtering/__init__.py delete mode 100644 src/DD_tools/lila_separation_multilable_filtering/classes.py delete mode 100644 src/DD_tools/lila_separation_multilable_filtering/metadata_separation.py create mode 100644 src/TreeOfLife_toolbox/lila_separation_multilable_filtering/README.md create mode 100644 src/TreeOfLife_toolbox/lila_separation_multilable_filtering/__init__.py create mode 100644 src/TreeOfLife_toolbox/lila_separation_multilable_filtering/classes.py diff --git a/src/DD_tools/lila_separation_multilable_filtering/__init__.py b/src/DD_tools/lila_separation_multilable_filtering/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/DD_tools/lila_separation_multilable_filtering/classes.py b/src/DD_tools/lila_separation_multilable_filtering/classes.py deleted file mode 100644 index a3e0410..0000000 --- a/src/DD_tools/lila_separation_multilable_filtering/classes.py +++ /dev/null @@ -1,131 +0,0 @@ -import os -import shutil -from typing import List - -import pandas as pd - -from DD_tools.main.config import Config -from DD_tools.main.filters import PythonFilterToolBase, FilterRegister -from DD_tools.main.runners import MPIRunnerTool, RunnerRegister -from DD_tools.main.schedulers import DefaultScheduler, SchedulerRegister - - -@FilterRegister("lila_separation") -class LilaSeparationFilter(PythonFilterToolBase): - def __init__(self, cfg: Config): - super().__init__(cfg) - - self.filter_name: str = "lila_separation" - self.data_path = "/users/PAS2119/andreykopanev/gbif/data/lila_separation_table/part-00000-6e425202-ecec-426d-9631-f2f52fd45c51-c000.csv" - - def run(self): - filter_table_folder = os.path.join( - self.tools_path, self.filter_name, "filter_table" - ) - os.makedirs(filter_table_folder, exist_ok=True) - filter_table_folder += "/table.csv" - - shutil.copyfile(self.data_path, filter_table_folder) - - -@SchedulerRegister("lila_separation") -class LilaSeparationScheduleCreation(DefaultScheduler): - def __init__(self, cfg: Config): - super().__init__(cfg) - - self.filter_name: str = "lila_separation" - - -@RunnerRegister("lila_separation") -class LilaSeparationRunner(MPIRunnerTool): - def __init__(self, cfg: Config): - super().__init__(cfg) - self.filter_name: str = "lila_separation" - - self.data_scheme: List[str] = [ - "uuid", - "source_id", - "uuid_main", - "source_id_main", - "server_name", - "old_partition_id", - "partition_id", - ] - self.verification_scheme: List[str] = ["server_name", "partition_id"] - self.new_path = "/fs/scratch/PAS2136/gbif/processed/lilabc/separated_multilabel_data/downloaded_image" - self.total_time = 600 - - def apply_filter( - self, filtering_df: pd.DataFrame, server_name: str, partition_id: str - ) -> int: - self.is_enough_time() - - # self.downloaded_images_path - filtering_df_grouped = filtering_df.groupby(["server_name", "old_partition_id"]) - separated_dict = [] - for name, group in filtering_df_grouped: - parquet_path = os.path.join( - self.downloaded_images_path, - f"server_name={name[0]}", - f"partition_id={name[1]}", - "successes.parquet", - ) - if not os.path.exists(parquet_path): - self.logger.info(f"Path doesn't exists: {server_name}/{partition_id}") - continue - - partial_df = pd.read_parquet( - 
parquet_path, filters=[("uuid", "in", group["uuid_main"])] - ) - partial_merged_df = pd.merge( - partial_df, - group, - left_on="uuid", - right_on="uuid_main", - suffixes=("_x", "_y"), - sort=False, - how="right", - ) - - partial_merged_df = partial_merged_df[ - [ - "uuid_y", - "source_id_y", - "identifier", - "is_license_full", - "license", - "source", - "title", - "hashsum_original", - "hashsum_resized", - "original_size", - "resized_size", - "image", - ] - ] - separated_dict.extend( - partial_merged_df.rename( - {"uuid_y": "uuid", "source_id_y": "source_id"}, inplace=True - ).to_dict("records") - ) - - merged_df = pd.DataFrame.from_records(separated_dict) - - self.is_enough_time() - - save_path = os.path.join( - self.new_path, f"server_name={server_name}", f"partition_id={partition_id}" - ) - os.makedirs(save_path, exist_ok=True) - - if len(merged_df) == 0: - self.logger.info(f"Empty: {server_name}/{partition_id}") - - merged_df.to_parquet( - save_path + "/successes.parquet", - index=False, - compression="zstd", - compression_level=3, - ) - - return len(merged_df) diff --git a/src/DD_tools/lila_separation_multilable_filtering/metadata_separation.py b/src/DD_tools/lila_separation_multilable_filtering/metadata_separation.py deleted file mode 100644 index 7ee6634..0000000 --- a/src/DD_tools/lila_separation_multilable_filtering/metadata_separation.py +++ /dev/null @@ -1,25 +0,0 @@ -from pyspark.sql import SparkSession - -base_path = "/fs/scratch/PAS2136/gbif/processed/lilabc/merged_data/servers_batched" -filter_path = "/users/PAS2119/andreykopanev/gbif/data/lila_separation_table/part-00000-6e425202-ecec-426d-9631-f2f52fd45c51-c000.csv" -save_path = "/fs/scratch/PAS2136/gbif/processed/lilabc/separated_multilabel_data/servers_batched" - -if __name__ == "__main__": - spark = SparkSession.builder.appName("Multimedia prep").getOrCreate() - spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED") - spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED") - - metadata_df = spark.read.parquet(base_path).drop("partition_id") - filter_df = spark.read.csv(filter_path, header=True).select("uuid", "partition_id") - - df = metadata_df.join(filter_df, on="uuid", how="inner") - - (df - .repartition("server_name", "partition_id") - .write - .partitionBy("server_name", "partition_id") - .mode("overwrite") - .format("parquet") - .save(save_path)) - - spark.stop() diff --git a/src/TreeOfLife_toolbox/__init__.py b/src/TreeOfLife_toolbox/__init__.py index e69de29..4a2fb97 100644 --- a/src/TreeOfLife_toolbox/__init__.py +++ b/src/TreeOfLife_toolbox/__init__.py @@ -0,0 +1 @@ +from TreeOfLife_toolbox import lila_separation_multilable_filtering diff --git a/src/TreeOfLife_toolbox/lila_separation_multilable_filtering/README.md b/src/TreeOfLife_toolbox/lila_separation_multilable_filtering/README.md new file mode 100644 index 0000000..e9b7a87 --- /dev/null +++ b/src/TreeOfLife_toolbox/lila_separation_multilable_filtering/README.md @@ -0,0 +1,46 @@ +# LILA Separation Multilabel Filtering Tool + +## Overview + +This tool extracts and processes multi-labeled images from the LILA (LILA BC - Labeled Information Library of +Alexandria: Biodiversity Catalog) dataset. Multi-labeled images are those containing multiple objects of interest, each +with its own label. The tool creates a new dataset containing only these multi-labeled images with proper metadata, +effectively separating them from single-labeled images for specialized analysis or training. 
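+
+As a toy illustration of the data model (all values hypothetical): each downloaded
+image row (`uuid_main`) fans out into one output row per label entry, which is
+exactly what the right-join in the runner described below produces:
+
+```python
+import pandas as pd
+
+# One downloaded image...
+downloaded = pd.DataFrame({"uuid": ["img-1"], "image": [b"<bytes>"]})
+
+# ...referenced by two entries in the multi-label separation table.
+labels = pd.DataFrame(
+    {
+        "uuid": ["lbl-a", "lbl-b"],  # new per-label uuids
+        "uuid_main": ["img-1", "img-1"],  # original image uuid
+    }
+)
+
+# The right join duplicates the image bytes once per label entry.
+fanned_out = downloaded.merge(
+    labels, left_on="uuid", right_on="uuid_main", suffixes=("_x", "_y"), how="right"
+)
+print(fanned_out[["uuid_y", "uuid_main", "image"]])  # two rows for img-1
+```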
+ +The workflow consists of three main components: + +1. **Filter (`LilaSeparationFilter`)**: Identifies multi-labeled images by joining metadata with a provided multi-label + entries file. +2. **Scheduler (`LilaSeparationScheduleCreation`)**: Creates a processing schedule to distribute work across compute + nodes. +3. **Runner (`LilaSeparationRunner`)**: Processes images according to the schedule, extracting and storing multi-labeled + images in a new location. + +## Configuration Requirements + +The following fields must be specified in the configuration file: + +- `new_images_path`: Destination path where processed multi-labeled images will be stored +- `new_urls_folder`: Destination path where metadata/URLs for multi-labeled images will be stored +- `multilabel_data_path`: Path to CSV file containing only multi-label entries + +## Pre-conditions + +For the tool to work correctly: + +- The input `multilabel_data_path` CSV file must: + - Contain only multi-label entries + - Include both `uuid` and `partition_id` columns + - Have the `partition_id` already repartitioned for the new dataset + +- The original LILA dataset must: + - Follow the distributed-downloader format + - Contain the original images referenced in the multi-label CSV + +## Post-conditions + +After successful execution: + +- A new dataset containing only multi-labeled images will be created at `new_images_path` +- Corresponding metadata will be stored at `new_urls_folder` +- The new dataset will follow the distributed-downloader format diff --git a/src/TreeOfLife_toolbox/lila_separation_multilable_filtering/__init__.py b/src/TreeOfLife_toolbox/lila_separation_multilable_filtering/__init__.py new file mode 100644 index 0000000..9caf5eb --- /dev/null +++ b/src/TreeOfLife_toolbox/lila_separation_multilable_filtering/__init__.py @@ -0,0 +1,5 @@ +from .classes import ( + LilaSeparationFilter, + LilaSeparationScheduleCreation, + LilaSeparationRunner, +) diff --git a/src/TreeOfLife_toolbox/lila_separation_multilable_filtering/classes.py b/src/TreeOfLife_toolbox/lila_separation_multilable_filtering/classes.py new file mode 100644 index 0000000..a755ab4 --- /dev/null +++ b/src/TreeOfLife_toolbox/lila_separation_multilable_filtering/classes.py @@ -0,0 +1,252 @@ +import os +import shutil +from typing import List + +import pandas as pd + +from TreeOfLife_toolbox.main.config import Config +from TreeOfLife_toolbox.main.filters import ( + FilterRegister, + SparkFilterToolBase, +) +from TreeOfLife_toolbox.main.runners import MPIRunnerTool, RunnerRegister +from TreeOfLife_toolbox.main.schedulers import DefaultScheduler, SchedulerRegister + + +@FilterRegister("lila_separation_multilable_filtering") +class LilaSeparationFilter(SparkFilterToolBase): + """ + Filter class for separating multi-labeled images from LILA dataset. + + This class handles the initial filtering step of the workflow by: + 1. Copying the multi-label data table to the tool's filter folder + 2. Joining image metadata with the multi-label filter data + 3. Creating a new dataset with only the multi-labeled entries + + Attributes: + filter_name (str): Name of the filter tool + new_urls_folder (str): Target location for storing filtered metadata/URLs + data_path (str): Path to CSV containing multi-label image entries + """ + + def __init__(self, cfg: Config): + """ + Initialize the LILA separation filter. 
+ + Args: + cfg (Config): Configuration object containing paths and parameters + """ + super().__init__(cfg) + + self.filter_name: str = "lila_separation_multilable_filtering" + self.new_urls_folder = cfg["new_urls_folder"] + self.data_path = cfg["multilabel_data_path"] + + def run(self): + """ + Execute the filtering process. + + This method: + 1. Sets up the filter table directory + 2. Copies the multi-label data to the filter table location + 3. Loads the original metadata and filter table into Spark DataFrames + 4. Joins the tables to identify multi-label images + 5. Saves the filtered metadata to the new URLs folder + """ + filter_table_folder = os.path.join( + self.tools_path, self.filter_name, "filter_table" + ) + os.makedirs(filter_table_folder, exist_ok=True) + filter_table_folder += "/table.csv" + + shutil.copyfile(self.data_path, filter_table_folder) + + metadata_df = self.spark.read.parquet(self.urls_path).drop("partition_id") + filter_df = self.spark.read.csv(self.data_path, header=True).select( + "uuid", "partition_id" + ) + + df = metadata_df.join(filter_df, on="uuid", how="inner") + + ( + df.repartition("server_name", "partition_id") + .write.partitionBy("server_name", "partition_id") + .mode("overwrite") + .format("parquet") + .save(self.new_urls_folder) + ) + + +@SchedulerRegister("lila_separation_multilable_filtering") +class LilaSeparationScheduleCreation(DefaultScheduler): + """ + Scheduler class for orchestrating the LILA multi-label separation process. + + This class inherits from DefaultScheduler and manages the creation of the + processing schedule for separating multi-labeled images. + + The scheduler creates a list of tasks based on server_name and partition_id + combinations that need to be processed. + + Attributes: + filter_name (str): Name of the filter tool + """ + + def __init__(self, cfg: Config): + """ + Initialize the LILA separation scheduler. + + Args: + cfg (Config): Configuration object containing paths and parameters + """ + super().__init__(cfg) + + self.filter_name: str = "lila_separation_multilable_filtering" + + +@RunnerRegister("lila_separation_multilable_filtering") +class LilaSeparationRunner(MPIRunnerTool): + """ + Runner class for executing the LILA multi-label image separation. + + This class performs the actual data processing to extract multi-labeled + images from the original dataset and store them in a new location with + proper metadata. + + The runner works in a distributed MPI environment, with each process + handling a subset of the data partitions. + + Attributes: + filter_name (str): Name of the filter tool + data_scheme (List[str]): Column structure of the input dataset + verification_scheme (List[str]): Columns used for verifying task completion + new_images_path (str): Path where separated images will be stored + total_time (int): Maximum processing time in seconds before timeout + """ + + def __init__(self, cfg: Config): + """ + Initialize the LILA separation runner. 
+
+        Args:
+            cfg (Config): Configuration object containing paths and parameters
+        """
+        super().__init__(cfg)
+        self.filter_name: str = "lila_separation_multilable_filtering"
+
+        self.data_scheme: List[str] = [
+            "uuid",
+            "source_id",
+            "uuid_main",
+            "source_id_main",
+            "server_name",
+            "old_partition_id",
+            "partition_id",
+        ]
+        self.verification_scheme: List[str] = ["server_name", "partition_id"]
+        self.new_images_path = cfg["new_images_path"]
+        self.total_time = 600
+
+    def apply_filter(
+        self, filtering_df: pd.DataFrame, server_name: str, partition_id: str
+    ) -> int:
+        """
+        Extract multi-labeled images from the original dataset.
+
+        This method processes a specific server/partition combination:
+        1. Groups filter data by server_name and old_partition_id
+        2. Reads image data from original locations based on UUIDs
+        3. Merges image data with filter information
+        4. Saves the multi-labeled images to new locations
+
+        Args:
+            filtering_df (pd.DataFrame): DataFrame containing filter information
+            server_name (str): Name of the server to process
+            partition_id (str): ID of the partition to process
+
+        Returns:
+            int: Number of images processed and saved
+
+        Raises:
+            TimeoutError: If processing exceeds the allocated time limit
+        """
+        self.is_enough_time()
+
+        # Group filtering data by server and partition to find original image locations
+        filtering_df_grouped = filtering_df.groupby(["server_name", "old_partition_id"])
+        separated_dict = []
+        for name, group in filtering_df_grouped:
+            parquet_path = os.path.join(
+                self.downloaded_images_path,
+                f"server_name={name[0]}",
+                f"partition_id={name[1]}",
+                "successes.parquet",
+            )
+            if not os.path.exists(parquet_path):
+                self.logger.info(f"Path doesn't exist: {parquet_path}")
+                continue
+
+            # Read original image data matching the UUIDs in our filter group
+            partial_df = pd.read_parquet(
+                parquet_path, filters=[("uuid", "in", group["uuid_main"].tolist())]
+            )
+
+            # Merge image data with filter information; the right join keeps one
+            # output row per multi-label entry, duplicating the image as needed
+            partial_merged_df = pd.merge(
+                partial_df,
+                group,
+                left_on="uuid",
+                right_on="uuid_main",
+                suffixes=("_x", "_y"),
+                sort=False,
+                how="right",
+            )
+
+            # Select and rename columns for the output dataset
+            partial_merged_df = partial_merged_df[
+                [
+                    "uuid_y",
+                    "source_id_y",
+                    "identifier",
+                    "is_license_full",
+                    "license",
+                    "source",
+                    "title",
+                    "hashsum_original",
+                    "hashsum_resized",
+                    "original_size",
+                    "resized_size",
+                    "image",
+                ]
+            ]
+            separated_dict.extend(
+                partial_merged_df.rename(
+                    columns={"uuid_y": "uuid", "source_id_y": "source_id"}
+                ).to_dict("records")
+            )
+
+        # Create DataFrame from collected records
+        merged_df = pd.DataFrame.from_records(separated_dict)
+
+        self.is_enough_time()
+
+        # Create output directory and save processed data
+        save_path = os.path.join(
+            self.new_images_path,
+            f"server_name={server_name}",
+            f"partition_id={partition_id}",
+        )
+        os.makedirs(save_path, exist_ok=True)
+
+        if len(merged_df) == 0:
+            self.logger.info(f"Empty: {server_name}/{partition_id}")
+
+        # Save processed data to parquet file
+        merged_df.to_parquet(
+            os.path.join(save_path, "successes.parquet"),
+            index=False,
+            compression="zstd",
+            compression_level=3,
+        )
+
+        return len(merged_df)
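Since the runner writes hive-partitioned parquet (`server_name=<...>/partition_id=<...>/successes.parquet`, zstd-compressed), the separated dataset can be read back with partition-pruned scans. A small sketch — the root path is illustrative, and partition values may be type-inferred (e.g. `partition_id` as an integer):

```python
import pandas as pd

# Illustrative root; in production this is the tool's `new_images_path` config value.
root = "/path/to/separated_multilabel_data/downloaded_image"

# Hive-style directory names come back as `server_name` / `partition_id` columns,
# and filters are pushed down so only matching partition directories are scanned.
df = pd.read_parquet(root, filters=[("server_name", "=", "example-server")])
print(len(df), df.columns.tolist())
```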