44 changes: 18 additions & 26 deletions pyproject.toml
@@ -3,86 +3,78 @@ requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/DD_tools"]
packages = ["src/TreeOfLife_toolbox"]

[project]
name = "DD_tools"
name = "TreeOfLife_toolbox"
dynamic = ["version"]
authors = [
{ name = "Andrey Kopanev", email = "kopanev.1@osu.edu" },
{ name = "Elizabeth G. Campolongo", email = "e.campolongo479@gmail.com" },
{ name = "Matthew J. Thompson", email = "thompson.m.j@outlook.com" },
]
description = "A tool for downloading files from a list of URLs in parallel."
description = "A tool for processing datasets that was downloaded using the distributed-downloader package."
readme = "README.md"
requires-python = ">=3.8"
requires-python = ">=3.10, <3.12"
classifiers = [
"Development Status :: 4 - Beta",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"attrs",
"brotli",
"certifi",
"charset-normalizer",
"cramjam",
"cython",
"exceptiongroup",
"fsspec",
"hatchling",
"idna",
"inflate64",
"iniconfig",
"mpi4py < 4",
"mpi4py",
"multivolumefile",
"numpy",
"opencv-python",
"packaging",
"pandas",
"pathspec",
"pillow",
"pip",
"pluggy",
"psutil",
"py4j",
"pyarrow",
"pybcj",
"pycryptodomex",
"pyppmd",
"pyspark",
"pytest",
"python-dateutil",
"python-dotenv",
"pytz",
"pyyaml",
"pyzstd",
"requests",
"setuptools",
"six",
"texttable",
"tomli",
"trove-classifiers",
"typing-extensions",
"tzdata",
"urllib3",
"wheel"
]

[project.optional-dependencies]
dev = ["pytest"]
dev = [
"pytest",
"ruff"
]

keywords = [
"parallel",
"distributed",
"download",
"url",
"mpi-applications",
"dataset-generation",
]

[project.urls]
Homepage = "https://github.com/Imageomics/distributed-downloader"
Repository = "https://github.com/Imageomics/distributed-downloader.git"
"Bug Tracker" = "https://github.com/Imageomics/distributed-downloader/issues"

[project.scripts]
tree_of_life_toolbox = "TreeOfLife_toolbox.main.main:main"

[tool.hatch.version]
path = "src/DD_tools/main/__about__.py"
path = "src/TreeOfLife_toolbox/main/__about__.py"
3 changes: 1 addition & 2 deletions scripts/tools_filter.slurm
@@ -19,11 +19,10 @@ executor_memory="64G"
module load spark/3.4.1
module load miniconda3/23.3.1-py310
source "${REPO_ROOT}/.venv/bin/activate"
export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader"

pbs-spark-submit \
--driver-memory $driver_memory \
--executor-memory $executor_memory \
"${REPO_ROOT}/src/distributed_downloader/tools/filter.py" \
"${TOOLBOX_PATH}/main/filter.py" \
"${tool_name}" \
> "${logs_dir}/tool_filter.log"
3 changes: 1 addition & 2 deletions scripts/tools_scheduler.slurm
@@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310
source "${REPO_ROOT}/.venv/bin/activate"
export PYARROW_IGNORE_TIMEZONE=1
export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0
export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader"

srun \
--mpi=pmi2 \
@@ -28,4 +27,4 @@ srun \
--cpus-per-task=1 \
--mem=0 \
--output="${logs_dir}/tool_scheduler.log" \
python "${REPO_ROOT}/src/distributed_downloader/tools/scheduler.py" "${tool_name}"
python "${TOOLBOX_PATH}/main/scheduler.py" "${tool_name}"
3 changes: 1 addition & 2 deletions scripts/tools_verifier.slurm
@@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310
source "${REPO_ROOT}/.venv/bin/activate"
export PYARROW_IGNORE_TIMEZONE=1
export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0
export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader"

srun \
--mpi=pmi2 \
@@ -28,4 +27,4 @@ srun \
--cpus-per-task=1 \
--mem=0 \
--output="${logs_dir}/tool_verifier.log" \
python "${REPO_ROOT}/src/distributed_downloader/tools/verification.py" "${tool_name}"
python "${TOOLBOX_PATH}/main/verification.py" "${tool_name}"
3 changes: 1 addition & 2 deletions scripts/tools_worker.slurm
@@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310
source "${REPO_ROOT}/.venv/bin/activate"
export PYARROW_IGNORE_TIMEZONE=1
export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0
export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader"

srun \
--mpi=pmi2 \
@@ -28,4 +27,4 @@ srun \
--cpus-per-task="$TOOLS_CPU_PER_WORKER" \
--mem=0 \
--output="${logs_dir}/tool_worker-%2t.log" \
python "${REPO_ROOT}/src/distributed_downloader/tools/runner.py" "${tool_name}"
python "${TOOLBOX_PATH}/main/runner.py" "${tool_name}"
Empty file removed src/DD_tools/__init__.py
Empty file.
1 change: 1 addition & 0 deletions src/TreeOfLife_toolbox/__init__.py
@@ -0,0 +1 @@
from TreeOfLife_toolbox import lila_bc_filtering
68 changes: 68 additions & 0 deletions src/TreeOfLife_toolbox/lila_bc_filtering/README.md
@@ -0,0 +1,68 @@
# LILA Biodiversity Catalog Filtering Tool

## Overview

The LILA BC Filtering tool is a specialized component of the TreeOfLife toolbox designed to filter out images from a
dataset based on their labels. Specifically, it removes images whose original labels match those specified in an
exclusion list. This tool is primarily built to work with the LILA (Labeled Information Library of Alexandria)
Biodiversity Catalog dataset, but it can be applied to any dataset that meets the format requirements listed under Prerequisites.

## How It Works

The tool operates in three sequential stages (a minimal sketch of the filtering join follows the list):

1. **Filtering (LilaBCFilter)**:
- Loads image data from parquet files
- Loads original labels from the URLs table
- Identifies images with labels matching the exclusion list
- Creates a filter table containing UUIDs of images to be removed

2. **Scheduling (LilaBCScheduleCreation)**:
- Creates a work distribution schedule for parallel processing
- Assigns batches of images to different workers

3. **Running (LilaBCRunner)**:
- Executes the actual filtering operation using MPI
- Removes matched images from the dataset
- Retains the same directory structure and filenames
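
For intuition, here is a minimal PySpark sketch of the join performed in the filtering stage. It is a standalone
illustration rather than the tool's entry point: the real implementation is `LilaBCFilter.run()`, which relies on the
toolbox base classes for loading and saving, and all paths below are placeholders.

```python
from pyspark.sql import SparkSession

# Standalone sketch of the filtering-stage join; all paths are placeholders.
spark = SparkSession.builder.appName("lila_bc_filtering_sketch").getOrCreate()

# Successfully downloaded images, one row per image keyed by uuid.
successes_df = spark.read.parquet("/path/to/dataset/successes")

# URLs table carrying the original label of every image.
urls_df = spark.read.parquet("/path/to/dataset/urls").select("uuid", "original_label")

# Labels to drop, read from the CSV referenced by `path_to_excluding_labels`.
exclude_df = spark.read.csv("/path/to/labels_to_exclude.csv", header=True)

# Attach labels to images, then keep only images whose label is in the exclusion list.
to_remove = (
    successes_df.join(urls_df, on="uuid", how="inner")
    .join(exclude_df, on="original_label", how="inner")
    .select("uuid", "source_id", "server_name", "partition_id")
)

print(f"Images to filter out: {to_remove.count()}")
```
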

## Required Configuration

### Mandatory Config Fields

- `path_to_excluding_labels`: Path to a CSV file containing the labels to be excluded from the dataset
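
The configuration file format itself is defined by the toolbox's `Config` class and is not part of this change.
Assuming a YAML-style config (an assumption, not something this change confirms), the only entry this tool needs is
the one below; the path is a placeholder:

```yaml
# Hypothetical config excerpt; only this key is read by the lila_bc_filtering tool.
path_to_excluding_labels: "/path/to/labels_to_exclude.csv"
```
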

## Prerequisites

### Pre-conditions

- The dataset must follow the `distributed-downloader` format structure
- The dataset's URL table must contain a column named `original_label`
- The CSV file specified in `path_to_excluding_labels` must exist and include column headers
- The exclusion labels CSV must have an `original_label` column whose values match the `original_label` values in the dataset

### Input Format

- The exclusion labels file should be a CSV file with column headers
- At a minimum, it must contain an `original_label` column listing the labels to exclude (see the example below)
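
A minimal example of what the exclusion labels file could look like (the label values are purely illustrative):

```csv
original_label
empty
vehicle
```
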

## Outcomes

### Post-conditions

- The dataset will be filtered to exclude all images with labels matching those in the exclusion list
- Original parquet files will be replaced with filtered versions (retaining the same paths and names)
- The tool will maintain a record of all filtered images
- The filtering process is idempotent: running it multiple times will not remove any additional data

### Output Files

- Filtered parquet files in the original dataset structure
- Filtering logs and statistics in the tools directory
- Verification files to confirm successful processing of each partition

## Notes

- In theory, this tool can be used for any dataset, as long as the dataset's URL table contains an `original_label` column (a sketch of how to register an analogous tool is shown below).
- The filtering process is performed in-place, so make backups if you need to preserve the original data.
- The tool is designed to run efficiently on distributed systems using MPI.
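
Because tools are looked up by name through the registries, adapting this tool for another dataset mostly amounts to
registering a new filter/scheduler/runner trio under its own tool name. A rough sketch, assuming the same base classes
this tool uses (`SparkFilterToolBase`, `DefaultScheduler`, `FilterRunnerTool`); the tool name and config field are
hypothetical:

```python
from TreeOfLife_toolbox.main.config import Config
from TreeOfLife_toolbox.main.filters import FilterRegister, SparkFilterToolBase
from TreeOfLife_toolbox.main.runners import RunnerRegister, FilterRunnerTool
from TreeOfLife_toolbox.main.schedulers import DefaultScheduler, SchedulerRegister


@FilterRegister("my_label_filtering")  # hypothetical tool name
class MyLabelFilter(SparkFilterToolBase):
    def __init__(self, cfg: Config, spark=None):
        super().__init__(cfg, spark)
        self.filter_name: str = "my_label_filtering"
        # Hypothetical config field; name it after the entry in your config file.
        self.path_to_excluding_labels = cfg["path_to_excluding_labels"]

    def run(self):
        # Same three-way join as LilaBCFilter: attach original_label to each
        # downloaded image, keep the ones whose label is in the exclusion CSV.
        images = self.load_data_parquet()
        labels = self.spark.read.parquet(self.urls_path).select("uuid", "original_label")
        excluded = self.spark.read.csv(self.path_to_excluding_labels, header=True)
        self.save_filter(
            images.join(labels, "uuid")
            .join(excluded, "original_label")
            .select("uuid", "source_id", "server_name", "partition_id")
        )


@SchedulerRegister("my_label_filtering")
class MyLabelScheduleCreation(DefaultScheduler):
    def __init__(self, cfg: Config):
        super().__init__(cfg)
        self.filter_name: str = "my_label_filtering"


@RunnerRegister("my_label_filtering")
class MyLabelRunner(FilterRunnerTool):
    def __init__(self, cfg: Config):
        super().__init__(cfg)
        self.filter_name: str = "my_label_filtering"
```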
1 change: 1 addition & 0 deletions src/TreeOfLife_toolbox/lila_bc_filtering/__init__.py
@@ -0,0 +1 @@
from .classes import LilaBCFilter, LilaBCScheduleCreation, LilaBCRunner
114 changes: 114 additions & 0 deletions src/TreeOfLife_toolbox/lila_bc_filtering/classes.py
@@ -0,0 +1,114 @@
import pyspark.sql as ps
from pyspark.sql import SparkSession

from TreeOfLife_toolbox.main.config import Config
from TreeOfLife_toolbox.main.filters import FilterRegister, SparkFilterToolBase
from TreeOfLife_toolbox.main.runners import RunnerRegister, FilterRunnerTool
from TreeOfLife_toolbox.main.schedulers import DefaultScheduler, SchedulerRegister


@FilterRegister("lila_bc_filtering")
class LilaBCFilter(SparkFilterToolBase):
"""
Filter class for filtering out images from the LILA Biodiversity Catalog based on specified labels.

This class identifies images that have labels matching those in the excluding labels file
and creates a filter table containing UUIDs of those images. These images will later
be removed from the dataset by the runner.

Attributes:
filter_name (str): Name of the filter tool, used for folder structure.
path_to_excluding_labels (str): Path to CSV containing labels to be excluded.
"""

def __init__(self, cfg: Config, spark: SparkSession = None):
"""
Initialize the LILA BC filter with configuration.

Args:
cfg (Config): Configuration object containing paths and settings.
spark (SparkSession, optional): Existing SparkSession. If None, a new one will be created.
"""
super().__init__(cfg, spark)
self.filter_name: str = "lila_bc_filtering"
self.path_to_excluding_labels = cfg["path_to_excluding_labels"]

def run(self):
"""
Execute the filtering process.

This method:
1. Loads the image data from the parquet files
2. Loads the original labels from the URLs table
3. Loads the labels to be excluded
4. Joins the datasets to identify images with labels to exclude
5. Saves the filter table for later processing by the runner

Returns:
None
"""
successes_df: ps.DataFrame = self.load_data_parquet()
data_df = self.spark.read.parquet(self.urls_path).select(
"uuid", "original_label"
)
labels_to_exclude_df = self.spark.read.csv(
self.path_to_excluding_labels, header=True
)

merged_df = successes_df.join(data_df, on="uuid", how="inner")
filtered_df = merged_df.join(
labels_to_exclude_df, on="original_label", how="inner"
).select("uuid", "source_id", "server_name", "partition_id")

self.save_filter(filtered_df)

self.logger.info(f"Images to filter out: {filtered_df.count()}")


@SchedulerRegister("lila_bc_filtering")
class LilaBCScheduleCreation(DefaultScheduler):
"""
Scheduler for LILA BC filtering tool.

This class creates a schedule for parallel processing of the filtering task.
It inherits from DefaultScheduler which manages the distribution of work
across available workers.

Attributes:
filter_name (str): Name of the filter tool, used for folder structure.
"""
def __init__(self, cfg: Config):
"""
Initialize the LILA BC scheduler.

Args:
cfg (Config): Configuration object containing paths and settings.
"""
super().__init__(cfg)

self.filter_name: str = "lila_bc_filtering"


@RunnerRegister("lila_bc_filtering")
class LilaBCRunner(FilterRunnerTool):
"""
Runner for LILA BC filtering tool.

This class executes the actual filtering operation by removing images
with specified labels. It uses MPI to distribute work across multiple nodes.
Inherits from FilterRunnerTool which provides common functionality for
filtering operations on downloaded images.

Attributes:
filter_name (str): Name of the filter tool, used for folder structure.
"""
def __init__(self, cfg: Config):
"""
Initialize the LILA BC runner.

Args:
cfg (Config): Configuration object containing paths and settings.
"""
super().__init__(cfg)

self.filter_name: str = "lila_bc_filtering"
File renamed without changes.
@@ -1,10 +1,10 @@
import argparse
import os

from DD_tools.main.checkpoint import Checkpoint
from DD_tools.main.config import Config
from DD_tools.main.registry import ToolsRegistryBase
from DD_tools.main.utils import init_logger
from TreeOfLife_toolbox.main.checkpoint import Checkpoint
from TreeOfLife_toolbox.main.config import Config
from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
from TreeOfLife_toolbox.main.utils import init_logger

if __name__ == "__main__":
config_path = os.environ.get("CONFIG_PATH")
@@ -7,10 +7,10 @@
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

from DD_tools.main.config import Config
from DD_tools.main.registry import ToolsBase
from DD_tools.main.registry import ToolsRegistryBase
from DD_tools.main.utils import SuccessEntry
from TreeOfLife_toolbox.main.config import Config
from TreeOfLife_toolbox.main.registry import ToolsBase
from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
from TreeOfLife_toolbox.main.utils import SuccessEntry

FilterRegister = partial(ToolsRegistryBase.register, "filter")
