A robust ETL (Extract, Transform, Load) pipeline for fetching Solana blockchain data, classifying transactions, and storing them in PostgreSQL. Built with Rust for performance and reliability.
- Complete ETL Pipeline: Extract blocks from Solana RPC → Transform/classify transactions → Load into PostgreSQL
- Transaction Classification: Automatically identifies SOL transfers, SPL token transfers, DEX swaps, NFT operations, and program interactions
- Robust Error Handling: Retry logic with exponential backoff for network failures
- Batch Processing: Efficient batch insertion with database transactions for atomicity
- Configurable CLI: Full command-line interface for all parameters
- Real-time Statistics: Progress tracking, throughput metrics, and success rates
- Idempotent Operations: UPSERT logic allows re-processing blocks without duplicates
- Rust 1.70+ (2021 edition)
- PostgreSQL 14+
- Helius RPC API key (or any Solana RPC endpoint)
```bash
# Create database and user
sudo -u postgres psql
```

```sql
CREATE DATABASE solana_block_data;
CREATE USER solana_user WITH PASSWORD 'solana_pass';
GRANT ALL PRIVILEGES ON DATABASE solana_block_data TO solana_user;
\c solana_block_data
GRANT ALL ON SCHEMA public TO solana_user;
\q
```

Create a `.env` file:
```env
HELIUS_RPC_URL=https://mainnet.helius-rpc.com/?api-key=YOUR_API_KEY
DATABASE_URL=postgresql://solana_user:solana_pass@localhost:5432/solana_block_data
RUST_LOG=info
```

```bash
# Build release version
cargo build --release

# Run with default settings (10 recent blocks)
./target/release/block-data-fetcher

# Run with custom parameters
./target/release/block-data-fetcher --num-blocks 50 --batch-size 25
```

```bash
# Process 20 recent blocks
./block-data-fetcher --num-blocks 20

# Process specific slot range
./block-data-fetcher --start-slot 375000000 --end-slot 375000100

# Use custom RPC endpoint
./block-data-fetcher --rpc-url https://api.mainnet-beta.solana.com --num-blocks 10

# Continuous mode (keep processing latest blocks)
./block-data-fetcher --continuous --interval 30
```

| Option | Description | Default |
|---|---|---|
| `-s, --start-slot <SLOT>` | Starting slot number | latest - 30 |
| `-e, --end-slot <SLOT>` | Ending slot number | latest - 20 |
| `-n, --num-blocks <COUNT>` | Number of blocks to fetch | - |
| `-r, --rpc-url <URL>` | RPC endpoint URL | From `.env` |
| `-d, --database-url <URL>` | Database connection URL | From `.env` |
| `-b, --batch-size <SIZE>` | Batch size for processing | 10 |
| `--max-retries <COUNT>` | Maximum retry attempts | 3 |
| `--retry-delay <SECONDS>` | Retry delay in seconds | 2 |
| `-c, --continuous` | Enable continuous mode | false |
| `--interval <SECONDS>` | Poll interval for continuous mode | 10 |
| `-h, --help` | Print help information | - |
| `-V, --version` | Print version information | - |
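The table above maps naturally onto a clap derive struct. The following is a hypothetical sketch of that mapping, showing a representative subset of the flags; the actual field names and types in `src/cli.rs` may differ:

```rust
use clap::Parser;

/// Hypothetical CLI definition mirroring the options table above.
#[derive(Parser, Debug)]
#[command(version, about = "Solana block data ETL pipeline")]
struct Cli {
    /// Starting slot number
    #[arg(short = 's', long)]
    start_slot: Option<u64>,

    /// Ending slot number
    #[arg(short = 'e', long)]
    end_slot: Option<u64>,

    /// Number of blocks to fetch
    #[arg(short = 'n', long)]
    num_blocks: Option<u64>,

    /// Batch size for processing
    #[arg(short = 'b', long, default_value_t = 10)]
    batch_size: usize,

    /// Maximum retry attempts
    #[arg(long, default_value_t = 3)]
    max_retries: u32,

    /// Enable continuous mode
    #[arg(short = 'c', long)]
    continuous: bool,
}

fn main() {
    let cli = Cli::parse();
    println!("{cli:?}");
}
```

With the derive API, clap generates `--help`, `--version`, and validation for free, which is what keeps the CLI surface and the table above in sync.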
```bash
./block-data-fetcher \
  --start-slot 375000000 \
  --end-slot 375010000 \
  --batch-size 50 \
  --max-retries 5
```

```bash
./block-data-fetcher \
  --num-blocks 10 \
  --continuous \
  --interval 20
```

```bash
./block-data-fetcher \
  --rpc-url https://api.mainnet-beta.solana.com \
  --num-blocks 100 \
  --batch-size 25 \
  --max-retries 5 \
  --retry-delay 3
```

```bash
./block-data-fetcher --num-blocks 5
```

```bash
./block-data-fetcher \
  --database-url postgresql://user:pass@localhost:5432/mydb \
  --num-blocks 20
```

The following environment variables are used when CLI arguments are not provided:
- `HELIUS_RPC_URL` - Solana RPC endpoint
- `DATABASE_URL` - PostgreSQL connection string
- `RUST_LOG` - Logging level (`info`, `debug`, `trace`)
These are loaded from a .env file if present.
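A minimal sketch of that load order, assuming the `dotenvy` crate: values from `.env` are loaded first if the file exists, and variables already present in the environment take precedence.

```rust
use std::env;

fn load_config() -> anyhow::Result<(String, String)> {
    // Load variables from a .env file if one exists; real environment
    // variables are not overridden by the file's contents.
    dotenvy::dotenv().ok();

    let rpc_url = env::var("HELIUS_RPC_URL")?;
    let db_url = env::var("DATABASE_URL")?;
    Ok((rpc_url, db_url))
}
```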
```mermaid
graph TB
subgraph "External Services"
A[Solana RPC<br/>Helius API]
B[(PostgreSQL<br/>Database)]
end
subgraph "CLI Layer"
C[Command Line<br/>Interface]
end
subgraph "Pipeline Orchestration"
D[Pipeline<br/>Coordinator]
E[Statistics<br/>Tracker]
F[Error Handler<br/>& Retry Logic]
end
subgraph "ETL Pipeline"
G[Extract<br/>RPC Client]
H[Transform<br/>Classifier]
I[Load<br/>Database Writer]
end
C -->|Config| D
D -->|Fetch Blocks| G
G -->|Block Data| A
A -->|Response| G
G -->|Raw Data| H
H -->|Classified| I
I -->|Batch Insert| B
D -->|Monitor| E
D -->|Handle Errors| F
F -->|Retry| G
E -->|Report| C
style A fill:#e1f5ff
style B fill:#e1f5ff
style D fill:#fff4e1
style G fill:#e8f5e9
style H fill:#e8f5e9
style I fill:#e8f5e9
```

```mermaid
sequenceDiagram
participant CLI
participant Pipeline
participant RPC as RPC Client
participant Helius as Helius API
participant Classifier
participant DB as PostgreSQL
CLI->>Pipeline: Start (slot range)
loop For each block
Pipeline->>RPC: fetch_block(slot)
RPC->>Helius: getBlock(slot, maxSupportedVersion)
Helius-->>RPC: Block + Transactions
RPC-->>Pipeline: RawBlockData
Pipeline->>Classifier: classify_transactions(block)
loop For each transaction
Classifier->>Classifier: analyze_instructions()
Classifier->>Classifier: determine_type()
end
Classifier-->>Pipeline: ClassifiedData
Pipeline->>DB: batch_insert(classified_data)
alt Insert Success
DB-->>Pipeline: OK (rows inserted)
Pipeline->>Pipeline: update_stats(success)
else Insert Failure
DB-->>Pipeline: Error
Pipeline->>Pipeline: retry_logic()
Pipeline->>DB: batch_insert(retry)
end
end
Pipeline-->>CLI: Statistics Report
```

```mermaid
erDiagram
BLOCKS ||--o{ TRANSACTIONS : contains
TRANSACTIONS ||--o{ INSTRUCTIONS : has
TRANSACTIONS ||--o{ ACCOUNTS : involves
INSTRUCTIONS }o--|| PROGRAM_REGISTRY : references
BLOCKS {
bigint slot PK
text blockhash UK
bigint parent_slot
timestamptz block_time
bigint block_height
jsonb raw_data
timestamptz created_at
}
TRANSACTIONS {
text signature PK
bigint slot FK
int index_in_block
text fee_payer
bigint fee
text status
text transaction_type
jsonb raw_data
timestamptz created_at
}
INSTRUCTIONS {
bigserial id PK
text transaction_signature FK
int instruction_index
text program_id
text instruction_type
jsonb data
timestamptz created_at
}
ACCOUNTS {
bigserial id PK
text transaction_signature FK
text pubkey
bool is_signer
bool is_writable
bigint pre_balance
bigint post_balance
timestamptz created_at
}
PROGRAM_REGISTRY {
text program_id PK
text program_name
text program_type
text description
timestamptz created_at
}
```

```mermaid
flowchart TD
A[Transaction] --> B{Has Instructions?}
B -->|No| Z[Unknown Type]
B -->|Yes| C[Analyze Instructions]
C --> D{System Program<br/>Transfer?}
D -->|Yes| E[SOL Transfer]
D -->|No| F{Token Program<br/>Transfer?}
F -->|Yes| G[Token Transfer]
F -->|No| H{DEX Program?}
H -->|Yes| I[DEX Swap]
H -->|No| J{NFT Operation?}
J -->|Yes| K[NFT Operation]
J -->|No| L{Known Program?}
L -->|Yes| M[Program Interaction]
L -->|No| Z
style E fill:#4caf50
style G fill:#2196f3
style I fill:#ff9800
style K fill:#9c27b0
style M fill:#00bcd4
style Z fill:#9e9e9e
```
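In Rust, the decision cascade above reduces to a chain of program-ID checks. The following is a simplified sketch: the System and Token program IDs are the well-known public ones, the `dex`/`nft`/`known` registries are passed in as hypothetical parameters, and the real classifier also inspects instruction data (for example, to confirm a System Program call is actually a transfer):

```rust
#[derive(Debug, PartialEq)]
enum TransactionType {
    SolTransfer,
    TokenTransfer,
    DexSwap,
    NftOperation,
    ProgramInteraction,
    Unknown,
}

// Well-known public program IDs.
const SYSTEM_PROGRAM: &str = "11111111111111111111111111111111";
const TOKEN_PROGRAM: &str = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA";

/// Simplified decision cascade mirroring the flowchart above.
fn classify(program_ids: &[&str], dex: &[&str], nft: &[&str], known: &[&str]) -> TransactionType {
    if program_ids.is_empty() {
        return TransactionType::Unknown;
    }
    let uses = |set: &[&str]| program_ids.iter().any(|p| set.contains(p));
    if program_ids.contains(&SYSTEM_PROGRAM) {
        TransactionType::SolTransfer
    } else if program_ids.contains(&TOKEN_PROGRAM) {
        TransactionType::TokenTransfer
    } else if uses(dex) {
        TransactionType::DexSwap
    } else if uses(nft) {
        TransactionType::NftOperation
    } else if uses(known) {
        TransactionType::ProgramInteraction
    } else {
        TransactionType::Unknown
    }
}
```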
- Extract: Fetch blocks from Solana RPC with rate limiting and error handling
- Transform: Classify transactions based on program IDs and instruction data
- Load: Batch insert into PostgreSQL with atomic transactions
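These three stages compose into a straightforward orchestration loop. A minimal sketch, with hypothetical `fetch_block` / `classify_block` / `insert_batch` stand-ins for the real stage functions in `src/etl/`:

```rust
use anyhow::Result;

// Hypothetical stand-ins for the real stage types and helpers; names are
// illustrative only.
struct RawBlock { slot: u64 }
struct ClassifiedBlock { slot: u64 }

async fn fetch_block(slot: u64) -> Result<RawBlock> { Ok(RawBlock { slot }) }
fn classify_block(raw: RawBlock) -> Result<ClassifiedBlock> { Ok(ClassifiedBlock { slot: raw.slot }) }
async fn insert_batch(batch: &[ClassifiedBlock]) -> Result<()> {
    println!("inserting {} blocks", batch.len());
    Ok(())
}

/// Extract, transform, and load a slot range, flushing in batches.
async fn run_range(slots: std::ops::RangeInclusive<u64>, batch_size: usize) -> Result<()> {
    let mut batch = Vec::with_capacity(batch_size);
    for slot in slots {
        let raw = fetch_block(slot).await?; // Extract
        batch.push(classify_block(raw)?);   // Transform
        if batch.len() == batch_size {
            insert_batch(&batch).await?;    // Load in one DB transaction
            batch.clear();
        }
    }
    if !batch.is_empty() {
        insert_batch(&batch).await?;        // Flush the final partial batch
    }
    Ok(())
}
```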
The schema is organized into the following tables, with proper relationships between them:
- `blocks`: Block metadata (slot, blockhash, timestamp, parent relationships)
- `transactions`: Transaction details with classification labels, linked to blocks
- `instructions`: Individual instruction data, linked to transactions
- `accounts`: Account states (pre/post balances, signer status)
- `program_registry`: Known Solana programs for classification
- Indexes: Optimized for common queries on slots, signatures, and program IDs
Automatically identifies:
- SOL Transfers: Native SOL transfers via the System Program
- SPL Token Transfers: Token transfers via the Token Program
- DEX Swaps: Interactions with Raydium, Orca, Jupiter, etc.
- NFT Operations: NFT mints and transfers
- Program Interactions: Other program invocations (Drift, Kamino, etc.)
- Unknown: Unclassified transactions
- Batch Processing: Process blocks in configurable batches (default: 10) to balance memory vs. throughput
- UPSERT Strategy: Use PostgreSQL UPSERT to enable idempotent re-processing without duplicates
- Classification at ETL Time: Pre-compute transaction types for faster queries
- Exponential Backoff: Handle transient failures (network, rate limits) with configurable retry logic (sketched below)
- JSONB Storage: Structured schema + JSONB for raw data provides queryability + flexibility
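A generic retry helper captures the backoff decision. This is a sketch of the `--max-retries` / `--retry-delay` behaviour, not the exact implementation:

```rust
use std::time::Duration;

/// Retry an async operation, doubling the delay after each failure.
async fn with_retry<T, E, F, Fut>(
    mut op: F,
    max_retries: u32,
    base_delay: Duration,
) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut attempt = 0;
    loop {
        match op().await {
            Ok(v) => return Ok(v),
            Err(e) if attempt >= max_retries => return Err(e),
            Err(_) => {
                // Exponential backoff: base, 2x base, 4x base, ...
                tokio::time::sleep(base_delay * 2u32.pow(attempt)).await;
                attempt += 1;
            }
        }
    }
}
```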
Typical performance metrics from real-world testing:
- Throughput: 200-300 transactions/second (including classification)
- Batch Processing: 10 blocks (~12,000 transactions) in ~25 seconds
- Success Rate: 99-100% with retry logic enabled
- Database: 28,597+ transactions processed across 26+ blocks
- Idempotency: UPSERT operations allow safe re-processing without duplicates
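The idempotency comes from PostgreSQL's `INSERT ... ON CONFLICT` (UPSERT). A minimal sqlx sketch, assuming the `blocks` columns from the schema above; the real queries live in the load stage and cover all tables:

```rust
use sqlx::PgPool;

/// Insert a block, or update it in place if the slot already exists.
async fn upsert_block(pool: &PgPool, slot: i64, blockhash: &str) -> sqlx::Result<()> {
    sqlx::query(
        "INSERT INTO blocks (slot, blockhash)
         VALUES ($1, $2)
         ON CONFLICT (slot) DO UPDATE SET blockhash = EXCLUDED.blockhash",
    )
    .bind(slot)
    .bind(blockhash)
    .execute(pool)
    .await?;
    Ok(())
}
```

In the pipeline each batch runs inside a single database transaction, which is what makes a batch atomic: either every row in the batch lands, or none do.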
- Language: Rust 2021 edition
- Async Runtime: Tokio for concurrent operations
- Database: PostgreSQL 16 with sqlx 0.8 (compile-time checked queries)
- Blockchain: Solana SDK v2.0 + Helius RPC integration
- CLI: clap v4.5 with derive macros
- Error Handling: anyhow + thiserror for comprehensive error context
- Logging: tracing for structured logging
- The `jsonParsed` encoding simplifies instruction parsing (see the sketch after this list)
- Account keys array determines signer/fee payer roles
- Program IDs provide reliable classification basis
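For illustration, extracting a System Program transfer from a `jsonParsed` instruction might look like this with `serde_json`; the field names follow Solana's documented `jsonParsed` layout:

```rust
use serde_json::Value;

/// Pull (source, destination, lamports) out of a parsed transfer
/// instruction, or None if it is not a transfer.
fn parse_sol_transfer(instruction: &Value) -> Option<(String, String, u64)> {
    let parsed = instruction.get("parsed")?;
    if parsed.get("type")?.as_str()? != "transfer" {
        return None;
    }
    let info = parsed.get("info")?;
    Some((
        info.get("source")?.as_str()?.to_string(),
        info.get("destination")?.as_str()?.to_string(),
        info.get("lamports")?.as_u64()?,
    ))
}
```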
- UPSERT enables idempotent operations
- JSONB storage provides flexibility for evolving schemas
- Foreign keys ensure referential integrity
- GIN indexes on JSONB optimize query performance
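As an example of the last point, a containment query with `@>` is exactly the shape a GIN index on a JSONB column accelerates. A hypothetical query, assuming `raw_data` stores the transaction JSON including its `meta` object:

```rust
use sqlx::PgPool;

/// Count transactions whose raw JSON records no error (meta.err = null).
/// The @> containment operator can be answered from a GIN index on raw_data.
async fn count_successful_transactions(pool: &PgPool) -> sqlx::Result<i64> {
    let (count,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM transactions
         WHERE raw_data @> '{\"meta\": {\"err\": null}}'",
    )
    .fetch_one(pool)
    .await?;
    Ok(count)
}
```

Without the index this containment check would scan every row; with it, Postgres can satisfy the predicate from the index.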
- `Arc<T>` for sharing non-Clone types across async tasks (sketched below)
- Database transactions ensure atomicity
- sqlx compile-time checking catches SQL errors at build time
- clap derive macros create elegant, type-safe CLIs
- Exponential backoff handles transient network failures
- Detailed error context tracks failure stages
- Continue-on-error pattern processes remaining blocks
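The `Arc<T>` point deserves a concrete illustration: wrapping a non-Clone resource in `Arc` gives each spawned task a cheap handle to the same instance. A toy sketch with a hypothetical client type:

```rust
use std::sync::Arc;

// Stand-in for a non-Clone resource such as an RPC client.
struct HypotheticalRpcClient;

impl HypotheticalRpcClient {
    async fn fetch(&self, slot: u64) -> u64 { slot }
}

#[tokio::main]
async fn main() {
    let client = Arc::new(HypotheticalRpcClient);
    let mut handles = Vec::new();
    for slot in 0..4u64 {
        let client = Arc::clone(&client); // Cheap pointer copy, not a deep clone
        handles.push(tokio::spawn(async move { client.fetch(slot).await }));
    }
    for h in handles {
        println!("fetched slot {}", h.await.unwrap());
    }
}
```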
- More program-specific parsers (MEV, governance, staking)
- NFT metadata extraction and enrichment
- DeFi protocol-specific analysis
- SQL views for common analytics queries
- Transaction volume and pattern analytics
- Fee analysis and trends over time
- REST API for querying stored data
- WebSocket for real-time transaction updates
- GraphQL for flexible data queries
- Web dashboard for metrics
- Transaction flow diagrams
- Volume, fee, and program interaction charts
- Parallel block fetching with async concurrency (see the sketch after this list)
- True bulk INSERT using PostgreSQL UNNEST
- In-memory caching for program registry
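For the parallel-fetching item, bounded concurrency with `futures::stream::buffer_unordered` is one likely shape. A sketch with a hypothetical `fetch_block`; the real version would thread through the RPC client and error handling:

```rust
use futures::stream::{self, StreamExt};

// Hypothetical stand-in for the real RPC call.
async fn fetch_block(slot: u64) -> u64 { slot }

/// Fetch a slot range with at most `concurrency` requests in flight.
async fn fetch_range_concurrently(start: u64, end: u64, concurrency: usize) -> Vec<u64> {
    stream::iter(start..=end)
        .map(fetch_block)              // Build a stream of futures
        .buffer_unordered(concurrency) // Bound the in-flight requests
        .collect()
        .await
}
```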
```bash
# Format code
cargo fmt --all

# Run linter
cargo clippy -- --deny warnings

# Run full checks
just full-check

# Run tests
cargo test
```

```
src/
├── cli.rs           # Command-line interface
├── db/              # Database connection and migrations
├── etl/             # ETL pipeline modules
│   ├── extract.rs   # Block fetching from RPC
│   ├── transform.rs # Transaction classification
│   ├── load.rs      # Database insertion
│   └── parsers/     # Instruction parsers
├── models.rs        # Data models
├── pipeline.rs      # Pipeline orchestration
└── rpc/             # RPC client wrapper
migrations/          # Database migrations
docs/                # Documentation
```
Build a production-quality Rust ETL pipeline to extract Solana blockchain data, classify transactions into meaningful categories, and store them in PostgreSQL for analysis and querying.
- ✅ Extract blocks from Solana mainnet via RPC
- ✅ Classify transactions into 6 meaningful categories
- ✅ Store data in structured PostgreSQL schema
- ✅ Handle errors gracefully with retry logic
- ✅ Process arbitrary block ranges efficiently
- ✅ Provide CLI for flexible configuration
- ✅ Track and report real-time statistics
- ✅ Maintain high success rate (99%+)
- ✅ Achieve good throughput (200+ txs/sec)
- ✅ Create comprehensive documentation
- Production ETL Pipelines: Building robust data pipelines in Rust with proper error handling
- Blockchain Data Processing: Working with Solana's complex transaction structure and RPC APIs
- Database Design: Effective schema design with proper relationships, indexes, and JSONB flexibility
- Async Rust: Leveraging Tokio for concurrent operations and efficient I/O
- CLI Design: Creating user-friendly command-line interfaces with clap
- Error Resilience: Implementing retry logic, exponential backoff, and graceful degradation
Status: ✅ Feature Complete
Date: October 23, 2025
Author: 0xfave
This is a personal side project, but suggestions and improvements are welcome!
This project is licensed under MIT.
- Built with Solana SDK
- Uses Helius RPC for enhanced Solana data access
- Inspired by the need for structured Solana transaction data