Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
from typing import NoReturn


SUCCESS_MESSAGE: str = (
"Successfully connected to the PagerDuty API."
)
SUCCESS_MESSAGE: str = "Successfully connected to the PagerDuty API."
ERROR_MESSAGE: str = "Failed to connect to the PagerDuty API."


Expand All @@ -31,10 +29,17 @@ def _extract_action_parameters(self) -> None:
param_name="api_key",
is_mandatory=True,
)
self.verify_ssl = extract_configuration_param(
self.soar_action,
provider_name=INTEGRATION_NAME,
param_name="Verify SSL",
default_value=True,
input_type=bool,
)

def _init_api_clients(self):
"""Prepare API client"""
return PagerDutyManager(self.api_key)
return PagerDutyManager(self.api_key, verify_ssl=self.verify_ssl)

def _perform_action(self, _=None) -> None:
self.api_client.test_connectivity()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

import datetime
import sys
import uuid
from typing import Any

from soar_sdk.SiemplifyConnectors import SiemplifyConnectorExecution
from soar_sdk.SiemplifyConnectorsDataModel import AlertInfo
Expand All @@ -10,7 +11,9 @@
dict_to_flat,
output_handler,
)
from TIPCommon.types import SingleJson

from ..core.constants import SEVERITY_HIGH, SEVERITY_LOW
from ..core.PagerDutyManager import PagerDutyManager

CONNECTOR_NAME = "PagerDuty"
Expand All @@ -19,9 +22,9 @@


@output_handler
def main(is_test_run):
processed_alerts = [] # The main output of each connector run
siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper
def main(is_test_run: bool) -> None:
processed_alerts: list[AlertInfo] = []
siemplify = SiemplifyConnectorExecution()
siemplify.script_name = CONNECTOR_NAME

if is_test_run:
Expand All @@ -31,33 +34,77 @@ def main(is_test_run):

siemplify.LOGGER.info("----------------- Main - Param Init -----------------")

api_key = siemplify.extract_connector_param(param_name="apiKey")
acknowledge_enabled = siemplify.extract_connector_param(param_name="acknowledge")
api_key: str = siemplify.extract_connector_param(param_name="apiKey")
acknowledge_enabled: str = siemplify.extract_connector_param(
param_name="acknowledge"
)
max_hours_backwards: int = siemplify.extract_connector_param(
param_name="Max Hours Backwards",
input_type=int,
default_value=24,
)
services: str = siemplify.extract_connector_param(param_name="Services")
proxy_address: str = siemplify.extract_connector_param(
param_name="Proxy Server Address"
)
proxy_username: str = siemplify.extract_connector_param(param_name="Proxy Username")
proxy_password: str = siemplify.extract_connector_param(param_name="Proxy Password")

siemplify.LOGGER.info("------------------- Main - Started -------------------")
manager = PagerDutyManager(api_key)
manager = PagerDutyManager(
api_key=api_key,
)

if proxy_address:
if "://" not in proxy_address:
proxy_address = "http://" + proxy_address
from urllib.parse import urlparse
server_url = urlparse(proxy_address)
scheme: str = server_url.scheme
hostname: str | None = server_url.hostname
port: int | None = server_url.port
credentials: str = ""
if proxy_username and proxy_password:
credentials = f"{proxy_username}:{proxy_password}@"
proxy_str: str = f"{scheme}://{credentials}{hostname}"
if port:
proxy_str += f":{port}"
manager.requests_session.proxies = {"http": proxy_str, "https": proxy_str}

try:
incidents_list = manager.list_incidents()
time_diff: datetime.timedelta = datetime.timedelta(hours=max_hours_backwards)
since: str = (
datetime.datetime.now(datetime.timezone.utc) - time_diff
).strftime("%Y-%m-%dT%H:%M:%SZ")
params: dict[str, Any] = {"since": since}
if services:
params["service_ids[]"] = [
s.strip() for s in services.split(",") if s.strip()
]
incidents_list: list[SingleJson] = manager.list_filtered_incidents(
params=params
)
if incidents_list is None:
siemplify.LOGGER.info(
"No events were retrieved for the specified timeframe from PagerDuty",
)
return
siemplify.LOGGER.info(f"Retrieved {len(incidents_list)} events from PagerDuty")
for incident in incidents_list:
alert_id = incident["incident_key"]
# Map the severity in PagerDuty to the severity levels in Siemplify
severity = get_siemplify_mapped_severity(incident["urgency"])
alert_id: str = incident.get("incident_key", "")

severity: str | None = get_siemplify_mapped_severity(
incident.get("urgency", "low")
)

siemplify_alert = build_alert_info(siemplify, incident, severity)
siemplify_alert: AlertInfo = build_alert_info(siemplify, incident, severity)

if siemplify_alert:
processed_alerts.append(siemplify_alert)
siemplify.LOGGER.info(f"Added incident {alert_id} to package results")
# `acknowledge_enabled` is a str, hence the str comparison below

if acknowledge_enabled:
incident_got = manager.acknowledge_incident(alert_id)
incident_got = manager.acknowledge_incident(incident["id"])
siemplify.LOGGER.info(
f"Incident {incident_got} acknowledged in PagerDuty",
)
Expand All @@ -71,18 +118,22 @@ def main(is_test_run):
siemplify.return_package(processed_alerts)


def get_siemplify_mapped_severity(severity):
severity_map = {"high": "100", "low": "-1"}
return severity_map.get(severity)
def get_siemplify_mapped_severity(severity: str) -> str | None:
severity_map: dict[str, str] = {"high": SEVERITY_HIGH, "low": SEVERITY_LOW}
return severity_map.get(severity.lower()) if severity else None


def build_alert_info(siemplify, incident, severity):
def build_alert_info(
siemplify: SiemplifyConnectorExecution, incident: SingleJson, severity: str | None
) -> AlertInfo:
"""Returns an alert, which is an aggregation of basic events."""
alert_info = AlertInfo()
alert_info: AlertInfo = AlertInfo()
alert_info.display_id = incident["id"]
alert_info.ticket_id = str(uuid.uuid4())
alert_info.name = "PagerDuty Incident: " + incident["title"]
alert_info.rule_generator = incident["first_trigger_log_entry"]["summary"]
alert_info.ticket_id = incident["id"]
alert_info.name = incident['id']
alert_info.rule_generator = (
incident.get("first_trigger_log_entry", {}).get("summary", "No Summary")
)
alert_info.start_time = convert_string_to_unix_time(incident["created_at"])
alert_info.end_time = alert_info.start_time
alert_info.severity = severity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ parameters:
is_mandatory: true
is_advanced: false
mode: script
- name: Max Hours Backwards
default_value: '24'
type: integer
description: Max hours backwards to fetch incidents on first run.
is_mandatory: false
is_advanced: false
mode: script
- name: Services
default_value: ''
type: string
description: Comma-separated list of service IDs to filter incidents by.
is_mandatory: false
is_advanced: true
mode: script
- name: DeviceProductField
default_value: device_product
type: string
Expand All @@ -38,6 +52,27 @@ parameters:
is_mandatory: true
is_advanced: false
mode: regular
- name: Proxy Server Address
default_value: ''
type: string
description: Address of the proxy server, if applicable.
is_mandatory: false
is_advanced: true
mode: script
- name: Proxy Username
default_value: 'null'
type: string
description: Username for proxy authentication.
is_mandatory: false
is_advanced: true
mode: script
- name: Proxy Password
default_value: 'null'
type: string
description: Password for proxy authentication.
is_mandatory: false
is_advanced: true
mode: script
description: The connector pulls events from the Pagerduty API https://developer.pagerduty.com/api-reference/9d0b4b12e36f9-list-incidents.
integration: PagerDuty
rules: [ ]
Expand Down
Loading
Loading