carbon-data-pipeline

Apache Kafka Apache Spark PostgreSQL Docker License: MIT Google Scholar

Real-time IoT data ingestion and streaming pipeline for continuous Scope 3 carbon accounting, replacing 12-month survey cycles with live emission telemetry.


📋 Overview

carbon-data-pipeline is a production-grade event-streaming infrastructure that ingests IoT sensor data, ERP transactions, logistics telemetry, and supplier API feeds in real time to maintain a continuously updated Scope 3 carbon accounting ledger.

The fundamental problem with traditional Scope 3 accounting is temporal: by the time emission data is collected, validated, and reported, it is 12–18 months stale. Teams planning decarbonization interventions from last year's data are flying blind. This pipeline solves the staleness problem by treating carbon data as a first-class event stream.

Core capabilities:

• Real-time IoT ingestion from factory sensors, smart meters, vehicle telematics, and logistics platforms
• Event-driven carbon accounting with sub-minute latency from emission event to ledger update
• Multi-source integration across ERP systems (SAP, Oracle), logistics APIs (DHL, FedEx), and supplier portals
• Streaming Scope 3 computation using configurable emission-factor lookup at the event level
• Immutable audit ledger built on Apache Kafka for regulatory-grade data lineage
• Horizontal scalability to billions of events via Kubernetes

πŸ–ΌοΈ Data Flow Overview

Streaming Architecture

```
 IoT Sensors (MQTT)  ─┐
 ERP Events (CDC)    ─┼──► Kafka Topics ──► Spark Streaming ──► Carbon Ledger
 Logistics APIs      ─┤                            │
 Supplier Portals    ─┘                            ▼
                                         Emission Factor Lookup
                                         (broadcast join)
                                                   │
                                  ┌────────────────┼──────────────┐
                                  ▼                ▼              ▼
                             PostgreSQL       TimescaleDB    Data Warehouse
                             (Audit Log)      (Dashboard)    (Annual Report)
```

πŸ—οΈ Architecture Diagram

```
╔═══════════════════════════════════════════════════════════════════╗
║        CARBON DATA PIPELINE – STREAMING ARCHITECTURE              ║
╠═══════════════════════════════════════════════════════════════════╣
║                                                                   ║
║  DATA SOURCES (Real-time)                                         ║
║  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────────┐  ║
║  │  Factory   │ │  Vehicle   │ │  ERP/SAP   │ │   Supplier     │  ║
║  │ IoT Sensors│ │ Telematics │ │  PO Events │ │   API Feeds    │  ║
║  │  (MQTT)    │ │  (REST)    │ │ (Webhooks) │ │  (REST/SFTP)   │  ║
║  └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └──────┬─────────┘  ║
║        └──────────────┴──────────────┼───────────────┘            ║
║                                      │                            ║
║                     ┌────────────────▼───────────────┐            ║
║                     │    INGESTION LAYER             │            ║
║                     │    Kafka Connect + Producers   │            ║
║                     │    • Schema Registry (Avro)    │            ║
║                     │    • Dead Letter Queue         │            ║
║                     └────────────────┬───────────────┘            ║
║                                      │                            ║
║  KAFKA TOPICS:                       ▼                            ║
║  ┌─────────────────────────────────────────────────────────────┐  ║
║  │  raw.iot.energy  │  raw.logistics  │  raw.procurement       │  ║
║  └────────────────────────────┬────────────────────────────────┘  ║
║                               │                                   ║
║  STREAM PROCESSING            ▼                                   ║
║  ┌─────────────────────────────────────────────────────────────┐  ║
║  │  Apache Spark Structured Streaming                          │  ║
║  │  • Emission Factor Lookup (broadcast join)                  │  ║
║  │  • GHG Protocol category assignment                         │  ║
║  │  • kgCO2e computation per event                             │  ║
║  │  • 5-min / 1-hour / 24-hour aggregation windows             │  ║
║  └────────────────────────────┬────────────────────────────────┘  ║
║                               │                                   ║
║  ┌────────────────┐  ┌──────────────────┐  ┌──────────────────┐   ║
║  │  PostgreSQL    │  │   TimescaleDB    │  │  Data Warehouse  │   ║
║  │  (Audit Ledger)│  │   (Dashboards)   │  │  (Annual Report) │   ║
║  └────────────────┘  └──────────────────┘  └──────────────────┘   ║
╚═══════════════════════════════════════════════════════════════════╝
```

❗ Problem Statement

The Carbon Data Latency Crisis

Enterprise Scope 3 accounting operates on a 12–18 month reporting cycle. Companies set decarbonization targets against data that is already stale before intervention can begin.

| Dimension | Batch Approach | Streaming Approach |
|-----------|----------------|--------------------|
| Data freshness | 12–18 months stale | Sub-minute latency |
| Anomaly detection | Post hoc, annual | Real-time threshold alerts |
| Intervention speed | Next fiscal year | Same operational day |
| Data sources | Surveys + invoices | IoT + ERP + live logistics |
| Audit trail | Manual spreadsheets | Immutable Kafka log |
| Scalability | Excel/VLOOKUP | Billions of events/day |

"You cannot decarbonize a supply chain on a 12-month feedback loop. Real-time emission telemetry is the foundation of science-based action."


✅ Solution Overview

Event-Driven Carbon Accounting Architecture

The pipeline treats every energy consumption reading, every purchase order creation, every logistics leg departure, and every supplier production event as an emission-relevant event that must be immediately classified, quantified, and recorded.
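A minimal sketch of what such an emission-relevant event might carry, using a hypothetical `CarbonEvent` dataclass (field names are illustrative, not the pipeline's actual schema):

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass(frozen=True)
class CarbonEvent:
    """One emission-relevant event as it might flow through the pipeline."""
    event_type: str   # e.g. "iot.energy", "logistics.leg", "erp.purchase_order"
    source_id: str    # sensor, vehicle, or system that observed the activity
    quantity: float   # activity amount (kWh, tonne-km, units purchased, ...)
    unit: str         # unit of the activity amount
    occurred_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )

event = CarbonEvent(event_type="iot.energy", source_id="SM-PLANT-DE-042",
                    quantity=1247.3, unit="kWh")
print(event.event_type, event.quantity)
```

Freezing the dataclass mirrors the immutability requirement of the audit ledger: an event, once observed, is never mutated downstream.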

**Ingestion Layer.** Apache Kafka Connect, using pre-built connectors, ingests data from MQTT brokers (factory IoT), REST APIs (logistics, supplier portals), SAP/Oracle CDC streams (ERP procurement events), and SFTP file drops. All events are validated against Apache Avro schemas held in the Schema Registry.
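A source connector is registered with Kafka Connect as a declarative config. A hypothetical config for the factory MQTT feed, sketched here as the Python dict you would POST to the Connect REST API (connector class, broker URI, and topic patterns are illustrative assumptions, not this repo's actual deployment):

```python
import json

# Hypothetical Kafka Connect source-connector config for the factory MQTT feed.
mqtt_source = {
    "name": "iot-energy-mqtt-source",
    "config": {
        "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
        "mqtt.server.uri": "tcp://factory-broker:1883",
        "mqtt.topics": "plant/+/energy",
        "kafka.topic": "raw.iot.energy",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "tasks.max": "1",
    },
}

# Registering the connector amounts to POSTing this JSON to the Connect
# REST API, e.g. http://kafka-connect:8083/connectors
print(json.dumps(mqtt_source, indent=2))
```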

**Stream Processing Layer.** Spark Structured Streaming jobs run continuously with micro-batch intervals of 30 seconds to 5 minutes. Each event undergoes emission-factor lookup via a broadcast-joined reference table, Scope 3 category assignment, and kgCO2e computation. Windowed aggregations produce rolling inventory totals.
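The per-event quantification step reduces to activity data times emission factor. A plain-Python sketch of the lookup that the Spark job performs via broadcast join (the factor values below are illustrative placeholders, not an authoritative dataset):

```python
# Illustrative emission factors in kgCO2e per unit of activity; in the
# pipeline this reference table is broadcast-joined against the event stream.
EMISSION_FACTORS = {
    ("grid_electricity", "DE"): 0.385,  # kgCO2e per kWh (illustrative)
    ("road_freight", "EU"): 0.105,      # kgCO2e per tonne-km (illustrative)
}

def kg_co2e(activity: str, region: str, quantity: float) -> float:
    """Quantify one event: activity amount multiplied by its emission factor."""
    return quantity * EMISSION_FACTORS[(activity, region)]

# 1247.3 kWh at the (illustrative) DE grid mix
print(round(kg_co2e("grid_electricity", "DE", 1247.3), 1))  # → 480.2
```

In the real job the dictionary is a reference DataFrame and the function body is a join, but the arithmetic per event is exactly this product.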

**Storage and Serving Layer.** Processed emission records land in three stores: PostgreSQL (audit ledger), TimescaleDB (time series for dashboards), and a data warehouse (historical analytics). A FastAPI service layer exposes inventory data to downstream applications.
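The audit-ledger property comes down to an append-only discipline: rows are inserted, never updated or deleted. A minimal sketch, with SQLite standing in for PostgreSQL and hypothetical table and column names:

```python
import sqlite3

# SQLite in place of PostgreSQL; schema is illustrative, not the repo's.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE carbon_ledger (
        id          INTEGER PRIMARY KEY AUTOINCREMENT,
        event_id    TEXT NOT NULL,
        kg_co2e     REAL NOT NULL,
        recorded_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
    )
""")
# Append-only discipline: a correction is a new compensating row that
# references the original event, never an UPDATE or DELETE of existing rows.
conn.execute(
    "INSERT INTO carbon_ledger (event_id, kg_co2e) VALUES (?, ?)",
    ("SM-PLANT-DE-042:2025-12-20T14:32:07Z", 480.2),
)
conn.commit()

total = conn.execute("SELECT SUM(kg_co2e) FROM carbon_ledger").fetchone()[0]
print(total)  # → 480.2
```

Kafka's immutable log provides the same guarantee at the transport layer, so the relational ledger can always be rebuilt by replaying the topic.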


💻 Code, Installation & Analysis

Quick Start with Docker

```bash
git clone https://github.com/virbahu/carbon-data-pipeline.git
cd carbon-data-pipeline

# Start the full stack
docker-compose up -d

# Services: Kafka, Schema Registry, Kafka Connect,
# Spark (1 master + 2 workers), PostgreSQL, TimescaleDB, Grafana, FastAPI

# Load demo data
python scripts/load_demo_data.py --events 10000 --duration 60
```

Producing Carbon Events

```python
from datetime import datetime, timezone

from pipeline.producers import CarbonEventProducer, IoTEnergyEvent

producer = CarbonEventProducer(bootstrap_servers="localhost:9092")

event = IoTEnergyEvent(
    sensor_id="SM-PLANT-DE-042",
    facility_id="FACILITY_MUENCHEN_01",
    country_iso2="DE",
    energy_kwh=1247.3,
    energy_source="grid",
    grid_carbon_intensity_gco2_kwh=385.2,
    timestamp=datetime.now(timezone.utc),  # timezone-aware; utcnow() is deprecated
)

producer.send("raw.iot.energy", key=event.sensor_id, value=event)
```

Querying the Carbon Ledger

```python
from api.client import CarbonLedgerClient

client = CarbonLedgerClient(base_url="http://localhost:8000")

inventory = client.get_supplier_inventory(
    supplier_id="SUP_042_DE",
    scope=3,
    start_date="2025-01-01",
    end_date="2025-12-31",
)

print(f"YTD Scope 3: {inventory.total_tco2e:,.1f} tCO2e")
print(f"Last updated: {inventory.last_event_timestamp}")
# >> YTD Scope 3: 4,832.7 tCO2e
# >> Last updated: 2025-12-20T14:32:07Z  (< 1 minute ago)
```

📦 Dependencies

```yaml
# docker-compose.yml (service images, abridged)
services:
  kafka:         confluentinc/cp-kafka:7.6.0
  schema-reg:    confluentinc/cp-schema-registry:7.6.0
  kafka-connect: confluentinc/cp-kafka-connect:7.6.0
  spark-master:  bitnami/spark:3.5
  spark-worker:  bitnami/spark:3.5
  postgres:      postgres:15
  timescaledb:   timescale/timescaledb:latest-pg15
  grafana:       grafana/grafana:10.3.0
```

```toml
# pyproject.toml
[tool.poetry.dependencies]
python = "^3.10"
confluent-kafka = "^2.3"
pyspark = "^3.5"
fastavro = "^1.9"
psycopg2-binary = "^2.9"
sqlalchemy = "^2.0"
fastapi = "^0.110"
pandas = "^2.0"
pydantic = "^2.0"
```

👤 Author

Virbahu Jain – Founder & CEO, Quantisage

Building the AI Operating System for Scope 3 emissions management and supply chain decarbonization.


• 🎓 Education: MBA, Kellogg School of Management, Northwestern University
• 🏭 Experience: 20+ years across manufacturing, life sciences, energy & public sector
• 🌍 Scope: Supply chain operations on five continents
• 📝 Research: Peer-reviewed publications on AI in sustainable supply chains
• 🔬 Patents: IoT and AI solutions for manufacturing and logistics

LinkedIn GitHub Google Scholar Quantisage


📄 License

MIT License; see LICENSE for details.


Quantisage Supply Chain Climate

Part of the Quantisage Open Source Initiative | AI × Supply Chain × Climate
