Skip to content

Commit a6148c1

Browse files
committed
return skipped upserts in bulk_create
1 parent 14581be commit a6148c1

File tree

4 files changed

+129
-2
lines changed

4 files changed

+129
-2
lines changed

lib/data_layer.ex

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2029,6 +2029,66 @@ defmodule AshPostgres.DataLayer do
20292029
repo.insert_all(source, ecto_changesets, opts)
20302030
end)
20312031

2032+
result =
2033+
if options[:return_skipped_upsert?] do
2034+
identity = options[:identity]
2035+
[changeset | _] = changesets
2036+
2037+
results =
2038+
result
2039+
|> elem(1)
2040+
|> List.wrap()
2041+
|> Enum.reduce(%{}, fn r, acc ->
2042+
Map.put(acc, Map.take(r, identity.keys), r)
2043+
end)
2044+
2045+
ash_query =
2046+
resource
2047+
|> Ash.Query.do_filter(
2048+
or:
2049+
changesets
2050+
|> Enum.filter(fn changeset ->
2051+
not Map.has_key?(results, Map.take(changeset.attributes, identity.keys))
2052+
end)
2053+
|> Enum.map(fn changeset ->
2054+
changeset.attributes
2055+
|> Map.take(identity.keys)
2056+
|> Keyword.new()
2057+
end)
2058+
)
2059+
|> then(fn
2060+
query when is_nil(identity) or is_nil(identity.where) -> query
2061+
query -> Ash.Query.do_filter(query, identity.where)
2062+
end)
2063+
|> Ash.Query.set_tenant(changeset.tenant)
2064+
2065+
skipped_upserts =
2066+
with {:ok, ecto_query} <- Ash.Query.data_layer_query(ash_query),
2067+
{:ok, results} <- run_query(ecto_query, resource) do
2068+
results
2069+
|> Enum.map(fn result ->
2070+
Ash.Resource.put_metadata(result, :upsert_skipped, true)
2071+
end)
2072+
|> Enum.reduce(%{}, fn r, acc ->
2073+
Map.put(acc, Map.take(r, identity.keys), r)
2074+
end)
2075+
end
2076+
2077+
results =
2078+
changesets
2079+
|> Enum.map(fn changeset ->
2080+
identity =
2081+
changeset.attributes
2082+
|> Map.take(identity.keys)
2083+
2084+
Map.get(results, identity, Map.get(skipped_upserts, identity))
2085+
end)
2086+
2087+
{length(results), results}
2088+
else
2089+
result
2090+
end
2091+
20322092
case result do
20332093
{_, nil} ->
20342094
:ok
@@ -2039,6 +2099,7 @@ defmodule AshPostgres.DataLayer do
20392099

20402100
{:ok, results}
20412101
else
2102+
# TODO: what if there are less results than changesets because of upsert conditions?
20422103
{:ok,
20432104
Stream.zip_with(results, changesets, fn result, changeset ->
20442105
if !opts[:upsert?] do

mix.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ defmodule AshPostgres.MixProject do
169169
{:ash, ash_version("~> 3.5 and >= 3.5.35")},
170170
{:spark, "~> 2.3 and >= 2.3.4"},
171171
{:ash_sql, ash_sql_version("~> 0.2 and >= 0.2.90")},
172-
{:igniter, "~> 0.6 and >= 0.6.14", optional: true},
172+
{:igniter, "~> 0.6 and >= 0.6.29", optional: true},
173173
{:ecto_sql, "~> 3.13"},
174174
{:ecto, "~> 3.13"},
175175
{:jason, "~> 1.0"},
@@ -186,7 +186,7 @@ defmodule AshPostgres.MixProject do
186186
{:credo, ">= 0.0.0", only: [:dev, :test], runtime: false},
187187
{:mix_audit, ">= 0.0.0", only: [:dev, :test], runtime: false},
188188
{:dialyxir, ">= 0.0.0", only: [:dev, :test], runtime: false},
189-
{:sobelow, ">= 0.0.0", only: [:dev, :test], runtime: false}
189+
{:mix_test_watch, "~> 1.0", only: [:dev, :test]}
190190
]
191191
end
192192

mix.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
"mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"},
3434
"mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"},
3535
"mix_audit": {:hex, :mix_audit, "2.1.5", "c0f77cee6b4ef9d97e37772359a187a166c7a1e0e08b50edf5bf6959dfe5a016", [:make, :mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:yaml_elixir, "~> 2.11", [hex: :yaml_elixir, repo: "hexpm", optional: false]}], "hexpm", "87f9298e21da32f697af535475860dc1d3617a010e0b418d2ec6142bc8b42d69"},
36+
"mix_test_watch": {:hex, :mix_test_watch, "1.3.0", "2ffc9f72b0d1f4ecf0ce97b044e0e3c607c3b4dc21d6228365e8bc7c2856dc77", [:mix], [{:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "f9e5edca976857ffac78632e635750d158df14ee2d6185a15013844af7570ffe"},
3637
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
3738
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
3839
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},

test/bulk_create_test.exs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,71 @@ defmodule AshPostgres.BulkCreateTest do
171171
end)
172172
end
173173

174+
@tag :focus
175+
test "bulk upsert returns skipped records with return_skipped_upsert?" do
176+
assert [
177+
{:ok, %{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10}},
178+
{:ok, %{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20}},
179+
{:ok, %{title: "herbert", uniq_if_contains_foo: "3", price: 30}}
180+
] =
181+
Ash.bulk_create!(
182+
[
183+
%{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10},
184+
%{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20},
185+
%{title: "herbert", uniq_if_contains_foo: "3", price: 30}
186+
],
187+
Post,
188+
:create,
189+
return_stream?: true,
190+
return_records?: true
191+
)
192+
|> Enum.sort_by(fn {:ok, result} -> result.title end)
193+
194+
results =
195+
Ash.bulk_create!(
196+
[
197+
%{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10},
198+
%{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20_000},
199+
%{title: "herbert", uniq_if_contains_foo: "3", price: 30}
200+
],
201+
Post,
202+
:upsert_with_no_filter,
203+
return_stream?: true,
204+
upsert_condition: expr(price != upsert_conflict(:price)),
205+
return_errors?: true,
206+
return_records?: true,
207+
return_skipped_upsert?: true
208+
)
209+
|> Enum.sort_by(fn
210+
{:ok, result} ->
211+
result.title
212+
213+
_ ->
214+
nil
215+
end)
216+
217+
assert [
218+
{:ok, skipped},
219+
{:ok, updated},
220+
{:ok, no_conflict}
221+
] = results
222+
223+
# "fredfoo" was skipped because price matches (10 == 10)
224+
assert skipped.title == "fredfoo"
225+
assert skipped.price == 10
226+
assert Ash.Resource.get_metadata(skipped, :upsert_skipped) == true
227+
228+
# "georgefoo" was updated because price differs (20 -> 20_000)
229+
assert updated.title == "georgefoo"
230+
assert updated.price == 20_000
231+
refute Ash.Resource.get_metadata(updated, :upsert_skipped)
232+
233+
# "herbert" had no conflict (doesn't match identity)
234+
assert no_conflict.title == "herbert"
235+
assert no_conflict.price == 30
236+
refute Ash.Resource.get_metadata(no_conflict, :upsert_skipped)
237+
end
238+
174239
# confirmed that this doesn't work because it can't. An upsert must map to a potentially successful insert.
175240
# leaving this test here for posterity
176241
# test "bulk creates can upsert with id" do

0 commit comments

Comments
 (0)