Commit 7998b03

Merge pull request #550 from Jeet009/mlOps

Build a Simple ETL Pipeline (MLOps)

2 parents 99cf136 + 5c8919f commit 7998b03

File tree

9 files changed: +197 -0 lines changed

build/184.json

Lines changed: 38 additions & 0 deletions

@@ -0,0 +1,38 @@
{
  "id": "184",
  "title": "Build a Simple ETL Pipeline (MLOps)",
  "difficulty": "medium",
  "category": "MLOps",
  "video": "",
  "likes": "0",
  "dislikes": "0",
  "contributor": [
    {
      "profile_link": "https://github.com/Jeet009",
      "name": "Jeet Mukherjee"
    }
  ],
  "description": "## Problem\n\nImplement a simple ETL (Extract-Transform-Load) pipeline for model-ready data preparation.\n\nGiven a CSV-like string containing user events with columns: `user_id,event_type,value` (header included), write a function `run_etl(csv_text)` that:\n\n1. Extracts rows from the raw CSV text.\n2. Transforms data by:\n\t- Filtering only rows where `event_type == \"purchase\"`.\n\t- Converting `value` to float and dropping invalid rows.\n\t- Aggregating total purchase `value` per `user_id`.\n3. Loads the transformed results by returning a list of `(user_id, total_value)` tuples sorted by `user_id` ascending.\n\nAssume small inputs (no external libs), handle extra whitespace, and ignore blank lines.",
  "learn_section": "## Solution Explanation\n\nThis task mirrors a minimal MLOps ETL flow that prepares data for downstream modeling.\n\n### ETL breakdown\n- Extract: parse raw CSV text, ignore blanks, and split into header and rows.\n- Transform:\n\t- Filter only relevant records (event_type == \"purchase\").\n\t- Cast `value` to float; discard invalid rows to maintain data quality.\n\t- Aggregate total purchase value per user to create compact features.\n- Load: return a deterministic, sorted list of `(user_id, total_value)`.\n\n### Why this design?\n- Input sanitation prevents runtime errors and poor-quality features.\n- Aggregation compresses event-level logs into user-level features commonly used in models.\n- Sorting produces stable, testable outputs.\n\n### Complexity\n- For N rows, parsing and aggregation run in O(N); sorting the U unique users costs O(U log U).\n\n### Extensions\n- Add schema validation and logging.\n- Write outputs to files or databases.\n- Schedule ETL runs and add monitoring for drift and freshness.",
  "starter_code": "# Implement your function below.\n\ndef run_etl(csv_text: str) -> list[tuple[str, float]]:\n\t\"\"\"Run a simple ETL pipeline over CSV text with header user_id,event_type,value.\n\n\tReturns a sorted list of (user_id, total_value) for event_type == \"purchase\".\n\t\"\"\"\n\t# TODO: implement extract, transform, and load steps\n\traise NotImplementedError",
  "solution": "from typing import List, Tuple\n\n\ndef run_etl(csv_text: str) -> List[Tuple[str, float]]:\n\t\"\"\"Reference ETL implementation.\n\n\t- Extract: parse CSV text, skip header, strip whitespace, ignore blanks\n\t- Transform: keep event_type == \"purchase\"; parse value as float; aggregate per user\n\t- Load: return sorted list of (user_id, total_value) by user_id asc\n\t\"\"\"\n\tlines = [line.strip() for line in csv_text.splitlines() if line.strip()]\n\tif not lines:\n\t\treturn []\n\t# header\n\theader = lines[0]\n\trows = lines[1:]\n\n\t# indices from header (allow varying order and case)\n\theaders = [h.strip().lower() for h in header.split(\",\")]\n\ttry:\n\t\tidx_user = headers.index(\"user_id\")\n\t\tidx_event = headers.index(\"event_type\")\n\t\tidx_value = headers.index(\"value\")\n\texcept ValueError:\n\t\t# header missing required columns\n\t\treturn []\n\n\taggregates: dict[str, float] = {}\n\tfor row in rows:\n\t\tparts = [c.strip() for c in row.split(\",\")]\n\t\tif len(parts) <= max(idx_user, idx_event, idx_value):\n\t\t\tcontinue\n\t\tuser_id = parts[idx_user]\n\t\tevent_type = parts[idx_event].lower()\n\t\tif event_type != \"purchase\":\n\t\t\tcontinue\n\t\ttry:\n\t\t\tvalue = float(parts[idx_value])\n\t\texcept ValueError:\n\t\t\tcontinue\n\t\taggregates[user_id] = aggregates.get(user_id, 0.0) + value\n\n\treturn sorted(aggregates.items(), key=lambda kv: kv[0])",
  "example": {
    "input": "run_etl(\"user_id,event_type,value\\n u1, purchase, 10.0\\n u2, view, 1.0\\n u1, purchase, 5\\n u3, purchase, not_a_number\\n u2, purchase, 3.5 \\n\\n\")",
    "output": "[('u1', 15.0), ('u2', 3.5)]",
    "reasoning": "Keep only purchases; convert values; drop invalid; aggregate per user; sort by user_id."
  },
  "test_cases": [
    {
      "test": "from solution import run_etl; print(run_etl('user_id,event_type,value\\n u1, purchase, 10.0\\n u2, view, 1.0\\n u1, purchase, 5\\n u3, purchase, not_a_number\\n u2, purchase, 3.5 \\n'))",
      "expected_output": "[('u1', 15.0), ('u2', 3.5)]"
    },
    {
      "test": "from solution import run_etl; print(run_etl('user_id,event_type,value\\n'))",
      "expected_output": "[]"
    },
    {
      "test": "from solution import run_etl; print(run_etl('value,event_type,user_id\\n 1.0, purchase, u1\\n 2.0, purchase, u1\\n'))",
      "expected_output": "[('u1', 3.0)]"
    }
  ]
}

build/185.json

Lines changed: 38 additions & 0 deletions

@@ -0,0 +1,38 @@
{
  "id": "185",
  "title": "Basic Data Drift Check: Mean and Variance Thresholds",
  "difficulty": "easy",
  "category": "MLOps",
  "video": "",
  "likes": "0",
  "dislikes": "0",
  "contributor": [
    {
      "profile_link": "https://github.com/Jeet009",
      "name": "Jeet Mukherjee"
    }
  ],
  "description": "## Problem\n\nImplement a basic data drift check comparing two numeric datasets (reference vs. current).\n\nWrite a function `check_drift(ref, cur, mean_threshold, var_threshold)` that:\n\n- Accepts two lists of numbers `ref` and `cur`.\n- Computes the absolute difference in means and variances.\n- Returns a tuple `(mean_drift, var_drift)` where each element is a boolean indicating whether drift exceeds the corresponding threshold:\n\t- `mean_drift = abs(mean(ref) - mean(cur)) > mean_threshold`\n\t- `var_drift = abs(var(ref) - var(cur)) > var_threshold`\n\nAssume population variance (divide by N). Handle empty inputs by returning `(False, False)`.",
  "learn_section": "## Solution Explanation\n\nWe compare two numeric samples (reference vs. current) using mean and variance with user-defined thresholds.\n\n### Definitions\n- Mean: \\( \\mu = \\frac{1}{N}\\sum_i x_i \\)\n- Population variance: \\( \\sigma^2 = \\frac{1}{N}\\sum_i (x_i - \\mu)^2 \\)\n\n### Drift rules\n- Mean drift if \\(|\\mu_{ref} - \\mu_{cur}| > \\tau_{mean}\\)\n- Variance drift if \\(|\\sigma^2_{ref} - \\sigma^2_{cur}| > \\tau_{var}\\)\n\n### Edge cases\n- If either sample is empty, return `(False, False)` to avoid false alarms.\n- Population vs. sample variance: we use population here to match many monitoring setups. Either is fine if used consistently.\n\n### Complexity\n- O(N + M) to compute stats; O(1) extra space.",
  "starter_code": "from typing import List, Tuple\n\n\ndef check_drift(ref: List[float], cur: List[float], mean_threshold: float, var_threshold: float) -> Tuple[bool, bool]:\n\t\"\"\"Return (mean_drift, var_drift) comparing ref vs cur with given thresholds.\n\n\tUse population variance.\n\t\"\"\"\n\t# TODO: handle empty inputs; compute means and variances; compare with thresholds\n\traise NotImplementedError",
  "solution": "from typing import List, Tuple\n\n\ndef _mean(xs: List[float]) -> float:\n\treturn sum(xs) / len(xs) if xs else 0.0\n\n\ndef _var(xs: List[float]) -> float:\n\tif not xs:\n\t\treturn 0.0\n\tm = _mean(xs)\n\treturn sum((x - m) * (x - m) for x in xs) / len(xs)\n\n\ndef check_drift(ref: List[float], cur: List[float], mean_threshold: float, var_threshold: float) -> Tuple[bool, bool]:\n\tif not ref or not cur:\n\t\treturn (False, False)\n\tmean_ref = _mean(ref)\n\tmean_cur = _mean(cur)\n\tvar_ref = _var(ref)\n\tvar_cur = _var(cur)\n\tmean_drift = abs(mean_ref - mean_cur) > mean_threshold\n\tvar_drift = abs(var_ref - var_cur) > var_threshold\n\treturn (mean_drift, var_drift)",
  "example": {
    "input": "check_drift([1, 2, 3], [1.1, 2.2, 3.3], 0.05, 0.1)",
    "output": "(True, True)",
    "reasoning": "Mean(ref)=2.0, Mean(cur)=2.2 → |Δ|=0.2>0.05. Var(ref)=2/3≈0.667; Var(cur)=1.21×0.667≈0.807 → |Δ|≈0.14>0.1."
  },
  "test_cases": [
    {
      "test": "from solution import check_drift; print(check_drift([1,2,3], [1.1,2.2,3.3], 0.05, 0.1))",
      "expected_output": "(True, True)"
    },
    {
      "test": "from solution import check_drift; print(check_drift([0,0,0], [0,0,0], 0.01, 0.01))",
      "expected_output": "(False, False)"
    },
    {
      "test": "from solution import check_drift; print(check_drift([], [1,2,3], 0.01, 0.01))",
      "expected_output": "(False, False)"
    }
  ]
}
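
To make the example's arithmetic concrete, here is a minimal sketch (a reviewer-style check, not part of this diff) that recomputes the statistics from the example above in plain Python:

```python
# Reviewer-style check of the build/185.json example (not part of this diff).
ref = [1, 2, 3]
cur = [1.1, 2.2, 3.3]

mean_ref = sum(ref) / len(ref)  # 2.0
mean_cur = sum(cur) / len(cur)  # 2.2
var_ref = sum((x - mean_ref) ** 2 for x in ref) / len(ref)  # 2/3 ≈ 0.667
var_cur = sum((x - mean_cur) ** 2 for x in cur) / len(cur)  # ≈ 0.807

print(abs(mean_ref - mean_cur) > 0.05)  # True: |Δmean| = 0.2
print(abs(var_ref - var_cur) > 0.1)     # True: |Δvar| ≈ 0.14
```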
Lines changed: 14 additions & 0 deletions

@@ -0,0 +1,14 @@
## Problem

Implement a simple ETL (Extract-Transform-Load) pipeline for model-ready data preparation.

Given a CSV-like string containing user events with columns: `user_id,event_type,value` (header included), write a function `run_etl(csv_text)` that:

1. Extracts rows from the raw CSV text.
2. Transforms data by:
   - Filtering only rows where `event_type == "purchase"`.
   - Converting `value` to float and dropping invalid rows.
   - Aggregating total purchase `value` per `user_id`.
3. Loads the transformed results by returning a list of `(user_id, total_value)` tuples sorted by `user_id` ascending.

Assume small inputs (no external libs), handle extra whitespace, and ignore blank lines.
Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
{
  "input": "run_etl(\"user_id,event_type,value\\n u1, purchase, 10.0\\n u2, view, 1.0\\n u1, purchase, 5\\n u3, purchase, not_a_number\\n u2, purchase, 3.5 \\n\\n\")",
  "output": "[('u1', 15.0), ('u2', 3.5)]",
  "reasoning": "Keep only purchases; convert values; drop invalid; aggregate per user; sort by user_id."
}
Lines changed: 24 additions & 0 deletions

@@ -0,0 +1,24 @@
## Solution Explanation

This task mirrors a minimal MLOps ETL flow that prepares data for downstream modeling.

### ETL breakdown
- Extract: parse raw CSV text, ignore blanks, and split into header and rows.
- Transform:
   - Filter only relevant records (event_type == "purchase").
   - Cast `value` to float; discard invalid rows to maintain data quality.
   - Aggregate total purchase value per user to create compact features (sketched below).
- Load: return a deterministic, sorted list of `(user_id, total_value)`.

### Why this design?
- Input sanitation prevents runtime errors and poor-quality features.
- Aggregation compresses event-level logs into user-level features commonly used in models.
- Sorting produces stable, testable outputs.

### Complexity
- For N rows, parsing and aggregation run in O(N); sorting the U unique users costs O(U log U).

### Extensions
- Add schema validation and logging.
- Write outputs to files or databases.
- Schedule ETL runs and add monitoring for drift and freshness.
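
As a complement to the breakdown above, here is a minimal sketch of the transform-and-aggregate core over already-parsed rows. The sample rows are illustrative, and the `collections.defaultdict` variant is an alternative to the `dict.get` accumulation used in the reference solution:

```python
# Illustrative sketch of the transform-and-aggregate step only
# (not the repository's reference solution).
from collections import defaultdict

# Hypothetical rows already extracted from CSV: (user_id, event_type, value)
rows = [("u1", "purchase", 10.0), ("u2", "view", 1.0), ("u1", "purchase", 5.0)]

totals: defaultdict[str, float] = defaultdict(float)
for user_id, event_type, value in rows:
    if event_type == "purchase":  # Transform: keep only relevant events
        totals[user_id] += value  # Transform: aggregate per user

print(sorted(totals.items()))     # Load: [('u1', 15.0)] in deterministic order
```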
Lines changed: 12 additions & 0 deletions

@@ -0,0 +1,12 @@
{
  "id": "187",
  "title": "Build a Simple ETL Pipeline (MLOps)",
  "difficulty": "medium",
  "category": "MLOps",
  "video": "",
  "likes": "0",
  "dislikes": "0",
  "contributor": [
    { "profile_link": "https://github.com/Jeet009", "name": "Jeet Mukherjee" }
  ]
}
Lines changed: 43 additions & 0 deletions

@@ -0,0 +1,43 @@
from typing import List, Tuple


def run_etl(csv_text: str) -> List[Tuple[str, float]]:
    """Reference ETL implementation.

    - Extract: parse CSV text, skip header, strip whitespace, ignore blanks
    - Transform: keep event_type == "purchase"; parse value as float; aggregate per user
    - Load: return sorted list of (user_id, total_value) by user_id asc
    """
    lines = [line.strip() for line in csv_text.splitlines() if line.strip()]
    if not lines:
        return []
    # header
    header = lines[0]
    rows = lines[1:]

    # indices from header (allow varying order and case)
    headers = [h.strip().lower() for h in header.split(",")]
    try:
        idx_user = headers.index("user_id")
        idx_event = headers.index("event_type")
        idx_value = headers.index("value")
    except ValueError:
        # header missing required columns
        return []

    aggregates: dict[str, float] = {}
    for row in rows:
        parts = [c.strip() for c in row.split(",")]
        if len(parts) <= max(idx_user, idx_event, idx_value):
            continue
        user_id = parts[idx_user]
        event_type = parts[idx_event].lower()
        if event_type != "purchase":
            continue
        try:
            value = float(parts[idx_value])
        except ValueError:
            continue
        aggregates[user_id] = aggregates.get(user_id, 0.0) + value

    return sorted(aggregates.items(), key=lambda kv: kv[0])
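
For quick verification, a short driver (not part of the diff) that feeds the reference solution the example input from build/184.json:

```python
# Manual check: run the reference solution on the documented example input.
csv_text = (
    "user_id,event_type,value\n"
    " u1, purchase, 10.0\n"
    " u2, view, 1.0\n"
    " u1, purchase, 5\n"
    " u3, purchase, not_a_number\n"
    " u2, purchase, 3.5 \n"
    "\n"
)
print(run_etl(csv_text))  # [('u1', 15.0), ('u2', 3.5)]
```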
Lines changed: 9 additions & 0 deletions

@@ -0,0 +1,9 @@
# Implement your function below.

def run_etl(csv_text: str) -> list[tuple[str, float]]:
    """Run a simple ETL pipeline over CSV text with header user_id,event_type,value.

    Returns a sorted list of (user_id, total_value) for event_type == "purchase".
    """
    # TODO: implement extract, transform, and load steps
    raise NotImplementedError
Lines changed: 14 additions & 0 deletions

@@ -0,0 +1,14 @@
[
  {
    "test": "print(run_etl('user_id,event_type,value\\n u1, purchase, 10.0\\n u2, view, 1.0\\n u1, purchase, 5\\n u3, purchase, not_a_number\\n u2, purchase, 3.5 \\n'))",
    "expected_output": "[('u1', 15.0), ('u2', 3.5)]"
  },
  {
    "test": "print(run_etl('user_id,event_type,value'))",
    "expected_output": "[]"
  },
  {
    "test": "print(run_etl('value,event_type,user_id\\n 1.0, purchase, u1\\n 2.0, purchase, u1\\n'))",
    "expected_output": "[('u1', 3.0)]"
  }
]
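
The grading harness itself is not included in this diff; the sketch below shows one plausible way such `test`/`expected_output` pairs could be executed, assuming stdout is captured and compared after stripping whitespace. The `from solution import run_etl` line mirrors the convention used in build/184.json and assumes solution.py (above) is importable; the inlined case is hypothetical.

```python
# Plausible harness sketch (an assumption; the repository's actual grader
# is not shown in this diff). Each "test" string is executed as Python
# source and its stripped stdout is compared with "expected_output".
import contextlib
import io

from solution import run_etl  # assumption: solution.py is on the path


def run_case(test_src: str, expected: str, namespace: dict) -> bool:
    """Execute one test string and compare its stripped stdout to expected."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        exec(test_src, namespace)  # run the one-liner test source
    return buf.getvalue().strip() == expected


# Hypothetical inlined case in the same shape as the JSON entries above.
case = {
    "test": "print(run_etl('user_id,event_type,value\\n u1, purchase, 10.0\\n'))",
    "expected_output": "[('u1', 10.0)]",
}
print(run_case(case["test"], case["expected_output"], {"run_etl": run_etl}))  # True
```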
