diff --git a/01_generate_dataset.py b/01_generate_dataset.py
new file mode 100644
index 0000000..70d7986
--- /dev/null
+++ b/01_generate_dataset.py
@@ -0,0 +1,15 @@
+import sqlite3
+import pandas as pd
+from settings import DB_PATH, RAW_DATA_PATH
+
+def fetch_data(db_path):
+    # Pull the full call log from SQLite and drop incomplete rows.
+    conn = sqlite3.connect(db_path)
+    df = pd.read_sql_query("SELECT * FROM elevator_calls", conn)
+    conn.close()
+    return df.dropna()
+
+
+if __name__ == "__main__":
+    df = fetch_data(DB_PATH)
+    df.to_pickle(RAW_DATA_PATH)
diff --git a/02_preprocess.py b/02_preprocess.py
new file mode 100644
index 0000000..e8fb2ef
--- /dev/null
+++ b/02_preprocess.py
@@ -0,0 +1,110 @@
+import pandas as pd
+import numpy as np
+from settings import RAW_DATA_PATH, PROCESSED_DATA_PATH
+from tqdm import tqdm
+
+def basic_time_features(df):
+    df["timestamp"] = pd.to_datetime(df["timestamp"])
+    df = df.sort_values("timestamp").reset_index(drop=True)
+    df["hour"] = df["timestamp"].dt.hour
+    df["weekday"] = df["timestamp"].dt.weekday
+    df["day_name"] = df["timestamp"].dt.day_name()
+    df["is_weekday"] = df["weekday"].isin([0, 1, 2, 3, 4])
+    df["is_weekend"] = df["weekday"].isin([5, 6])
+    # Approximation: the elevator is assumed to rest where the previous call originated.
+    df["resting_floor"] = df["calling_floor"].shift(1)
+    return df
+
+def cyclical_features(df):
+    # Encode hour/weekday on the unit circle so 23:00 and 00:00 sit close together.
+    df["hour_sin"] = np.sin(2 * np.pi * df["hour"] / 24)
+    df["hour_cos"] = np.cos(2 * np.pi * df["hour"] / 24)
+    df["weekday_sin"] = np.sin(2 * np.pi * df["weekday"] / 7)
+    df["weekday_cos"] = np.cos(2 * np.pi * df["weekday"] / 7)
+    df.drop(columns=["hour", "weekday"], inplace=True)
+    return df
+
+def rolling_floor_features(df):
+    # Initialize as floats so the rolling results assign without dtype warnings.
+    df["calls_from_calling_floor_last_5min"] = 0.0
+    df["calls_from_calling_floor_last_hour"] = 0.0
+    df["calls_from_calling_floor_last_day"] = 0.0
+    df["calls_from_calling_floor_last_7_days"] = 0.0
+    df["avg_time_between_calls_from_calling_floor"] = np.nan
+
+    for floor in tqdm(df["calling_floor"].unique(), desc="Rolling features by floor"):
+        mask = df["calling_floor"] == floor
+        calls = df.loc[mask].set_index("timestamp").sort_index()["calling_floor"]
+
+        # Subtract 1 so the current call does not count itself.
+        calls_last_5min = calls.rolling("5min").count() - 1
+        calls_last_hour = calls.rolling("1h").count() - 1
+        calls_last_day = calls.rolling("1d").count() - 1
+        calls_last_7_days = calls.rolling("7d").count() - 1
+        avg_time_between_calls = calls.index.to_series().diff().dt.total_seconds().rolling(10, min_periods=1).mean()
+
+        idx = df.index[mask]
+        df.loc[idx, "calls_from_calling_floor_last_5min"] = calls_last_5min.values
+        df.loc[idx, "calls_from_calling_floor_last_hour"] = calls_last_hour.values
+        df.loc[idx, "calls_from_calling_floor_last_day"] = calls_last_day.values
+        df.loc[idx, "calls_from_calling_floor_last_7_days"] = calls_last_7_days.values
+        df.loc[idx, "avg_time_between_calls_from_calling_floor"] = avg_time_between_calls.values
+    return df
+
+def conditional_freq_given_resting_floor(df):
+    df["conditional_call_freq_given_resting_floor_last_7_days"] = 0
+    df["conditional_call_freq_given_resting_floor_last_5_days"] = 0
+    df["conditional_call_freq_given_resting_floor_last_24_hours"] = 0
+
+    for i, row in tqdm(df.iterrows(), total=len(df), desc="Conditional freq given resting floor"):
+        curr_time = row["timestamp"]
+        curr_calling_floor = row["calling_floor"]
+        curr_resting_floor = row["resting_floor"]
+
+        mask_resting = df["resting_floor"] == curr_resting_floor
+        mask_calling = df["calling_floor"] == curr_calling_floor
+
+        mask_time_7d = (df["timestamp"] < curr_time) & (df["timestamp"] >= curr_time - pd.Timedelta(days=7))
+        df.loc[i, "conditional_call_freq_given_resting_floor_last_7_days"] = df.loc[mask_time_7d & mask_resting & mask_calling].shape[0]
+
+        mask_time_5d = (df["timestamp"] < curr_time) & (df["timestamp"] >= curr_time - pd.Timedelta(days=5))
+        df.loc[i, "conditional_call_freq_given_resting_floor_last_5_days"] = df.loc[mask_time_5d & mask_resting & mask_calling].shape[0]
+
+        mask_time_24h = (df["timestamp"] < curr_time) & (df["timestamp"] >= curr_time - pd.Timedelta(hours=24))
+        df.loc[i, "conditional_call_freq_given_resting_floor_last_24_hours"] = df.loc[mask_time_24h & mask_resting & mask_calling].shape[0]
+
+    return df
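+
+def conditional_freq_vectorized_sketch(df, window="7d"):
+    # Optional sketch, not wired into preprocess(): the row-wise loop above is
+    # O(n^2), which is fine for a small synthetic dataset but will not scale.
+    # A grouped time-based rolling count computes near-equivalent values in one
+    # pass (window edges differ slightly from the strict "<" used above, and
+    # the MultiIndexed result still has to be aligned back onto df).
+    counts = (
+        df.set_index("timestamp")
+          .groupby(["resting_floor", "calling_floor"])["calling_floor"]
+          .rolling(window)
+          .count()
+        - 1  # exclude the current call itself
+    )
+    return counts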
(df["calling_floor"] == curr_calling_floor) + df.loc[i, "conditional_call_freq_given_resting_floor_last_7_days"] = df.loc[mask_time_7d & mask_resting & mask_calling].shape[0] + + mask_time_5d = (df["timestamp"] < curr_time) & (df["timestamp"] >= curr_time - pd.Timedelta(days=5)) + df.loc[i, "conditional_call_freq_given_resting_floor_last_5_days"] = df.loc[mask_time_5d & mask_resting & mask_calling].shape[0] + + mask_time_24h = (df["timestamp"] < curr_time) & (df["timestamp"] >= curr_time - pd.Timedelta(hours=24)) + df.loc[i, "conditional_call_freq_given_resting_floor_last_24_hours"] = df.loc[mask_time_24h & mask_resting & mask_calling].shape[0] + + return df + +def preprocess(df): + df = basic_time_features(df) + df = cyclical_features(df) + df = rolling_floor_features(df) + df = conditional_freq_given_resting_floor(df) + for col in df.select_dtypes(bool).columns: + df[col] = df[col].astype(int) + + df = df.iloc[1:].reset_index(drop=True) + df.fillna(0, inplace=True) + return df + +if __name__ == "__main__": + df = pd.read_pickle(RAW_DATA_PATH) + df = df.rename(columns={"from_floor": "calling_floor"}) + df_processed = preprocess(df) + df_processed.to_pickle(PROCESSED_DATA_PATH) diff --git a/03_train.py b/03_train.py new file mode 100644 index 0000000..39ecd10 --- /dev/null +++ b/03_train.py @@ -0,0 +1,26 @@ +import pandas as pd +import pickle +from xgboost import XGBClassifier +from settings import PROCESSED_DATA_PATH, MODEL_PATH, FEATURES, TARGET, TEST_SIZE + +def train(): + df = pd.read_pickle(PROCESSED_DATA_PATH) + df["target"] = df[TARGET].shift(-1) + df = df.dropna(subset=["target"]) + df = df.sort_values("timestamp").reset_index(drop=True) + X = df[FEATURES] + y = df["target"].astype(int) + split_index = int(len(df) * (1 - TEST_SIZE)) + X_train = X.iloc[:split_index] + y_train = y.iloc[:split_index] + X_test = X.iloc[split_index:] + y_test = y.iloc[split_index:] + model = XGBClassifier() + model.fit(X_train, y_train) + print(f"Train score: {model.score(X_train, y_train):.4f}") + print(f"Test score: {model.score(X_test, y_test):.4f}") + with open(MODEL_PATH, "wb") as f: + pickle.dump(model, f) + +if __name__ == "__main__": + train() diff --git a/README_solution.md b/README_solution.md new file mode 100644 index 0000000..4979dfb --- /dev/null +++ b/README_solution.md @@ -0,0 +1,115 @@ +# Elevator Resting Floor Data Pipeline + +This repository implements the core of a data engineering and feature preparation pipeline for the "elevator resting floor" modeling challenge. + +## Structure + +- **generate_dataset.py** + Extracts raw event data from the SQLite database and saves it as a pickle artifact for further processing. + +- **preprocess.py** + Performs feature engineering on event data, including time-based, cyclical, rolling, and conditional features. Data is output in a clean, ML-ready tabular format. All boolean fields are encoded as integers (0/1). + +- **train.py** + Trains a prediction model to anticipate the next calling floor based on historical demand and engineered features. Uses a temporal split to separate train/test, reflecting real-world deployment. + +- **model.py** + Defines a class for loading the trained model and producing predictions given new data. Features are recomputed for single-record inference, using defaults for features that require historical context. + +- **settings.py** + Centralizes all file paths, feature lists, and relevant pipeline constants. 
+
+if __name__ == "__main__":
+    train()
diff --git a/README_solution.md b/README_solution.md
new file mode 100644
index 0000000..4979dfb
--- /dev/null
+++ b/README_solution.md
@@ -0,0 +1,125 @@
+# Elevator Resting Floor Data Pipeline
+
+This repository implements the core of a data engineering and feature preparation pipeline for the "elevator resting floor" modeling challenge.
+
+## Structure
+
+- **01_generate_dataset.py**
+  Extracts raw event data from the SQLite database and saves it as a pickle artifact for further processing.
+
+- **02_preprocess.py**
+  Performs feature engineering on the event data, including time-based, cyclical, rolling, and conditional features. The output is a clean, ML-ready table; all boolean fields are encoded as integers (0/1).
+
+- **03_train.py**
+  Trains a model to predict the next calling floor from historical demand and the engineered features. Uses a temporal train/test split to reflect real-world deployment.
+
+- **model.py**
+  Defines a class that loads the trained model and produces predictions for new data. Features are recomputed for single-record inference, with defaults for features that require historical context.
+
+- **settings.py**
+  Centralizes all file paths, feature lists, and relevant pipeline constants.
+
+## Data Model
+
+Events are stored in a table named `elevator_calls`, with at minimum:
+- `timestamp`
+- `from_floor` (renamed to `calling_floor` during preprocessing)
+
+During preprocessing, additional features are derived, including:
+- Previous resting floor
+- Time-based and cyclical encodings
+- Rolling counts and averages per floor
+- Conditional frequencies based on the previous resting floor
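+
+For a quick look at the engineered table, the processed artifact can be inspected directly. The snippet below is illustrative (column list abbreviated) and assumes `02_preprocess.py` has already been run:
+
+```
+import pandas as pd
+from settings import PROCESSED_DATA_PATH
+
+df = pd.read_pickle(PROCESSED_DATA_PATH)
+print(df[["timestamp", "calling_floor", "resting_floor",
+          "hour_sin", "calls_from_calling_floor_last_hour"]].head())
+```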
+
+## Key Design Choices
+
+- **Temporal split:**
+  Train/test sets are split chronologically to simulate realistic production inference and avoid data leakage.
+
+- **Feature set:**
+  Engineered features include recent call frequencies, conditional statistics (e.g., "calls from floor X while resting at Y"), and cyclical encodings for hours and days.
+
+- **Data artifacts:**
+  Intermediate datasets and models are versioned and stored as pickles for reproducibility.
+
+- **Extendability:**
+  The current schema and scripts are straightforward to adapt for multi-elevator scenarios or richer sensor data.
+
+## Usage
+
+1. Generate or extract event data:
+```
+python 01_generate_dataset.py
+```
+
+2. Preprocess features:
+```
+python 02_preprocess.py
+```
+
+3. Train the model:
+```
+python 03_train.py
+```
+
+4. Run inference (from a notebook or script):
+```
+from model import ElevatorModel
+m = ElevatorModel()
+m.predict({"timestamp": "2024-06-01 10:00:00", "calling_floor": 5, "resting_floor": 3})
+```
+
+## Next Steps
+
+- **Collect production data:**
+  Integrate the pipeline with a live data source (e.g., sensor logs or a building management API) to gather real elevator usage data. Real behavioral data will reveal meaningful demand patterns and allow richer feature engineering.
+
+- **Iterate on feature engineering:**
+  With real data, re-evaluate which features are most predictive. Additional signals, such as trip direction, user IDs, or elevator occupancy, could be incorporated. Time-of-day and seasonality patterns should be validated and potentially modeled at a finer granularity.
+
+- **Model evaluation and tuning:**
+  Compare different algorithms (tree-based models, time series approaches, etc.), run cross-validation, and monitor for overfitting. Feature importance analysis will help identify which features drive prediction quality.
+
+- **Error analysis:**
+  Systematically analyze cases where the model mispredicts. Use these insights to refine the feature set, handle edge cases, and sharpen the definition of the "optimal" resting floor policy (e.g., optimizing for average wait time vs. call frequency).
+
+- **Scalability:**
+  Extend the schema and pipeline to handle multiple elevators and more complex building layouts. Add support for batch ingestion and parallel processing as needed.
+
+- **Productionization:**
+  Package the pipeline for deployment as a service (API, scheduled job, etc.). Add monitoring for data drift and model performance. Establish processes for regular retraining and validation as new data arrives.
+
+- **Testing and validation:**
+  Add automated tests covering the core feature transformations and edge cases. Validate that the pipeline handles missing values, outliers, and schema changes gracefully.
+
+---
+
+## Assumptions
+
+- **Synthetic data:**
+  The current pipeline uses generated data to demonstrate the end-to-end flow. No assumptions about actual user behavior, peak times, or real demand cycles are built in; all results and scores should be interpreted in that light. The pipeline is designed to run unchanged on real data once it becomes available.
+
+- **Resting floor logic:**
+  The `resting_floor` feature is approximated as the floor from which the previous call originated. In a production setting, this would ideally be recorded explicitly when the elevator becomes idle.
+
+- **Single-elevator focus:**
+  The data model assumes a single elevator in the building. Extending to multiple elevators would require an additional `elevator_id` field and further schema adjustments.
+
+- **Edge cases (e.g., move-ins/move-outs, group calls):**
+  The pipeline does not yet account for outlier scenarios such as bulk move-ins/move-outs (unusually high demand from a single floor) or simultaneous calls by multiple users. All events are treated as independent, single-person elevator calls. In real deployments, these cases may need special handling or outlier filtering.
+
+- **No assumption on trip direction or group occupancy:**
+  Calls are treated as single-direction, single-user events. The system does not currently attempt to infer or record whether multiple users enter or exit together, or the direction of travel (up vs. down) for each call.
+
+---
diff --git a/artifacts/data/processed_data.pkl b/artifacts/data/processed_data.pkl
new file mode 100644
index 0000000..dde74b1
Binary files /dev/null and b/artifacts/data/processed_data.pkl differ
diff --git a/artifacts/data/raw_data.pkl b/artifacts/data/raw_data.pkl
new file mode 100644
index 0000000..c0201a6
Binary files /dev/null and b/artifacts/data/raw_data.pkl differ
diff --git a/artifacts/models/model.pkl b/artifacts/models/model.pkl
new file mode 100644
index 0000000..7bef293
Binary files /dev/null and b/artifacts/models/model.pkl differ
diff --git a/db/elevator_calls.sqlite b/db/elevator_calls.sqlite
new file mode 100644
index 0000000..0f37525
Binary files /dev/null and b/db/elevator_calls.sqlite differ
diff --git a/elevator_model_test.ipynb b/elevator_model_test.ipynb
new file mode 100644
index 0000000..ec285bf
--- /dev/null
+++ b/elevator_model_test.ipynb
@@ -0,0 +1,75 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "fc05d032",
+   "metadata": {},
+   "source": [
+    "\n",
+    "# Elevator Prediction Model – Test Notebook"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "id": "26a62f57",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import json\n",
+    "from model import ElevatorModel"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "id": "2cab65fd",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "clf = ElevatorModel()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 15,
+   "id": "44176d73",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "Given the current resting floor 2, the predicted floor is 3\n"
+     ]
+    }
+   ],
+   "source": [
+    "sample_input = {\"timestamp\": \"2024-06-01 10:32:00\", \"calling_floor\": 7, \"resting_floor\": 2}\n",
+    "predicted = clf.predict(sample_input)\n",
+    "print(f\"Given the current resting floor {sample_input['resting_floor']}, the predicted floor is {predicted}\")"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "base",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.12.7"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/model.py b/model.py
new file mode 100644
index 0000000..720b32b
--- /dev/null
+++ b/model.py
@@ -0,0 +1,49 @@
+import pickle
+import numpy as np
+import pandas as pd
+from settings import MODEL_PATH, FEATURES
+
+class ElevatorModel:
+    def __init__(self, model_path=MODEL_PATH):
+        with open(model_path, "rb") as f:
+            self.model = pickle.load(f)
+
+    def compute_features(self, data_json):
+        df = pd.DataFrame([data_json])
+        df["timestamp"] = pd.to_datetime(df["timestamp"])
+        df["hour"] = df["timestamp"].dt.hour
+        df["weekday"] = df["timestamp"].dt.weekday
+        df["day_name"] = df["timestamp"].dt.day_name()
+        df["is_weekday"] = df["weekday"].isin([0, 1, 2, 3, 4]).astype(int)
+        df["is_weekend"] = df["weekday"].isin([5, 6]).astype(int)
+
+        # Default to floor 1 when no resting floor is supplied.
+        df["resting_floor"] = data_json.get("resting_floor", 1)
+        df["hour_sin"] = np.sin(2 * np.pi * df["hour"] / 24)
+        df["hour_cos"] = np.cos(2 * np.pi * df["hour"] / 24)
+        df["weekday_sin"] = np.sin(2 * np.pi * df["weekday"] / 7)
+        df["weekday_cos"] = np.cos(2 * np.pi * df["weekday"] / 7)
+
+        # History-dependent features cannot be recomputed from a single record;
+        # fall back to the same defaults produced by fillna(0) in preprocessing.
+        df["calls_from_calling_floor_last_5min"] = 0
+        df["calls_from_calling_floor_last_hour"] = 0
+        df["calls_from_calling_floor_last_day"] = 0
+        df["calls_from_calling_floor_last_7_days"] = 0
+        df["avg_time_between_calls_from_calling_floor"] = 0
+        df["conditional_call_freq_given_resting_floor_last_7_days"] = 0
+        df["conditional_call_freq_given_resting_floor_last_5_days"] = 0
+        df["conditional_call_freq_given_resting_floor_last_24_hours"] = 0
+
+        return df[FEATURES]
+
+    def predict(self, data_json):
+        features = self.compute_features(data_json)
+        return int(self.model.predict(features)[0])
+
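+
+if __name__ == "__main__":
+    # Minimal smoke test, mirroring elevator_model_test.ipynb. Assumes the
+    # model artifact produced by 03_train.py already exists at MODEL_PATH.
+    sample = {"timestamp": "2024-06-01 10:32:00", "calling_floor": 7, "resting_floor": 2}
+    print(f"Predicted next calling floor: {ElevatorModel().predict(sample)}")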
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..b766bb8
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,5 @@
+pandas
+numpy
+scikit-learn
+xgboost
+tqdm
\ No newline at end of file
diff --git a/settings.py b/settings.py
new file mode 100644
index 0000000..6daff8c
--- /dev/null
+++ b/settings.py
@@ -0,0 +1,23 @@
+DB_PATH = "db/elevator_calls.sqlite"
+RAW_DATA_PATH = "artifacts/data/raw_data.pkl"
+PROCESSED_DATA_PATH = "artifacts/data/processed_data.pkl"
+MODEL_PATH = "artifacts/models/model.pkl"
+
+FEATURES = [
+    "calling_floor",
+    "resting_floor",
+    "hour_sin", "hour_cos", "weekday_sin", "weekday_cos",
+    "calls_from_calling_floor_last_5min",
+    "calls_from_calling_floor_last_hour",
+    "calls_from_calling_floor_last_day",
+    "calls_from_calling_floor_last_7_days",
+    "avg_time_between_calls_from_calling_floor",
+    "conditional_call_freq_given_resting_floor_last_7_days",
+    "conditional_call_freq_given_resting_floor_last_5_days",
+    "conditional_call_freq_given_resting_floor_last_24_hours",
+    "is_weekday",
+    "is_weekend"
+]
+TARGET = "calling_floor"
+TEST_SIZE = 0.2
+RANDOM_STATE = 42
\ No newline at end of file