Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,13 @@ experiment-migrate:
$(DOCKER_COMPOSE) exec experiment-service python -m bin.migrate --database-url "$${EXPERIMENT_DATABASE_URL:-postgresql://experiment_user:experiment_password@postgres:5432/experiment_db}"
@echo "✅ Миграции применены"

# Apply telemetry-ingest-service migrations (uses the same experiment_db)
telemetry-ingest-migrate:
	@echo "Применение миграций telemetry-ingest-service..."
	@$(DOCKER_COMPOSE) exec -T telemetry-ingest-service python -m bin.migrate --database-url "$${EXPERIMENT_DATABASE_URL:-postgresql://experiment_user:experiment_password@postgres:5432/experiment_db}" || \
	$(DOCKER_COMPOSE) exec telemetry-ingest-service python -m bin.migrate --database-url "$${EXPERIMENT_DATABASE_URL:-postgresql://experiment_user:experiment_password@postgres:5432/experiment_db}"
	@echo "✅ Миграции telemetry-ingest-service применены"

# Создание базы данных script-service
script-create-db:
@echo "Создание базы данных script_db..."
Expand Down Expand Up @@ -699,5 +706,5 @@ infra-destroy:
@cd infrastructure/yandex-cloud && terraform destroy

.PHONY: mvp-demo-check
mvp-demo-check: dev-up auth-init experiment-migrate
mvp-demo-check: dev-up auth-init experiment-migrate telemetry-ingest-migrate
@bash scripts/mvp_demo_check.sh
104 changes: 103 additions & 1 deletion projects/backend/common/src/backend_common/db/migrations.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Database migrations helpers shared between services."""
from __future__ import annotations

import argparse
import asyncio
import hashlib
import os
from pathlib import Path
from typing import Any, Awaitable, Callable, Iterable, Protocol
from typing import Any, Awaitable, Callable, Dict, Iterable, Protocol

import asyncpg
from aiohttp import web
Expand Down Expand Up @@ -143,3 +145,103 @@ async def apply_migrations_on_startup(_app: web.Application) -> None:
await conn.close()

return apply_migrations_on_startup


def _cli_load_migrations(directory: Path) -> Dict[str, Path]:
if not directory.exists():
raise FileNotFoundError(f"Migrations directory does not exist: {directory}")
migrations: Dict[str, Path] = {}
for path in sorted(directory.glob("*.sql")):
version = path.stem
if version in migrations:
raise ValueError(f"Duplicate migration version detected: {version}")
migrations[version] = path
if not migrations:
raise ValueError(f"No *.sql files found in {directory}")
return migrations


async def _apply_migrations_cli(database_url: str, migrations_dir: Path, dry_run: bool) -> None:
    """Apply pending ``*.sql`` migrations from *migrations_dir* to *database_url*.

    Already-applied migrations are verified against their recorded SHA-256
    checksum; a mismatch aborts with RuntimeError. Pending files are executed
    in lexicographic order, each inside one transaction together with its
    ``schema_migrations`` bookkeeping row. With ``dry_run=True`` pending files
    are only listed, nothing is executed.
    """
    migrations = _cli_load_migrations(migrations_dir)
    conn = await asyncpg.connect(database_url)
    try:
        # Idempotent bookkeeping table; safe to run on every invocation.
        await conn.execute(
            """
            CREATE TABLE IF NOT EXISTS schema_migrations (
                version text PRIMARY KEY,
                checksum text NOT NULL,
                applied_at timestamptz NOT NULL DEFAULT now()
            );
            """
        )
        rows = await conn.fetch("SELECT version, checksum FROM schema_migrations")
        # version -> checksum of everything already applied in this database.
        applied = {row["version"]: row["checksum"] for row in rows}
        pending = []
        for version, path in migrations.items():
            sql = path.read_text(encoding="utf-8")
            checksum = hashlib.sha256(sql.encode("utf-8")).hexdigest()
            if version in applied:
                # Guard against a migration file being edited after it ran.
                if applied[version] != checksum:
                    raise RuntimeError(
                        f"Checksum mismatch for {version}: "
                        f"{applied[version]} (db) != {checksum} (file)"
                    )
                continue
            pending.append((version, path, sql, checksum))

        if not pending:
            print("No pending migrations.")
            return

        for version, path, sql, checksum in pending:
            if dry_run:
                print(f"[dry-run] Pending migration: {path.name}")
                continue
            print(f"Applying {path.name}...")
            # One transaction per migration: the SQL and its bookkeeping row
            # commit (or roll back) together.
            async with conn.transaction():
                await conn.execute(sql)
                await conn.execute(
                    "INSERT INTO schema_migrations (version, checksum) VALUES ($1, $2)",
                    version,
                    checksum,
                )
        if dry_run:
            print(f"{len(pending)} migration(s) pending.")
        else:
            print(f"Applied {len(pending)} migration(s).")
    finally:
        await conn.close()


def run_migrate_cli(default_migrations_dir: Path) -> None:
    """Parse CLI arguments and apply pending SQL migrations.

    Meant to be called from each service's ``bin/migrate.py``::

        from pathlib import Path
        from backend_common.db.migrations import run_migrate_cli
        run_migrate_cli(Path(__file__).resolve().parent.parent / "migrations")
    """
    cli = argparse.ArgumentParser(description="Apply SQL migrations sequentially.")
    cli.add_argument(
        "--database-url",
        "-d",
        default=os.getenv("DATABASE_URL"),
        help="PostgreSQL connection string. Defaults to DATABASE_URL env variable.",
    )
    cli.add_argument(
        "--migrations-dir",
        "-m",
        type=Path,
        default=default_migrations_dir,
        help="Directory with *.sql migrations (sorted lexicographically).",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Only list pending migrations without applying.",
    )
    opts = cli.parse_args()
    if not opts.database_url:
        # Fail fast with a readable message instead of a connection error.
        raise SystemExit("Database URL must be provided via --database-url or DATABASE_URL env.")
    asyncio.run(_apply_migrations_cli(opts.database_url, opts.migrations_dir, opts.dry_run))
123 changes: 2 additions & 121 deletions projects/backend/services/auth-service/bin/migrate.py
Original file line number Diff line number Diff line change
@@ -1,126 +1,7 @@
#!/usr/bin/env python3
"""Minimal SQL migration runner for Auth Service."""
# pyright: reportMissingImports=false
from __future__ import annotations

import argparse
import asyncio
import hashlib
import os
from pathlib import Path
from typing import Dict

import asyncpg


def _default_migrations_dir() -> Path:
return Path(__file__).resolve().parent.parent / "migrations"


def parse_args() -> argparse.Namespace:
    """Build and evaluate the migration runner's command line."""
    cli = argparse.ArgumentParser(description="Apply SQL migrations sequentially.")
    cli.add_argument(
        "--database-url",
        "-d",
        default=os.getenv("DATABASE_URL"),
        help="PostgreSQL connection string. Defaults to DATABASE_URL env variable.",
    )
    cli.add_argument(
        "--migrations-dir",
        "-m",
        type=Path,
        default=_default_migrations_dir(),
        help="Directory with *.sql migrations (sorted lexicographically).",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Only list pending migrations without applying.",
    )
    return cli.parse_args()


async def ensure_schema_table(conn: asyncpg.Connection) -> None:
    """Create the ``schema_migrations`` bookkeeping table if it is missing.

    Idempotent (CREATE TABLE IF NOT EXISTS), so it is safe to call on every run.
    """
    await conn.execute(
        """
        CREATE TABLE IF NOT EXISTS schema_migrations (
            version text PRIMARY KEY,
            checksum text NOT NULL,
            applied_at timestamptz NOT NULL DEFAULT now()
        );
        """
    )


def load_migrations(directory: Path) -> Dict[str, Path]:
    """Map migration version (file stem) -> file path, in lexicographic order.

    Raises FileNotFoundError for a missing directory, ValueError when no
    ``*.sql`` files exist or a version repeats.
    """
    if not directory.exists():
        raise FileNotFoundError(f"Migrations directory does not exist: {directory}")
    result: Dict[str, Path] = {}
    for candidate in sorted(directory.glob("*.sql")):
        key = candidate.stem
        if key in result:
            raise ValueError(f"Duplicate migration version detected: {key}")
        result[key] = candidate
    if not result:
        raise ValueError(f"No *.sql files found in {directory}")
    return result


async def apply_migrations(database_url: str, migrations_dir: Path, dry_run: bool) -> None:
    """Apply pending ``*.sql`` migrations from *migrations_dir* to *database_url*.

    Already-applied migrations are verified against their recorded SHA-256
    checksum; a mismatch aborts with RuntimeError. Pending files run in
    lexicographic order, each inside one transaction together with its
    ``schema_migrations`` row. With ``dry_run=True`` pending files are only
    listed, nothing is executed.
    """
    migrations = load_migrations(migrations_dir)
    conn = await asyncpg.connect(database_url)
    try:
        await ensure_schema_table(conn)
        rows = await conn.fetch("SELECT version, checksum FROM schema_migrations")
        # version -> checksum of everything already applied in this database.
        applied = {row["version"]: row["checksum"] for row in rows}
        pending = []
        for version, path in migrations.items():
            sql = path.read_text(encoding="utf-8")
            checksum = hashlib.sha256(sql.encode("utf-8")).hexdigest()
            if version in applied:
                # Guard against a migration file being edited after it ran.
                if applied[version] != checksum:
                    raise RuntimeError(
                        f"Checksum mismatch for {version}: "
                        f"{applied[version]} (db) != {checksum} (file)"
                    )
                continue
            pending.append((version, path, sql, checksum))

        if not pending:
            print("No pending migrations.")
            return

        for version, path, sql, checksum in pending:
            if dry_run:
                print(f"[dry-run] Pending migration: {path.name}")
                continue
            print(f"Applying {path.name}...")
            # One transaction per migration: the SQL and its bookkeeping row
            # commit (or roll back) together.
            async with conn.transaction():
                await conn.execute(sql)
                await conn.execute(
                    "INSERT INTO schema_migrations (version, checksum) VALUES ($1, $2)",
                    version,
                    checksum,
                )
        if dry_run:
            print(f"{len(pending)} migration(s) pending.")
        else:
            print(f"Applied {len(pending)} migration(s).")
    finally:
        await conn.close()


async def main_async() -> None:
    """Entry coroutine: parse the CLI, validate, and run the migrations."""
    options = parse_args()
    if not options.database_url:
        # Fail fast with a readable message before touching the database.
        raise SystemExit("Database URL must be provided via --database-url or DATABASE_URL env.")
    await apply_migrations(options.database_url, options.migrations_dir, options.dry_run)


def main() -> None:
    """Synchronous entry point: drive :func:`main_async` via ``asyncio.run``."""
    asyncio.run(main_async())

from backend_common.db.migrations import run_migrate_cli

if __name__ == "__main__":
main()

run_migrate_cli(Path(__file__).resolve().parent.parent / "migrations")
123 changes: 2 additions & 121 deletions projects/backend/services/experiment-service/bin/migrate.py
Original file line number Diff line number Diff line change
@@ -1,126 +1,7 @@
#!/usr/bin/env python3
"""Minimal SQL migration runner for Experiment Service."""
# pyright: reportMissingImports=false
from __future__ import annotations

import argparse
import asyncio
import hashlib
import os
from pathlib import Path
from typing import Dict

import asyncpg


def _default_migrations_dir() -> Path:
return Path(__file__).resolve().parent.parent / "migrations"


def parse_args() -> argparse.Namespace:
    """Declare and parse the migration runner's command-line options."""
    parser = argparse.ArgumentParser(description="Apply SQL migrations sequentially.")
    parser.add_argument(
        "--database-url",
        "-d",
        default=os.getenv("DATABASE_URL"),
        help="PostgreSQL connection string. Defaults to DATABASE_URL env variable.",
    )
    parser.add_argument(
        "--migrations-dir",
        "-m",
        type=Path,
        default=_default_migrations_dir(),
        help="Directory with *.sql migrations (sorted lexicographically).",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Only list pending migrations without applying.",
    )
    parsed = parser.parse_args()
    return parsed


async def ensure_schema_table(conn: asyncpg.Connection) -> None:
    """Create the ``schema_migrations`` bookkeeping table if it is missing.

    Idempotent (CREATE TABLE IF NOT EXISTS), so it is safe to call on every run.
    """
    await conn.execute(
        """
        CREATE TABLE IF NOT EXISTS schema_migrations (
            version text PRIMARY KEY,
            checksum text NOT NULL,
            applied_at timestamptz NOT NULL DEFAULT now()
        );
        """
    )


def load_migrations(directory: Path) -> Dict[str, Path]:
    """Return ``{version: path}`` for every ``*.sql`` file, lexicographically.

    Raises FileNotFoundError for a missing directory, ValueError when the
    directory has no SQL files or a version stem repeats.
    """
    if not directory.exists():
        raise FileNotFoundError(f"Migrations directory does not exist: {directory}")
    by_version: Dict[str, Path] = {}
    for entry in sorted(directory.glob("*.sql")):
        stem = entry.stem
        if stem in by_version:
            raise ValueError(f"Duplicate migration version detected: {stem}")
        by_version[stem] = entry
    if not by_version:
        raise ValueError(f"No *.sql files found in {directory}")
    return by_version


async def apply_migrations(database_url: str, migrations_dir: Path, dry_run: bool) -> None:
    """Apply pending ``*.sql`` migrations from *migrations_dir* to *database_url*.

    Already-applied migrations are verified against their recorded SHA-256
    checksum; a mismatch aborts with RuntimeError. Pending files run in
    lexicographic order, each inside one transaction together with its
    ``schema_migrations`` row. With ``dry_run=True`` pending files are only
    listed, nothing is executed.
    """
    migrations = load_migrations(migrations_dir)
    conn = await asyncpg.connect(database_url)
    try:
        await ensure_schema_table(conn)
        rows = await conn.fetch("SELECT version, checksum FROM schema_migrations")
        # version -> checksum of everything already applied in this database.
        applied = {row["version"]: row["checksum"] for row in rows}
        pending = []
        for version, path in migrations.items():
            sql = path.read_text(encoding="utf-8")
            checksum = hashlib.sha256(sql.encode("utf-8")).hexdigest()
            if version in applied:
                # Guard against a migration file being edited after it ran.
                if applied[version] != checksum:
                    raise RuntimeError(
                        f"Checksum mismatch for {version}: "
                        f"{applied[version]} (db) != {checksum} (file)"
                    )
                continue
            pending.append((version, path, sql, checksum))

        if not pending:
            print("No pending migrations.")
            return

        for version, path, sql, checksum in pending:
            if dry_run:
                print(f"[dry-run] Pending migration: {path.name}")
                continue
            print(f"Applying {path.name}...")
            # One transaction per migration: the SQL and its bookkeeping row
            # commit (or roll back) together.
            async with conn.transaction():
                await conn.execute(sql)
                await conn.execute(
                    "INSERT INTO schema_migrations (version, checksum) VALUES ($1, $2)",
                    version,
                    checksum,
                )
        if dry_run:
            print(f"{len(pending)} migration(s) pending.")
        else:
            print(f"Applied {len(pending)} migration(s).")
    finally:
        await conn.close()


async def main_async() -> None:
    """Entry coroutine: read the CLI, validate, then apply migrations."""
    cli_args = parse_args()
    if not cli_args.database_url:
        # Bail out early with a clear message rather than a connect failure.
        raise SystemExit("Database URL must be provided via --database-url or DATABASE_URL env.")
    await apply_migrations(cli_args.database_url, cli_args.migrations_dir, cli_args.dry_run)


def main() -> None:
    """Synchronous entry point: drive :func:`main_async` via ``asyncio.run``."""
    asyncio.run(main_async())

from backend_common.db.migrations import run_migrate_cli

if __name__ == "__main__":
main()

run_migrate_cli(Path(__file__).resolve().parent.parent / "migrations")
Loading
Loading