The goal of this exercise is to design and implement a data pipeline that extracts data from a public API, transforms it, and stores it in a relational database, covering the full extract, transform, and load (ETL) cycle.
## Table of Contents

- Overview
- Project Structure
- Prerequisites
- Setup Instructions
- Running the Pipeline
- Code Explanation
- Bonus Features
- License
## Overview

This project demonstrates how to build a simple data pipeline. The pipeline:
- Extracts data from a public API (weather, cryptocurrency, or COVID-19 data).
- Transforms the data:
  - Handles missing values.
  - Converts timestamps to a standard format.
  - Normalizes numerical values.
- Stores the processed data in a relational database, such as SQLite or PostgreSQL.
The pipeline is designed to be simple yet effective, providing insights into common data engineering tasks like ETL (Extract, Transform, Load).
## Project Structure

```
data-pipeline/
│
├── data/
│   └── raw_data.json        # Sample raw data extracted from the API (optional for testing)
│
├── src/
│   ├── pipeline.py          # Main script to run the pipeline (extract, transform, load)
│   ├── api_connector.py     # Script to handle API requests and data extraction
│   ├── data_transformer.py  # Script to handle data cleaning and transformations
│   └── db_connector.py      # Script to handle database connection and storing data
│
├── requirements.txt         # Python dependencies
├── schema.sql               # SQL script to create the database schema
└── README.md                # Project documentation (this file)
```

## Prerequisites

To run this project, you'll need:
- Python 3.x installed on your machine.
- A database system (SQLite or PostgreSQL) installed. For SQLite, no setup is needed since it stores data locally in a file.
- The following Python libraries:
  - `requests` (for making HTTP requests to the API)
  - `pandas` (for data manipulation)
  - `sqlalchemy` (for database interaction)
  - `psycopg2` (only if using PostgreSQL)
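A minimal `requirements.txt` for these dependencies might look like this (the version pins are illustrative, not tested against this project):

```text
requests>=2.31
pandas>=2.0
sqlalchemy>=2.0
psycopg2-binary>=2.9  # only needed for PostgreSQL
```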
You can install the dependencies by running:

```bash
pip install -r requirements.txt
```

Or, using Poetry:

```bash
poetry install
```
## Setup Instructions

1. Database Setup:
   - For SQLite: the database will be created automatically in the `src/` folder when running the pipeline.
   - For PostgreSQL: set up a PostgreSQL instance, create a database, and adjust the database connection settings in `db_connector.py` with your credentials.

2. API Key Setup:
   - If your chosen API requires an API key, make sure to place the key in the script or, preferably, store it as an environment variable.
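The connection settings in `db_connector.py` could be sketched like this; the `DATABASE_URL` environment variable and the default SQLite path are assumptions for illustration, not part of the actual script:

```python
import os
from sqlalchemy import create_engine

# Default to a local SQLite file in src/; override with DATABASE_URL for PostgreSQL,
# e.g. "postgresql+psycopg2://user:password@localhost:5432/pipeline_db"
DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite:///src/pipeline.db")

def get_engine(url: str = DATABASE_URL):
    """Create a SQLAlchemy engine for the configured database."""
    return create_engine(url)
```

Keeping credentials in an environment variable avoids committing them to version control.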
## Running the Pipeline

1. Run the main pipeline script:

   ```bash
   python src/pipeline.py
   ```

   This will:
   - Extract data from the API.
   - Transform the data (cleaning, formatting).
   - Store the data in the configured database.

2. Check the database:
   - After running the pipeline, the transformed data should be available in your database under the specified table.
## Code Explanation

- `api_connector.py`:
  - Handles the HTTP request to the public API and extracts the data.
  - Handles any rate limits or retries as needed.
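Retry handling can be sketched with a generic helper; the `fetch_json` call and retry parameters below are illustrative assumptions, not the actual `api_connector.py` code:

```python
import time

def with_retries(fn, retries=3, backoff=1.0):
    """Call fn(), retrying on failure with exponential backoff between attempts."""
    for attempt in range(retries):
        try:
            return fn()
        except Exception:
            if attempt == retries - 1:
                raise  # exhausted all attempts; propagate the error
            time.sleep(backoff * (2 ** attempt))

# Hypothetical usage: data = with_retries(lambda: fetch_json(API_URL))
```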
- `data_transformer.py`:
  - Cleans the extracted data:
    - Handles missing values (e.g., by replacing them with a default value or removing rows).
    - Converts timestamps to a uniform format (e.g., ISO 8601).
    - Normalizes numeric fields.
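The cleaning steps above could be sketched with pandas; the column names `value` and `timestamp` are assumptions for illustration:

```python
import pandas as pd

def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and normalize the raw data."""
    df = df.copy()
    # Handle missing values: fill numeric gaps with the column median.
    df["value"] = df["value"].fillna(df["value"].median())
    # Convert timestamps to a uniform ISO 8601 string format (UTC).
    df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True).dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    # Min-max normalize the numeric field into [0, 1].
    lo, hi = df["value"].min(), df["value"].max()
    df["value_norm"] = (df["value"] - lo) / (hi - lo) if hi > lo else 0.0
    return df
```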
- `db_connector.py`:
  - Connects to the relational database and creates tables as per the schema (defined in `schema.sql`).
  - Inserts the transformed data into the database.
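The load step could be sketched with pandas and SQLAlchemy; the table name `weather_data` and the default database URL are assumptions for illustration:

```python
import pandas as pd
from sqlalchemy import create_engine

def load(df: pd.DataFrame, url: str = "sqlite:///src/pipeline.db",
         table: str = "weather_data") -> int:
    """Append the transformed rows to the target table and return the row count."""
    engine = create_engine(url)
    # to_sql creates the table if it doesn't exist, then appends the rows.
    df.to_sql(table, engine, if_exists="append", index=False)
    return len(df)
```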
- `pipeline.py`:
  - The main script that orchestrates the execution of the entire pipeline, from extraction to storage.
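A minimal orchestration sketch, assuming `extract`, `transform`, and `load` callables along the lines described above (the names and stages are illustrative):

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("pipeline")

def run_pipeline(extract, transform, load):
    """Run extract -> transform -> load, logging each stage."""
    log.info("Extracting data...")
    raw = extract()
    log.info("Transforming %d records...", len(raw))
    clean = transform(raw)
    log.info("Loading data...")
    load(clean)
    log.info("Pipeline finished.")
    return clean
```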
## Bonus Features

- Error Handling: The pipeline includes error handling for API failures, database connection issues, and invalid data formats.
- Scheduling: For regular execution, you can use `cron` jobs or task schedulers to run the pipeline periodically.
- Logging: Detailed logging has been implemented using the `logging` library to track the flow of data through the pipeline.
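For example, a crontab entry that runs the pipeline at the top of every hour might look like this (the project path is an assumption; adjust it to your environment):

```
0 * * * * cd /path/to/data-pipeline && python src/pipeline.py >> pipeline.log 2>&1
```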
## License

This project is licensed under the MIT License - see the LICENSE file for details.