A small end-to-end pipeline that simulates Spotify events, streams them through Kafka, lands raw data in MinIO, loads into Snowflake (Bronze) via Airflow, and transforms into curated views (Silver/Gold) using dbt.
- Simulator: `simulator/producer.py` → produces events to the Kafka topic `spotify-events`
- Landing: `consumer/kafka-to-minio.py` → consumes from Kafka and uploads JSON to MinIO under `bronze/date=.../hour=.../`
- Orchestration / Load: Airflow DAG `docker/dags/minio-to-kafka.py` → reads from MinIO and inserts into the Snowflake BRONZE table
- Transformations: dbt project `spotify_analysis/` → creates TRANSFORM schema views: `transform.spotify_silver`, `transform.top_songs`, `transform.user_engagement`
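For context, a minimal sketch of the kind of event the simulator might emit. The field names are assumptions chosen to match the columns the dbt models reference (`song_id`, `song_name`, `device_type`, `event_ts`); the Kafka send is shown as a comment and assumes `kafka-python` with a broker on `localhost:9092`:

```python
import random
from datetime import datetime, timezone

# Hypothetical catalog; the real producer defines its own data.
SONGS = [("s1", "Song A"), ("s2", "Song B"), ("s3", "Song C")]
DEVICES = ["mobile", "desktop", "web"]

def make_event(user_id: str) -> dict:
    """Build one listening event with the columns the Silver/Gold models expect."""
    song_id, song_name = random.choice(SONGS)
    return {
        "user_id": user_id,
        "song_id": song_id,
        "song_name": song_name,
        "device_type": random.choice(DEVICES),
        "event_ts": datetime.now(timezone.utc).isoformat(),
    }

# Sending to the `spotify-events` topic (assumes kafka-python is installed):
# import json
# from kafka import KafkaProducer
# producer = KafkaProducer(
#     bootstrap_servers="localhost:9092",
#     value_serializer=lambda v: json.dumps(v).encode("utf-8"),
# )
# producer.send("spotify-events", make_event("user-1"))
```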
- Docker + Docker Compose
- Python 3 (local)
- Snowflake account + warehouse + role access
- dbt Snowflake adapter (`pip3 install dbt-snowflake`)
- Docker/Airflow env: `docker/.env`
- Airflow DAG env (MinIO + Snowflake): `docker/dags/.env`
- Consumer env: `consumer/.env`
- dbt profile: `~/.dbt/profiles.yml`
Keep passwords/tokens out of Git. Use placeholders in committed files.
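A sketch of how a component might read these settings at runtime. The variable names are assumptions that mirror the `<MINIO_USER>`/`<MINIO_PASS>` placeholders used in the verification command; the real keys live in the `.env` files above:

```python
import os

def minio_settings() -> dict:
    """Read MinIO connection settings from the environment.

    Defaults are placeholders only; never commit real credentials.
    The endpoint default matches the host-side S3 API port.
    """
    return {
        "endpoint": os.environ.get("MINIO_ENDPOINT", "http://localhost:9002"),
        "user": os.environ.get("MINIO_USER", "<MINIO_USER>"),
        "password": os.environ.get("MINIO_PASS", "<MINIO_PASS>"),
    }
```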
```bash
cd docker
docker-compose up -d
```

- Airflow UI: http://localhost:8080
- MinIO Console: http://localhost:9001
- MinIO S3 API (host): http://localhost:9002
```bash
cd simulator
python3 producer.py
```

```bash
cd consumer
python3 kafka-to-minio.py
```

Verify files land in MinIO:

```bash
docker exec -it minio sh -c "mc alias set myminio http://localhost:9000 <MINIO_USER> <MINIO_PASS> && mc ls myminio/spotify/ --recursive"
```

- Open Airflow UI → DAG: `spotify_minio_to_snowflake_bronze` → Trigger DAG
- DAG file: `docker/dags/minio-to-kafka.py`
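The `bronze/date=.../hour=.../` layout that the consumer writes (and the DAG reads back) can be sketched as a key builder, assuming hour-level partitioning by event time; the function name is hypothetical:

```python
from datetime import datetime, timezone

def bronze_key(ts: datetime, filename: str) -> str:
    """Build a partitioned MinIO object key for a raw batch file.

    Partitioning by date and hour keeps the Airflow load selective:
    the DAG can list only the prefixes it still needs to ingest.
    """
    return f"bronze/date={ts:%Y-%m-%d}/hour={ts:%H}/{filename}"

key = bronze_key(datetime(2024, 5, 1, 13, 30, tzinfo=timezone.utc), "events.json")
# key == "bronze/date=2024-05-01/hour=13/events.json"
```

If the DAG's `MINIO_PREFIX` does not match this `bronze/` prefix, the load finds no objects (see Troubleshooting).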
Verify in Snowflake:
```sql
USE DATABASE SPOTIFY_DB;
USE SCHEMA BRONZE;
SELECT COUNT(*) FROM SPOTIFY_EVENTS_BRONZE;
```

```bash
cd spotify_analysis
dbt debug
dbt run
```

Verify in Snowflake:
```sql
USE DATABASE SPOTIFY_DB;
USE SCHEMA TRANSFORM;
SHOW VIEWS;
SELECT * FROM SPOTIFY_SILVER LIMIT 10;
SELECT * FROM TOP_SONGS LIMIT 10;
SELECT * FROM USER_ENGAGEMENT LIMIT 10;
```

- Airflow task fails with NoSuchBucket → Create the `spotify` bucket in MinIO and ensure the consumer is writing.
- Airflow can’t find MinIO objects → Ensure `MINIO_PREFIX` matches the upload prefix (e.g., `bronze/`).
- dbt “invalid identifier …” → Column names in Gold models must match the Silver model outputs (`song_id`, `song_name`, `device_type`, `event_ts`, etc.).
- dbt can’t connect to Snowflake → Check `~/.dbt/profiles.yml` credentials and run `dbt debug`.
- Producer: `simulator/producer.py`
- Consumer: `consumer/kafka-to-minio.py`
- Airflow DAG: `docker/dags/minio-to-kafka.py`
- dbt project: `spotify_analysis/`
  - Silver: `spotify_analysis/models/silver/spotify_silver.sql`
  - Gold: `spotify_analysis/models/gold/top_songs.sql`, `spotify_analysis/models/gold/user_engagement.sql`
  - Sources: `spotify_analysis/models/sources.yml`