Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ conviso --help
## Usage (examples)
- Projects: `python -m conviso.app projects list --company-id 443 --all`
- Projects (assignee filter): `python -m conviso.app projects list --company-id 443 --filter assignee=analyst@company.com --all`
- Project status update: `python -m conviso.app projects status ANALYSIS --id 12345`
- User access profile update: `python -m conviso.app accesscontrol user-profile --company-id 443 --user-id 123 --profile-id 7`
- User teams update: `python -m conviso.app accesscontrol user-teams --company-id 443 --user-id 123 --team-ids 10,11`
- Project requirements + activities: `python -m conviso.app projects requirements --project-id 12345`
- Assets: `python -m conviso.app assets list --company-id 443 --tags cloud --attack-surface INTERNET_FACING --all`
- Requirements: `python -m conviso.app requirements create --company-id 443 --label "Req" --description "Desc" --activity "Login|Check login|REF-123"`
Expand Down Expand Up @@ -193,6 +196,37 @@ Automatic normalizations:
- `python -m conviso.app projects requirements --project-id 12345 --requirement-id 123 --status NOT_ACCORDING`
- `python -m conviso.app projects requirements --project-id 12345 --status DONE --history-attachments`

## Project status
- Command: `python -m conviso.app projects status <STATUS> --id <PROJECT_ID>`
- Purpose: update only the project status without using the generic `projects update` command.
- Allowed values: `PLANNED`, `ANALYSIS`, `PAUSED`, `DONE`, `DISCONTINUED`
- Behavior: validates the informed status and normalizes it to uppercase before sending it to `updateProjectStatus`.
- Example:
- `python -m conviso.app projects status ANALYSIS --id 12345`

## Access control
- Command: `python -m conviso.app accesscontrol user-profile --company-id <COMPANY_ID> --user-id <USER_ID> --profile-id <PROFILE_ID>`
- Purpose: change the access profile associated with a user using `updatePortalUserAccess`.
- Safety: asks for confirmation before applying; use `--force` to skip the prompt.
- Required fields:
- `--company-id <id>`
- `--user-id <id>`
- `--profile-id <id>`
- Examples:
- `python -m conviso.app accesscontrol user-profile --company-id 443 --user-id 123 --profile-id 7`
- `python -m conviso.app accesscontrol user-profile --company-id 443 --user-id 123 --profile-id 7 --force`
- Command: `python -m conviso.app accesscontrol user-teams --company-id <COMPANY_ID> --user-id <USER_ID> [--team-ids <ID1,ID2,...>|--clear]`
- Purpose: change the teams associated with a user while preserving the current access profile.
- Safety: asks for confirmation before applying; use `--force` to skip the prompt.
- Required fields:
- `--company-id <id>`
- `--user-id <id>`
- `--team-ids <id1,id2,...>` or `--clear`
- Examples:
- `python -m conviso.app accesscontrol user-teams --company-id 443 --user-id 123 --team-ids 10,11`
- `python -m conviso.app accesscontrol user-teams --company-id 443 --user-id 123 --clear`
- `python -m conviso.app accesscontrol user-teams --company-id 443 --user-id 123 --team-ids 10,11 --force`

## SBOM
- List: `python -m conviso.app sbom list --company-id 443 --name log4j --all --format csv --output sbom.csv`
- Filters: `--name`, `--vulnerable-only`, `--asset-ids`, `--tags`, `--sort-by`, `--order`, pagination (`--page/--per-page/--all`).
Expand Down
2 changes: 2 additions & 0 deletions src/conviso/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from conviso.commands import bulk
from conviso.commands import sbom
from conviso.commands import tasks
from conviso.commands import accesscontrol
from conviso.core.logger import log, set_verbosity
from conviso.core.concurrency import set_default_workers
from conviso.core.output_prefs import set_output_preferences
Expand All @@ -24,6 +25,7 @@
app.add_typer(bulk.app, name="bulk", help="Bulk operations via CSV.")
app.add_typer(sbom.app, name="sbom", help="List/import SBOM components.")
app.add_typer(tasks.app, name="tasks", help="Execute YAML tasks from requirements.")
app.add_typer(accesscontrol.app, name="accesscontrol", help="Manage access control and user profile settings.")

@app.callback(invoke_without_command=True)
def main(
Expand Down
169 changes: 169 additions & 0 deletions src/conviso/commands/accesscontrol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
"""
Access control command module.
"""

from __future__ import annotations

import typer

from conviso.clients.client_graphql import graphql_request
from conviso.core.notifier import error, info, success

app = typer.Typer(help="Manage user access control settings.")


def _fetch_user_profile_id(company_id: str, user_id: str) -> str:
query = """
query PortalUserProfile($companyId: ID!, $id: ID) {
portalUserProfile(companyId: $companyId, id: $id) {
id
profile(companyId: $companyId) {
id
}
}
}
"""
data = graphql_request(query, {"companyId": company_id, "id": user_id})
portal_user = data.get("portalUserProfile") or {}
profile = portal_user.get("profile") or {}
profile_id = profile.get("id")
if not portal_user:
raise RuntimeError(f"User {user_id} not found in company {company_id}")
if not profile_id:
raise RuntimeError(f"User {user_id} has no access profile in company {company_id}")
return str(profile_id)


def _parse_team_ids(team_ids: str) -> list[str]:
parsed = [item.strip() for item in str(team_ids).split(",") if item.strip()]
if not parsed:
raise ValueError("Provide at least one team ID in --team-ids, or use --clear.")
return parsed


def _confirm_or_abort(message: str, force: bool) -> None:
if force:
return
if not typer.confirm(message, default=False):
info("Aborted.")
raise typer.Exit()


@app.command("user-profile")
def update_user_profile(
company_id: str = typer.Option(..., "--company-id", "-c", help="Company ID."),
user_id: str = typer.Option(..., "--user-id", "-u", help="Portal user ID."),
profile_id: str = typer.Option(..., "--profile-id", "-p", help="Access profile ID."),
force: bool = typer.Option(False, "--force", help="Skip confirmation prompt."),
):
"""Change the access profile associated with a user."""
_confirm_or_abort(
f"Change access profile for user {user_id} in company {company_id} to profile {profile_id}?",
force,
)
info(f"Updating access profile for user {user_id} in company {company_id}...")

mutation = """
mutation UpdateUserAccess($input: UpdatePortalUserAccessInput!) {
updatePortalUserAccess(input: $input) {
portalUserAccess {
portalUser {
id
}
}
}
}
"""

variables = {
"input": {
"portalUserId": user_id,
"accessProfileId": profile_id,
"companyId": company_id,
}
}

try:
data = graphql_request(mutation, variables)
portal_user = ((data.get("updatePortalUserAccess") or {}).get("portalUserAccess") or {}).get("portalUser") or {}
updated_user_id = portal_user.get("id") or user_id
success(f"User {updated_user_id} access profile updated successfully to profile {profile_id}")
except Exception as exc:
error(f"Error updating user access profile: {exc}")
raise typer.Exit(code=1)


@app.command("user-teams")
def update_user_teams(
company_id: str = typer.Option(..., "--company-id", "-c", help="Company ID."),
user_id: str = typer.Option(..., "--user-id", "-u", help="Portal user ID."),
team_ids: str = typer.Option(
None,
"--team-ids",
"-t",
help="Comma-separated team IDs to associate with the user.",
),
clear: bool = typer.Option(False, "--clear", help="Remove all teams from the user."),
force: bool = typer.Option(False, "--force", help="Skip confirmation prompt."),
):
"""Change the teams associated with a user."""
if clear and team_ids:
error("Use either --team-ids or --clear, not both.")
raise typer.Exit(code=1)
if not clear and not team_ids:
error("Provide --team-ids or use --clear.")
raise typer.Exit(code=1)

try:
current_profile_id = _fetch_user_profile_id(company_id, user_id)
parsed_team_ids = [] if clear else _parse_team_ids(team_ids)
except ValueError as exc:
error(str(exc))
raise typer.Exit(code=1)
except Exception as exc:
error(f"Error loading current user access data: {exc}")
raise typer.Exit(code=1)

if clear:
confirm_message = f"Remove all teams from user {user_id} in company {company_id}?"
else:
confirm_message = (
f"Update teams for user {user_id} in company {company_id} "
f"to: {', '.join(parsed_team_ids)}?"
)
_confirm_or_abort(confirm_message, force)

info(f"Updating teams for user {user_id} in company {company_id}...")

mutation = """
mutation UpdateUserAccess($input: UpdatePortalUserAccessInput!) {
updatePortalUserAccess(input: $input) {
portalUserAccess {
portalUser {
id
}
}
}
}
"""

variables = {
"input": {
"portalUserId": user_id,
"accessProfileId": current_profile_id,
"companyId": company_id,
"teamsIds": parsed_team_ids,
}
}

try:
data = graphql_request(mutation, variables)
portal_user = ((data.get("updatePortalUserAccess") or {}).get("portalUserAccess") or {}).get("portalUser") or {}
updated_user_id = portal_user.get("id") or user_id
if clear:
success(f"All teams removed successfully from user {updated_user_id}")
else:
success(f"Teams updated successfully for user {updated_user_id}: {', '.join(parsed_team_ids)}")
except Exception as exc:
error(f"Error updating user teams: {exc}")
raise typer.Exit(code=1)
52 changes: 52 additions & 0 deletions src/conviso/commands/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from conviso.core.output_manager import export_data

app = typer.Typer(help="Manage projects via Conviso GraphQL API.")
PROJECT_STATUS_ALLOWED = {"PLANNED", "ANALYSIS", "PAUSED", "DONE", "DISCONTINUED"}

# ---------------------- LIST COMMAND ---------------------- #
@app.command("list")
Expand Down Expand Up @@ -759,6 +760,57 @@ def _unique_preserve(seq):
raise typer.Exit(code=1)


# ---------------------- STATUS COMMAND ---------------------- #
@app.command("status")
def update_project_status(
status: str = typer.Argument(
...,
metavar="STATUS",
help="New project status. Allowed: PLANNED, ANALYSIS, PAUSED, DONE, DISCONTINUED.",
),
project_id: int = typer.Option(..., "--id", "-i", help="Project ID to update."),
):
"""Update only the status of an existing project."""
try:
normalized_status = validate_choice(status, PROJECT_STATUS_ALLOWED, "status")
except ValueError as exc:
error(str(exc))
raise typer.Exit(code=1)

info(f"Updating status for project ID {project_id} to {normalized_status}...")

mutation = """
mutation UpdateProjectStatus($input: UpdateProjectStatusInput!) {
updateProjectStatus(input: $input) {
errors
project {
id
label
status
}
}
}
"""

try:
data = graphql_request(
mutation,
{"input": {"id": project_id, "projectStatus": normalized_status}},
)
result = data["updateProjectStatus"]
errors = result.get("errors") or []
if errors:
raise RuntimeError("; ".join(str(err) for err in errors))
project = result["project"]
success(
f"Project status updated successfully: ID {project['id']} - "
f"{project['label']} -> {project.get('status') or normalized_status}"
)
except Exception as e:
error(f"Error updating project status: {e}")
raise typer.Exit(code=1)


# ---------------------- DELETE COMMAND ---------------------- #
@app.command("delete")
def delete_projects(
Expand Down
Loading