diff --git a/pyproject.toml b/pyproject.toml
index 17be78f..b3e3a5c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -3,80 +3,69 @@ requires = ["hatchling"]
 build-backend = "hatchling.build"
 
 [tool.hatch.build.targets.wheel]
-packages = ["src/DD_tools"]
+packages = ["src/TreeOfLife_toolbox"]
 
 [project]
-name = "DD_tools"
+name = "TreeOfLife_toolbox"
 dynamic = ["version"]
 authors = [
     { name = "Andrey Kopanev", email = "kopanev.1@osu.edu" },
     { name = "Elizabeth G. Campolongo", email = "e.campolongo479@gmail.com" },
     { name = "Matthew J. Thompson", email = "thompson.m.j@outlook.com" },
 ]
-description = "A tool for downloading files from a list of URLs in parallel."
+description = "A tool for processing datasets that were downloaded using the distributed-downloader package."
 readme = "README.md"
-requires-python = ">=3.8"
+requires-python = ">=3.10, <3.12"
 classifiers = [
+    "Development Status :: 4 - Beta",
     "Programming Language :: Python :: 3",
+    "Programming Language :: Python :: 3.10",
+    "Programming Language :: Python :: 3.11",
     "License :: OSI Approved :: MIT License",
     "Operating System :: OS Independent",
 ]
 dependencies = [
     "attrs",
     "brotli",
-    "certifi",
-    "charset-normalizer",
     "cramjam",
     "cython",
-    "exceptiongroup",
     "fsspec",
-    "hatchling",
-    "idna",
     "inflate64",
-    "iniconfig",
-    "mpi4py < 4",
+    "mpi4py",
     "multivolumefile",
-    "numpy",
     "opencv-python",
-    "packaging",
     "pandas",
     "pathspec",
     "pillow",
-    "pip",
-    "pluggy",
     "psutil",
-    "py4j",
     "pyarrow",
     "pybcj",
     "pycryptodomex",
     "pyppmd",
     "pyspark",
-    "pytest",
-    "python-dateutil",
     "python-dotenv",
-    "pytz",
     "pyyaml",
     "pyzstd",
     "requests",
     "setuptools",
-    "six",
     "texttable",
-    "tomli",
     "trove-classifiers",
     "typing-extensions",
-    "tzdata",
-    "urllib3",
     "wheel"
 ]
 
 [project.optional-dependencies]
-dev = ["pytest"]
+dev = [
+    "pytest",
+    "ruff"
+]
 
 keywords = [
     "parallel",
     "distributed",
-    "download",
     "url",
+    "mpi-applications",
+    "dataset-generation",
 ]
 
 [project.urls]
@@ -84,5 +73,8 @@ Homepage = "https://github.com/Imageomics/distributed-downloader"
 Repository = "https://github.com/Imageomics/distributed-downloader.git"
 "Bug Tracker" = "https://github.com/Imageomics/distributed-downloader/issues"
 
+[project.scripts]
+tree_of_life_toolbox = "TreeOfLife_toolbox.main.main:main"
+
 [tool.hatch.version]
-path = "src/DD_tools/main/__about__.py"
+path = "src/TreeOfLife_toolbox/main/__about__.py"
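The new `[project.scripts]` table makes the toolbox an installable CLI. A minimal sketch of what the generated `tree_of_life_toolbox` launcher is equivalent to at runtime (the real launcher is auto-generated by pip on install; the actual argument parsing lives in `main.py`, which this diff only renames):

```python
# Hypothetical equivalent of the console script pip generates for
# [project.scripts]: import the entry point and exit with its return code.
import sys

from TreeOfLife_toolbox.main.main import main

if __name__ == "__main__":
    sys.exit(main())
```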
--output="${logs_dir}/tool_scheduler.log" \ - python "${REPO_ROOT}/src/distributed_downloader/tools/scheduler.py" "${tool_name}" + python "${TOOLBOX_PATH}/main/scheduler.py" "${tool_name}" diff --git a/scripts/tools_verifier.slurm b/scripts/tools_verifier.slurm index 98ca024..6a3b75e 100644 --- a/scripts/tools_verifier.slurm +++ b/scripts/tools_verifier.slurm @@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310 source "${REPO_ROOT}/.venv/bin/activate" export PYARROW_IGNORE_TIMEZONE=1 export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0 -export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader" srun \ --mpi=pmi2 \ @@ -28,4 +27,4 @@ srun \ --cpus-per-task=1 \ --mem=0 \ --output="${logs_dir}/tool_verifier.log" \ - python "${REPO_ROOT}/src/distributed_downloader/tools/verification.py" "${tool_name}" + python "${TOOLBOX_PATH}/main/verification.py" "${tool_name}" diff --git a/scripts/tools_worker.slurm b/scripts/tools_worker.slurm index 2ee2662..4856e62 100644 --- a/scripts/tools_worker.slurm +++ b/scripts/tools_worker.slurm @@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310 source "${REPO_ROOT}/.venv/bin/activate" export PYARROW_IGNORE_TIMEZONE=1 export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0 -export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader" srun \ --mpi=pmi2 \ @@ -28,4 +27,4 @@ srun \ --cpus-per-task="$TOOLS_CPU_PER_WORKER" \ --mem=0 \ --output="${logs_dir}/tool_worker-%2t.log" \ - python "${REPO_ROOT}/src/distributed_downloader/tools/runner.py" "${tool_name}" + python "${TOOLBOX_PATH}/main/runner.py" "${tool_name}" diff --git a/src/DD_tools/__init__.py b/src/DD_tools/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/TreeOfLife_toolbox/__init__.py b/src/TreeOfLife_toolbox/__init__.py new file mode 100644 index 0000000..3b58335 --- /dev/null +++ b/src/TreeOfLife_toolbox/__init__.py @@ -0,0 +1 @@ +from TreeOfLife_toolbox import research_filtering diff --git a/src/DD_tools/main/__about__.py b/src/TreeOfLife_toolbox/main/__about__.py similarity index 100% rename from src/DD_tools/main/__about__.py rename to src/TreeOfLife_toolbox/main/__about__.py diff --git a/src/DD_tools/main/checkpoint.py b/src/TreeOfLife_toolbox/main/checkpoint.py similarity index 100% rename from src/DD_tools/main/checkpoint.py rename to src/TreeOfLife_toolbox/main/checkpoint.py diff --git a/src/DD_tools/main/config.py b/src/TreeOfLife_toolbox/main/config.py similarity index 100% rename from src/DD_tools/main/config.py rename to src/TreeOfLife_toolbox/main/config.py diff --git a/src/DD_tools/main/config_templates/tools.yaml b/src/TreeOfLife_toolbox/main/config_templates/tools.yaml similarity index 100% rename from src/DD_tools/main/config_templates/tools.yaml rename to src/TreeOfLife_toolbox/main/config_templates/tools.yaml diff --git a/src/DD_tools/main/filter.py b/src/TreeOfLife_toolbox/main/filter.py similarity index 85% rename from src/DD_tools/main/filter.py rename to src/TreeOfLife_toolbox/main/filter.py index 080e1a2..ed526c5 100644 --- a/src/DD_tools/main/filter.py +++ b/src/TreeOfLife_toolbox/main/filter.py @@ -1,10 +1,10 @@ import argparse import os -from DD_tools.main.checkpoint import Checkpoint -from DD_tools.main.config import Config -from DD_tools.main.registry import ToolsRegistryBase -from DD_tools.main.utils import init_logger +from TreeOfLife_toolbox.main.checkpoint import Checkpoint +from TreeOfLife_toolbox.main.config import Config +from TreeOfLife_toolbox.main.registry import ToolsRegistryBase +from 
diff --git a/src/DD_tools/__init__.py b/src/DD_tools/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/src/TreeOfLife_toolbox/__init__.py b/src/TreeOfLife_toolbox/__init__.py
new file mode 100644
index 0000000..3b58335
--- /dev/null
+++ b/src/TreeOfLife_toolbox/__init__.py
@@ -0,0 +1 @@
+from TreeOfLife_toolbox import research_filtering
diff --git a/src/DD_tools/main/__about__.py b/src/TreeOfLife_toolbox/main/__about__.py
similarity index 100%
rename from src/DD_tools/main/__about__.py
rename to src/TreeOfLife_toolbox/main/__about__.py
diff --git a/src/DD_tools/main/checkpoint.py b/src/TreeOfLife_toolbox/main/checkpoint.py
similarity index 100%
rename from src/DD_tools/main/checkpoint.py
rename to src/TreeOfLife_toolbox/main/checkpoint.py
diff --git a/src/DD_tools/main/config.py b/src/TreeOfLife_toolbox/main/config.py
similarity index 100%
rename from src/DD_tools/main/config.py
rename to src/TreeOfLife_toolbox/main/config.py
diff --git a/src/DD_tools/main/config_templates/tools.yaml b/src/TreeOfLife_toolbox/main/config_templates/tools.yaml
similarity index 100%
rename from src/DD_tools/main/config_templates/tools.yaml
rename to src/TreeOfLife_toolbox/main/config_templates/tools.yaml
diff --git a/src/DD_tools/main/filter.py b/src/TreeOfLife_toolbox/main/filter.py
similarity index 85%
rename from src/DD_tools/main/filter.py
rename to src/TreeOfLife_toolbox/main/filter.py
index 080e1a2..ed526c5 100644
--- a/src/DD_tools/main/filter.py
+++ b/src/TreeOfLife_toolbox/main/filter.py
@@ -1,10 +1,10 @@
 import argparse
 import os
 
-from DD_tools.main.checkpoint import Checkpoint
-from DD_tools.main.config import Config
-from DD_tools.main.registry import ToolsRegistryBase
-from DD_tools.main.utils import init_logger
+from TreeOfLife_toolbox.main.checkpoint import Checkpoint
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
+from TreeOfLife_toolbox.main.utils import init_logger
 
 if __name__ == "__main__":
     config_path = os.environ.get("CONFIG_PATH")
diff --git a/src/DD_tools/main/filters.py b/src/TreeOfLife_toolbox/main/filters.py
similarity index 93%
rename from src/DD_tools/main/filters.py
rename to src/TreeOfLife_toolbox/main/filters.py
index 11c9426..385f18e 100644
--- a/src/DD_tools/main/filters.py
+++ b/src/TreeOfLife_toolbox/main/filters.py
@@ -7,10 +7,10 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.types import StructType
 
-from DD_tools.main.config import Config
-from DD_tools.main.registry import ToolsBase
-from DD_tools.main.registry import ToolsRegistryBase
-from DD_tools.main.utils import SuccessEntry
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsBase
+from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
+from TreeOfLife_toolbox.main.utils import SuccessEntry
 
 FilterRegister = partial(ToolsRegistryBase.register, "filter")
diff --git a/src/DD_tools/main/main.py b/src/TreeOfLife_toolbox/main/main.py
similarity index 96%
rename from src/DD_tools/main/main.py
rename to src/TreeOfLife_toolbox/main/main.py
index b3d5732..5272354 100644
--- a/src/DD_tools/main/main.py
+++ b/src/TreeOfLife_toolbox/main/main.py
@@ -1,15 +1,16 @@
 import argparse
 import os
 from logging import Logger
+from pathlib import Path
 from typing import Dict, List, Optional, TextIO, Tuple
 
 import pandas as pd
 from attr import Factory, define, field
 
-from DD_tools.main.checkpoint import Checkpoint
-from DD_tools.main.config import Config
-from DD_tools.main.registry import ToolsRegistryBase
-from DD_tools.main.utils import (
+from TreeOfLife_toolbox.main.checkpoint import Checkpoint
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
+from TreeOfLife_toolbox.main.utils import (
     init_logger,
     ensure_created,
     truncate_paths,
@@ -78,6 +79,7 @@ def __attrs_post_init__(self):
 
     def __init_environment(self) -> None:
         os.environ["CONFIG_PATH"] = self.config.config_path
+        os.environ["TOOLBOX_PATH"] = str(Path(__file__).parent.parent.resolve())
 
         os.environ["ACCOUNT"] = self.config["account"]
         os.environ["PATH_TO_INPUT"] = self.config["path_to_input"]
diff --git a/src/DD_tools/main/registry.py b/src/TreeOfLife_toolbox/main/registry.py
similarity index 94%
rename from src/DD_tools/main/registry.py
rename to src/TreeOfLife_toolbox/main/registry.py
index 12774dd..03cf9d6 100644
--- a/src/DD_tools/main/registry.py
+++ b/src/TreeOfLife_toolbox/main/registry.py
@@ -1,7 +1,7 @@
 from typing import Dict, Type, Optional
 
-from DD_tools.main.config import Config
-from DD_tools.main.utils import init_logger
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.utils import init_logger
 
 
 class ToolsRegistryBase(type):
diff --git a/src/DD_tools/main/runner.py b/src/TreeOfLife_toolbox/main/runner.py
similarity index 84%
rename from src/DD_tools/main/runner.py
rename to src/TreeOfLife_toolbox/main/runner.py
index 214237e..77dcefa 100644
--- a/src/DD_tools/main/runner.py
+++ b/src/TreeOfLife_toolbox/main/runner.py
@@ -1,10 +1,10 @@
 import argparse
 import os
 
-from DD_tools.main.checkpoint import Checkpoint
-from DD_tools.main.config import Config
-from DD_tools.main.registry import ToolsRegistryBase
-from DD_tools.main.utils import init_logger
+from TreeOfLife_toolbox.main.checkpoint import Checkpoint
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
+from TreeOfLife_toolbox.main.utils import init_logger
 
 if __name__ == "__main__":
     config_path = os.environ.get("CONFIG_PATH")
diff --git a/src/DD_tools/main/runners.py b/src/TreeOfLife_toolbox/main/runners.py
similarity index 98%
rename from src/DD_tools/main/runners.py
rename to src/TreeOfLife_toolbox/main/runners.py
index cd875d3..bfb5d5e 100644
--- a/src/DD_tools/main/runners.py
+++ b/src/TreeOfLife_toolbox/main/runners.py
@@ -6,8 +6,8 @@
 
 import pandas as pd
 
-from DD_tools.main.config import Config
-from DD_tools.main.registry import ToolsBase, ToolsRegistryBase
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsBase, ToolsRegistryBase
 
 RunnerRegister = partial(ToolsRegistryBase.register, "runner")
diff --git a/src/DD_tools/main/scheduler.py b/src/TreeOfLife_toolbox/main/scheduler.py
similarity index 87%
rename from src/DD_tools/main/scheduler.py
rename to src/TreeOfLife_toolbox/main/scheduler.py
index 707b656..d686ae6 100644
--- a/src/DD_tools/main/scheduler.py
+++ b/src/TreeOfLife_toolbox/main/scheduler.py
@@ -1,10 +1,10 @@
 import argparse
 import os
 
-from DD_tools.main.checkpoint import Checkpoint
-from DD_tools.main.config import Config
-from DD_tools.main.registry import ToolsRegistryBase
-from DD_tools.main.utils import init_logger
+from TreeOfLife_toolbox.main.checkpoint import Checkpoint
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
+from TreeOfLife_toolbox.main.utils import init_logger
 
 if __name__ == "__main__":
     config_path = os.environ.get("CONFIG_PATH")
diff --git a/src/DD_tools/main/schedulers.py b/src/TreeOfLife_toolbox/main/schedulers.py
similarity index 90%
rename from src/DD_tools/main/schedulers.py
rename to src/TreeOfLife_toolbox/main/schedulers.py
index ed70a9c..6b2c6e2 100644
--- a/src/DD_tools/main/schedulers.py
+++ b/src/TreeOfLife_toolbox/main/schedulers.py
@@ -5,8 +5,8 @@
 
 import pandas as pd
 
-from DD_tools.main.config import Config
-from DD_tools.main.registry import ToolsBase, ToolsRegistryBase
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsBase, ToolsRegistryBase
 
 SchedulerRegister = partial(ToolsRegistryBase.register, "scheduler")
diff --git a/src/DD_tools/main/utils.py b/src/TreeOfLife_toolbox/main/utils.py
similarity index 100%
rename from src/DD_tools/main/utils.py
rename to src/TreeOfLife_toolbox/main/utils.py
diff --git a/src/DD_tools/main/verification.py b/src/TreeOfLife_toolbox/main/verification.py
similarity index 85%
rename from src/DD_tools/main/verification.py
rename to src/TreeOfLife_toolbox/main/verification.py
index 742bb86..31d2561 100644
--- a/src/DD_tools/main/verification.py
+++ b/src/TreeOfLife_toolbox/main/verification.py
@@ -3,11 +3,11 @@
 
 import pandas as pd
 
-from DD_tools.main.checkpoint import Checkpoint
-from DD_tools.main.config import Config
-from DD_tools.main.registry import ToolsRegistryBase
-from DD_tools.main.runners import MPIRunnerTool
-from DD_tools.main.utils import init_logger
+from TreeOfLife_toolbox.main.checkpoint import Checkpoint
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
+from TreeOfLife_toolbox.main.runners import MPIRunnerTool
+from TreeOfLife_toolbox.main.utils import init_logger
 
 if __name__ == "__main__":
     config_path = os.environ.get("CONFIG_PATH")
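The `FilterRegister`, `SchedulerRegister`, and `RunnerRegister` partials above bind a tool kind to `ToolsRegistryBase.register`, and the new tool below registers one class of each kind under the shared key `research_filtering`. The registry internals are not shown in this diff; a minimal sketch of the assumed mechanism, with hypothetical internals and a hypothetical stand-in class:

```python
from functools import partial
from typing import Dict


class ToolsRegistryBase(type):
    # Hypothetical internals: nested mapping {tool_kind: {tool_name: class}}.
    TOOLS_REGISTRY: Dict[str, Dict[str, type]] = {}

    @classmethod
    def register(mcs, tool_kind: str, tool_name: str):
        def decorator(cls: type) -> type:
            mcs.TOOLS_REGISTRY.setdefault(tool_kind, {})[tool_name] = cls
            return cls

        return decorator


# As in filters.py above; schedulers.py and runners.py follow the same pattern.
FilterRegister = partial(ToolsRegistryBase.register, "filter")


@FilterRegister("research_filtering")
class ExampleFilter:  # hypothetical stand-in for ResearchFilteringFilter
    pass


assert ToolsRegistryBase.TOOLS_REGISTRY["filter"]["research_filtering"] is ExampleFilter
```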
diff --git a/src/TreeOfLife_toolbox/research_filtering/README.md b/src/TreeOfLife_toolbox/research_filtering/README.md
new file mode 100644
index 0000000..15e231e
--- /dev/null
+++ b/src/TreeOfLife_toolbox/research_filtering/README.md
@@ -0,0 +1,46 @@
+# Research Filtering Tool
+
+## Overview
+
+The Research Filtering Tool allows filtering of TreeOfLife datasets based on the `basisOfRecord` field from occurrence
+data. This tool is designed to selectively remove entries from the dataset where the `basisOfRecord` matches specific
+criteria defined in the configuration.
+
+## How It Works
+
+The tool operates in three phases:
+
+1. **Filtering Phase**: Reads occurrence data and the main dataset, filters occurrences based on the specified
+   `basisOfRecord` value, and creates a filter table identifying entries to be removed.
+
+2. **Scheduling Phase**: Creates a schedule for distributed processing of the filtered data, organized by file paths.
+
+3. **Running Phase**: Applies the filtering operation across distributed nodes, removing entries from parquet files
+   based on UUIDs identified in the filtering step.
+
+## Configuration Requirements
+
+The following fields must be specified in the configuration file:
+
+* `occurrences_path`: Path to the occurrences table containing `gbifID` and `basisOfRecord` fields
+* `data_path`: Path to the TreeOfLife dataset root directory
+* `basis_of_record`: Value or pattern to match in the `basisOfRecord` field for entries that should be filtered out
+* `save_path_folder`: Path where filtered data should be saved (if applicable)
+
+## Initial Assumptions / Preconditions
+
+- The dataset must follow the TreeOfLife format structure
+- The occurrences data must include `gbifID` and `basisOfRecord` fields
+- The dataset must be organized in parquet files following the pattern: `/source=*/server=*/data_*.parquet`
+- Each data file must contain at least `uuid` and `source_id` fields
+
+## Post-conditions
+
+After successful execution:
+
+- Parquet files in the dataset will be filtered to exclude entries where the `basisOfRecord` matches the specified
+  criteria
+- Original dataset files will be overwritten with filtered versions
+- Entries with matching criteria will be completely removed from the dataset
+- All files processed will be logged in the verification folder
+- Compression will be applied using zstd level 3 to maintain efficient storage
diff --git a/src/TreeOfLife_toolbox/research_filtering/__init__.py b/src/TreeOfLife_toolbox/research_filtering/__init__.py
new file mode 100644
index 0000000..8bb07b5
--- /dev/null
+++ b/src/TreeOfLife_toolbox/research_filtering/__init__.py
@@ -0,0 +1,5 @@
+from .classes import (
+    ResearchFilteringFilter,
+    ResearchFilteringRunner,
+    ResearchFilteringScheduleCreation,
+)
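The README's four required fields map onto the tool config (the shipped template is `main/config_templates/tools.yaml`). A hypothetical excerpt, with invented paths and a sample GBIF `basisOfRecord` value:

```yaml
# Hypothetical values for illustration only.
occurrences_path: /fs/scratch/project/gbif/occurrences.parquet
data_path: /fs/scratch/project/TreeOfLife
basis_of_record: FOSSIL_SPECIMEN
save_path_folder: /fs/scratch/project/TreeOfLife_filtered
```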
diff --git a/src/TreeOfLife_toolbox/research_filtering/classes.py b/src/TreeOfLife_toolbox/research_filtering/classes.py
new file mode 100644
index 0000000..3698d29
--- /dev/null
+++ b/src/TreeOfLife_toolbox/research_filtering/classes.py
@@ -0,0 +1,222 @@
+import os
+import re
+from typing import List
+
+import pandas as pd
+
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.filters import FilterRegister, SparkFilterToolBase
+from TreeOfLife_toolbox.main.runners import MPIRunnerTool, RunnerRegister
+from TreeOfLife_toolbox.main.schedulers import DefaultScheduler, SchedulerRegister
+
+
+@FilterRegister("research_filtering")
+class ResearchFilteringFilter(SparkFilterToolBase):
+    """
+    A filter class for research data filtering operations using Spark.
+
+    This class filters a dataset based on the basisOfRecord field and creates
+    a filtered table that identifies entries to be removed from the TreeOfLife dataset.
+
+    Attributes:
+        filter_name (str): Name identifier for the filter.
+        string_to_remove (str): String prefix to remove from file paths.
+    """
+
+    def __init__(self, cfg: Config):
+        """
+        Initialize the research filtering filter.
+
+        Args:
+            cfg (Config): Configuration object containing parameters for filtering.
+        """
+        super().__init__(cfg)
+
+        self.filter_name: str = "research_filtering"
+        self.string_to_remove = "file:/"
+
+    def run(self):
+        """
+        Run the filtering process.
+
+        This method:
+        1. Reads occurrence data and selects relevant columns
+        2. Reads data files and extracts paths
+        3. Filters occurrences based on the basisOfRecord value specified in config
+        4. Joins filtered occurrences with data to create a filtered table
+        5. Saves the filtered table for later processing
+        """
+        import pyspark.sql.functions as func
+
+        occurrences_df = (
+            self.spark.read.parquet(self.config["occurrences_path"])
+            .select("gbifID", "basisOfRecord")
+            .withColumnRenamed("gbifID", "source_id")
+        )
+
+        data_df = (
+            self.spark.read.option("basePath", self.config["data_path"])
+            .parquet(f"{self.config['data_path']}/source=*/server=*/data_*.parquet")
+            .select("uuid", "source_id")
+            .withColumn(
+                "path",
+                func.substring(
+                    func.input_file_name(), len(self.string_to_remove), 2000000
+                ),
+            )
+        )
+
+        occurrences_df_filtered = occurrences_df.where(
+            occurrences_df["basisOfRecord"].contains(self.config["basis_of_record"])
+        )
+        data_merged = data_df.join(occurrences_df_filtered, on="source_id", how="inner")
+
+        (
+            data_merged.repartition(1).write.csv(
+                os.path.join(self.tools_path, self.filter_name, "filter_table"),
+                header=True,
+                mode="overwrite",
+            )
+        )
+
+
+@SchedulerRegister("research_filtering")
+class ResearchFilteringScheduleCreation(DefaultScheduler):
+    """
+    Scheduler class for research filtering operations.
+
+    Creates the schedule for distributed processing of filtered data.
+
+    Attributes:
+        filter_name (str): Name identifier for the scheduler.
+        scheme (List[str]): Column schema for scheduling; specifies which fields
+            are used to group tasks for distribution.
+    """
+
+    def __init__(self, cfg: Config):
+        """
+        Initialize the research filtering scheduler.
+
+        Args:
+            cfg (Config): Configuration object containing parameters for scheduling.
+        """
+        super().__init__(cfg)
+
+        self.filter_name: str = "research_filtering"
+        self.scheme = ["path"]
+
+
+@RunnerRegister("research_filtering")
+class ResearchFilteringRunner(MPIRunnerTool):
+    """
+    Runner class that applies the research filtering operation across distributed nodes.
+
+    This runner filters out data entries from parquet files based on UUIDs
+    identified in the filtering step.
+
+    Attributes:
+        filter_name (str): Name identifier for the runner.
+        server_pattern (str): Regex pattern to extract server name from a path.
+        source_pattern (str): Regex pattern to extract source name from a path.
+        data_scheme (List[str]): Schema for data columns used in processing.
+        verification_scheme (List[str]): Schema for verification columns.
+        total_time (int): Maximum execution time in seconds.
+        save_path_folder (str): Folder path to save filtered results.
+    """
+
+    server_pattern = r"server=([^/]+)"
+    source_pattern = r"source=([^/]+)"
+
+    def __init__(self, cfg: Config):
+        """
+        Initialize the research filtering runner.
+
+        Args:
+            cfg (Config): Configuration object containing parameters for the runner.
+        """
+        super().__init__(cfg)
+
+        self.filter_name: str = "research_filtering"
+        self.data_scheme: List[str] = ["uuid", "path"]
+        self.verification_scheme: List[str] = ["path"]
+        self.total_time = 150
+        self.save_path_folder = cfg["save_path_folder"]
+
+    def apply_filter(self, filtering_df: pd.DataFrame, file_path: str) -> int:
+        """
+        Apply filtering to a specific parquet file.
+
+        This method:
+        1. Checks if there is enough time remaining
+        2. Verifies if the target file exists
+        3. Reads the parquet file
+        4. Filters out entries based on UUIDs
+        5. Saves the filtered parquet file
+
+        Args:
+            filtering_df (pd.DataFrame): DataFrame containing UUIDs to filter out
+            file_path (str): Path to the parquet file to be filtered
+
+        Returns:
+            int: Count of remaining entries after filtering
+
+        Raises:
+            TimeoutError: If there's not enough time left to complete the operation
+        """
+        self.is_enough_time()
+
+        if not os.path.exists(file_path):
+            self.logger.info(f"Path doesn't exist: {file_path}")
+            return 0
+
+        server_name = re.findall(self.server_pattern, file_path)[0]
+        filename_path = os.path.basename(file_path)
+
+        filtered_parquet = pd.read_parquet(
+            file_path, filters=[("uuid", "not in", filtering_df["uuid"])]
+        )
+
+        self.is_enough_time()
+        if len(filtered_parquet) == 0:
+            self.logger.info(f"Fully filtered out: {server_name}/{filename_path}")
+
+        filtered_parquet.to_parquet(
+            file_path, index=False, compression="zstd", compression_level=3
+        )
+
+        return len(filtered_parquet)
+
+    def runner_fn(self, df_local: pd.DataFrame) -> int:
+        """
+        Runner function that processes a chunk of data.
+
+        This method:
+        1. Extracts the file path from the input dataframe
+        2. Calls apply_filter to process the file
+        3. Logs the result and writes to verification file
+
+        Args:
+            df_local (pd.DataFrame): DataFrame containing paths and UUIDs to process
+
+        Returns:
+            int: 1 if successful, 0 if an error occurred
+
+        Raises:
+            NotImplementedError: If filter function wasn't implemented
+        """
+        filtering_df = df_local.reset_index(drop=True)
+        file_path = filtering_df.iloc[0]["path"]
+        try:
+            filtered_parquet_length = self.apply_filter(filtering_df, file_path)
+        except NotImplementedError:
+            raise NotImplementedError("Filter function wasn't implemented")
+        except Exception as e:
+            self.logger.exception(e)
+            self.logger.error(f"Error occurred: {e}")
+            return 0
+        else:
+            print(f"{file_path}", end="\n", file=self.verification_IO)
+            self.logger.debug(
+                f"Completed filtering: {file_path} with {filtered_parquet_length} rows remaining"
+            )
+            return 1
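For reference, `apply_filter` leans on pyarrow predicate pushdown through the `filters=` argument of `pandas.read_parquet`, then rewrites the file in place with the same zstd settings. A self-contained sketch of that pruning step on a toy file (file names hypothetical):

```python
import pandas as pd

# Toy stand-ins for one data_*.parquet shard and the uuid filter table.
pd.DataFrame({"uuid": ["a", "b", "c"], "source_id": [1, 2, 3]}).to_parquet(
    "data_0.parquet", index=False
)
to_remove = pd.DataFrame({"uuid": ["b"]})

# Rows whose uuid appears in the filter table are dropped at read time
# via pyarrow predicate pushdown ("not in"), mirroring apply_filter.
kept = pd.read_parquet(
    "data_0.parquet",
    filters=[("uuid", "not in", set(to_remove["uuid"]))],
)

# Overwrite the shard with the same compression settings as the runner.
kept.to_parquet("data_0.parquet", index=False, compression="zstd", compression_level=3)
print(len(kept))  # 2 rows remain
```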