Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ conviso --help
- Project status update: `python -m conviso.app projects status ANALYSIS --id 12345`
- User access profile update: `python -m conviso.app accesscontrol user-profile --company-id 443 --user-id 123 --profile-id 7`
- User teams update: `python -m conviso.app accesscontrol user-teams --company-id 443 --user-id 123 --team-ids 10,11`
- Bulk user access update: `python -m conviso.app accesscontrol bulk-users --file users.csv`
- Project requirements + activities: `python -m conviso.app projects requirements --project-id 12345`
- Assets: `python -m conviso.app assets list --company-id 443 --tags cloud --attack-surface INTERNET_FACING --all`
- Requirements: `python -m conviso.app requirements create --company-id 443 --label "Req" --description "Desc" --activity "Login|Check login|REF-123"`
Expand Down Expand Up @@ -228,6 +229,30 @@ Automatic normalizations:
- `python -m conviso.app accesscontrol user-teams --company-id 443 --user-id 123 --team-ids 10,11`
- `python -m conviso.app accesscontrol user-teams --company-id 443 --user-id 123 --clear`
- `python -m conviso.app accesscontrol user-teams --company-id 443 --user-id 123 --team-ids 10,11 --force`
- Command: `python -m conviso.app accesscontrol bulk-users --file <CSV_FILE>`
- Purpose: bulk update user access profiles and teams across different companies using CSV.
- Safety: always runs dry-run first; use `--preview-only` to stop after preview or `--force` to skip the apply confirmation.
- CSV columns:
- `company_id` required
- `user_id` required
- `profile_id` optional if only changing teams; current profile is preserved automatically
- `team_ids` optional comma-separated team IDs
- `clear_teams` optional `true|false`
- Rules:
- each row must change at least one thing: `profile_id`, `team_ids`, or `clear_teams=true`
- do not use `team_ids` and `clear_teams=true` together on the same row
- Examples:
- `python -m conviso.app accesscontrol bulk-users --file users.csv`
- `python -m conviso.app accesscontrol bulk-users --file users.csv --preview-only`
- `python -m conviso.app accesscontrol bulk-users --file users.csv --force`

Example CSV:
```csv
company_id,user_id,profile_id,team_ids,clear_teams
443,123,7,"10,11",false
444,456,9,,false
445,789,,,true
```

## SBOM
- List: `python -m conviso.app sbom list --company-id 443 --name log4j --all --format csv --output sbom.csv`
Expand Down
136 changes: 136 additions & 0 deletions src/conviso/commands/accesscontrol.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

from __future__ import annotations

import os
import typer

from conviso.clients.client_graphql import graphql_request
from conviso.core.bulk_loader import BulkResult, load_csv
from conviso.core.notifier import error, info, success

app = typer.Typer(help="Manage user access control settings.")
Expand Down Expand Up @@ -41,6 +43,10 @@ def _parse_team_ids(team_ids: str) -> list[str]:
return parsed


def _parse_bool(value: str | None) -> bool:
return str(value or "").strip().lower() in {"1", "true", "yes", "y"}


def _confirm_or_abort(message: str, force: bool) -> None:
if force:
return
Expand All @@ -49,6 +55,67 @@ def _confirm_or_abort(message: str, force: bool) -> None:
raise typer.Exit()


def _show_bulk_users_template() -> None:
typer.echo("Columns for accesscontrol bulk-users CSV:")
typer.echo(" company_id,user_id,profile_id,team_ids,clear_teams")
typer.echo("")
typer.echo("Rules:")
typer.echo(" - company_id: required")
typer.echo(" - user_id: required")
typer.echo(" - profile_id: optional if only changing teams; current profile will be preserved")
typer.echo(" - team_ids: optional comma-separated team IDs")
typer.echo(" - clear_teams: optional true/false; when true, removes all teams")
typer.echo(" - each row must change at least one thing: profile_id, team_ids, or clear_teams=true")
typer.echo("")
typer.echo("Example:")
typer.echo("company_id,user_id,profile_id,team_ids,clear_teams")
typer.echo('443,123,7,"10,11",false')
typer.echo("444,456,9,,false")
typer.echo("445,789,, ,true")


def _prepare_bulk_user_input(row: dict[str, str], rownum: int) -> tuple[dict[str, str | list[str]], str]:
company_id = (row.get("company_id") or "").strip()
user_id = (row.get("user_id") or "").strip()
profile_id = (row.get("profile_id") or "").strip()
team_ids_raw = row.get("team_ids")
clear_teams = _parse_bool(row.get("clear_teams"))

if not company_id:
raise ValueError("Missing company_id")
if not user_id:
raise ValueError("Missing user_id")
if clear_teams and (team_ids_raw or "").strip():
raise ValueError("Use team_ids or clear_teams=true, not both")

has_team_update = clear_teams or bool((team_ids_raw or "").strip())
if not profile_id and not has_team_update:
raise ValueError("Row must include profile_id and/or team_ids/clear_teams")

effective_profile_id = profile_id or _fetch_user_profile_id(company_id, user_id)
input_data: dict[str, str | list[str]] = {
"portalUserId": user_id,
"accessProfileId": effective_profile_id,
"companyId": company_id,
}

action_parts = []
if profile_id:
action_parts.append(f"profile={effective_profile_id}")
else:
action_parts.append(f"profile={effective_profile_id} (preserved)")

if clear_teams:
input_data["teamsIds"] = []
action_parts.append("teams=clear")
elif (team_ids_raw or "").strip():
parsed_team_ids = _parse_team_ids(team_ids_raw)
input_data["teamsIds"] = parsed_team_ids
action_parts.append(f"teams={','.join(parsed_team_ids)}")

return input_data, f"company={company_id} user={user_id} " + " ".join(action_parts)


@app.command("user-profile")
def update_user_profile(
company_id: str = typer.Option(..., "--company-id", "-c", help="Company ID."),
Expand Down Expand Up @@ -167,3 +234,72 @@ def update_user_teams(
except Exception as exc:
error(f"Error updating user teams: {exc}")
raise typer.Exit(code=1)


@app.command("bulk-users")
def bulk_users(
file: str = typer.Option(None, "--file", "-f", help="Path to CSV file."),
force: bool = typer.Option(False, "--force", help="Apply changes after dry-run without confirmation."),
preview_only: bool = typer.Option(False, "--preview-only", help="Run dry-run only and exit without applying."),
show_template: bool = typer.Option(False, "--show-template", help="Display expected CSV columns and examples, then exit."),
):
"""Bulk update user access profiles and teams via CSV."""
if show_template:
_show_bulk_users_template()
raise typer.Exit()
if not file:
error("Missing required option --file. For column layout, run --show-template.")
raise typer.Exit(code=1)
if not os.path.isfile(file):
error(f"File not found: {file}")
raise typer.Exit(code=1)

rows = load_csv(file)
if not rows:
error("No rows found in CSV.")
raise typer.Exit(code=1)

info(f"Loaded {len(rows)} row(s) from {file}.")
info("Running dry-run (no changes will be applied)...")
preview = BulkResult()
prepared_rows: list[tuple[int, dict[str, str | list[str]], str]] = []
for idx, row in enumerate(rows, start=2):
try:
input_data, description = _prepare_bulk_user_input(row, idx)
typer.echo(f"ℹ️ [dry-run] Row {idx}: {description}")
prepared_rows.append((idx, input_data, description))
preview.add_success(idx, "dry-run")
except Exception as exc:
preview.add_error(idx, str(exc))
preview.report()

if preview_only:
info("Preview-only mode: no changes applied.")
raise typer.Exit()
if preview.errors:
error("Dry-run found errors. Fix the CSV before applying changes.")
raise typer.Exit(code=1)

_confirm_or_abort("Apply bulk user access changes now (run without dry-run)?", force)

mutation = """
mutation UpdateUserAccess($input: UpdatePortalUserAccessInput!) {
updatePortalUserAccess(input: $input) {
portalUserAccess {
portalUser {
id
}
}
}
}
"""

info("Applying changes...")
result = BulkResult()
for idx, input_data, _ in prepared_rows:
try:
graphql_request(mutation, {"input": input_data})
result.add_success(idx, "ok")
except Exception as exc:
result.add_error(idx, str(exc))
result.report()
Loading