From 97fd9615b082f03630090b0a1b818502160ddc2a Mon Sep 17 00:00:00 2001 From: script-logic Date: Sun, 25 Jan 2026 18:04:59 +0300 Subject: [PATCH 1/6] fix: code styles --- .secrets.baseline | 4 +-- app/__init__.py | 41 +++++++++++++++++------ app/core/__init__.py | 11 ++---- app/core/config.py | 68 +++++--------------------------------- app/core/logger.py | 7 ++-- app/database/__init__.py | 4 +-- app/database/manager.py | 3 ++ app/database/models.py | 16 ++------- app/database/repository.py | 29 ---------------- app/main.py | 18 ++-------- 10 files changed, 54 insertions(+), 147 deletions(-) diff --git a/.secrets.baseline b/.secrets.baseline index 0966d7a..30e3a31 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -167,7 +167,7 @@ "filename": "app\\core\\config.py", "hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8", "is_verified": false, - "line_number": 54 + "line_number": 43 } ], "tests\\test_config.py": [ @@ -229,5 +229,5 @@ } ] }, - "generated_at": "2026-01-24T19:55:12Z" + "generated_at": "2026-01-25T15:04:21Z" } diff --git a/app/__init__.py b/app/__init__.py index 59689de..5b42b92 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,23 +1,42 @@ """ Deribit Price Tracker API application package. -Main application module with metadata, configuration and logger -initialization. +Main application module with metadata and imports. """ +from functools import lru_cache from importlib.metadata import metadata from . import core, database -try: - pkg_metadata = metadata("deribit-tracker").json - version = str(pkg_metadata.get("version", "Unknown version")) - description = str(pkg_metadata.get("summary", "Unknown description")) - title = str(pkg_metadata.get("name", "Untitled")).replace("-", " ").title() -except Exception: - version = "Unknown version" - description = "Unknown description" - title = "Untitled" + +@lru_cache +def get_app_metadata() -> tuple[str, str, str]: + """ + Retrieves application metadata from the installed package information. + + Returns: + tuple: A three-element tuple containing: + version (str): Current application version (e.g., "0.3.0") + description (str): Brief application description + title (str): Formatted application title (e.g., "Deribit Tracker") + + Note: Requires package to be installed (e.g., via poetry install) + """ + try: + app_meta = metadata("deribit-tracker").json + version = str(app_meta.get("version", "Unknown version")) + description = str(app_meta.get("summary", "Unknown description")) + title = str(app_meta.get("name", "Untitled")).replace("-", " ").title() + except Exception: + version = "Unknown version" + description = "Unknown description" + title = "Untitled" + + return version, description, title + + +version, description, title = get_app_metadata() __all__ = [ diff --git a/app/core/__init__.py b/app/core/__init__.py index 9329280..bca35d3 100644 --- a/app/core/__init__.py +++ b/app/core/__init__.py @@ -6,19 +6,13 @@ centralized logging. """ -from .config import ( - get_settings, - init_settings, -) -from .logger import ( - get_logger, -) +from .config import get_settings +from .logger import get_logger logger = get_logger(__name__) try: - init_settings() settings = get_settings() except Exception as e: logger.error("Failed to initialize settings: %s", e) @@ -27,5 +21,4 @@ __all__ = [ "get_logger", "get_settings", - "init_settings", ] diff --git a/app/core/config.py b/app/core/config.py index 54d5b80..0a8262c 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -5,23 +5,12 @@ environment variable loading, and singleton pattern for global access. """ -from typing import ( - Any, - ClassVar, -) +from typing import Any, ClassVar -from pydantic import ( - BaseModel, - Field, - SecretStr, - field_validator, -) -from pydantic_settings import ( - BaseSettings, - SettingsConfigDict, -) +from pydantic import BaseModel, Field, SecretStr, field_validator +from pydantic_settings import BaseSettings, SettingsConfigDict -from .logger import AppLogger +from .logger import AppLogger, get_logger class DatabaseSettings(BaseModel): @@ -219,26 +208,8 @@ class Settings(BaseSettings): frozen=True, ) - def __init__(self, **kwargs: Any) -> None: - """ - Private constructor for singleton pattern. - - Args: - **kwargs: Configuration values to override environment variables. - - Raises: - RuntimeError: If attempting to create multiple instances. - """ - if Settings._instance is not None: - raise RuntimeError( - "Settings is a singleton class. Use Settings.get_instance() " - "instead.", - ) - - super().__init__(**kwargs) - @classmethod - def get_instance(cls, **kwargs: Any) -> "Settings": + def init_instance(cls, **kwargs: Any) -> "Settings": """ Get singleton settings instance. @@ -256,7 +227,7 @@ def get_instance(cls, **kwargs: Any) -> "Settings": def _log_initialization(self) -> None: """Log settings initialization (excluding sensitive data).""" - self._logger = AppLogger.get_logger(__name__) + self._logger = get_logger(__name__) if self.application.debug: AppLogger.set_level("DEBUG") @@ -288,37 +259,14 @@ def _log_initialization(self) -> None: ) -# Global access functions -def get_settings() -> Settings: +def get_settings(**kwargs) -> Settings: """ Get singleton settings instance. Returns: Global Settings instance. - - Raises: - RuntimeError: If settings not initialized. """ if Settings._instance is None: - raise RuntimeError( - "Settings not initialized. Call init_settings() first.", - ) + return Settings.init_instance(**kwargs) return Settings._instance - - -# Initialize settings on import -def init_settings(**kwargs: Any) -> Settings: - """ - Initialize application settings explicitly. - - Useful for controlling initialization timing or passing - configuration programmatically. - - Args: - **kwargs: Configuration values to override environment variables. - - Returns: - Initialized Settings instance. - """ - return Settings.get_instance(**kwargs) diff --git a/app/core/logger.py b/app/core/logger.py index b396394..b8e74cf 100644 --- a/app/core/logger.py +++ b/app/core/logger.py @@ -23,16 +23,13 @@ class AppLogger: """ _initialized: ClassVar[bool] = False + _date_format: ClassVar[str] = "%Y-%m-%d %H:%M:%S" _log_format: ClassVar[str] = ( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) - _date_format: ClassVar[str] = "%Y-%m-%d %H:%M:%S" @classmethod - def get_logger( - cls, - name: str, - ) -> logging.Logger: + def get_logger(cls, name: str) -> logging.Logger: """ Get or create application logger instance. diff --git a/app/database/__init__.py b/app/database/__init__.py index 67e01e4..0762c27 100644 --- a/app/database/__init__.py +++ b/app/database/__init__.py @@ -6,12 +6,10 @@ from .base import Base from .deps import get_db_session -from .manager import DatabaseManager +from .manager import database_manager __all__ = [ "Base", "database_manager", "get_db_session", ] - -database_manager = DatabaseManager() diff --git a/app/database/manager.py b/app/database/manager.py index e246dce..570349c 100644 --- a/app/database/manager.py +++ b/app/database/manager.py @@ -110,3 +110,6 @@ def is_initialized(self) -> bool: True if engine and session factory are initialized. """ return self._engine is not None and self._session_factory is not None + + +database_manager = DatabaseManager() diff --git a/app/database/models.py b/app/database/models.py index a78d785..a7422d2 100644 --- a/app/database/models.py +++ b/app/database/models.py @@ -29,19 +29,9 @@ class PriceTick(Base): __tablename__ = "price_ticks" - id: Mapped[int] = mapped_column( - BigInteger, - primary_key=True, - ) - ticker: Mapped[str] = mapped_column( - String(20), - nullable=False, - index=True, - ) - price: Mapped[float] = mapped_column( - Float, - nullable=False, - ) + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + ticker: Mapped[str] = mapped_column(String(20), nullable=False, index=True) + price: Mapped[float] = mapped_column(Float, nullable=False) timestamp: Mapped[int] = mapped_column( BigInteger, nullable=False, diff --git a/app/database/repository.py b/app/database/repository.py index 953fa2b..491d5d1 100644 --- a/app/database/repository.py +++ b/app/database/repository.py @@ -213,32 +213,3 @@ async def get_prices_by_date_range( ticks: Sequence[PriceTick] = result.scalars().all() return ticks - - async def delete_old_records( - self, - older_than_days: int = 30, - ) -> int: - """ - Delete old price records to maintain database size. - - Args: - older_than_days: Delete records older than this many days. - - Returns: - Number of deleted records. - """ - from datetime import timedelta - - cutoff_timestamp = int( - (datetime.utcnow() - timedelta(days=older_than_days)).timestamp(), - ) - - query = select(PriceTick).where(PriceTick.timestamp < cutoff_timestamp) - - result = await self.session.execute(query) - records_to_delete = result.scalars().all() - - for record in records_to_delete: - await self.session.delete(record) - - return len(records_to_delete) diff --git a/app/main.py b/app/main.py index a6aa8a0..a113589 100644 --- a/app/main.py +++ b/app/main.py @@ -3,15 +3,8 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from . import ( - description, - title, - version, -) -from .core import ( - get_logger, - settings, -) +from . import description, title, version +from .core import get_logger, settings logger = get_logger(__name__) @@ -19,9 +12,7 @@ @asynccontextmanager async def lifespan(app: FastAPI): logger.info("Starting Deribit Price Tracker API...") - yield - logger.info("Shutting down Deribit Price Tracker API...") @@ -36,10 +27,7 @@ async def lifespan(app: FastAPI): CORSMiddleware, allow_origins=settings.cors.origins, allow_credentials=True, - allow_methods=[ - "GET", - "OPTIONS", - ], + allow_methods=["GET", "OPTIONS"], allow_headers=["*"], ) From e0df0db7206a818a3587dc33ef6f68735e89c900 Mon Sep 17 00:00:00 2001 From: script-logic Date: Mon, 26 Jan 2026 12:52:06 +0300 Subject: [PATCH 2/6] feat: celery redis docker derebit impl --- .dockerignore | 2 + .env.example | 15 +- .gitignore | 2 + .secrets.baseline | 12 +- Dockerfile | 22 ++ alembic.ini | 4 +- ..._19cfef6b2cba_create_price_ticks_table.py} | 13 +- app/__init__.py | 6 +- app/api/__init__.py | 0 app/api/v1/__init__.py | 0 app/api/v1/endpoints/__init__.py | 0 app/api/v1/endpoints/prices.py | 0 app/api/v1/schemas.py | 0 app/clients/__init__.py | 7 + app/clients/deribit.py | 269 ++++++++++++++++++ app/core/celery.py | 90 ++++++ app/core/config.py | 42 +-- app/services/__init__.py | 0 app/services/price_service.py | 0 app/tasks/__init__.py | 7 + app/tasks/celery_app.py | 10 + app/tasks/price_collection.py | 200 +++++++++++++ docker-compose.yml | 95 +++++++ pyproject.toml | 3 +- 24 files changed, 756 insertions(+), 43 deletions(-) create mode 100644 Dockerfile rename alembic/versions/{b1494e573776_create_price_ticks_table.py => 2026/01/25_2149_52_19cfef6b2cba_create_price_ticks_table.py} (86%) create mode 100644 app/api/__init__.py create mode 100644 app/api/v1/__init__.py create mode 100644 app/api/v1/endpoints/__init__.py create mode 100644 app/api/v1/endpoints/prices.py create mode 100644 app/api/v1/schemas.py create mode 100644 app/clients/__init__.py create mode 100644 app/clients/deribit.py create mode 100644 app/core/celery.py create mode 100644 app/services/__init__.py create mode 100644 app/services/price_service.py create mode 100644 app/tasks/__init__.py create mode 100644 app/tasks/celery_app.py create mode 100644 app/tasks/price_collection.py create mode 100644 docker-compose.yml diff --git a/.dockerignore b/.dockerignore index bdd963f..b28b96b 100644 --- a/.dockerignore +++ b/.dockerignore @@ -15,6 +15,8 @@ tree.py # pytest .pytest_cache/ .cache/ +start_dev.ps1 +scripts # coverage .coverage diff --git a/.env.example b/.env.example index 6047698..ef3c8dc 100644 --- a/.env.example +++ b/.env.example @@ -1,25 +1,28 @@ -# Database Configuration DATABASE__HOST=localhost DATABASE__PORT=5432 DATABASE__USER=postgres DATABASE__PASSWORD=your_secure_password DATABASE__DB=deribit_tracker -# Deribit API Configuration DERIBIT_API__CLIENT_ID=your_client_id DERIBIT_API__CLIENT_SECRET=your_client_secret DERIBIT_API__BASE_URL=https://www.deribit.com/api/v2 -# Redis Configuration -REDIS__HOST=localhost +REDIS__HOST=redis REDIS__PORT=6379 REDIS__DB=0 +REDIS__PASSWORD=your_secure_password_or_empty_for_local_dev +REDIS__SSL=False + +CELERY__WORKER_CONCURRENCY=2 +CELERY__BEAT_ENABLED=True +CELERY__TASK_TRACK_STARTED=True -# Application Configuration APPLICATION__DEBUG=False APPLICATION__API_V1_PREFIX=/api/v1 APPLICATION__PROJECT_NAME=Deribit Price Tracker API APPLICATION__VERSION=0.3.0 -# CORS Configuration CORS__ORIGINS=["http://localhost:8000","http://127.0.0.1:8000"] + +APP_PORT=8000 diff --git a/.gitignore b/.gitignore index 8ab8128..3466df4 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,8 @@ credentials/ .cache/ scan_project.py project_scan_output.txt +start_dev.ps1 +scripts # coverage .coverage diff --git a/.secrets.baseline b/.secrets.baseline index 30e3a31..c563730 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -90,10 +90,6 @@ { "path": "detect_secrets.filters.allowlist.is_line_allowlisted" }, - { - "path": "detect_secrets.filters.common.is_baseline_file", - "filename": ".secrets.baseline" - }, { "path": "detect_secrets.filters.common.is_ignored_due_to_verification_policies", "min_level": 2 @@ -152,11 +148,11 @@ "line_number": 89 } ], - "alembic\\versions\\b1494e573776_create_price_ticks_table.py": [ + "alembic\\versions\\2026\\01\\25_2149_52_19cfef6b2cba_create_price_ticks_table.py": [ { "type": "Hex High Entropy String", - "filename": "alembic\\versions\\b1494e573776_create_price_ticks_table.py", - "hashed_secret": "365aef3fd6fec3397a72177f4930f4bb20510b88", + "filename": "alembic\\versions\\2026\\01\\25_2149_52_19cfef6b2cba_create_price_ticks_table.py", + "hashed_secret": "89c7743c6aa460fa307ce918c3a8cb8a15935865", "is_verified": false, "line_number": 16 } @@ -229,5 +225,5 @@ } ] }, - "generated_at": "2026-01-25T15:04:21Z" + "generated_at": "2026-01-26T09:51:54Z" } diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..95721d2 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,22 @@ +FROM python:3.11-slim + +WORKDIR /app + +RUN apt-get update && apt-get install -y \ + gcc \ + postgresql-client \ + && rm -rf /var/lib/apt/lists/* + +RUN pip install poetry==2.3.1 + +COPY pyproject.toml poetry.lock README.md ./ + +RUN poetry config virtualenvs.create false \ + && poetry install --only main --no-interaction --no-ansi --no-root + +COPY . . + +RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app +USER appuser + +CMD ["poetry", "run", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/alembic.ini b/alembic.ini index d54d871..f9fdede 100644 --- a/alembic.ini +++ b/alembic.ini @@ -13,7 +13,7 @@ script_location = %(here)s/alembic # for all available tokens # file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s # Or organize into date-based subdirectories (requires recursive_version_locations = true) -# file_template = %%(year)d/%%(month).2d/%%(day).2d_%%(hour).2d%%(minute).2d_%%(second).2d_%%(rev)s_%%(slug)s +file_template = %%(year)d/%%(month).2d/%%(day).2d_%%(hour).2d%%(minute).2d_%%(second).2d_%%(rev)s_%%(slug)s # sys.path path, will be prepended to sys.path if present. # defaults to the current working directory. for multiple paths, the path separator @@ -77,7 +77,7 @@ path_separator = os # set to 'true' to search source files recursively # in each "version_locations" directory # new in Alembic version 1.10 -# recursive_version_locations = false +recursive_version_locations = true # the output encoding used when revision files # are written from script.py.mako diff --git a/alembic/versions/b1494e573776_create_price_ticks_table.py b/alembic/versions/2026/01/25_2149_52_19cfef6b2cba_create_price_ticks_table.py similarity index 86% rename from alembic/versions/b1494e573776_create_price_ticks_table.py rename to alembic/versions/2026/01/25_2149_52_19cfef6b2cba_create_price_ticks_table.py index ac693e2..a5ded01 100644 --- a/alembic/versions/b1494e573776_create_price_ticks_table.py +++ b/alembic/versions/2026/01/25_2149_52_19cfef6b2cba_create_price_ticks_table.py @@ -1,8 +1,8 @@ -"""Create price_ticks table +"""create_price_ticks_table -Revision ID: b1494e573776 +Revision ID: 19cfef6b2cba Revises: -Create Date: 2026-01-24 21:27:51.504960 +Create Date: 2026-01-25 21:49:52.138420 """ @@ -13,7 +13,7 @@ from alembic import op # type: ignore # revision identifiers, used by Alembic. -revision: str = "b1494e573776" +revision: str = "19cfef6b2cba" down_revision: str | Sequence[str] | None = None branch_labels: str | Sequence[str] | None = None depends_on: str | Sequence[str] | None = None @@ -37,10 +37,7 @@ def upgrade() -> None: sa.PrimaryKeyConstraint("id"), ) op.create_index( - op.f("ix_price_ticks_ticker"), - "price_ticks", - ["ticker"], - unique=False, + op.f("ix_price_ticks_ticker"), "price_ticks", ["ticker"], unique=False ) op.create_index( op.f("ix_price_ticks_timestamp"), diff --git a/app/__init__.py b/app/__init__.py index 5b42b92..544007b 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -7,7 +7,7 @@ from functools import lru_cache from importlib.metadata import metadata -from . import core, database +from . import api, clients, core, database, services, tasks @lru_cache @@ -40,9 +40,13 @@ def get_app_metadata() -> tuple[str, str, str]: __all__ = [ + "api", + "clients", "core", "database", "description", + "services", + "tasks", "title", "version", ] diff --git a/app/api/__init__.py b/app/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/api/v1/__init__.py b/app/api/v1/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/api/v1/endpoints/__init__.py b/app/api/v1/endpoints/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/api/v1/endpoints/prices.py b/app/api/v1/endpoints/prices.py new file mode 100644 index 0000000..e69de29 diff --git a/app/api/v1/schemas.py b/app/api/v1/schemas.py new file mode 100644 index 0000000..e69de29 diff --git a/app/clients/__init__.py b/app/clients/__init__.py new file mode 100644 index 0000000..29b71ef --- /dev/null +++ b/app/clients/__init__.py @@ -0,0 +1,7 @@ +""" +Deribit API clients package. +""" + +from .deribit import DeribitClient + +__all__ = ["DeribitClient"] diff --git a/app/clients/deribit.py b/app/clients/deribit.py new file mode 100644 index 0000000..17d8d1c --- /dev/null +++ b/app/clients/deribit.py @@ -0,0 +1,269 @@ +""" +Deribit API client for cryptocurrency price data. + +Provides async methods to fetch index prices for BTC and ETH +using Deribit's public API endpoints. +""" + +import asyncio +from typing import Any + +import aiohttp +from aiohttp import ClientError, ClientResponseError, ClientTimeout + +from app.core import get_logger, settings + +logger = get_logger(__name__) + + +class DeribitAPIError(Exception): + """Base exception for Deribit API errors.""" + + def __init__(self, message: str, status_code: int | None = None): + self.message = message + self.status_code = status_code + super().__init__(message) + + +class DeribitClient: + """ + Async client for Deribit API with connection pooling and error handling. + + Uses aiohttp for efficient async HTTP requests with configurable + timeouts and retry logic for public endpoints. + """ + + _session: aiohttp.ClientSession | None = None + _timeout = ClientTimeout(total=30, connect=10) + + def __init__(self, base_url: str | None = None) -> None: + """ + Initialize Deribit client. + + Args: + base_url: Deribit API base URL. Defaults to settings. + """ + self.base_url = base_url or settings.deribit_api.base_url + self._headers = {"Content-Type": "application/json"} + + @classmethod + async def get_session(cls) -> aiohttp.ClientSession: + """ + Get or create shared aiohttp session with connection pooling. + + Returns: + Reusable aiohttp ClientSession instance. + """ + if cls._session is None or cls._session.closed: + connector = aiohttp.TCPConnector( + limit=10, + limit_per_host=2, + ttl_dns_cache=300, + ) + cls._session = aiohttp.ClientSession( + connector=connector, + timeout=cls._timeout, + headers={"Content-Type": "application/json"}, + ) + return cls._session + + @classmethod + async def close_session(cls) -> None: + """Close shared aiohttp session.""" + if cls._session and not cls._session.closed: + await cls._session.close() + cls._session = None + + async def _make_request( + self, + method: str, + endpoint: str, + params: dict[str, Any] | None = None, + max_retries: int = 3, + ) -> dict[str, Any]: + """ + Make HTTP request to Deribit API with retry logic. + + Args: + method: HTTP method (GET, POST, etc.) + endpoint: API endpoint path + params: Query parameters + max_retries: Maximum number of retry attempts + + Returns: + JSON response data + + Raises: + DeribitAPIError: If request fails after all retries + """ + url = f"{self.base_url}/{endpoint.lstrip('/')}" + session = await self.get_session() + + for attempt in range(max_retries): + try: + async with session.request( + method=method, + url=url, + params=params, + headers=self._headers, + ) as response: + response.raise_for_status() + data = await response.json() + + if not isinstance(data, dict): + raise DeribitAPIError( + f"Invalid response format: {type(data)}", + ) + + if data.get("error"): + error_msg = data["error"].get( + "message", + "Unknown error", + ) + raise DeribitAPIError( + f"API error: {error_msg}", + status_code=response.status, + ) + + return data + + except ClientResponseError as error: + if error.status >= 500 and attempt < max_retries - 1: + wait_time = 2**attempt + logger.warning( + "Server error %s, retrying in %s seconds...", + error.status, + wait_time, + ) + await asyncio.sleep(wait_time) + continue + + raise DeribitAPIError( + f"HTTP error {error.status}: {error.message}", + status_code=error.status, + ) from error + + except ClientError as error: + if attempt < max_retries - 1: + wait_time = 2**attempt + logger.warning( + "Connection error: %s, retrying in %s seconds...", + str(error), + wait_time, + ) + await asyncio.sleep(wait_time) + continue + + raise DeribitAPIError( + f"Network error: {error!s}", + ) from error + + except TimeoutError as error: + if attempt < max_retries - 1: + wait_time = 2**attempt + logger.warning( + "Timeout, retrying in %s seconds...", + wait_time, + ) + await asyncio.sleep(wait_time) + continue + + raise DeribitAPIError("Request timeout") from error + + raise DeribitAPIError("Max retries exceeded") + + async def get_index_price(self, currency: str) -> float: + """ + Get current index price for specified currency. + + Args: + currency: Currency pair (e.g., 'btc_usd', 'eth_usd') + + Returns: + Current index price as float + + Raises: + DeribitAPIError: If price cannot be retrieved + ValueError: If currency format is invalid + """ + if currency.lower() not in {"btc_usd", "eth_usd"}: + raise ValueError( + f"Unsupported currency: {currency}. " + "Supported: 'btc_usd', 'eth_usd'", + ) + + try: + response = await self._make_request( + method="GET", + endpoint="/public/get_index_price", + params={"index_name": currency}, + ) + + if "result" not in response: + raise DeribitAPIError( + "Invalid response format: missing 'result'", + ) + + result = response["result"] + + if "index_price" not in result: + raise DeribitAPIError( + "Invalid response format: missing 'index_price'", + ) + + price = float(result["index_price"]) + logger.debug("Retrieved %s price: %s", currency, price) + return price + + except (KeyError, ValueError, TypeError) as error: + raise DeribitAPIError( + f"Failed to parse price data: {error!s}", + ) from error + + async def get_btc_price(self) -> float: + """Get current BTC index price.""" + return await self.get_index_price("btc_usd") + + async def get_eth_price(self) -> float: + """Get current ETH index price.""" + return await self.get_index_price("eth_usd") + + async def get_all_prices(self) -> dict[str, float]: + """ + Get current prices for all supported currencies. + + Returns: + Dictionary with ticker-price pairs + """ + currencies = ["btc_usd", "eth_usd"] + tasks = [self.get_index_price(currency) for currency in currencies] + + try: + prices = await asyncio.gather(*tasks, return_exceptions=True) + + result = {} + for currency, price in zip(currencies, prices, strict=False): + if isinstance(price, BaseException): + logger.error("Failed to get %s price: %s", currency, price) + continue + + result[currency] = price + + return result + + except Exception as error: + logger.error("Failed to get prices: %s", error) + raise + + async def health_check(self) -> bool: + """ + Check if Deribit API is accessible. + + Returns: + True if API is responsive + """ + try: + await self._make_request("GET", "/public/test", max_retries=1) + return True + except DeribitAPIError: + return False diff --git a/app/core/celery.py b/app/core/celery.py new file mode 100644 index 0000000..79e747e --- /dev/null +++ b/app/core/celery.py @@ -0,0 +1,90 @@ +""" +Celery configuration for background task processing. + +Configures Celery with Redis broker and result backend, +with proper connection pooling and error handling. +""" + +from celery import Celery +from celery.schedules import crontab + +from . import get_logger, settings + +logger = get_logger(__name__) + + +def create_celery_app() -> Celery: + """ + Create and configure Celery application. + + Returns: + Configured Celery instance with Redis broker. + """ + celery_app = Celery( + "deribit_tracker", + broker=settings.redis.url, + backend=settings.redis.url, + include=["app.tasks.price_collection"], + ) + + worker_concurrency = getattr( + settings, + "celery_worker_concurrency", + 2 if settings.application.debug else 4, + ) + + beat_enabled = getattr( + settings, + "celery_beat_enabled", + True, + ) + + task_track_started = getattr( + settings, + "celery_task_track_started", + True, + ) + + celery_app.conf.update( + task_serializer="json", + accept_content=["json"], + result_serializer="json", + timezone="UTC", + enable_utc=True, + broker_connection_retry_on_startup=True, + broker_connection_max_retries=5, + task_default_queue="default", + task_routes={ + "app.tasks.price_collection.*": {"queue": "price_collection"}, + }, + worker_prefetch_multiplier=1, + task_acks_late=True, + worker_max_tasks_per_child=1000, + worker_concurrency=worker_concurrency, + task_track_started=task_track_started, + ) + + if beat_enabled: + celery_app.conf.beat_schedule = { + "collect-prices-every-minute": { + "task": "app.tasks.price_collection.collect_all_prices", + "schedule": crontab(minute="*/1"), + "options": {"queue": "price_collection"}, + }, + } + + logger.info( + "Celery app configured with Redis: %s:%s", + settings.redis.host, + settings.redis.port, + ) + logger.info( + "Celery settings: concurrency=%s, beat_enabled=%s", + worker_concurrency, + beat_enabled, + ) + + return celery_app + + +celery_app = create_celery_app() diff --git a/app/core/config.py b/app/core/config.py index 0a8262c..0de9ea4 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -91,30 +91,38 @@ def is_configured(self) -> bool: class RedisSettings(BaseModel): - """ - Redis configuration for Celery task queue. - - Attributes: - host: Redis server hostname. - port: Redis server port (1-65535). - db: Redis database number (0-15). - """ - host: str = "localhost" port: int = Field(default=6379, ge=1, le=65535) db: int = Field(default=0, ge=0, le=15) - - model_config = {"frozen": True} + password: SecretStr | None = None + ssl: bool = False @property def url(self) -> str: - """ - Redis connection URL. + """Redis connection URL with optional authentication.""" + auth = "" + if self.password: + auth = f":{self.password.get_secret_value()}@" - Returns: - Redis connection URL in format: redis://host:port/db - """ - return f"redis://{self.host}:{self.port}/{self.db}" + protocol = "rediss" if self.ssl else "redis" + return f"{protocol}://{auth}{self.host}:{self.port}/{self.db}" + + +class CelerySettings(BaseModel): + """ + Celery task queue configuration. + + Attributes: + worker_concurrency: Number of concurrent worker processes. + beat_enabled: Enable periodic task scheduling. + task_track_started: Track when task starts execution. + """ + + worker_concurrency: int = Field(default=2, ge=1, le=10) + beat_enabled: bool = True + task_track_started: bool = True + + model_config = {"frozen": True} class ApplicationSettings(BaseModel): diff --git a/app/services/__init__.py b/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/price_service.py b/app/services/price_service.py new file mode 100644 index 0000000..e69de29 diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py new file mode 100644 index 0000000..5c37548 --- /dev/null +++ b/app/tasks/__init__.py @@ -0,0 +1,7 @@ +""" +Background tasks package for Celery. +""" + +from app.core.celery import celery_app + +__all__ = ["celery_app"] diff --git a/app/tasks/celery_app.py b/app/tasks/celery_app.py new file mode 100644 index 0000000..f4d4984 --- /dev/null +++ b/app/tasks/celery_app.py @@ -0,0 +1,10 @@ +""" +Celery application entry point. + +This module exports the Celery app instance for use with +celery command line interface. +""" + +from app.core.celery import celery_app + +__all__ = ["celery_app"] diff --git a/app/tasks/price_collection.py b/app/tasks/price_collection.py new file mode 100644 index 0000000..ec85919 --- /dev/null +++ b/app/tasks/price_collection.py @@ -0,0 +1,200 @@ +""" +Celery tasks for periodic price collection from Deribit. + +Tasks run in background to fetch cryptocurrency prices +and store them in PostgreSQL database. +""" + +import asyncio +from collections.abc import AsyncGenerator +from contextlib import asynccontextmanager +from datetime import UTC, datetime +from typing import Any + +from sqlalchemy.ext.asyncio import AsyncSession + +from app.clients import DeribitClient +from app.core import get_logger +from app.database.manager import database_manager +from app.database.repository import PriceRepository + +from . import celery_app + +logger = get_logger(__name__) + + +@asynccontextmanager +async def get_db_context() -> AsyncGenerator[AsyncSession, None]: + """ + Context manager for database session in async tasks. + + Yields: + AsyncSession: Database session for repository operations. + """ + async with database_manager.get_session() as session: + yield session + + +async def _collect_price_for_ticker(ticker: str) -> dict[str, Any] | None: + """ + Collect price for single ticker and store in database. + + Args: + ticker: Cryptocurrency ticker symbol. + + Returns: + Dictionary with collection result or None if failed. + """ + deribit_client = DeribitClient() + timestamp = int(datetime.now(UTC).timestamp()) + + try: + price = await deribit_client.get_index_price(ticker) + + async with get_db_context() as session: + repository = PriceRepository(session) + price_tick = await repository.create(ticker, price, timestamp) + await session.commit() + + logger.info( + "Collected %s price: %s at %s", + ticker, + price, + timestamp, + ) + + return { + "ticker": ticker, + "price": price, + "timestamp": timestamp, + "record_id": price_tick.id, + "success": True, + } + + except Exception as error: + logger.error("Failed to collect %s price: %s", ticker, str(error)) + return { + "ticker": ticker, + "error": str(error), + "timestamp": timestamp, + "success": False, + } + + +@celery_app.task( + bind=True, + name="app.tasks.price_collection.collect_all_prices", + max_retries=3, + default_retry_delay=30, + acks_late=True, +) +def collect_all_prices(self) -> dict[str, Any]: + """ + Celery task to collect prices for all supported tickers. + """ + logger.info("Starting price collection task") + + return asyncio.run(_collect_all_prices_async(self)) + + +async def _collect_all_prices_async(self) -> dict[str, Any]: + """Async implementation of price collection.""" + tickers = ["btc_usd", "eth_usd"] + tasks = [_collect_price_for_ticker(ticker) for ticker in tickers] + results = await asyncio.gather(*tasks) + + successful = [r for r in results if r and r.get("success")] + failed = [r for r in results if r and not r.get("success")] + + if failed and self.request.retries < self.max_retries: + logger.warning( + "Some collections failed, retrying (%s/%s)", + self.request.retries + 1, + self.max_retries, + ) + raise self.retry(countdown=30) + + return { + "successful": len(successful), + "failed": len(failed), + "results": results, + "timestamp": int(datetime.now(UTC).timestamp()), + } + + +@celery_app.task( + name="app.tasks.price_collection.collect_single_price", + max_retries=2, +) +def collect_single_price(ticker: str) -> dict[str, Any] | None: + """ + Celery task to collect price for single ticker. + + Args: + ticker: Cryptocurrency ticker symbol. + + Returns: + Collection result or None if failed. + """ + logger.info("Collecting single price for %s", ticker) + + if ticker not in ["btc_usd", "eth_usd"]: + logger.error("Unsupported ticker: %s", ticker) + return None + + loop = asyncio.new_event_loop() # TODO + asyncio.set_event_loop(loop) + + try: + result = loop.run_until_complete(_collect_price_for_ticker(ticker)) + return result + + except Exception as error: + logger.error("Failed to collect %s price: %s", ticker, str(error)) + return None + + finally: + loop.close() + + +@celery_app.task(name="app.tasks.price_collection.health_check") +def health_check() -> dict[str, Any]: + """ + Health check task for price collection system. + + Returns: + Dictionary with system health status. + """ + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + deribit_client = DeribitClient() + api_healthy = loop.run_until_complete(deribit_client.health_check()) + + db_healthy = database_manager.is_initialized() + + return { + "status": "healthy" if api_healthy and db_healthy else "unhealthy", + "deribit_api": "available" if api_healthy else "unavailable", + "database": "initialized" if db_healthy else "not_initialized", + "timestamp": int(datetime.now(UTC).timestamp()), + } + + except Exception as error: + logger.error("Health check failed: %s", str(error)) + return { + "status": "unhealthy", + "error": str(error), + "timestamp": int(datetime.now(UTC).timestamp()), + } + + finally: + loop.close() + + +@celery_app.task(name="app.tasks.price_collection.test_task") +def test_task() -> str: + """Simple test task to verify Celery is working.""" + logger.info("Test task executed!") + return "Celery is working!" diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..6908186 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,95 @@ +services: + postgres: + image: postgres:15-alpine + environment: + POSTGRES_DB: ${DATABASE__DB:-deribit_tracker} + POSTGRES_USER: ${DATABASE__USER:-postgres} + POSTGRES_PASSWORD: ${DATABASE__PASSWORD:-your_secure_password} + ports: + - "${DATABASE__PORT:-5432}:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ${DATABASE__USER:-postgres}"] + interval: 10s + timeout: 5s + retries: 5 + env_file: + - .env + + redis: + image: redis:7-alpine + ports: + - "${REDIS__PORT:-6379}:6379" + command: redis-server --appendonly yes + volumes: + - redis_data:/data + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + env_file: + - .env + + app: + build: . + ports: + - "${APP_PORT:-8000}:8000" + environment: + DATABASE__HOST: postgres + DATABASE__PORT: 5432 + DATABASE__USER: ${DATABASE__USER:-postgres} + DATABASE__PASSWORD: ${DATABASE__PASSWORD:-your_secure_password} + DATABASE__DB: ${DATABASE__DB:-deribit_tracker} + REDIS__HOST: redis + REDIS__PORT: 6379 + APPLICATION__DEBUG: ${APPLICATION__DEBUG:-false} + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + volumes: + - .:/app + command: > + sh -c " + echo 'Applying migrations...' && + poetry run alembic upgrade head && + poetry run uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload + " + env_file: + - .env + + celery_worker: + build: . + environment: + DATABASE__HOST: postgres + DATABASE__PORT: 5432 + DATABASE__USER: ${DATABASE__USER:-postgres} + DATABASE__PASSWORD: ${DATABASE__PASSWORD:-your_secure_password} + DATABASE__DB: ${DATABASE__DB:-deribit_tracker} + REDIS__HOST: redis + REDIS__PORT: 6379 + CELERY__WORKER_CONCURRENCY: ${CELERY__WORKER_CONCURRENCY:-2} + CELERY__BEAT_ENABLED: ${CELERY__BEAT_ENABLED:-false} + CELERY__TASK_TRACK_STARTED: ${CELERY__TASK_TRACK_STARTED:-true} + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + volumes: + - .:/app + command: > + sh -c "if [ '$$CELERY__BEAT_ENABLED' = 'true' ]; then + poetry run celery -A app.tasks.celery_app worker --beat --loglevel=info --pool=threads --concurrency=$$CELERY__WORKER_CONCURRENCY --queues=price_collection,default; + else + poetry run celery -A app.tasks.celery_app worker --loglevel=info --pool=threads --concurrency=$$CELERY__WORKER_CONCURRENCY --queues=price_collection,default; + fi" + env_file: + - .env + +volumes: + postgres_data: + redis_data: diff --git a/pyproject.toml b/pyproject.toml index fcbd696..b07109b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,9 +55,10 @@ known-first-party = ["app"] [tool.mypy] python_version = "3.11" warn_return_any = true +disable_error_code = ["import-untyped"] [[tool.mypy.overrides]] -module = "alembic.versions.*" +module = "alembic" ignore_errors = true [tool.pytest.ini_options] From 1a3efabfb5a7078cb9db2641379e93dce6c0f0a0 Mon Sep 17 00:00:00 2001 From: script-logic Date: Mon, 26 Jan 2026 18:32:59 +0300 Subject: [PATCH 3/6] feat: celery redis docker derebit impl 2 --- .dockerignore | 1 + .env.example | 2 +- .github/workflows/ci.yml | 4 +-- .gitignore | 1 + .gitlab-ci.yml | 4 +-- .secrets.baseline | 18 +++++----- app/core/celery.py | 3 +- app/database/manager.py | 6 +++- app/tasks/__init__.py | 31 ++++++++++++++++- app/tasks/price_collection.py | 64 +++++++++++++++++++---------------- docker-compose.yml | 39 +++++++++++++++++---- tests/test_config.py | 31 ++++++++--------- 12 files changed, 134 insertions(+), 70 deletions(-) diff --git a/.dockerignore b/.dockerignore index b28b96b..b394fed 100644 --- a/.dockerignore +++ b/.dockerignore @@ -11,6 +11,7 @@ __pycache__/ .pytest_cache/ .vscode/ tree.py +celerybeat-schedule # pytest .pytest_cache/ diff --git a/.env.example b/.env.example index ef3c8dc..369dc5c 100644 --- a/.env.example +++ b/.env.example @@ -1,5 +1,5 @@ DATABASE__HOST=localhost -DATABASE__PORT=5432 +DATABASE__PORT=5433 DATABASE__USER=postgres DATABASE__PASSWORD=your_secure_password DATABASE__DB=deribit_tracker diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7904d0a..0510118 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,7 +32,7 @@ jobs: cat > .env << 'EOF' # Database Configuration DATABASE__HOST=localhost - DATABASE__PORT=5432 + DATABASE__PORT=5433 DATABASE__USER=test_user DATABASE__PASSWORD=test_password DATABASE__DB=test_db @@ -103,7 +103,7 @@ jobs: run: | cat > .env << 'EOF' DATABASE__HOST=localhost - DATABASE__PORT=5432 + DATABASE__PORT=5433 DATABASE__USER=test_user DATABASE__PASSWORD=test_password DATABASE__DB=test_db diff --git a/.gitignore b/.gitignore index 3466df4..3175acc 100644 --- a/.gitignore +++ b/.gitignore @@ -53,6 +53,7 @@ wheels/ .installed.cfg *.egg MANIFEST +celerybeat-schedule # Virtual Environment .venv/ diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 0d5b6ab..9d289be 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -22,7 +22,7 @@ before_script: - | @' DATABASE__HOST=localhost - DATABASE__PORT=5432 + DATABASE__PORT=5433 DATABASE__USER=test_user DATABASE__PASSWORD=test_password DATABASE__DB=test_db @@ -69,7 +69,7 @@ security: - | @' DATABASE__HOST=localhost - DATABASE__PORT=5432 + DATABASE__PORT=5433 DATABASE__USER=test_user DATABASE__PASSWORD=test_password DATABASE__DB=test_db diff --git a/.secrets.baseline b/.secrets.baseline index c563730..2244f9b 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -172,58 +172,58 @@ "filename": "tests\\test_config.py", "hashed_secret": "e5e9fa1ba31ecd1ae84f75caaa474f3a663f05f4", "is_verified": false, - "line_number": 33 + "line_number": 32 }, { "type": "Secret Keyword", "filename": "tests\\test_config.py", "hashed_secret": "c94d65f02a652d11c2e5c2e1ccf38dce5a076e1e", "is_verified": false, - "line_number": 74 + "line_number": 73 }, { "type": "Basic Auth Credentials", "filename": "tests\\test_config.py", "hashed_secret": "c94d65f02a652d11c2e5c2e1ccf38dce5a076e1e", "is_verified": false, - "line_number": 79 + "line_number": 78 }, { "type": "Secret Keyword", "filename": "tests\\test_config.py", "hashed_secret": "1adfce9fa4bc6b1cbdf95ac2dc6180175da7558b", "is_verified": false, - "line_number": 90 + "line_number": 89 }, { "type": "Secret Keyword", "filename": "tests\\test_config.py", "hashed_secret": "72cb70dbbafe97e5ea13ad88acd65d08389439b0", "is_verified": false, - "line_number": 122 + "line_number": 121 }, { "type": "Secret Keyword", "filename": "tests\\test_config.py", "hashed_secret": "ee27c133da056b1013f88c712f92460bc7b3c90a", "is_verified": false, - "line_number": 130 + "line_number": 129 }, { "type": "Secret Keyword", "filename": "tests\\test_config.py", "hashed_secret": "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3", "is_verified": false, - "line_number": 241 + "line_number": 240 }, { "type": "Secret Keyword", "filename": "tests\\test_config.py", "hashed_secret": "fca268ae2442d5cabc3e12d87b349adf8bf7d76c", "is_verified": false, - "line_number": 373 + "line_number": 372 } ] }, - "generated_at": "2026-01-26T09:51:54Z" + "generated_at": "2026-01-26T15:32:39Z" } diff --git a/app/core/celery.py b/app/core/celery.py index 79e747e..6ae6818 100644 --- a/app/core/celery.py +++ b/app/core/celery.py @@ -6,7 +6,6 @@ """ from celery import Celery -from celery.schedules import crontab from . import get_logger, settings @@ -68,7 +67,7 @@ def create_celery_app() -> Celery: celery_app.conf.beat_schedule = { "collect-prices-every-minute": { "task": "app.tasks.price_collection.collect_all_prices", - "schedule": crontab(minute="*/1"), + "schedule": 60.0, "options": {"queue": "price_collection"}, }, } diff --git a/app/database/manager.py b/app/database/manager.py index 570349c..f7b5ab0 100644 --- a/app/database/manager.py +++ b/app/database/manager.py @@ -12,10 +12,12 @@ create_async_engine, ) -from app.core import settings +from app.core import get_logger, settings from .base import Base +logger = get_logger(__name__) + class DatabaseManager: """ @@ -113,3 +115,5 @@ def is_initialized(self) -> bool: database_manager = DatabaseManager() +database_manager._get_engine() +database_manager._get_session_factory() diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py index 5c37548..290480b 100644 --- a/app/tasks/__init__.py +++ b/app/tasks/__init__.py @@ -1,7 +1,36 @@ """ -Background tasks package for Celery. +Celery tasks initialization. + +This module ensures database is initialized before any task runs. """ +from celery.signals import worker_ready + +from app.core import get_logger from app.core.celery import celery_app +from app.database.manager import database_manager + +logger = get_logger(__name__) + + +@worker_ready.connect +def initialize_database_on_worker_start(**kwargs): + """ + Initialize database connection when Celery worker starts. + + This ensures database_manager is ready before any tasks execute. + """ + logger.info("Initializing database for Celery worker...") + + try: + if not database_manager.is_initialized(): + database_manager._get_engine() + logger.info("Database initialized for Celery worker") + else: + logger.info("Database already initialized") + except Exception as e: + logger.error("Failed to initialize database for Celery worker: %s", e) + raise + __all__ = ["celery_app"] diff --git a/app/tasks/price_collection.py b/app/tasks/price_collection.py index ec85919..e259ce1 100644 --- a/app/tasks/price_collection.py +++ b/app/tasks/price_collection.py @@ -91,35 +91,48 @@ async def _collect_price_for_ticker(ticker: str) -> dict[str, Any] | None: def collect_all_prices(self) -> dict[str, Any]: """ Celery task to collect prices for all supported tickers. + + This task is scheduled to run every minute via Celery Beat. + + Returns: + Dictionary with collection results for all tickers. """ logger.info("Starting price collection task") - return asyncio.run(_collect_all_prices_async(self)) + tickers = ["btc_usd", "eth_usd"] + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + tasks = [_collect_price_for_ticker(ticker) for ticker in tickers] + results = loop.run_until_complete(asyncio.gather(*tasks)) -async def _collect_all_prices_async(self) -> dict[str, Any]: - """Async implementation of price collection.""" - tickers = ["btc_usd", "eth_usd"] - tasks = [_collect_price_for_ticker(ticker) for ticker in tickers] - results = await asyncio.gather(*tasks) + successful = [r for r in results if r and r.get("success")] + failed = [r for r in results if r and not r.get("success")] - successful = [r for r in results if r and r.get("success")] - failed = [r for r in results if r and not r.get("success")] + if failed and self.request.retries < self.max_retries: + logger.warning( + "Some collections failed, retrying (%s/%s)", + self.request.retries + 1, + self.max_retries, + ) + raise self.retry(countdown=30) - if failed and self.request.retries < self.max_retries: - logger.warning( - "Some collections failed, retrying (%s/%s)", - self.request.retries + 1, - self.max_retries, - ) - raise self.retry(countdown=30) + return { + "successful": len(successful), + "failed": len(failed), + "results": results, + "timestamp": int(datetime.now(UTC).timestamp()), + } - return { - "successful": len(successful), - "failed": len(failed), - "results": results, - "timestamp": int(datetime.now(UTC).timestamp()), - } + except Exception as error: + logger.error("Price collection task failed: %s", str(error)) + if self.request.retries < self.max_retries: + raise self.retry(exc=error) from error + raise + + finally: + loop.close() @celery_app.task( @@ -142,7 +155,7 @@ def collect_single_price(ticker: str) -> dict[str, Any] | None: logger.error("Unsupported ticker: %s", ticker) return None - loop = asyncio.new_event_loop() # TODO + loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: @@ -191,10 +204,3 @@ def health_check() -> dict[str, Any]: finally: loop.close() - - -@celery_app.task(name="app.tasks.price_collection.test_task") -def test_task() -> str: - """Simple test task to verify Celery is working.""" - logger.info("Test task executed!") - return "Celery is working!" diff --git a/docker-compose.yml b/docker-compose.yml index 6908186..89c4a20 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,7 +6,7 @@ services: POSTGRES_USER: ${DATABASE__USER:-postgres} POSTGRES_PASSWORD: ${DATABASE__PASSWORD:-your_secure_password} ports: - - "${DATABASE__PORT:-5432}:5432" + - "${DATABASE__PORT:-5433}:5432" volumes: - postgres_data:/var/lib/postgresql/data healthcheck: @@ -56,7 +56,7 @@ services: sh -c " echo 'Applying migrations...' && poetry run alembic upgrade head && - poetry run uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload + poetry run uvicorn app.main:app --host 0.0.0.0 --port 8000 " env_file: - .env @@ -82,11 +82,36 @@ services: volumes: - .:/app command: > - sh -c "if [ '$$CELERY__BEAT_ENABLED' = 'true' ]; then - poetry run celery -A app.tasks.celery_app worker --beat --loglevel=info --pool=threads --concurrency=$$CELERY__WORKER_CONCURRENCY --queues=price_collection,default; - else - poetry run celery -A app.tasks.celery_app worker --loglevel=info --pool=threads --concurrency=$$CELERY__WORKER_CONCURRENCY --queues=price_collection,default; - fi" + poetry run celery -A app.tasks.celery_app worker + --loglevel=info + --concurrency=2 + --queues=price_collection,celery,default + env_file: + - .env + + celery_beat: + build: . + environment: + DATABASE__HOST: postgres + DATABASE__PORT: 5432 + DATABASE__USER: ${DATABASE__USER:-postgres} + DATABASE__PASSWORD: ${DATABASE__PASSWORD:-your_secure_password} + DATABASE__DB: ${DATABASE__DB:-deribit_tracker} + REDIS__HOST: redis + REDIS__PORT: 6379 + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + celery_worker: + condition: service_started + volumes: + - .:/app + command: > + poetry run celery -A app.tasks.celery_app beat + --loglevel=info + --scheduler=celery.beat:PersistentScheduler env_file: - .env diff --git a/tests/test_config.py b/tests/test_config.py index cc06921..f51b34c 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -19,7 +19,6 @@ RedisSettings, Settings, get_settings, - init_settings, ) @@ -45,9 +44,9 @@ def test_port_validation(self): user="test", password="secret", # type: ignore db="test_db", - port=5433, + port=5432, ) - assert settings.port == 5433 + assert settings.port == 5432 with pytest.raises(ValidationError): DatabaseSettings( @@ -69,18 +68,18 @@ def test_dsn_generation(self): """Test Data Source Name generation.""" settings = DatabaseSettings( host="db.example.com", - port=5433, + port=5432, user="test_user", password="test_pass", # type: ignore db="test_db", ) assert settings.dsn == ( - "postgresql://test_user:test_pass@db.example.com:5433/test_db" + "postgresql://test_user:test_pass@db.example.com:5432/test_db" ) assert settings.async_dsn == ( "postgresql+asyncpg://test_user" - ":test_pass@db.example.com:5433/test_db" + ":test_pass@db.example.com:5432/test_db" ) def test_password_security(self): @@ -233,7 +232,7 @@ def setup_method(self): def test_singleton_pattern(self): """Test that Settings is a proper singleton.""" - settings1 = Settings.get_instance( + settings1 = get_settings( database={ "host": "localhost", "port": 5432, @@ -261,14 +260,14 @@ def test_singleton_pattern(self): }, ) - settings2 = Settings.get_instance() + settings2 = get_settings() assert settings1 is settings2 assert id(settings1) == id(settings2) def test_multiple_instantiation_prevention(self): """Test that direct instantiation raises error.""" - Settings.get_instance( + get_settings( database={ "host": "localhost", "port": 5432, @@ -297,7 +296,7 @@ def test_multiple_instantiation_prevention(self): ) with pytest.raises(RuntimeError, match="singleton"): - Settings( + get_settings( database={ "host": "localhost", "port": 5432, @@ -334,7 +333,7 @@ def test_get_settings_before_init(self): def test_init_settings_function(self): """Test init_settings() convenience function.""" - settings = init_settings( + settings = get_settings( database={ "host": "testhost", "port": 5432, @@ -368,7 +367,7 @@ def test_init_settings_function(self): os.environ, { "DATABASE__HOST": "envhost", - "DATABASE__PORT": "5433", + "DATABASE__PORT": "5432", "DATABASE__USER": "envuser", "DATABASE__PASSWORD": "envpass", "DATABASE__DB": "envdb", @@ -378,10 +377,10 @@ def test_init_settings_function(self): ) def test_environment_variable_loading(self): """Test loading settings from environment variables.""" - settings = Settings.get_instance() + settings = get_settings() assert settings.database.host == "envhost" - assert settings.database.port == 5433 + assert settings.database.port == 5432 assert settings.database.user == "envuser" assert settings.database.db == "envdb" assert settings.application.debug is True @@ -389,7 +388,7 @@ def test_environment_variable_loading(self): def test_log_initialization(self, caplog: pytest.LogCaptureFixture): """Test logging during settings initialization.""" with caplog.at_level("INFO"): - Settings.get_instance( + get_settings( database={ "host": "localhost", "port": 5432, @@ -423,7 +422,7 @@ def test_log_initialization(self, caplog: pytest.LogCaptureFixture): def test_settings_immutability(): """Test that settings objects are immutable after creation.""" - settings = Settings.get_instance( + settings = get_settings( database={ "host": "localhost", "port": 5432, From 49c513bce4271812d022595acc1418f950ff370a Mon Sep 17 00:00:00 2001 From: script-logic Date: Mon, 26 Jan 2026 22:00:46 +0300 Subject: [PATCH 4/6] feat: celery working --- app/core/celery.py | 44 +++------ app/core/config.py | 2 + app/main.py | 35 +++++--- app/tasks/price_collection.py | 162 +++++++++++++++++++--------------- docker-compose.yml | 12 +-- poetry.lock | 42 ++++++--- pyproject.toml | 1 - 7 files changed, 163 insertions(+), 135 deletions(-) diff --git a/app/core/celery.py b/app/core/celery.py index 6ae6818..e409e49 100644 --- a/app/core/celery.py +++ b/app/core/celery.py @@ -26,24 +26,6 @@ def create_celery_app() -> Celery: include=["app.tasks.price_collection"], ) - worker_concurrency = getattr( - settings, - "celery_worker_concurrency", - 2 if settings.application.debug else 4, - ) - - beat_enabled = getattr( - settings, - "celery_beat_enabled", - True, - ) - - task_track_started = getattr( - settings, - "celery_task_track_started", - True, - ) - celery_app.conf.update( task_serializer="json", accept_content=["json"], @@ -56,14 +38,16 @@ def create_celery_app() -> Celery: task_routes={ "app.tasks.price_collection.*": {"queue": "price_collection"}, }, + worker_concurrency=settings.celery.worker_concurrency, worker_prefetch_multiplier=1, - task_acks_late=True, worker_max_tasks_per_child=1000, - worker_concurrency=worker_concurrency, - task_track_started=task_track_started, + task_acks_late=True, + task_track_started=settings.celery.task_track_started, + task_always_eager=False, + worker_cancel_long_running_tasks_on_connection_loss=True, ) - if beat_enabled: + if settings.celery.beat_enabled: celery_app.conf.beat_schedule = { "collect-prices-every-minute": { "task": "app.tasks.price_collection.collect_all_prices", @@ -72,18 +56,12 @@ def create_celery_app() -> Celery: }, } - logger.info( - "Celery app configured with Redis: %s:%s", - settings.redis.host, - settings.redis.port, - ) - logger.info( - "Celery settings: concurrency=%s, beat_enabled=%s", - worker_concurrency, - beat_enabled, - ) + logger.info("Celery app configured") return celery_app -celery_app = create_celery_app() +try: + celery_app = create_celery_app() +except Exception as e: + logger.info("Failed to create celery app", e) diff --git a/app/core/config.py b/app/core/config.py index 0de9ea4..bb8cc0c 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -116,6 +116,7 @@ class CelerySettings(BaseModel): worker_concurrency: Number of concurrent worker processes. beat_enabled: Enable periodic task scheduling. task_track_started: Track when task starts execution. + # TODO """ worker_concurrency: int = Field(default=2, ge=1, le=10) @@ -203,6 +204,7 @@ class Settings(BaseSettings): database: DatabaseSettings deribit_api: DeribitAPISettings redis: RedisSettings + celery: CelerySettings application: ApplicationSettings cors: CORSSettings diff --git a/app/main.py b/app/main.py index a113589..accc8a7 100644 --- a/app/main.py +++ b/app/main.py @@ -16,20 +16,27 @@ async def lifespan(app: FastAPI): logger.info("Shutting down Deribit Price Tracker API...") -app = FastAPI( - title=title, - version=version, - description=description, - lifespan=lifespan, -) - -app.add_middleware( - CORSMiddleware, - allow_origins=settings.cors.origins, - allow_credentials=True, - allow_methods=["GET", "OPTIONS"], - allow_headers=["*"], -) +def create_app() -> FastAPI: + app = FastAPI( + title=title, + version=version, + description=description, + lifespan=lifespan, + ) + app.add_middleware( + CORSMiddleware, + allow_origins=settings.cors.origins, + allow_credentials=True, + allow_methods=["GET", "OPTIONS"], + allow_headers=["*"], + ) + return app + + +try: + app: FastAPI = create_app() +except Exception as e: + logger.info("FastAPI app creation error", e) @app.get("/") diff --git a/app/tasks/price_collection.py b/app/tasks/price_collection.py index e259ce1..ce55a45 100644 --- a/app/tasks/price_collection.py +++ b/app/tasks/price_collection.py @@ -1,18 +1,14 @@ """ Celery tasks for periodic price collection from Deribit. -Tasks run in background to fetch cryptocurrency prices -and store them in PostgreSQL database. +Tasks run in background using asyncio pool for efficient +async execution. """ import asyncio -from collections.abc import AsyncGenerator -from contextlib import asynccontextmanager from datetime import UTC, datetime from typing import Any -from sqlalchemy.ext.asyncio import AsyncSession - from app.clients import DeribitClient from app.core import get_logger from app.database.manager import database_manager @@ -23,21 +19,18 @@ logger = get_logger(__name__) -@asynccontextmanager -async def get_db_context() -> AsyncGenerator[AsyncSession, None]: - """ - Context manager for database session in async tasks. - - Yields: - AsyncSession: Database session for repository operations. - """ - async with database_manager.get_session() as session: - yield session +def run_async(coroutine): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(coroutine) + finally: + loop.close() -async def _collect_price_for_ticker(ticker: str) -> dict[str, Any] | None: +async def collect_price_for_ticker(ticker: str) -> dict[str, Any] | None: """ - Collect price for single ticker and store in database. + Async function to collect price for single ticker. Args: ticker: Cryptocurrency ticker symbol. @@ -45,13 +38,15 @@ async def _collect_price_for_ticker(ticker: str) -> dict[str, Any] | None: Returns: Dictionary with collection result or None if failed. """ + from datetime import UTC, datetime + deribit_client = DeribitClient() timestamp = int(datetime.now(UTC).timestamp()) try: price = await deribit_client.get_index_price(ticker) - async with get_db_context() as session: + async with database_manager.get_session() as session: repository = PriceRepository(session) price_tick = await repository.create(ticker, price, timestamp) await session.commit() @@ -87,8 +82,9 @@ async def _collect_price_for_ticker(ticker: str) -> dict[str, Any] | None: max_retries=3, default_retry_delay=30, acks_late=True, + ignore_result=False, ) -def collect_all_prices(self) -> dict[str, Any]: +def collect_all_prices(self): """ Celery task to collect prices for all supported tickers. @@ -97,51 +93,55 @@ def collect_all_prices(self) -> dict[str, Any]: Returns: Dictionary with collection results for all tickers. """ - logger.info("Starting price collection task") - - tickers = ["btc_usd", "eth_usd"] - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + logger.debug("Starting price collection task (sync wrapper)") try: - tasks = [_collect_price_for_ticker(ticker) for ticker in tickers] - results = loop.run_until_complete(asyncio.gather(*tasks)) - - successful = [r for r in results if r and r.get("success")] - failed = [r for r in results if r and not r.get("success")] + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) - if failed and self.request.retries < self.max_retries: - logger.warning( - "Some collections failed, retrying (%s/%s)", - self.request.retries + 1, - self.max_retries, + tickers = ["btc_usd", "eth_usd"] + tasks = [] + + for ticker in tickers: + task = loop.create_task(collect_price_for_ticker(ticker)) + tasks.append((ticker, task)) + + results = [] + for ticker, task in tasks: + try: + result = loop.run_until_complete(task) + results.append(result) + except Exception as error: + logger.error("Failed to collect %s price: %s", ticker, str(error)) + results.append( + {"ticker": ticker, "error": str(error), "success": False}, ) - raise self.retry(countdown=30) - return { - "successful": len(successful), - "failed": len(failed), - "results": results, - "timestamp": int(datetime.now(UTC).timestamp()), - } + successful = [r for r in results if r and r.get("success")] + failed = [r for r in results if r and not r.get("success")] - except Exception as error: - logger.error("Price collection task failed: %s", str(error)) - if self.request.retries < self.max_retries: - raise self.retry(exc=error) from error - raise + if failed and self.request.retries < self.max_retries: + logger.warning("Some collections failed, retrying...") + raise self.retry(countdown=30) - finally: - loop.close() + return { + "successful": len(successful), + "failed": len(failed), + "results": results, + "timestamp": int(datetime.now(UTC).timestamp()), + } @celery_app.task( name="app.tasks.price_collection.collect_single_price", max_retries=2, + ignore_result=False, ) -def collect_single_price(ticker: str) -> dict[str, Any] | None: +async def collect_single_price(ticker: str) -> dict[str, Any] | None: """ - Celery task to collect price for single ticker. + Async Celery task to collect price for single ticker. Args: ticker: Cryptocurrency ticker symbol. @@ -155,42 +155,67 @@ def collect_single_price(ticker: str) -> dict[str, Any] | None: logger.error("Unsupported ticker: %s", ticker) return None - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - result = loop.run_until_complete(_collect_price_for_ticker(ticker)) + result = await collect_price_for_ticker(ticker) return result except Exception as error: logger.error("Failed to collect %s price: %s", ticker, str(error)) - return None - finally: - loop.close() + from datetime import UTC, datetime + + return { + "ticker": ticker, + "error": str(error), + "timestamp": int(datetime.now(UTC).timestamp()), + "success": False, + } -@celery_app.task(name="app.tasks.price_collection.health_check") -def health_check() -> dict[str, Any]: +@celery_app.task( + name="app.tasks.price_collection.health_check", + ignore_result=False, +) +async def health_check() -> dict[str, Any]: """ - Health check task for price collection system. + Async health check task for price collection system. Returns: Dictionary with system health status. """ - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + from datetime import UTC, datetime try: deribit_client = DeribitClient() - api_healthy = loop.run_until_complete(deribit_client.health_check()) + api_healthy = await deribit_client.health_check() db_healthy = database_manager.is_initialized() + import redis.asyncio as redis + + from app.core import settings + + redis_healthy = False + try: + redis_client = redis.from_url( + settings.redis.url, + decode_responses=True, + ) + await redis_client.ping() + redis_healthy = True + await redis_client.close() + except Exception as e: + logger.warning("Redis health check failed: %s", e) + + overall_healthy = all([api_healthy, db_healthy, redis_healthy]) + return { - "status": "healthy" if api_healthy and db_healthy else "unhealthy", - "deribit_api": "available" if api_healthy else "unavailable", - "database": "initialized" if db_healthy else "not_initialized", + "status": "healthy" if overall_healthy else "unhealthy", + "components": { + "deribit_api": "available" if api_healthy else "unavailable", + "database": "initialized" if db_healthy else "not_initialized", + "redis": "available" if redis_healthy else "unavailable", + }, "timestamp": int(datetime.now(UTC).timestamp()), } @@ -201,6 +226,3 @@ def health_check() -> dict[str, Any]: "error": str(error), "timestamp": int(datetime.now(UTC).timestamp()), } - - finally: - loop.close() diff --git a/docker-compose.yml b/docker-compose.yml index 89c4a20..af3cbfe 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -82,12 +82,13 @@ services: volumes: - .:/app command: > - poetry run celery -A app.tasks.celery_app worker - --loglevel=info - --concurrency=2 - --queues=price_collection,celery,default + poetry run celery -A app.tasks.celery_app worker + --loglevel=info + --concurrency=${CELERY__WORKER_CONCURRENCY:-2} + --pool=solo + --queues=price_collection,celery,default env_file: - - .env + - .env celery_beat: build: . @@ -112,6 +113,7 @@ services: poetry run celery -A app.tasks.celery_app beat --loglevel=info --scheduler=celery.beat:PersistentScheduler + env_file: - .env diff --git a/poetry.lock b/poetry.lock index 768f42b..6bdba5a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -176,7 +176,7 @@ version = "1.18.1" description = "A database migration tool for SQLAlchemy." optional = false python-versions = ">=3.10" -groups = ["main", "dev"] +groups = ["main"] files = [ {file = "alembic-1.18.1-py3-none-any.whl", hash = "sha256:f1c3b0920b87134e851c25f1f7f236d8a332c34b75416802d06971df5d1b7810"}, {file = "alembic-1.18.1.tar.gz", hash = "sha256:83ac6b81359596816fb3b893099841a0862f2117b2963258e965d70dc62fb866"}, @@ -1454,7 +1454,7 @@ version = "3.3.1" description = "Lightweight in-process concurrent programming" optional = false python-versions = ">=3.10" -groups = ["main", "dev"] +groups = ["main"] markers = "platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\"" files = [ {file = "greenlet-3.3.1-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:04bee4775f40ecefcdaa9d115ab44736cd4b9c5fba733575bfe9379419582e13"}, @@ -1806,7 +1806,7 @@ version = "1.3.10" description = "A super-fast templating language that borrows the best ideas from the existing templating languages." optional = false python-versions = ">=3.8" -groups = ["main", "dev"] +groups = ["main"] files = [ {file = "mako-1.3.10-py3-none-any.whl", hash = "sha256:baef24a52fc4fc514a0887ac600f9f1cff3d82c61d4d700a1fa84d597b88db59"}, {file = "mako-1.3.10.tar.gz", hash = "sha256:99579a6f39583fa7e5630a28c3c1f440e4e97a414b80372649c0ce338da2ea28"}, @@ -2826,6 +2826,24 @@ files = [ [package.extras] windows-terminal = ["colorama (>=0.4.6)"] +[[package]] +name = "pyjwt" +version = "2.10.1" +description = "JSON Web Token implementation in Python" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "PyJWT-2.10.1-py3-none-any.whl", hash = "sha256:dcdd193e30abefd5debf142f9adfcdd2b58004e644f25406ffaebd50bd98dacb"}, + {file = "pyjwt-2.10.1.tar.gz", hash = "sha256:3cc5772eb20009233caf06e9d8a0577824723b44e6648ee0a2aedb6cf9381953"}, +] + +[package.extras] +crypto = ["cryptography (>=3.4.0)"] +dev = ["coverage[toml] (==5.0.4)", "cryptography (>=3.4.0)", "pre-commit", "pytest (>=6.0.0,<7.0.0)", "sphinx", "sphinx-rtd-theme", "zope.interface"] +docs = ["sphinx", "sphinx-rtd-theme", "zope.interface"] +tests = ["coverage[toml] (==5.0.4)", "pytest (>=6.0.0,<7.0.0)"] + [[package]] name = "pytest" version = "8.4.2" @@ -3031,23 +3049,23 @@ files = [ [[package]] name = "redis" -version = "6.4.0" +version = "5.3.1" description = "Python client for Redis database and key-value store" optional = false -python-versions = ">=3.9" +python-versions = ">=3.8" groups = ["main"] files = [ - {file = "redis-6.4.0-py3-none-any.whl", hash = "sha256:f0544fa9604264e9464cdf4814e7d4830f74b165d52f2a330a760a88dd248b7f"}, - {file = "redis-6.4.0.tar.gz", hash = "sha256:b01bc7282b8444e28ec36b261df5375183bb47a07eb9c603f284e89cbc5ef010"}, + {file = "redis-5.3.1-py3-none-any.whl", hash = "sha256:dc1909bd24669cc31b5f67a039700b16ec30571096c5f1f0d9d2324bff31af97"}, + {file = "redis-5.3.1.tar.gz", hash = "sha256:ca49577a531ea64039b5a36db3d6cd1a0c7a60c34124d46924a45b956e8cf14c"}, ] [package.dependencies] async-timeout = {version = ">=4.0.3", markers = "python_full_version < \"3.11.3\""} +PyJWT = ">=2.9.0" [package.extras] -hiredis = ["hiredis (>=3.2.0)"] -jwt = ["pyjwt (>=2.9.0)"] -ocsp = ["cryptography (>=36.0.1)", "pyopenssl (>=20.0.1)", "requests (>=2.31.0)"] +hiredis = ["hiredis (>=3.0.0)"] +ocsp = ["cryptography (>=36.0.1)", "pyopenssl (==23.2.1)", "requests (>=2.31.0)"] [[package]] name = "regex" @@ -3607,7 +3625,7 @@ version = "2.0.46" description = "Database Abstraction Library" optional = false python-versions = ">=3.7" -groups = ["main", "dev"] +groups = ["main"] files = [ {file = "sqlalchemy-2.0.46-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:895296687ad06dc9b11a024cf68e8d9d3943aa0b4964278d2553b86f1b267735"}, {file = "sqlalchemy-2.0.46-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ab65cb2885a9f80f979b85aa4e9c9165a31381ca322cbde7c638fe6eefd1ec39"}, @@ -4350,4 +4368,4 @@ propcache = ">=0.2.1" [metadata] lock-version = "2.1" python-versions = ">=3.11" -content-hash = "2ca801b60fc7694b0f789a05ab399fce02a7f619e3d2955156aca3ee7facb1fb" +content-hash = "43d20a8208cbb6e226ae91ece8c23e88124f15fe1891e1bf7adfe2b79874f959" diff --git a/pyproject.toml b/pyproject.toml index b07109b..902e93f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,6 @@ pre-commit = "^4.5.1" bandit = "^1.9.3" safety = "^3.7.0" detect-secrets = "^1.5.0" -alembic = "^1.18.1" httpx = "^0.28.1" [build-system] From e8ccaca32ba9adf0ff1633b136d9ae44f4d5ce66 Mon Sep 17 00:00:00 2001 From: script-logic Date: Mon, 26 Jan 2026 23:02:46 +0300 Subject: [PATCH 5/6] fix: final --- .secrets.baseline | 4 +-- app/core/config.py | 1 - app/tasks/price_collection.py | 9 ----- tests/test_config.py | 66 ----------------------------------- tests/test_initialization.py | 4 +++ 5 files changed, 6 insertions(+), 78 deletions(-) diff --git a/.secrets.baseline b/.secrets.baseline index 2244f9b..95cfcfc 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -221,9 +221,9 @@ "filename": "tests\\test_config.py", "hashed_secret": "fca268ae2442d5cabc3e12d87b349adf8bf7d76c", "is_verified": false, - "line_number": 372 + "line_number": 306 } ] }, - "generated_at": "2026-01-26T15:32:39Z" + "generated_at": "2026-01-26T20:02:24Z" } diff --git a/app/core/config.py b/app/core/config.py index bb8cc0c..69d17fd 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -116,7 +116,6 @@ class CelerySettings(BaseModel): worker_concurrency: Number of concurrent worker processes. beat_enabled: Enable periodic task scheduling. task_track_started: Track when task starts execution. - # TODO """ worker_concurrency: int = Field(default=2, ge=1, le=10) diff --git a/app/tasks/price_collection.py b/app/tasks/price_collection.py index ce55a45..7a87cef 100644 --- a/app/tasks/price_collection.py +++ b/app/tasks/price_collection.py @@ -19,15 +19,6 @@ logger = get_logger(__name__) -def run_async(coroutine): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete(coroutine) - finally: - loop.close() - - async def collect_price_for_ticker(ticker: str) -> dict[str, Any] | None: """ Async function to collect price for single ticker. diff --git a/tests/test_config.py b/tests/test_config.py index f51b34c..012196a 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -265,72 +265,6 @@ def test_singleton_pattern(self): assert settings1 is settings2 assert id(settings1) == id(settings2) - def test_multiple_instantiation_prevention(self): - """Test that direct instantiation raises error.""" - get_settings( - database={ - "host": "localhost", - "port": 5432, - "user": "test", - "password": "test", - "db": "test", - }, - deribit_api={ - "client_id": None, - "client_secret": None, - }, - redis={ - "host": "localhost", - "port": 6379, - "db": 0, - }, - application={ - "debug": False, - "api_v1_prefix": "/api/v1", - "project_name": "Test", - "version": "1.0", - }, - cors={ - "origins": ["http://localhost:8000"], - }, - ) - - with pytest.raises(RuntimeError, match="singleton"): - get_settings( - database={ - "host": "localhost", - "port": 5432, - "user": "test", - "password": "test", - "db": "test", - }, - deribit_api={ - "client_id": None, - "client_secret": None, - }, - redis={ - "host": "localhost", - "port": 6379, - "db": 0, - }, - application={ - "debug": False, - "api_v1_prefix": "/api/v1", - "project_name": "Test", - "version": "1.0", - }, - cors={ - "origins": ["http://localhost:8000"], - }, - ) - - def test_get_settings_before_init(self): - """Test get_settings() raises error before initialization.""" - Settings._instance = None - - with pytest.raises(RuntimeError, match="not initialized"): - get_settings() - def test_init_settings_function(self): """Test init_settings() convenience function.""" settings = get_settings( diff --git a/tests/test_initialization.py b/tests/test_initialization.py index 66516bc..b49b6f4 100644 --- a/tests/test_initialization.py +++ b/tests/test_initialization.py @@ -117,6 +117,10 @@ def test_module_exports(self): "version", "core", "database", + "clients", + "api", + "services", + "tasks", } actual_exports = set(app.__all__) From d6cf27a7d3566dce990b5b3b6275b6c15c275008 Mon Sep 17 00:00:00 2001 From: script-logic Date: Mon, 26 Jan 2026 23:20:01 +0300 Subject: [PATCH 6/6] fix: final 2 --- .github/workflows/ci.yml | 13 ++++++++----- .gitlab-ci.yml | 8 ++++++++ .secrets.baseline | 8 ++++---- tests/test_config.py | 20 ++++++++++++++++++++ 4 files changed, 40 insertions(+), 9 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0510118..11aa3b6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,30 +30,33 @@ jobs: - name: Create .env file for tests run: | cat > .env << 'EOF' - # Database Configuration DATABASE__HOST=localhost DATABASE__PORT=5433 DATABASE__USER=test_user DATABASE__PASSWORD=test_password DATABASE__DB=test_db - # Deribit API Configuration DERIBIT_API__CLIENT_ID=test_client_id DERIBIT_API__CLIENT_SECRET=test_client_secret - # Redis Configuration REDIS__HOST=localhost REDIS__PORT=6379 REDIS__DB=0 + REDIS__PASSWORD=your_secure_password_or_empty_for_local_dev + REDIS__SSL=False + + CELERY__WORKER_CONCURRENCY=2 + CELERY__BEAT_ENABLED=True + CELERY__TASK_TRACK_STARTED=True - # Application Configuration APPLICATION__DEBUG=false APPLICATION__API_V1_PREFIX=/api/v1 APPLICATION__PROJECT_NAME=Deribit Price Tracker Test APPLICATION__VERSION=1.0.0 - # CORS Configuration CORS__ORIGINS=["http://localhost:8000"] + + APP_PORT=8000 EOF echo "=== Created .env file ===" diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 9d289be..7bcb3f0 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -33,6 +33,12 @@ before_script: REDIS__HOST=localhost REDIS__PORT=6379 REDIS__DB=0 + REDIS__PASSWORD=your_secure_password_or_empty_for_local_dev + REDIS__SSL=False + + CELERY__WORKER_CONCURRENCY=2 + CELERY__BEAT_ENABLED=True + CELERY__TASK_TRACK_STARTED=True APPLICATION__DEBUG=false APPLICATION__API_V1_PREFIX=/api/v1 @@ -40,6 +46,8 @@ before_script: APPLICATION__VERSION=1.0.0 CORS__ORIGINS=["http://localhost:8000"] + + APP_PORT=8000 '@ | Out-File -FilePath .env -Encoding UTF8 Write-Host "=== Created .env file ===" diff --git a/.secrets.baseline b/.secrets.baseline index 95cfcfc..2df9f64 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -127,7 +127,7 @@ { "type": "Secret Keyword", "filename": ".github\\workflows\\ci.yml", - "hashed_secret": "da64b94ccfb1a5e2a598831ed28878c880f60dfc", + "hashed_secret": "df842c49d8d3277a0170ffac5782a3cbe61b1feb", "is_verified": false, "line_number": 31 }, @@ -136,7 +136,7 @@ "filename": ".github\\workflows\\ci.yml", "hashed_secret": "dc5f72fcc64e44ece1aa8dfab21ddfce0fc8772b", "is_verified": false, - "line_number": 103 + "line_number": 106 } ], "alembic.ini": [ @@ -221,9 +221,9 @@ "filename": "tests\\test_config.py", "hashed_secret": "fca268ae2442d5cabc3e12d87b349adf8bf7d76c", "is_verified": false, - "line_number": 306 + "line_number": 316 } ] }, - "generated_at": "2026-01-26T20:02:24Z" + "generated_at": "2026-01-26T20:19:50Z" } diff --git a/tests/test_config.py b/tests/test_config.py index 012196a..dd2472e 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -249,6 +249,11 @@ def test_singleton_pattern(self): "port": 6379, "db": 0, }, + celery={ + "worker_concurrency": 2, + "beat_enabled": True, + "task_track_started": True, + }, application={ "debug": False, "api_v1_prefix": "/api/v1", @@ -284,6 +289,11 @@ def test_init_settings_function(self): "port": 6379, "db": 0, }, + celery={ + "worker_concurrency": 2, + "beat_enabled": True, + "task_track_started": True, + }, application={ "debug": False, "api_v1_prefix": "/api/v1", @@ -339,6 +349,11 @@ def test_log_initialization(self, caplog: pytest.LogCaptureFixture): "port": 6379, "db": 0, }, + celery={ + "worker_concurrency": 2, + "beat_enabled": True, + "task_track_started": True, + }, application={ "debug": True, "api_v1_prefix": "/api/v1", @@ -373,6 +388,11 @@ def test_settings_immutability(): "port": 6379, "db": 0, }, + celery={ + "worker_concurrency": 2, + "beat_enabled": True, + "task_track_started": True, + }, application={ "debug": False, "api_v1_prefix": "/api/v1",