Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 58 additions & 8 deletions sources/hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,32 @@
List,
Literal,
Optional,
Sequence,
)
from urllib.parse import quote

import dlt
from dlt.common import pendulum
from dlt.common import logger, pendulum
from dlt.common.typing import TDataItems
from dlt.sources import DltResource

from .helpers import (
_get_property_names_types,
_to_dlt_columns_schema,
search_data_since,
fetch_data,
fetch_property_history,
get_properties_labels,
SearchOutOfBoundsException,
)
from .settings import (
ALL_OBJECTS,
ARCHIVED_PARAM,
CRM_OBJECT_ASSOCIATIONS,
CRM_OBJECT_ENDPOINTS,
CRM_PIPELINES_ENDPOINT,
ENTITY_PROPERTIES,
LAST_MODIFIED_PROPERTY,
HUBSPOT_CREATION_DATE,
MAX_PROPS_LENGTH,
OBJECT_TYPE_PLURAL,
OBJECT_TYPE_SINGULAR,
Expand All @@ -73,6 +77,7 @@ def fetch_data_for_properties(
api_key: str,
object_type: str,
soft_delete: bool,
last_modified: str = None,
) -> Iterator[TDataItems]:
"""
Fetch data for a given set of properties from the HubSpot API.
Expand All @@ -82,20 +87,50 @@ def fetch_data_for_properties(
api_key (str): HubSpot API key for authentication.
object_type (str): The type of HubSpot object (e.g., 'company', 'contact').
soft_delete (bool): Flag to fetch soft-deleted (archived) records.
last_modified (str): The date from which to fetch records. If None, get all records.

Yields:
Iterator[TDataItems]: Data retrieved from the HubSpot API.
"""
logger.info(f"Fetching data for {object_type}.")
# The Hubspot API expects a comma separated string as properties
joined_props = ",".join(sorted(props))
params: Dict[str, Any] = {"properties": joined_props, "limit": 100}
associations = CRM_OBJECT_ASSOCIATIONS[object_type]
joined_associations = ",".join(associations)
params: Dict[str, Any] = {
"properties": joined_props,
"limit": 100,
}
if associations:
params["associations"] = joined_associations

context: Optional[Dict[str, Any]] = (
{SOFT_DELETE_KEY: False} if soft_delete else None
)

yield from fetch_data(
CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params, context=context
)
if last_modified is not None:
try:
yield from search_data_since(
CRM_OBJECT_ENDPOINTS[object_type],
api_key,
last_modified,
LAST_MODIFIED_PROPERTY[object_type],
props=props,
associations=associations,
context=context,
)
except SearchOutOfBoundsException:
logger.info("Search out of bounds, fetching all data")
yield from fetch_data(
CRM_OBJECT_ENDPOINTS[object_type],
api_key,
params=params,
context=context,
)
else:
yield from fetch_data(
CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params, context=context
)
if soft_delete:
yield from fetch_data(
CRM_OBJECT_ENDPOINTS[object_type],
Expand All @@ -109,6 +144,7 @@ def crm_objects(
object_type: str,
api_key: str,
props: List[str],
last_modified: dlt.sources.incremental[str],
include_custom_props: bool = True,
archived: bool = False,
) -> Iterator[TDataItems]:
Expand All @@ -119,6 +155,7 @@ def crm_objects(
object_type (str): Type of HubSpot object (e.g., 'company', 'contact').
api_key (str): API key for HubSpot authentication.
props (List[str]): List of properties to retrieve.
last_modified (str): The date from which to fetch records
include_custom_props (bool, optional): Include custom properties in the result. Defaults to True.
archived (bool, optional): Fetch archived (soft-deleted) objects. Defaults to False.

Expand All @@ -135,8 +172,17 @@ def crm_objects(
prop: _to_dlt_columns_schema({prop: hb_type})
for prop, hb_type in props_to_type.items()
}
last_modified_on = (
None
if last_modified.start_value == last_modified.initial_value
else last_modified.start_value
)
for batch in fetch_data_for_properties(
list(props_to_type.keys()), api_key, object_type, archived
list(props_to_type.keys()),
api_key,
object_type,
archived,
last_modified_on,
):
yield dlt.mark.with_hints(batch, dlt.mark.make_hints(columns=col_type_hints))

Expand Down Expand Up @@ -337,7 +383,7 @@ def properties_custom_labels(api_key: str = api_key) -> Iterator[TDataItems]:
"""

def get_properties_description(
properties_list_inner: List[Dict[str, Any]]
properties_list_inner: List[Dict[str, Any]],
) -> Iterator[Dict[str, Any]]:
"""Fetch properties."""
for property_info in properties_list_inner:
Expand Down Expand Up @@ -411,6 +457,10 @@ def get_pipelines(object_type: str) -> Iterator[TDataItems]:
object_type=obj,
api_key=api_key,
props=properties.get(obj),
last_modified=dlt.sources.incremental(
LAST_MODIFIED_PROPERTY[obj],
initial_value=HUBSPOT_CREATION_DATE.isoformat(),
),
include_custom_props=include_custom_props,
archived=soft_delete,
)
Expand Down
194 changes: 169 additions & 25 deletions sources/hubspot/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,24 @@
import urllib.parse
from typing import Any, Dict, Iterator, List, Optional

from dlt.common import logger
from dlt.common.schema.typing import TColumnSchema
from dlt.sources.helpers import requests

from .settings import OBJECT_TYPE_PLURAL, HS_TO_DLT_TYPE
from .settings import (
CRM_ASSOCIATIONS_ENDPOINT,
CRM_SEARCH_ENDPOINT,
OBJECT_TYPE_PLURAL,
HS_TO_DLT_TYPE,
)

BASE_URL = "https://api.hubapi.com/"


class SearchOutOfBoundsException(Exception):
    """Raised when an incremental search cannot make progress within HubSpot's
    search window.

    `search_data_since` raises this when a search reports more than 9,999 total
    results but the max observed `updatedAt` never advanced past the starting
    timestamp — i.e. the window cannot be narrowed further. Callers (see
    `fetch_data_for_properties`) catch it and fall back to a full fetch.
    """
    pass


def get_url(endpoint: str) -> str:
    """Resolve *endpoint* against the HubSpot API base URL.

    Args:
        endpoint: Relative API path (e.g. ``crm/v3/objects/contacts``).

    Returns:
        The absolute URL on ``api.hubapi.com``.
    """
    absolute_url = urllib.parse.urljoin(BASE_URL, endpoint)
    return absolute_url
Expand Down Expand Up @@ -48,6 +58,21 @@ def pagination(
return None


def search_pagination(
    url: str,
    _data: Dict[str, Any],
    headers: Dict[str, Any],
    params: Optional[Dict[str, Any]] = None,
) -> Optional[Dict[str, Any]]:
    """
    Fetch the next page of a HubSpot search response.

    Args:
        url: Absolute search-endpoint URL to POST to.
        _data: The current page's parsed JSON response.
        headers: Request headers (including authorization).
        params: The original search request body; the paging cursor is merged
            into it for the follow-up request. Treated as empty when None.

    Returns:
        The parsed JSON of the next page, or None when there is no next page
        or the cursor has hit the end of HubSpot's search window (the code
        treats the literal cursor value "10000" as that boundary).
    """
    _after = _data.get("paging", {}).get("next", {}).get("after", False)
    if _after and _after != "10000":
        # Re-issue the search with the paging cursor appended to the body.
        # BUGFIX: `params` defaults to None, so guard the unpack — the old
        # `{**params, ...}` raised TypeError whenever the default was used.
        r = requests.post(url, headers=headers, json={**(params or {}), "after": _after})
        return r.json()  # type: ignore
    else:
        return None


def extract_association_data(
_obj: Dict[str, Any],
data: Dict[str, Any],
Expand Down Expand Up @@ -126,6 +151,101 @@ def fetch_property_history(
_data = None


def search_data_since(
    endpoint: str,
    api_key: str,
    last_modified: str,
    last_modified_prop: str,
    props: List[str],
    associations: Optional[List[str]] = None,
    context: Optional[Dict[str, Any]] = None,
) -> Iterator[List[Dict[str, Any]]]:
    """
    Fetch data from the HUBSPOT search endpoint, based on a given root endpoint, using a specified
    API key and yield the properties of each result. This function yields results from a last modified
    point in time based on the provided last modified property.

    Args:
        endpoint (str): The root endpoint to fetch data from, as a string.
        api_key (str): The API key to use for authentication, as a string.
        last_modified (str): The date from which to start the search, as a string in ISO format.
        last_modified_prop (str): The property used to check the last modified date against, as a string.
        props: The list of properties to include for the object in the request.
        associations: Optional dict of associations to search for for each object.
        context (Optional[Dict[str, Any]]): Additional data which need to be added in the resulting page.

    Yields:
        A List of CRM object dicts

    Raises:
        requests.exceptions.HTTPError: If the API returns an HTTP error status code.
        SearchOutOfBoundsException: If a search with more than 9,999 total results
            cannot advance its last-modified high-water mark (no progress is possible).

    Notes:
        This function uses the `requests` library to make a POST request to the specified endpoint, with
        the API key included in the headers. If the API returns a non-successful HTTP status code (e.g.
        404 Not Found), a `requests.exceptions.HTTPError` exception will be raised.

        The `endpoint` argument should be a relative URL, which will be modified to a search endpoint
        and then appended to the base URL for the API. `last_modified`, `last_modified_prop`, and `props`
        are used to pass additional parameters to the request.

        Because results are sorted ASCENDING by the last-modified property, the
        function can restart the search from the max timestamp seen whenever a
        result set exceeds the search window (see the `_total > 9999` branch).
        The restart uses a GTE filter, so records exactly at the boundary
        timestamp may be yielded twice — presumably deduplicated downstream
        (TODO confirm).
    """
    # Construct the URL and headers for the API request
    url = get_url(CRM_SEARCH_ENDPOINT.format(crm_endpoint=endpoint))
    headers = _get_headers(api_key)
    # Search request body: only records whose last-modified property is >= the
    # given timestamp, sorted ascending so a restart point can be derived.
    body: Dict[str, Any] = {
        "properties": sorted(props),
        "limit": 200,
        "filterGroups": [
            {
                "filters": [
                    {
                        "propertyName": last_modified_prop,
                        "operator": "GTE",
                        "value": last_modified,
                    }
                ]
            }
        ],
        "sorts": [{"propertyName": last_modified_prop, "direction": "ASCENDING"}],
    }

    # Make the API request
    r = requests.post(url, headers=headers, json=body)
    # Parse the API response and yield the properties of each result
    # Parse the response JSON data
    _data = r.json()

    _total = _data.get("total", 0)
    logger.info(f"Getting {_total} new objects from {url} starting at {last_modified}")
    # High-water mark of the largest `updatedAt` seen; ISO timestamps compare
    # lexicographically, which matches chronological order for a uniform
    # timezone/format — presumably guaranteed by the API (TODO confirm).
    _max_last_modified = last_modified
    # Yield the properties of each result in the API response
    while _data is not None:
        if "results" in _data:
            for _result in _data["results"]:
                # NOTE(review): direct indexing assumes every search result
                # carries an "updatedAt" key — confirm against the API.
                if _result["updatedAt"]:
                    _max_last_modified = max(_max_last_modified, _result["updatedAt"])
            # One yielded item per page: the page's flattened object dicts.
            yield _data_to_objects(
                _data, endpoint, headers, associations=associations, context=context
            )

        # Follow pagination links if they exist
        _data = search_pagination(url, _data, headers, body)

    # HubSpot search pagination stops at 10,000 results (search_pagination
    # treats cursor "10000" as the end), so if the total exceeds that window,
    # recurse with a narrowed filter starting at the max timestamp seen.
    if _total > 9999:
        if _max_last_modified == last_modified:
            # No forward progress possible — the caller must fall back.
            raise SearchOutOfBoundsException
        logger.info(f"Starting new search iteration at {_max_last_modified}")
        yield from search_data_since(
            endpoint,
            api_key,
            _max_last_modified,
            last_modified_prop,
            props,
            associations,
            context,
        )


def fetch_data(
endpoint: str,
api_key: str,
Expand Down Expand Up @@ -168,38 +288,62 @@ def fetch_data(
# Parse the API response and yield the properties of each result
# Parse the response JSON data
_data = r.json()

# Yield the properties of each result in the API response
while _data is not None:
if "results" in _data:
_objects: List[Dict[str, Any]] = []
for _result in _data["results"]:
_obj = _result.get("properties", _result)
if "id" not in _obj and "id" in _result:
# Move id from properties to top level
_obj["id"] = _result["id"]
if "associations" in _result:
for association in _result["associations"]:
__data = _result["associations"][association]

__values = extract_association_data(
_obj, __data, association, headers
)

# remove duplicates from list of dicts
__values = [
dict(t) for t in {tuple(d.items()) for d in __values}
]

_obj[association] = __values
if context:
_obj.update(context)
_objects.append(_obj)
yield _objects
yield _data_to_objects(_data, endpoint, headers, context=context)

# Follow pagination links if they exist
_data = pagination(_data, headers)


def _data_to_objects(
data: Any,
endpoint: str,
headers: Dict[str, str],
associations: Optional[List[str]] = None,
context: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
_objects: List[Dict[str, Any]] = []
for _result in data["results"]:
_obj = _result.get("properties", _result)
if "id" not in _obj and "id" in _result:
# Move id from properties to top level
_obj["id"] = _result["id"]
if "associations" in _result:
for association in _result["associations"]:
__data = _result["associations"][association]
_add_association_data(__data, association, headers, _obj)
elif associations is not None:
for association in associations:
__endpoint = get_url(
CRM_ASSOCIATIONS_ENDPOINT.format(
crm_endpoint=endpoint,
object_id=_result["id"],
association=association,
)
)
r = requests.get(__endpoint, headers=headers, params={"limit": 500})
__data = r.json()
_add_association_data(__data, association, headers, _obj)
if context:
_obj.update(context)
_objects.append(_obj)
return _objects


def _add_association_data(
    data: Any, association: str, headers: Dict[str, str], obj: Any
) -> None:
    """Attach deduplicated association records to *obj* under key *association*.

    Extracts the association entries via ``extract_association_data`` and
    removes exact-duplicate dicts before storing the list on *obj*.
    """
    extracted = extract_association_data(obj, data, association, headers)
    # Deduplicate the list of dicts: hash each dict by its item tuples,
    # then rebuild dicts from the unique tuples.
    unique_entries = {tuple(entry.items()) for entry in extracted}
    obj[association] = [dict(pairs) for pairs in unique_entries]


def _get_property_names_types(
api_key: str, object_type: str
) -> Dict[str, Union[str, None]]:
Expand Down
Loading
Loading