Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions apps/balados_sync_core/lib/balados_sync_core/aggregates/like.ex
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ defmodule BaladosSyncCore.Aggregates.Like do
def execute(%__MODULE__{} = state, %SnapshotLike{}) do
%LikeCheckpoint{
user_id: state.user_id,
podcast_likes: state.podcast_likes,
episode_likes: state.episode_likes,
podcast_likes: filter_old_unlikes(state.podcast_likes),
episode_likes: filter_old_unlikes(state.episode_likes),
timestamp: DateTime.utc_now() |> DateTime.truncate(:second)
}
end
Expand Down Expand Up @@ -190,4 +190,23 @@ defmodule BaladosSyncCore.Aggregates.Like do
end

def apply(%__MODULE__{} = state, _event), do: state

# Filters out likes that were unliked more than 45 days ago.
# Active likes and recent unlikes are preserved for idempotence.
# Matches the subscription aggregate pruning pattern.
#
# The unliked_at > liked_at guard (condition 2) prevents pruning entries with
# corrupted timestamps (e.g. unliked before liked). When liked_at is nil,
# falls back to epoch so any valid unliked_at passes the guard.
defp filter_old_unlikes(likes) do
forty_five_days_ago = DateTime.add(DateTime.utc_now(), -45, :day)

likes
|> Enum.reject(fn {_key, like} ->
like[:unliked_at] != nil &&
DateTime.compare(like[:unliked_at], like[:liked_at] || DateTime.from_unix!(0)) == :gt &&
DateTime.compare(like[:unliked_at], forty_five_days_ago) == :lt
end)
|> Map.new()
end
end
30 changes: 27 additions & 3 deletions apps/balados_sync_core/lib/balados_sync_core/like_normalizer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,22 @@ defmodule BaladosSyncCore.LikeNormalizer do
and the LikeProjector.
"""

# Explicit whitelist of allowed keys in like data maps.
# Using a whitelist instead of String.to_existing_atom avoids unhelpful
# ArgumentError on checkpoint replay if a field is ever renamed.
# When adding new fields to like data maps, add the mapping here too.
@allowed_keys %{
"liked_at" => :liked_at,
"unliked_at" => :unliked_at,
"rss_source_feed" => :rss_source_feed
}

@doc """
Normalize a map of like data entries, converting string keys to atoms.

Unknown string keys are dropped silently to avoid crashes on replay
with stale checkpoint data.

## Examples

iex> normalize(%{"feed-1" => %{"liked_at" => ~U[2024-01-01 00:00:00Z]}})
Expand All @@ -23,10 +36,21 @@ defmodule BaladosSyncCore.LikeNormalizer do
def normalize(_), do: %{}

defp atomize_keys(map) when is_map(map) do
Map.new(map, fn
{key, value} when is_binary(key) -> {String.to_existing_atom(key), value}
{key, value} -> {key, value}
map
|> Enum.flat_map(fn
{key, value} when is_binary(key) ->
case Map.fetch(@allowed_keys, key) do
{:ok, atom_key} -> [{atom_key, value}]
:error -> []
end

{key, value} when is_atom(key) ->
[{key, value}]

_ ->
[]
end)
|> Map.new()
end

defp atomize_keys(value), do: value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,5 +335,89 @@ defmodule BaladosSyncCore.Aggregates.LikeTest do
new_state = Like.apply(state, %{some: "unknown event"})
assert new_state == state
end

test "LikeCheckpoint drops unknown string keys from JSON deserialization" do
state = %Like{user_id: nil}

event = %LikeCheckpoint{
user_id: "user-1",
podcast_likes: %{
"feed-1" => %{
"liked_at" => ~U[2024-01-01 00:00:00Z],
"unliked_at" => nil,
"some_future_field" => "should be dropped"
}
},
episode_likes: %{}
}

new_state = Like.apply(state, event)

# Known keys are preserved
assert new_state.podcast_likes["feed-1"].liked_at == ~U[2024-01-01 00:00:00Z]
# Unknown keys are silently dropped
refute Map.has_key?(new_state.podcast_likes["feed-1"], :some_future_field)
refute Map.has_key?(new_state.podcast_likes["feed-1"], "some_future_field")
end
end

describe "SnapshotLike 45-day pruning" do
test "prunes unliked entries older than 45 days" do
old_date = DateTime.add(DateTime.utc_now(), -60, :day)
recent_date = DateTime.add(DateTime.utc_now(), -10, :day)

state = %Like{
user_id: "user-1",
podcast_likes: %{
"feed-active" => %{liked_at: ~U[2024-01-01 00:00:00Z], unliked_at: nil},
"feed-old-unlike" => %{liked_at: ~U[2024-01-01 00:00:00Z], unliked_at: old_date},
"feed-recent-unlike" => %{liked_at: ~U[2024-01-01 00:00:00Z], unliked_at: recent_date}
},
episode_likes: %{
"item-active" => %{
liked_at: ~U[2024-01-01 00:00:00Z],
unliked_at: nil,
rss_source_feed: "feed-1"
},
"item-old-unlike" => %{
liked_at: ~U[2024-01-01 00:00:00Z],
unliked_at: old_date,
rss_source_feed: "feed-1"
}
}
}

cmd = %SnapshotLike{user_id: "user-1"}
event = Like.execute(state, cmd)

assert %LikeCheckpoint{} = event
# Active likes preserved
assert Map.has_key?(event.podcast_likes, "feed-active")
# Recent unlikes preserved (for idempotence)
assert Map.has_key?(event.podcast_likes, "feed-recent-unlike")
# Old unlikes pruned
refute Map.has_key?(event.podcast_likes, "feed-old-unlike")
# Episode: active preserved, old pruned
assert Map.has_key?(event.episode_likes, "item-active")
refute Map.has_key?(event.episode_likes, "item-old-unlike")
end

test "preserves entry where unliked_at equals liked_at (same timestamp)" do
same_time = DateTime.add(DateTime.utc_now(), -60, :day)

state = %Like{
user_id: "user-1",
podcast_likes: %{
"feed-same-ts" => %{liked_at: same_time, unliked_at: same_time}
},
episode_likes: %{}
}

cmd = %SnapshotLike{user_id: "user-1"}
event = Like.execute(state, cmd)

# unliked_at == liked_at does NOT satisfy the :gt guard, so entry is preserved
assert Map.has_key?(event.podcast_likes, "feed-same-ts")
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,38 @@ defmodule BaladosSyncProjections.Projectors.PopularityProjectorTest do
assert pop.score == 14
assert user2 in pop.likes_people
end

test "private user increments likes but is not added to likes_people" do
feed = encode_feed("https://example.com/feed.xml")
public_user = uuid()
private_user = uuid()

# Public user likes first
event1 = %PodcastLiked{
user_id: public_user,
rss_source_feed: feed,
liked_at: now(),
timestamp: now(),
event_infos: %{}
}

assert {:ok, _} = apply_popularity_event(event1)

# Private user likes
event2 = %PodcastLiked{
user_id: private_user,
rss_source_feed: feed,
liked_at: now(),
timestamp: now(),
event_infos: %{}
}

assert {:ok, pop} = apply_popularity_event(event2, is_public: false)
assert pop.likes == 2
assert pop.score == 14
assert public_user in pop.likes_people
refute private_user in pop.likes_people
end
end

describe "PodcastUnliked" do
Expand Down Expand Up @@ -163,6 +195,28 @@ defmodule BaladosSyncProjections.Projectors.PopularityProjectorTest do
podcast_pop = ProjectionsRepo.get(PodcastPopularity, feed)
assert podcast_pop.score == 7
end

test "private user increments episode likes but is not added to likes_people" do
feed = encode_feed("https://example.com/feed.xml")
item = encode_item("episode-1", "https://example.com/ep1.mp3")
private_user = uuid()

event = %EpisodeLiked{
user_id: private_user,
rss_source_feed: feed,
rss_source_item: item,
liked_at: now(),
timestamp: now(),
event_infos: %{}
}

assert {:ok, _} = apply_popularity_event(event, is_public: false)

episode_pop = ProjectionsRepo.get(EpisodePopularity, item)
assert episode_pop.likes == 1
assert episode_pop.score == 7
refute private_user in (episode_pop.likes_people || [])
end
end

describe "EpisodeUnliked" do
Expand Down
34 changes: 24 additions & 10 deletions apps/balados_sync_projections/test/support/projector_test_case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -195,18 +195,18 @@ defmodule BaladosSyncProjections.ProjectorTestCase do
Separate from `apply_event` because popularity projections are handled by
a different projector (PopularityProjector) and require different schemas.
"""
def apply_popularity_event(event) do
def apply_popularity_event(event, opts \\ []) do
alias BaladosSyncProjections.ProjectionsRepo

case event do
%BaladosSyncCore.Events.PodcastLiked{} ->
apply_popularity_podcast_liked(event, ProjectionsRepo)
apply_popularity_podcast_liked(event, ProjectionsRepo, opts)

%BaladosSyncCore.Events.PodcastUnliked{} ->
apply_popularity_podcast_unliked(event, ProjectionsRepo)

%BaladosSyncCore.Events.EpisodeLiked{} ->
apply_popularity_episode_liked(event, ProjectionsRepo)
apply_popularity_episode_liked(event, ProjectionsRepo, opts)

%BaladosSyncCore.Events.EpisodeUnliked{} ->
apply_popularity_episode_unliked(event, ProjectionsRepo)
Expand Down Expand Up @@ -750,25 +750,32 @@ defmodule BaladosSyncProjections.ProjectorTestCase do
# ============================================================================
# Popularity Projector Logic (Like events)
#
# Note: these test helpers assume all users are public (always add to likes_people).
# The real PopularityProjector checks UserPrivacy and conditionally adds users.
# Privacy-related behaviour is tested separately via the full projector integration.
# By default, these test helpers assume all users are public (add to likes_people).
# Pass `is_public: false` in opts to simulate a private user (increments likes
# count but does not add to likes_people), matching PopularityProjector behaviour.
# ============================================================================

@score_like 7

defp apply_popularity_podcast_liked(event, repo) do
defp apply_popularity_podcast_liked(event, repo, opts \\ []) do
alias BaladosSyncProjections.Schemas.PodcastPopularity

is_public = Keyword.get(opts, :is_public, true)

popularity =
repo.get(PodcastPopularity, event.rss_source_feed) ||
%PodcastPopularity{rss_source_feed: event.rss_source_feed}

likes_people =
if is_public,
do: add_recent_user(popularity.likes_people, event.user_id),
else: popularity.likes_people || []

attrs = %{
rss_source_feed: event.rss_source_feed,
score: popularity.score + @score_like,
likes: popularity.likes + 1,
likes_people: add_recent_user(popularity.likes_people, event.user_id)
likes_people: likes_people
}

repo.insert_or_update(
Expand Down Expand Up @@ -800,9 +807,11 @@ defmodule BaladosSyncProjections.ProjectorTestCase do
end
end

defp apply_popularity_episode_liked(event, repo) do
defp apply_popularity_episode_liked(event, repo, opts \\ []) do
alias BaladosSyncProjections.Schemas.{EpisodePopularity, PodcastPopularity}

is_public = Keyword.get(opts, :is_public, true)

# Episode popularity
episode_pop =
repo.get(EpisodePopularity, event.rss_source_item) ||
Expand All @@ -811,12 +820,17 @@ defmodule BaladosSyncProjections.ProjectorTestCase do
rss_source_feed: event.rss_source_feed
}

likes_people =
if is_public,
do: add_recent_user(episode_pop.likes_people, event.user_id),
else: episode_pop.likes_people || []

episode_attrs = %{
rss_source_item: event.rss_source_item,
rss_source_feed: event.rss_source_feed,
score: episode_pop.score + @score_like,
likes: episode_pop.likes + 1,
likes_people: add_recent_user(episode_pop.likes_people, event.user_id)
likes_people: likes_people
}

{:ok, _} =
Expand Down