Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .hyperloop/checks/check-no-api-simulation.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ check_dir() {
--include="*.js" \
--exclude="*.test.*" \
--exclude="*.spec.*" \
--exclude-dir=.venv \
-E "new Promise[^)]*setTimeout[[:space:]]*\([[:space:]]*resolve" \
"$dir" 2>/dev/null || true)

Expand Down
96 changes: 96 additions & 0 deletions src/api/tests/fakes/iam.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""In-memory fakes for IAM bounded context ports.

Provides fast, self-contained test doubles for:
- TenantServiceProbe

These fakes implement the full port protocols using in-memory storage and
record all method calls so tests can assert on behavior without resorting
to MagicMock or AsyncMock. They follow the "Fakes over Mocks" principle
from specs/nfr/testing.spec.md.
"""

from __future__ import annotations


class RecordingTenantServiceProbe:
    """Recording test double implementing the TenantServiceProbe protocol.

    Every probe method appends a dict of its arguments to a dedicated
    ``<method>_calls`` list, so tests can assert exactly which events were
    raised and with what payloads. Deliberately a concrete class — NOT a
    MagicMock(spec=...) — as required by the testing NFR
    (specs/nfr/testing.spec.md).
    """

    def __init__(self) -> None:
        # One append-only call log per probe method.
        self.tenant_created_calls: list[dict[str, str]] = []
        self.tenant_retrieved_calls: list[dict[str, str]] = []
        self.tenants_listed_calls: list[dict[str, object]] = []
        self.tenant_deleted_calls: list[dict[str, str]] = []
        self.tenant_not_found_calls: list[dict[str, str]] = []
        self.duplicate_tenant_name_calls: list[dict[str, str]] = []
        self.tenant_member_added_calls: list[dict[str, object]] = []
        self.tenant_member_removed_calls: list[dict[str, str]] = []
        self.tenant_members_listed_calls: list[dict[str, object]] = []
        self.tenant_cascade_deletion_started_calls: list[dict[str, object]] = []

    def tenant_created(self, tenant_id: str, name: str) -> None:
        self.tenant_created_calls.append(dict(tenant_id=tenant_id, name=name))

    def tenant_retrieved(self, tenant_id: str) -> None:
        self.tenant_retrieved_calls.append(dict(tenant_id=tenant_id))

    def tenants_listed(self, count: int) -> None:
        self.tenants_listed_calls.append(dict(count=count))

    def tenant_deleted(self, tenant_id: str) -> None:
        self.tenant_deleted_calls.append(dict(tenant_id=tenant_id))

    def tenant_not_found(self, tenant_id: str) -> None:
        self.tenant_not_found_calls.append(dict(tenant_id=tenant_id))

    def duplicate_tenant_name(self, name: str) -> None:
        self.duplicate_tenant_name_calls.append(dict(name=name))

    def tenant_member_added(
        self, tenant_id: str, user_id: str, role: str, added_by: str | None
    ) -> None:
        call = dict(
            tenant_id=tenant_id, user_id=user_id, role=role, added_by=added_by
        )
        self.tenant_member_added_calls.append(call)

    def tenant_member_removed(
        self, tenant_id: str, user_id: str, removed_by: str
    ) -> None:
        call = dict(tenant_id=tenant_id, user_id=user_id, removed_by=removed_by)
        self.tenant_member_removed_calls.append(call)

    def tenant_members_listed(self, tenant_id: str, member_count: int) -> None:
        call = dict(tenant_id=tenant_id, member_count=member_count)
        self.tenant_members_listed_calls.append(call)

    def tenant_cascade_deletion_started(
        self,
        tenant_id: str,
        workspaces_count: int,
        groups_count: int,
        api_keys_count: int,
    ) -> None:
        call = dict(
            tenant_id=tenant_id,
            workspaces_count=workspaces_count,
            groups_count=groups_count,
            api_keys_count=api_keys_count,
        )
        self.tenant_cascade_deletion_started_calls.append(call)

    def with_context(self, context: object) -> "RecordingTenantServiceProbe":
        """Fluent no-op: context is irrelevant to recording, so return self."""
        return self
104 changes: 104 additions & 0 deletions src/api/tests/fakes/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
Provides fast, self-contained test doubles for:
- IKnowledgeGraphRepository
- IDataSourceRepository
- IDataSourceSyncRunRepository
- ISecretStoreRepository
- KnowledgeGraphServiceProbe
- DataSourceServiceProbe

These fakes implement the full port protocols using in-memory storage and
record all mutation calls so tests can assert on behavior without resorting
Expand All @@ -15,6 +17,7 @@
from __future__ import annotations

from management.domain.aggregates import DataSource, KnowledgeGraph
from management.domain.entities import DataSourceSyncRun
from management.domain.value_objects import DataSourceId, KnowledgeGraphId


Expand Down Expand Up @@ -236,3 +239,104 @@ def permission_denied(
def with_context(self, context: object) -> "RecordingKnowledgeGraphServiceProbe":
    """Ignore *context* and return the same probe (fluent no-op for tests)."""
    return self


class InMemoryDataSourceSyncRunRepository:
    """Dict-backed fake implementing IDataSourceSyncRunRepository.

    Sync runs live in ``_store`` keyed by run ID. Every ``save()`` argument
    is also appended to ``saved`` so tests can assert on call count and
    arguments without resorting to mocks.
    """

    def __init__(self) -> None:
        # Primary-key lookup table: run ID -> entity.
        self._store: dict[str, DataSourceSyncRun] = {}
        # Chronological record of every save() call.
        self.saved: list[DataSourceSyncRun] = []

    def seed(self, *sync_runs: DataSourceSyncRun) -> None:
        """Pre-populate the store without touching the save log (test setup)."""
        self._store.update((run.id, run) for run in sync_runs)

    async def save(self, sync_run: DataSourceSyncRun) -> None:
        # Record the call first, then upsert into the store.
        self.saved.append(sync_run)
        self._store[sync_run.id] = sync_run

    async def get_by_id(self, sync_run_id: str) -> DataSourceSyncRun | None:
        return self._store.get(sync_run_id)

    async def find_by_data_source(self, data_source_id: str) -> list[DataSourceSyncRun]:
        matches = [
            run
            for run in self._store.values()
            if run.data_source_id == data_source_id
        ]
        return matches


class RecordingDataSourceServiceProbe:
    """Recording test double implementing the DataSourceServiceProbe protocol.

    Each probe method appends a dict of its arguments to its own
    ``<method>_calls`` list so tests can assert on raised events and their
    payloads. Deliberately a concrete class — NOT a MagicMock(spec=...) —
    as required by the testing NFR (specs/nfr/testing.spec.md).
    """

    def __init__(self) -> None:
        # One append-only call log per probe method.
        self.data_source_created_calls: list[dict[str, str]] = []
        self.data_source_creation_failed_calls: list[dict[str, str]] = []
        self.data_source_retrieved_calls: list[dict[str, str]] = []
        self.data_source_updated_calls: list[dict[str, str]] = []
        self.data_source_deleted_calls: list[dict[str, str]] = []
        self.data_source_deletion_failed_calls: list[dict[str, str]] = []
        self.data_sources_listed_calls: list[dict[str, object]] = []
        self.sync_requested_calls: list[dict[str, str]] = []
        self.permission_denied_calls: list[dict[str, str]] = []

    def data_source_created(
        self,
        ds_id: str,
        kg_id: str,
        tenant_id: str,
        name: str,
    ) -> None:
        call = dict(ds_id=ds_id, kg_id=kg_id, tenant_id=tenant_id, name=name)
        self.data_source_created_calls.append(call)

    def data_source_creation_failed(
        self,
        kg_id: str,
        name: str,
        error: str,
    ) -> None:
        call = dict(kg_id=kg_id, name=name, error=error)
        self.data_source_creation_failed_calls.append(call)

    def data_source_retrieved(self, ds_id: str) -> None:
        self.data_source_retrieved_calls.append(dict(ds_id=ds_id))

    def data_source_updated(self, ds_id: str, name: str) -> None:
        self.data_source_updated_calls.append(dict(ds_id=ds_id, name=name))

    def data_source_deleted(self, ds_id: str) -> None:
        self.data_source_deleted_calls.append(dict(ds_id=ds_id))

    def data_source_deletion_failed(self, ds_id: str, error: str) -> None:
        self.data_source_deletion_failed_calls.append(dict(ds_id=ds_id, error=error))

    def data_sources_listed(self, kg_id: str, count: int) -> None:
        self.data_sources_listed_calls.append(dict(kg_id=kg_id, count=count))

    def sync_requested(self, ds_id: str) -> None:
        self.sync_requested_calls.append(dict(ds_id=ds_id))

    def permission_denied(
        self,
        user_id: str,
        resource_id: str,
        permission: str,
    ) -> None:
        call = dict(user_id=user_id, resource_id=resource_id, permission=permission)
        self.permission_denied_calls.append(call)

    def with_context(self, context: object) -> "RecordingDataSourceServiceProbe":
        """Fluent no-op: context is irrelevant to recording, so return self."""
        return self
168 changes: 168 additions & 0 deletions src/api/tests/integration/iam/test_group_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""Integration tests for GroupService transactional atomicity.

These tests verify that group deletion is atomic: if the transaction fails
at any point, the group record is NOT deleted (no partial state).

NOTE: Rollback semantics cannot be verified with mock sessions. These tests
require a real PostgreSQL connection.
"""

from __future__ import annotations

import pytest
from sqlalchemy.ext.asyncio import AsyncSession

from iam.application.services.group_service import GroupService
from iam.domain.aggregates import Group
from iam.domain.value_objects import TenantId, UserId
from iam.infrastructure.group_repository import GroupRepository
from infrastructure.outbox.repository import OutboxRepository
from shared_kernel.authorization.types import (
ResourceType,
format_resource,
format_subject,
)
from tests.fakes.authorization import InMemoryAuthorizationProvider

# Mark every test in this module as integration: they require a real
# PostgreSQL connection (see module docstring) and are excluded from fast runs.
pytestmark = pytest.mark.integration


class TestGroupServiceDeleteRollback:
    """Tests that group deletion rolls back fully on transaction failure.

    GroupService.delete_group() wraps the delete inside
    ``async with self._session.begin()``. If an exception escapes that block,
    SQLAlchemy must roll back the entire unit of work. These tests confirm
    that invariant holds at the real-database level.

    This class verifies that the SERVICE-LEVEL transaction boundary provides
    correct rollback semantics, not just the repository layer.
    """

    @pytest.mark.asyncio
    async def test_group_service_delete_rollback_on_failure(
        self,
        async_session: AsyncSession,
        test_tenant: TenantId,
        clean_iam_data: None,
    ) -> None:
        """When group deletion fails mid-transaction, the group is not deleted.

        Creates a group, starts a deletion via GroupService (which uses a
        ``async with session.begin()`` block), injects a failure inside the
        transaction, and asserts the group still exists after — verifying
        full transactional rollback at the service level.
        """
        # All collaborators share the one AsyncSession so the service-level
        # transaction governs every write made during the test.
        outbox = OutboxRepository(session=async_session)
        authz = InMemoryAuthorizationProvider()
        group_repo = GroupRepository(
            session=async_session,
            authz=authz,
            outbox=outbox,
        )

        # Arrange: create a group for the test tenant (committed immediately
        # by its own session.begin() block, independent of the delete later).
        group = Group.create(name="Rollback Test Group", tenant_id=test_tenant)
        async with async_session.begin():
            await group_repo.save(group)

        # Grant MANAGE permission so the authorization check (outside the
        # session.begin() block) passes.
        # NOTE(review): assumes the "admin" relation implies MANAGE in the
        # authorization schema — confirm against the model definition.
        resource = format_resource(ResourceType.GROUP, group.id.value)
        subject = format_subject(ResourceType.USER, "test-user")
        await authz.write_relationship(
            resource=resource, relation="admin", subject=subject
        )

        # Arrange: failing repo subclass that raises RuntimeError on delete()
        class FailingOnDeleteGroupRepository(GroupRepository):
            """Raises RuntimeError when delete() is called.

            Simulates a failure that occurs inside the service's
            ``async with session.begin()`` block, after all reads have
            completed but before the transaction can commit.
            """

            async def delete(self, g: Group) -> bool:
                # Unconditional failure: forces the service transaction to abort.
                raise RuntimeError(
                    "Simulated group deletion failure to verify service rollback"
                )

        failing_repo = FailingOnDeleteGroupRepository(
            session=async_session,
            authz=authz,
            outbox=outbox,
        )

        svc = GroupService(
            session=async_session,
            group_repository=failing_repo,
            authz=authz,
            scope_to_tenant=test_tenant,
        )

        # Act: delete must raise (the repo is wired to fail)
        with pytest.raises(RuntimeError, match="Simulated group deletion failure"):
            await svc.delete_group(
                group_id=group.id,
                user_id=UserId(value="test-user"),
            )

        # Assert: transaction rolled back — group still exists in the database.
        # Read through the NON-failing repository bound to the same session.
        retrieved = await group_repo.get_by_id(group.id)
        assert retrieved is not None, (
            "Group must not be deleted when the service-level transaction rolls back; "
            "the async with session.begin() block must roll back completely."
        )

    @pytest.mark.asyncio
    async def test_group_service_delete_commits_fully_on_success(
        self,
        async_session: AsyncSession,
        test_tenant: TenantId,
        clean_iam_data: None,
    ) -> None:
        """When delete succeeds, the group is removed from the database.

        Verifies the happy path of the service-level delete to complement the
        rollback test — the transaction must commit and leave no orphaned rows.
        """
        # Same wiring as the rollback test, but with the real (non-failing)
        # repository passed to the service.
        outbox = OutboxRepository(session=async_session)
        authz = InMemoryAuthorizationProvider()
        group_repo = GroupRepository(
            session=async_session,
            authz=authz,
            outbox=outbox,
        )

        # Arrange: create a group
        group = Group.create(name="Happy Path Delete Group", tenant_id=test_tenant)
        async with async_session.begin():
            await group_repo.save(group)

        # Grant MANAGE permission
        resource = format_resource(ResourceType.GROUP, group.id.value)
        subject = format_subject(ResourceType.USER, "test-user")
        await authz.write_relationship(
            resource=resource, relation="admin", subject=subject
        )

        svc = GroupService(
            session=async_session,
            group_repository=group_repo,
            authz=authz,
            scope_to_tenant=test_tenant,
        )

        # Act: successful delete
        result = await svc.delete_group(
            group_id=group.id,
            user_id=UserId(value="test-user"),
        )

        # Assert: group removed from database
        assert result is True, "service.delete_group() must return True on success"
        retrieved = await group_repo.get_by_id(group.id)
        assert retrieved is None, (
            "Group must be deleted from the DB after successful delete"
        )
Loading