44 changes: 18 additions & 26 deletions pyproject.toml
@@ -3,86 +3,78 @@ requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/DD_tools"]
packages = ["src/TreeOfLife_toolbox"]

[project]
name = "DD_tools"
name = "TreeOfLife_toolbox"
dynamic = ["version"]
authors = [
{ name = "Andrey Kopanev", email = "kopanev.1@osu.edu" },
{ name = "Elizabeth G. Campolongo", email = "e.campolongo479@gmail.com" },
{ name = "Matthew J. Thompson", email = "thompson.m.j@outlook.com" },
]
description = "A tool for downloading files from a list of URLs in parallel."
description = "A tool for processing datasets that was downloaded using the distributed-downloader package."
readme = "README.md"
requires-python = ">=3.8"
requires-python = ">=3.10, <3.12"
classifiers = [
"Development Status :: 4 - Beta",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"attrs",
"brotli",
"certifi",
"charset-normalizer",
"cramjam",
"cython",
"exceptiongroup",
"fsspec",
"hatchling",
"idna",
"inflate64",
"iniconfig",
"mpi4py < 4",
"mpi4py",
"multivolumefile",
"numpy",
"opencv-python",
"packaging",
"pandas",
"pathspec",
"pillow",
"pip",
"pluggy",
"psutil",
"py4j",
"pyarrow",
"pybcj",
"pycryptodomex",
"pyppmd",
"pyspark",
"pytest",
"python-dateutil",
"python-dotenv",
"pytz",
"pyyaml",
"pyzstd",
"requests",
"setuptools",
"six",
"texttable",
"tomli",
"trove-classifiers",
"typing-extensions",
"tzdata",
"urllib3",
"wheel"
]

[project.optional-dependencies]
dev = ["pytest"]
dev = [
"pytest",
"ruff"
]

keywords = [
"parallel",
"distributed",
"download",
"url",
"mpi-applications",
"dataset-generation",
]

[project.urls]
Homepage = "https://github.com/Imageomics/distributed-downloader"
Repository = "https://github.com/Imageomics/distributed-downloader.git"
"Bug Tracker" = "https://github.com/Imageomics/distributed-downloader/issues"

[project.scripts]
tree_of_life_toolbox = "TreeOfLife_toolbox.main.main:main"

[tool.hatch.version]
path = "src/DD_tools/main/__about__.py"
path = "src/TreeOfLife_toolbox/main/__about__.py"
3 changes: 1 addition & 2 deletions scripts/tools_filter.slurm
@@ -19,11 +19,10 @@ executor_memory="64G"
module load spark/3.4.1
module load miniconda3/23.3.1-py310
source "${REPO_ROOT}/.venv/bin/activate"
export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader"

pbs-spark-submit \
--driver-memory $driver_memory \
--executor-memory $executor_memory \
"${REPO_ROOT}/src/distributed_downloader/tools/filter.py" \
"${TOOLBOX_PATH}/main/filter.py" \
"${tool_name}" \
> "${logs_dir}/tool_filter.log"
3 changes: 1 addition & 2 deletions scripts/tools_scheduler.slurm
@@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310
source "${REPO_ROOT}/.venv/bin/activate"
export PYARROW_IGNORE_TIMEZONE=1
export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0
export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader"

srun \
--mpi=pmi2 \
@@ -28,4 +27,4 @@ srun \
--cpus-per-task=1 \
--mem=0 \
--output="${logs_dir}/tool_scheduler.log" \
python "${REPO_ROOT}/src/distributed_downloader/tools/scheduler.py" "${tool_name}"
python "${TOOLBOX_PATH}/main/scheduler.py" "${tool_name}"
3 changes: 1 addition & 2 deletions scripts/tools_verifier.slurm
@@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310
source "${REPO_ROOT}/.venv/bin/activate"
export PYARROW_IGNORE_TIMEZONE=1
export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0
export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader"

srun \
--mpi=pmi2 \
@@ -28,4 +27,4 @@ srun \
--cpus-per-task=1 \
--mem=0 \
--output="${logs_dir}/tool_verifier.log" \
python "${REPO_ROOT}/src/distributed_downloader/tools/verification.py" "${tool_name}"
python "${TOOLBOX_PATH}/main/verification.py" "${tool_name}"
3 changes: 1 addition & 2 deletions scripts/tools_worker.slurm
@@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310
source "${REPO_ROOT}/.venv/bin/activate"
export PYARROW_IGNORE_TIMEZONE=1
export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0
export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader"

srun \
--mpi=pmi2 \
@@ -28,4 +27,4 @@ srun \
--cpus-per-task="$TOOLS_CPU_PER_WORKER" \
--mem=0 \
--output="${logs_dir}/tool_worker-%2t.log" \
python "${REPO_ROOT}/src/distributed_downloader/tools/runner.py" "${tool_name}"
python "${TOOLBOX_PATH}/main/runner.py" "${tool_name}"
Empty file removed src/DD_tools/__init__.py
Empty file.
1 change: 1 addition & 0 deletions src/TreeOfLife_toolbox/__init__.py
@@ -0,0 +1 @@
from TreeOfLife_toolbox import data_merging
81 changes: 81 additions & 0 deletions src/TreeOfLife_toolbox/data_merging/README.md
@@ -0,0 +1,81 @@
# Data Merging Tool

This tool identifies and filters out duplicate images between a target dataset and a newly downloaded source dataset
based on image hash values. It's designed to ensure data uniqueness when integrating new data into an existing
repository.

## What it does

The tool performs the following steps:

1. Scans a target dataset (existing data collection)
2. Scans a source dataset (newly downloaded data)
3. Identifies duplicate images by comparing `hashsum_original` values
4. Creates a filter table of duplicates that can be used for further processing
5. Filters out duplicated entries from the source dataset
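
The steps above boil down to a single hash join on `hashsum_original`. Below is a minimal PySpark sketch of that check, not the tool itself; the paths are placeholders and the column subset mirrors the `successes` scheme (see `classes.py` in this PR for the real implementation, which wires this logic into the toolbox's filter/scheduler/runner classes).

```python
# Minimal sketch of the duplicate check; paths are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("merged_duplicated_check_sketch").getOrCreate()

merge_target = "/path/to/target_dataset"      # existing collection (config key: merge_target)
downloaded_images = "/path/to/output/images"  # newly downloaded source dataset

# Steps 1-2: scan both datasets, keeping only the columns needed for the comparison.
target_df = (
    spark.read.option("basePath", merge_target)
    .parquet(merge_target + "/source=*/server=*/data_*.parquet")
    .select("uuid", "source_id", "hashsum_original")
    .withColumnRenamed("uuid", "uuid_main")
    .withColumnRenamed("source_id", "source_id_main")
)
source_df = (
    spark.read.option("basePath", downloaded_images)
    .parquet(downloaded_images + "/*/*/successes.parquet")
    .select("uuid", "source_id", "server_name", "partition_id", "hashsum_original")
)

# Steps 3-4: duplicates are rows whose hashsum_original appears in both datasets.
duplicates = source_df.join(target_df, on=["hashsum_original"], how="inner")

# Step 5: the real tool saves this as its filter table and later drops these rows
# from the source dataset; here we only report the count.
print("duplicated entries:", duplicates.count())
```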

## Required fields in config

- `merge_target`: Path to the target dataset folder that will be checked against the new dataset
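
For illustration only, a sketch of how this field is consumed: the dict below stands in for the loaded config object (the real toolbox reads the file pointed to by `CONFIG_PATH`), and the path is a placeholder.

```python
# Stand-in for the loaded toolbox config; only the field required by this tool is shown.
config = {
    "merge_target": "/path/to/target_dataset",  # placeholder path to the existing collection
}

# Mirrors DataMergedDupCheckFilter.__init__ in classes.py.
merge_target = str(config["merge_target"])
```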

## Initial assumptions/preconditions

**Target dataset:**

- It follows this structure:

```
<dst_image_folder>/
├── server=<server1>
│ ├── data_<uuid1>.parquet
│ ├── data_<uuid2>.parquet
│ └── ...
└── server=<server2>
└── ...
```

- Entries in the target dataset are unique on the `hashsum_original` column
- Each file follows the `successes` scheme from `distributed-downloader`

**Source dataset:**

- It follows this structure:

```
<path_to_output_folder>/<images_folder>/
├── server_name=<server1>
│ ├── partition_id=<id>
│ │ ├── successes.parquet
│ │ ├── errors.parquet
│ │ └── completed
│ └── partition_id=<id2>
│ ├── ...
└── server_name=<server2>
└── ...
```

- Each file follows the `successes` scheme from `distributed-downloader`
- Entries in the source dataset are unique on the `hashsum_original` column

## Postconditions

After successful execution, the tool guarantees:

- A CSV filter table is created containing metadata about all duplicated entries, including their source and target
UUIDs
- The duplicated records are accessible via the tool's filter table, showing the relationship between source and target
entries
- The number of duplicated entries is logged for verification
- Processing is distributed across available nodes for efficient execution
- Original data files remain untouched - this is a non-destructive analysis
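
As a hedged sketch of how the resulting filter table might be inspected: the CSV location below is an assumption (not a documented path), while the column names come from the join performed in `classes.py`.

```python
# Hypothetical inspection of the duplicate filter table; the path is a placeholder
# and the exact set of columns written by save_filter is an assumption.
import pandas as pd

duplicates = pd.read_csv("/path/to/tools/merged_duplicated_check/filter_table.csv")

print(len(duplicates), "duplicated entries")
# uuid / source_id describe the source rows; uuid_main / source_id_main the matching target rows.
print(duplicates[["hashsum_original", "uuid", "uuid_main"]].head())
```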

## Implementation Details

The tool consists of three main components:

1. **DataMergedDupCheckFilter** - Identifies duplicate records by comparing hashsums
2. **DataMergedDupCheckScheduleCreation** - Handles task scheduling for distributed processing
3. **DataMergedDupCheckRunner** - Executes the actual filtering process

The implementation relies on Apache Spark for efficient distributed data processing, making it suitable for large
datasets.
5 changes: 5 additions & 0 deletions src/TreeOfLife_toolbox/data_merging/__init__.py
@@ -0,0 +1,5 @@
from .classes import (
DataMergedDupCheckFilter,
DataMergedDupCheckScheduleCreation,
DataMergedDupCheckRunner,
)
122 changes: 122 additions & 0 deletions src/TreeOfLife_toolbox/data_merging/classes.py
@@ -0,0 +1,122 @@
from TreeOfLife_toolbox.main.config import Config
from TreeOfLife_toolbox.main.filters import FilterRegister, SparkFilterToolBase
from TreeOfLife_toolbox.main.runners import RunnerRegister, FilterRunnerTool
from TreeOfLife_toolbox.main.schedulers import DefaultScheduler, SchedulerRegister


@FilterRegister("merged_duplicated_check")
class DataMergedDupCheckFilter(SparkFilterToolBase):
"""
Filter that identifies duplicate records between a target dataset and a source dataset.

This filter compares records based on their 'hashsum_original' column to identify
entries that exist in both the target dataset (specified by 'merge_target' in config)
and the source dataset (downloaded images).

Attributes:
filter_name (str): Name identifier for the filter, set to "merged_duplicated_check".
merge_target (str): Path to the target dataset for duplicate checking.
"""

def __init__(self, cfg: Config):
"""
Initialize the duplicate check filter with configuration.

Args:
cfg (Config): Configuration object containing filter settings
including the 'merge_target' path.
"""
super().__init__(cfg)

self.filter_name: str = "merged_duplicated_check"
self.merge_target: str = str(self.config["merge_target"])

def run(self):
"""
Execute the duplicate check process.

This method:
1. Reads the target dataset from the specified merge_target path
2. Reads the source dataset from the downloaded_images_path
3. Joins the datasets on 'hashsum_original' to find duplicates
4. Saves the duplicate records and logs the count

Returns:
None
"""
from pyspark.sql import DataFrame
import pyspark.sql.functions as func

target_df: DataFrame = (
self.spark.read.option("basePath", self.merge_target)
.parquet(self.merge_target + "/source=*/server=*/data_*.parquet")
.select("uuid", "source_id", "hashsum_original")
.withColumnsRenamed({"uuid": "uuid_main", "source_id": "source_id_main"})
)

target_df = target_df.withColumn("file_main", func.input_file_name())

object_df = (
self.spark.read.schema(self.success_scheme)
.option("basePath", self.downloaded_images_path)
.parquet(self.downloaded_images_path + "/*/*/successes.parquet")
.select(
"uuid", "source_id", "server_name", "partition_id", "hashsum_original"
)
)

duplicate_records = object_df.join(
target_df, on=["hashsum_original"], how="inner"
)

self.save_filter(duplicate_records)

self.logger.info(f"duplicated number: {duplicate_records.count()}")


@SchedulerRegister("merged_duplicated_check")
class DataMergedDupCheckScheduleCreation(DefaultScheduler):
"""
Scheduler for the duplicate check process.

Extends the DefaultScheduler to handle scheduling of the
duplicate checking tasks. This scheduler is registered with
the "merged_duplicated_check" name.

Attributes:
filter_name (str): Name identifier for the filter, set to "merged_duplicated_check".
"""
def __init__(self, cfg: Config):
"""
Initialize the duplicate check scheduler with configuration.

Args:
cfg (Config): Configuration object for scheduling parameters.
"""
super().__init__(cfg)

self.filter_name: str = "merged_duplicated_check"


@RunnerRegister("merged_duplicated_check")
class DataMergedDupCheckRunner(FilterRunnerTool):
"""
Runner for executing the duplicate check filter.

Handles the execution flow of the duplicate checking process,
extending the FilterRunnerTool to provide specific functionality
for the merged_duplicated_check filter.

Attributes:
filter_name (str): Name identifier for the filter, set to "merged_duplicated_check".
"""
def __init__(self, cfg: Config):
"""
Initialize the duplicate check runner with configuration.

Args:
cfg (Config): Configuration object containing runner settings.
"""
super().__init__(cfg)

self.filter_name: str = "merged_duplicated_check"
File renamed without changes.
@@ -1,10 +1,10 @@
import argparse
import os

from DD_tools.main.checkpoint import Checkpoint
from DD_tools.main.config import Config
from DD_tools.main.registry import ToolsRegistryBase
from DD_tools.main.utils import init_logger
from TreeOfLife_toolbox.main.checkpoint import Checkpoint
from TreeOfLife_toolbox.main.config import Config
from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
from TreeOfLife_toolbox.main.utils import init_logger

if __name__ == "__main__":
config_path = os.environ.get("CONFIG_PATH")