diff --git a/README.md b/README.md
index bb5260c..67ec9e7 100644
--- a/README.md
+++ b/README.md
@@ -821,7 +821,7 @@ uv run alembic upgrade head
> 📖 **[See CRUD operations guide in our docs](https://benavlabs.github.io/FastAPI-boilerplate/user-guide/database/crud/)**
-Inside `app/crud`, create a new `crud_entities.py` inheriting from `FastCRUD` for each new entity:
+Inside `app/crud`, create a new `crud_entity.py` inheriting from `FastCRUD` for each new entity:
```python
from fastcrud import FastCRUD
@@ -1028,42 +1028,56 @@ crud_user.get(db=db, username="myusername", schema_to_select=UserRead)
> 📖 **[See API endpoints guide in our docs](https://benavlabs.github.io/FastAPI-boilerplate/user-guide/api/endpoints/)**
-Inside `app/api/v1`, create a new `entities.py` file and create the desired routes
+Inside `app/api/v1`, create a new `entities.py` file and create the desired routes with proper dependency injection:
```python
-from typing import Annotated
-
-from fastapi import Depends
+from typing import Annotated, List
+from fastapi import Depends, Request, APIRouter
+from sqlalchemy.ext.asyncio import AsyncSession
from app.schemas.entity import EntityRead
from app.core.db.database import async_get_db
+from app.crud.crud_entity import crud_entity
-...
+router = APIRouter(tags=["entities"])
-router = fastapi.APIRouter(tags=["entities"])
-
-
-@router.get("/entities/{id}", response_model=List[EntityRead])
-async def read_entities(request: Request, id: int, db: Annotated[AsyncSession, Depends(async_get_db)]):
- entity = await crud_entities.get(db=db, id=id)
+@router.get("/entities/{id}", response_model=EntityRead)
+async def read_entity(
+ request: Request,
+ id: int,
+ db: Annotated[AsyncSession, Depends(async_get_db)]
+):
+ entity = await crud_entity.get(db=db, id=id)
+
+ if entity is None: # Explicit None check
+ raise NotFoundException("Entity not found")
+
return entity
-...
+@router.get("/entities", response_model=List[EntityRead])
+async def read_entities(
+ request: Request,
+ db: Annotated[AsyncSession, Depends(async_get_db)]
+):
+ entities = await crud_entity.get_multi(db=db, is_deleted=False)
+ return entities
```
-Then in `app/api/v1/__init__.py` add the router such as:
+Then in `app/api/v1/__init__.py` add the router:
```python
from fastapi import APIRouter
-from app.api.v1.entity import router as entity_router
+from app.api.v1.entities import router as entity_router
+from app.api.v1.users import router as user_router
+from app.api.v1.posts import router as post_router
-...
+router = APIRouter(prefix="/v1")
-router = APIRouter(prefix="/v1") # this should be there already
-...
-router.include_router(entity_router)
+router.include_router(user_router)
+router.include_router(post_router)
+router.include_router(entity_router) # Add your new router
```
#### 5.7.1 Paginated Responses
@@ -1100,6 +1114,9 @@ With the `get_multi` method we get a python `dict` with full suport for paginati
And in the endpoint, we can import from `fastcrud.paginated` the following functions and Pydantic Schema:
```python
+from typing import Annotated
+from fastapi import Depends, Request
+from sqlalchemy.ext.asyncio import AsyncSession
from fastcrud.paginated import (
PaginatedListResponse, # What you'll use as a response_model to validate
paginated_response, # Creates a paginated response based on the parameters
@@ -1119,13 +1136,16 @@ from app.schemas.entity import EntityRead
@router.get("/entities", response_model=PaginatedListResponse[EntityRead])
async def read_entities(
- request: Request, db: Annotated[AsyncSession, Depends(async_get_db)], page: int = 1, items_per_page: int = 10
+ request: Request,
+ db: Annotated[AsyncSession, Depends(async_get_db)],
+ page: int = 1,
+ items_per_page: int = 10
):
entities_data = await crud_entity.get_multi(
db=db,
offset=compute_offset(page, items_per_page),
limit=items_per_page,
- schema_to_select=UserRead,
+ schema_to_select=EntityRead,
is_deleted=False,
)
@@ -1139,15 +1159,48 @@ async def read_entities(
To add exceptions you may just import from `app/core/exceptions/http_exceptions` and optionally add a detail:
```python
-from app.core.exceptions.http_exceptions import NotFoundException
+from app.core.exceptions.http_exceptions import (
+ NotFoundException,
+ ForbiddenException,
+ DuplicateValueException
+)
+
+@router.post("/entities", response_model=EntityRead, status_code=201)
+async def create_entity(
+ request: Request,
+ entity_data: EntityCreate,
+ db: Annotated[AsyncSession, Depends(async_get_db)],
+ current_user: Annotated[UserRead, Depends(get_current_user)]
+):
+ # Check if entity already exists
+ if await crud_entity.exists(db=db, name=entity_data.name) is True:
+ raise DuplicateValueException("Entity with this name already exists")
+
+ # Check user permissions
+ if current_user.is_active is False: # Explicit boolean check
+ raise ForbiddenException("User account is disabled")
+
+ # Create the entity
+ entity = await crud_entity.create(db=db, object=entity_data)
+
+ if entity is None: # Explicit None check
+ raise CustomException("Failed to create entity")
+
+ return entity
-# If you want to specify the detail, just add the message
-if not user:
- raise NotFoundException("User not found")
-# Or you may just use the default message
-if not post:
- raise NotFoundException()
+@router.get("/entities/{id}", response_model=EntityRead)
+async def read_entity(
+ request: Request,
+ id: int,
+ db: Annotated[AsyncSession, Depends(async_get_db)]
+):
+ entity = await crud_entity.get(db=db, id=id)
+
+ if entity is None: # Explicit None check
+ raise NotFoundException("Entity not found")
+
+ return entity
```
**The predefined possibilities in http_exceptions are the following:**
diff --git a/docs/user-guide/api/endpoints.md b/docs/user-guide/api/endpoints.md
index e6c0118..313c152 100644
--- a/docs/user-guide/api/endpoints.md
+++ b/docs/user-guide/api/endpoints.md
@@ -50,7 +50,7 @@ async def get_user(
### 2. Get Multiple Items (with Pagination)
```python
-from fastcrud.paginated import PaginatedListResponse
+from fastcrud.paginated import PaginatedListResponse, paginated_response
@router.get("/", response_model=PaginatedListResponse[UserRead])
async def get_users(
@@ -66,10 +66,9 @@ async def get_users(
return_as_model=True,
return_total_count=True
)
-
return paginated_response(
crud_data=users,
- page=page,
+ page=page,
items_per_page=items_per_page
)
```
@@ -321,8 +320,8 @@ Your new endpoints will be available at:
Now that you understand basic endpoints:
-- **[Pagination](pagination.md)** - Add pagination to your endpoints
-- **[Database Schemas](../database/schemas.md)** - Create schemas for your data
-- **[CRUD Operations](../database/crud.md)** - Understand the CRUD layer
+- **[Pagination](pagination.md)** - Add pagination to your endpoints
+- **[Exceptions](exceptions.md)** - Custom error handling and HTTP exceptions
+- **[CRUD Operations](../database/crud.md)** - Understand the CRUD layer
The boilerplate provides everything you need - just follow these patterns!
\ No newline at end of file
diff --git a/docs/user-guide/api/exceptions.md b/docs/user-guide/api/exceptions.md
index d15ac15..9186ff9 100644
--- a/docs/user-guide/api/exceptions.md
+++ b/docs/user-guide/api/exceptions.md
@@ -458,8 +458,8 @@ async def test_duplicate_email(client: AsyncClient):
## What's Next
Now that you understand error handling:
-- **[Versioning](versioning.md)** - Learn how to version your APIs
-- **[Database CRUD](../database/crud.md)** - Understand the database operations
+- **[Versioning](versioning.md)** - Learn how to version your APIs
+- **[Database CRUD](../database/crud.md)** - Understand the database operations
- **[Authentication](../authentication/index.md)** - Add user authentication to your APIs
Proper error handling makes your API much more user-friendly and easier to debug!
\ No newline at end of file
diff --git a/docs/user-guide/caching/client-cache.md b/docs/user-guide/caching/client-cache.md
index a4ec8c1..7096e78 100644
--- a/docs/user-guide/caching/client-cache.md
+++ b/docs/user-guide/caching/client-cache.md
@@ -207,7 +207,8 @@ async def get_posts(
response: Response,
page: int = 1,
per_page: int = 10,
- category: str = None
+ category: str | None = None,
+ db: Annotated[AsyncSession, Depends(async_get_db)]
):
"""Conditional caching based on parameters."""
@@ -255,7 +256,8 @@ def generate_etag(data: Any) -> str:
async def get_user(
request: Request,
response: Response,
- user_id: int
+ user_id: int,
+ db: Annotated[AsyncSession, Depends(async_get_db)]
):
"""Endpoint with ETag support for efficient caching."""
@@ -289,7 +291,8 @@ Use Last-Modified headers for time-based cache validation:
async def get_post(
request: Request,
response: Response,
- post_id: int
+ post_id: int,
+ db: Annotated[AsyncSession, Depends(async_get_db)]
):
"""Endpoint with Last-Modified header support."""
@@ -343,13 +346,13 @@ async def serve_static(response: Response, file_path: str):
```python
# Reference data (rarely changes)
@router.get("/api/v1/countries")
-async def get_countries(response: Response, db: AsyncSession = Depends(async_get_db)):
+async def get_countries(response: Response, db: Annotated[AsyncSession, Depends(async_get_db)]):
response.headers["Cache-Control"] = "public, max-age=86400" # 24 hours
return await crud_countries.get_all(db=db)
# User-generated content (moderate changes)
@router.get("/api/v1/posts")
-async def get_posts(response: Response, db: AsyncSession = Depends(async_get_db)):
+async def get_posts(response: Response, db: Annotated[AsyncSession, Depends(async_get_db)]):
response.headers["Cache-Control"] = "public, max-age=1800" # 30 minutes
return await crud_posts.get_multi(db=db, is_deleted=False)
@@ -358,7 +361,7 @@ async def get_posts(response: Response, db: AsyncSession = Depends(async_get_db)
async def get_notifications(
response: Response,
current_user: dict = Depends(get_current_user),
- db: AsyncSession = Depends(async_get_db)
+ db: Annotated[AsyncSession, Depends(async_get_db)]
):
response.headers["Cache-Control"] = "private, max-age=300" # 5 minutes
response.headers["Vary"] = "Authorization"
@@ -444,12 +447,15 @@ async def update_post(
response: Response,
post_id: int,
post_data: PostUpdate,
- current_user: dict = Depends(get_current_user)
+ current_user: dict = Depends(get_current_user),
+ db: Annotated[AsyncSession, Depends(async_get_db)]
):
"""Update post and invalidate related caches."""
# Update the post
updated_post = await crud_posts.update(db=db, id=post_id, object=post_data)
+ if not updated_post:
+ raise HTTPException(status_code=404, detail="Post not found")
# Set headers to indicate cache invalidation is needed
response.headers["Cache-Control"] = "no-cache"
diff --git a/docs/user-guide/caching/redis-cache.md b/docs/user-guide/caching/redis-cache.md
index cf67e72..e9e6092 100644
--- a/docs/user-guide/caching/redis-cache.md
+++ b/docs/user-guide/caching/redis-cache.md
@@ -21,14 +21,16 @@ The `@cache` decorator provides a simple interface for adding caching to any Fas
### Basic Usage
```python
-from fastapi import APIRouter, Request
+from fastapi import APIRouter, Request, Depends
+from sqlalchemy.orm import Session
from app.core.utils.cache import cache
+from app.core.db.database import get_db
router = APIRouter()
@router.get("/posts/{post_id}")
@cache(key_prefix="post_cache", expiration=3600)
-async def get_post(request: Request, post_id: int):
+async def get_post(request: Request, post_id: int, db: Session = Depends(get_db)):
# This function's result will be cached for 1 hour
post = await crud_posts.get(db=db, id=post_id)
return post
diff --git a/docs/user-guide/rate-limiting/index.md b/docs/user-guide/rate-limiting/index.md
index 1652147..8cba87a 100644
--- a/docs/user-guide/rate-limiting/index.md
+++ b/docs/user-guide/rate-limiting/index.md
@@ -30,10 +30,10 @@ async def create_post(post_data: PostCreate):
### Rate Limiting Components
-**Rate Limiter Class**: Singleton Redis client for checking limits
-**User Tiers**: Database-stored user subscription levels
-**Rate Limit Rules**: Path-specific limits per tier
-**Dependency Injection**: Automatic enforcement via FastAPI dependencies
+**Rate Limiter Class**: Singleton Redis client for checking limits
+**User Tiers**: Database-stored user subscription levels
+**Rate Limit Rules**: Path-specific limits per tier
+**Dependency Injection**: Automatic enforcement via FastAPI dependencies
### How It Works
@@ -110,6 +110,59 @@ async def protected_endpoint():
# 3. Checks Redis counter
# 4. Allows or blocks the request
```
+#### Example Dependency Implementation
+
+To make the rate limiting dependency functional, you must implement how user tiers and paths resolve to actual rate limits.
+Below is a complete example using Redis and the database to determine per-tier and per-path restrictions.
+
+```python
+async def rate_limiter_dependency(
+ request: Request,
+ db: AsyncSession = Depends(async_get_db),
+ user=Depends(get_current_user_optional),
+):
+ """
+ Enforces rate limits per user tier and API path.
+
+ - Identifies user (or defaults to IP-based anonymous rate limit)
+ - Finds tier-specific limit for the request path
+ - Checks Redis counter to determine if request should be allowed
+ """
+ path = sanitize_path(request.url.path)
+ user_id = getattr(user, "id", None) or request.client.host or "anonymous"
+
+ # Determine user tier (default to "free" or anonymous)
+ if user and getattr(user, "tier_id", None):
+ tier = await crud_tiers.get(db=db, id=user.tier_id)
+ else:
+ tier = await crud_tiers.get(db=db, name="free")
+
+ if not tier:
+ raise RateLimitException("Tier configuration not found")
+
+ # Find specific rate limit rule for this path + tier
+ rate_limit_rule = await crud_rate_limits.get_by_path_and_tier(
+ db=db, path=path, tier_id=tier.id
+ )
+
+ # Use default limits if no specific rule is found
+ limit = getattr(rate_limit_rule, "limit", 100)
+ period = getattr(rate_limit_rule, "period", 3600)
+
+ # Check rate limit in Redis
+ is_limited = await rate_limiter.is_rate_limited(
+ db=db,
+ user_id=user_id,
+ path=path,
+ limit=limit,
+ period=period,
+ )
+
+ if is_limited:
+ raise RateLimitException(
+ f"Rate limit exceeded for path '{path}'. Try again later."
+ )
+```
### Redis-Based Counting