diff --git a/code/soar-uam-pq-alerts/alert.json b/code/soar-uam-pq-alerts/alert.json new file mode 100644 index 0000000..fe64eba --- /dev/null +++ b/code/soar-uam-pq-alerts/alert.json @@ -0,0 +1,78 @@ +{ + "activity_id": 1, + "attack_surface_ids": [1], + "category_uid": 2, + "class_name": "Detection Finding", + "evidences": [ + { + "process": { + "file": { + "name": "malicious_file.exe", + "path": "/usr/local/bin/malicious_file.exe", + "size": 1024000, + "hashes": [ + { + "algorithm_id": 6, + "value": "2c4d2c6262d46313990e31bf7b6437028a566ba8" + }, + { + "algorithm_id": 1, + "value": "d41d8cd98f00b204e9800998ecf8427e" + }, + { + "algorithm_id": 2, + "value": "da39a3ee5e6b4b0d3255bfef95601890afd80709" + } + ], + "signature": { + "algorithm_id": 3, + "certificate": { + "subject": "CN=CodeSign Cert, O=Acme Corp, C=US", + "serial_number": "1234567890ABCDEF", + "expiration_time": 1718400000000, + "fingerprints": [ + { + "algorithm_id": 6, + "value": "2c4d2c6262d46313990e31bf7b6437028a566ba8" + }, + { + "algorithm_id": 1, + "value": "d41d8cd98f00b204e9800998ecf8427e" + } + ], + "issuer": "Acme Corp" + } + }, + "type_id": 3 + } + } + } + ], + "class_uid": 99602001, + "confidence_id": 1, + "finding_info": { + "title": "Suspicious Activity 8", + "uid": "placeholder_uid" + }, + "metadata": { + "product": { + "name": "sdl-cli", + "vendor_name": "Scalyr" + }, + "version": "1.1.0" + }, + "resources": [ + { + "name": "foo", + "uid": "bar" + } + ], + "s1_classification_id": 0, + "severity_id": 1, + "status_id": 1, + "time": 0, + "timezone_offset": 0, + "title": "Malware Detection", + "type_name": "Detection Finding: Other", + "type_uid": 9960200101 +} diff --git a/code/soar-uam-pq-alerts/readme.md b/code/soar-uam-pq-alerts/readme.md new file mode 100644 index 0000000..6a39362 --- /dev/null +++ b/code/soar-uam-pq-alerts/readme.md @@ -0,0 +1,42 @@ +# **SentinelOne Alert Handling Script** + +## **Overview** + +This script is designed to facilitate the ingestion and processing of alerts from SentinelOne's Power Query API. It loads alert data, updates it with additional information retrieved from SentinelOne, and then sends the alert to S1 Unified Alerts Management console. + +## **Features** + +* **Alert Ingestion**: The script fetches alert data from a local file. +* **Data Enrichment**: It queries SentinelOne for additional data, such as process names and MD5 hashes, based on specific queries. +* **Alert Processing**: Updates the alert data with the enriched information (e.g., process name and MD5 hash). +* **Alert Egress**: Sends the updated alerts to an external ingestion endpoint via an HTTP POST request. + +## **Requirements** + +* Python 3.x +* `requests` library +* `gzip` and `json` modules (standard Python libraries) +* SentinelOne API token +* SentinelOne account ID +* Site ID (optional) + +## **Environment Variables** + +The following environment variables must be set for the script to function properly: + +* `SENTINELONE_TOKEN`: Your SentinelOne API token. +* `ACCOUNT_ID`: Your SentinelOne account ID. +* `SITE_ID` (optional): Your SentinelOne site ID (if applicable). + +## **Setup** + +1. Install the required Python packages: +`pip install requests` + +2. Set the required environment variables: + * `SENTINELONE_TOKEN` + * `ACCOUNT_ID` + * `SITE_ID` (optional) +3. Place the alert JSON file that contains the basic alert data (e.g., `alert.json`), which will be updated by the script. +4. execute `python3 run.py` + diff --git a/code/soar-uam-pq-alerts/run.py b/code/soar-uam-pq-alerts/run.py new file mode 100644 index 0000000..7790590 --- /dev/null +++ b/code/soar-uam-pq-alerts/run.py @@ -0,0 +1,231 @@ +import os +import requests +import gzip +import json +import uuid +import time +import logging + + +# Set up logging +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +ch = logging.StreamHandler() +ch.setLevel(logging.DEBUG) +formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') +ch.setFormatter(formatter) +logger.addHandler(ch) + +def egress_uam_alert(alert, scope, token, uam_ingest_url): + headers = { + "Authorization": f"Bearer {token}", + "S1-Scope": scope, + "Content-Encoding": "gzip", + "Content-Type": "application/json" + } + + # Convert the alert to JSON and gzip it + gzipped_alert = gzip.compress(json.dumps(alert).encode('utf-8')) + logger.info("Compressed alert data and prepared headers for request.") + + try: + logger.info("Sending POST request to SentinelOne endpoint.") + response = requests.post(f"{uam_ingest_url}/v1/alerts", headers=headers, data=gzipped_alert) + response.raise_for_status() # Raise an error for bad responses (4xx, 5xx) + logger.info("Request successful with status code: %s", response.status_code) + return { + "status": response.status_code, + "statusText": response.reason, + "data": response.json(), + } + except requests.exceptions.RequestException as err: + logger.error("Request error: %s", err) + return None + +def load_alert_from_file(file_path): + try: + with open(file_path, 'r') as file: + alert = json.load(file) + logger.info("Loaded alert from file: %s", file_path) + except Exception as e: + logger.error("Error loading alert file: %s", e) + return None + + timeDst = int(time.time() * 1000) # Generate current timestamp in milliseconds + logger.info("Generated timestamp: %s", timeDst) + + # Replace UID and time with generated values + alert["finding_info"]["uid"] = str(uuid.uuid4()) # Auto-generate unique identifier (UID) + alert["time"] = timeDst # Auto-generate current timestamp in milliseconds + logger.info("Assigned UID and timestamp to alert.") + + return alert + +def query_sentinelone(query, start_time, end_time, token, scope): + url = "https://xdr.us1.sentinelone.net/api/powerQuery" + payload = json.dumps({ + "query": query, + "startTime": start_time, + "endTime": end_time + }) + headers = { + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {token}', + 'S1-Scope': scope # Confirm account_id here if it's the correct scope identifier + } + + try: + logger.info("Sending query to SentinelOne powerQuery API.") + response = requests.post(url, headers=headers, data=payload) + response.raise_for_status() # Ensure we notice bad responses + logger.info("Query successful with status code: %s", response.status_code) + + try: + # Try to parse JSON response + response_json = response.json() + logger.debug("Parsed response: %s", response_json) + + # Check if the 'values' field exists and contains data + if not response_json or 'values' not in response_json or not response_json['values']: + logger.warning("No valid data found in response. Could not update alert.") + return None + + # Return the data (values) from the response + return response_json + + except ValueError as e: + logger.error("Error parsing JSON response: %s", e) + return None + + except requests.exceptions.HTTPError as http_err: + logger.error("HTTP error: %s - Response content: %s", http_err, response.text) + except Exception as err: + logger.error("Other error: %s", err) + + return None + +def update_alert_with_process_name_and_md5(alert, query_result, row_index): + try: + logger.debug("Query result: %s", query_result) # Log the entire response for inspection + + if "columns" in query_result and "values" in query_result: + logger.debug("Columns: %s", query_result["columns"]) + logger.debug("Values: %s", query_result["values"]) + + # Extract column names and their indices + column_names = [col["name"] for col in query_result["columns"]] + logger.debug("Column names: %s", column_names) + + # Extract the process name and MD5 hash columns independently + process_name_index = None + md5_index = None + + if "src.process.file.name" in column_names: + process_name_index = column_names.index("src.process.file.name") + logger.debug("Process name index: %d", process_name_index) + + if "md5" in column_names: + md5_index = column_names.index("md5") + logger.debug("MD5 hash index: %d", md5_index) + + # Get the specific row based on the index + row = query_result["values"][row_index] + + # Process name update + if process_name_index is not None: + process_name = row[process_name_index] + logger.debug("Extracted process name: %s", process_name) + + # Update the alert's "evidences.process.file.name" with this value + if "evidences" in alert and alert["evidences"]: + alert["evidences"][0]["process"]["file"]["name"] = process_name + logger.info("Updated evidences.process.file.name with value: %s", process_name) + else: + logger.warning("No evidences found in the alert to update process name.") + + # MD5 hash update + if md5_index is not None: + md5_value = row[md5_index] + logger.debug("Extracted MD5 hash: %s", md5_value) + + # Check if "evidences.process.file.hashes" exists in the alert + if "evidences" in alert and alert["evidences"] and "process" in alert["evidences"][0]: + file_data = alert["evidences"][0]["process"]["file"] + if "hashes" in file_data: + # Search for an existing MD5 hash with algorithm_id 1 + md5_hash_found = False + for hash_entry in file_data["hashes"]: + if hash_entry.get("algorithm_id") == 1: # MD5 has algorithm_id 1 + hash_entry["value"] = md5_value # Update the MD5 hash value + md5_hash_found = True + logger.info("Updated MD5 hash (algorithm_id 1) with value: %s", md5_value) + break + + # If no existing MD5 hash was found, add a new entry + if not md5_hash_found: + file_data["hashes"].append({"algorithm_id": 1, "value": md5_value}) + logger.info("Added new MD5 hash (algorithm_id 1) with value: %s", md5_value) + else: + # If "hashes" is missing, initialize it + file_data["hashes"] = [{"algorithm_id": 1, "value": md5_value}] + logger.info("Initialized MD5 hashes with value: %s", md5_value) + else: + logger.warning("No hashes found in the alert to update.") + else: + logger.warning("Unexpected query result format. Could not update alert.") + + except (IndexError, KeyError) as e: + logger.error("Error extracting process name or MD5 hash: %s", e) + + + + + +# Example usage +if __name__ == "__main__": + token = os.getenv("SENTINELONE_TOKEN") + account_id = os.getenv("ACCOUNT_ID") + site_id = "" # Default to empty if SITE_ID is not set + logger.info("Environment variables loaded. Token: %s, Account ID: %s, Site ID: %s", token, account_id, site_id) + + if not token or not account_id: + logger.error("Missing necessary environment variables.") + exit(1) + + # Define scope and prepare for egress + scope = f"{account_id}" + logger.info("Scope: %s", scope) + + # Load alert data from JSON file + alert_file_path = "alert.json" + + + # Query SentinelOne for additional data + query = "|union (|limit 1 | columns src.process.file.name= 'hello', md5 = '5eb63bbbe01eeed093cb22bb8f5acdc3'), (|limit 1 | columns src.process.file.name= 'world', md5 = '7d793037a0760186574b0282f2f435e7')" # Define your actual query + start_time = "1h" # Last 7 days in milliseconds + end_time = "0h" + query_result = query_sentinelone(query, start_time, end_time, token, scope) + + # Loop through each row in the query result and send a separate alert for each + if query_result: + for i, row in enumerate(query_result['values']): + alert = load_alert_from_file(alert_file_path) + if alert is None: + logger.error("Failed to load alert data. Exiting.") + exit(1) + + logger.info("Processing row %d: %s", i, row) + + # Pass the row and its index to the update function + update_alert_with_process_name_and_md5(alert, query_result, i) # Pass the index and full query result + + # Define UAM ingest URL + uam_ingest_url = "https://ingest.us1.sentinelone.net" + response = egress_uam_alert(alert, scope, token, uam_ingest_url) + + if response: + logger.info("Egress successful for row %d. Response: %s", i, response) + else: + logger.error("Egress failed for row %d.", i) + else: + logger.error("Query failed or returned no data. Proceeding without update.")