From f8b596792cb24821fc71683ca0e3aefcc950cd23 Mon Sep 17 00:00:00 2001 From: davidt99 Date: Sun, 22 Feb 2026 10:27:19 +0200 Subject: [PATCH] feat: add connect new source command --- CHANGES | 2 +- README.md | 54 +-- intezer_analyze_cli/cli.py | 177 +++++++- intezer_analyze_cli/commands.py | 1 + intezer_analyze_cli/connector_commands.py | 181 ++++++++ intezer_analyze_cli/key_store.py | 16 +- intezer_analyze_cli/utilities.py | 2 +- requirements-prod.txt | 3 +- setup.py | 4 +- tests/unit/cli_test.py | 240 ++++++++++ tests/unit/commands_test.py | 513 +++++++++++++++++++++- 11 files changed, 1131 insertions(+), 62 deletions(-) create mode 100644 intezer_analyze_cli/connector_commands.py diff --git a/CHANGES b/CHANGES index 1ebbaea..acde191 100644 --- a/CHANGES +++ b/CHANGES @@ -42,7 +42,7 @@ 1.10.1 ----- -- Add command for sending phishing emails from a directory to Intezer Analyze +- Add command for sending phishing emails from a directory to Intezer Platform - Upgrade intezer-SDK to 1.19.9 1.9.3 diff --git a/README.md b/README.md index e4e29ee..2619b2b 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -# intezer-analyze +# intezer-cli -A cross-platform CLI tool which enables analyzing files with Intezer Analyze. +A cross-platform CLI tool which enables analyzing files with Intezer AI-SOC Platform. # Prerequisites Python 3.10 and above @@ -18,18 +18,18 @@ The CLI supports proxies. To use a proxy, set the environment variable `HTTP_PRO ## Login To begin using the cli, first you should login with your API key: -`intezer-analyze login ` +`intezer-cli login ` If you are running the CLI against an on premise deployment, enter the url: -`intezer-analyze login http://
/api` +`intezer-cli login http://
/api` ## Analyze -Send a file or a directory for analysis in Intezer Analyze. +Send a file or a directory for analysis in Intezer Platform. ### Usage -`intezer-analyze analyze PATH` +`intezer-cli analyze PATH` ### Parameters PATH: Path to file or directory to send the files inside for analysis. @@ -37,19 +37,19 @@ PATH: Path to file or directory to send the files inside for analysis. ### Examples: Send a single file for analysis: - $ intezer-analyze analyze C:\threat.exe + $ intezer-cli analyze C:\threat.exe Send all files in directory for analysis: - $ intezer-analyze analyze C:\files-to-analyze + $ intezer-cli analyze C:\files-to-analyze -For complete documentation please run `intezer-analyze analyze --help` +For complete documentation please run `intezer-cli analyze --help` ## Analyze hashes file Send a text file with list of hashes ### Usage -`intezer-analyze analyze-by-list PATH` +`intezer-cli analyze-by-list PATH` ### Parameters PATH: Path to txt file. @@ -57,15 +57,15 @@ PATH: Path to txt file. ### Example Send txt file with hashes for analysis: - $ intezer-analyze analyze-by-list ~/files/hashes.txt + $ intezer-cli analyze-by-list ~/files/hashes.txt -For complete documentation please run `intezer-analyze analyze-by-list --help` +For complete documentation please run `intezer-cli analyze-by-list --help` ## Index Send a file or a directory for indexing ### Usage -`intezer-analyze index PATH INDEX_AS [FAMILY_NAME]` +`intezer-cli index PATH INDEX_AS [FAMILY_NAME]` ### Parameters PATH: Path to file or directory to index @@ -77,19 +77,19 @@ FAMILY_NAME: The family name (optional) ### Example index a single file: - $ intezer-analyze index ~/files/threat.exe.sample malicious family_name + $ intezer-cli index ~/files/threat.exe.sample malicious family_name index all files in directory: - $ intezer-analyze index ~/files/files-to-index trusted + $ intezer-cli index ~/files/files-to-index trusted -For complete documentation please run `intezer-analyze index --help` +For complete documentation please run `intezer-cli index --help` ## Index hashes file Send a text file with list of hashes to index ### Usage -`intezer-analyze index-by-list PATH --index-as=INDEX [FAMILY_NAME]` +`intezer-cli index-by-list PATH --index-as=INDEX [FAMILY_NAME]` ### Parameters PATH: Path to txt file @@ -101,15 +101,15 @@ FAMILY_NAME: The family name (optional) ### Example Send a file with hashes and verdict for indexing: - $ intezer-analyze index-by-list ~/files/hashes.txt --index-as=malicious family_name + $ intezer-cli index-by-list ~/files/hashes.txt --index-as=malicious family_name -For complete documentation please run `intezer-analyze index-by-list --help` +For complete documentation please run `intezer-cli index-by-list --help` ## Upload offline endpoint scan Upload an offline scan created by running the Intezer Endpoint Scanner with '-o' flag ### Usage -`intezer-analyze upload-endpoint-scan OFFLINE_SCAN_DIRECTORY` +`intezer-cli upload-endpoint-scan OFFLINE_SCAN_DIRECTORY` ### Parameters OFFLINE_SCAN_DIRECTORY: Path to directory with offline endpoint scan results @@ -117,15 +117,15 @@ OFFLINE_SCAN_DIRECTORY: Path to directory with offline endpoint scan results ### Examples: Upload a directory with offline endpoint scan results: - $ intezer-analyze upload-endpoint-scan /home/user/offline_scans/scan_MYPC_2019-01-01_00-00-00 + $ intezer-cli upload-endpoint-scan /home/user/offline_scans/scan_MYPC_2019-01-01_00-00-00 -For complete documentation please run `intezer-analyze upload-endpoint-scan --help` +For complete documentation please run `intezer-cli upload-endpoint-scan --help` ## Upload multiple offline endpoint scans Upload multiple offline scans created by running the Intezer Endpoint Scanner with '-o' flag ### Usage -`intezer-analyze upload-endpoint-scans-in-directory OFFLINE_SCANS_ROOT_DIRECTORY` +`intezer-cli upload-endpoint-scans-in-directory OFFLINE_SCANS_ROOT_DIRECTORY` ### Parameters OFFLINE_SCANS_ROOT_DIRECTORY: Path to root directory containing offline endpoint scan results @@ -133,9 +133,9 @@ OFFLINE_SCANS_ROOT_DIRECTORY: Path to root directory containing offline endpoint ### Examples: Upload a directory with offline endpoint scan results: - $ intezer-analyze upload-endpoint-scans-in-directory /home/user/offline_scans + $ intezer-cli upload-endpoint-scans-in-directory /home/user/offline_scans -For complete documentation please run `intezer-analyze upload-endpoint-scans-in-directory --help` +For complete documentation please run `intezer-cli upload-endpoint-scans-in-directory --help` ## Upload all subdirectories with .eml files to analyze Upload a directory with .eml files @@ -144,8 +144,8 @@ Upload a directory with .eml files UPLOAD_EMAILS_IN_DIRECTORY: Path to root directory containing the .eml files ### Examples: - $ intezer-analyze upload-emails-in-directory /path/to/emails_root_directory + $ intezer-cli upload-emails-in-directory /path/to/emails_root_directory # Troubleshooting -The cli produce a log file named `intezer-analyze-cli.log` in the current working directory. +The cli produce a log file named `intezer-cli-cli.log` in the current working directory. To enable console output, set the environment variable `INTEZER_DEBUG=1`. diff --git a/intezer_analyze_cli/cli.py b/intezer_analyze_cli/cli.py index ea331ad..2f9d071 100644 --- a/intezer_analyze_cli/cli.py +++ b/intezer_analyze_cli/cli.py @@ -9,6 +9,7 @@ from intezer_analyze_cli import __version__ from intezer_analyze_cli import commands +from intezer_analyze_cli import connector_commands from intezer_analyze_cli import key_store from intezer_analyze_cli import utilities from intezer_analyze_cli.config import default_config @@ -61,26 +62,26 @@ def create_global_api(): @click.group(cls=AliasedGroup, context_settings=dict(help_option_names=['-h', '--help'], max_content_width=120), - help=f'Intezer Labs Ltd. Intezer Analyze CLI {__version__}') + help=f'Intezer Labs Ltd. Intezer CLI {__version__}') def main_cli(): pass -@main_cli.command('login', short_help='Login to Intezer Analyze') +@main_cli.command('login', short_help='Login to Intezer Platform') @click.argument('api_key', type=click.UUID) @click.argument('api_url', required=False, default=None, type=click.STRING) def login(api_key: str, api_url: str): - """Login to Intezer Analyze to perform analyses. + """Login to Intezer Platform to perform analyses. \b - API_KEY: API key or invite code for Intezer Analyze. + API_KEY: API key or invite code for Intezer Platform. \b - API_URL: Intezer Analyze URL in case you have on premise deployment. + API_URL: Intezer Platform URL in case you have on premise deployment. \b Example: - $ intezer-analyze login edb45d954da54e8e980078001d8921cc + $ intezer-cli login edb45d954da54e8e980078001d8921cc """ try: if api_url: @@ -111,7 +112,7 @@ def analyze(path: str, no_static_extraction: bool, code_item_type: str, ignore_directory_count_limit: bool): - """ Send a file or a directory for analysis in Intezer Analyze. + """ Send a file or a directory for analysis in Intezer Platform. \b PATH: Path to file or directory to send the files inside for analysis. @@ -119,10 +120,10 @@ def analyze(path: str, \b Examples: Send a single file for analysis: - $ intezer-analyze analyze ~/files/threat.exe.sample + $ intezer-cli analyze ~/files/threat.exe.sample \b Send all files in directory for analysis: - $ intezer-analyze analyze ~/files/files-to-analyze + $ intezer-cli analyze ~/files/files-to-analyze """ try: create_global_api() @@ -157,7 +158,7 @@ def analyze(path: str, @main_cli.command('analyze-by-list', short_help='Send a text file with list of hashes') @click.argument('path', type=click.Path(exists=True, dir_okay=False)) def analyze_by_list(path): - """ Send a text file with hashes for analysis in Intezer Analyze. + """ Send a text file with hashes for analysis in Intezer Platform. \b PATH: Path to txt file. @@ -165,7 +166,7 @@ def analyze_by_list(path): \b Examples: Send txt file with hashes for analysis: - $ intezer-analyze analyze-by-list ~/files/hashes.txt + $ intezer-cli analyze-by-list ~/files/hashes.txt """ try: create_global_api() @@ -184,14 +185,14 @@ def analyze_by_list(path): @click.argument('family_name', required=False, type=click.STRING, default=None) def index_by_list(path: str, index_as: str, family_name: str): """ - Send a text file with hashes for indexing in Intezer Analyze. + Send a text file with hashes for indexing in Intezer Platform. \b PATH: Path to a txt file with hashes \b Examples: - $ intezer-analyze index-by-list ~/files/hashes.txt malicious family_name + $ intezer-cli index-by-list ~/files/hashes.txt malicious family_name \b """ try: @@ -227,10 +228,10 @@ def index(path: str, index_as: str, family_name: str, ignore_directory_count_lim \b Examples: index a single file: - $ intezer-analyze index ~/files/threat.exe.sample malicious family_name + $ intezer-cli index ~/files/threat.exe.sample malicious family_name \b index all files in directory: - $ intezer-analyze index ~/files/files-to-index trusted + $ intezer-cli index ~/files/files-to-index trusted """ try: index_type = sdk_consts.IndexType.from_str(index_as) @@ -270,7 +271,7 @@ def upload_endpoint_scan(offline_scan_directory: str, force: bool, max_concurren Examples: upload a directory with offline endpoint scan results: - $ intezer-analyze upload-endpoint-scan /path/to/endpoint_scan_results + $ intezer-cli upload-endpoint-scan /path/to/endpoint_scan_results """ try: create_global_api() @@ -299,7 +300,7 @@ def upload_endpoint_scans_in_directory(offline_scans_root_directory: str, force: Examples: upload a directory with offline endpoint scan results: - $ intezer-analyze upload-endpoint-scans-in-directory /path/to/endpoint_scan_results_root + $ intezer-cli upload-endpoint-scans-in-directory /path/to/endpoint_scan_results_root """ try: create_global_api() @@ -329,7 +330,7 @@ def upload_emails_in_directory(emails_root_directory: str, ignore_directory_coun Examples: upload a directory with .eml files: - $ intezer-analyze upload-emails-in-directory /path/to/emails_root_directory + $ intezer-cli upload-emails-in-directory /path/to/emails_root_directory """ try: create_global_api() @@ -345,10 +346,146 @@ def upload_emails_in_directory(emails_root_directory: str, ignore_directory_coun @main_cli.group('alerts', short_help='Alert management commands') def alerts(): - """Alert management commands for Intezer Analyze.""" + """Alert management commands for Intezer Platform.""" pass +@main_cli.group('alerts-data-sources', cls=AliasedGroup, short_help='Alert data source connector management') +def alerts_data_sources(): + """Manage alert data source connectors for Intezer Platform.""" + pass + + +@alerts_data_sources.command('connect', short_help='Connect a new alert data source') +@click.option('--source', required=True, type=click.STRING, help='Alert source type') +@click.option('--name', required=True, type=click.STRING, help='Connector name (must match ^[a-z0-9-]*$)') +@click.option('--config', 'config_file', required=True, type=click.File('r'), + help='Path to JSON file with source-specific credentials (use - for stdin)') +@click.option('--resolve-false-positive', is_flag=True, default=False, + help='Enable auto-resolve false positives') +@click.option('--noting', is_flag=True, default=False, help='Enable noting') +@click.option('--auto-endpoint-scan', is_flag=True, default=False, help='Enable auto endpoint scan') +@click.option('--wait', is_flag=True, default=False, help='Poll status until terminal state') +def connect_data_source(source: str, name: str, config_file, + resolve_false_positive: bool, noting: bool, + auto_endpoint_scan: bool, wait: bool): + """Connect a new alert data source connector. + + \b + Examples: + $ intezer-cli alerts-data-sources connect --source crowdstrike --name acme-corp --config creds.json + $ intezer-cli alerts-data-sources connect --source crowdstrike --name acme-corp --config - < creds.json + $ intezer-cli alerts-data-sources connect --source crowdstrike --name acme-corp --config creds.json --wait + """ + try: + create_global_api() + connector_commands.connect_alert_data_source_command( + source=source, + name=name, + config_file=config_file, + resolve_false_positive=resolve_false_positive, + noting=noting, + auto_endpoint_scan=auto_endpoint_scan, + wait=wait + ) + except click.Abort: + raise + except click.ClickException: + raise + except Exception: + logger.exception('Unexpected error occurred') + click.echo('Unexpected error occurred, please contact us at support@intezer.com ' + f'and attach the log file in {utilities.log_file_path}', err=True) + + +@alerts_data_sources.command('deactivate', short_help='Deactivate a connector') +@click.argument('connector_id', type=click.STRING) +@click.option('--wait', is_flag=True, default=False, help='Poll status until terminal state') +def deactivate_data_source(connector_id: str, wait: bool): + """Deactivate an alert data source connector. + + \b + CONNECTOR_ID: The connector ID to deactivate. + + \b + Examples: + $ intezer-cli alerts-data-sources deactivate acme-corp + $ intezer-cli alerts-data-sources deactivate acme-corp --wait + """ + try: + create_global_api() + connector_commands.deactivate_alert_data_source_command(connector_id=connector_id, wait=wait) + except click.Abort: + raise + except click.ClickException: + raise + except Exception: + logger.exception('Unexpected error occurred') + click.echo('Unexpected error occurred, please contact us at support@intezer.com ' + f'and attach the log file in {utilities.log_file_path}', err=True) + + +@alerts_data_sources.command('reactivate', short_help='Reactivate a connector') +@click.argument('connector_id', type=click.STRING) +@click.option('--wait', is_flag=True, default=False, help='Poll status until terminal state') +def reactivate_data_source(connector_id: str, wait: bool): + """Activate (reactivate) an alert data source connector. + + \b + CONNECTOR_ID: The connector ID to activate. + + \b + Examples: + $ intezer-cli alerts-data-sources activate acme-corp + $ intezer-cli alerts-data-sources activate acme-corp --wait + """ + try: + create_global_api() + connector_commands.reactivate_alert_data_source_command(connector_id=connector_id, wait=wait) + except click.Abort: + raise + except click.ClickException: + raise + except Exception: + logger.exception('Unexpected error occurred') + click.echo('Unexpected error occurred, please contact us at support@intezer.com ' + f'and attach the log file in {utilities.log_file_path}', err=True) + + +@alerts_data_sources.command('update', short_help='Update connector credentials or settings') +@click.argument('connector_id', type=click.STRING) +@click.option('--config', 'config_file', required=True, type=click.File('r'), + help='Path to JSON file with updated credentials or settings (use - for stdin)') +@click.option('--wait', is_flag=True, default=False, help='Poll status until terminal state') +def update_data_source(connector_id: str, config_file, wait: bool): + """Update an alert data source connector's credentials or settings. + + \b + CONNECTOR_ID: The connector ID to update. + + \b + Examples: + $ intezer-cli alerts-data-sources update --config new-creds.json + $ intezer-cli alerts-data-sources update --config - < new-creds.json + $ intezer-cli alerts-data-sources update --config new-creds.json --wait + """ + try: + create_global_api() + connector_commands.update_alert_data_source_command( + connector_id=connector_id, + config_file=config_file, + wait=wait + ) + except click.Abort: + raise + except click.ClickException: + raise + except Exception: + logger.exception('Unexpected error occurred') + click.echo('Unexpected error occurred, please contact us at support@intezer.com ' + f'and attach the log file in {utilities.log_file_path}', err=True) + + @alerts.command('notify-from-csv', short_help='Notify alerts from CSV file') @click.argument('csv_path', type=click.Path(exists=True, dir_okay=False)) def notify_from_csv(csv_path: str): @@ -366,7 +503,7 @@ def notify_from_csv(csv_path: str): \b Examples: Notify alerts from CSV file: - $ intezer-analyze alerts notify-from-csv ~/alerts.csv + $ intezer-cli alerts notify-from-csv ~/alerts.csv """ try: create_global_api() diff --git a/intezer_analyze_cli/commands.py b/intezer_analyze_cli/commands.py index ad4ed56..af5817c 100644 --- a/intezer_analyze_cli/commands.py +++ b/intezer_analyze_cli/commands.py @@ -518,6 +518,7 @@ def notify_alerts_from_csv_command(csv_path: str): raise click.Abort() + def _read_alerts_from_csv(csv_path: str) -> list[dict[str, str | None]]: """ Read alert IDs and environments from CSV file. diff --git a/intezer_analyze_cli/connector_commands.py b/intezer_analyze_cli/connector_commands.py new file mode 100644 index 0000000..4f536d4 --- /dev/null +++ b/intezer_analyze_cli/connector_commands.py @@ -0,0 +1,181 @@ +import json +import os +import re +import time +from http import HTTPStatus + +import click +from intezer_sdk import api +from intezer_sdk.api import raise_for_status +from yaspin import yaspin + +_CONNECTOR_NAME_PATTERN = re.compile(r'^[a-z0-9-]*$') + +_TERMINAL_SUCCESS_STATUSES = {'pending', 'active', 'deactivated'} +_TERMINAL_FAILURE_STATUSES = {'credentials_verification_failed', 'deployment_failed', 'update_failed'} +_POLL_INTERVAL_SECONDS = 5 + + +def _get_extra_params(): + tenant_id = os.environ.get('INTEZER_TENANT_ID') + if tenant_id: + return {'tenant_id': tenant_id} + return {} + + +def _print_bad_request(response): + if response.status_code == HTTPStatus.BAD_REQUEST: + click.echo(json.dumps(response.json(), indent=2), err=True) + + +def connect_alert_data_source_command(source: str, + name: str, + config_file, + resolve_false_positive: bool, + noting: bool, + auto_endpoint_scan: bool, + wait: bool): + if not _CONNECTOR_NAME_PATTERN.match(name): + click.echo('Error: Connector name must match ^[a-z0-9-]*$', err=True) + raise click.Abort() + + try: + config_data = json.load(config_file) + except json.JSONDecodeError as e: + click.echo(f'Error: Invalid JSON in config file: {e}', err=True) + raise click.Abort() + + request_body = { + 'alert_source': source, + 'connector_name': name, + 'is_resolve_false_positive_enabled': resolve_false_positive, + 'is_noting_enabled': noting, + 'is_auto_endpoint_scan_enabled': auto_endpoint_scan, + **_get_extra_params(), + } + request_body.update(config_data) + + api_client = api.get_global_api() + response = api_client.request_with_refresh_expired_access_token( + method='POST', + path='/alerts-data-sources/connect', + data=request_body + ) + _print_bad_request(response) + raise_for_status(response) + result = response.json() + result_url = result.get('result_url') + + connector_id = result.get('connector_id') + click.echo(f'Connect request sent for connector "{name}"') + + if wait and result_url: + _wait_for_connector_status(result_url) + + if connector_id: + click.echo(f'Connector ID: {connector_id}') + + +def deactivate_alert_data_source_command(connector_id: str, wait: bool): + api_client = api.get_global_api() + extra_params = _get_extra_params() + response = api_client.request_with_refresh_expired_access_token( + method='POST', + path=f'/alerts-data-sources/{connector_id}/deactivate', + **({'data': extra_params} if extra_params else {}) + ) + _print_bad_request(response) + raise_for_status(response) + result = response.json() + result_url = result.get('result_url') + + click.echo(f'Deactivate request sent for "{connector_id}"') + + if wait and result_url: + _wait_for_connector_status(result_url) + + +def reactivate_alert_data_source_command(connector_id: str, wait: bool): + api_client = api.get_global_api() + extra_params = _get_extra_params() + response = api_client.request_with_refresh_expired_access_token( + method='POST', + path=f'/alerts-data-sources/{connector_id}/reactivate', + **({'data': extra_params} if extra_params else {}) + ) + _print_bad_request(response) + raise_for_status(response) + result = response.json() + result_url = result.get('result_url') + + click.echo(f'Activate request sent for "{connector_id}"') + + if wait and result_url: + _wait_for_connector_status(result_url) + + +def update_alert_data_source_command(connector_id: str, config_file, wait: bool): + try: + config_data = json.load(config_file) + except json.JSONDecodeError as e: + click.echo(f'Error: Invalid JSON in config file: {e}', err=True) + raise click.Abort() + + request_data = {**_get_extra_params(), **config_data} + + api_client = api.get_global_api() + response = api_client.request_with_refresh_expired_access_token( + method='PUT', + path=f'/alerts-data-sources/{connector_id}', + data=request_data + ) + _print_bad_request(response) + raise_for_status(response) + + if response.status_code == HTTPStatus.OK: + click.echo(f'Update applied for "{connector_id}"') + return + + result = response.json() + result_url = result.get('result_url') + + click.echo(f'Update request sent for "{connector_id}"') + + if wait and result_url: + _wait_for_connector_status(result_url) + + +def _wait_for_connector_status(result_url: str): + api_client = api.get_global_api() + base_url = api_client.base_url.removesuffix('api/').rstrip('/') + last_status = None + + with yaspin(text='Waiting') as sp: + while True: + response = api_client.request_with_refresh_expired_access_token( + method='GET', + path=result_url, + base_url=base_url + ) + raise_for_status(response, statuses_to_ignore=[HTTPStatus.INTERNAL_SERVER_ERROR]) + result = response.json() + status = result.get('status') + + if status != last_status: + sp.write(f'Status: {status}') + sp.text = f'Waiting ({status})' + last_status = status + + if status in _TERMINAL_SUCCESS_STATUSES: + sp.ok('✓') + click.echo('Operation completed successfully') + return + + if status in _TERMINAL_FAILURE_STATUSES: + sp.fail('✗') + error_detail = result.get('error', '') + if error_detail: + click.echo(f'Error: {error_detail}', err=True) + raise click.ClickException(f'Operation failed with status: {status}') + + time.sleep(_POLL_INTERVAL_SECONDS) diff --git a/intezer_analyze_cli/key_store.py b/intezer_analyze_cli/key_store.py index 7403a31..d6672d9 100644 --- a/intezer_analyze_cli/key_store.py +++ b/intezer_analyze_cli/key_store.py @@ -1,16 +1,16 @@ import logging import os -from intezer_analyze_cli.config import default_config as config_ +from intezer_analyze_cli.config import default_config logger = logging.getLogger('intezer_cli') def get_key_file_path(key_file_name): if os.name == 'posix': - return os.path.join(os.path.expanduser('~'), config_.key_dir_name, key_file_name) + return os.path.join(os.path.expanduser('~'), default_config.key_dir_name, key_file_name) - return os.path.join(os.path.expandvars('%APPDATA%'), config_.key_dir_name, key_file_name) + return os.path.join(os.path.expandvars('%APPDATA%'), default_config.key_dir_name, key_file_name) def delete_key(key_file_name): @@ -45,20 +45,20 @@ def get_stored_key(key_file_name): def get_stored_api_key(): - return get_stored_key(config_.key_file_name) + return get_stored_key(default_config.key_file_name) def get_stored_default_url(): - return get_stored_key(config_.url_file_name) + return get_stored_key(default_config.url_file_name) def store_api_key(key): - store_key(key, config_.key_file_name) + store_key(key, default_config.key_file_name) def store_default_url(key): - store_key(key, config_.url_file_name) + store_key(key, default_config.url_file_name) def delete_default_url(): - delete_key(config_.url_file_name) + delete_key(default_config.url_file_name) diff --git a/intezer_analyze_cli/utilities.py b/intezer_analyze_cli/utilities.py index 85e9b80..7d94324 100644 --- a/intezer_analyze_cli/utilities.py +++ b/intezer_analyze_cli/utilities.py @@ -59,7 +59,7 @@ def init_log(logger_name, debug_mode=False): # file try: current_directory = os.getcwd() - log_file_path = os.path.join(current_directory, 'intezer-analyze-cli.log') + log_file_path = os.path.join(current_directory, 'intezer-cli.log') handler = logging.FileHandler(log_file_path) formatter = ExtraFormatter('%(asctime)s %(levelname)-8s %(module)s line: %(lineno)d: %(message)s. %(extra)s') diff --git a/requirements-prod.txt b/requirements-prod.txt index eb2a9cc..df3b2e4 100644 --- a/requirements-prod.txt +++ b/requirements-prod.txt @@ -1,2 +1,3 @@ click==7.1.2 -intezer-sdk==1.25.0 +intezer-sdk>=1.25.0,<2 +yaspin>=3.0.0,<4 diff --git a/setup.py b/setup.py index 1481e11..44a65e6 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,8 @@ def rel(*xs): install_requires = [ 'click==7.1.2', - 'intezer-sdk>=1.25.0,<2' + 'intezer-sdk>=1.25.0,<2', + 'yaspin>=3.0.0,<4' ] tests_require = [ 'pytest==9.0.2', @@ -48,6 +49,7 @@ def rel(*xs): entry_points=''' [console_scripts] intezer-analyze=intezer_analyze_cli.cli:main_cli + intezer-cli=intezer_analyze_cli.cli:main_cli ''', license='Apache License v2', long_description=long_description, diff --git a/tests/unit/cli_test.py b/tests/unit/cli_test.py index 6cc2363..18c4c4e 100644 --- a/tests/unit/cli_test.py +++ b/tests/unit/cli_test.py @@ -1,7 +1,9 @@ +import json import os import tempfile import unittest from pathlib import Path +from unittest.mock import ANY from unittest.mock import MagicMock from unittest.mock import patch @@ -312,6 +314,244 @@ def test_alerts_notify_from_csv_file_not_exists_returns_error(self): self.assertEqual(result.exit_code, 2) self.assertTrue(b'does not exist' in result.stdout_bytes) +class AlertsDataSourcesCliSpec(CliSpec): + def setUp(self): + super(AlertsDataSourcesCliSpec, self).setUp() + + create_global_api_patcher = patch('intezer_analyze_cli.cli.create_global_api') + self.create_global_api_patcher_mock = create_global_api_patcher.start() + self.addCleanup(create_global_api_patcher.stop) + + key_store.get_stored_api_key = MagicMock(return_value='api_key') + + @patch('intezer_analyze_cli.connector_commands.connect_alert_data_source_command') + def test_connect_invokes_command_with_correct_args(self, connect_mock): + # Arrange + with tempfile.TemporaryDirectory() as temp_dir: + config_path = os.path.join(temp_dir, 'creds.json') + with open(config_path, 'w') as f: + json.dump({'crowdstrike': {'client_id': 'id'}}, f) + + # Act + result = self.runner.invoke(cli.main_cli, [ + 'alerts-data-sources', 'connect', + '--source', 'crowdstrike', + '--name', 'acme-corp', + '--config', config_path, + '--resolve-false-positive', + '--noting', + '--auto-endpoint-scan', + '--wait' + ]) + + # Assert + self.assertEqual(result.exit_code, 0, result.exception) + connect_mock.assert_called_once_with( + source='crowdstrike', + name='acme-corp', + config_file=ANY, + resolve_false_positive=True, + noting=True, + auto_endpoint_scan=True, + wait=True + ) + + @patch('intezer_analyze_cli.connector_commands.connect_alert_data_source_command') + def test_connect_invokes_command_with_defaults(self, connect_mock): + # Arrange + with tempfile.TemporaryDirectory() as temp_dir: + config_path = os.path.join(temp_dir, 'creds.json') + with open(config_path, 'w') as f: + json.dump({}, f) + + # Act + result = self.runner.invoke(cli.main_cli, [ + 'alerts-data-sources', 'connect', + '--source', 'crowdstrike', + '--name', 'acme-corp', + '--config', config_path + ]) + + # Assert + self.assertEqual(result.exit_code, 0, result.exception) + connect_mock.assert_called_once_with( + source='crowdstrike', + name='acme-corp', + config_file=ANY, + resolve_false_positive=False, + noting=False, + auto_endpoint_scan=False, + wait=False + ) + + def test_connect_missing_required_source_returns_error(self): + # Arrange + with tempfile.TemporaryDirectory() as temp_dir: + config_path = os.path.join(temp_dir, 'creds.json') + with open(config_path, 'w') as f: + json.dump({}, f) + + # Act + result = self.runner.invoke(cli.main_cli, [ + 'alerts-data-sources', 'connect', + '--name', 'acme-corp', + '--config', config_path + ]) + + # Assert + self.assertEqual(result.exit_code, 2) + self.assertIn(b'--source', result.stdout_bytes) + + def test_connect_missing_required_name_returns_error(self): + # Arrange + with tempfile.TemporaryDirectory() as temp_dir: + config_path = os.path.join(temp_dir, 'creds.json') + with open(config_path, 'w') as f: + json.dump({}, f) + + # Act + result = self.runner.invoke(cli.main_cli, [ + 'alerts-data-sources', 'connect', + '--source', 'crowdstrike', + '--config', config_path + ]) + + # Assert + self.assertEqual(result.exit_code, 2) + self.assertIn(b'--name', result.stdout_bytes) + + def test_connect_missing_required_config_returns_error(self): + # Act + result = self.runner.invoke(cli.main_cli, [ + 'alerts-data-sources', 'connect', + '--source', 'crowdstrike', + '--name', 'acme-corp' + ]) + + # Assert + self.assertEqual(result.exit_code, 2) + self.assertIn(b'--config', result.stdout_bytes) + + @patch('intezer_analyze_cli.connector_commands.deactivate_alert_data_source_command') + def test_deactivate_invokes_command_with_correct_args(self, deactivate_mock): + # Act + result = self.runner.invoke(cli.main_cli, [ + 'alerts-data-sources', 'deactivate', 'acme-corp', '--wait' + ]) + + # Assert + self.assertEqual(result.exit_code, 0, result.exception) + deactivate_mock.assert_called_once_with(connector_id='acme-corp', wait=True) + + @patch('intezer_analyze_cli.connector_commands.deactivate_alert_data_source_command') + def test_deactivate_invokes_command_without_wait(self, deactivate_mock): + # Act + result = self.runner.invoke(cli.main_cli, [ + 'alerts-data-sources', 'deactivate', 'acme-corp' + ]) + + # Assert + self.assertEqual(result.exit_code, 0, result.exception) + deactivate_mock.assert_called_once_with(connector_id='acme-corp', wait=False) + + def test_deactivate_missing_connector_name_returns_error(self): + # Act + result = self.runner.invoke(cli.main_cli, [ + 'alerts-data-sources', 'deactivate' + ]) + + # Assert + self.assertEqual(result.exit_code, 2) + + @patch('intezer_analyze_cli.connector_commands.reactivate_alert_data_source_command') + def test_activate_invokes_command_with_correct_args(self, activate_mock): + # Act + result = self.runner.invoke(cli.main_cli, [ + 'alerts-data-sources', 'reactivate', 'acme-corp', '--wait' + ]) + + # Assert + self.assertEqual(result.exit_code, 0, result.exception) + activate_mock.assert_called_once_with(connector_id='acme-corp', wait=True) + + @patch('intezer_analyze_cli.connector_commands.update_alert_data_source_command') + def test_update_invokes_command_with_correct_args(self, update_mock): + # Arrange + with tempfile.TemporaryDirectory() as temp_dir: + config_path = os.path.join(temp_dir, 'new-creds.json') + with open(config_path, 'w') as f: + json.dump({'new_key': 'new_value'}, f) + + # Act + result = self.runner.invoke(cli.main_cli, [ + 'alerts-data-sources', 'update', 'acme-corp', + '--config', config_path, '--wait' + ]) + + # Assert + self.assertEqual(result.exit_code, 0, result.exception) + update_mock.assert_called_once_with( + connector_id='acme-corp', + config_file=ANY, + wait=True + ) + + def test_update_missing_required_config_returns_error(self): + # Act + result = self.runner.invoke(cli.main_cli, [ + 'alerts-data-sources', 'update', 'acme-corp' + ]) + + # Assert + self.assertEqual(result.exit_code, 2) + self.assertIn(b'--config', result.stdout_bytes) + + @patch('intezer_analyze_cli.connector_commands.connect_alert_data_source_command') + def test_group_accessible_with_underscores(self, connect_mock): + # Arrange + with tempfile.TemporaryDirectory() as temp_dir: + config_path = os.path.join(temp_dir, 'creds.json') + with open(config_path, 'w') as f: + json.dump({}, f) + + # Act + result = self.runner.invoke(cli.main_cli, [ + 'alerts_data_sources', 'connect', + '--source', 'crowdstrike', + '--name', 'acme-corp', + '--config', config_path + ]) + + # Assert + self.assertEqual(result.exit_code, 0, result.exception) + self.assertTrue(connect_mock.called) + + def test_group_help_shows_subcommands(self): + # Act + result = self.runner.invoke(cli.main_cli, ['alerts-data-sources', '--help']) + + # Assert + self.assertEqual(result.exit_code, 0) + self.assertIn(b'connect', result.stdout_bytes) + self.assertIn(b'deactivate', result.stdout_bytes) + self.assertIn(b'activate', result.stdout_bytes) + self.assertIn(b'update', result.stdout_bytes) + + def test_connect_help_shows_options(self): + # Act + result = self.runner.invoke(cli.main_cli, ['alerts-data-sources', 'connect', '--help']) + + # Assert + self.assertEqual(result.exit_code, 0) + self.assertIn(b'--source', result.stdout_bytes) + self.assertIn(b'--name', result.stdout_bytes) + self.assertIn(b'--config', result.stdout_bytes) + self.assertIn(b'--resolve-false-positive', result.stdout_bytes) + self.assertIn(b'--noting', result.stdout_bytes) + self.assertIn(b'--auto-endpoint-scan', result.stdout_bytes) + self.assertIn(b'--wait', result.stdout_bytes) + + class CliIndexSpec(CliSpec): def setUp(self): super(CliIndexSpec, self).setUp() diff --git a/tests/unit/commands_test.py b/tests/unit/commands_test.py index e861740..8521d47 100644 --- a/tests/unit/commands_test.py +++ b/tests/unit/commands_test.py @@ -1,3 +1,5 @@ +import io +import json import os import tempfile import unittest.mock @@ -5,6 +7,7 @@ from pathlib import Path from tempfile import tempdir from unittest.mock import MagicMock +from unittest.mock import call from unittest.mock import patch import click.exceptions @@ -13,6 +16,7 @@ from intezer_sdk import errors as sdk_errors import intezer_analyze_cli.key_store as key_store from intezer_analyze_cli import commands +from intezer_analyze_cli import connector_commands from intezer_analyze_cli.cli import create_global_api from tests.unit.cli_test import CliSpec @@ -367,16 +371,16 @@ def test_notify_alerts_from_csv_command_handles_alert_not_found(self, mock_progr def test_notify_alerts_from_csv_command_handles_alert_in_progress(self, mock_progressbar, mock_alert_class): # Arrange create_global_api() - + # Mock progress bar mock_progress_context = MagicMock() mock_progressbar.return_value.__enter__.return_value = mock_progress_context - + # Mock Alert instance that raises AlertInProgressError mock_alert = MagicMock() mock_alert.notify.side_effect = sdk_errors.AlertInProgressError('test-alert-1') mock_alert_class.return_value = mock_alert - + with tempfile.TemporaryDirectory() as temp_dir: csv_file_path = os.path.join(temp_dir, 'test_alerts.csv') with open(csv_file_path, 'w') as f: @@ -391,3 +395,506 @@ def test_notify_alerts_from_csv_command_handles_alert_in_progress(self, mock_pro mock_echo.assert_any_call('Alert test-alert-1 is still in progress') mock_echo.assert_any_call('1 alerts failed to notify') + +class CommandConnectAlertDataSourceSpec(CliSpec): + def setUp(self): + super(CommandConnectAlertDataSourceSpec, self).setUp() + + self.mock_api_client = MagicMock() + api_patcher = patch('intezer_analyze_cli.connector_commands.api.get_global_api', + return_value=self.mock_api_client) + api_patcher.start() + self.addCleanup(api_patcher.stop) + + raise_for_status_patcher = patch('intezer_analyze_cli.connector_commands.raise_for_status') + raise_for_status_patcher.start() + self.addCleanup(raise_for_status_patcher.stop) + + def test_connect_prints_connector_id(self): + # Arrange + mock_response = MagicMock() + mock_response.json.return_value = { + 'result_url': '/alerts-data-sources/acme-corp/connect-status', + 'connector_id': 'conn-123-abc' + } + self.mock_api_client.request_with_refresh_expired_access_token.return_value = mock_response + + config_file = io.StringIO('{}') + + # Act + with patch('click.echo') as mock_echo: + connector_commands.connect_alert_data_source_command( + source='crowdstrike', + name='acme-corp', + config_file=config_file, + resolve_false_positive=False, + noting=False, + auto_endpoint_scan=False, + wait=False + ) + + # Assert + mock_echo.assert_any_call('Connector ID: conn-123-abc') + + def test_connect_builds_correct_request_body(self): + # Arrange + mock_response = MagicMock() + mock_response.json.return_value = {'result_url': '/alerts-data-sources/acme-corp/connect-status'} + self.mock_api_client.request_with_refresh_expired_access_token.return_value = mock_response + + config_data = {'crowdstrike': {'client_secret': 'secret', 'client_id': 'id123'}} + config_file = io.StringIO(json.dumps(config_data)) + + # Act + connector_commands.connect_alert_data_source_command( + source='crowdstrike', + name='acme-corp', + config_file=config_file, + resolve_false_positive=True, + noting=False, + auto_endpoint_scan=True, + wait=False + ) + + # Assert + expected_body = { + 'alert_source': 'crowdstrike', + 'connector_name': 'acme-corp', + 'is_resolve_false_positive_enabled': True, + 'is_noting_enabled': False, + 'is_auto_endpoint_scan_enabled': True, + 'crowdstrike': {'client_secret': 'secret', 'client_id': 'id123'} + } + self.mock_api_client.request_with_refresh_expired_access_token.assert_called_once_with( + method='POST', + path='/alerts-data-sources/connect', + data=expected_body + ) + + @patch.dict(os.environ, {'INTEZER_TENANT_ID': 'tenant-123'}) + def test_connect_includes_tenant_id_when_env_var_set(self): + # Arrange + mock_response = MagicMock() + mock_response.json.return_value = {'result_url': '/alerts-data-sources/acme-corp/connect-status'} + self.mock_api_client.request_with_refresh_expired_access_token.return_value = mock_response + + config_file = io.StringIO('{}') + + # Act + connector_commands.connect_alert_data_source_command( + source='crowdstrike', + name='acme-corp', + config_file=config_file, + resolve_false_positive=False, + noting=False, + auto_endpoint_scan=False, + wait=False + ) + + # Assert + call_kwargs = self.mock_api_client.request_with_refresh_expired_access_token.call_args + self.assertEqual(call_kwargs[1]['data']['tenant_id'], 'tenant-123') + + def test_connect_aborts_when_invalid_connector_name(self): + # Arrange + config_file = io.StringIO('{}') + + # Act & Assert + with self.assertRaises(click.exceptions.Abort): + connector_commands.connect_alert_data_source_command( + source='crowdstrike', + name='INVALID_NAME!', + config_file=config_file, + resolve_false_positive=False, + noting=False, + auto_endpoint_scan=False, + wait=False + ) + + self.mock_api_client.request_with_refresh_expired_access_token.assert_not_called() + + def test_connect_aborts_when_invalid_json_config(self): + # Arrange + config_file = io.StringIO('{invalid json}') + + # Act & Assert + with patch('click.echo') as mock_echo: + with self.assertRaises(click.exceptions.Abort): + connector_commands.connect_alert_data_source_command( + source='crowdstrike', + name='acme-corp', + config_file=config_file, + resolve_false_positive=False, + noting=False, + auto_endpoint_scan=False, + wait=False + ) + + echo_calls = [str(c) for c in mock_echo.call_args_list] + self.assertTrue(any('Invalid JSON' in c for c in echo_calls)) + + @patch('intezer_analyze_cli.connector_commands._wait_for_connector_status') + def test_connect_calls_wait_when_flag_is_set(self, mock_wait): + # Arrange + mock_response = MagicMock() + mock_response.json.return_value = {'result_url': '/alerts-data-sources/acme-corp/connect-status'} + self.mock_api_client.request_with_refresh_expired_access_token.return_value = mock_response + + config_file = io.StringIO('{}') + + # Act + connector_commands.connect_alert_data_source_command( + source='crowdstrike', + name='acme-corp', + config_file=config_file, + resolve_false_positive=False, + noting=False, + auto_endpoint_scan=False, + wait=True + ) + + # Assert + mock_wait.assert_called_once_with('/alerts-data-sources/acme-corp/connect-status') + + @patch('intezer_analyze_cli.connector_commands._wait_for_connector_status') + def test_connect_does_not_call_wait_when_flag_is_false(self, mock_wait): + # Arrange + mock_response = MagicMock() + mock_response.json.return_value = {'result_url': '/alerts-data-sources/acme-corp/connect-status'} + self.mock_api_client.request_with_refresh_expired_access_token.return_value = mock_response + + config_file = io.StringIO('{}') + + # Act + connector_commands.connect_alert_data_source_command( + source='crowdstrike', + name='acme-corp', + config_file=config_file, + resolve_false_positive=False, + noting=False, + auto_endpoint_scan=False, + wait=False + ) + + # Assert + mock_wait.assert_not_called() + + +class CommandDeactivateAlertDataSourceSpec(CliSpec): + def setUp(self): + super(CommandDeactivateAlertDataSourceSpec, self).setUp() + + self.mock_api_client = MagicMock() + api_patcher = patch('intezer_analyze_cli.connector_commands.api.get_global_api', + return_value=self.mock_api_client) + api_patcher.start() + self.addCleanup(api_patcher.stop) + + raise_for_status_patcher = patch('intezer_analyze_cli.connector_commands.raise_for_status') + raise_for_status_patcher.start() + self.addCleanup(raise_for_status_patcher.stop) + + def test_deactivate_calls_correct_api_path(self): + # Arrange + mock_response = MagicMock() + mock_response.json.return_value = {'result_url': '/alerts-data-sources/acme-corp/connect-status'} + self.mock_api_client.request_with_refresh_expired_access_token.return_value = mock_response + + # Act + connector_commands.deactivate_alert_data_source_command(connector_id='acme-corp', wait=False) + + # Assert + self.mock_api_client.request_with_refresh_expired_access_token.assert_called_once_with( + method='POST', + path='/alerts-data-sources/acme-corp/deactivate' + ) + + @patch('intezer_analyze_cli.connector_commands._wait_for_connector_status') + def test_deactivate_calls_wait_when_flag_is_set(self, mock_wait): + # Arrange + mock_response = MagicMock() + mock_response.json.return_value = {'result_url': '/alerts-data-sources/acme-corp/connect-status'} + self.mock_api_client.request_with_refresh_expired_access_token.return_value = mock_response + + # Act + connector_commands.deactivate_alert_data_source_command(connector_id='acme-corp', wait=True) + + # Assert + mock_wait.assert_called_once_with('/alerts-data-sources/acme-corp/connect-status') + + +class CommandActivateAlertDataSourceSpec(CliSpec): + def setUp(self): + super(CommandActivateAlertDataSourceSpec, self).setUp() + + self.mock_api_client = MagicMock() + api_patcher = patch('intezer_analyze_cli.connector_commands.api.get_global_api', + return_value=self.mock_api_client) + api_patcher.start() + self.addCleanup(api_patcher.stop) + + raise_for_status_patcher = patch('intezer_analyze_cli.connector_commands.raise_for_status') + raise_for_status_patcher.start() + self.addCleanup(raise_for_status_patcher.stop) + + def test_activate_calls_correct_api_path(self): + # Arrange + mock_response = MagicMock() + mock_response.json.return_value = {'result_url': '/alerts-data-sources/acme-corp/connect-status'} + self.mock_api_client.request_with_refresh_expired_access_token.return_value = mock_response + + # Act + connector_commands.reactivate_alert_data_source_command(connector_id='acme-corp', wait=False) + + # Assert + self.mock_api_client.request_with_refresh_expired_access_token.assert_called_once_with( + method='POST', + path='/alerts-data-sources/acme-corp/reactivate' + ) + + @patch('intezer_analyze_cli.connector_commands._wait_for_connector_status') + def test_activate_calls_wait_when_flag_is_set(self, mock_wait): + # Arrange + mock_response = MagicMock() + mock_response.json.return_value = {'result_url': '/alerts-data-sources/acme-corp/connect-status'} + self.mock_api_client.request_with_refresh_expired_access_token.return_value = mock_response + + # Act + connector_commands.reactivate_alert_data_source_command(connector_id='acme-corp', wait=True) + + # Assert + mock_wait.assert_called_once_with('/alerts-data-sources/acme-corp/connect-status') + + +class CommandUpdateAlertDataSourceSpec(CliSpec): + def setUp(self): + super(CommandUpdateAlertDataSourceSpec, self).setUp() + + self.mock_api_client = MagicMock() + api_patcher = patch('intezer_analyze_cli.connector_commands.api.get_global_api', + return_value=self.mock_api_client) + api_patcher.start() + self.addCleanup(api_patcher.stop) + + raise_for_status_patcher = patch('intezer_analyze_cli.connector_commands.raise_for_status') + raise_for_status_patcher.start() + self.addCleanup(raise_for_status_patcher.stop) + + def test_update_calls_correct_api_path_with_put(self): + # Arrange + mock_response = MagicMock() + mock_response.status_code = 202 + mock_response.json.return_value = {'result_url': '/alerts-data-sources/acme-corp/connect-status'} + self.mock_api_client.request_with_refresh_expired_access_token.return_value = mock_response + + config_data = {'crowdstrike': {'client_secret': 'new-secret'}} + config_file = io.StringIO(json.dumps(config_data)) + + # Act + connector_commands.update_alert_data_source_command( + connector_id='acme-corp', + config_file=config_file, + wait=False + ) + + # Assert + self.mock_api_client.request_with_refresh_expired_access_token.assert_called_once_with( + method='PUT', + path='/alerts-data-sources/acme-corp', + data=config_data + ) + + def test_update_returns_immediately_on_200_ok(self): + # Arrange + mock_response = MagicMock() + mock_response.status_code = 200 + self.mock_api_client.request_with_refresh_expired_access_token.return_value = mock_response + + config_file = io.StringIO('{}') + + # Act + with patch('click.echo') as mock_echo: + connector_commands.update_alert_data_source_command( + connector_id='acme-corp', + config_file=config_file, + wait=True + ) + + # Assert + mock_echo.assert_called_once_with('Update applied for "acme-corp"') + mock_response.json.assert_not_called() + + def test_update_aborts_when_invalid_json_config(self): + # Arrange + config_file = io.StringIO('not valid json') + + # Act & Assert + with patch('click.echo') as mock_echo: + with self.assertRaises(click.exceptions.Abort): + connector_commands.update_alert_data_source_command( + connector_id='acme-corp', + config_file=config_file, + wait=False + ) + + echo_calls = [str(c) for c in mock_echo.call_args_list] + self.assertTrue(any('Invalid JSON' in c for c in echo_calls)) + + @patch('intezer_analyze_cli.connector_commands._wait_for_connector_status') + def test_update_calls_wait_when_flag_is_set(self, mock_wait): + # Arrange + mock_response = MagicMock() + mock_response.status_code = 202 + mock_response.json.return_value = {'result_url': '/alerts-data-sources/acme-corp/connect-status'} + self.mock_api_client.request_with_refresh_expired_access_token.return_value = mock_response + + config_file = io.StringIO('{}') + + # Act + connector_commands.update_alert_data_source_command( + connector_id='acme-corp', + config_file=config_file, + wait=True + ) + + # Assert + mock_wait.assert_called_once_with('/alerts-data-sources/acme-corp/connect-status') + + +class CommandWaitForConnectorStatusSpec(CliSpec): + def setUp(self): + super(CommandWaitForConnectorStatusSpec, self).setUp() + + self.mock_api_client = MagicMock() + api_patcher = patch('intezer_analyze_cli.connector_commands.api.get_global_api', + return_value=self.mock_api_client) + api_patcher.start() + self.addCleanup(api_patcher.stop) + + raise_for_status_patcher = patch('intezer_analyze_cli.connector_commands.raise_for_status') + self.mock_raise_for_status = raise_for_status_patcher.start() + self.addCleanup(raise_for_status_patcher.stop) + + sleep_patcher = patch('intezer_analyze_cli.connector_commands.time.sleep') + self.mock_sleep = sleep_patcher.start() + self.addCleanup(sleep_patcher.stop) + + self.mock_spinner = MagicMock() + yaspin_patcher = patch('intezer_analyze_cli.connector_commands.yaspin') + self.mock_yaspin = yaspin_patcher.start() + self.mock_yaspin.return_value.__enter__ = MagicMock(return_value=self.mock_spinner) + self.mock_yaspin.return_value.__exit__ = MagicMock(return_value=False) + self.addCleanup(yaspin_patcher.stop) + + def _make_status_response(self, status, error=None): + mock_response = MagicMock() + result = {'status': status} + if error: + result['error'] = error + mock_response.json.return_value = result + return mock_response + + def test_wait_returns_on_active_status(self): + # Arrange + self.mock_api_client.request_with_refresh_expired_access_token.return_value = \ + self._make_status_response('active') + + # Act + connector_commands._wait_for_connector_status('/alerts-data-sources/acme-corp/connect-status') + + # Assert + self.mock_api_client.request_with_refresh_expired_access_token.assert_called_once_with( + method='GET', + path='/alerts-data-sources/acme-corp/connect-status', + base_url=self.mock_api_client.base_url.removesuffix().rstrip() + ) + self.mock_sleep.assert_not_called() + + def test_wait_returns_on_pending_status(self): + # Arrange + self.mock_api_client.request_with_refresh_expired_access_token.return_value = \ + self._make_status_response('pending') + + # Act + connector_commands._wait_for_connector_status('/alerts-data-sources/acme-corp/connect-status') + + # Assert + self.mock_sleep.assert_not_called() + + def test_wait_returns_on_deactivated_status(self): + # Arrange + self.mock_api_client.request_with_refresh_expired_access_token.return_value = \ + self._make_status_response('deactivated') + + # Act + connector_commands._wait_for_connector_status('/alerts-data-sources/acme-corp/connect-status') + + # Assert + self.mock_sleep.assert_not_called() + + def test_wait_raises_on_credentials_verification_failed(self): + # Arrange + self.mock_api_client.request_with_refresh_expired_access_token.return_value = \ + self._make_status_response('credentials_verification_failed', error='Bad credentials') + + # Act & Assert + with self.assertRaises(click.exceptions.ClickException) as ctx: + connector_commands._wait_for_connector_status('/alerts-data-sources/acme-corp/connect-status') + + self.assertIn('credentials_verification_failed', str(ctx.exception)) + + def test_wait_raises_on_deployment_failed(self): + # Arrange + self.mock_api_client.request_with_refresh_expired_access_token.return_value = \ + self._make_status_response('deployment_failed') + + # Act & Assert + with self.assertRaises(click.exceptions.ClickException) as ctx: + connector_commands._wait_for_connector_status('/alerts-data-sources/acme-corp/connect-status') + + self.assertIn('deployment_failed', str(ctx.exception)) + + def test_wait_raises_on_update_failed(self): + # Arrange + self.mock_api_client.request_with_refresh_expired_access_token.return_value = \ + self._make_status_response('update_failed') + + # Act & Assert + with self.assertRaises(click.exceptions.ClickException): + connector_commands._wait_for_connector_status('/alerts-data-sources/acme-corp/connect-status') + + def test_wait_polls_through_in_progress_statuses(self): + # Arrange + self.mock_api_client.request_with_refresh_expired_access_token.side_effect = [ + self._make_status_response('verifying_credentials'), + self._make_status_response('deployment_in_progress'), + self._make_status_response('active'), + ] + + # Act + connector_commands._wait_for_connector_status('/alerts-data-sources/acme-corp/connect-status') + + # Assert + self.assertEqual(self.mock_api_client.request_with_refresh_expired_access_token.call_count, 3) + self.assertEqual(self.mock_sleep.call_count, 2) + self.mock_sleep.assert_called_with(5) + + def test_wait_prints_status_transitions(self): + # Arrange + self.mock_api_client.request_with_refresh_expired_access_token.side_effect = [ + self._make_status_response('verifying_credentials'), + self._make_status_response('verifying_credentials'), + self._make_status_response('active'), + ] + + # Act + connector_commands._wait_for_connector_status('/alerts-data-sources/acme-corp/connect-status') + + # Assert - should print verifying_credentials only once (deduped), then active via sp.write + self.mock_spinner.write.assert_any_call('Status: verifying_credentials') + self.mock_spinner.write.assert_any_call('Status: active') + # verifying_credentials should appear exactly once in write calls + status_calls = [c for c in self.mock_spinner.write.call_args_list + if 'Status: verifying_credentials' in str(c)] + self.assertEqual(len(status_calls), 1) +