This project is designed to stream workflow data, sent as JSON, to a FastAPI application. The data is then published to Kafka for downstream processing, and is also written to Azure for OLAP purposes to enable advanced analytics. The OLAP side follows the medallion architecture principle with Bronze-Silver-Gold layers.
Transaction Use Case (OLTP)
- Store all transactions happening on the e-commerce platform (item description, amount, quantity) and give each user access to their transactions
- Let users view their total spend to date
- View order history by showing all invoices
- View all returned items
Analytics Use Case (OLAP)
- Business Intelligence for analysts to view aggregate sales over time and analyse trends
- Total Sales
- Sales Over time
- Top 10 Products by sales
- Customer summary: loyal customers
- Cancelled orders and returns
The project contains a complete data pipeline that supports both OLTP (Online Transaction Processing) and OLAP (Online Analytical Processing) workloads. The workflow integrates multiple technologies for real-time data processing, storage, transformation, and visualization.
- Python script: Transforms the CSV dataset to JSON format for the API client
- Python client: Posts real-time streaming messages to the backend system
- FastAPI: Acts as the API gateway that receives incoming streaming messages and forwards them to the Kafka producer.
- Apache Kafka: Serves as a distributed message broker to buffer and distribute streaming messages.
- Apache Spark: Processes real-time data from Kafka and writes it to both PostgreSQL for transactional storage and Azure Data Lake for analytical (OLAP) processing.
- PostgreSQL & pgAdmin: PostgreSQL provides persistent storage for the processed streaming data, and pgAdmin provides a user interface for querying and visualizing it.
- Streamlit: The front-end web app that consumes and displays the real-time data from PostgreSQL for the OLTP use case.
- Docker and Docker Compose: Host FastAPI, Kafka & Zookeeper, Apache Spark, PostgreSQL & pgAdmin.
- Data Storage - Azure Data Lake: Used as the datastore for the Bronze and Silver layers.
- Synapse Analytics: Acts both as the datastore for the Gold layer and as a query engine; Power BI reads directly from Synapse views.
- Azure Data Factory: Orchestrates the transformations: Bronze → Silver by triggering Synapse notebooks, and Silver → Gold by executing stored procedures and loading the outputs into Synapse tables.
- Terraform and CI/CD: Create the Azure resources and access permissions via Terraform, with automatic build and deployment of the Terraform code.
- Azure Key Vault and Service Principal: Manage and secure the secrets, connection strings, and credentials used in the GitHub Actions pipeline, ADF, Synapse, and Airflow.
- Apache Airflow: Orchestrates and schedules the ADF pipelines.
- Docker: Hosts the Airflow infrastructure: Airflow webserver, scheduler, DAG processor, and metadata database.
- Power BI: Dashboards and reports are powered by Power BI.
- Ubuntu or WSL2 installed, with at least 16 GB RAM.
- An IDE like VS Code or PyCharm
- API testing software like Postman.
- Docker and Docker Compose
I have used an e-commerce dataset from Kaggle (dataset link), which contains transactional records, customer details, and product information.
Overview of dataset Columns (Kaggle)
- Created a Python script that converts the Kaggle e-commerce dataset from CSV to JSON format (see the sketch below).
- Tested first with Postman, then created an API client to POST the JSON data into the FastAPI app.
- The FastAPI application is built in Python, runs inside a Docker container, and is exposed on port 80:80 link to compose
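A minimal sketch of the CSV-to-JSON conversion, assuming pandas and hypothetical file names (the actual script may differ):
```
# csv_to_json.py - minimal sketch of the CSV-to-JSON conversion step.
# Assumptions: pandas is installed and the Kaggle export is named data.csv;
# the actual column names and file paths in the project may differ.
import json

import pandas as pd


def csv_to_json(csv_path: str, json_path: str) -> None:
    """Read the Kaggle CSV export and write one JSON document per row."""
    df = pd.read_csv(csv_path, encoding="ISO-8859-1")  # Kaggle e-commerce exports are often latin-1 encoded
    records = df.to_dict(orient="records")
    with open(json_path, "w") as f:
        json.dump(records, f, default=str)  # default=str handles timestamps


if __name__ == "__main__":
    csv_to_json("data.csv", "data.json")
```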
I have used the same Docker Compose file for the whole stack (Kafka, PostgreSQL, etc.), so from the directory containing the compose file I run `sudo docker-compose -f docker-compose-kafka.yml build` the first time to build the images, or `sudo docker-compose -f docker-compose-kafka.yml up` to start the containers.
Dockerfile that copies the app code into the base image (the base image's entrypoint starts the app):
```
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.7

# Install the Python dependencies
COPY requirements.txt /tmp/
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --requirement /tmp/requirements.txt

# Copy the application code
COPY ./app /app
WORKDIR /app
```
The API client sends a JSON payload to the API app and displays the response in the terminal.
On success, it returns a status message such as: Status code: 201
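A minimal sketch of such a client, assuming the requests library and a hypothetical /invoices endpoint exposed on localhost:80 (the real route may differ):
```
# api_client.py - minimal sketch of the streaming API client.
# Assumptions: the requests library, the FastAPI app exposed on localhost:80,
# and a hypothetical endpoint path /invoices; adjust to the real route.
import json
import time

import requests

API_URL = "http://localhost:80/invoices"  # hypothetical endpoint path


def stream_records(json_path: str, delay_seconds: float = 0.5) -> None:
    """POST each record to the API and print the status code, simulating a stream."""
    with open(json_path) as f:
        records = json.load(f)
    for record in records:
        response = requests.post(API_URL, json=record)
        print(f"Status code: {response.status_code}")
        time.sleep(delay_seconds)  # throttle to mimic real-time arrival


if __name__ == "__main__":
    stream_records("data.json")
```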
API app receiving the JSON documents

Apache Zookeeper acts as the metadata store for Kafka, managing brokers, topics, and consumers. Both Kafka and Zookeeper are defined in a single Docker Compose file link_compose; Kafka depends on Zookeeper to start, and both share a network with Spark and PostgreSQL.
Newer versions of Kafka no longer require Zookeeper (reference: Confluent, link to documentation).
Excerpt from the Kafka service definition in the Compose file:
```
    depends_on:
      - zookeeper
    networks:
      - kafka-net
    user: "root"
```
A Kafka topic is created to receive data from the API backend through the producer.
A local consumer subscribes to the topic and reads the messages.
Start Kafka & Zookeeper with `sudo docker-compose -f docker-compose-kafka.yml build` to build the images, or `sudo docker-compose -f docker-compose-kafka.yml up` to start Kafka and Zookeeper. In this project, all services are defined in a single Docker Compose file so that they share the same network, which ensures communication between the streaming services.
Kafka topics are the categories used to organize messages. Each topic has a name that is unique across the entire Kafka cluster. Messages are sent to and read from specific topics: producers write data to topics, and consumers read data from topics.
```
# First attach to the Kafka container shell, then go to the scripts directory
cd /opt/bitnami/kafka/bin

# List the existing Kafka topics
./kafka-topics.sh --list --bootstrap-server localhost:9092

# Create the ingestion topic
./kafka-topics.sh --create --topic ingestion-topic --bootstrap-server localhost:9092

# Consume the Spark output topic
./kafka-console-consumer.sh --topic spark-output --bootstrap-server localhost:9092

# Start a local consumer on the ingestion topic
./kafka-console-consumer.sh --topic ingestion-topic --bootstrap-server localhost:9092
```
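For reference, a minimal sketch of how the FastAPI endpoint could hand incoming JSON over to the ingestion topic using the kafka-python client (the endpoint path and broker address are assumptions, not necessarily the project's actual values):
```
# app/main.py - minimal sketch of the FastAPI-to-Kafka handoff.
# Assumptions: kafka-python client, a broker reachable at kafka:9092 on the
# shared Compose network, and the ingestion-topic created above.
import json

from fastapi import FastAPI
from kafka import KafkaProducer

app = FastAPI()
producer = KafkaProducer(
    bootstrap_servers="kafka:9092",  # Kafka service name on the Docker network
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)


@app.post("/invoices", status_code=201)
async def ingest(payload: dict) -> dict:
    """Publish the incoming JSON document to the Kafka ingestion topic."""
    producer.send("ingestion-topic", value=payload)
    return {"message": "queued"}
```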
Spark reads the JSON stream from the Kafka ingestion topic and converts it into a DataFrame that matches the data model in PostgreSQL.
At the same time, Spark writes the raw data to Azure Data Lake in Parquet format as-is. Spark also manages the insert/update logic for PostgreSQL.
Connection details (like passwords) are stored in the .env file, which is listed in .gitignore so that the credentials are not exposed in the repository.
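A condensed sketch of the streaming read and dual write described above, assuming PySpark 3 with the Kafka connector and Hadoop Azure jars available, credentials loaded from the .env file, and an illustrative schema and table name (the real job uses the full data model and upsert logic):
```
# spark_stream.py - condensed sketch of the Kafka-to-PostgreSQL/ADLS stream.
# Assumptions: the spark-sql-kafka connector and Hadoop Azure jars are on the
# classpath, credentials are exported from the .env file, and the schema,
# table name, and environment variable names are illustrative.
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.appName("ecommerce-stream").getOrCreate()

# Illustrative schema; the real job matches the PostgreSQL data model
schema = StructType([
    StructField("InvoiceNo", StringType()),
    StructField("Description", StringType()),
    StructField("Quantity", StringType()),
    StructField("UnitPrice", StringType()),
])

raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "ingestion-topic")
    .load()
)
parsed = raw.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")


def write_batch(batch_df, batch_id):
    """Write each micro-batch to Azure (raw Parquet) and to PostgreSQL via JDBC."""
    batch_df.write.mode("append").parquet(os.environ["BRONZE_PATH"])  # e.g. an abfss:// path
    (batch_df.write.format("jdbc")
        .option("url", os.environ["POSTGRES_JDBC_URL"])
        .option("dbtable", "invoices")  # illustrative table name
        .option("user", os.environ["POSTGRES_USER"])
        .option("password", os.environ["POSTGRES_PASSWORD"])
        .mode("append")
        .save())


(parsed.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "/tmp/checkpoints/ecommerce")
    .start()
    .awaitTermination())
```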
The Spark Jupyter Notebook is exposed on port 8080 and the Spark UI is available on port 4040.
Spark UI
For the connection between Spark and Azure, an access key is used instead of a SAS token, because the SAS token did not handle folder renames in Azure even with all privileges granted. Spark also manages the configuration and libraries required to write data to PostgreSQL.
I faced a major challenge connecting Spark to Azure Data Lake. The Spark Docker image (Spark 2) was unable to authenticate to Azure because Spark 2 does not ship with all the Hadoop libraries required to write to Azure. Error message:
```
Writing batch 0 to Azure Blob
Failed to write batch 0 to Azure Blob: An error occurred while calling o361.parquet.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
```
- wasb:// stands for Windows Azure Storage Blob, a legacy Hadoop file system for interacting with Azure Blob Storage, with no support for hierarchical namespaces
- abfss:// stands for Azure Blob File System Secure, the modern scheme for interacting with Azure Blob Storage: more secure, supports hierarchical namespaces, and works properly with Spark 3 and Hadoop-compatible tools
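For reference, a minimal sketch of how the access-key authentication over abfss:// can be configured in Spark (the storage account, container, and environment variable names are placeholders, not the project's actual values):
```
# azure_config.py - minimal sketch of authenticating Spark to ADLS Gen2 with an
# access key over abfss. The storage account, container, and environment
# variable names are placeholders; the real values come from the .env file.
import os

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("adls-access").getOrCreate()

storage_account = os.environ["STORAGE_ACCOUNT_NAME"]  # placeholder env var
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    os.environ["AZURE_STORAGE_ACCESS_KEY"],
)

# Example target path in the Bronze container using the abfss scheme
bronze_path = f"abfss://bronze@{storage_account}.dfs.core.windows.net/invoices/"
```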
PostgreSQL is hosted in Docker and is on the same network as the rest of the services. pgAdmin is also configured in Docker to connect to PostgreSQL and visualize the data. The connection details and passwords for pgAdmin and PostgreSQL are stored in the .env file. PostgreSQL is exposed on port .... and pgAdmin is exposed on port.....
In order to meet the objective of processing the streaming dataset for OLTP, I need to do data modelling. Below is the ERD diagram for PostgreSQL.
ERD Diagram for PostgreSQL.
pgAdmin UI connected to PostgreSQL

Streamlit is an open-source Python library that helps you build custom applications to share data and machine learning web apps.
In my setup, Streamlit connects directly to the PostgreSQL database using the SQLAlchemy library. It runs SQL queries and shows the results as a dashboard in the Streamlit app.
Command to start the app: `streamlit run app.py`
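A stripped-down sketch of the Streamlit/SQLAlchemy pattern; the connection details, table, and query are illustrative, not the project's actual dashboard code:
```
# app.py - stripped-down sketch of the Streamlit dashboard over PostgreSQL.
# Assumptions: SQLAlchemy and pandas are installed, credentials come from
# environment variables, and the table/query are illustrative.
import os

import pandas as pd
import streamlit as st
from sqlalchemy import create_engine

engine = create_engine(
    f"postgresql://{os.environ['POSTGRES_USER']}:{os.environ['POSTGRES_PASSWORD']}"
    f"@localhost:5432/{os.environ['POSTGRES_DB']}"
)

st.title("E-commerce Transactions (OLTP)")

# Illustrative query: total spend per customer to date
query = "SELECT customer_id, SUM(amount) AS total_spend FROM invoices GROUP BY customer_id"
df = pd.read_sql(query, engine)

st.dataframe(df)
st.bar_chart(df.set_index("customer_id")["total_spend"])
```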
Link to code
Streamlit Dashboard
The entire infrastructure in Azure has been provisioned using Terraform, following best practices. The Terraform state is stored in a remote backend in Azure Blob Storage. This approach offers several benefits:
- State locking to prevent concurrent modifications
- Improved security by avoiding storing state locally
- A centralized state file, providing a single source of truth for the current infrastructure state
- Better collaboration across teams by allowing shared access to the state backend
Code snippet:
```
terraform {
  backend "azurerm" {
    resource_group_name  = "data_platform"
    storage_account_name = "datastorage7i4ws2"
    container_name       = "terraform-state"
    key                  = "terraform.tfstate"
  }
}
```
Migrate the state to the remote backend:
The storage account access key is stored in Key Vault for better security.
I have built a CI/CD pipeline to automate the infrastructure provisioning with Terraform:
- Authentication: via a service principal (Contributor role); credentials are stored in GitHub Secrets.
- Workflow: defined in `.github/workflows/ci_cd.yaml` (code).
- Build job: Azure login → Terraform init/validate → save plan as artifact.
- Deploy job: Azure login → download plan → Terraform init/apply.
- Currently using `terraform apply -auto-approve`; however, in a team setting, a manual approval before apply is recommended.
Terraform Service Principal
GitHub Secrets
GitHub Action CI/CD Workflow
Build Task in CI/CD
Deploy Task in CI/CD
Terraform Plan Output in GitHub Action
Terraform Apply Output in GitHub Action
Azure infrastructure created via Terraform
The datastore is organized using the Medallion Architecture pattern with three layers:
- Bronze Layer: stored in Azure Data Lake Gen2, holds raw data in Parquet format.
- Silver Layer: also stored in Azure Data Lake Gen2; data is transformed and split into structured tables:
  - Dim_Customer
  - Dim_Date
  - Dim_Invoice
  - Dim_Product
  - Fact_Sales
- Gold Layer: stored in Azure Synapse Analytics.
  - Serves as the query engine for Power BI
  - Data from Silver is loaded via SQL stored procedures.
Silver Layer Tables
ERD GOLD
The data transformation follows the Medallion Architecture flow:
- Bronze → Silver: transformations are performed using a Synapse Spark serverless pool. Raw Parquet data from the Bronze layer is cleaned, deduplicated, and split into structured tables (dim_customer, dim_date, dim_invoice, dim_product, fact_sales). The processed outputs are stored in the Silver layer (ADLS Gen2). See the sketch after this list.
- Silver → Gold: data is accessed on the fly through external tables from stored procedures. Transformations are handled via SQL stored procedures using upsert/merge logic, and the outputs are loaded into Synapse tables.
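A simplified sketch of the Bronze → Silver step as it might look in a Synapse Spark notebook; the paths, column names, and cleaning rules here are placeholders, and the actual notebook builds all five tables:
```
# bronze_to_silver.py - simplified sketch of the Bronze-to-Silver step.
# Assumptions: runs in a Synapse Spark notebook; the storage account, paths,
# columns, and cleaning rules are placeholders, not the project's exact logic.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("bronze-to-silver").getOrCreate()

bronze = spark.read.parquet("abfss://bronze@<storage-account>.dfs.core.windows.net/invoices/")

# Clean and deduplicate the raw records
clean = bronze.dropna(subset=["InvoiceNo", "CustomerID"]).dropDuplicates(["InvoiceNo", "StockCode"])

# Split out one dimension as an example; the other dims and the fact follow the same pattern
dim_customer = clean.select(col("CustomerID"), col("Country")).dropDuplicates(["CustomerID"])

dim_customer.write.mode("overwrite").parquet(
    "abfss://silver@<storage-account>.dfs.core.windows.net/dim_customer/"
)
```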
Analytics pools
Synapse connects to Azure Data Lake Gen2 via Managed Identity.
The orchestration of the data pipelines is managed using Azure Data Factory (ADF) and Apache Airflow.
- Azure Data Factory
  - Orchestrates the transformation from Bronze → Silver by executing Synapse Spark notebooks with conditional activities.
  - Manages Silver → Gold by running stored procedures in the correct sequence inside the Synapse Dedicated SQL Pool.
- Apache Airflow
  - Handles scheduling and triggering of the ADF pipelines using the Data Factory operator within the DAG (see the sketch below).
  - Airflow runs inside Docker.
  - The run is scheduled every day at 07:40.
  - Connects to Azure via a Service Principal:
    - The service principal is granted the ADF Contributor role.
    - The secrets are securely stored in Key Vault, and Airflow Variables are used to retrieve them and authenticate to Azure.
This hybrid approach leverages ADF for data movement and transformation orchestration, while Airflow provides robust workflow scheduling and control.
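Below is a minimal sketch of what an Airflow DAG triggering an ADF pipeline at 07:40 could look like, using the Azure provider's Data Factory operator; the connection ID, pipeline, factory, and resource group names are placeholders:
```
# adf_trigger_dag.py - minimal sketch of the Airflow DAG that triggers an ADF
# pipeline daily at 07:40. The connection ID, pipeline, factory, and resource
# group names are placeholders; the real DAG may trigger several pipelines.
from datetime import datetime

from airflow import DAG
from airflow.providers.microsoft.azure.operators.data_factory import (
    AzureDataFactoryRunPipelineOperator,
)

with DAG(
    dag_id="adf_medallion_pipeline",
    schedule="40 7 * * *",  # every day at 07:40
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    run_silver = AzureDataFactoryRunPipelineOperator(
        task_id="run_bronze_to_silver",
        azure_data_factory_conn_id="azure_data_factory",  # placeholder connection ID
        pipeline_name="pl_bronze_to_silver",              # placeholder pipeline name
        resource_group_name="data_platform",              # placeholder resource group
        factory_name="<adf-factory-name>",                # placeholder factory name
    )
```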
ADF UI Silver pipeline
ADF UI Gold pipeline
Run from the ADF Monitor, triggered by Airflow
Airflow UI Run
Run Logs
VsCode Logs
Power BI is connected directly to Azure Synapse (Gold layer) using DirectQuery to enable reporting for the analysts. I have built a Sales Dashboard that supports the objectives defined for this dataset, with insights such as:
- Total Sales
- Sales Over Time
- Cancelled Orders
- Top 10 Products by sales
- Top loyal customers
Power BI Dashboard
This project was a great way for me to show my skill set in an industry-relevant context and to build a solution that meets user needs and project goals.
I used FastAPI, Kafka, Zookeeper, Apache Spark, Apache Airflow, and PostgreSQL, all running in Docker containers, along with the Azure stack for OLAP. Some of the challenges were setting up Zookeeper and Kafka and getting Spark to write to Azure Data Lake.
Going forward, I plan to:
- Explore how to run Kafka without Zookeeper
- Follow best practices for managing Docker Images using container registries


