Skip to content
Merged
3 changes: 2 additions & 1 deletion pyrightconfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"include": [
"src/scc_cli/services"
"src",
"tests"
],
"exclude": [
"**/__pycache__",
Expand Down
218 changes: 218 additions & 0 deletions src/scc_cli/adapters/config_normalizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
"""Config normalization - converts raw dicts to typed config models.

Parse and validate configuration at load edges, then pass normalized
models inward to the application layer. This reduces stringly-typed
access and schema drift risk.
"""

from __future__ import annotations

from typing import Any

from scc_cli.ports.config_models import (
DefaultsConfig,
DelegationConfig,
MarketplaceConfig,
MCPServerConfig,
NormalizedOrgConfig,
NormalizedProjectConfig,
NormalizedTeamConfig,
NormalizedUserConfig,
OrganizationInfo,
OrganizationSource,
ProjectsDelegation,
SecurityConfig,
SessionSettings,
TeamDelegation,
TeamsDelegation,
)


def normalize_user_config(raw: dict[str, Any]) -> NormalizedUserConfig:
"""Normalize a raw user config dict to typed model.

Args:
raw: Raw user config dict from JSON.

Returns:
NormalizedUserConfig with typed fields.
"""
org_source = None
raw_source = raw.get("organization_source")
if raw_source and isinstance(raw_source, dict):
org_source = OrganizationSource(
url=raw_source.get("url", ""),
auth=raw_source.get("auth"),
auth_header=raw_source.get("auth_header"),
)

workspace_map = raw.get("workspace_team_map", {})
if not isinstance(workspace_map, dict):
workspace_map = {}

return NormalizedUserConfig(
selected_profile=raw.get("selected_profile"),
standalone=bool(raw.get("standalone", False)),
organization_source=org_source,
workspace_team_map=workspace_map,
onboarding_seen=bool(raw.get("onboarding_seen", False)),
)


def _normalize_session_settings(raw: dict[str, Any] | None) -> SessionSettings:
"""Normalize session settings from raw dict."""
if not raw:
return SessionSettings()
return SessionSettings(
timeout_hours=raw.get("timeout_hours"),
auto_resume=bool(raw.get("auto_resume", False)),
)


def _normalize_mcp_server(raw: dict[str, Any]) -> MCPServerConfig:
"""Normalize a single MCP server config."""
return MCPServerConfig(
name=raw.get("name", ""),
type=raw.get("type", "sse"),
url=raw.get("url"),
command=raw.get("command"),
args=list(raw.get("args", [])),
env=dict(raw.get("env", {})),
headers=dict(raw.get("headers", {})),
)


def _normalize_team_config(name: str, raw: dict[str, Any]) -> NormalizedTeamConfig:
"""Normalize a single team/profile config."""
mcp_servers = tuple(_normalize_mcp_server(s) for s in raw.get("additional_mcp_servers", []))

delegation_raw = raw.get("delegation", {})
delegation = TeamDelegation(
allow_project_overrides=bool(delegation_raw.get("allow_project_overrides", False)),
)

return NormalizedTeamConfig(
name=name,
description=raw.get("description", ""),
plugin=raw.get("plugin"),
marketplace=raw.get("marketplace"),
additional_plugins=tuple(raw.get("additional_plugins", [])),
additional_mcp_servers=mcp_servers,
session=_normalize_session_settings(raw.get("session")),
delegation=delegation,
)


def _normalize_security(raw: dict[str, Any] | None) -> SecurityConfig:
"""Normalize security config."""
if not raw:
return SecurityConfig()
return SecurityConfig(
blocked_plugins=tuple(raw.get("blocked_plugins", [])),
blocked_mcp_servers=tuple(raw.get("blocked_mcp_servers", [])),
allow_stdio_mcp=bool(raw.get("allow_stdio_mcp", False)),
allowed_stdio_prefixes=tuple(raw.get("allowed_stdio_prefixes", [])),
)


def _normalize_defaults(raw: dict[str, Any] | None) -> DefaultsConfig:
"""Normalize defaults config."""
if not raw:
return DefaultsConfig()

allowed_plugins = raw.get("allowed_plugins")
allowed_mcp = raw.get("allowed_mcp_servers")

return DefaultsConfig(
enabled_plugins=tuple(raw.get("enabled_plugins", [])),
disabled_plugins=tuple(raw.get("disabled_plugins", [])),
allowed_plugins=tuple(allowed_plugins) if allowed_plugins is not None else None,
allowed_mcp_servers=tuple(allowed_mcp) if allowed_mcp is not None else None,
network_policy=raw.get("network_policy"),
session=_normalize_session_settings(raw.get("session")),
)


def _normalize_delegation(raw: dict[str, Any] | None) -> DelegationConfig:
"""Normalize delegation config."""
if not raw:
return DelegationConfig()

teams_raw = raw.get("teams", {})
projects_raw = raw.get("projects", {})

return DelegationConfig(
teams=TeamsDelegation(
allow_additional_plugins=tuple(teams_raw.get("allow_additional_plugins", [])),
allow_additional_mcp_servers=tuple(teams_raw.get("allow_additional_mcp_servers", [])),
),
projects=ProjectsDelegation(
inherit_team_delegation=bool(projects_raw.get("inherit_team_delegation", False)),
),
)


def _normalize_marketplace(name: str, raw: dict[str, Any]) -> MarketplaceConfig:
"""Normalize a single marketplace config."""
return MarketplaceConfig(
name=name,
source=raw.get("source", ""),
owner=raw.get("owner"),
repo=raw.get("repo"),
branch=raw.get("branch"),
url=raw.get("url"),
host=raw.get("host"),
path=raw.get("path"),
headers=dict(raw.get("headers", {})),
)


def normalize_org_config(raw: dict[str, Any]) -> NormalizedOrgConfig:
"""Normalize a raw organization config dict to typed model.

Args:
raw: Raw org config dict from JSON/cache.

Returns:
NormalizedOrgConfig with typed fields.
"""
org_raw = raw.get("organization", {})
org_info = OrganizationInfo(name=org_raw.get("name", ""))

profiles_raw = raw.get("profiles", {})
profiles = {name: _normalize_team_config(name, config) for name, config in profiles_raw.items()}

marketplaces_raw = raw.get("marketplaces", {})
marketplaces = {
name: _normalize_marketplace(name, config) for name, config in marketplaces_raw.items()
}

return NormalizedOrgConfig(
organization=org_info,
security=_normalize_security(raw.get("security")),
defaults=_normalize_defaults(raw.get("defaults")),
delegation=_normalize_delegation(raw.get("delegation")),
profiles=profiles,
marketplaces=marketplaces,
)


def normalize_project_config(raw: dict[str, Any] | None) -> NormalizedProjectConfig | None:
"""Normalize a raw project config dict to typed model.

Args:
raw: Raw project config dict from .scc.yaml, or None.

Returns:
NormalizedProjectConfig with typed fields, or None if no config.
"""
if raw is None:
return None

mcp_servers = tuple(_normalize_mcp_server(s) for s in raw.get("additional_mcp_servers", []))

return NormalizedProjectConfig(
additional_plugins=tuple(raw.get("additional_plugins", [])),
additional_mcp_servers=mcp_servers,
session=_normalize_session_settings(raw.get("session")),
)
59 changes: 59 additions & 0 deletions src/scc_cli/adapters/local_config_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Local config store adapter - implements ConfigStore using local filesystem."""

from __future__ import annotations

from pathlib import Path

from scc_cli import config as config_module
from scc_cli.adapters.config_normalizer import (
normalize_org_config,
normalize_project_config,
normalize_user_config,
)
from scc_cli.ports.config_models import (
NormalizedOrgConfig,
NormalizedProjectConfig,
NormalizedUserConfig,
)
from scc_cli.ports.config_store import ConfigStore


class LocalConfigStore:
"""Config store implementation using local filesystem.

Wraps the existing config module and normalizes results to typed models.
"""

def load_user_config(self) -> NormalizedUserConfig:
"""Load and normalize user configuration."""
raw = config_module.load_user_config()
return normalize_user_config(raw)

def load_org_config(self) -> NormalizedOrgConfig | None:
"""Load and normalize cached organization configuration."""
raw = config_module.load_cached_org_config()
if raw is None:
return None
return normalize_org_config(raw)

def load_project_config(self, workspace_path: Path) -> NormalizedProjectConfig | None:
"""Load and normalize project configuration from workspace."""
raw = config_module.read_project_config(workspace_path)
return normalize_project_config(raw)

def get_selected_profile(self) -> str | None:
"""Get the currently selected profile/team name."""
return config_module.get_selected_profile()

def is_standalone_mode(self) -> bool:
"""Check if running in standalone (solo) mode."""
return config_module.is_standalone_mode()

def is_organization_configured(self) -> bool:
"""Check if organization source is configured."""
return config_module.is_organization_configured()


def _assert_implements_protocol() -> None:
"""Type check that LocalConfigStore implements ConfigStore."""
_: ConfigStore = LocalConfigStore()
32 changes: 32 additions & 0 deletions src/scc_cli/adapters/local_dependency_installer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Local dependency installer adapter."""

from __future__ import annotations

from pathlib import Path

from scc_cli import deps
from scc_cli.ports.dependency_installer import DependencyInstaller, DependencyInstallResult


class LocalDependencyInstaller(DependencyInstaller):
"""Install dependencies using local package managers."""

def install(self, workspace: Path) -> DependencyInstallResult:
"""Install dependencies for a workspace.

Args:
workspace: Workspace directory to inspect and install dependencies.

Returns:
Result describing whether installation was attempted and succeeded.
"""
package_manager = deps.detect_package_manager(workspace)
if package_manager is None:
return DependencyInstallResult(attempted=False, success=False)

success = deps.install_dependencies(workspace, package_manager, strict=False)
return DependencyInstallResult(
attempted=True,
success=success,
package_manager=package_manager,
)
17 changes: 17 additions & 0 deletions src/scc_cli/adapters/local_doctor_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Local adapter for running doctor checks."""

from __future__ import annotations

from pathlib import Path

from scc_cli.doctor.core import run_doctor
from scc_cli.doctor.types import DoctorResult
from scc_cli.ports.doctor_runner import DoctorRunner


class LocalDoctorRunner(DoctorRunner):
"""Adapter that executes doctor checks locally."""

def run(self, workspace: str | None = None) -> DoctorResult:
workspace_path = Path(workspace) if workspace else None
return run_doctor(workspace_path)
47 changes: 47 additions & 0 deletions src/scc_cli/adapters/local_git_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from scc_cli.ports.git_client import GitClient
from scc_cli.services.git import branch as git_branch
from scc_cli.services.git import core as git_core
from scc_cli.services.git import worktree as git_worktree


class LocalGitClient(GitClient):
Expand Down Expand Up @@ -35,3 +36,49 @@ def detect_workspace_root(self, start_dir: Path) -> tuple[Path | None, Path]:

def get_current_branch(self, path: Path) -> str | None:
return git_branch.get_current_branch(path)

def has_commits(self, path: Path) -> bool:
return git_core.has_commits(path)

def has_remote(self, path: Path) -> bool:
return git_core.has_remote(path)

def get_default_branch(self, path: Path) -> str:
return git_branch.get_default_branch(path)

def list_worktrees(self, path: Path) -> list[git_worktree.WorktreeInfo]:
return git_worktree.get_worktrees_data(path)

def get_worktree_status(self, path: Path) -> tuple[int, int, int, bool]:
return git_worktree.get_worktree_status(str(path))

def find_worktree_by_query(
self,
path: Path,
query: str,
) -> tuple[git_worktree.WorktreeInfo | None, list[git_worktree.WorktreeInfo]]:
return git_worktree.find_worktree_by_query(path, query)

def find_main_worktree(self, path: Path) -> git_worktree.WorktreeInfo | None:
return git_worktree.find_main_worktree(path)

def list_branches_without_worktrees(self, path: Path) -> list[str]:
return git_branch.list_branches_without_worktrees(path)

def fetch_branch(self, path: Path, branch: str) -> None:
git_worktree.fetch_branch(path, branch)

def add_worktree(
self,
repo_path: Path,
worktree_path: Path,
branch_name: str,
base_branch: str,
) -> None:
git_worktree.add_worktree(repo_path, worktree_path, branch_name, base_branch)

def remove_worktree(self, repo_path: Path, worktree_path: Path, *, force: bool) -> None:
git_worktree.remove_worktree(repo_path, worktree_path, force=force)

def prune_worktrees(self, repo_path: Path) -> None:
git_worktree.prune_worktrees(repo_path)
Loading
Loading