Unified connection pooling for Python, inspired by Netflix's Dyno
PyDyno is a modern, async-first connection pooling library that provides a unified interface for managing connections to databases, caches, and HTTP services. Built with attrs and designed for production use.
```python
import asyncio

from sqlalchemy import text

from pydyno import PyDyno, PoolConfig
from pydyno.adapters.postgresql import PostgreSQLAdapter


async def main():
    # Create PyDyno manager
    dyno = PyDyno()

    # Configure PostgreSQL pool
    config = {
        'host': 'localhost',
        'user': 'postgres',
        'password': 'password',
        'database': 'mydb'
    }
    adapter = PostgreSQLAdapter(
        name="main_db",
        service_type="postgresql",
        config=config,
        pool_config=PoolConfig(max_connections=20)
    )

    # Add pool to manager
    await dyno.create_pool("main_db", adapter)

    # Use the pool
    pool = dyno.get_pool("main_db")
    async with pool.session_scope() as session:
        result = await session.execute(text("SELECT version()"))
        print(result.scalar())

    # Cleanup
    await dyno.close_all()

asyncio.run(main())
```

- Unified Interface: One consistent API for all service types
- Async-First: Built for modern async Python applications
- Built-in Metrics: Track requests, response times, and health
- Health Monitoring: Automatic background health checks
- Production Ready: Robust error handling and connection recovery
- Highly Configurable: Fine-tune connection pools for your needs
- Clean Architecture: Easy to extend with new adapters
| Service | Status | Adapter |
|---|---|---|
| PostgreSQL | Ready | PostgreSQLAdapter |
| Kafka | Ready | KafkaAdapter |
| Redis | Planned | RedisAdapter |
| HTTP APIs | Planned | HTTPAdapter |
```bash
# Basic installation
pip install pydyno

# With PostgreSQL support
pip install pydyno[postgresql]

# With Kafka support
pip install pydyno[kafka]

# With all supported services
pip install pydyno[all]

# Development installation
git clone https://github.com/yourusername/pydyno.git
cd pydyno
pip install -e .
```

PyDyno is built around four core concepts:

- PyDyno Manager: Central coordinator that manages multiple connection pools
- Adapters: Service-specific implementations (PostgreSQL, Redis, etc.)
- Pool Config: Configuration for connection pool behavior
- Metrics: Built-in monitoring and performance tracking
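As a condensed sketch of how these pieces fit together — reusing only the calls that appear in the quick start and monitoring examples elsewhere in this README:

```python
dyno = PyDyno()                                  # manager: central coordinator for all pools
adapter = PostgreSQLAdapter(                     # adapter: service-specific implementation
    name="main_db",
    service_type="postgresql",
    config={'host': 'localhost', 'user': 'postgres',
            'password': 'password', 'database': 'mydb'},
    pool_config=PoolConfig(max_connections=10),  # pool config: tunes pool behavior
)
await dyno.create_pool("main_db", adapter)

print(await dyno.health_check("main_db"))        # health checks, per pool or for all pools
print(await dyno.get_metrics_dict())             # metrics: built-in per-pool statistics
```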
```python
from pydyno.core.pool_config import PoolConfig

# Customize pool behavior
pool_config = PoolConfig(
    max_connections=20,          # Maximum connections in pool
    min_connections=2,           # Minimum connections to maintain
    max_overflow=30,             # Additional connections beyond max
    timeout=30.0,                # Connection timeout in seconds
    pool_recycle=3600,           # Recycle connections after 1 hour
    pool_pre_ping=True,          # Verify connections before use
    retry_attempts=3,            # Retry failed operations
    health_check_interval=60.0,  # Health check frequency
    echo=False                   # Log SQL queries (PostgreSQL)
)
```
```python
from pydyno.adapters.postgresql import PostgreSQLAdapter, create_postgresql_adapter

# Method 1: Direct creation
adapter = PostgreSQLAdapter(
    name="my_db",
    service_type="postgresql",
    config={
        'host': 'localhost',
        'port': 5432,
        'user': 'postgres',
        'password': 'password',
        'database': 'myapp'
    },
    pool_config=PoolConfig(max_connections=10)
)

# Method 2: From environment variables
# Set: POSTGRES_HOST, POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DB
adapter = create_postgresql_adapter("my_db")

# Method 3: From DATABASE_URL
# Set: DATABASE_URL=postgresql://user:pass@host:5432/db
adapter = create_postgresql_adapter("my_db")
```
```python
from pydyno.adapters import create_kafka_adapter, create_emby_kafka_adapter

# Method 1: Direct creation
adapter = create_kafka_adapter(
    name="my_kafka",
    bootstrap_servers=["localhost:9092"],
    client_type="both",  # "producer", "consumer", or "both"
    producer_config={
        'acks': 'all',
        'retries': 3,
        'enable_idempotence': True
    },
    consumer_config={
        'group_id': 'my-service',
        'auto_offset_reset': 'latest'
    }
)

# Method 2: Optimized for streaming applications (like Emby)
adapter = create_emby_kafka_adapter(
    name="streaming_kafka",
    bootstrap_servers=["localhost:9092"],
    consumer_group="media-processor"
)

# Method 3: From environment variables
# Set: KAFKA_BOOTSTRAP_SERVERS, KAFKA_CONSUMER_GROUP
from pydyno.adapters import create_kafka_adapter_from_env
adapter = await create_kafka_adapter_from_env("env_kafka")
```
```python
from fastapi import FastAPI, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from pydyno import PyDyno
from pydyno.adapters.postgresql import create_postgresql_adapter

# Global PyDyno instance
dyno = PyDyno()

# FastAPI app setup
app = FastAPI()


async def startup_event():
    """Initialize database pool on startup"""
    adapter = create_postgresql_adapter("main_db")
    await dyno.create_pool("main_db", adapter)


async def shutdown_event():
    """Cleanup on shutdown"""
    await dyno.close_all()


app.add_event_handler("startup", startup_event)
app.add_event_handler("shutdown", shutdown_event)


# FastAPI dependency
async def get_db_session() -> AsyncSession:
    """Get database session for routes"""
    pool = dyno.get_pool("main_db")
    return await pool.get_session()


# Use in routes (User is your own SQLAlchemy model)
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db_session)):
    result = await db.execute(select(User).where(User.id == user_id))
    return result.scalar_one_or_none()
```
```python
import asyncio

from pydyno import PyDyno
from pydyno.adapters import create_kafka_adapter


async def streaming_example():
    # Create PyDyno manager
    dyno = PyDyno()

    # Create Kafka adapter
    kafka_adapter = create_kafka_adapter(
        name="stream_processor",
        bootstrap_servers=["localhost:9092"],
        client_type="both"
    )
    await dyno.create_pool("kafka", kafka_adapter)
    pool = dyno.get_pool("kafka")

    # Produce messages
    for i in range(5):
        await pool.send_message(
            topic="events",
            value={"event_id": i, "data": f"Event {i}"},
            key=f"event_{i}"
        )
    print("Sent 5 messages")

    # Consume messages
    async with pool.consume_messages(["events"]) as consumer:
        count = 0
        async for message in consumer:
            print(f"Received: {message.value}")
            count += 1
            if count >= 5:
                break

    await dyno.close_all()

asyncio.run(streaming_example())
```
```python
from datetime import datetime

# Automatic transaction management
async with adapter.session_scope() as session:
    # Create user
    user = User(name="John", email="john@example.com")
    session.add(user)

    # Update user (same transaction)
    user.last_login = datetime.utcnow()
    # Automatically commits on success, rolls back on error

# Raw SQL queries
result = await adapter.execute_scalar("SELECT COUNT(*) FROM users")
print(f"Total users: {result}")

# Query with parameters
users = await adapter.execute_query(
    "SELECT * FROM users WHERE created_at > :date",
    {"date": datetime(2024, 1, 1)}
)
```
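Since `session_scope()` rolls the transaction back when the block raises, the error path needs no manual cleanup — a minimal sketch (the exception type and model are illustrative):

```python
try:
    async with adapter.session_scope() as session:
        session.add(User(name="Jane", email="jane@example.com"))
        raise RuntimeError("downstream failure")  # triggers a rollback
except RuntimeError:
    pass  # the insert above was rolled back; nothing was committed
```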
```python
# Check health of all pools
health_results = await dyno.health_check()
print(health_results)  # {'main_db': True, 'cache': True}

# Check specific pool
is_healthy = await dyno.health_check("main_db")

# Get detailed metrics
metrics = await dyno.get_metrics_dict()
for pool_name, pool_metrics in metrics.items():
    print(f"Pool: {pool_name}")
    print(f"  Total requests: {pool_metrics['total_requests']}")
    print(f"  Success rate: {pool_metrics['success_rate']:.1f}%")
    print(f"  Avg response time: {pool_metrics['average_response_time']:.3f}s")
    print(f"  Health: {pool_metrics['health_status']}")
```
```python
async def setup_multiple_databases():
    dyno = PyDyno()

    # Primary database
    primary_adapter = PostgreSQLAdapter(
        name="primary",
        service_type="postgresql",
        config={"url": "postgresql://user:pass@primary-db:5432/app"},
        pool_config=PoolConfig(max_connections=20)
    )

    # Analytics database (read-only)
    analytics_adapter = PostgreSQLAdapter(
        name="analytics",
        service_type="postgresql",
        config={"url": "postgresql://user:pass@analytics-db:5432/analytics"},
        pool_config=PoolConfig(max_connections=5, echo=True)
    )

    # Add both pools
    await dyno.create_pool("primary", primary_adapter)
    await dyno.create_pool("analytics", analytics_adapter)
    return dyno


# Use different databases
async def get_user_analytics(dyno: PyDyno, user_id: int):
    # Write to primary
    primary = dyno.get_pool("primary")
    async with primary.session_scope() as session:
        user = User(id=user_id, name="John")
        session.add(user)

    # Read from analytics
    analytics = dyno.get_pool("analytics")
    result = await analytics.execute_scalar(
        "SELECT COUNT(*) FROM user_events WHERE user_id = :user_id",
        {"user_id": user_id}
    )
    return result
```

PyDyno includes comprehensive testing capabilities:
Run the full integration test suite with Docker:
```bash
# Complete integration test suite (includes PostgreSQL setup)
./run_integration_tests.sh

# Setup test environment for manual testing
./run_integration_tests.sh --setup-only

# Cleanup test environment
./run_integration_tests.sh --cleanup
```

What's tested:
- Real PostgreSQL connectivity and pooling
- Transaction management and rollback
- Complex queries, JSON operations, stored procedures
- Concurrent connection handling
- Health monitoring and metrics collection
- Error handling and recovery
For quick functionality checks without Docker:
```bash
# Run basic functionality tests (no database required)
python -m src.pydyno.scripts.test_pydyno_basic

# Manual database tests (requires existing PostgreSQL)
export POSTGRES_HOST=localhost
export POSTGRES_USER=postgres
export POSTGRES_PASSWORD=password
export POSTGRES_DB=test_db
python -m src.pydyno.scripts.test_postgres_adapter
```

- Docker Integration Tests: Docker and Docker Compose
- Manual Tests: Python 3.12+, PostgreSQL instance
- CI/CD: See tests/README.md for setup examples
PyDyno follows a clean, extensible architecture:
```
pydyno/
├── core/
│   ├── manager.py       # PyDyno main manager
│   ├── adapters.py      # Base adapter interface
│   ├── pool_config.py   # Configuration classes
│   ├── utils.py         # Metrics and utilities
│   └── exceptions.py    # Custom exceptions
└── adapters/
    ├── postgresql.py    # PostgreSQL adapter
    ├── redis.py         # Redis adapter (planned)
    └── http.py          # HTTP adapter (planned)
```
```python
from pydyno.core.adapters import ConnectionAdapter


class CustomAdapter(ConnectionAdapter):
    """Custom service adapter"""

    async def initialize(self):
        """Set up your service connection pool"""
        # Initialize your client/connection pool
        self._client = YourServiceClient(
            **self.config,
            max_connections=self.pool_config.max_connections
        )
        self._initialized = True

    async def health_check(self) -> bool:
        """Check service health"""
        try:
            await self._client.ping()
            self.metrics.record_health_check(True)
            return True
        except Exception:
            self.metrics.record_health_check(False)
            return False

    async def close(self):
        """Clean up resources"""
        if self._client:
            await self._client.close()
        self._closed = True
```
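Once defined, a custom adapter is registered with the manager like the built-in ones — a sketch, where the config keys are illustrative and YourServiceClient is the placeholder client from the example above:

```python
dyno = PyDyno()
custom = CustomAdapter(
    name="my_service",
    service_type="custom",
    config={"host": "localhost", "port": 9000},   # forwarded to YourServiceClient
    pool_config=PoolConfig(max_connections=10),
)
await dyno.create_pool("my_service", custom)
print(await dyno.health_check("my_service"))
```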
Modern applications often need to connect to multiple services:

- Primary database (PostgreSQL)
- Cache layer (Redis)
- External APIs (HTTP)
- Message queues (Kafka)
Each service has its own connection pooling mechanism, configuration format, and management approach. This leads to:
- Inconsistent APIs across your codebase
- Scattered configuration and monitoring
- Duplicate connection management logic
- No unified health checking
PyDyno provides a single, unified interface for all your connection pools:
- One API for all services
- Consistent configuration patterns
- Unified metrics and monitoring
- Centralized health checking
- Production-ready error handling
Netflix's Dyno library solved this problem for Java applications at massive scale. PyDyno brings these same architectural patterns to Python, adapted for modern async applications.
- Redis connection adapter
- Pub/Sub support
- Redis Cluster support
- HTTP adapter for API calls
- Load balancing strategies
- Circuit breaker pattern
- Kafka adapter
- Service discovery integration
- Prometheus metrics export
- Comprehensive test suite
- Performance optimizations
- Full documentation
- Stability guarantees
We welcome contributions! Areas where help is needed:
- New Adapters: Redis, HTTP, Kafka, MongoDB
- Testing: More test cases and edge cases
- Documentation: Examples and tutorials
- Performance: Benchmarks and optimizations
```bash
# Development setup
git clone https://github.com/yourusername/pydyno.git
cd pydyno
pip install -e ".[dev]"

# Run tests
python test_pydyno_basic.py

# Code formatting
black src/
isort src/
```

MIT License - see LICENSE file for details.
- Inspired by Netflix's Dyno library
- Built with attrs for clean Python classes
- Uses SQLAlchemy for PostgreSQL support
- Issues: GitHub Issues
- Discussions: GitHub Discussions
PyDyno - Making connection pooling simple, unified, and powerful.
