diff --git a/lib/data_layer.ex b/lib/data_layer.ex index 07812631..9755eedf 100644 --- a/lib/data_layer.ex +++ b/lib/data_layer.ex @@ -662,6 +662,7 @@ defmodule AshPostgres.DataLayer do def can?(_, :combine), do: true def can?(_, {:combine, _}), do: true def can?(_, :bulk_create), do: true + def can?(_, :bulk_upsert_return_skipped), do: true def can?(_, :action_select), do: true @@ -2035,6 +2036,74 @@ defmodule AshPostgres.DataLayer do repo.insert_all(source, ecto_changesets, opts) end) + identity = options[:identity] + keys = Map.get(identity || %{}, :keys) || Ash.Resource.Info.primary_key(resource) + + # if it's single the return_skipped_upsert? is handled at the + # call site https://github.com/ash-project/ash_postgres/blob/0b21d4a99cc3f6d8676947e291ac9b9d57ad6e2e/lib/data_layer.ex#L3046-L3046 + result = + if options[:return_skipped_upsert?] && !opts[:single?] do + [changeset | _] = changesets + + results_by_identity = + result + |> elem(1) + |> List.wrap() + |> Enum.into(%{}, fn r -> + {Map.take(r, keys), r} + end) + + ash_query = + resource + |> Ash.Query.do_filter( + or: + changesets + |> Enum.filter(fn changeset -> + not Map.has_key?( + results_by_identity, + Map.take(changeset.attributes, keys) + ) + end) + |> Enum.map(fn changeset -> + changeset.attributes + |> Map.take(keys) + |> Keyword.new() + end) + ) + |> then(fn + query when is_nil(identity) or is_nil(identity.where) -> query + query -> Ash.Query.do_filter(query, identity.where) + end) + |> Ash.Query.set_tenant(changeset.tenant) + + skipped_upserts = + with {:ok, ecto_query} <- Ash.Query.data_layer_query(ash_query), + {:ok, results} <- run_query(ecto_query, resource) do + results + |> Enum.map(fn result -> + Ash.Resource.put_metadata(result, :upsert_skipped, true) + end) + |> Enum.reduce(%{}, fn r, acc -> + Map.put(acc, Map.take(r, keys), r) + end) + end + + results = + changesets + |> Enum.map(fn changeset -> + identity = + changeset.attributes + |> Map.take(keys) + + Map.get(results_by_identity, identity, Map.get(skipped_upserts, identity)) + end) + |> Enum.filter(& &1) + + {length(results), results} + else + result + end + case result do {_, nil} -> :ok @@ -2045,25 +2114,43 @@ defmodule AshPostgres.DataLayer do {:ok, results} else - {:ok, - Stream.zip_with(results, changesets, fn result, changeset -> - if !opts[:upsert?] do - maybe_create_tenant!(resource, result) - end - - case get_bulk_operation_metadata(changeset) do - {index, metadata_key} -> - Ash.Resource.put_metadata(result, metadata_key, index) - - nil -> - # Compatibility fallback - Ash.Resource.put_metadata( - result, - :bulk_create_index, - changeset.context[:bulk_create][:index] - ) - end - end)} + results_by_identity = + results + |> Enum.into(%{}, fn r -> + {Map.take(r, keys), r} + end) + + results = + changesets + |> Enum.map(fn changeset -> + identity = + changeset.attributes + |> Map.take(keys) + + result_for_changeset = Map.get(results_by_identity, identity) + + if result_for_changeset do + if !opts[:upsert?] do + maybe_create_tenant!(resource, result_for_changeset) + end + + case get_bulk_operation_metadata(changeset) do + {index, metadata_key} -> + Ash.Resource.put_metadata(result_for_changeset, metadata_key, index) + + nil -> + # Compatibility fallback + Ash.Resource.put_metadata( + result_for_changeset, + :bulk_create_index, + changeset.context[:bulk_create][:index] + ) + end + end + end) + |> Enum.filter(& &1) + + {:ok, results} end end rescue diff --git a/mix.exs b/mix.exs index cf59bf71..af4055bf 100644 --- a/mix.exs +++ b/mix.exs @@ -177,10 +177,10 @@ defmodule AshPostgres.MixProject do # Run "mix help deps" to learn about dependencies. defp deps do [ - {:ash, ash_version("~> 3.5 and >= 3.5.35")}, + {:ash, ash_version("~> 3.5 and >= 3.6.2")}, {:spark, "~> 2.3 and >= 2.3.4"}, {:ash_sql, ash_sql_version("~> 0.3 and >= 0.3.2")}, - {:igniter, "~> 0.6 and >= 0.6.14", optional: true}, + {:igniter, "~> 0.6 and >= 0.6.29", optional: true}, {:ecto_sql, "~> 3.13"}, {:ecto, "~> 3.13"}, {:jason, "~> 1.0"}, diff --git a/mix.lock b/mix.lock index 1827dcb4..05bd993c 100644 --- a/mix.lock +++ b/mix.lock @@ -1,5 +1,5 @@ %{ - "ash": {:hex, :ash, "3.5.43", "222f9a8ac26ad3b029f8e69306cc83091c992d858b4538af12e33a148f301cab", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8", [hex: :ets, repo: "hexpm", optional: false]}, {:igniter, ">= 0.6.29 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:reactor, "~> 0.11", [hex: :reactor, repo: "hexpm", optional: false]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:spark, ">= 2.3.3 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, ">= 0.2.6 and < 1.0.0-0", [hex: :splode, repo: "hexpm", optional: false]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "48b2aa274c524f5b968c563dd56aec8f9b278c529c8aa46e6fe0ca564c26cc1c"}, + "ash": {:hex, :ash, "3.6.2", "90d1c8296be777b90caabf51b99323d6618a0b92594dfab92b02bdf848ac38bf", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8", [hex: :ets, repo: "hexpm", optional: false]}, {:igniter, ">= 0.6.29 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:reactor, "~> 0.11", [hex: :reactor, repo: "hexpm", optional: false]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:spark, ">= 2.3.3 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, ">= 0.2.6 and < 1.0.0-0", [hex: :splode, repo: "hexpm", optional: false]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3546b5798dd24576cc451f6e03f3d6e3bb62666c0921bfe8aae700c599d9c38d"}, "ash_sql": {:hex, :ash_sql, "0.3.2", "e2d65dac1c813cbd2569a750bf1c063109778e840052e44535ced294d7638a19", [:mix], [{:ash, ">= 3.5.43 and < 4.0.0-0", [hex: :ash, repo: "hexpm", optional: false]}, {:ecto, "~> 3.9", [hex: :ecto, repo: "hexpm", optional: false]}, {:ecto_sql, "~> 3.9", [hex: :ecto_sql, repo: "hexpm", optional: false]}], "hexpm", "1f6e5d827c0eb55fc5a07f58eb97f9bb3e6b290d83df75883f422537b98c9c68"}, "benchee": {:hex, :benchee, "1.4.0", "9f1f96a30ac80bab94faad644b39a9031d5632e517416a8ab0a6b0ac4df124ce", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.0", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "299cd10dd8ce51c9ea3ddb74bb150f93d25e968f93e4c1fa31698a8e4fa5d715"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, diff --git a/test/bulk_create_test.exs b/test/bulk_create_test.exs index e48d0238..420ff219 100644 --- a/test/bulk_create_test.exs +++ b/test/bulk_create_test.exs @@ -176,6 +176,67 @@ defmodule AshPostgres.BulkCreateTest do end) end + test "bulk upsert returns skipped records with return_skipped_upsert?" do + assert [ + {:ok, %{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10}}, + {:ok, %{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20}}, + {:ok, %{title: "herbert", uniq_if_contains_foo: "3", price: 30}} + ] = + Ash.bulk_create!( + [ + %{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10}, + %{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20}, + %{title: "herbert", uniq_if_contains_foo: "3", price: 30} + ], + Post, + :create, + return_stream?: true, + return_records?: true + ) + |> Enum.sort_by(fn {:ok, result} -> result.title end) + + results = + Ash.bulk_create!( + [ + %{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10}, + %{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20_000}, + %{title: "herbert", uniq_if_contains_foo: "3", price: 30} + ], + Post, + :upsert_with_no_filter, + return_stream?: true, + upsert_condition: expr(price != upsert_conflict(:price)), + return_errors?: true, + return_records?: true, + return_skipped_upsert?: true + ) + |> Enum.sort_by(fn + {:ok, result} -> + result.title + + _ -> + nil + end) + + assert [ + {:ok, skipped}, + {:ok, updated}, + {:ok, no_conflict} + ] = results + + assert skipped.title == "fredfoo" + assert skipped.price == 10 + assert Ash.Resource.get_metadata(skipped, :upsert_skipped) == true + + assert updated.title == "georgefoo" + assert updated.price == 20_000 + refute Ash.Resource.get_metadata(updated, :upsert_skipped) + + assert no_conflict.title == "herbert" + assert no_conflict.price == 30 + refute Ash.Resource.get_metadata(no_conflict, :upsert_skipped) + end + # confirmed that this doesn't work because it can't. An upsert must map to a potentially successful insert. # leaving this test here for posterity # test "bulk creates can upsert with id" do