diff --git a/Makefile b/Makefile index ab85d37c..327e701d 100644 --- a/Makefile +++ b/Makefile @@ -660,6 +660,13 @@ experiment-migrate: $(DOCKER_COMPOSE) exec experiment-service python -m bin.migrate --database-url "$${EXPERIMENT_DATABASE_URL:-postgresql://experiment_user:experiment_password@postgres:5432/experiment_db}" @echo "✅ Миграции применены" +# Применение миграций telemetry-ingest-service (использует ту же experiment_db) +telemetry-ingest-migrate: + @echo "Применение миграций telemetry-ingest-service..." + @$(DOCKER_COMPOSE) exec -T telemetry-ingest-service python -m bin.migrate --database-url "$${EXPERIMENT_DATABASE_URL:-postgresql://experiment_user:experiment_password@postgres:5432/experiment_db}" || \ + $(DOCKER_COMPOSE) exec telemetry-ingest-service python -m bin.migrate --database-url "$${EXPERIMENT_DATABASE_URL:-postgresql://experiment_user:experiment_password@postgres:5432/experiment_db}" + @echo "✅ Миграции telemetry-ingest-service применены" + # Создание базы данных script-service script-create-db: @echo "Создание базы данных script_db..." @@ -699,5 +706,5 @@ infra-destroy: @cd infrastructure/yandex-cloud && terraform destroy .PHONY: mvp-demo-check -mvp-demo-check: dev-up auth-init experiment-migrate +mvp-demo-check: dev-up auth-init experiment-migrate telemetry-ingest-migrate @bash scripts/mvp_demo_check.sh diff --git a/projects/backend/common/src/backend_common/db/migrations.py b/projects/backend/common/src/backend_common/db/migrations.py index 1bfa640b..95dbd44d 100644 --- a/projects/backend/common/src/backend_common/db/migrations.py +++ b/projects/backend/common/src/backend_common/db/migrations.py @@ -1,10 +1,12 @@ """Database migrations helpers shared between services.""" from __future__ import annotations +import argparse import asyncio import hashlib +import os from pathlib import Path -from typing import Any, Awaitable, Callable, Iterable, Protocol +from typing import Any, Awaitable, Callable, Dict, Iterable, Protocol import asyncpg from aiohttp import web @@ -143,3 +145,103 @@ async def apply_migrations_on_startup(_app: web.Application) -> None: await conn.close() return apply_migrations_on_startup + + +def _cli_load_migrations(directory: Path) -> Dict[str, Path]: + if not directory.exists(): + raise FileNotFoundError(f"Migrations directory does not exist: {directory}") + migrations: Dict[str, Path] = {} + for path in sorted(directory.glob("*.sql")): + version = path.stem + if version in migrations: + raise ValueError(f"Duplicate migration version detected: {version}") + migrations[version] = path + if not migrations: + raise ValueError(f"No *.sql files found in {directory}") + return migrations + + +async def _apply_migrations_cli(database_url: str, migrations_dir: Path, dry_run: bool) -> None: + migrations = _cli_load_migrations(migrations_dir) + conn = await asyncpg.connect(database_url) + try: + await conn.execute( + """ + CREATE TABLE IF NOT EXISTS schema_migrations ( + version text PRIMARY KEY, + checksum text NOT NULL, + applied_at timestamptz NOT NULL DEFAULT now() + ); + """ + ) + rows = await conn.fetch("SELECT version, checksum FROM schema_migrations") + applied = {row["version"]: row["checksum"] for row in rows} + pending = [] + for version, path in migrations.items(): + sql = path.read_text(encoding="utf-8") + checksum = hashlib.sha256(sql.encode("utf-8")).hexdigest() + if version in applied: + if applied[version] != checksum: + raise RuntimeError( + f"Checksum mismatch for {version}: " + f"{applied[version]} (db) != {checksum} (file)" + ) + continue + pending.append((version, path, sql, checksum)) + + if not pending: + print("No pending migrations.") + return + + for version, path, sql, checksum in pending: + if dry_run: + print(f"[dry-run] Pending migration: {path.name}") + continue + print(f"Applying {path.name}...") + async with conn.transaction(): + await conn.execute(sql) + await conn.execute( + "INSERT INTO schema_migrations (version, checksum) VALUES ($1, $2)", + version, + checksum, + ) + if dry_run: + print(f"{len(pending)} migration(s) pending.") + else: + print(f"Applied {len(pending)} migration(s).") + finally: + await conn.close() + + +def run_migrate_cli(default_migrations_dir: Path) -> None: + """CLI entry point for applying SQL migrations. + + Intended for use in each service's bin/migrate.py: + + from pathlib import Path + from backend_common.db.migrations import run_migrate_cli + run_migrate_cli(Path(__file__).resolve().parent.parent / "migrations") + """ + parser = argparse.ArgumentParser(description="Apply SQL migrations sequentially.") + parser.add_argument( + "--database-url", + "-d", + default=os.getenv("DATABASE_URL"), + help="PostgreSQL connection string. Defaults to DATABASE_URL env variable.", + ) + parser.add_argument( + "--migrations-dir", + "-m", + type=Path, + default=default_migrations_dir, + help="Directory with *.sql migrations (sorted lexicographically).", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Only list pending migrations without applying.", + ) + args = parser.parse_args() + if not args.database_url: + raise SystemExit("Database URL must be provided via --database-url or DATABASE_URL env.") + asyncio.run(_apply_migrations_cli(args.database_url, args.migrations_dir, args.dry_run)) diff --git a/projects/backend/services/auth-service/bin/migrate.py b/projects/backend/services/auth-service/bin/migrate.py index da18df16..7eab65d1 100755 --- a/projects/backend/services/auth-service/bin/migrate.py +++ b/projects/backend/services/auth-service/bin/migrate.py @@ -1,126 +1,7 @@ #!/usr/bin/env python3 -"""Minimal SQL migration runner for Auth Service.""" -# pyright: reportMissingImports=false -from __future__ import annotations - -import argparse -import asyncio -import hashlib -import os from pathlib import Path -from typing import Dict - -import asyncpg - - -def _default_migrations_dir() -> Path: - return Path(__file__).resolve().parent.parent / "migrations" - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="Apply SQL migrations sequentially.") - parser.add_argument( - "--database-url", - "-d", - default=os.getenv("DATABASE_URL"), - help="PostgreSQL connection string. Defaults to DATABASE_URL env variable.", - ) - parser.add_argument( - "--migrations-dir", - "-m", - type=Path, - default=_default_migrations_dir(), - help="Directory with *.sql migrations (sorted lexicographically).", - ) - parser.add_argument( - "--dry-run", - action="store_true", - help="Only list pending migrations without applying.", - ) - return parser.parse_args() - - -async def ensure_schema_table(conn: asyncpg.Connection) -> None: - await conn.execute( - """ - CREATE TABLE IF NOT EXISTS schema_migrations ( - version text PRIMARY KEY, - checksum text NOT NULL, - applied_at timestamptz NOT NULL DEFAULT now() - ); - """ - ) - - -def load_migrations(directory: Path) -> Dict[str, Path]: - if not directory.exists(): - raise FileNotFoundError(f"Migrations directory does not exist: {directory}") - migrations: Dict[str, Path] = {} - for path in sorted(directory.glob("*.sql")): - version = path.stem - if version in migrations: - raise ValueError(f"Duplicate migration version detected: {version}") - migrations[version] = path - if not migrations: - raise ValueError(f"No *.sql files found in {directory}") - return migrations - - -async def apply_migrations(database_url: str, migrations_dir: Path, dry_run: bool) -> None: - migrations = load_migrations(migrations_dir) - conn = await asyncpg.connect(database_url) - try: - await ensure_schema_table(conn) - rows = await conn.fetch("SELECT version, checksum FROM schema_migrations") - applied = {row["version"]: row["checksum"] for row in rows} - pending = [] - for version, path in migrations.items(): - sql = path.read_text(encoding="utf-8") - checksum = hashlib.sha256(sql.encode("utf-8")).hexdigest() - if version in applied: - if applied[version] != checksum: - raise RuntimeError( - f"Checksum mismatch for {version}: " - f"{applied[version]} (db) != {checksum} (file)" - ) - continue - pending.append((version, path, sql, checksum)) - - if not pending: - print("No pending migrations.") - return - - for version, path, sql, checksum in pending: - if dry_run: - print(f"[dry-run] Pending migration: {path.name}") - continue - print(f"Applying {path.name}...") - async with conn.transaction(): - await conn.execute(sql) - await conn.execute( - "INSERT INTO schema_migrations (version, checksum) VALUES ($1, $2)", - version, - checksum, - ) - if dry_run: - print(f"{len(pending)} migration(s) pending.") - else: - print(f"Applied {len(pending)} migration(s).") - finally: - await conn.close() - - -async def main_async() -> None: - args = parse_args() - if not args.database_url: - raise SystemExit("Database URL must be provided via --database-url or DATABASE_URL env.") - await apply_migrations(args.database_url, args.migrations_dir, args.dry_run) - - -def main() -> None: - asyncio.run(main_async()) +from backend_common.db.migrations import run_migrate_cli if __name__ == "__main__": - main() - + run_migrate_cli(Path(__file__).resolve().parent.parent / "migrations") diff --git a/projects/backend/services/experiment-service/bin/migrate.py b/projects/backend/services/experiment-service/bin/migrate.py index 0d9a3552..7eab65d1 100644 --- a/projects/backend/services/experiment-service/bin/migrate.py +++ b/projects/backend/services/experiment-service/bin/migrate.py @@ -1,126 +1,7 @@ #!/usr/bin/env python3 -"""Minimal SQL migration runner for Experiment Service.""" -# pyright: reportMissingImports=false -from __future__ import annotations - -import argparse -import asyncio -import hashlib -import os from pathlib import Path -from typing import Dict - -import asyncpg - - -def _default_migrations_dir() -> Path: - return Path(__file__).resolve().parent.parent / "migrations" - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="Apply SQL migrations sequentially.") - parser.add_argument( - "--database-url", - "-d", - default=os.getenv("DATABASE_URL"), - help="PostgreSQL connection string. Defaults to DATABASE_URL env variable.", - ) - parser.add_argument( - "--migrations-dir", - "-m", - type=Path, - default=_default_migrations_dir(), - help="Directory with *.sql migrations (sorted lexicographically).", - ) - parser.add_argument( - "--dry-run", - action="store_true", - help="Only list pending migrations without applying.", - ) - return parser.parse_args() - - -async def ensure_schema_table(conn: asyncpg.Connection) -> None: - await conn.execute( - """ - CREATE TABLE IF NOT EXISTS schema_migrations ( - version text PRIMARY KEY, - checksum text NOT NULL, - applied_at timestamptz NOT NULL DEFAULT now() - ); - """ - ) - - -def load_migrations(directory: Path) -> Dict[str, Path]: - if not directory.exists(): - raise FileNotFoundError(f"Migrations directory does not exist: {directory}") - migrations: Dict[str, Path] = {} - for path in sorted(directory.glob("*.sql")): - version = path.stem - if version in migrations: - raise ValueError(f"Duplicate migration version detected: {version}") - migrations[version] = path - if not migrations: - raise ValueError(f"No *.sql files found in {directory}") - return migrations - - -async def apply_migrations(database_url: str, migrations_dir: Path, dry_run: bool) -> None: - migrations = load_migrations(migrations_dir) - conn = await asyncpg.connect(database_url) - try: - await ensure_schema_table(conn) - rows = await conn.fetch("SELECT version, checksum FROM schema_migrations") - applied = {row["version"]: row["checksum"] for row in rows} - pending = [] - for version, path in migrations.items(): - sql = path.read_text(encoding="utf-8") - checksum = hashlib.sha256(sql.encode("utf-8")).hexdigest() - if version in applied: - if applied[version] != checksum: - raise RuntimeError( - f"Checksum mismatch for {version}: " - f"{applied[version]} (db) != {checksum} (file)" - ) - continue - pending.append((version, path, sql, checksum)) - - if not pending: - print("No pending migrations.") - return - - for version, path, sql, checksum in pending: - if dry_run: - print(f"[dry-run] Pending migration: {path.name}") - continue - print(f"Applying {path.name}...") - async with conn.transaction(): - await conn.execute(sql) - await conn.execute( - "INSERT INTO schema_migrations (version, checksum) VALUES ($1, $2)", - version, - checksum, - ) - if dry_run: - print(f"{len(pending)} migration(s) pending.") - else: - print(f"Applied {len(pending)} migration(s).") - finally: - await conn.close() - - -async def main_async() -> None: - args = parse_args() - if not args.database_url: - raise SystemExit("Database URL must be provided via --database-url or DATABASE_URL env.") - await apply_migrations(args.database_url, args.migrations_dir, args.dry_run) - - -def main() -> None: - asyncio.run(main_async()) +from backend_common.db.migrations import run_migrate_cli if __name__ == "__main__": - main() - + run_migrate_cli(Path(__file__).resolve().parent.parent / "migrations") diff --git a/projects/backend/services/telemetry-ingest-service/Dockerfile b/projects/backend/services/telemetry-ingest-service/Dockerfile index b2769eb3..8bedb3b9 100644 --- a/projects/backend/services/telemetry-ingest-service/Dockerfile +++ b/projects/backend/services/telemetry-ingest-service/Dockerfile @@ -20,6 +20,8 @@ RUN poetry install --no-root # Copy source code COPY services/telemetry-ingest-service/src ./src COPY services/telemetry-ingest-service/openapi ./openapi +COPY services/telemetry-ingest-service/bin ./bin +COPY services/telemetry-ingest-service/migrations ./migrations # Set PYTHONPATH to include src directory and backend-common ENV PYTHONPATH=/app/services/telemetry-ingest-service/src:/app/common/src diff --git a/projects/backend/services/telemetry-ingest-service/bin/migrate.py b/projects/backend/services/telemetry-ingest-service/bin/migrate.py new file mode 100644 index 00000000..7eab65d1 --- /dev/null +++ b/projects/backend/services/telemetry-ingest-service/bin/migrate.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 +from pathlib import Path + +from backend_common.db.migrations import run_migrate_cli + +if __name__ == "__main__": + run_migrate_cli(Path(__file__).resolve().parent.parent / "migrations")