From 0145663b6b13f8a83f1c77336fb600c698b5a86f Mon Sep 17 00:00:00 2001 From: Ulisses Almeida Date: Mon, 11 Oct 2021 15:10:38 +0100 Subject: [PATCH] Add expiration and message retention options to Subscription Hello! In Cloud Pubsub we can specify for how long a message will be retained (https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/create#request-body) We're using these changes on our production system and has been working pretty well. Our use case: We create many dynamic subscriptions during the day. Old subscriptions aren't used, so it ended up with some leftover long-lived subscriptions that we don't use. It can be dangerous because we have a limit on how many subscriptions we can have. The default expiration of an inactive subscription is 31 days, we want to reduce it to t the minimum value that is 1 day. However, according to the docs, the subscription expiration should always be greater than the message retention duration. The default message retention is 7 days, so we need to reduce it together. --- lib/kane/subscription.ex | 54 ++++++++++++++++++++++--------- test/kane/subscription_test.exs | 56 +++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 14 deletions(-) diff --git a/lib/kane/subscription.ex b/lib/kane/subscription.ex index b2d6bc9..c4fbc4e 100644 --- a/lib/kane/subscription.ex +++ b/lib/kane/subscription.ex @@ -1,5 +1,11 @@ defmodule Kane.Subscription do - defstruct name: nil, topic: nil, ack_deadline: 10, filter: nil + defstruct name: nil, + topic: nil, + ack_deadline: 10, + filter: nil, + expires_in: nil, + message_retention_duration: nil + alias Kane.Topic alias Kane.Message alias Kane.Client @@ -112,19 +118,29 @@ defmodule Kane.Subscription do end end - def data(%__MODULE__{ack_deadline: ack, topic: %Topic{} = topic, filter: nil}, :create) do - %{ - "topic" => Topic.full_name(topic), - "ackDeadlineSeconds" => ack - } - end + def data(%__MODULE__{} = sub, :create) do + sub + |> Map.from_struct() + |> Enum.map(fn + {:topic, topic} -> + {"topic", Topic.full_name(topic)} - def data(%__MODULE__{ack_deadline: ack, topic: %Topic{} = topic, filter: filter}, :create) do - %{ - "topic" => Topic.full_name(topic), - "ackDeadlineSeconds" => ack, - "filter" => filter - } + {:ack_deadline, ack} -> + {"ackDeadlineSeconds", ack} + + {:filter, filter} when is_binary(filter) -> + {"filter", filter} + + {:expires_in, expires_in} when is_integer(expires_in) -> + {"expirationPolicy", %{"ttl" => "#{expires_in}s"}} + + {:message_retention_duration, retention} when is_integer(retention) -> + {"messageRetentionDuration", "#{retention}s"} + + {key_to_drop, _v} -> {key_to_drop, nil} + end) + |> Enum.reject(fn {_k, value} -> is_nil(value) end) + |> Map.new() end def data(%__MODULE__{}, :pull, options) do @@ -180,10 +196,20 @@ defmodule Kane.Subscription do name: strip!(subscription_name), ack_deadline: Map.get(data, "ackDeadlineSeconds"), topic: %Topic{name: Topic.strip!(topic_name)}, - filter: Map.get(data, "filter") + filter: Map.get(data, "filter"), + message_retention_duration: data |> Map.get("messageRetentionDuration") |> parse_seconds(), + expires_in: data |> get_in(["expirationPolicy", "ttl"]) |> parse_seconds() } end + def parse_seconds(nil), do: nil + + def parse_seconds(string) when is_binary(string) do + {seconds, "s"} = Integer.parse(string) + + seconds + end + defp http_options(options) do case Keyword.get(options, :return_immediately, true) do false -> [recv_timeout: :infinity] diff --git a/test/kane/subscription_test.exs b/test/kane/subscription_test.exs index 082f861..98ce1db 100644 --- a/test/kane/subscription_test.exs +++ b/test/kane/subscription_test.exs @@ -129,6 +129,60 @@ defmodule Kane.SubscriptionTest do }} = Subscription.create(sub) end + test "includes expiration keys when creating a subscription", %{ + bypass: bypass + } do + name = "create-sub" + topic = "topic-to-sub" + expires_in = 24 |> :timer.hours() |> to_seconds() + message_retention = 10 |> :timer.minutes() |> to_seconds() + + sub = %Subscription{ + name: name, + topic: %Topic{name: topic}, + expires_in: expires_in, + message_retention_duration: message_retention + } + + sname = Subscription.full_name(sub) + tname = Topic.full_name(sub.topic) + + Bypass.expect(bypass, fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + + assert body == + %{ + "topic" => tname, + "ackDeadlineSeconds" => sub.ack_deadline, + "messageRetentionDuration" => "#{message_retention}s", + "expirationPolicy" => %{ + "ttl" => "#{expires_in}s" + } + } + |> Jason.encode!() + + assert conn.method == "PUT" + assert_content_type(conn, "application/json") + + Plug.Conn.send_resp(conn, 201, ~s({ + "name": "#{sname}", + "topic": "#{tname}", + "ackDeadlineSeconds": 10, + "messageRetentionDuration": "#{message_retention}s", + "expirationPolicy": {"ttl": "#{expires_in}s"} + })) + end) + + assert {:ok, + %Subscription{ + topic: %Topic{name: ^topic}, + name: ^name, + ack_deadline: 10, + expires_in: ^expires_in, + message_retention_duration: ^message_retention + }} = Subscription.create(sub) + end + test "deleting a subscription", %{bypass: bypass, project: project} do name = "delete-me" @@ -313,4 +367,6 @@ defmodule Kane.SubscriptionTest do assert String.contains?(content_type, type) end + + defp to_seconds(milliseconds), do: div(milliseconds, 1000) end