Dima806/sample_project_pyspark
SparkOps

PySpark inefficiency visualisation and optimisation, designed to run on GitHub Codespaces with 2 CPU cores.

SparkOps generates a synthetic dataset, deliberately triggers common Spark performance problems, measures them, then demonstrates the fixes — with before/after output and automated tests.


Quickstart

# 1. Install dependencies and register pre-commit hooks
make install

# 2. Run the inefficient baseline pipeline
make run-baseline

# 3. Run the optimized pipeline and generate plots
make run-optimized

# 4. Run all tests
make test

Tip: row count and partition tuning are in config/config.yaml — no source-code edits required.


Project Structure

sparkops/
├── config/
│   └── config.yaml                # Pipeline tuning parameters (rows, partitions, salt)
├── data/                          # Parquet output (gitignored)
├── plots/                         # Generated charts (gitignored)
├── scripts/
│   └── validate_config.py         # Pre-commit schema validator for config.yaml
├── src/
│   ├── data_generation/
│   │   ├── generate_events.py     # Synthetic events dataset with intentional skew
│   │   └── session_data.py        # Shared dataset factory (get_pipeline_data)
│   ├── inefficiencies/
│   │   └── baseline.py            # Naive, inefficient pipeline (5 patterns)
│   ├── optimizations/
│   │   └── optimized.py           # Fixed pipeline with before/after metrics
│   ├── metrics/
│   │   └── collector.py           # partition_stats, skew_ratio, charts
│   └── utils/
│       ├── config.py              # YAML config loader
│       └── spark_session.py       # Shared SparkSession factory (local[2], 4 g)
├── tests/
│   ├── conftest.py                # Session-scoped Spark fixture
│   ├── test_data_generation.py
│   └── test_optimizations.py
├── .devcontainer/
│   └── devcontainer.json          # Python 3.12 + Java 17 Codespace image
├── .github/
│   └── workflows/
│       └── ci.yml                 # GitHub Actions CI (install → pre-commit → lint → test)
├── .pre-commit-config.yaml
├── Makefile
└── pyproject.toml

Inefficiencies Demonstrated

Both pipelines start from identical data produced by get_pipeline_data(), so every comparison is apples-to-apples.

| # | Inefficiency | Symptom | Fix in `optimized.py` |
|---|---|---|---|
| 1 | Skewed `groupBy` on `location` (92 % NULL) | One mega-partition; a single task handles all NULLs | Two-phase salted `groupBy` |
| 2 | Sort-merge join on a 10-row table | Both sides fully shuffled | Broadcast join |
| 3 | Over-partitioned (200 partitions, 2 cores) | Scheduler overhead dominates | Repartition to `optimal_partitions` |
| 4 | Under-partitioned (1 partition) | One core idle throughout | Repartition to `optimal_partitions` |
| 5 | Parquet write without coalescing | 200 tiny output files | `coalesce(optimal_partitions)` before write |
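The two-phase salted `groupBy` in fix 1 can be modelled in plain Python: appending a random salt to the key spreads a hot key across several buckets, a first aggregation runs on `(key, salt)`, and a second aggregation strips the salt and merges the partials. This is a minimal sketch of the technique only; the function name and structure are illustrative and do not come from `baseline.py` or `optimized.py` (the `salt_buckets` default mirrors `config/config.yaml`):

```python
import random
from collections import defaultdict

SALT_BUCKETS = 4  # mirrors salt_buckets in config/config.yaml

def salted_group_count(locations, salt_buckets=SALT_BUCKETS):
    """Two-phase salted groupBy: count rows per location key."""
    # Phase 1: aggregate on (key, salt), so the hot NULL key is split
    # across salt_buckets partial groups instead of one mega-group.
    partial = defaultdict(int)
    for loc in locations:
        salt = random.randrange(salt_buckets)
        partial[(loc, salt)] += 1
    # Phase 2: strip the salt and merge the partial counts per key.
    final = defaultdict(int)
    for (loc, _salt), count in partial.items():
        final[loc] += count
    return dict(final)
```

In Spark the same idea applies per partition: phase 1 shuffles on the salted key, so no single task receives all the NULL rows.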

Dataset

Synthetic events table with intentional skew:

| Column | Type | Notes |
|---|---|---|
| `event_id` | long | Sequential row ID |
| `user_id` | int | Power-law: top 1 % of users account for ~30 % of rows |
| `event_type` | string | `click` / `view` / `purchase` / `login` / `logout` |
| `session_duration` | int | 0 – 3600 s |
| `location` | string | 92 % NULL, so Spark hashes all NULLs to partition 0 |
| `device_type` | string | `mobile` / `desktop` / `tablet`, ~3 % NULL |
| `timestamp` | long | Unix epoch ms |

Row count and partition settings live in config/config.yaml and are loaded at runtime — no code changes needed.
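For intuition, the skew pattern above can be reproduced in plain Python before scaling it up in Spark. The column names match the table, but the generator itself is an illustrative sketch, not the code in `generate_events.py` (the user count and location value are assumptions):

```python
import random
import time

EVENT_TYPES = ["click", "view", "purchase", "login", "logout"]

def make_event(event_id, n_users=10_000):
    # Power-law-ish user_id: ~30 % of events go to the top 1 % of users
    if random.random() < 0.30:
        user_id = random.randrange(n_users // 100)
    else:
        user_id = random.randrange(n_users)
    return {
        "event_id": event_id,
        "user_id": user_id,
        "event_type": random.choice(EVENT_TYPES),
        "session_duration": random.randrange(3601),            # 0 - 3600 s
        "location": None if random.random() < 0.92 else "US",  # 92 % NULL
        "device_type": None if random.random() < 0.03
                       else random.choice(["mobile", "desktop", "tablet"]),
        "timestamp": int(time.time() * 1000),                  # Unix epoch ms
    }

events = [make_event(i) for i in range(1000)]
```

Because ~92 % of `location` values are NULL, any hash repartition on that column sends nearly every row to the partition the NULL hash maps to.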


Key Metric

skew_ratio = max_partition_size / median_partition_size
| Range | Verdict |
|---|---|
| < 3 | Acceptable |
| 3 – 10 | Investigate |
| > 10 | Problematic (PRD threshold) |

The 92 % NULL location column produces a skew ratio of ~70 × when repartitioned by location, which optimized.py resolves to near 1 ×.
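The metric is a one-line computation over partition sizes. A sketch of what `collector.py` presumably computes (the helper name and sample sizes here are assumptions for illustration):

```python
from statistics import median

def skew_ratio(partition_sizes):
    """Largest partition divided by the median partition size."""
    return max(partition_sizes) / median(partition_sizes)

# Balanced: six near-equal partitions
balanced = skew_ratio([100, 101, 99, 100, 102, 98])   # ~1.0
# Skewed: one partition holds nearly everything
skewed = skew_ratio([9200, 100, 100, 100, 100, 100])  # 92.0
```

In PySpark, per-partition row counts can be collected with `df.rdd.glom().map(len).collect()` and fed straight into this function.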


Configuration

config/config.yaml controls all pipeline tuning parameters:

pipeline:
  total_rows: 15000000      # synthetic events to generate
  optimal_partitions: 6     # 3 × number of executor cores
  salt_buckets: 4           # two-phase salted groupBy buckets

Edit this file to change the demo scale. Values are validated automatically by the validate-pipeline-config pre-commit hook.
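Loading the file is a standard `yaml.safe_load` call; this sketch assumes the loader in `src/utils/config.py` does roughly the following, with the file contents inlined as a string so the example is self-contained (PyYAML is the assumed dependency):

```python
import yaml  # PyYAML

# Inlined copy of config/config.yaml for demonstration;
# the real loader reads the file from disk instead.
CONFIG_TEXT = """\
pipeline:
  total_rows: 15000000
  optimal_partitions: 6
  salt_buckets: 4
"""

config = yaml.safe_load(CONFIG_TEXT)["pipeline"]
```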


Makefile Targets

| Target | Description |
|---|---|
| `make install` | `uv sync` + register pre-commit git hooks |
| `make pre-commit` | Run all pre-commit hooks on every file |
| `make lint` | `ruff check src/ tests/` |
| `make format` | Auto-format with `ruff format` (in place) |
| `make format-check` | Format check only, used by CI |
| `make test` | Run pytest |
| `make validate` | Run Great Expectations data validation |
| `make run-baseline` | Execute the inefficient pipeline, then `make clean` |
| `make run-optimized` | Execute the optimized pipeline + plots, then `make clean` |
| `make clean` | Remove `data/*.parquet` output directories |
| `make all` | Full pipeline: install → pre-commit → test → validate → baseline → optimized |

CI / Pre-commit

GitHub Actions (.github/workflows/ci.yml) runs on every push and pull request to main:

install → pre-commit → lint → format-check → test

Pre-commit hooks (.pre-commit-config.yaml):

| Hook | What it checks |
|---|---|
| `trailing-whitespace` | No trailing spaces |
| `end-of-file-fixer` | Files end with a newline |
| `check-yaml` | YAML syntax |
| `check-toml` | TOML syntax |
| `check-merge-conflict` | No unresolved conflict markers |
| `check-added-large-files` | Files ≤ 500 KB |
| `ruff` (+ `--fix`) | Python linting and import sorting |
| `ruff-format` | Python formatting |
| `validate-pipeline-config` | `config/config.yaml` schema (required keys, positive integers) |
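The last hook's check is simple enough to sketch: every required key must be present and every value a positive integer. The exact rules in `scripts/validate_config.py` may differ; this sketch only mirrors the description above, and the function name is illustrative:

```python
REQUIRED_KEYS = {"total_rows", "optimal_partitions", "salt_buckets"}

def validate_pipeline_config(config):
    """Return a list of problems; an empty list means the config is valid."""
    pipeline = config.get("pipeline")
    if not isinstance(pipeline, dict):
        return ["missing 'pipeline' section"]
    # Required keys that are absent entirely
    errors = [f"missing key: {k}" for k in sorted(REQUIRED_KEYS - pipeline.keys())]
    # Present keys must hold positive integers (bool is excluded explicitly,
    # since bool is a subclass of int in Python)
    for key in sorted(REQUIRED_KEYS & pipeline.keys()):
        value = pipeline[key]
        if not isinstance(value, int) or isinstance(value, bool) or value <= 0:
            errors.append(f"{key} must be a positive integer, got {value!r}")
    return errors
```

Wired into pre-commit, a non-empty error list would cause the hook to exit non-zero and block the commit.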

Constraints

| Setting | Value |
|---|---|
| Spark master | `local[2]` (both Codespace vCPUs) |
| Driver memory | 4 g, shared between cache, shuffle buffers, and the Python worker |
| Shuffle partitions | 6 (3 × #cores) |
| Python | ≥ 3.10 |
| Package manager | `uv` |
