From 5712a5ad7e4d89ac9209e6cdb203a2fd53f47ead Mon Sep 17 00:00:00 2001 From: Maximilian Lehrbaum Date: Wed, 4 Feb 2026 17:51:15 +0100 Subject: [PATCH 1/7] Update Unit42Intelligence.py relationship reliability Instance parameter for custom reliability also added to relationship creations. --- .../Unit42Intelligence/Unit42Intelligence.py | 95 ++++++++++++------- 1 file changed, 63 insertions(+), 32 deletions(-) diff --git a/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/Integrations/Unit42Intelligence/Unit42Intelligence.py b/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/Integrations/Unit42Intelligence/Unit42Intelligence.py index dfcfd3c04af0..adce316d14a9 100644 --- a/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/Integrations/Unit42Intelligence/Unit42Intelligence.py +++ b/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/Integrations/Unit42Intelligence/Unit42Intelligence.py @@ -267,7 +267,11 @@ def remove_mitre_technique_id_prefix(threat_name: str) -> str: def create_relationships( - indicator: str, indicator_type: str, threat_objects: list[dict[str, Any]], create_relationships: bool + indicator: str, + indicator_type: str, + threat_objects: list[dict[str, Any]], + create_relationships: bool, + reliability: str = DBotScoreReliability.A_PLUS_PLUS, ) -> list[EntityRelationship]: """ Create relationships between indicator and threat objects @@ -305,7 +309,7 @@ def create_relationships( entity_a_type=indicator_type, entity_b=threat_name, entity_b_type=INDICATOR_TYPE_MAPPING[threat_class], - source_reliability=DBotScoreReliability.A_PLUS_PLUS, + source_reliability=reliability, brand=INTEGRATION_NAME, ) relationships.append(relationship) @@ -439,7 +443,10 @@ def create_publications(publications_data: list) -> list: def create_threat_object_relationships( - threat_obj: dict[str, Any], threat_object_name: str, threat_class: str + threat_obj: dict[str, Any], + threat_object_name: str, + threat_class: str, + reliability: str = DBotScoreReliability.A_PLUS_PLUS, ) -> list[EntityRelationship]: """ Create threat object relationships from related_threat_objects @@ -469,7 +476,7 @@ def create_threat_object_relationships( entity_a_type=INDICATOR_TYPE_MAPPING[threat_class], entity_b=related_name, entity_b_type=INDICATOR_TYPE_MAPPING[related_class], - source_reliability=DBotScoreReliability.A_PLUS_PLUS, + source_reliability=reliability, brand=INTEGRATION_NAME, ) relationships.append(entity_relationship.to_entry()) @@ -478,7 +485,10 @@ def create_threat_object_relationships( def create_campaigns_relationships( - threat_obj: dict[str, Any], threat_object_name: str, threat_class: str + threat_obj: dict[str, Any], + threat_object_name: str, + threat_class: str, + reliability: str = DBotScoreReliability.A_PLUS_PLUS, ) -> list[EntityRelationship]: """ Create campaigns relationships from campaigns list @@ -502,7 +512,7 @@ def create_campaigns_relationships( entity_a_type=INDICATOR_TYPE_MAPPING[threat_class], entity_b=string_to_table_header(campaign), entity_b_type=ThreatIntel.ObjectsNames.CAMPAIGN, - source_reliability=DBotScoreReliability.A_PLUS_PLUS, + source_reliability=reliability, brand=INTEGRATION_NAME, ) relationships.append(entity_relationship.to_entry()) @@ -511,7 +521,10 @@ def create_campaigns_relationships( def create_attack_patterns_relationships( - threat_obj: dict[str, Any], threat_actor_name: str, threat_class: str + threat_obj: dict[str, Any], + threat_actor_name: str, + threat_class: str, + reliability: str = DBotScoreReliability.A_PLUS_PLUS, ) -> list[EntityRelationship]: """ Create attack patterns relationships from attack patterns associations @@ -546,7 +559,7 @@ def create_attack_patterns_relationships( entity_a_type=INDICATOR_TYPE_MAPPING[threat_class], entity_b=string_to_table_header(pattern_name), entity_b_type=ThreatIntel.ObjectsNames.ATTACK_PATTERN, - source_reliability=DBotScoreReliability.A_PLUS_PLUS, + source_reliability=reliability, brand=INTEGRATION_NAME, ) relationships.append(entity_relationship.to_entry()) @@ -555,7 +568,10 @@ def create_attack_patterns_relationships( def create_malware_relationships( - threat_obj: dict[str, Any], threat_actor_name: str, threat_class: str + threat_obj: dict[str, Any], + threat_actor_name: str, + threat_class: str, + reliability: str = DBotScoreReliability.A_PLUS_PLUS, ) -> list[EntityRelationship]: """ Create malware relationships from malware_associations @@ -583,7 +599,7 @@ def create_malware_relationships( entity_a_type=INDICATOR_TYPE_MAPPING[threat_class], entity_b=string_to_table_header(name), entity_b_type=ThreatIntel.ObjectsNames.MALWARE, - source_reliability=DBotScoreReliability.A_PLUS_PLUS, + source_reliability=reliability, brand=INTEGRATION_NAME, ) relationships.append(entity_relationship.to_entry()) @@ -596,7 +612,7 @@ def create_malware_relationships( entity_a_type=INDICATOR_TYPE_MAPPING[threat_class], entity_b=string_to_table_header(alias), entity_b_type=ThreatIntel.ObjectsNames.MALWARE, - source_reliability=DBotScoreReliability.A_PLUS_PLUS, + source_reliability=reliability, brand=INTEGRATION_NAME, ) relationships.append(entity_relationship.to_entry()) @@ -604,7 +620,12 @@ def create_malware_relationships( return relationships -def create_tools_relationships(threat_obj: dict[str, Any], threat_actor_name: str, threat_class: str) -> list[EntityRelationship]: +def create_tools_relationships( + threat_obj: dict[str, Any], + threat_actor_name: str, + threat_class: str, + reliability: str = DBotScoreReliability.A_PLUS_PLUS, +) -> list[EntityRelationship]: """ Create tools relationships from tools associations @@ -629,7 +650,7 @@ def create_tools_relationships(threat_obj: dict[str, Any], threat_actor_name: st entity_a_type=INDICATOR_TYPE_MAPPING[threat_class], entity_b=string_to_table_header(tool_name), entity_b_type=ThreatIntel.ObjectsNames.TOOL, - source_reliability=DBotScoreReliability.A_PLUS_PLUS, + source_reliability=reliability, brand=INTEGRATION_NAME, fields={"tags": f"mitre-id: {tool.get('mitreid')}" if tool.get("mitreid") else ""}, ) @@ -639,7 +660,10 @@ def create_tools_relationships(threat_obj: dict[str, Any], threat_actor_name: st def create_vulnerabilities_relationships( - threat_obj: dict[str, Any], threat_actor_name: str, threat_class: str + threat_obj: dict[str, Any], + threat_actor_name: str, + threat_class: str, + reliability: str = DBotScoreReliability.A_PLUS_PLUS, ) -> list[EntityRelationship]: """ Create vulnerabilities relationships from vulnerabilities associations @@ -665,7 +689,7 @@ def create_vulnerabilities_relationships( entity_a_type=INDICATOR_TYPE_MAPPING[threat_class], entity_b=cve_id.upper(), entity_b_type=FeedIndicatorType.CVE, - source_reliability=DBotScoreReliability.A_PLUS_PLUS, + source_reliability=reliability, brand=INTEGRATION_NAME, ) relationships.append(entity_relationship.to_entry()) @@ -674,7 +698,10 @@ def create_vulnerabilities_relationships( def create_actor_relationships( - threat_obj: dict[str, Any], malware_family_name: str, threat_class: str + threat_obj: dict[str, Any], + malware_family_name: str, + threat_class: str, + reliability: str = DBotScoreReliability.A_PLUS_PLUS, ) -> list[EntityRelationship]: """ Create actor relationships from actor_associations @@ -703,7 +730,7 @@ def create_actor_relationships( entity_a_type=INDICATOR_TYPE_MAPPING[threat_class], entity_b=string_to_table_header(alias), entity_b_type=ThreatIntel.ObjectsNames.THREAT_ACTOR, - source_reliability=DBotScoreReliability.A_PLUS_PLUS, + source_reliability=reliability, brand=INTEGRATION_NAME, ) relationships.append(entity_relationship.to_entry()) @@ -715,7 +742,7 @@ def create_actor_relationships( entity_a_type=INDICATOR_TYPE_MAPPING[threat_class], entity_b=string_to_table_header(name), entity_b_type=ThreatIntel.ObjectsNames.THREAT_ACTOR, - source_reliability=DBotScoreReliability.A_PLUS_PLUS, + source_reliability=reliability, brand=INTEGRATION_NAME, ) relationships.append(entity_relationship.to_entry()) @@ -723,7 +750,11 @@ def create_actor_relationships( return relationships -def create_location_indicators_and_relationships(threat_obj: dict[str, Any], threat_actor_name: str) -> list[dict[str, Any]]: +def create_location_indicators_and_relationships( + threat_obj: dict[str, Any], + threat_actor_name: str, + reliability: str = DBotScoreReliability.A_PLUS_PLUS, +) -> list[dict[str, Any]]: """ Create location indicators from affected regions and origin field and build relationships @@ -760,7 +791,7 @@ def create_location_indicators_and_relationships(threat_obj: dict[str, Any], thr entity_a_type=ThreatIntel.ObjectsNames.THREAT_ACTOR, entity_b=standardized_region, entity_b_type=FeedIndicatorType.Location, - source_reliability=DBotScoreReliability.A_PLUS_PLUS, + source_reliability=reliability, brand=INTEGRATION_NAME, ) @@ -830,13 +861,13 @@ def create_threat_object_indicators( # Create relationships relationships = [] - relationships += create_threat_object_relationships(threat_obj, name, threat_class) - relationships += create_campaigns_relationships(threat_obj, name, threat_class) - relationships += create_attack_patterns_relationships(threat_obj, name, threat_class) - relationships += create_malware_relationships(threat_obj, name, threat_class) - relationships += create_tools_relationships(threat_obj, name, threat_class) - relationships += create_vulnerabilities_relationships(threat_obj, name, threat_class) - relationships += create_actor_relationships(threat_obj, name, threat_class) + relationships += create_threat_object_relationships(threat_obj, name, threat_class, reliability) + relationships += create_campaigns_relationships(threat_obj, name, threat_class, reliability) + relationships += create_attack_patterns_relationships(threat_obj, name, threat_class, reliability) + relationships += create_malware_relationships(threat_obj, name, threat_class, reliability) + relationships += create_tools_relationships(threat_obj, name, threat_class, reliability) + relationships += create_vulnerabilities_relationships(threat_obj, name, threat_class, reliability) + relationships += create_actor_relationships(threat_obj, name, threat_class, reliability) # Create fields with threat object details fields = { @@ -867,7 +898,7 @@ def create_threat_object_indicators( indicators.append(indicator_data) # Create location indicators from affected regions - location_indicators = create_location_indicators_and_relationships(threat_obj, name) + location_indicators = create_location_indicators_and_relationships(threat_obj, name, reliability) indicators.extend(location_indicators) return indicators @@ -981,7 +1012,7 @@ def ip_command(client: Client, args: dict[str, Any]) -> CommandResults: ip_indicator = Common.IP(ip=ip, dbot_score=dbot_score, tags=tags, malware_family=malware_families) # Create relationships - relationships = create_relationships(ip, FeedIndicatorType.IP, threat_objects, create_relationships_flag) + relationships = create_relationships(ip, FeedIndicatorType.IP, threat_objects, create_relationships_flag, client.reliability) # Create indicators from relationships if create_threat_object_indicators_flag: @@ -1044,7 +1075,7 @@ def domain_command(client: Client, args: dict[str, Any]) -> CommandResults: domain_indicator = Common.Domain(domain=domain, dbot_score=dbot_score, tags=tags, malware_family=malware_families) # Create relationships - relationships = create_relationships(domain, FeedIndicatorType.Domain, threat_objects, create_relationships_flag) + relationships = create_relationships(domain, FeedIndicatorType.Domain, threat_objects, create_relationships_flag, client.reliability) # Create indicators from relationships if create_threat_object_indicators_flag: @@ -1107,7 +1138,7 @@ def url_command(client: Client, args: dict[str, Any]) -> CommandResults: url_indicator = Common.URL(url=url, dbot_score=dbot_score, tags=tags, malware_family=malware_families) # Create relationships - relationships = create_relationships(url, FeedIndicatorType.URL, threat_objects, create_relationships_flag) + relationships = create_relationships(url, FeedIndicatorType.URL, threat_objects, create_relationships_flag, client.reliability) # Create indicators from relationships if create_threat_object_indicators_flag: @@ -1188,7 +1219,7 @@ def file_command(client: Client, args: dict[str, Any]) -> CommandResults: ) # Create relationships - relationships = create_relationships(file_hash, FeedIndicatorType.File, threat_objects, create_relationships_flag) + relationships = create_relationships(file_hash, FeedIndicatorType.File, threat_objects, create_relationships_flag, client.reliability) # Create indicators from relationships if create_threat_object_indicators_flag: From e5fef454e8c23de977efaa0c6550ddfe5e38dd90 Mon Sep 17 00:00:00 2001 From: Maximilian Lehrbaum Date: Wed, 4 Feb 2026 17:53:37 +0100 Subject: [PATCH 2/7] Create ReleaseNotes for Unit42 Intelligence Pack --- .../ReleaseNotes/1_0_12.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/ReleaseNotes/1_0_12.md diff --git a/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/ReleaseNotes/1_0_12.md b/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/ReleaseNotes/1_0_12.md new file mode 100644 index 000000000000..e3ed85050fc1 --- /dev/null +++ b/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/ReleaseNotes/1_0_12.md @@ -0,0 +1,5 @@ +#### Integrations + +##### Unit 42 Intelligence + +- Change relationship source_reliability to use parameter reliability. From e73829f62b63a6ae90df1f56931aedd39aba4312 Mon Sep 17 00:00:00 2001 From: Maximilian Lehrbaum Date: Wed, 4 Feb 2026 17:53:52 +0100 Subject: [PATCH 3/7] Update pack_metadata.json --- .../pack_metadata.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/pack_metadata.json b/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/pack_metadata.json index 15ba65ebd2ec..b9e2f40dec87 100644 --- a/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/pack_metadata.json +++ b/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/pack_metadata.json @@ -2,7 +2,7 @@ "name": "Unit 42 Threat Intelligence by Palo Alto Networks", "description": "Use the Unit 42 Threat Intelligence by Palo Alto Networks integrations to enrich indicators with threat intelligence data and fetch threat intelligence feeds.", "support": "xsoar", - "currentVersion": "1.0.11", + "currentVersion": "1.0.12", "author": "Cortex XSOAR", "url": "https://www.paloaltonetworks.com/cortex", "email": "", @@ -49,4 +49,4 @@ "cloud_runtime_security", "cloud_posture" ] -} \ No newline at end of file +} From c25b4e87832d0e4c45b1bf816f027ad67c8ae6a3 Mon Sep 17 00:00:00 2001 From: Maximilian Lehrbaum Date: Wed, 4 Feb 2026 17:54:16 +0100 Subject: [PATCH 4/7] Create CONTRIBUTORS.json --- .../CONTRIBUTORS.json | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/CONTRIBUTORS.json diff --git a/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/CONTRIBUTORS.json b/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/CONTRIBUTORS.json new file mode 100644 index 000000000000..fcabcec481f7 --- /dev/null +++ b/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/CONTRIBUTORS.json @@ -0,0 +1,3 @@ +[ + "Maximilian Lehrbaum" +] From e5cbb22414486afbb54120969fec0d8446c4c6bb Mon Sep 17 00:00:00 2001 From: Maximilian Lehrbaum Date: Wed, 4 Feb 2026 18:13:47 +0100 Subject: [PATCH 5/7] Update Unit42Intelligence.py Formating fixed --- .../Unit42Intelligence/Unit42Intelligence.py | 603 ++++++++++-------- 1 file changed, 350 insertions(+), 253 deletions(-) diff --git a/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/Integrations/Unit42Intelligence/Unit42Intelligence.py b/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/Integrations/Unit42Intelligence/Unit42Intelligence.py index adce316d14a9..b2b03a0e949e 100644 --- a/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/Integrations/Unit42Intelligence/Unit42Intelligence.py +++ b/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/Integrations/Unit42Intelligence/Unit42Intelligence.py @@ -1,7 +1,8 @@ +from typing import Any + import demistomock as demisto # noqa: F401 -from CommonServerPython import * # noqa: F401 import urllib3 -from typing import Any +from CommonServerPython import * # noqa: F401 # Disable insecure warnings urllib3.disable_warnings() @@ -9,11 +10,11 @@ #### CONSTANTS #### -INTEGRATION_NAME = "Unit 42 Intelligence" +INTEGRATION_NAME = 'Unit 42 Intelligence' # API endpoints -SERVER_URL = "https://prod-us.tas.crtx.paloaltonetworks.com" -LOOKUP_ENDPOINT = "/api/v1/lookups/indicator/{indicator_type}/{indicator_value}" +SERVER_URL = 'https://prod-us.tas.crtx.paloaltonetworks.com' +LOOKUP_ENDPOINT = '/api/v1/lookups/indicator/{indicator_type}/{indicator_value}' # Retry configuration RETRY_COUNT = 5 @@ -21,43 +22,43 @@ # Score mappings VERDICT_TO_SCORE = { - "malicious": Common.DBotScore.BAD, - "suspicious": Common.DBotScore.SUSPICIOUS, - "benign": Common.DBotScore.GOOD, - "unknown": Common.DBotScore.NONE, + 'malicious': Common.DBotScore.BAD, + 'suspicious': Common.DBotScore.SUSPICIOUS, + 'benign': Common.DBotScore.GOOD, + 'unknown': Common.DBotScore.NONE, } # Indicator type mappings INDICATOR_TYPE_MAPPING = { - "ip": FeedIndicatorType.IP, - "domain": FeedIndicatorType.Domain, - "url": FeedIndicatorType.URL, - "file": FeedIndicatorType.File, - "filehash_sha256": FeedIndicatorType.File, - "exploit": FeedIndicatorType.CVE, - "malware_family": ThreatIntel.ObjectsNames.MALWARE, - "actor": ThreatIntel.ObjectsNames.THREAT_ACTOR, - "threat_actor": ThreatIntel.ObjectsNames.THREAT_ACTOR, - "campaign": ThreatIntel.ObjectsNames.CAMPAIGN, - "attack pattern": ThreatIntel.ObjectsNames.ATTACK_PATTERN, - "technique": ThreatIntel.ObjectsNames.ATTACK_PATTERN, - "malicious_behavior": ThreatIntel.ObjectsNames.ATTACK_PATTERN, - "malicious behavior": ThreatIntel.ObjectsNames.ATTACK_PATTERN, + 'ip': FeedIndicatorType.IP, + 'domain': FeedIndicatorType.Domain, + 'url': FeedIndicatorType.URL, + 'file': FeedIndicatorType.File, + 'filehash_sha256': FeedIndicatorType.File, + 'exploit': FeedIndicatorType.CVE, + 'malware_family': ThreatIntel.ObjectsNames.MALWARE, + 'actor': ThreatIntel.ObjectsNames.THREAT_ACTOR, + 'threat_actor': ThreatIntel.ObjectsNames.THREAT_ACTOR, + 'campaign': ThreatIntel.ObjectsNames.CAMPAIGN, + 'attack pattern': ThreatIntel.ObjectsNames.ATTACK_PATTERN, + 'technique': ThreatIntel.ObjectsNames.ATTACK_PATTERN, + 'malicious_behavior': ThreatIntel.ObjectsNames.ATTACK_PATTERN, + 'malicious behavior': ThreatIntel.ObjectsNames.ATTACK_PATTERN, } # Define valid regions enum VALID_REGIONS = { - "australia and oceania": "Australia And Oceania", - "antarctica": "Antarctica", - "north america": "North America", - "south asia": "South Asia", - "europe": "Europe", - "central america and the caribbean": "Central America And The Caribbean", - "africa": "Africa", - "east and southeast asia": "East And Southeast Asia", - "middle east": "Middle East", - "central asia": "Central Asia", - "south america": "South America", + 'australia and oceania': 'Australia And Oceania', + 'antarctica': 'Antarctica', + 'north america': 'North America', + 'south asia': 'South Asia', + 'europe': 'Europe', + 'central america and the caribbean': 'Central America And The Caribbean', + 'africa': 'Africa', + 'east and southeast asia': 'East And Southeast Asia', + 'middle east': 'Middle East', + 'central asia': 'Central Asia', + 'south america': 'South America', } #### HELPER FUNCTIONS #### @@ -106,9 +107,13 @@ def parse_url_list(url_input: str | list | None) -> list[str]: # Split by comma only if followed by a URL scheme (supports all schemes, not just http/https) # This preserves commas within URL parameters all_urls: list[str] = [] - for line in url_input.split("\n"): + for line in url_input.split('\n'): if line := line.strip(): - all_urls.extend(url.strip() for url in re.split(r",\s*(?=[a-zA-Z][a-zA-Z0-9+.-]*://)", line) if url.strip()) + all_urls.extend( + url.strip() + for url in re.split(r',\s*(?=[a-zA-Z][a-zA-Z0-9+.-]*://)', line) + if url.strip() + ) return all_urls @@ -124,12 +129,14 @@ def unit42_error_handler(res: requests.Response): Returns: Error message string including X-Request-ID if available """ - request_id = res.headers.get("X-Request-ID", "N/A") - demisto.debug(f"{INTEGRATION_NAME} API Error - X-Request-ID: {request_id}, Status: {res.status_code}, URL: {res.url}") + request_id = res.headers.get('X-Request-ID', 'N/A') + demisto.debug( + f'{INTEGRATION_NAME} API Error - X-Request-ID: {request_id}, Status: {res.status_code}, URL: {res.url}' + ) - error_msg = f"Error in API request [Status: {res.status_code}]\n" - error_msg += f"[X-Request-ID: {request_id}]\n" - error_msg += f"Response text - {res.text}" + error_msg = f'Error in API request [Status: {res.status_code}]\n' + error_msg += f'[X-Request-ID: {request_id}]\n' + error_msg += f'Response text - {res.text}' return_error(error_msg) @@ -149,7 +156,7 @@ def encode_url_indicator(indicator_value: str) -> str: # Step 2: Encode special characters in the query string (keeping '=' safe) # This converts query= to query=%3Ctest%3E - encoded_query = urllib.parse.quote(parsed.query, safe="=") if parsed.query else "" + encoded_query = urllib.parse.quote(parsed.query, safe='=') if parsed.query else '' # Step 3: Reconstruct the URL with the encoded query temp_url = urllib.parse.urlunparse(parsed._replace(query=encoded_query)) @@ -158,7 +165,7 @@ def encode_url_indicator(indicator_value: str) -> str: # This converts the entire URL including the already-encoded query # Example: https://example.com/search?query=%3Ctest%3E becomes # https%3A%2F%2Fexample.com%2Fsearch%3Fquery%3D%253Ctest%253E - return urllib.parse.quote(temp_url, safe="") + return urllib.parse.quote(temp_url, safe='') #### CLIENT CLASS #### @@ -173,7 +180,10 @@ def __init__( proxy: bool, reliability: str, ): - headers = {"Authorization": f"Bearer {demisto.getLicenseID()}", "Content-Type": "application/json"} + headers = { + 'Authorization': f'Bearer {demisto.getLicenseID()}', + 'Content-Type': 'application/json', + } super().__init__(base_url=SERVER_URL, verify=verify, proxy=proxy, headers=headers) self.reliability = reliability @@ -188,16 +198,18 @@ def lookup_indicator(self, indicator_type: str, indicator_value: str) -> request Returns: requests.Response object """ - if indicator_type.lower() == "url": + if indicator_type.lower() == 'url': indicator_value = encode_url_indicator(indicator_value=indicator_value) - endpoint = LOOKUP_ENDPOINT.format(indicator_type=indicator_type, indicator_value=indicator_value) + endpoint = LOOKUP_ENDPOINT.format( + indicator_type=indicator_type, indicator_value=indicator_value + ) return self._http_request( - method="GET", + method='GET', url_suffix=endpoint, ok_codes=(200, 404), - resp_type="response", + resp_type='response', error_handler=unit42_error_handler, retries=RETRY_COUNT, status_list_to_retry=STATUS_CODES_TO_RETRY, @@ -222,12 +234,14 @@ def create_dbot_score( Returns: DBotScore object """ - score: int = VERDICT_TO_SCORE.get(verdict.lower() or "unknown", Common.DBotScore.NONE) + score: int = VERDICT_TO_SCORE.get(verdict.lower() or 'unknown', Common.DBotScore.NONE) # Add malicious description if the verdict is malicious malicious_description = None - if verdict.lower() == "malicious": - malicious_description = f"Unit 42 Intelligence classified this {indicator_type.lower()} as malicious" + if verdict.lower() == 'malicious': + malicious_description = ( + f'Unit 42 Intelligence classified this {indicator_type.lower()} as malicious' + ) return Common.DBotScore( indicator=indicator, @@ -259,17 +273,17 @@ def remove_mitre_technique_id_prefix(threat_name: str) -> str: >>> remove_mitre_technique_id_prefix("Not a MITRE ID - Something") "Not a MITRE ID - Something" """ - if " - " in threat_name: - parts = threat_name.split(" - ", 1) - if len(parts) == 2 and parts[0].startswith("T") and parts[0][1:].isdigit(): + if ' - ' in threat_name: + parts = threat_name.split(' - ', 1) + if len(parts) == 2 and parts[0].startswith('T') and parts[0][1:].isdigit(): return parts[1] return threat_name def create_relationships( - indicator: str, - indicator_type: str, - threat_objects: list[dict[str, Any]], + indicator: str, + indicator_type: str, + threat_objects: list[dict[str, Any]], create_relationships: bool, reliability: str = DBotScoreReliability.A_PLUS_PLUS, ) -> list[EntityRelationship]: @@ -288,19 +302,23 @@ def create_relationships( relationships: list[EntityRelationship] = [] if not create_relationships or not threat_objects: - demisto.debug(f"Skipping create_relationships as {create_relationships} and {threat_objects=}") + demisto.debug( + f'Skipping create_relationships as {create_relationships} and {threat_objects=}' + ) return relationships for threat_obj in threat_objects: - threat_name = threat_obj.get("name", "") - threat_class = threat_obj.get("threat_object_class", "").lower() + threat_name = threat_obj.get('name', '') + threat_class = threat_obj.get('threat_object_class', '').lower() # Remove MITRE technique ID prefix for attack patterns if INDICATOR_TYPE_MAPPING[threat_class] == ThreatIntel.ObjectsNames.ATTACK_PATTERN: threat_name = remove_mitre_technique_id_prefix(threat_name) if not threat_name or threat_class not in INDICATOR_TYPE_MAPPING: - demisto.debug(f"Skipping create_relationships for threat_name {threat_name} and threat_class {threat_class}") + demisto.debug( + f'Skipping create_relationships for threat_name {threat_name} and threat_class {threat_class}' + ) continue relationship = EntityRelationship( @@ -328,17 +346,19 @@ def extract_response_data(response: dict[str, Any]) -> dict[str, Any]: Dictionary containing extracted data """ return { - "indicator_value": response.get("indicator_value", ""), - "indicator_type": response.get("indicator_type", ""), - "counts": response.get("counts", []), - "verdict": response.get("verdict", "unknown"), - "verdict_categories": [item.get("value") for item in response.get("verdict_categories", [])], - "first_seen": response.get("first_seen", ""), - "last_seen": response.get("last_seen", ""), - "updated_at": response.get("updated_at", ""), - "seen_by": response.get("sources", []), - "threat_object_associations": response.get("threat_object_associations", []), - "indicator_details": response.get("indicator_details", {}), + 'indicator_value': response.get('indicator_value', ''), + 'indicator_type': response.get('indicator_type', ''), + 'counts': response.get('counts', []), + 'verdict': response.get('verdict', 'unknown'), + 'verdict_categories': [ + item.get('value') for item in response.get('verdict_categories', []) + ], + 'first_seen': response.get('first_seen', ''), + 'last_seen': response.get('last_seen', ''), + 'updated_at': response.get('updated_at', ''), + 'seen_by': response.get('sources', []), + 'threat_object_associations': response.get('threat_object_associations', []), + 'indicator_details': response.get('indicator_details', {}), } @@ -354,19 +374,21 @@ def extract_tags_from_threat_objects(threat_objects: list[dict[str, Any]]) -> li """ tags = [] for threat_obj in threat_objects: - name = threat_obj.get("name") + name = threat_obj.get('name') if name: tags.append(name) # Add aliases as additional tags - aliases = threat_obj.get("aliases", []) + aliases = threat_obj.get('aliases', []) if aliases: tags.extend([alias for alias in aliases if alias]) return list(set(tags)) # Remove duplicates -def extract_malware_families_from_threat_objects(threat_objects: list[dict[str, Any]]) -> str | None: +def extract_malware_families_from_threat_objects( + threat_objects: list[dict[str, Any]], +) -> str | None: """ Extract malware families from threat object associations @@ -377,9 +399,9 @@ def extract_malware_families_from_threat_objects(threat_objects: list[dict[str, Malware family name if found, None otherwise """ for threat_obj in threat_objects: - threat_class = threat_obj.get("threat_object_class", "").lower() - if threat_class == "malware_family": - name = threat_obj.get("name") + threat_class = threat_obj.get('threat_object_class', '').lower() + if threat_class == 'malware_family': + name = threat_obj.get('name') if name: return name @@ -396,24 +418,30 @@ def build_threat_object_description(threat_obj: dict[str, Any]) -> str: Returns: Formatted description string with sections for highlights, methods, and targets """ - description = threat_obj.get("description", "").replace("\\n", "\n") + description = threat_obj.get('description', '').replace('\\n', '\n') # Add highlights section if available - highlights = demisto.get(threat_obj, "battlecard_details.highlights", "").replace("\\n", "\n") - if highlights and highlights != "Highlights / Key Takeaways (external)": # Do not add if it is only the default title - description += "\n\n##" + highlights = demisto.get(threat_obj, 'battlecard_details.highlights', '').replace('\\n', '\n') + if ( + highlights and highlights != 'Highlights / Key Takeaways (external)' + ): # Do not add if it is only the default title + description += '\n\n##' description += highlights # Add methods section if available (for threat actors) - methods = demisto.get(threat_obj, "battlecard_details.threat_actor_details.methods", "").replace("\\n", "\n") + methods = demisto.get( + threat_obj, 'battlecard_details.threat_actor_details.methods', '' + ).replace('\\n', '\n') if methods: - description += "\n\n##" + description += '\n\n##' description += methods # Add targets section if available (for threat actors) - targets = demisto.get(threat_obj, "battlecard_details.threat_actor_details.targets", "").replace("\\n", "\n") + targets = demisto.get( + threat_obj, 'battlecard_details.threat_actor_details.targets', '' + ).replace('\\n', '\n') if targets: - description += "\n\n##" + description += '\n\n##' description += targets return description @@ -432,12 +460,12 @@ def create_publications(publications_data: list) -> list: publications = [] for data in publications_data: - timestamp = data.get("created", "") - title = data.get("title", "") - url = data.get("url", "") - source = data.get("source", INTEGRATION_NAME) + timestamp = data.get('created', '') + title = data.get('title', '') + url = data.get('url', '') + source = data.get('source', INTEGRATION_NAME) - publications.append({"link": url, "title": title, "timestamp": timestamp, "source": source}) + publications.append({'link': url, 'title': title, 'timestamp': timestamp, 'source': source}) return publications @@ -460,14 +488,14 @@ def create_threat_object_relationships( List of EntityRelationship objects """ relationships = [] - related_threat_objects = threat_obj.get("related_threat_objects", []) + related_threat_objects = threat_obj.get('related_threat_objects', []) for related_obj in related_threat_objects: if not isinstance(related_obj, dict): continue - related_name = related_obj.get("name") - related_class = related_obj.get("class", "").lower() + related_name = related_obj.get('name') + related_class = related_obj.get('class', '').lower() if related_name and related_class: entity_relationship = EntityRelationship( @@ -502,7 +530,7 @@ def create_campaigns_relationships( List of EntityRelationship objects """ relationships = [] - campaigns = demisto.get(threat_obj, "battlecard_details.campaigns", []) + campaigns = demisto.get(threat_obj, 'battlecard_details.campaigns', []) for campaign in campaigns: if isinstance(campaign, str) and campaign.strip(): @@ -538,20 +566,20 @@ def create_attack_patterns_relationships( List of EntityRelationship objects """ relationships = [] - attack_patterns = demisto.get(threat_obj, "battlecard_details.attack_patterns", []) + attack_patterns = demisto.get(threat_obj, 'battlecard_details.attack_patterns', []) for pattern in attack_patterns: - mitre_id = pattern.get("mitreid", "") - pattern_name = pattern.get("name", "") + mitre_id = pattern.get('mitreid', '') + pattern_name = pattern.get('name', '') # Skip items with a dot in the mitreid - if "." in mitre_id: - demisto.debug(f"Skipping attack pattern {pattern_name} with mitreid {mitre_id}") + if '.' in mitre_id: + demisto.debug(f'Skipping attack pattern {pattern_name} with mitreid {mitre_id}') continue - if pattern_name and pattern_name.endswith("(enterprise)"): + if pattern_name and pattern_name.endswith('(enterprise)'): # Remove (enterprise) suffix if present - pattern_name = pattern_name.removesuffix("(enterprise)").strip() + pattern_name = pattern_name.removesuffix('(enterprise)').strip() entity_relationship = EntityRelationship( name=EntityRelationship.Relationships.USES, @@ -585,11 +613,13 @@ def create_malware_relationships( List of EntityRelationship objects """ relationships = [] - malware_associations = demisto.get(threat_obj, "battlecard_details.threat_actor_details.malware_associations", []) + malware_associations = demisto.get( + threat_obj, 'battlecard_details.threat_actor_details.malware_associations', [] + ) for relationship in malware_associations: - name = relationship.get("name") - aliases = relationship.get("aliases", []) + name = relationship.get('name') + aliases = relationship.get('aliases', []) if name: # Create a relationship using the name @@ -638,10 +668,12 @@ def create_tools_relationships( List of EntityRelationship objects """ relationships = [] - tools_associations = demisto.get(threat_obj, "battlecard_details.threat_actor_details.tools", []) + tools_associations = demisto.get( + threat_obj, 'battlecard_details.threat_actor_details.tools', [] + ) for tool in tools_associations: - tool_name = tool.get("name") + tool_name = tool.get('name') if tool_name: entity_relationship = EntityRelationship( @@ -652,7 +684,7 @@ def create_tools_relationships( entity_b_type=ThreatIntel.ObjectsNames.TOOL, source_reliability=reliability, brand=INTEGRATION_NAME, - fields={"tags": f"mitre-id: {tool.get('mitreid')}" if tool.get("mitreid") else ""}, + fields={'tags': f'mitre-id: {tool.get("mitreid")}' if tool.get('mitreid') else ''}, ) relationships.append(entity_relationship.to_entry()) @@ -677,10 +709,12 @@ def create_vulnerabilities_relationships( List of EntityRelationship objects """ relationships = [] - vulnerabilities = demisto.get(threat_obj, "battlecard_details.threat_actor_details.vulnerability_associations", []) + vulnerabilities = demisto.get( + threat_obj, 'battlecard_details.threat_actor_details.vulnerability_associations', [] + ) for vulnerability in vulnerabilities: - cve_id = vulnerability.get("cve") + cve_id = vulnerability.get('cve') if cve_id: entity_relationship = EntityRelationship( @@ -715,11 +749,13 @@ def create_actor_relationships( List of EntityRelationship objects """ relationships = [] - actor_associations = demisto.get(threat_obj, "battlecard_details.malware_family_details.actor_associations", []) + actor_associations = demisto.get( + threat_obj, 'battlecard_details.malware_family_details.actor_associations', [] + ) for relationship in actor_associations: - aliases = relationship.get("aliases", []) - name = relationship.get("name") + aliases = relationship.get('aliases', []) + name = relationship.get('name') if aliases: # Create a relationship for each alias @@ -768,7 +804,9 @@ def create_location_indicators_and_relationships( location_indicators: list = [] # Handle affected regions - affected_regions = demisto.get(threat_obj, "battlecard_details.threat_actor_details.affected_regions", []) + affected_regions = demisto.get( + threat_obj, 'battlecard_details.threat_actor_details.affected_regions', [] + ) # in case affected_regions is "null", return empty list. if not isinstance(affected_regions, list): @@ -781,7 +819,7 @@ def create_location_indicators_and_relationships( # Use the standardized region name if it matches our enum standardized_region = VALID_REGIONS.get(region_lower) if not standardized_region: - demisto.debug(f"Skipping region {region} as it is not in the valid regions enum") + demisto.debug(f'Skipping region {region} as it is not in the valid regions enum') continue # Create EntityRelationship for the location @@ -796,13 +834,13 @@ def create_location_indicators_and_relationships( ) location_indicator = { - "value": standardized_region, - "type": FeedIndicatorType.Location, - "score": Common.DBotScore.NONE, - "service": INTEGRATION_NAME, - "relationships": [entity_relationship.to_entry()], - "fields": { - "geocountry": standardized_region, + 'value': standardized_region, + 'type': FeedIndicatorType.Location, + 'score': Common.DBotScore.NONE, + 'service': INTEGRATION_NAME, + 'relationships': [entity_relationship.to_entry()], + 'fields': { + 'geocountry': standardized_region, }, } location_indicators.append(location_indicator) @@ -838,7 +876,7 @@ def get_threat_object_score(threat_class: str) -> int: def create_threat_object_indicators( - threat_objects: list[dict[str, Any]], reliability: str = "A++ - Reputation script" + threat_objects: list[dict[str, Any]], reliability: str = 'A++ - Reputation script' ) -> list[dict[str, Any]]: """ Create threat object indicators from threat object associations @@ -853,58 +891,73 @@ def create_threat_object_indicators( indicators = [] for threat_obj in threat_objects: - name = threat_obj.get("name") - threat_class = threat_obj.get("threat_object_class", "").lower() + name = threat_obj.get('name') + threat_class = threat_obj.get('threat_object_class', '').lower() if not name or threat_class not in INDICATOR_TYPE_MAPPING: continue # Create relationships relationships = [] - relationships += create_threat_object_relationships(threat_obj, name, threat_class, reliability) + relationships += create_threat_object_relationships( + threat_obj, name, threat_class, reliability + ) relationships += create_campaigns_relationships(threat_obj, name, threat_class, reliability) - relationships += create_attack_patterns_relationships(threat_obj, name, threat_class, reliability) + relationships += create_attack_patterns_relationships( + threat_obj, name, threat_class, reliability + ) relationships += create_malware_relationships(threat_obj, name, threat_class, reliability) relationships += create_tools_relationships(threat_obj, name, threat_class, reliability) - relationships += create_vulnerabilities_relationships(threat_obj, name, threat_class, reliability) + relationships += create_vulnerabilities_relationships( + threat_obj, name, threat_class, reliability + ) relationships += create_actor_relationships(threat_obj, name, threat_class, reliability) # Create fields with threat object details fields = { - "description": build_threat_object_description(threat_obj), - "reportedby": threat_obj.get("sources"), - "aliases": [string_to_table_header(alias) for alias in threat_obj.get("aliases", [])], - "industrysectors": [ - string_to_table_header(industry) for industry in demisto.get(threat_obj, "battlecard_details.industries", []) + 'description': build_threat_object_description(threat_obj), + 'reportedby': threat_obj.get('sources'), + 'aliases': [string_to_table_header(alias) for alias in threat_obj.get('aliases', [])], + 'industrysectors': [ + string_to_table_header(industry) + for industry in demisto.get(threat_obj, 'battlecard_details.industries', []) ], - "primarymotivation": string_to_table_header( - demisto.get(threat_obj, "battlecard_details.threat_actor_details.primary_motivation", "") + 'primarymotivation': string_to_table_header( + demisto.get( + threat_obj, 'battlecard_details.threat_actor_details.primary_motivation', '' + ) ), - "publications": create_publications(threat_obj.get("publications", [])), - "geocountry": demisto.get(threat_obj, "battlecard_details.threat_actor_details.origin", "").upper(), - "ismalwarefamily": "True" if threat_class == "malware_family" else "False", + 'publications': create_publications(threat_obj.get('publications', [])), + 'geocountry': demisto.get( + threat_obj, 'battlecard_details.threat_actor_details.origin', '' + ).upper(), + 'ismalwarefamily': 'True' if threat_class == 'malware_family' else 'False', } indicator_data = { - "value": name, - "type": INDICATOR_TYPE_MAPPING[threat_class], - "score": get_threat_object_score(threat_class), - "service": INTEGRATION_NAME, - "relationships": relationships, - "fields": fields, - "rawJSON": threat_obj, + 'value': name, + 'type': INDICATOR_TYPE_MAPPING[threat_class], + 'score': get_threat_object_score(threat_class), + 'service': INTEGRATION_NAME, + 'relationships': relationships, + 'fields': fields, + 'rawJSON': threat_obj, } indicators.append(indicator_data) # Create location indicators from affected regions - location_indicators = create_location_indicators_and_relationships(threat_obj, name, reliability) + location_indicators = create_location_indicators_and_relationships( + threat_obj, name, reliability + ) indicators.extend(location_indicators) return indicators -def create_context_data(response_data: dict[str, Any], indicator_value: str | None = None) -> dict[str, Any]: +def create_context_data( + response_data: dict[str, Any], indicator_value: str | None = None +) -> dict[str, Any]: """ Create context data for indicators @@ -915,15 +968,17 @@ def create_context_data(response_data: dict[str, Any], indicator_value: str | No Dictionary containing context data """ return { - "Value": indicator_value or response_data["indicator_value"], - "Type": INDICATOR_TYPE_MAPPING.get(response_data["indicator_type"]), - "Verdict": string_to_table_header(response_data["verdict"]), - "VerdictCategories": list({string_to_table_header(item) for item in response_data["verdict_categories"]}), - "Counts": response_data["counts"], - "FirstSeen": response_data["first_seen"], - "LastSeen": response_data["last_seen"], - "SeenBy": list({string_to_table_header(item) for item in response_data["seen_by"]}), - "EnrichedThreatObjectAssociation": response_data["threat_object_associations"], + 'Value': indicator_value or response_data['indicator_value'], + 'Type': INDICATOR_TYPE_MAPPING.get(response_data['indicator_type']), + 'Verdict': string_to_table_header(response_data['verdict']), + 'VerdictCategories': list( + {string_to_table_header(item) for item in response_data['verdict_categories']} + ), + 'Counts': response_data['counts'], + 'FirstSeen': response_data['first_seen'], + 'LastSeen': response_data['last_seen'], + 'SeenBy': list({string_to_table_header(item) for item in response_data['seen_by']}), + 'EnrichedThreatObjectAssociation': response_data['threat_object_associations'], } @@ -939,17 +994,19 @@ def construct_404_response(indicator_value: str, indicator_type: str) -> dict[st Dictionary containing indicator default response in cases of 404 """ return { - "indicator_value": indicator_value, - "indicator_type": indicator_type, - "verdict": "Unknown", - "verdict_categories": [], - "counts": [{"count_type": "wf_sample", "count_values": {"benign": 0, "grayware": 0, "malware": 0}}], - "first_seen": "", - "last_seen": "", - "seen_by": [], - "threat_object_associations": [], - "is_observed": False, - "sources": [], + 'indicator_value': indicator_value, + 'indicator_type': indicator_type, + 'verdict': 'Unknown', + 'verdict_categories': [], + 'counts': [ + {'count_type': 'wf_sample', 'count_values': {'benign': 0, 'grayware': 0, 'malware': 0}} + ], + 'first_seen': '', + 'last_seen': '', + 'seen_by': [], + 'threat_object_associations': [], + 'is_observed': False, + 'sources': [], } @@ -968,10 +1025,10 @@ def test_module(client: Client) -> str: """ try: # Test with a known safe domain - client.lookup_indicator("domain", "example.com") - return "ok" + client.lookup_indicator('domain', 'example.com') + return 'ok' except Exception as e: - return f"Test failed: {str(e)}" + return f'Test failed: {str(e)}' #### COMMAND FUNCTIONS #### @@ -988,31 +1045,39 @@ def ip_command(client: Client, args: dict[str, Any]) -> CommandResults: Returns: CommandResults object """ - ip = args.get("ip", "") - create_relationships_flag = argToBoolean(args.get("create_relationships", True)) - create_threat_object_indicators_flag = argToBoolean(args.get("create_threat_object_indicators", False)) + ip = args.get('ip', '') + create_relationships_flag = argToBoolean(args.get('create_relationships', True)) + create_threat_object_indicators_flag = argToBoolean( + args.get('create_threat_object_indicators', False) + ) - response = client.lookup_indicator("ip", ip) + response = client.lookup_indicator('ip', ip) if response.status_code == 404: - response_data = construct_404_response(ip, "IP") + response_data = construct_404_response(ip, 'IP') else: response_data = extract_response_data(response.json()) - threat_objects = response_data["threat_object_associations"] + threat_objects = response_data['threat_object_associations'] # Create DBotScore - dbot_score = create_dbot_score(ip, DBotScoreType.IP, response_data["verdict"], client.reliability) + dbot_score = create_dbot_score( + ip, DBotScoreType.IP, response_data['verdict'], client.reliability + ) # Extract tags and malware families from threat objects tags = extract_tags_from_threat_objects(threat_objects) malware_families = extract_malware_families_from_threat_objects(threat_objects) # Create enriched IP indicator with tags and malware families - ip_indicator = Common.IP(ip=ip, dbot_score=dbot_score, tags=tags, malware_family=malware_families) + ip_indicator = Common.IP( + ip=ip, dbot_score=dbot_score, tags=tags, malware_family=malware_families + ) # Create relationships - relationships = create_relationships(ip, FeedIndicatorType.IP, threat_objects, create_relationships_flag, client.reliability) + relationships = create_relationships( + ip, FeedIndicatorType.IP, threat_objects, create_relationships_flag, client.reliability + ) # Create indicators from relationships if create_threat_object_indicators_flag: @@ -1024,16 +1089,16 @@ def ip_command(client: Client, args: dict[str, Any]) -> CommandResults: context_data = create_context_data(response_data) readable_output = tableToMarkdown( - f"Unit 42 Intelligence results for IP: {ip}", + f'Unit 42 Intelligence results for IP: {ip}', context_data, - headers=["Value", "Verdict", "VerdictCategories", "SeenBy", "FirstSeen", "LastSeen"], + headers=['Value', 'Verdict', 'VerdictCategories', 'SeenBy', 'FirstSeen', 'LastSeen'], headerTransform=pascalToSpace, removeNull=True, ) return CommandResults( - outputs_prefix="Unit42Intelligence.IP", - outputs_key_field="Value", + outputs_prefix='Unit42Intelligence.IP', + outputs_key_field='Value', outputs=context_data, readable_output=readable_output, indicator=ip_indicator, @@ -1052,30 +1117,42 @@ def domain_command(client: Client, args: dict[str, Any]) -> CommandResults: Returns: CommandResults object """ - domain = args.get("domain", "") - create_relationships_flag = argToBoolean(args.get("create_relationships", True)) - create_threat_object_indicators_flag = argToBoolean(args.get("create_threat_object_indicators", False)) + domain = args.get('domain', '') + create_relationships_flag = argToBoolean(args.get('create_relationships', True)) + create_threat_object_indicators_flag = argToBoolean( + args.get('create_threat_object_indicators', False) + ) - response = client.lookup_indicator("domain", domain) + response = client.lookup_indicator('domain', domain) if response.status_code == 404: - response_data = construct_404_response(domain, "Domain") + response_data = construct_404_response(domain, 'Domain') else: response_data = extract_response_data(response.json()) - threat_objects = response_data["threat_object_associations"] + threat_objects = response_data['threat_object_associations'] # Create DBotScore - dbot_score = create_dbot_score(domain, DBotScoreType.DOMAIN, response_data["verdict"], client.reliability) + dbot_score = create_dbot_score( + domain, DBotScoreType.DOMAIN, response_data['verdict'], client.reliability + ) # Extract tags and malware families from threat objects tags = extract_tags_from_threat_objects(threat_objects) malware_families = extract_malware_families_from_threat_objects(threat_objects) # Create enriched Domain indicator with tags and malware families - domain_indicator = Common.Domain(domain=domain, dbot_score=dbot_score, tags=tags, malware_family=malware_families) + domain_indicator = Common.Domain( + domain=domain, dbot_score=dbot_score, tags=tags, malware_family=malware_families + ) # Create relationships - relationships = create_relationships(domain, FeedIndicatorType.Domain, threat_objects, create_relationships_flag, client.reliability) + relationships = create_relationships( + domain, + FeedIndicatorType.Domain, + threat_objects, + create_relationships_flag, + client.reliability, + ) # Create indicators from relationships if create_threat_object_indicators_flag: @@ -1087,16 +1164,16 @@ def domain_command(client: Client, args: dict[str, Any]) -> CommandResults: context_data = create_context_data(response_data) readable_output = tableToMarkdown( - f"Unit 42 Intelligence results for Domain: {domain}", + f'Unit 42 Intelligence results for Domain: {domain}', context_data, - headers=["Value", "Verdict", "VerdictCategories", "SeenBy", "FirstSeen", "LastSeen"], + headers=['Value', 'Verdict', 'VerdictCategories', 'SeenBy', 'FirstSeen', 'LastSeen'], headerTransform=pascalToSpace, removeNull=True, ) return CommandResults( - outputs_prefix="Unit42Intelligence.Domain", - outputs_key_field="Value", + outputs_prefix='Unit42Intelligence.Domain', + outputs_key_field='Value', outputs=context_data, readable_output=readable_output, indicator=domain_indicator, @@ -1115,30 +1192,38 @@ def url_command(client: Client, args: dict[str, Any]) -> CommandResults: Returns: CommandResults object """ - url = args.get("url", "") - create_relationships_flag = argToBoolean(args.get("create_relationships", True)) - create_threat_object_indicators_flag = argToBoolean(args.get("create_threat_object_indicators", False)) + url = args.get('url', '') + create_relationships_flag = argToBoolean(args.get('create_relationships', True)) + create_threat_object_indicators_flag = argToBoolean( + args.get('create_threat_object_indicators', False) + ) - response = client.lookup_indicator("url", url) + response = client.lookup_indicator('url', url) if response.status_code == 404: - response_data = construct_404_response(url, "URL") + response_data = construct_404_response(url, 'URL') else: response_data = extract_response_data(response.json()) - threat_objects = response_data["threat_object_associations"] + threat_objects = response_data['threat_object_associations'] # Create DBotScore - dbot_score = create_dbot_score(url, DBotScoreType.URL, response_data["verdict"], client.reliability) + dbot_score = create_dbot_score( + url, DBotScoreType.URL, response_data['verdict'], client.reliability + ) # Extract tags and malware families from threat objects tags = extract_tags_from_threat_objects(threat_objects) malware_families = extract_malware_families_from_threat_objects(threat_objects) # Create enriched URL indicator with tags and malware families - url_indicator = Common.URL(url=url, dbot_score=dbot_score, tags=tags, malware_family=malware_families) + url_indicator = Common.URL( + url=url, dbot_score=dbot_score, tags=tags, malware_family=malware_families + ) # Create relationships - relationships = create_relationships(url, FeedIndicatorType.URL, threat_objects, create_relationships_flag, client.reliability) + relationships = create_relationships( + url, FeedIndicatorType.URL, threat_objects, create_relationships_flag, client.reliability + ) # Create indicators from relationships if create_threat_object_indicators_flag: @@ -1150,16 +1235,16 @@ def url_command(client: Client, args: dict[str, Any]) -> CommandResults: context_data = create_context_data(response_data, indicator_value=url) readable_output = tableToMarkdown( - f"Unit 42 Intelligence results for URL: {url}", + f'Unit 42 Intelligence results for URL: {url}', context_data, - headers=["Value", "Verdict", "VerdictCategories", "SeenBy", "FirstSeen", "LastSeen"], + headers=['Value', 'Verdict', 'VerdictCategories', 'SeenBy', 'FirstSeen', 'LastSeen'], headerTransform=pascalToSpace, removeNull=True, ) return CommandResults( - outputs_prefix="Unit42Intelligence.URL", - outputs_key_field="Value", + outputs_prefix='Unit42Intelligence.URL', + outputs_key_field='Value', outputs=context_data, readable_output=readable_output, indicator=url_indicator, @@ -1178,27 +1263,31 @@ def file_command(client: Client, args: dict[str, Any]) -> CommandResults: Returns: CommandResults object """ - file_hash = args.get("file", "") - create_relationships_flag = argToBoolean(args.get("create_relationships", True)) - create_threat_object_indicators_flag = argToBoolean(args.get("create_threat_object_indicators", False)) + file_hash = args.get('file', '') + create_relationships_flag = argToBoolean(args.get('create_relationships', True)) + create_threat_object_indicators_flag = argToBoolean( + args.get('create_threat_object_indicators', False) + ) # Validate hash type - Unit 42 Intelligence only supports SHA256 hash_type = get_hash_type(file_hash) - if hash_type != "sha256": + if hash_type != 'sha256': return CommandResults( - readable_output=f"Unit 42 Intelligence only supports SHA256 hashes. Provided hash type: {hash_type}" + readable_output=f'Unit 42 Intelligence only supports SHA256 hashes. Provided hash type: {hash_type}' ) - response = client.lookup_indicator("filehash_sha256", file_hash) + response = client.lookup_indicator('filehash_sha256', file_hash) if response.status_code == 404: - response_data = construct_404_response(file_hash, "File") + response_data = construct_404_response(file_hash, 'File') else: response_data = extract_response_data(response.json()) - threat_objects = response_data["threat_object_associations"] + threat_objects = response_data['threat_object_associations'] # Create DBotScore - dbot_score = create_dbot_score(file_hash, DBotScoreType.FILE, response_data["verdict"], client.reliability) + dbot_score = create_dbot_score( + file_hash, DBotScoreType.FILE, response_data['verdict'], client.reliability + ) # Extract tags and malware families from threat objects tags = extract_tags_from_threat_objects(threat_objects) @@ -1206,20 +1295,26 @@ def file_command(client: Client, args: dict[str, Any]) -> CommandResults: # Create enriched File indicator with proper hash field assignment file_indicator = Common.File( - size=demisto.get(response_data, "indicator_details.file_size", ""), - file_type=demisto.get(response_data, "indicator_details.file_type", ""), - imphash=demisto.get(response_data, "indicator_details.file_hashes.imphash", ""), - md5=demisto.get(response_data, "indicator_details.file_hashes.md5", ""), - sha1=demisto.get(response_data, "indicator_details.file_hashes.sha1", ""), + size=demisto.get(response_data, 'indicator_details.file_size', ''), + file_type=demisto.get(response_data, 'indicator_details.file_type', ''), + imphash=demisto.get(response_data, 'indicator_details.file_hashes.imphash', ''), + md5=demisto.get(response_data, 'indicator_details.file_hashes.md5', ''), + sha1=demisto.get(response_data, 'indicator_details.file_hashes.sha1', ''), sha256=file_hash, - ssdeep=demisto.get(response_data, "indicator_details.file_hashes.ssdeep", ""), + ssdeep=demisto.get(response_data, 'indicator_details.file_hashes.ssdeep', ''), dbot_score=dbot_score, tags=tags, malware_family=malware_families, ) # Create relationships - relationships = create_relationships(file_hash, FeedIndicatorType.File, threat_objects, create_relationships_flag, client.reliability) + relationships = create_relationships( + file_hash, + FeedIndicatorType.File, + threat_objects, + create_relationships_flag, + client.reliability, + ) # Create indicators from relationships if create_threat_object_indicators_flag: @@ -1231,16 +1326,16 @@ def file_command(client: Client, args: dict[str, Any]) -> CommandResults: context_data = create_context_data(response_data) readable_output = tableToMarkdown( - f"Unit 42 Intelligence results for File: {file_hash}", + f'Unit 42 Intelligence results for File: {file_hash}', context_data, - headers=["Value", "Verdict", "VerdictCategories", "SeenBy", "FirstSeen", "LastSeen"], + headers=['Value', 'Verdict', 'VerdictCategories', 'SeenBy', 'FirstSeen', 'LastSeen'], headerTransform=pascalToSpace, removeNull=True, ) return CommandResults( - outputs_prefix="Unit42Intelligence.File", - outputs_key_field="Value", + outputs_prefix='Unit42Intelligence.File', + outputs_key_field='Value', outputs=context_data, readable_output=readable_output, indicator=file_indicator, @@ -1259,17 +1354,19 @@ def main() -> None: command = demisto.command() # Get parameters - verify_certificate = not argToBoolean(params.get("insecure", False)) - proxy = argToBoolean(params.get("proxy", False)) - reliability = params.get("integration_reliability", "A++ - Reputation script") - create_relationships = argToBoolean(params.get("create_relationships", True)) - create_threat_object_indicators = argToBoolean(params.get("create_threat_object_indicators", False)) + verify_certificate = not argToBoolean(params.get('insecure', False)) + proxy = argToBoolean(params.get('proxy', False)) + reliability = params.get('integration_reliability', 'A++ - Reputation script') + create_relationships = argToBoolean(params.get('create_relationships', True)) + create_threat_object_indicators = argToBoolean( + params.get('create_threat_object_indicators', False) + ) # Add create_relationships to args for commands - args["create_relationships"] = create_relationships - args["create_threat_object_indicators"] = create_threat_object_indicators + args['create_relationships'] = create_relationships + args['create_threat_object_indicators'] = create_threat_object_indicators - demisto.debug(f"Command being called is {command}") + demisto.debug(f'Command being called is {command}') try: client = Client( @@ -1278,50 +1375,50 @@ def main() -> None: reliability=reliability, ) - if command == "test-module": + if command == 'test-module': result = test_module(client) return_results(result) - elif command == "ip": + elif command == 'ip': results = [] - ips = argToList(args.get("ip", "")) + ips = argToList(args.get('ip', '')) for ip in ips: - args["ip"] = ip + args['ip'] = ip results.append(ip_command(client, args)) return_results(results) - elif command == "domain": + elif command == 'domain': results = [] - domains = argToList(args.get("domain", "")) + domains = argToList(args.get('domain', '')) for domain in domains: - args["domain"] = domain + args['domain'] = domain results.append(domain_command(client, args)) return_results(results) - elif command == "url": + elif command == 'url': results = [] # Use smart URL parsing to handle URLs with commas in parameters - urls = parse_url_list(args.get("url", "")) + urls = parse_url_list(args.get('url', '')) for url in urls: - args["url"] = url + args['url'] = url results.append(url_command(client, args)) return_results(results) - elif command == "file": + elif command == 'file': results = [] - files = argToList(args.get("file", "")) + files = argToList(args.get('file', '')) for file in files: - args["file"] = file + args['file'] = file results.append(file_command(client, args)) return_results(results) else: - raise NotImplementedError(f"Command {command} is not implemented") + raise NotImplementedError(f'Command {command} is not implemented') except Exception as e: demisto.error(traceback.format_exc()) - return_error(f"Failed to execute {command} command.\nError:\n{str(e)}") + return_error(f'Failed to execute {command} command.\nError:\n{str(e)}') -if __name__ in ("__main__", "__builtin__", "builtins"): +if __name__ in ('__main__', '__builtin__', 'builtins'): main() From b44871816cfcfacfc0a3eb37dfd6049290f2f019 Mon Sep 17 00:00:00 2001 From: Maximilian Lehrbaum Date: Thu, 5 Feb 2026 10:52:39 +0100 Subject: [PATCH 6/7] Reverted formating changes --- .../Unit42Intelligence/Unit42Intelligence.py | 597 ++++++++---------- 1 file changed, 250 insertions(+), 347 deletions(-) diff --git a/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/Integrations/Unit42Intelligence/Unit42Intelligence.py b/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/Integrations/Unit42Intelligence/Unit42Intelligence.py index b2b03a0e949e..4e826953c261 100644 --- a/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/Integrations/Unit42Intelligence/Unit42Intelligence.py +++ b/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/Integrations/Unit42Intelligence/Unit42Intelligence.py @@ -1,8 +1,7 @@ -from typing import Any - import demistomock as demisto # noqa: F401 -import urllib3 from CommonServerPython import * # noqa: F401 +import urllib3 +from typing import Any # Disable insecure warnings urllib3.disable_warnings() @@ -10,11 +9,11 @@ #### CONSTANTS #### -INTEGRATION_NAME = 'Unit 42 Intelligence' +INTEGRATION_NAME = "Unit 42 Intelligence" # API endpoints -SERVER_URL = 'https://prod-us.tas.crtx.paloaltonetworks.com' -LOOKUP_ENDPOINT = '/api/v1/lookups/indicator/{indicator_type}/{indicator_value}' +SERVER_URL = "https://prod-us.tas.crtx.paloaltonetworks.com" +LOOKUP_ENDPOINT = "/api/v1/lookups/indicator/{indicator_type}/{indicator_value}" # Retry configuration RETRY_COUNT = 5 @@ -22,43 +21,43 @@ # Score mappings VERDICT_TO_SCORE = { - 'malicious': Common.DBotScore.BAD, - 'suspicious': Common.DBotScore.SUSPICIOUS, - 'benign': Common.DBotScore.GOOD, - 'unknown': Common.DBotScore.NONE, + "malicious": Common.DBotScore.BAD, + "suspicious": Common.DBotScore.SUSPICIOUS, + "benign": Common.DBotScore.GOOD, + "unknown": Common.DBotScore.NONE, } # Indicator type mappings INDICATOR_TYPE_MAPPING = { - 'ip': FeedIndicatorType.IP, - 'domain': FeedIndicatorType.Domain, - 'url': FeedIndicatorType.URL, - 'file': FeedIndicatorType.File, - 'filehash_sha256': FeedIndicatorType.File, - 'exploit': FeedIndicatorType.CVE, - 'malware_family': ThreatIntel.ObjectsNames.MALWARE, - 'actor': ThreatIntel.ObjectsNames.THREAT_ACTOR, - 'threat_actor': ThreatIntel.ObjectsNames.THREAT_ACTOR, - 'campaign': ThreatIntel.ObjectsNames.CAMPAIGN, - 'attack pattern': ThreatIntel.ObjectsNames.ATTACK_PATTERN, - 'technique': ThreatIntel.ObjectsNames.ATTACK_PATTERN, - 'malicious_behavior': ThreatIntel.ObjectsNames.ATTACK_PATTERN, - 'malicious behavior': ThreatIntel.ObjectsNames.ATTACK_PATTERN, + "ip": FeedIndicatorType.IP, + "domain": FeedIndicatorType.Domain, + "url": FeedIndicatorType.URL, + "file": FeedIndicatorType.File, + "filehash_sha256": FeedIndicatorType.File, + "exploit": FeedIndicatorType.CVE, + "malware_family": ThreatIntel.ObjectsNames.MALWARE, + "actor": ThreatIntel.ObjectsNames.THREAT_ACTOR, + "threat_actor": ThreatIntel.ObjectsNames.THREAT_ACTOR, + "campaign": ThreatIntel.ObjectsNames.CAMPAIGN, + "attack pattern": ThreatIntel.ObjectsNames.ATTACK_PATTERN, + "technique": ThreatIntel.ObjectsNames.ATTACK_PATTERN, + "malicious_behavior": ThreatIntel.ObjectsNames.ATTACK_PATTERN, + "malicious behavior": ThreatIntel.ObjectsNames.ATTACK_PATTERN, } # Define valid regions enum VALID_REGIONS = { - 'australia and oceania': 'Australia And Oceania', - 'antarctica': 'Antarctica', - 'north america': 'North America', - 'south asia': 'South Asia', - 'europe': 'Europe', - 'central america and the caribbean': 'Central America And The Caribbean', - 'africa': 'Africa', - 'east and southeast asia': 'East And Southeast Asia', - 'middle east': 'Middle East', - 'central asia': 'Central Asia', - 'south america': 'South America', + "australia and oceania": "Australia And Oceania", + "antarctica": "Antarctica", + "north america": "North America", + "south asia": "South Asia", + "europe": "Europe", + "central america and the caribbean": "Central America And The Caribbean", + "africa": "Africa", + "east and southeast asia": "East And Southeast Asia", + "middle east": "Middle East", + "central asia": "Central Asia", + "south america": "South America", } #### HELPER FUNCTIONS #### @@ -107,13 +106,9 @@ def parse_url_list(url_input: str | list | None) -> list[str]: # Split by comma only if followed by a URL scheme (supports all schemes, not just http/https) # This preserves commas within URL parameters all_urls: list[str] = [] - for line in url_input.split('\n'): + for line in url_input.split("\n"): if line := line.strip(): - all_urls.extend( - url.strip() - for url in re.split(r',\s*(?=[a-zA-Z][a-zA-Z0-9+.-]*://)', line) - if url.strip() - ) + all_urls.extend(url.strip() for url in re.split(r",\s*(?=[a-zA-Z][a-zA-Z0-9+.-]*://)", line) if url.strip()) return all_urls @@ -129,14 +124,12 @@ def unit42_error_handler(res: requests.Response): Returns: Error message string including X-Request-ID if available """ - request_id = res.headers.get('X-Request-ID', 'N/A') - demisto.debug( - f'{INTEGRATION_NAME} API Error - X-Request-ID: {request_id}, Status: {res.status_code}, URL: {res.url}' - ) + request_id = res.headers.get("X-Request-ID", "N/A") + demisto.debug(f"{INTEGRATION_NAME} API Error - X-Request-ID: {request_id}, Status: {res.status_code}, URL: {res.url}") - error_msg = f'Error in API request [Status: {res.status_code}]\n' - error_msg += f'[X-Request-ID: {request_id}]\n' - error_msg += f'Response text - {res.text}' + error_msg = f"Error in API request [Status: {res.status_code}]\n" + error_msg += f"[X-Request-ID: {request_id}]\n" + error_msg += f"Response text - {res.text}" return_error(error_msg) @@ -156,7 +149,7 @@ def encode_url_indicator(indicator_value: str) -> str: # Step 2: Encode special characters in the query string (keeping '=' safe) # This converts query= to query=%3Ctest%3E - encoded_query = urllib.parse.quote(parsed.query, safe='=') if parsed.query else '' + encoded_query = urllib.parse.quote(parsed.query, safe="=") if parsed.query else "" # Step 3: Reconstruct the URL with the encoded query temp_url = urllib.parse.urlunparse(parsed._replace(query=encoded_query)) @@ -165,7 +158,7 @@ def encode_url_indicator(indicator_value: str) -> str: # This converts the entire URL including the already-encoded query # Example: https://example.com/search?query=%3Ctest%3E becomes # https%3A%2F%2Fexample.com%2Fsearch%3Fquery%3D%253Ctest%253E - return urllib.parse.quote(temp_url, safe='') + return urllib.parse.quote(temp_url, safe="") #### CLIENT CLASS #### @@ -180,10 +173,7 @@ def __init__( proxy: bool, reliability: str, ): - headers = { - 'Authorization': f'Bearer {demisto.getLicenseID()}', - 'Content-Type': 'application/json', - } + headers = {"Authorization": f"Bearer {demisto.getLicenseID()}", "Content-Type": "application/json"} super().__init__(base_url=SERVER_URL, verify=verify, proxy=proxy, headers=headers) self.reliability = reliability @@ -198,18 +188,16 @@ def lookup_indicator(self, indicator_type: str, indicator_value: str) -> request Returns: requests.Response object """ - if indicator_type.lower() == 'url': + if indicator_type.lower() == "url": indicator_value = encode_url_indicator(indicator_value=indicator_value) - endpoint = LOOKUP_ENDPOINT.format( - indicator_type=indicator_type, indicator_value=indicator_value - ) + endpoint = LOOKUP_ENDPOINT.format(indicator_type=indicator_type, indicator_value=indicator_value) return self._http_request( - method='GET', + method="GET", url_suffix=endpoint, ok_codes=(200, 404), - resp_type='response', + resp_type="response", error_handler=unit42_error_handler, retries=RETRY_COUNT, status_list_to_retry=STATUS_CODES_TO_RETRY, @@ -234,14 +222,12 @@ def create_dbot_score( Returns: DBotScore object """ - score: int = VERDICT_TO_SCORE.get(verdict.lower() or 'unknown', Common.DBotScore.NONE) + score: int = VERDICT_TO_SCORE.get(verdict.lower() or "unknown", Common.DBotScore.NONE) # Add malicious description if the verdict is malicious malicious_description = None - if verdict.lower() == 'malicious': - malicious_description = ( - f'Unit 42 Intelligence classified this {indicator_type.lower()} as malicious' - ) + if verdict.lower() == "malicious": + malicious_description = f"Unit 42 Intelligence classified this {indicator_type.lower()} as malicious" return Common.DBotScore( indicator=indicator, @@ -273,9 +259,9 @@ def remove_mitre_technique_id_prefix(threat_name: str) -> str: >>> remove_mitre_technique_id_prefix("Not a MITRE ID - Something") "Not a MITRE ID - Something" """ - if ' - ' in threat_name: - parts = threat_name.split(' - ', 1) - if len(parts) == 2 and parts[0].startswith('T') and parts[0][1:].isdigit(): + if " - " in threat_name: + parts = threat_name.split(" - ", 1) + if len(parts) == 2 and parts[0].startswith("T") and parts[0][1:].isdigit(): return parts[1] return threat_name @@ -302,23 +288,19 @@ def create_relationships( relationships: list[EntityRelationship] = [] if not create_relationships or not threat_objects: - demisto.debug( - f'Skipping create_relationships as {create_relationships} and {threat_objects=}' - ) + demisto.debug(f"Skipping create_relationships as {create_relationships} and {threat_objects=}") return relationships for threat_obj in threat_objects: - threat_name = threat_obj.get('name', '') - threat_class = threat_obj.get('threat_object_class', '').lower() + threat_name = threat_obj.get("name", "") + threat_class = threat_obj.get("threat_object_class", "").lower() # Remove MITRE technique ID prefix for attack patterns if INDICATOR_TYPE_MAPPING[threat_class] == ThreatIntel.ObjectsNames.ATTACK_PATTERN: threat_name = remove_mitre_technique_id_prefix(threat_name) if not threat_name or threat_class not in INDICATOR_TYPE_MAPPING: - demisto.debug( - f'Skipping create_relationships for threat_name {threat_name} and threat_class {threat_class}' - ) + demisto.debug(f"Skipping create_relationships for threat_name {threat_name} and threat_class {threat_class}") continue relationship = EntityRelationship( @@ -346,19 +328,17 @@ def extract_response_data(response: dict[str, Any]) -> dict[str, Any]: Dictionary containing extracted data """ return { - 'indicator_value': response.get('indicator_value', ''), - 'indicator_type': response.get('indicator_type', ''), - 'counts': response.get('counts', []), - 'verdict': response.get('verdict', 'unknown'), - 'verdict_categories': [ - item.get('value') for item in response.get('verdict_categories', []) - ], - 'first_seen': response.get('first_seen', ''), - 'last_seen': response.get('last_seen', ''), - 'updated_at': response.get('updated_at', ''), - 'seen_by': response.get('sources', []), - 'threat_object_associations': response.get('threat_object_associations', []), - 'indicator_details': response.get('indicator_details', {}), + "indicator_value": response.get("indicator_value", ""), + "indicator_type": response.get("indicator_type", ""), + "counts": response.get("counts", []), + "verdict": response.get("verdict", "unknown"), + "verdict_categories": [item.get("value") for item in response.get("verdict_categories", [])], + "first_seen": response.get("first_seen", ""), + "last_seen": response.get("last_seen", ""), + "updated_at": response.get("updated_at", ""), + "seen_by": response.get("sources", []), + "threat_object_associations": response.get("threat_object_associations", []), + "indicator_details": response.get("indicator_details", {}), } @@ -374,21 +354,19 @@ def extract_tags_from_threat_objects(threat_objects: list[dict[str, Any]]) -> li """ tags = [] for threat_obj in threat_objects: - name = threat_obj.get('name') + name = threat_obj.get("name") if name: tags.append(name) # Add aliases as additional tags - aliases = threat_obj.get('aliases', []) + aliases = threat_obj.get("aliases", []) if aliases: tags.extend([alias for alias in aliases if alias]) return list(set(tags)) # Remove duplicates -def extract_malware_families_from_threat_objects( - threat_objects: list[dict[str, Any]], -) -> str | None: +def extract_malware_families_from_threat_objects(threat_objects: list[dict[str, Any]]) -> str | None: """ Extract malware families from threat object associations @@ -399,9 +377,9 @@ def extract_malware_families_from_threat_objects( Malware family name if found, None otherwise """ for threat_obj in threat_objects: - threat_class = threat_obj.get('threat_object_class', '').lower() - if threat_class == 'malware_family': - name = threat_obj.get('name') + threat_class = threat_obj.get("threat_object_class", "").lower() + if threat_class == "malware_family": + name = threat_obj.get("name") if name: return name @@ -418,30 +396,24 @@ def build_threat_object_description(threat_obj: dict[str, Any]) -> str: Returns: Formatted description string with sections for highlights, methods, and targets """ - description = threat_obj.get('description', '').replace('\\n', '\n') + description = threat_obj.get("description", "").replace("\\n", "\n") # Add highlights section if available - highlights = demisto.get(threat_obj, 'battlecard_details.highlights', '').replace('\\n', '\n') - if ( - highlights and highlights != 'Highlights / Key Takeaways (external)' - ): # Do not add if it is only the default title - description += '\n\n##' + highlights = demisto.get(threat_obj, "battlecard_details.highlights", "").replace("\\n", "\n") + if highlights and highlights != "Highlights / Key Takeaways (external)": # Do not add if it is only the default title + description += "\n\n##" description += highlights # Add methods section if available (for threat actors) - methods = demisto.get( - threat_obj, 'battlecard_details.threat_actor_details.methods', '' - ).replace('\\n', '\n') + methods = demisto.get(threat_obj, "battlecard_details.threat_actor_details.methods", "").replace("\\n", "\n") if methods: - description += '\n\n##' + description += "\n\n##" description += methods # Add targets section if available (for threat actors) - targets = demisto.get( - threat_obj, 'battlecard_details.threat_actor_details.targets', '' - ).replace('\\n', '\n') + targets = demisto.get(threat_obj, "battlecard_details.threat_actor_details.targets", "").replace("\\n", "\n") if targets: - description += '\n\n##' + description += "\n\n##" description += targets return description @@ -460,12 +432,12 @@ def create_publications(publications_data: list) -> list: publications = [] for data in publications_data: - timestamp = data.get('created', '') - title = data.get('title', '') - url = data.get('url', '') - source = data.get('source', INTEGRATION_NAME) + timestamp = data.get("created", "") + title = data.get("title", "") + url = data.get("url", "") + source = data.get("source", INTEGRATION_NAME) - publications.append({'link': url, 'title': title, 'timestamp': timestamp, 'source': source}) + publications.append({"link": url, "title": title, "timestamp": timestamp, "source": source}) return publications @@ -488,14 +460,14 @@ def create_threat_object_relationships( List of EntityRelationship objects """ relationships = [] - related_threat_objects = threat_obj.get('related_threat_objects', []) + related_threat_objects = threat_obj.get("related_threat_objects", []) for related_obj in related_threat_objects: if not isinstance(related_obj, dict): continue - related_name = related_obj.get('name') - related_class = related_obj.get('class', '').lower() + related_name = related_obj.get("name") + related_class = related_obj.get("class", "").lower() if related_name and related_class: entity_relationship = EntityRelationship( @@ -530,7 +502,7 @@ def create_campaigns_relationships( List of EntityRelationship objects """ relationships = [] - campaigns = demisto.get(threat_obj, 'battlecard_details.campaigns', []) + campaigns = demisto.get(threat_obj, "battlecard_details.campaigns", []) for campaign in campaigns: if isinstance(campaign, str) and campaign.strip(): @@ -566,20 +538,20 @@ def create_attack_patterns_relationships( List of EntityRelationship objects """ relationships = [] - attack_patterns = demisto.get(threat_obj, 'battlecard_details.attack_patterns', []) + attack_patterns = demisto.get(threat_obj, "battlecard_details.attack_patterns", []) for pattern in attack_patterns: - mitre_id = pattern.get('mitreid', '') - pattern_name = pattern.get('name', '') + mitre_id = pattern.get("mitreid", "") + pattern_name = pattern.get("name", "") # Skip items with a dot in the mitreid - if '.' in mitre_id: - demisto.debug(f'Skipping attack pattern {pattern_name} with mitreid {mitre_id}') + if "." in mitre_id: + demisto.debug(f"Skipping attack pattern {pattern_name} with mitreid {mitre_id}") continue - if pattern_name and pattern_name.endswith('(enterprise)'): + if pattern_name and pattern_name.endswith("(enterprise)"): # Remove (enterprise) suffix if present - pattern_name = pattern_name.removesuffix('(enterprise)').strip() + pattern_name = pattern_name.removesuffix("(enterprise)").strip() entity_relationship = EntityRelationship( name=EntityRelationship.Relationships.USES, @@ -613,13 +585,11 @@ def create_malware_relationships( List of EntityRelationship objects """ relationships = [] - malware_associations = demisto.get( - threat_obj, 'battlecard_details.threat_actor_details.malware_associations', [] - ) + malware_associations = demisto.get(threat_obj, "battlecard_details.threat_actor_details.malware_associations", []) for relationship in malware_associations: - name = relationship.get('name') - aliases = relationship.get('aliases', []) + name = relationship.get("name") + aliases = relationship.get("aliases", []) if name: # Create a relationship using the name @@ -668,12 +638,10 @@ def create_tools_relationships( List of EntityRelationship objects """ relationships = [] - tools_associations = demisto.get( - threat_obj, 'battlecard_details.threat_actor_details.tools', [] - ) + tools_associations = demisto.get(threat_obj, "battlecard_details.threat_actor_details.tools", []) for tool in tools_associations: - tool_name = tool.get('name') + tool_name = tool.get("name") if tool_name: entity_relationship = EntityRelationship( @@ -684,7 +652,7 @@ def create_tools_relationships( entity_b_type=ThreatIntel.ObjectsNames.TOOL, source_reliability=reliability, brand=INTEGRATION_NAME, - fields={'tags': f'mitre-id: {tool.get("mitreid")}' if tool.get('mitreid') else ''}, + fields={"tags": f"mitre-id: {tool.get('mitreid')}" if tool.get("mitreid") else ""}, ) relationships.append(entity_relationship.to_entry()) @@ -709,12 +677,10 @@ def create_vulnerabilities_relationships( List of EntityRelationship objects """ relationships = [] - vulnerabilities = demisto.get( - threat_obj, 'battlecard_details.threat_actor_details.vulnerability_associations', [] - ) + vulnerabilities = demisto.get(threat_obj, "battlecard_details.threat_actor_details.vulnerability_associations", []) for vulnerability in vulnerabilities: - cve_id = vulnerability.get('cve') + cve_id = vulnerability.get("cve") if cve_id: entity_relationship = EntityRelationship( @@ -749,13 +715,11 @@ def create_actor_relationships( List of EntityRelationship objects """ relationships = [] - actor_associations = demisto.get( - threat_obj, 'battlecard_details.malware_family_details.actor_associations', [] - ) + actor_associations = demisto.get(threat_obj, "battlecard_details.malware_family_details.actor_associations", []) for relationship in actor_associations: - aliases = relationship.get('aliases', []) - name = relationship.get('name') + aliases = relationship.get("aliases", []) + name = relationship.get("name") if aliases: # Create a relationship for each alias @@ -804,9 +768,7 @@ def create_location_indicators_and_relationships( location_indicators: list = [] # Handle affected regions - affected_regions = demisto.get( - threat_obj, 'battlecard_details.threat_actor_details.affected_regions', [] - ) + affected_regions = demisto.get(threat_obj, "battlecard_details.threat_actor_details.affected_regions", []) # in case affected_regions is "null", return empty list. if not isinstance(affected_regions, list): @@ -819,7 +781,7 @@ def create_location_indicators_and_relationships( # Use the standardized region name if it matches our enum standardized_region = VALID_REGIONS.get(region_lower) if not standardized_region: - demisto.debug(f'Skipping region {region} as it is not in the valid regions enum') + demisto.debug(f"Skipping region {region} as it is not in the valid regions enum") continue # Create EntityRelationship for the location @@ -834,13 +796,13 @@ def create_location_indicators_and_relationships( ) location_indicator = { - 'value': standardized_region, - 'type': FeedIndicatorType.Location, - 'score': Common.DBotScore.NONE, - 'service': INTEGRATION_NAME, - 'relationships': [entity_relationship.to_entry()], - 'fields': { - 'geocountry': standardized_region, + "value": standardized_region, + "type": FeedIndicatorType.Location, + "score": Common.DBotScore.NONE, + "service": INTEGRATION_NAME, + "relationships": [entity_relationship.to_entry()], + "fields": { + "geocountry": standardized_region, }, } location_indicators.append(location_indicator) @@ -876,7 +838,7 @@ def get_threat_object_score(threat_class: str) -> int: def create_threat_object_indicators( - threat_objects: list[dict[str, Any]], reliability: str = 'A++ - Reputation script' + threat_objects: list[dict[str, Any]], reliability: str = "A++ - Reputation script" ) -> list[dict[str, Any]]: """ Create threat object indicators from threat object associations @@ -891,73 +853,58 @@ def create_threat_object_indicators( indicators = [] for threat_obj in threat_objects: - name = threat_obj.get('name') - threat_class = threat_obj.get('threat_object_class', '').lower() + name = threat_obj.get("name") + threat_class = threat_obj.get("threat_object_class", "").lower() if not name or threat_class not in INDICATOR_TYPE_MAPPING: continue # Create relationships relationships = [] - relationships += create_threat_object_relationships( - threat_obj, name, threat_class, reliability - ) + relationships += create_threat_object_relationships(threat_obj, name, threat_class, reliability) relationships += create_campaigns_relationships(threat_obj, name, threat_class, reliability) - relationships += create_attack_patterns_relationships( - threat_obj, name, threat_class, reliability - ) + relationships += create_attack_patterns_relationships(threat_obj, name, threat_class, reliability) relationships += create_malware_relationships(threat_obj, name, threat_class, reliability) relationships += create_tools_relationships(threat_obj, name, threat_class, reliability) - relationships += create_vulnerabilities_relationships( - threat_obj, name, threat_class, reliability - ) + relationships += create_vulnerabilities_relationships(threat_obj, name, threat_class, reliability) relationships += create_actor_relationships(threat_obj, name, threat_class, reliability) # Create fields with threat object details fields = { - 'description': build_threat_object_description(threat_obj), - 'reportedby': threat_obj.get('sources'), - 'aliases': [string_to_table_header(alias) for alias in threat_obj.get('aliases', [])], - 'industrysectors': [ - string_to_table_header(industry) - for industry in demisto.get(threat_obj, 'battlecard_details.industries', []) + "description": build_threat_object_description(threat_obj), + "reportedby": threat_obj.get("sources"), + "aliases": [string_to_table_header(alias) for alias in threat_obj.get("aliases", [])], + "industrysectors": [ + string_to_table_header(industry) for industry in demisto.get(threat_obj, "battlecard_details.industries", []) ], - 'primarymotivation': string_to_table_header( - demisto.get( - threat_obj, 'battlecard_details.threat_actor_details.primary_motivation', '' - ) + "primarymotivation": string_to_table_header( + demisto.get(threat_obj, "battlecard_details.threat_actor_details.primary_motivation", "") ), - 'publications': create_publications(threat_obj.get('publications', [])), - 'geocountry': demisto.get( - threat_obj, 'battlecard_details.threat_actor_details.origin', '' - ).upper(), - 'ismalwarefamily': 'True' if threat_class == 'malware_family' else 'False', + "publications": create_publications(threat_obj.get("publications", [])), + "geocountry": demisto.get(threat_obj, "battlecard_details.threat_actor_details.origin", "").upper(), + "ismalwarefamily": "True" if threat_class == "malware_family" else "False", } indicator_data = { - 'value': name, - 'type': INDICATOR_TYPE_MAPPING[threat_class], - 'score': get_threat_object_score(threat_class), - 'service': INTEGRATION_NAME, - 'relationships': relationships, - 'fields': fields, - 'rawJSON': threat_obj, + "value": name, + "type": INDICATOR_TYPE_MAPPING[threat_class], + "score": get_threat_object_score(threat_class), + "service": INTEGRATION_NAME, + "relationships": relationships, + "fields": fields, + "rawJSON": threat_obj, } indicators.append(indicator_data) # Create location indicators from affected regions - location_indicators = create_location_indicators_and_relationships( - threat_obj, name, reliability - ) + location_indicators = create_location_indicators_and_relationships(threat_obj, name, reliability) indicators.extend(location_indicators) return indicators -def create_context_data( - response_data: dict[str, Any], indicator_value: str | None = None -) -> dict[str, Any]: +def create_context_data(response_data: dict[str, Any], indicator_value: str | None = None) -> dict[str, Any]: """ Create context data for indicators @@ -968,17 +915,15 @@ def create_context_data( Dictionary containing context data """ return { - 'Value': indicator_value or response_data['indicator_value'], - 'Type': INDICATOR_TYPE_MAPPING.get(response_data['indicator_type']), - 'Verdict': string_to_table_header(response_data['verdict']), - 'VerdictCategories': list( - {string_to_table_header(item) for item in response_data['verdict_categories']} - ), - 'Counts': response_data['counts'], - 'FirstSeen': response_data['first_seen'], - 'LastSeen': response_data['last_seen'], - 'SeenBy': list({string_to_table_header(item) for item in response_data['seen_by']}), - 'EnrichedThreatObjectAssociation': response_data['threat_object_associations'], + "Value": indicator_value or response_data["indicator_value"], + "Type": INDICATOR_TYPE_MAPPING.get(response_data["indicator_type"]), + "Verdict": string_to_table_header(response_data["verdict"]), + "VerdictCategories": list({string_to_table_header(item) for item in response_data["verdict_categories"]}), + "Counts": response_data["counts"], + "FirstSeen": response_data["first_seen"], + "LastSeen": response_data["last_seen"], + "SeenBy": list({string_to_table_header(item) for item in response_data["seen_by"]}), + "EnrichedThreatObjectAssociation": response_data["threat_object_associations"], } @@ -994,19 +939,17 @@ def construct_404_response(indicator_value: str, indicator_type: str) -> dict[st Dictionary containing indicator default response in cases of 404 """ return { - 'indicator_value': indicator_value, - 'indicator_type': indicator_type, - 'verdict': 'Unknown', - 'verdict_categories': [], - 'counts': [ - {'count_type': 'wf_sample', 'count_values': {'benign': 0, 'grayware': 0, 'malware': 0}} - ], - 'first_seen': '', - 'last_seen': '', - 'seen_by': [], - 'threat_object_associations': [], - 'is_observed': False, - 'sources': [], + "indicator_value": indicator_value, + "indicator_type": indicator_type, + "verdict": "Unknown", + "verdict_categories": [], + "counts": [{"count_type": "wf_sample", "count_values": {"benign": 0, "grayware": 0, "malware": 0}}], + "first_seen": "", + "last_seen": "", + "seen_by": [], + "threat_object_associations": [], + "is_observed": False, + "sources": [], } @@ -1025,10 +968,10 @@ def test_module(client: Client) -> str: """ try: # Test with a known safe domain - client.lookup_indicator('domain', 'example.com') - return 'ok' + client.lookup_indicator("domain", "example.com") + return "ok" except Exception as e: - return f'Test failed: {str(e)}' + return f"Test failed: {str(e)}" #### COMMAND FUNCTIONS #### @@ -1045,39 +988,31 @@ def ip_command(client: Client, args: dict[str, Any]) -> CommandResults: Returns: CommandResults object """ - ip = args.get('ip', '') - create_relationships_flag = argToBoolean(args.get('create_relationships', True)) - create_threat_object_indicators_flag = argToBoolean( - args.get('create_threat_object_indicators', False) - ) + ip = args.get("ip", "") + create_relationships_flag = argToBoolean(args.get("create_relationships", True)) + create_threat_object_indicators_flag = argToBoolean(args.get("create_threat_object_indicators", False)) - response = client.lookup_indicator('ip', ip) + response = client.lookup_indicator("ip", ip) if response.status_code == 404: - response_data = construct_404_response(ip, 'IP') + response_data = construct_404_response(ip, "IP") else: response_data = extract_response_data(response.json()) - threat_objects = response_data['threat_object_associations'] + threat_objects = response_data["threat_object_associations"] # Create DBotScore - dbot_score = create_dbot_score( - ip, DBotScoreType.IP, response_data['verdict'], client.reliability - ) + dbot_score = create_dbot_score(ip, DBotScoreType.IP, response_data["verdict"], client.reliability) # Extract tags and malware families from threat objects tags = extract_tags_from_threat_objects(threat_objects) malware_families = extract_malware_families_from_threat_objects(threat_objects) # Create enriched IP indicator with tags and malware families - ip_indicator = Common.IP( - ip=ip, dbot_score=dbot_score, tags=tags, malware_family=malware_families - ) + ip_indicator = Common.IP(ip=ip, dbot_score=dbot_score, tags=tags, malware_family=malware_families) # Create relationships - relationships = create_relationships( - ip, FeedIndicatorType.IP, threat_objects, create_relationships_flag, client.reliability - ) + relationships = create_relationships(ip, FeedIndicatorType.IP, threat_objects, create_relationships_flag, client.reliability) # Create indicators from relationships if create_threat_object_indicators_flag: @@ -1089,16 +1024,16 @@ def ip_command(client: Client, args: dict[str, Any]) -> CommandResults: context_data = create_context_data(response_data) readable_output = tableToMarkdown( - f'Unit 42 Intelligence results for IP: {ip}', + f"Unit 42 Intelligence results for IP: {ip}", context_data, - headers=['Value', 'Verdict', 'VerdictCategories', 'SeenBy', 'FirstSeen', 'LastSeen'], + headers=["Value", "Verdict", "VerdictCategories", "SeenBy", "FirstSeen", "LastSeen"], headerTransform=pascalToSpace, removeNull=True, ) return CommandResults( - outputs_prefix='Unit42Intelligence.IP', - outputs_key_field='Value', + outputs_prefix="Unit42Intelligence.IP", + outputs_key_field="Value", outputs=context_data, readable_output=readable_output, indicator=ip_indicator, @@ -1117,42 +1052,30 @@ def domain_command(client: Client, args: dict[str, Any]) -> CommandResults: Returns: CommandResults object """ - domain = args.get('domain', '') - create_relationships_flag = argToBoolean(args.get('create_relationships', True)) - create_threat_object_indicators_flag = argToBoolean( - args.get('create_threat_object_indicators', False) - ) + domain = args.get("domain", "") + create_relationships_flag = argToBoolean(args.get("create_relationships", True)) + create_threat_object_indicators_flag = argToBoolean(args.get("create_threat_object_indicators", False)) - response = client.lookup_indicator('domain', domain) + response = client.lookup_indicator("domain", domain) if response.status_code == 404: - response_data = construct_404_response(domain, 'Domain') + response_data = construct_404_response(domain, "Domain") else: response_data = extract_response_data(response.json()) - threat_objects = response_data['threat_object_associations'] + threat_objects = response_data["threat_object_associations"] # Create DBotScore - dbot_score = create_dbot_score( - domain, DBotScoreType.DOMAIN, response_data['verdict'], client.reliability - ) + dbot_score = create_dbot_score(domain, DBotScoreType.DOMAIN, response_data["verdict"], client.reliability) # Extract tags and malware families from threat objects tags = extract_tags_from_threat_objects(threat_objects) malware_families = extract_malware_families_from_threat_objects(threat_objects) # Create enriched Domain indicator with tags and malware families - domain_indicator = Common.Domain( - domain=domain, dbot_score=dbot_score, tags=tags, malware_family=malware_families - ) + domain_indicator = Common.Domain(domain=domain, dbot_score=dbot_score, tags=tags, malware_family=malware_families) # Create relationships - relationships = create_relationships( - domain, - FeedIndicatorType.Domain, - threat_objects, - create_relationships_flag, - client.reliability, - ) + relationships = create_relationships(domain, FeedIndicatorType.Domain, threat_objects, create_relationships_flag, client.reliability) # Create indicators from relationships if create_threat_object_indicators_flag: @@ -1164,16 +1087,16 @@ def domain_command(client: Client, args: dict[str, Any]) -> CommandResults: context_data = create_context_data(response_data) readable_output = tableToMarkdown( - f'Unit 42 Intelligence results for Domain: {domain}', + f"Unit 42 Intelligence results for Domain: {domain}", context_data, - headers=['Value', 'Verdict', 'VerdictCategories', 'SeenBy', 'FirstSeen', 'LastSeen'], + headers=["Value", "Verdict", "VerdictCategories", "SeenBy", "FirstSeen", "LastSeen"], headerTransform=pascalToSpace, removeNull=True, ) return CommandResults( - outputs_prefix='Unit42Intelligence.Domain', - outputs_key_field='Value', + outputs_prefix="Unit42Intelligence.Domain", + outputs_key_field="Value", outputs=context_data, readable_output=readable_output, indicator=domain_indicator, @@ -1192,38 +1115,30 @@ def url_command(client: Client, args: dict[str, Any]) -> CommandResults: Returns: CommandResults object """ - url = args.get('url', '') - create_relationships_flag = argToBoolean(args.get('create_relationships', True)) - create_threat_object_indicators_flag = argToBoolean( - args.get('create_threat_object_indicators', False) - ) + url = args.get("url", "") + create_relationships_flag = argToBoolean(args.get("create_relationships", True)) + create_threat_object_indicators_flag = argToBoolean(args.get("create_threat_object_indicators", False)) - response = client.lookup_indicator('url', url) + response = client.lookup_indicator("url", url) if response.status_code == 404: - response_data = construct_404_response(url, 'URL') + response_data = construct_404_response(url, "URL") else: response_data = extract_response_data(response.json()) - threat_objects = response_data['threat_object_associations'] + threat_objects = response_data["threat_object_associations"] # Create DBotScore - dbot_score = create_dbot_score( - url, DBotScoreType.URL, response_data['verdict'], client.reliability - ) + dbot_score = create_dbot_score(url, DBotScoreType.URL, response_data["verdict"], client.reliability) # Extract tags and malware families from threat objects tags = extract_tags_from_threat_objects(threat_objects) malware_families = extract_malware_families_from_threat_objects(threat_objects) # Create enriched URL indicator with tags and malware families - url_indicator = Common.URL( - url=url, dbot_score=dbot_score, tags=tags, malware_family=malware_families - ) + url_indicator = Common.URL(url=url, dbot_score=dbot_score, tags=tags, malware_family=malware_families) # Create relationships - relationships = create_relationships( - url, FeedIndicatorType.URL, threat_objects, create_relationships_flag, client.reliability - ) + relationships = create_relationships(url, FeedIndicatorType.URL, threat_objects, create_relationships_flag, client.reliability) # Create indicators from relationships if create_threat_object_indicators_flag: @@ -1235,16 +1150,16 @@ def url_command(client: Client, args: dict[str, Any]) -> CommandResults: context_data = create_context_data(response_data, indicator_value=url) readable_output = tableToMarkdown( - f'Unit 42 Intelligence results for URL: {url}', + f"Unit 42 Intelligence results for URL: {url}", context_data, - headers=['Value', 'Verdict', 'VerdictCategories', 'SeenBy', 'FirstSeen', 'LastSeen'], + headers=["Value", "Verdict", "VerdictCategories", "SeenBy", "FirstSeen", "LastSeen"], headerTransform=pascalToSpace, removeNull=True, ) return CommandResults( - outputs_prefix='Unit42Intelligence.URL', - outputs_key_field='Value', + outputs_prefix="Unit42Intelligence.URL", + outputs_key_field="Value", outputs=context_data, readable_output=readable_output, indicator=url_indicator, @@ -1263,31 +1178,27 @@ def file_command(client: Client, args: dict[str, Any]) -> CommandResults: Returns: CommandResults object """ - file_hash = args.get('file', '') - create_relationships_flag = argToBoolean(args.get('create_relationships', True)) - create_threat_object_indicators_flag = argToBoolean( - args.get('create_threat_object_indicators', False) - ) + file_hash = args.get("file", "") + create_relationships_flag = argToBoolean(args.get("create_relationships", True)) + create_threat_object_indicators_flag = argToBoolean(args.get("create_threat_object_indicators", False)) # Validate hash type - Unit 42 Intelligence only supports SHA256 hash_type = get_hash_type(file_hash) - if hash_type != 'sha256': + if hash_type != "sha256": return CommandResults( - readable_output=f'Unit 42 Intelligence only supports SHA256 hashes. Provided hash type: {hash_type}' + readable_output=f"Unit 42 Intelligence only supports SHA256 hashes. Provided hash type: {hash_type}" ) - response = client.lookup_indicator('filehash_sha256', file_hash) + response = client.lookup_indicator("filehash_sha256", file_hash) if response.status_code == 404: - response_data = construct_404_response(file_hash, 'File') + response_data = construct_404_response(file_hash, "File") else: response_data = extract_response_data(response.json()) - threat_objects = response_data['threat_object_associations'] + threat_objects = response_data["threat_object_associations"] # Create DBotScore - dbot_score = create_dbot_score( - file_hash, DBotScoreType.FILE, response_data['verdict'], client.reliability - ) + dbot_score = create_dbot_score(file_hash, DBotScoreType.FILE, response_data["verdict"], client.reliability) # Extract tags and malware families from threat objects tags = extract_tags_from_threat_objects(threat_objects) @@ -1295,26 +1206,20 @@ def file_command(client: Client, args: dict[str, Any]) -> CommandResults: # Create enriched File indicator with proper hash field assignment file_indicator = Common.File( - size=demisto.get(response_data, 'indicator_details.file_size', ''), - file_type=demisto.get(response_data, 'indicator_details.file_type', ''), - imphash=demisto.get(response_data, 'indicator_details.file_hashes.imphash', ''), - md5=demisto.get(response_data, 'indicator_details.file_hashes.md5', ''), - sha1=demisto.get(response_data, 'indicator_details.file_hashes.sha1', ''), + size=demisto.get(response_data, "indicator_details.file_size", ""), + file_type=demisto.get(response_data, "indicator_details.file_type", ""), + imphash=demisto.get(response_data, "indicator_details.file_hashes.imphash", ""), + md5=demisto.get(response_data, "indicator_details.file_hashes.md5", ""), + sha1=demisto.get(response_data, "indicator_details.file_hashes.sha1", ""), sha256=file_hash, - ssdeep=demisto.get(response_data, 'indicator_details.file_hashes.ssdeep', ''), + ssdeep=demisto.get(response_data, "indicator_details.file_hashes.ssdeep", ""), dbot_score=dbot_score, tags=tags, malware_family=malware_families, ) # Create relationships - relationships = create_relationships( - file_hash, - FeedIndicatorType.File, - threat_objects, - create_relationships_flag, - client.reliability, - ) + relationships = create_relationships(file_hash, FeedIndicatorType.File, threat_objects, create_relationships_flag, client.reliability) # Create indicators from relationships if create_threat_object_indicators_flag: @@ -1326,16 +1231,16 @@ def file_command(client: Client, args: dict[str, Any]) -> CommandResults: context_data = create_context_data(response_data) readable_output = tableToMarkdown( - f'Unit 42 Intelligence results for File: {file_hash}', + f"Unit 42 Intelligence results for File: {file_hash}", context_data, - headers=['Value', 'Verdict', 'VerdictCategories', 'SeenBy', 'FirstSeen', 'LastSeen'], + headers=["Value", "Verdict", "VerdictCategories", "SeenBy", "FirstSeen", "LastSeen"], headerTransform=pascalToSpace, removeNull=True, ) return CommandResults( - outputs_prefix='Unit42Intelligence.File', - outputs_key_field='Value', + outputs_prefix="Unit42Intelligence.File", + outputs_key_field="Value", outputs=context_data, readable_output=readable_output, indicator=file_indicator, @@ -1354,19 +1259,17 @@ def main() -> None: command = demisto.command() # Get parameters - verify_certificate = not argToBoolean(params.get('insecure', False)) - proxy = argToBoolean(params.get('proxy', False)) - reliability = params.get('integration_reliability', 'A++ - Reputation script') - create_relationships = argToBoolean(params.get('create_relationships', True)) - create_threat_object_indicators = argToBoolean( - params.get('create_threat_object_indicators', False) - ) + verify_certificate = not argToBoolean(params.get("insecure", False)) + proxy = argToBoolean(params.get("proxy", False)) + reliability = params.get("integration_reliability", "A++ - Reputation script") + create_relationships = argToBoolean(params.get("create_relationships", True)) + create_threat_object_indicators = argToBoolean(params.get("create_threat_object_indicators", False)) # Add create_relationships to args for commands - args['create_relationships'] = create_relationships - args['create_threat_object_indicators'] = create_threat_object_indicators + args["create_relationships"] = create_relationships + args["create_threat_object_indicators"] = create_threat_object_indicators - demisto.debug(f'Command being called is {command}') + demisto.debug(f"Command being called is {command}") try: client = Client( @@ -1375,50 +1278,50 @@ def main() -> None: reliability=reliability, ) - if command == 'test-module': + if command == "test-module": result = test_module(client) return_results(result) - elif command == 'ip': + elif command == "ip": results = [] - ips = argToList(args.get('ip', '')) + ips = argToList(args.get("ip", "")) for ip in ips: - args['ip'] = ip + args["ip"] = ip results.append(ip_command(client, args)) return_results(results) - elif command == 'domain': + elif command == "domain": results = [] - domains = argToList(args.get('domain', '')) + domains = argToList(args.get("domain", "")) for domain in domains: - args['domain'] = domain + args["domain"] = domain results.append(domain_command(client, args)) return_results(results) - elif command == 'url': + elif command == "url": results = [] # Use smart URL parsing to handle URLs with commas in parameters - urls = parse_url_list(args.get('url', '')) + urls = parse_url_list(args.get("url", "")) for url in urls: - args['url'] = url + args["url"] = url results.append(url_command(client, args)) return_results(results) - elif command == 'file': + elif command == "file": results = [] - files = argToList(args.get('file', '')) + files = argToList(args.get("file", "")) for file in files: - args['file'] = file + args["file"] = file results.append(file_command(client, args)) return_results(results) else: - raise NotImplementedError(f'Command {command} is not implemented') + raise NotImplementedError(f"Command {command} is not implemented") except Exception as e: demisto.error(traceback.format_exc()) - return_error(f'Failed to execute {command} command.\nError:\n{str(e)}') + return_error(f"Failed to execute {command} command.\nError:\n{str(e)}") -if __name__ in ('__main__', '__builtin__', 'builtins'): +if __name__ in ("__main__", "__builtin__", "builtins"): main() From 6710c0615460acb3522539a8234823493647bf9e Mon Sep 17 00:00:00 2001 From: Moshe Eichler <78307768+MosheEichler@users.noreply.github.com> Date: Thu, 5 Feb 2026 16:12:24 +0200 Subject: [PATCH 7/7] Update Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/ReleaseNotes/1_0_12.md Co-authored-by: Richard Bluestone <53567272+richardbluestone@users.noreply.github.com> --- .../ReleaseNotes/1_0_12.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/ReleaseNotes/1_0_12.md b/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/ReleaseNotes/1_0_12.md index e3ed85050fc1..89dff56b3b76 100644 --- a/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/ReleaseNotes/1_0_12.md +++ b/Packs/Unit42ThreatIntelligencebyPaloAltoNetworks/ReleaseNotes/1_0_12.md @@ -2,4 +2,4 @@ ##### Unit 42 Intelligence -- Change relationship source_reliability to use parameter reliability. +- Changed `create_relationships` to use the `reliability` parameter for `source_reliability`, allowing for configurable reliability levels.