44 changes: 18 additions & 26 deletions pyproject.toml
@@ -3,86 +3,78 @@ requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/DD_tools"]
packages = ["src/TreeOfLife_toolbox"]

[project]
name = "DD_tools"
name = "TreeOfLife_toolbox"
dynamic = ["version"]
authors = [
{ name = "Andrey Kopanev", email = "kopanev.1@osu.edu" },
{ name = "Elizabeth G. Campolongo", email = "e.campolongo479@gmail.com" },
{ name = "Matthew J. Thompson", email = "thompson.m.j@outlook.com" },
]
description = "A tool for downloading files from a list of URLs in parallel."
description = "A tool for processing datasets that was downloaded using the distributed-downloader package."
readme = "README.md"
requires-python = ">=3.8"
requires-python = ">=3.10, <3.12"
classifiers = [
"Development Status :: 4 - Beta",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"attrs",
"brotli",
"certifi",
"charset-normalizer",
"cramjam",
"cython",
"exceptiongroup",
"fsspec",
"hatchling",
"idna",
"inflate64",
"iniconfig",
"mpi4py < 4",
"mpi4py",
"multivolumefile",
"numpy",
"opencv-python",
"packaging",
"pandas",
"pathspec",
"pillow",
"pip",
"pluggy",
"psutil",
"py4j",
"pyarrow",
"pybcj",
"pycryptodomex",
"pyppmd",
"pyspark",
"pytest",
"python-dateutil",
"python-dotenv",
"pytz",
"pyyaml",
"pyzstd",
"requests",
"setuptools",
"six",
"texttable",
"tomli",
"trove-classifiers",
"typing-extensions",
"tzdata",
"urllib3",
"wheel"
]

[project.optional-dependencies]
dev = ["pytest"]
dev = [
"pytest",
"ruff"
]

keywords = [
"parallel",
"distributed",
"download",
"url",
"mpi-applications",
"dataset-generation",
]

[project.urls]
Homepage = "https://github.com/Imageomics/distributed-downloader"
Repository = "https://github.com/Imageomics/distributed-downloader.git"
"Bug Tracker" = "https://github.com/Imageomics/distributed-downloader/issues"

[project.scripts]
tree_of_life_toolbox = "TreeOfLife_toolbox.main.main:main"

[tool.hatch.version]
path = "src/DD_tools/main/__about__.py"
path = "src/TreeOfLife_toolbox/main/__about__.py"
3 changes: 1 addition & 2 deletions scripts/tools_filter.slurm
@@ -19,11 +19,10 @@ executor_memory="64G"
module load spark/3.4.1
module load miniconda3/23.3.1-py310
source "${REPO_ROOT}/.venv/bin/activate"
export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader"

pbs-spark-submit \
--driver-memory $driver_memory \
--executor-memory $executor_memory \
"${REPO_ROOT}/src/distributed_downloader/tools/filter.py" \
"${TOOLBOX_PATH}/main/filter.py" \
"${tool_name}" \
> "${logs_dir}/tool_filter.log"
3 changes: 1 addition & 2 deletions scripts/tools_scheduler.slurm
@@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310
source "${REPO_ROOT}/.venv/bin/activate"
export PYARROW_IGNORE_TIMEZONE=1
export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0
export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader"

srun \
--mpi=pmi2 \
@@ -28,4 +27,4 @@ srun \
--cpus-per-task=1 \
--mem=0 \
--output="${logs_dir}/tool_scheduler.log" \
python "${REPO_ROOT}/src/distributed_downloader/tools/scheduler.py" "${tool_name}"
python "${TOOLBOX_PATH}/main/scheduler.py" "${tool_name}"
3 changes: 1 addition & 2 deletions scripts/tools_verifier.slurm
@@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310
source "${REPO_ROOT}/.venv/bin/activate"
export PYARROW_IGNORE_TIMEZONE=1
export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0
export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader"

srun \
--mpi=pmi2 \
@@ -28,4 +27,4 @@ srun \
--cpus-per-task=1 \
--mem=0 \
--output="${logs_dir}/tool_verifier.log" \
python "${REPO_ROOT}/src/distributed_downloader/tools/verification.py" "${tool_name}"
python "${TOOLBOX_PATH}/main/verification.py" "${tool_name}"
3 changes: 1 addition & 2 deletions scripts/tools_worker.slurm
@@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310
source "${REPO_ROOT}/.venv/bin/activate"
export PYARROW_IGNORE_TIMEZONE=1
export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0
export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader"

srun \
--mpi=pmi2 \
@@ -28,4 +27,4 @@ srun \
--cpus-per-task="$TOOLS_CPU_PER_WORKER" \
--mem=0 \
--output="${logs_dir}/tool_worker-%2t.log" \
python "${REPO_ROOT}/src/distributed_downloader/tools/runner.py" "${tool_name}"
python "${TOOLBOX_PATH}/main/runner.py" "${tool_name}"
Empty file removed src/DD_tools/__init__.py
Empty file.
1 change: 1 addition & 0 deletions src/TreeOfLife_toolbox/__init__.py
@@ -0,0 +1 @@
from TreeOfLife_toolbox import column_name_change
78 changes: 78 additions & 0 deletions src/TreeOfLife_toolbox/column_name_change/README.md
@@ -0,0 +1,78 @@
# Column Name Change Tool

A distributed tool for renaming columns in Parquet files that follow the `distributed-downloader` directory structure.
This tool enables batch renaming of columns across multiple Parquet files in parallel using MPI, making it efficient for
large datasets.

## Overview

The Column Name Change tool provides a way to update column names in Parquet files without having to reload or
reconstruct the entire dataset. This is particularly useful when:

- Adapting to schema changes in downstream applications
- Standardizing column names across multiple datasets
- Fixing typos or inconsistencies in column naming
- Adopting new naming conventions

## Implementation

The tool consists of three main components:

1. **Filter** (`ColumnNameChangeFilter`): Identifies all server_name/partition_id combinations that contain Parquet
files needing column renaming.

2. **Scheduler** (`ColumnNameChangeScheduleCreation`): Distributes the workload across available workers using a
round-robin approach for balanced processing.

3. **Runner** (`ColumnNameChangeRunner`): Performs the actual column renaming in parallel using MPI, processing each
   assigned partition; a minimal sketch of this step follows the list.
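
The per-partition work reduces to a read-rename-write cycle. A minimal sketch of that step, mirroring
`ColumnNameChangeRunner.apply_filter` in `classes.py` (in practice the path and mapping are derived from the runner's
configuration):

```python
import pandas as pd

def rename_columns_in_place(parquet_path: str, name_mapping: dict[str, str]) -> int:
    """Read a Parquet file, rename its columns, and write it back in place."""
    df = pd.read_parquet(parquet_path)
    # pandas silently ignores mapping keys absent from the frame's columns,
    # which is why missing columns are tolerated rather than raising errors.
    df = df.rename(columns=name_mapping)
    df.to_parquet(parquet_path, index=False, compression="zstd", compression_level=3)
    return len(df)
```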

## Required Configuration Fields

The tool requires the following configuration:

- Standard TreeOfLife-toolbox configurations for distributed processing
- `name_mapping`: A dictionary mapping old column names to new column names

Example configuration YAML:

```yaml
# Standard tool configuration
# ...

# Tool-specific configuration
name_mapping:
  old_column_name1: new_column_name1
  old_column_name2: new_column_name2
  # Add more mappings as needed
```
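
With the configuration saved, the tool can presumably be launched through the `tree_of_life_toolbox` entry point that
`pyproject.toml` defines; the argument order below is an assumption, so check `TreeOfLife_toolbox/main/main.py` for the
actual CLI:

```bash
# Hypothetical invocation: config file plus tool name (order not confirmed here)
tree_of_life_toolbox config.yaml column_name_change
```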

## Pre-conditions

- The input image directory must follow the distributed-downloader structure:

```
<path_to_output_folder>/<images_folder>/
├── server_name=<server1>
│   ├── partition_id=<id>
│   │   ├── successes.parquet   # Contains data with columns to be renamed
│   │   ├── errors.parquet
│   │   └── completed
│   └── partition_id=<id2>
│       ├── ...
└── server_name=<server2>
    └── ...
```

- The Parquet files must exist and be readable
- The columns listed in `name_mapping` should exist in the Parquet files; any mapping whose source column is missing is
  silently ignored

## Post-conditions

After running this tool:

- The column names in all `successes.parquet` files will be changed according to the provided mapping
- File structure and other metadata remain unchanged
- Data content (rows and values) remains unchanged
- Original compression and partitioning are preserved (a quick schema check is sketched below)
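
A quick way to confirm the rename, assuming `pyarrow` (already a dependency) and a hypothetical partition path;
`read_schema` inspects only the file metadata, so the check is cheap even on large partitions:

```python
import pyarrow.parquet as pq

# Illustrative path; substitute a real server_name/partition_id pair.
path = "output/images/server_name=server1/partition_id=0/successes.parquet"
schema = pq.read_schema(path)
print(schema.names)
assert "new_column_name1" in schema.names
assert "old_column_name1" not in schema.names
```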
5 changes: 5 additions & 0 deletions src/TreeOfLife_toolbox/column_name_change/__init__.py
@@ -0,0 +1,5 @@
from .classes import (
ColumnNameChangeFilter,
ColumnNameChangeScheduleCreation,
ColumnNameChangeRunner,
)
138 changes: 138 additions & 0 deletions src/TreeOfLife_toolbox/column_name_change/classes.py
@@ -0,0 +1,138 @@
import os
from typing import List

import pandas as pd

from TreeOfLife_toolbox.main.config import Config
from TreeOfLife_toolbox.main.filters import PythonFilterToolBase, FilterRegister
from TreeOfLife_toolbox.main.runners import MPIRunnerTool, RunnerRegister
from TreeOfLife_toolbox.main.schedulers import DefaultScheduler, SchedulerRegister


@FilterRegister("column_name_change")
class ColumnNameChangeFilter(PythonFilterToolBase):
"""
Filter class for the Column Name Change tool.

This class implements the filtering stage of the column name change process.
It identifies all partitions in the downloaded image directory structure
that need to have their column names modified.

Inherits from PythonFilterToolBase which provides the implementation
for traversing the directory structure and identifying all server_name/partition_id
combinations with parquet files.
"""

def __init__(self, cfg: Config):
"""
Initialize the Column Name Change Filter.

Args:
cfg (Config): Configuration object containing parameters for the tool.
"""
super().__init__(cfg)

self.filter_name: str = "column_name_change"


@SchedulerRegister("column_name_change")
class ColumnNameChangeScheduleCreation(DefaultScheduler):
"""
Scheduler class for the Column Name Change tool.

This class creates a work schedule for the column name change process,
distributing the partitions across available workers. It uses the
DefaultScheduler implementation which assigns partitions to workers
based on a round-robin assignment for load balancing.
"""

def __init__(self, cfg: Config):
"""
Initialize the Column Name Change Scheduler.

Args:
cfg (Config): Configuration object containing parameters for the tool.
"""
super().__init__(cfg)

self.filter_name: str = "column_name_change"


@RunnerRegister("column_name_change")
class ColumnNameChangeRunner(MPIRunnerTool):
"""
Runner class for the Column Name Change tool.

This class performs the actual column name change operation on the parquet files.
It processes each server_name/partition_id combination assigned to a worker
by loading the parquet file, renaming columns according to the mapping specified
in the configuration, and saving the modified file back to disk.

The MPI approach enables parallel processing across multiple nodes and cores,
with each worker handling its assigned partitions.
"""

def __init__(self, cfg: Config):
"""
Initialize the Column Name Change Runner.

Args:
cfg (Config): Configuration object containing parameters for the tool,
including the name mapping dictionary.
"""
super().__init__(cfg)

self.filter_name: str = "column_name_change"
self.data_scheme: List[str] = ["server_name", "partition_id"]
self.verification_scheme: List[str] = ["server_name", "partition_id"]
self.total_time = 150

# Load the column name mapping from configuration
self.name_mapping = cfg["name_mapping"]

def apply_filter(
self, filtering_df: pd.DataFrame, server_name: str, partition_id: int
) -> int:
"""
Apply the column name change operation to a specific partition.

This method loads the parquet file for the specified server_name and partition_id,
renames the columns according to the mapping provided in the configuration,
and saves the modified file back to the same location.

Args:
filtering_df (pd.DataFrame): DataFrame containing information about partitions to process.
server_name (str): Name of the server containing the partition.
partition_id (int): ID of the partition to process.

Returns:
int: Number of rows in the processed parquet file, or 0 if the file doesn't exist.

Note:
The method also performs time checks to ensure there is enough time left
in the job to complete the operation.
"""
self.is_enough_time()

parquet_path = os.path.join(
self.downloaded_images_path,
f"server_name={server_name}",
f"partition_id={partition_id}",
"successes.parquet",
)

if not os.path.exists(parquet_path):
            self.logger.info(f"Path doesn't exist: {parquet_path}")
return 0

renamed_parquet = pd.read_parquet(parquet_path)

self.is_enough_time()

renamed_parquet = renamed_parquet.rename(columns=self.name_mapping)

renamed_parquet.to_parquet(
parquet_path, index=False, compression="zstd", compression_level=3
)

return len(renamed_parquet)
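
The `@FilterRegister`/`@SchedulerRegister`/`@RunnerRegister` decorators suggest a plug-in registry keyed by tool name. A
sketch of registering a hypothetical new tool under the same pattern (the tool name and class here are illustrative, not
part of this PR):

```python
from TreeOfLife_toolbox.main.config import Config
from TreeOfLife_toolbox.main.filters import PythonFilterToolBase, FilterRegister

@FilterRegister("my_new_tool")  # hypothetical tool name
class MyNewToolFilter(PythonFilterToolBase):
    def __init__(self, cfg: Config):
        super().__init__(cfg)
        self.filter_name: str = "my_new_tool"
```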
File renamed without changes.