125 changes: 106 additions & 19 deletions lib/data_layer.ex
@@ -662,6 +662,7 @@ defmodule AshPostgres.DataLayer do
def can?(_, :combine), do: true
def can?(_, {:combine, _}), do: true
def can?(_, :bulk_create), do: true
def can?(_, :bulk_upsert_return_skipped), do: true
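# Illustrative note, not part of the diff: this capability signals that the data
# layer supports `return_skipped_upsert?: true` on bulk actions; rows skipped by
# an upsert condition then come back tagged with `:upsert_skipped` metadata
# (see the data layer changes and the new test below).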

def can?(_, :action_select), do: true

@@ -2035,6 +2036,74 @@ defmodule AshPostgres.DataLayer do
repo.insert_all(source, ecto_changesets, opts)
end)

identity = options[:identity]
keys = Map.get(identity || %{}, :keys) || Ash.Resource.Info.primary_key(resource)

# for a single changeset, return_skipped_upsert? is handled at the
# call site: https://github.com/ash-project/ash_postgres/blob/0b21d4a99cc3f6d8676947e291ac9b9d57ad6e2e/lib/data_layer.ex#L3046-L3046
result =
if options[:return_skipped_upsert?] && !opts[:single?] do
[changeset | _] = changesets

results_by_identity =
result
|> elem(1)
|> List.wrap()
|> Enum.into(%{}, fn r ->
{Map.take(r, keys), r}
end)

ash_query =
resource
|> Ash.Query.do_filter(
or:
changesets
|> Enum.filter(fn changeset ->
not Map.has_key?(
results_by_identity,
Map.take(changeset.attributes, keys)
)
end)
|> Enum.map(fn changeset ->
changeset.attributes
|> Map.take(keys)
|> Keyword.new()
end)
)
|> then(fn
query when is_nil(identity) or is_nil(identity.where) -> query
query -> Ash.Query.do_filter(query, identity.where)
end)
|> Ash.Query.set_tenant(changeset.tenant)

skipped_upserts =
with {:ok, ecto_query} <- Ash.Query.data_layer_query(ash_query),
{:ok, results} <- run_query(ecto_query, resource) do
results
|> Enum.map(fn result ->
Ash.Resource.put_metadata(result, :upsert_skipped, true)
end)
|> Enum.reduce(%{}, fn r, acc ->
Map.put(acc, Map.take(r, keys), r)
end)
end

results =
changesets
|> Enum.map(fn changeset ->
identity =
changeset.attributes
|> Map.take(keys)

Map.get(results_by_identity, identity, Map.get(skipped_upserts, identity))
end)
|> Enum.filter(& &1)

{length(results), results}
else
result
end

case result do
{_, nil} ->
:ok
@@ -2045,25 +2114,43 @@

{:ok, results}
else
{:ok,
Stream.zip_with(results, changesets, fn result, changeset ->
if !opts[:upsert?] do
maybe_create_tenant!(resource, result)
end

case get_bulk_operation_metadata(changeset) do
{index, metadata_key} ->
Ash.Resource.put_metadata(result, metadata_key, index)

nil ->
# Compatibility fallback
Ash.Resource.put_metadata(
result,
:bulk_create_index,
changeset.context[:bulk_create][:index]
)
end
end)}
results_by_identity =
results
|> Enum.into(%{}, fn r ->
{Map.take(r, keys), r}
end)

results =
changesets
|> Enum.map(fn changeset ->
identity =
changeset.attributes
|> Map.take(keys)

result_for_changeset = Map.get(results_by_identity, identity)

if result_for_changeset do
if !opts[:upsert?] do
maybe_create_tenant!(resource, result_for_changeset)
end

case get_bulk_operation_metadata(changeset) do
{index, metadata_key} ->
Ash.Resource.put_metadata(result_for_changeset, metadata_key, index)

nil ->
# Compatibility fallback
Ash.Resource.put_metadata(
result_for_changeset,
:bulk_create_index,
changeset.context[:bulk_create][:index]
)
end
end
end)
|> Enum.filter(& &1)

{:ok, results}
end
end
rescue
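The merging logic added above keys everything on the resource's identity fields: rows returned by insert_all are indexed by identity, rows the upsert condition skipped are re-queried and tagged with :upsert_skipped metadata, and the final list is rebuilt in the original changeset order. Below is a minimal, self-contained sketch of that keying idea; the module, function, and field names are invented for illustration and are not part of AshPostgres.

defmodule UpsertMergeSketch do
  @moduledoc false

  # keys:     the identity key fields, e.g. [:uniq_if_contains_foo]
  # inputs:   the attribute maps the caller tried to insert, in order
  # returned: rows the database actually inserted or updated
  # skipped:  rows re-fetched because the upsert condition skipped them
  def merge(keys, inputs, returned, skipped) do
    returned_by_identity = Map.new(returned, &{Map.take(&1, keys), &1})

    skipped_by_identity =
      Map.new(skipped, fn row ->
        {Map.take(row, keys), Map.put(row, :upsert_skipped, true)}
      end)

    inputs
    |> Enum.map(fn input ->
      identity = Map.take(input, keys)

      Map.get(returned_by_identity, identity) ||
        Map.get(skipped_by_identity, identity)
    end)
    |> Enum.reject(&is_nil/1)
  end
end

# UpsertMergeSketch.merge(
#   [:key],
#   [%{key: 1, price: 10}, %{key: 2, price: 20}],
#   [%{key: 2, price: 20}],
#   [%{key: 1, price: 10}]
# )
# #=> [%{key: 1, price: 10, upsert_skipped: true}, %{key: 2, price: 20}]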
4 changes: 2 additions & 2 deletions mix.exs
@@ -177,10 +177,10 @@ defmodule AshPostgres.MixProject do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:ash, ash_version("~> 3.5 and >= 3.5.35")},
{:ash, ash_version("~> 3.5 and >= 3.6.2")},
{:spark, "~> 2.3 and >= 2.3.4"},
{:ash_sql, ash_sql_version("~> 0.3 and >= 0.3.2")},
{:igniter, "~> 0.6 and >= 0.6.14", optional: true},
{:igniter, "~> 0.6 and >= 0.6.29", optional: true},
{:ecto_sql, "~> 3.13"},
{:ecto, "~> 3.13"},
{:jason, "~> 1.0"},
2 changes: 1 addition & 1 deletion mix.lock
@@ -1,5 +1,5 @@
%{
"ash": {:hex, :ash, "3.5.43", "222f9a8ac26ad3b029f8e69306cc83091c992d858b4538af12e33a148f301cab", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8", [hex: :ets, repo: "hexpm", optional: false]}, {:igniter, ">= 0.6.29 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:reactor, "~> 0.11", [hex: :reactor, repo: "hexpm", optional: false]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:spark, ">= 2.3.3 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, ">= 0.2.6 and < 1.0.0-0", [hex: :splode, repo: "hexpm", optional: false]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "48b2aa274c524f5b968c563dd56aec8f9b278c529c8aa46e6fe0ca564c26cc1c"},
"ash": {:hex, :ash, "3.6.2", "90d1c8296be777b90caabf51b99323d6618a0b92594dfab92b02bdf848ac38bf", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8", [hex: :ets, repo: "hexpm", optional: false]}, {:igniter, ">= 0.6.29 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:reactor, "~> 0.11", [hex: :reactor, repo: "hexpm", optional: false]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:spark, ">= 2.3.3 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, ">= 0.2.6 and < 1.0.0-0", [hex: :splode, repo: "hexpm", optional: false]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3546b5798dd24576cc451f6e03f3d6e3bb62666c0921bfe8aae700c599d9c38d"},
"ash_sql": {:hex, :ash_sql, "0.3.2", "e2d65dac1c813cbd2569a750bf1c063109778e840052e44535ced294d7638a19", [:mix], [{:ash, ">= 3.5.43 and < 4.0.0-0", [hex: :ash, repo: "hexpm", optional: false]}, {:ecto, "~> 3.9", [hex: :ecto, repo: "hexpm", optional: false]}, {:ecto_sql, "~> 3.9", [hex: :ecto_sql, repo: "hexpm", optional: false]}], "hexpm", "1f6e5d827c0eb55fc5a07f58eb97f9bb3e6b290d83df75883f422537b98c9c68"},
"benchee": {:hex, :benchee, "1.4.0", "9f1f96a30ac80bab94faad644b39a9031d5632e517416a8ab0a6b0ac4df124ce", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.0", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "299cd10dd8ce51c9ea3ddb74bb150f93d25e968f93e4c1fa31698a8e4fa5d715"},
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
61 changes: 61 additions & 0 deletions test/bulk_create_test.exs
@@ -176,6 +176,67 @@ defmodule AshPostgres.BulkCreateTest do
end)
end

test "bulk upsert returns skipped records with return_skipped_upsert?" do
assert [
{:ok, %{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10}},
{:ok, %{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20}},
{:ok, %{title: "herbert", uniq_if_contains_foo: "3", price: 30}}
] =
Ash.bulk_create!(
[
%{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10},
%{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20},
%{title: "herbert", uniq_if_contains_foo: "3", price: 30}
],
Post,
:create,
return_stream?: true,
return_records?: true
)
|> Enum.sort_by(fn {:ok, result} -> result.title end)

results =
Ash.bulk_create!(
[
%{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10},
%{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20_000},
%{title: "herbert", uniq_if_contains_foo: "3", price: 30}
],
Post,
:upsert_with_no_filter,
return_stream?: true,
upsert_condition: expr(price != upsert_conflict(:price)),
return_errors?: true,
return_records?: true,
return_skipped_upsert?: true
)
|> Enum.sort_by(fn
{:ok, result} ->
result.title

_ ->
nil
end)

assert [
{:ok, skipped},
{:ok, updated},
{:ok, no_conflict}
] = results

assert skipped.title == "fredfoo"
assert skipped.price == 10
assert Ash.Resource.get_metadata(skipped, :upsert_skipped) == true

assert updated.title == "georgefoo"
assert updated.price == 20_000
refute Ash.Resource.get_metadata(updated, :upsert_skipped)

assert no_conflict.title == "herbert"
assert no_conflict.price == 30
refute Ash.Resource.get_metadata(no_conflict, :upsert_skipped)
end
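
As a usage note, not part of the diff: a caller consuming the returned stream could separate skipped rows from applied ones via that metadata. The pipeline below is a sketch, assuming the same Post resource, action, and imports as the test above, and assuming every element is an {:ok, record} tuple.

{skipped, applied} =
  [
    %{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10},
    %{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20_000}
  ]
  |> Ash.bulk_create!(
    Post,
    :upsert_with_no_filter,
    return_stream?: true,
    return_records?: true,
    return_errors?: true,
    upsert_condition: expr(price != upsert_conflict(:price)),
    return_skipped_upsert?: true
  )
  |> Enum.map(fn {:ok, record} -> record end)
  |> Enum.split_with(&Ash.Resource.get_metadata(&1, :upsert_skipped))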

# confirmed that this doesn't work because it can't. An upsert must map to a potentially successful insert.
# leaving this test here for posterity
# test "bulk creates can upsert with id" do