Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/blunt/lib/blunt.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Blunt do
use Application

def start(_type, _args) do
[Blunt.DispatchContext.Shipper]
[Blunt.DispatchContext.Shipper, Blunt.Message.State]
|> Supervisor.start_link(strategy: :one_for_one, name: Blunt.Supervisor)
end

Expand Down
2 changes: 1 addition & 1 deletion apps/blunt/lib/blunt/command.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Blunt.Command do

defmacro __using__(opts) do
opts =
[require_all_fields?: true, keep_discarded_data: true]
[require_all_fields?: true]
|> Keyword.merge(opts)
|> Keyword.put(:dispatch?, true)
|> Keyword.put(:message_type, :command)
Expand Down
11 changes: 5 additions & 6 deletions apps/blunt/lib/blunt/dispatch_context.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Blunt.DispatchContext do
alias Blunt.Config
alias Blunt.Message.{Metadata, Options}
alias Blunt.Message.{Metadata, Options, State}

defmodule Error do
defexception [:message]
Expand Down Expand Up @@ -40,15 +40,16 @@ defmodule Blunt.DispatchContext do
def new(%{__struct__: message_module} = message, opts) do
message_type = Metadata.message_type(message_module)

{discarded_data, message} = Map.get_and_update(message, :discarded_data, fn data -> {data, %{}} end)
%{id: id, discarded_data: discarded_data, user_supplied_fields: user_supplied_fields} = State.get(message)

context = %__MODULE__{
id: UUID.uuid4(),
id: id,
opts: opts,
message: message,
message_type: message_type,
message_module: message_module,
discarded_data: discarded_data,
user_supplied_fields: user_supplied_fields,
created_at: DateTime.utc_now(),
pid: self()
}
Expand All @@ -71,7 +72,6 @@ defmodule Blunt.DispatchContext do
defp populate_from_opts(%{opts: opts} = base_context) do
{user, opts} = Keyword.pop(opts, :user)
{async, opts} = Keyword.pop(opts, :async, false)
{user_supplied_fields, opts} = Keyword.pop(opts, :user_supplied_fields, [])

return = Keyword.get(opts, :return, :response)

Expand All @@ -81,8 +81,7 @@ defmodule Blunt.DispatchContext do
opts: opts,
async: async,
return: return,
last_pipeline_step: :read_opts,
user_supplied_fields: user_supplied_fields
last_pipeline_step: :read_opts
}
end

Expand Down
1 change: 1 addition & 0 deletions apps/blunt/lib/blunt/dispatch_strategy/default.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ defmodule Blunt.DispatchStrategy.Default do
def dispatch(%{message_type: :query} = context) do
bindings = Query.bindings(context)
filter_list = Query.create_filter_list(context)

pipeline = PipelineResolver.get_pipeline!(context, QueryHandler)

context =
Expand Down
13 changes: 9 additions & 4 deletions apps/blunt/lib/blunt/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ defmodule Blunt.Message do
PrimaryKey,
Schema,
Schema.Fields,
Version
Version,
State
}

defmodule Error do
Expand Down Expand Up @@ -70,9 +71,7 @@ defmodule Blunt.Message do
@behaviour Blunt.Message
@before_compile Blunt.Message

if opts[:keep_discarded_data] do
@schema_fields {:discarded_data, :map, required: false, internal: true, virtual: true}
end
@schema_fields {:__blunt_id, :binary_id, required: false, internal: true, virtual: true}

@impl true
def handle_validate(changeset, _opts), do: changeset
Expand Down Expand Up @@ -152,4 +151,10 @@ defmodule Blunt.Message do

@doc false
defdelegate compile_start(message_module), to: Blunt.Message.Compilation

def user_supplied_fields(%{__blunt_id: id}),
do: State.get(id, :user_supplied_fields, [])

def discarded_data(%{__blunt_id: id}),
do: State.get(id, :discarded_data, [])
end
47 changes: 23 additions & 24 deletions apps/blunt/lib/blunt/message/changeset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,14 @@ defmodule Blunt.Message.Changeset do
@moduledoc false

alias Ecto.Changeset
alias Blunt.Message.{Input, Metadata}
alias Blunt.Message.{Input, Metadata, State}
alias Blunt.Message.Schema.BuiltInValidations
alias Blunt.Message.Changeset, as: MessageChangeset

def generate do
quote location: :keep, generated: true do
def changeset(values, opts \\ [])
when is_list(values) or is_map(values) or is_struct(values) do
changeset_with_discarded_data(values, opts) |> elem(0)
end

@doc false
def changeset_with_discarded_data(values, opts \\ [])
when is_list(values) or is_map(values) or is_struct(values) do
def changeset(values, opts \\ []) when is_list(values) or is_map(values) or is_struct(values) do
MessageChangeset.changeset(%__MODULE__{}, values, opts)
end
end
Expand All @@ -38,11 +32,15 @@ defmodule Blunt.Message.Changeset do

def changeset(message_module, values, opts) when is_list(values) or is_map(values) do
fields = Metadata.field_names(message_module)
virtual_fields = Metadata.field_names(message_module, :virtual)
required_fields = Metadata.field_names(message_module, :required)
static_fields = Metadata.field_names(message_module, :static) |> Enum.map(&to_string/1)

values =
blunt_id = UUID.uuid4()

final_values =
values
|> Map.put(:__blunt_id, blunt_id)
|> Input.normalize(message_module)
|> set_defaults_for_required_fields(message_module)
|> autogenerate_fields(message_module)
Expand All @@ -53,30 +51,31 @@ defmodule Blunt.Message.Changeset do
embeds = message_module.__schema__(:embeds)

discarded_data =
values
final_values
|> Map.drop(Enum.map(fields, &to_string/1))
|> Map.drop(Enum.map(embeds, &to_string/1))

user_supplied_fields =
values
|> Map.take(Enum.map(fields, &to_string/1))
|> Map.keys()
|> Enum.map(&String.to_existing_atom/1)

_ = State.put(blunt_id, %{discarded_data: discarded_data, user_supplied_fields: user_supplied_fields})

changeset =
message_module
|> struct()
|> Changeset.cast(values, fields -- embeds)
|> Changeset.cast(final_values, (fields ++ virtual_fields) -- embeds)

opts = opts |> List.wrap() |> Keyword.new()
{type, opts} = Keyword.pop(opts, :type, :schema)
embed_changeset = {__MODULE__, :changeset, [Keyword.put(opts, :type, :embed)]}
embed_changeset = {__MODULE__, :changeset, []}

changeset =
embeds
|> Enum.reduce(changeset, &Changeset.cast_embed(&2, &1, with: embed_changeset))
|> Changeset.validate_required(required_fields)
|> run_built_in_validations(message_module)
|> message_module.handle_validate(opts)

case type do
:embed -> changeset
:schema -> {changeset, discarded_data}
end
embeds
|> Enum.reduce(changeset, &Changeset.cast_embed(&2, &1, with: embed_changeset))
|> Changeset.validate_required(required_fields)
|> run_built_in_validations(message_module)
|> message_module.handle_validate(opts)
end

defp set_defaults_for_required_fields(values, message_module) do
Expand Down
27 changes: 10 additions & 17 deletions apps/blunt/lib/blunt/message/constructor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ defmodule Blunt.Message.Constructor do

alias Ecto.Changeset
alias __MODULE__, as: Constructor
alias Blunt.Message.{Documentation, Input, Options}
alias Blunt.Message.Changeset, as: MessageChangeset
alias Blunt.Message.{Documentation, Input, Metadata}

defmacro register(opts) do
quote bind_quoted: [opts: opts] do
Expand All @@ -17,7 +17,11 @@ defmodule Blunt.Message.Constructor do
constructor = Module.get_attribute(module, :constructor)
doc = Documentation.generate_constructor_doc(module)
pk_type = Module.get_attribute(module, :primary_key_type)
schema_fields = Module.get_attribute(module, :schema_fields)

schema_fields =
Module.get_attribute(module, :schema_fields)
|> Enum.reject(&match?({:__blunt_id, _type, _opts}, &1))

required_fields = Module.get_attribute(module, :required_fields)

constructor_info = %{
Expand All @@ -30,12 +34,6 @@ defmodule Blunt.Message.Constructor do
Constructor.do_generate(constructor_info)
end

# defp type_spec(schema_fields) do
# {required, optional} = Enum.split_with(schema_fields, fn {_name, _type, config} ->
# Keyword.get(config, :required) == true
# end)
# end

def do_generate(%{has_fields?: true, has_required_fields?: true, name: name, docs: docs}) do
quote do
@type input :: map() | struct() | keyword()
Expand Down Expand Up @@ -73,21 +71,16 @@ defmodule Blunt.Message.Constructor do
overrides = Input.normalize(overrides, module)
input = Map.merge(values, overrides)

with {:ok, message} <- input |> module.changeset_with_discarded_data(opts) |> handle_changeset(module) do
with {:ok, opts} <- Options.Parser.parse_message_opts(module, opts),
{:ok, message} <- input |> module.changeset(opts) |> handle_changeset() do
{:ok, module.after_validate(message)}
end
end

defp handle_changeset({%{valid?: false} = changeset, _discarded_data}, _message_module),
defp handle_changeset(%{valid?: false} = changeset),
do: {:error, MessageChangeset.format_errors(changeset)}

defp handle_changeset({changeset, discarded_data}, message_module) do
changeset =
case Metadata.has_field?(message_module, :discarded_data) do
true -> Changeset.put_change(changeset, :discarded_data, discarded_data)
false -> changeset
end

defp handle_changeset(changeset) do
{:ok, Changeset.apply_action!(changeset, :create)}
end
end
55 changes: 55 additions & 0 deletions apps/blunt/lib/blunt/message/state.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
defmodule Blunt.Message.State do
use DynamicSupervisor

def start_link(opts) do
DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)
end

@impl true
def init(_opts) do
DynamicSupervisor.init(strategy: :one_for_one)
end

def put(%{__blunt_id: id}, value) when is_map(value),
do: put(id, value)

def put(id, value) when is_map(value) do
id
|> get_or_start_server()
|> __MODULE__.Server.put(value)
end

def put(%{__blunt_id: id}, key, value) when is_atom(key),
do: put(id, key, value)

def put(id, key, value) when is_atom(key) do
id
|> get_or_start_server()
|> __MODULE__.Server.put(key, value)
end

def get(%{__blunt_id: id}),
do: get(id)

def get(id) do
id
|> get_or_start_server()
|> __MODULE__.Server.get()
end

def get(%{__blunt_id: id}, key, default \\ nil) when is_atom(key) do
id
|> get()
|> Map.get(key, default)
end

defp get_or_start_server(id) do
spec = Supervisor.child_spec({__MODULE__.Server, id}, restart: :transient)

case DynamicSupervisor.start_child(__MODULE__, spec) do
{:ok, pid} -> pid
{:ok, pid, _} -> pid
{:error, {:already_started, pid}} -> pid
end
end
end
34 changes: 34 additions & 0 deletions apps/blunt/lib/blunt/message/state/server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Blunt.Message.State.Server do
@timeout :timer.minutes(1)
use GenServer, restart: :transient

def start_link(id) do
name = {:global, inspect(__MODULE__) <> "/" <> id}
GenServer.start_link(__MODULE__, %{id: id}, name: name)
end

def get(pid), do: GenServer.call(pid, :get)

def put(pid, key, value),
do: GenServer.call(pid, {:put, key, value})

def put(pid, value) when is_map(value),
do: GenServer.call(pid, {:put, value})

@impl true
def init(state), do: {:ok, state}

@impl true
def handle_call(:get, _from, state),
do: {:reply, state, state, @timeout}

def handle_call({:put, value}, _from, state) do
new_state = Map.merge(state, value)
{:reply, new_state, new_state, @timeout}
end

def handle_call({:put, key, value}, _from, state) do
new_state = Map.put(state, key, value)
{:reply, new_state, new_state, @timeout}
end
end
2 changes: 1 addition & 1 deletion apps/blunt/lib/blunt/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ defmodule Blunt.Query do

filter_map
|> Map.from_struct()
|> Map.delete(:discarded_data)
|> Map.delete(:__blunt_id)
|> reject_nil_filters(opts)
end

Expand Down
2 changes: 0 additions & 2 deletions apps/blunt/lib/blunt/testing/factories/dispatch_strategy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ if Code.ensure_loaded?(ExMachina) and Code.ensure_loaded?(Faker) do
defexception [:message]
end

alias Blunt.Message.Metadata
alias Blunt.{DispatchContext, Message}

def handle_dispatch(message, opts),
Expand All @@ -22,7 +21,6 @@ if Code.ensure_loaded?(ExMachina) and Code.ensure_loaded?(Faker) do
dispatch_opts
|> Keyword.put(:dispatched_from, :ex_machina)
|> Keyword.update(:return, :context, &Function.identity/1)
|> Keyword.put(:user_supplied_fields, Metadata.field_names(module))

case module.dispatch({:ok, message}, dispatch_opts) do
{:error, %DispatchContext{} = context} ->
Expand Down
12 changes: 3 additions & 9 deletions apps/blunt/test/blunt/command_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Blunt.CommandTest do
use ExUnit.Case, async: true

alias Blunt.Message
alias Blunt.Message.Metadata
alias Blunt.CommandTest.Protocol
alias Blunt.{Command, DispatchContext}
Expand All @@ -25,19 +26,12 @@ defmodule Blunt.CommandTest do
describe "discarded data" do
alias Protocol.DispatchWithPipeline

test "field is defined" do
assert {:discarded_data, :map, [required: false, internal: true, virtual: true]} =
DispatchWithPipeline
|> Metadata.fields()
|> Enum.find(&match?({:discarded_data, _, _}, &1))
end

test "data is preserved" do
test "data is preserved in state" do
attrs = %{name: "chris", poop: :yum}

{:ok, command} = DispatchWithPipeline.new(attrs)

assert %{discarded_data: %{"poop" => :yum}} = command
assert %{discarded_data: %{"poop" => :yum}} = Message.State.get(command)

assert {:ok, dispatch_context} = DispatchWithPipeline.dispatch(command, reply_to: self(), return: :context)

Expand Down
Loading