Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## [Unreleased]

### Added
- Added `TotalLocatablePeriods` query type in flowmachine. [#7237](https://github.com/Flowminder/FlowKit/pull/7237)

### Changed

Expand Down
186 changes: 186 additions & 0 deletions flowmachine/flowmachine/features/subscriber/total_locatable_periods.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
from typing import List, Optional, Tuple
from typing import Union as UnionType

from flowmachine.core import Query
from flowmachine.core.spatial_unit import AnySpatialUnit
from flowmachine.core.union import Union
from flowmachine.features.subscriber.total_active_periods import (
TotalActivePeriodsSubscriber,
)
from flowmachine.features.utilities.subscriber_locations import SubscriberLocations
from flowmachine.features.utilities.unique_values_from_queries import (
UniqueValuesFromQueries,
)
from flowmachine.utils import time_period_add, standardise_date


class TotalLocatablePeriods(TotalActivePeriodsSubscriber):
"""
Breaks a time span into distinct time periods (integer number of days, hours or minutes).
For each subscriber counts the total number of time periods in
which each subscriber was seen at locatable cells.
For instance we might ask for a month worth of data, break down our
month into 10 3 day chunks, and ask for each subscriber how many of these
three day chunks each subscriber was present and locatable in the data in.

This is equivalent to TotalActivePeriodsSubscriber, except that this query
only considers events from cells that map to a location (using the specified spatial unit).

Note: "locatable cell" here means a cell whose ID appears in the "location_id" column
of the specified spatial unit, even if this location_id is mapped to a null location.

Parameters
----------
start : str
iso-format date, start of the analysis.
total_periods : int
Total number of periods to break your time span into
period_length : int, default 1
Total number of days per period.
period_unit : {'days', 'hours', 'minutes'} default 'days'
Comment on lines +41 to +43
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Clarify period_length docs for non‑day units.

Line 41–42 says “days per period”, but period_unit supports hours/minutes, so the current wording can mislead users. Please update the description to be unit‑agnostic.

✏️ Suggested docstring tweak
-    period_length : int, default 1
-        Total number of days per period.
+    period_length : int, default 1
+        Number of `period_unit` intervals per period.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
period_length : int, default 1
Total number of days per period.
period_unit : {'days', 'hours', 'minutes'} default 'days'
period_length : int, default 1
Number of `period_unit` intervals per period.
period_unit : {'days', 'hours', 'minutes'} default 'days'
🤖 Prompt for AI Agents
In `@flowmachine/flowmachine/features/subscriber/total_locatable_periods.py`
around lines 41 - 43, The docstring for the parameters period_length and
period_unit is misleading: period_length is described as "Total number of days
per period" even though period_unit accepts 'days', 'hours', or 'minutes';
update the docstring for period_length to be unit-agnostic (e.g., "Total number
of units per period" or "Length of each period in units specified by
period_unit") and ensure period_unit description remains consistent (e.g., "Unit
for period_length: 'days', 'hours', or 'minutes'"). Update these descriptions in
the function/class where period_length and period_unit are defined (search for
period_length and period_unit in total_locatable_periods.py) so the docstring
clearly indicates the relationship between the two parameters.

Split this time frame into hours or days etc.
spatial_unit : flowmachine.core.spatial_unit.*SpatialUnit, default versioned-cell
Spatial unit defining the set of "locatable" cell IDs.
subscriber_identifier : {'msisdn', 'imei'}, default 'msisdn'
Either msisdn, or imei, the column that identifies the subscriber.
subscriber_subset : str, list, flowmachine.core.Query, flowmachine.core.Table, default None
If provided, string or list of string which are msisdn or imeis to limit
results to; or, a query or table which has a column with a name matching
subscriber_identifier (typically, msisdn), to limit results to.
periods_to_exclude : list of str
List of datetimes corresponding to the starts of periods that should be excluded
(e.g. to ignore a date in the middle of the window)

See Also
--------
flowmachine.features.subscriber.total_active_periods.TotalActivePeriodsSubscriber
"""

def __init__(
self,
*,
start: str,
total_periods: int,
period_length: int = 1,
period_unit: str = "days",
spatial_unit: AnySpatialUnit,
hours: Optional[Tuple[int, int]] = None,
table: Optional[UnionType[str, List[str]]] = None,
subscriber_identifier: str = "msisdn",
subscriber_subset: Optional[Query] = None,
periods_to_exclude: Optional[List[str]] = None,
):
self.spatial_unit = spatial_unit
if periods_to_exclude is not None:
# Filter periods to exclude to only relevant dates
# (otherwise changes to irrelevant dates will change query ID)
lower_bound = standardise_date(start)
upper_bound = time_period_add(
start, total_periods * period_length, period_unit
)
# Needs to be sorted so that the query ID is deterministic
self.periods_to_exclude = sorted(
set(
standardise_date(p)
for p in periods_to_exclude
if (
standardise_date(p) >= lower_bound
and standardise_date(p) < upper_bound
)
)
)
Comment on lines +77 to +94
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

rg -n "def standardise_date|def standardise_datetime|def time_period_add" -A 15

Repository: Flowminder/FlowKit

Length of output: 3015


🏁 Script executed:

sed -n '57,105p' flowmachine/flowmachine/utils.py

Repository: Flowminder/FlowKit

Length of output: 1241


🏁 Script executed:

sed -n '132,170p' flowmachine/flowmachine/utils.py

Repository: Flowminder/FlowKit

Length of output: 986


🏁 Script executed:

sed -n '70,100p' flowmachine/flowmachine/features/subscriber/total_locatable_periods.py

Repository: Flowminder/FlowKit

Length of output: 1339


🏁 Script executed:

rg -n "def parse_datestring" -A 20

Repository: Flowminder/FlowKit

Length of output: 1344


🏁 Script executed:

sed -n '40,55p' flowmachine/flowmachine/utils.py

Repository: Flowminder/FlowKit

Length of output: 728


🏁 Script executed:

rg -n "periods_to_exclude" -B 5 -A 5 | head -60

Repository: Flowminder/FlowKit

Length of output: 6833


Exclusions for hourly/minute periods may silently fail if provided as date-only strings.

The code normalises periods_to_exclude with standardise_date, which preserves time precision (HH:MM:SS). However, parse_datestring (called during normalisation) defaults date-only input like "2016-01-01" to "2016-01-01 00:00:00". For hourly or minute period_unit, period starts occur at specific times (e.g., "2016-01-01 14:30:00"), so date-only exclusions will not match and be silently ignored.

Either require periods_to_exclude to include explicit times matching the period granularity, document this constraint clearly, or enhance the filtering to handle date-only inputs intelligently based on period_unit.

🤖 Prompt for AI Agents
In `@flowmachine/flowmachine/features/subscriber/total_locatable_periods.py`
around lines 77 - 94, The exclusion filtering silently ignores date-only strings
for finer-grained periods because standardise_date (via parse_datestring)
defaults them to 00:00:00; update the filtering in the periods_to_exclude block
to treat date-only inputs as whole-day ranges for hour/minute period_unit: when
iterating periods_to_exclude, detect inputs that lack time (use parse_datestring
or original string heuristic), and if period_unit is 'hour' or 'minute' expand
that date into a range (start_of_day..end_of_day) and include any period start
that falls within that range (instead of only matching 00:00:00); keep using
standardise_date for datetime inputs, preserve the existing
lower_bound/upper_bound logic (time_period_add, total_periods, period_length),
and ensure final self.periods_to_exclude remains sorted and deduplicated as
before.

else:
self.periods_to_exclude = []
super().__init__(
start=start,
total_periods=total_periods,
period_length=period_length,
period_unit=period_unit,
hours=hours,
table=table,
subscriber_identifier=subscriber_identifier,
subscriber_subset=subscriber_subset,
)

def _get_start_stops(self):
"""
Gets two lists, one for the start dates and one for the
stop dates.
"""

starts = [
time_period_add(self.start, i * self.period_length, self.period_unit)
for i in range(self.total_periods)
]
stops = [
time_period_add(self.start, (i + 1) * self.period_length, self.period_unit)
for i in range(self.total_periods)
]
filtered_startstops = [
(start, stop)
for start, stop in zip(starts, stops)
if start not in self.periods_to_exclude
]
if not filtered_startstops:
raise ValueError(
"Cannot create a TotalLocatablePeriods query with all periods excluded"
)
filtered_starts, filtered_stops = zip(
*(
(start, stop)
for start, stop in zip(starts, stops)
if start not in self.periods_to_exclude
)
)
return filtered_starts, filtered_stops

def _get_unioned_subscribers_list(
self,
hours: UnionType[str, Tuple[int, int]] = "all",
table: Optional[UnionType[str, List[str]]] = None,
subscriber_identifier: str = "msisdn",
subscriber_subset: Optional[Query] = None,
):
"""
Constructs a list of unique locatable subscribers for each time period.
They will be unique within each time period. Joins
these lists into one long list and returns the result
(as a query)
"""
# Union doesn't handle the case where there's only one query in the list,
# so handle that separately here
if len(self.starts) == 1:
return UniqueValuesFromQueries(
query_list=SubscriberLocations(
self.starts[0],
self.stops[0],
spatial_unit=self.spatial_unit,
hours=hours,
table=table,
subscriber_identifier=subscriber_identifier,
subscriber_subset=subscriber_subset,
),
column_names="subscriber",
)
else:
return Union(
*[
UniqueValuesFromQueries(
query_list=SubscriberLocations(
start,
stop,
spatial_unit=self.spatial_unit,
hours=hours,
table=table,
subscriber_identifier=subscriber_identifier,
subscriber_subset=subscriber_subset,
),
column_names="subscriber",
)
for start, stop in zip(self.starts, self.stops)
],
all=True,
)