146 changes: 90 additions & 56 deletions lib/arrow/hastus/export_upload.ex
@@ -8,6 +8,7 @@ defmodule Arrow.Hastus.ExportUpload do
require Logger

alias Arrow.Hastus.TripRouteDirection
alias Arrow.PolytreeHelper

@type t :: %__MODULE__{
services: list(map()),
@@ -468,30 +469,31 @@ defmodule Arrow.Hastus.ExportUpload do
unique_trip_counts = Enum.frequencies_by(trips, &trip_type/1)

# Trim down the number of trips we look at to save time:
# - Only direction_id 0 trips
# - Only direction_id 1 trips
# - Only trips of a type that occurs more than a handful of times.
# (We can assume the uncommon trip types are train repositionings.)
trips =
trips
|> Stream.filter(&(trp_direction_to_direction_id[&1["trp_direction"]] == 0))
|> Stream.filter(&(trp_direction_to_direction_id[&1["trp_direction"]] == 1))
|> Enum.filter(&(unique_trip_counts[trip_type(&1)] >= @min_trip_type_occurrence))

canonical_stop_sequences = stop_sequences_for_routes(route_ids)
tree = build_polytree(route_ids)
natural_stop_sequences = PolytreeHelper.all_full_paths(tree)

Enum.map(services, &add_derived_limits(&1, trips, stop_times, canonical_stop_sequences))
Enum.map(services, &add_derived_limits(&1, trips, stop_times, natural_stop_sequences, tree))
end
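
For context on the filter above: `Enum.frequencies_by/2` counts trips per shape, and anything rarer than `@min_trip_type_occurrence` is assumed to be a repositioning move. A minimal sketch of the same pattern, with invented trip maps and an inline stand-in for the private `trip_type/1`:

```elixir
# Invented HASTUS-style trip maps; trip_type is a stand-in for the module's helper.
trip_type = fn trip -> {trip["trp_route"], trip["trp_direction"]} end

trips = [
  %{"trp_route" => "B", "trp_direction" => "East"},
  %{"trp_route" => "B", "trp_direction" => "East"},
  %{"trp_route" => "B", "trp_direction" => "West"}
]

counts = Enum.frequencies_by(trips, trip_type)
# => %{{"B", "East"} => 2, {"B", "West"} => 1}

Enum.filter(trips, &(counts[trip_type.(&1)] >= 2))
# keeps the two "East" trips; the one-off "West" trip is dropped
```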

defp add_derived_limits(service, trips, stop_times, canonical_stop_sequences)
defp add_derived_limits(service, trips, stop_times, natural_stop_sequences, tree)

defp add_derived_limits(%{service_dates: []} = service, _, _, _) do
defp add_derived_limits(%{service_dates: []} = service, _, _, _, _) do
Map.put(service, :derived_limits, [])
end

defp add_derived_limits(service, trips, stop_times, canonical_stop_sequences) do
defp add_derived_limits(service, trips, stop_times, natural_stop_sequences, tree) do
# Summary of logic:
# Chunk trips within this service into hour windows, based on the departure time of each trip's first stop_time.
# For each chunk, collect all trips' visited stops into a set of stop IDs.
# Compare these with canonical stop sequence(s) for the line, looking for "holes" of unvisited stops.
# Compare these with stop sequence(s) for the line, looking for "holes" of unvisited stops.
# These "holes" are the derived limits.
#
# Why:
@@ -518,16 +520,19 @@ defmodule Arrow.Hastus.ExportUpload do
fn {_trip_id, stop_times} -> MapSet.new(stop_times, & &1["stop_id"]) end
)
|> Map.delete(:skip)
|> Enum.map(fn {_hour, stop_id_sets} ->
Enum.reduce(stop_id_sets, &MapSet.union/2)
end)
# Sort the map keys before iterating over them to prevent undefined order in test results.
|> Enum.sort_by(fn {hour, _} -> hour end)
|> Enum.map(fn {_hour, stop_id_sets} -> Enum.reduce(stop_id_sets, &MapSet.union/2) end)

derived_limits =
for visited_stops <- visited_stops_per_time_window,
seq <- canonical_stop_sequences,
{start_stop_id, end_stop_id} <- limits_from_sequence(seq, visited_stops) do
%{start_stop_id: start_stop_id, end_stop_id: end_stop_id}
end
visited_stops_per_time_window
|> Enum.flat_map(fn visited_stops ->
for seq <- natural_stop_sequences,
limit <- limit_slices_from_sequence(seq, visited_stops) do
limit
end
|> condense_limits(tree)
end)
|> Enum.uniq()

Map.put(service, :derived_limits, derived_limits)
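
The windowing above boils down to: bucket trips by the hour of their first departure, then union each bucket's visited stop IDs into one `MapSet`. A standalone sketch with invented data, using the same string-keyed maps as the export parsing:

```elixir
# Invented trips; "HH:MM:SS" departure times as in the HASTUS export.
stop_times_by_trip = %{
  "trip-1" => [
    %{"stop_id" => "A", "departure_time" => "08:05:00"},
    %{"stop_id" => "B", "departure_time" => "08:09:00"}
  ],
  "trip-2" => [
    %{"stop_id" => "B", "departure_time" => "08:40:00"},
    %{"stop_id" => "C", "departure_time" => "08:44:00"}
  ]
}

stop_times_by_trip
|> Enum.group_by(fn {_trip, [first | _]} ->
  # Hour window = the leading "HH" of the first stop_time's departure.
  String.slice(first["departure_time"], 0, 2)
end)
|> Enum.sort_by(fn {hour, _} -> hour end)
|> Enum.map(fn {_hour, trips} ->
  trips
  |> Enum.map(fn {_trip, stop_times} -> MapSet.new(stop_times, & &1["stop_id"]) end)
  |> Enum.reduce(&MapSet.union/2)
end)
# => [MapSet.new(["A", "B", "C"])] -- one window covering both trips
```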
@@ -545,21 +550,30 @@
)
end

# Returns a list of lists with the direction_id=0 canonical stop sequence(s) for the given routes.
@spec stop_sequences_for_routes([String.t()]) :: [[stop_id :: String.t()]]
defp stop_sequences_for_routes(route_ids) do
# Returns a polytree (directed acyclic graph that can have multiple roots)
# constructed from a line's canonical direction_id=1 stop sequences.
#
# Nodes of the tree use platform stop IDs as their IDs,
# and contain corresponding parent station IDs as their values.
@spec build_polytree([String.t()]) :: UnrootedPolytree.t()
defp build_polytree(route_ids) do
Arrow.Repo.all(
from t in Arrow.Gtfs.Trip,
where: t.direction_id == 0,
where: t.direction_id == 1,
where: t.service_id == "canonical",
where: t.route_id in ^route_ids,
join: st in Arrow.Gtfs.StopTime,
on: t.id == st.trip_id,
join: s in Arrow.Gtfs.Stop,
on: st.stop_id == s.id,
join: ps in Arrow.Gtfs.Stop,
on: ps.id == s.parent_station_id,
order_by: [t.id, st.stop_sequence],
select: %{trip_id: t.id, stop_id: st.stop_id}
select: %{trip_id: t.id, stop_id: st.stop_id, parent_id: ps.id}
)
|> Stream.chunk_by(& &1.trip_id)
|> Enum.map(fn stops -> Enum.map(stops, & &1.stop_id) end)
|> Enum.map(fn stops -> Enum.map(stops, &{&1.stop_id, &1.parent_id}) end)
|> UnrootedPolytree.from_lists()
end
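
Node IDs here are platform stop IDs and node values are parent station IDs, so `UnrootedPolytree.from_lists/1` merges the canonical sequences wherever they share a platform. A toy sketch with invented IDs, two branches joining a common trunk (this assumes `from_lists/1` merges nodes on ID, the same behavior `condense_limits/2` below relies on):

```elixir
# Invented IDs: branch platforms "b1" and "c1" merge at "k1", then continue to "p1".
tree =
  UnrootedPolytree.from_lists([
    [{"b1", "place-b"}, {"k1", "place-k"}, {"p1", "place-p"}],
    [{"c1", "place-c"}, {"k1", "place-k"}, {"p1", "place-p"}]
  ])

Arrow.PolytreeHelper.all_full_paths(tree)
# => [["b1", "k1", "p1"], ["c1", "k1", "p1"]] (path order not guaranteed)
```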

# Maps HASTUS all_trips.txt `trp_direction` values
@@ -580,61 +594,81 @@
|> Map.new()
end

@typep limit :: {start_stop_id :: stop_id, end_stop_id :: stop_id}
@typep limit :: %{start_stop_id: stop_id, end_stop_id: stop_id}
@typep stop_id :: String.t()

@spec limits_from_sequence([stop_id], MapSet.t(stop_id)) :: [limit]
defp limits_from_sequence(stop_sequence, visited_stops)

defp limits_from_sequence([], _visited_stops), do: []
@spec limit_slices_from_sequence([stop_id], MapSet.t(stop_id)) :: [[stop_id]]
defp limit_slices_from_sequence(stop_sequence, visited_stops)

defp limits_from_sequence([first_stop | stops] = stop_sequence, visited_stops) do
# Regardless of whether it was visited, the first stop in the sequence
# is the potential first stop of a limit.
acc = {first_stop, first_stop in visited_stops}
defp limit_slices_from_sequence([], _visited_stops), do: []

defp limit_slices_from_sequence([first_stop | stops], visited_stops) do
Enum.chunk_while(
stops,
acc,
&chunk_limits(&1, &2, &1 in visited_stops),
&chunk_limits(&1, stop_sequence)
[first_stop],
&chunk_limits(&1, &2, visited_stops),
&chunk_limits(&1, visited_stops)
)
end

# The acc records:
# 1. the potential first stop of a limit, and
# 2. whether the previous stop in the sequence was visited by any trip in the time window.
@typep limits_acc :: {potential_first_stop_of_limit :: stop_id, prev_stop_visited? :: boolean}
@typep limit_acc :: nonempty_list(stop_id)

# chunk fun
@spec chunk_limits(stop_id, limits_acc, boolean) ::
{:cont, limit, limits_acc} | {:cont, limits_acc}
defp chunk_limits(stop, acc, stop_visited?)
@spec chunk_limits(stop_id, limit_acc, MapSet.t(stop_id)) ::
{:cont, [stop_id], limit_acc} | {:cont, limit_acc}
defp chunk_limits(stop, limit_stops, visited_stops)

defp chunk_limits(stop, limit_stops, visited_stops) do
prev_stop_visited? = hd(limit_stops) in visited_stops
stop_visited? = stop in visited_stops

defp chunk_limits(stop, {first_stop, prev_stop_visited?}, stop_visited?) do
cond do
# This stop was not visited.
# Potential start of limit remains where it was.
not stop_visited? -> {:cont, {first_stop, stop_visited?}}
# Add it to the in-progress limit.
not stop_visited? -> {:cont, [stop | limit_stops]}
# Prev stop was visited, this stop was visited.
# Potential start of limit moves to this stop.
prev_stop_visited? -> {:cont, {stop, stop_visited?}}
prev_stop_visited? -> {:cont, [stop]}
# Prev stop was not visited, this stop was visited.
# This is the end of a limit--emit it and form a new limit starting at this stop.
not prev_stop_visited? -> {:cont, {first_stop, stop}, {stop, stop_visited?}}
# This is the end of a limit--emit it and begin a new limit starting at this stop.
not prev_stop_visited? -> {:cont, Enum.reverse([stop | limit_stops]), [stop]}
end
end

# after fun
@spec chunk_limits(limits_acc, [stop_id]) :: {:cont, term} | {:cont, limit, term}
defp chunk_limits(acc, sequence)

# The last stop in the sequence was visited.
defp chunk_limits({_, true}, _), do: {:cont, nil}

# The last stop in the sequence was not visited. Emit a limit that ends with it.
defp chunk_limits({first_stop, false}, sequence) do
{:cont, {first_stop, List.last(sequence)}, nil}
@spec chunk_limits(limit_acc, MapSet.t(stop_id)) :: {:cont, term} | {:cont, [stop_id], term}
defp chunk_limits(limit_stops, visited_stops) do
# If the last stop in the sequence was not visited, emit a limit that ends with it.
if hd(limit_stops) in visited_stops,
do: {:cont, nil},
else: {:cont, Enum.reverse(limit_stops), nil}
end
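
To trace the chunking by hand, here is an inlined equivalent of the two functions above, run over a five-stop sequence where only "A", "D", and "E" were visited; the unvisited hole B..C comes out as one slice bounded by its visited neighbors:

```elixir
visited = MapSet.new(["A", "D", "E"])
[first | rest] = ["A", "B", "C", "D", "E"]

Enum.chunk_while(
  rest,
  [first],
  fn stop, limit_stops ->
    prev_stop_visited? = hd(limit_stops) in visited

    cond do
      # Unvisited: keep growing the in-progress limit.
      stop not in visited -> {:cont, [stop | limit_stops]}
      # Visited after visited: restart the potential limit here.
      prev_stop_visited? -> {:cont, [stop]}
      # Visited after unvisited: emit the limit and start a new one here.
      true -> {:cont, Enum.reverse([stop | limit_stops]), [stop]}
    end
  end,
  fn limit_stops ->
    if hd(limit_stops) in visited,
      do: {:cont, nil},
      else: {:cont, Enum.reverse(limit_stops), nil}
  end
)
# => [["A", "B", "C", "D"]] -- the hole B..C plus the visited stops bounding it
```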

# Merges all limit slices (sequences of stop IDs) produced from an hour window
# into a final, minimal list of limits.
@spec condense_limits([[stop_id]], UnrootedPolytree.t()) :: [limit]
defp condense_limits(limit_slices, tree) do
# Convert slices to parent station IDs to avoid unwanted duplicates of
# limits that include e.g. Kenmore, which has multiple eastbound platform stop IDs.
limits_tree =
limit_slices
|> Enum.map(fn slice ->
Enum.map(slice, fn stop_id ->
{:ok, tree_node} = UnrootedPolytree.node_for_id(tree, stop_id)
parent_id = tree_node.value
{parent_id, stop_id}
end)
end)
|> UnrootedPolytree.from_lists()

limits_tree
|> PolytreeHelper.all_full_paths()
# Convert parent IDs in these paths back to their child IDs
|> Enum.map(fn path ->
{:ok, start_node} = UnrootedPolytree.node_for_id(limits_tree, hd(path))
{:ok, end_node} = UnrootedPolytree.node_for_id(limits_tree, List.last(path))
%{start_stop_id: start_node.value, end_stop_id: end_node.value}
end)
end
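
Working through the Kenmore case with invented IDs: two slices that differ only in which eastbound platform they pass through get re-keyed by parent station, so `from_lists/1` folds them onto a single path and one limit comes back out:

```elixir
# Two slices through different platforms ("kb"/"kc") of the same station "place-k".
# Lists are {parent_station_id, platform_stop_id}, as in condense_limits/2.
limits_tree =
  UnrootedPolytree.from_lists([
    [{"place-h", "h1"}, {"place-k", "kb"}, {"place-bl", "bl"}],
    [{"place-h", "h1"}, {"place-k", "kc"}, {"place-bl", "bl"}]
  ])

Arrow.PolytreeHelper.all_full_paths(limits_tree)
# => [["place-h", "place-k", "place-bl"]]
# Mapping the endpoint nodes' values back gives the single limit
# %{start_stop_id: "h1", end_stop_id: "bl"}.
```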

defp chunk_dates(date, {start_date, last_date}, service, exceptions) do
76 changes: 76 additions & 0 deletions lib/arrow/polytree_helper.ex
@@ -0,0 +1,76 @@
defmodule Arrow.PolytreeHelper do
@moduledoc """
Functions for creating and analyzing UnrootedPolytree structures.

Intended for use with subway line stop sequences, but can work with any UnrootedPolytree.
"""
alias UnrootedPolytree, as: UPTree

@doc """
Returns a list of IDs for all nodes in the polytree that have no previous nodes.
"""
@spec leftmost_ids(UPTree.t()) :: [UPTree.Node.id()]
def leftmost_ids(%UPTree{} = tree), do: do_leftmost(tree, tree.starting_nodes)

@doc """
Returns a list of all possible paths from a leftmost node to a rightmost node
in the polytree.

Each path is a list of node IDs, where the first element is a leftmost node
and the last is a rightmost node.
"""
@spec all_full_paths(UPTree.t()) :: [[UPTree.Node.id()]]
def all_full_paths(%UPTree{} = tree) do
tree
|> leftmost_ids()
|> Enum.map(&[&1])
|> then(&build_paths(tree, &1))
end

@doc """
Constructs an UnrootedPolytree from a list of stop sequences.
"""
@spec seqs_to_tree([[stop_id :: String.t()]]) :: UPTree.t()
def seqs_to_tree(seqs) do
seqs
|> Enum.map(fn seq -> Enum.map(seq, &{&1, &1}) end)
|> UPTree.from_lists()
end
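
A quick usage sketch for the helpers above (toy IDs; the ordering of returned roots and paths is not guaranteed):

```elixir
tree = Arrow.PolytreeHelper.seqs_to_tree([["a", "c", "d"], ["b", "c", "d"]])

Arrow.PolytreeHelper.leftmost_ids(tree)
# => ["a", "b"] in some order

Arrow.PolytreeHelper.all_full_paths(tree)
# => [["a", "c", "d"], ["b", "c", "d"]] in some order
```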

defp do_leftmost(tree, ids, acc \\ [], visited \\ MapSet.new())

defp do_leftmost(_tree, [], acc, _visited), do: Enum.uniq(acc)

defp do_leftmost(tree, ids, acc, visited) do
{prev_ids, acc} =
Enum.reduce(ids, {[], acc}, fn id, {prev_ids, acc} ->
case UPTree.edges_for_id(tree, id).previous do
[] -> {prev_ids, [id | acc]}
prev -> {Enum.reject(prev, &(&1 in visited)) ++ prev_ids, acc}
end
end)

do_leftmost(tree, prev_ids, acc, for(id <- ids, into: visited, do: id))
end

defp build_paths(tree, paths, completed \\ [])

defp build_paths(_tree, [], completed) do
completed
|> Enum.reverse()
|> Enum.map(&Enum.reverse/1)
end

defp build_paths(tree, paths, completed) do
paths_with_next =
Enum.map(paths, fn [id | _] = path -> {path, UPTree.edges_for_id(tree, id).next} end)

{new_completed, paths} =
Enum.split_with(paths_with_next, &match?({_path, []}, &1))

new_completed = Enum.map(new_completed, fn {path, _} -> path end)
new_paths = Enum.flat_map(paths, fn {path, next} -> Enum.map(next, &[&1 | path]) end)

build_paths(tree, new_paths, Enum.reverse(new_completed) ++ completed)
end
end
1 change: 1 addition & 0 deletions mix.exs
@@ -85,6 +85,7 @@ defmodule Arrow.MixProject do
{:tzdata, "~> 1.1"},
{:ueberauth_oidcc, "~> 0.4.0"},
{:ueberauth, "~> 0.10"},
{:unrooted_polytree, "~> 0.1.1"},
{:wallaby, "~> 0.30", runtime: false, only: :test},
{:sentry, "~> 10.7"},
{:tailwind, "~> 0.2", runtime: Mix.env() == :dev},
1 change: 1 addition & 0 deletions mix.lock
@@ -77,6 +77,7 @@
"ueberauth": {:hex, :ueberauth, "0.10.8", "ba78fbcbb27d811a6cd06ad851793aaf7d27c3b30c9e95349c2c362b344cd8f0", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "f2d3172e52821375bccb8460e5fa5cb91cfd60b19b636b6e57e9759b6f8c10c1"},
"ueberauth_oidcc": {:hex, :ueberauth_oidcc, "0.4.1", "172f202c8e6731d30c2221f5ea67a2217c30f60436b6a236e745e978497e57d9", [:mix], [{:oidcc, "~> 3.2.0", [hex: :oidcc, repo: "hexpm", optional: false]}, {:plug, "~> 1.11", [hex: :plug, repo: "hexpm", optional: false]}, {:ueberauth, "~> 0.10", [hex: :ueberauth, repo: "hexpm", optional: false]}], "hexpm", "ba4447d428df74d5cff8b6717e1249163649d946d4aefd22f7445a9979adab54"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.1", "a48703a25c170eedadca83b11e88985af08d35f37c6f664d6dcfb106a97782fc", [:rebar3], [], "hexpm", "b3a917854ce3ae233619744ad1e0102e05673136776fb2fa76234f3e03b23642"},
"unrooted_polytree": {:hex, :unrooted_polytree, "0.1.1", "95027b1619d707fcbbd8980708a50efd170782142dd3de5112e9332d4cc27fef", [:mix], [], "hexpm", "9c8143d2015526ae49c3642ca509802e4db129685a57a0ec413e66546fe0c251"},
"unzip": {:hex, :unzip, "0.12.0", "beed92238724732418b41eba77dcb7f51e235b707406c05b1732a3052d1c0f36", [:mix], [], "hexpm", "95655b72db368e5a84951f0bed586ac053b55ee3815fd96062fce10ce4fc998d"},
"wallaby": {:hex, :wallaby, "0.30.10", "574afb8796521252daf49a4cd76a1c389d53cae5897f2d4b5f55dfae159c8e50", [:mix], [{:ecto_sql, ">= 3.0.0", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:httpoison, "~> 0.12 or ~> 1.0 or ~> 2.0", [hex: :httpoison, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:phoenix_ecto, ">= 3.0.0", [hex: :phoenix_ecto, repo: "hexpm", optional: true]}, {:web_driver_client, "~> 0.2.0", [hex: :web_driver_client, repo: "hexpm", optional: false]}], "hexpm", "a8f89b92d8acce37a94b5dfae6075c2ef00cb3689d6333f5f36c04b381c077b2"},
"web_driver_client": {:hex, :web_driver_client, "0.2.0", "63b76cd9eb3b0716ec5467a0f8bead73d3d9612e63f7560d21357f03ad86e31a", [:mix], [{:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:tesla, "~> 1.3", [hex: :tesla, repo: "hexpm", optional: false]}], "hexpm", "83cc6092bc3e74926d1c8455f0ce927d5d1d36707b74d9a65e38c084aab0350f"},