diff --git a/README.md b/README.md index ad8085b..3db14f0 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,7 @@ conviso --help - Project status update: `python -m conviso.app projects status ANALYSIS --id 12345` - User access profile update: `python -m conviso.app accesscontrol user-profile --company-id 443 --user-id 123 --profile-id 7` - User teams update: `python -m conviso.app accesscontrol user-teams --company-id 443 --user-id 123 --team-ids 10,11` +- Bulk user access update: `python -m conviso.app accesscontrol bulk-users --file users.csv` - Project requirements + activities: `python -m conviso.app projects requirements --project-id 12345` - Assets: `python -m conviso.app assets list --company-id 443 --tags cloud --attack-surface INTERNET_FACING --all` - Requirements: `python -m conviso.app requirements create --company-id 443 --label "Req" --description "Desc" --activity "Login|Check login|REF-123"` @@ -228,6 +229,30 @@ Automatic normalizations: - `python -m conviso.app accesscontrol user-teams --company-id 443 --user-id 123 --team-ids 10,11` - `python -m conviso.app accesscontrol user-teams --company-id 443 --user-id 123 --clear` - `python -m conviso.app accesscontrol user-teams --company-id 443 --user-id 123 --team-ids 10,11 --force` +- Command: `python -m conviso.app accesscontrol bulk-users --file ` +- Purpose: bulk update user access profiles and teams across different companies using CSV. +- Safety: always runs dry-run first; use `--preview-only` to stop after preview or `--force` to skip the apply confirmation. +- CSV columns: + - `company_id` required + - `user_id` required + - `profile_id` optional if only changing teams; current profile is preserved automatically + - `team_ids` optional comma-separated team IDs + - `clear_teams` optional `true|false` +- Rules: + - each row must change at least one thing: `profile_id`, `team_ids`, or `clear_teams=true` + - do not use `team_ids` and `clear_teams=true` together on the same row +- Examples: + - `python -m conviso.app accesscontrol bulk-users --file users.csv` + - `python -m conviso.app accesscontrol bulk-users --file users.csv --preview-only` + - `python -m conviso.app accesscontrol bulk-users --file users.csv --force` + +Example CSV: +```csv +company_id,user_id,profile_id,team_ids,clear_teams +443,123,7,"10,11",false +444,456,9,,false +445,789,,,true +``` ## SBOM - List: `python -m conviso.app sbom list --company-id 443 --name log4j --all --format csv --output sbom.csv` diff --git a/src/conviso/commands/accesscontrol.py b/src/conviso/commands/accesscontrol.py index 828a245..1bed8b2 100644 --- a/src/conviso/commands/accesscontrol.py +++ b/src/conviso/commands/accesscontrol.py @@ -4,9 +4,11 @@ from __future__ import annotations +import os import typer from conviso.clients.client_graphql import graphql_request +from conviso.core.bulk_loader import BulkResult, load_csv from conviso.core.notifier import error, info, success app = typer.Typer(help="Manage user access control settings.") @@ -41,6 +43,10 @@ def _parse_team_ids(team_ids: str) -> list[str]: return parsed +def _parse_bool(value: str | None) -> bool: + return str(value or "").strip().lower() in {"1", "true", "yes", "y"} + + def _confirm_or_abort(message: str, force: bool) -> None: if force: return @@ -49,6 +55,67 @@ def _confirm_or_abort(message: str, force: bool) -> None: raise typer.Exit() +def _show_bulk_users_template() -> None: + typer.echo("Columns for accesscontrol bulk-users CSV:") + typer.echo(" company_id,user_id,profile_id,team_ids,clear_teams") + typer.echo("") + typer.echo("Rules:") + typer.echo(" - company_id: required") + typer.echo(" - user_id: required") + typer.echo(" - profile_id: optional if only changing teams; current profile will be preserved") + typer.echo(" - team_ids: optional comma-separated team IDs") + typer.echo(" - clear_teams: optional true/false; when true, removes all teams") + typer.echo(" - each row must change at least one thing: profile_id, team_ids, or clear_teams=true") + typer.echo("") + typer.echo("Example:") + typer.echo("company_id,user_id,profile_id,team_ids,clear_teams") + typer.echo('443,123,7,"10,11",false') + typer.echo("444,456,9,,false") + typer.echo("445,789,, ,true") + + +def _prepare_bulk_user_input(row: dict[str, str], rownum: int) -> tuple[dict[str, str | list[str]], str]: + company_id = (row.get("company_id") or "").strip() + user_id = (row.get("user_id") or "").strip() + profile_id = (row.get("profile_id") or "").strip() + team_ids_raw = row.get("team_ids") + clear_teams = _parse_bool(row.get("clear_teams")) + + if not company_id: + raise ValueError("Missing company_id") + if not user_id: + raise ValueError("Missing user_id") + if clear_teams and (team_ids_raw or "").strip(): + raise ValueError("Use team_ids or clear_teams=true, not both") + + has_team_update = clear_teams or bool((team_ids_raw or "").strip()) + if not profile_id and not has_team_update: + raise ValueError("Row must include profile_id and/or team_ids/clear_teams") + + effective_profile_id = profile_id or _fetch_user_profile_id(company_id, user_id) + input_data: dict[str, str | list[str]] = { + "portalUserId": user_id, + "accessProfileId": effective_profile_id, + "companyId": company_id, + } + + action_parts = [] + if profile_id: + action_parts.append(f"profile={effective_profile_id}") + else: + action_parts.append(f"profile={effective_profile_id} (preserved)") + + if clear_teams: + input_data["teamsIds"] = [] + action_parts.append("teams=clear") + elif (team_ids_raw or "").strip(): + parsed_team_ids = _parse_team_ids(team_ids_raw) + input_data["teamsIds"] = parsed_team_ids + action_parts.append(f"teams={','.join(parsed_team_ids)}") + + return input_data, f"company={company_id} user={user_id} " + " ".join(action_parts) + + @app.command("user-profile") def update_user_profile( company_id: str = typer.Option(..., "--company-id", "-c", help="Company ID."), @@ -167,3 +234,72 @@ def update_user_teams( except Exception as exc: error(f"Error updating user teams: {exc}") raise typer.Exit(code=1) + + +@app.command("bulk-users") +def bulk_users( + file: str = typer.Option(None, "--file", "-f", help="Path to CSV file."), + force: bool = typer.Option(False, "--force", help="Apply changes after dry-run without confirmation."), + preview_only: bool = typer.Option(False, "--preview-only", help="Run dry-run only and exit without applying."), + show_template: bool = typer.Option(False, "--show-template", help="Display expected CSV columns and examples, then exit."), +): + """Bulk update user access profiles and teams via CSV.""" + if show_template: + _show_bulk_users_template() + raise typer.Exit() + if not file: + error("Missing required option --file. For column layout, run --show-template.") + raise typer.Exit(code=1) + if not os.path.isfile(file): + error(f"File not found: {file}") + raise typer.Exit(code=1) + + rows = load_csv(file) + if not rows: + error("No rows found in CSV.") + raise typer.Exit(code=1) + + info(f"Loaded {len(rows)} row(s) from {file}.") + info("Running dry-run (no changes will be applied)...") + preview = BulkResult() + prepared_rows: list[tuple[int, dict[str, str | list[str]], str]] = [] + for idx, row in enumerate(rows, start=2): + try: + input_data, description = _prepare_bulk_user_input(row, idx) + typer.echo(f"ℹ️ [dry-run] Row {idx}: {description}") + prepared_rows.append((idx, input_data, description)) + preview.add_success(idx, "dry-run") + except Exception as exc: + preview.add_error(idx, str(exc)) + preview.report() + + if preview_only: + info("Preview-only mode: no changes applied.") + raise typer.Exit() + if preview.errors: + error("Dry-run found errors. Fix the CSV before applying changes.") + raise typer.Exit(code=1) + + _confirm_or_abort("Apply bulk user access changes now (run without dry-run)?", force) + + mutation = """ + mutation UpdateUserAccess($input: UpdatePortalUserAccessInput!) { + updatePortalUserAccess(input: $input) { + portalUserAccess { + portalUser { + id + } + } + } + } + """ + + info("Applying changes...") + result = BulkResult() + for idx, input_data, _ in prepared_rows: + try: + graphql_request(mutation, {"input": input_data}) + result.add_success(idx, "ok") + except Exception as exc: + result.add_error(idx, str(exc)) + result.report()