Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion sources/matomo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def matomo_visits(
if get_live_event_visitors:
resource_list.append(
visits_data_generator
| get_unique_visitors(client=client, site_id=live_events_site_id)
| get_unique_visitors_with_chunk(client=client, site_id=live_events_site_id)
)
return resource_list

Expand Down Expand Up @@ -221,3 +221,41 @@ def get_unique_visitors(
)
for method_dict in method_data:
yield method_dict


@dlt.transformer(
data_from=get_last_visits,
write_disposition="merge",
name="visitors",
primary_key="visitorId",
)
def get_unique_visitors_with_chunk(
visits: List[DictStrAny],
client: MatomoAPIClient,
site_id: int,
chunk_size: int = 20,
) -> Iterator[TDataItem]:
"""
Dlt transformer. Receives information about visits from get_last_visits.
This version allows batch loading for visitors data, which is to avoid too-long-URL issue

Args:
visits (List[DictStrAny]): List of dicts containing information on last visits in the given timeframe.
client (MatomoAPIClient): Used to make calls to Matomo API.
site_id (int): Every site in Matomo has a unique id.
chunk_size (int): Number of visitor IDs to process in each batch. Defaults to 100.

Returns:
Iterator[TDataItem]: Dict containing information about the visitor.
"""

visitor_ids = [visit["visitorId"] for visit in visits]
indexed_visitor_ids = [
visitor_ids[i : i + chunk_size] for i in range(0, len(visitor_ids), chunk_size)
]
for visitor_list in indexed_visitor_ids:
method_data = client.get_visitors_batch(
visitor_list=visitor_list, site_id=site_id
)
for method_dict in method_data:
yield method_dict
Loading