Skip to content

Commit af0903f

Browse files
authored
fix: return skipped upserts in bulk_create (#626)
1 parent efc4e38 commit af0903f

File tree

3 files changed

+169
-21
lines changed

3 files changed

+169
-21
lines changed

lib/data_layer.ex

Lines changed: 106 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,7 @@ defmodule AshPostgres.DataLayer do
662662
def can?(_, :combine), do: true
663663
def can?(_, {:combine, _}), do: true
664664
def can?(_, :bulk_create), do: true
665+
def can?(_, :bulk_upsert_return_skipped), do: true
665666

666667
def can?(_, :action_select), do: true
667668

@@ -2035,6 +2036,74 @@ defmodule AshPostgres.DataLayer do
20352036
repo.insert_all(source, ecto_changesets, opts)
20362037
end)
20372038

2039+
identity = options[:identity]
2040+
keys = Map.get(identity || %{}, :keys) || Ash.Resource.Info.primary_key(resource)
2041+
2042+
# if it's single the return_skipped_upsert? is handled at the
2043+
# call site https://github.com/ash-project/ash_postgres/blob/0b21d4a99cc3f6d8676947e291ac9b9d57ad6e2e/lib/data_layer.ex#L3046-L3046
2044+
result =
2045+
if options[:return_skipped_upsert?] && !opts[:single?] do
2046+
[changeset | _] = changesets
2047+
2048+
results_by_identity =
2049+
result
2050+
|> elem(1)
2051+
|> List.wrap()
2052+
|> Enum.into(%{}, fn r ->
2053+
{Map.take(r, keys), r}
2054+
end)
2055+
2056+
ash_query =
2057+
resource
2058+
|> Ash.Query.do_filter(
2059+
or:
2060+
changesets
2061+
|> Enum.filter(fn changeset ->
2062+
not Map.has_key?(
2063+
results_by_identity,
2064+
Map.take(changeset.attributes, keys)
2065+
)
2066+
end)
2067+
|> Enum.map(fn changeset ->
2068+
changeset.attributes
2069+
|> Map.take(keys)
2070+
|> Keyword.new()
2071+
end)
2072+
)
2073+
|> then(fn
2074+
query when is_nil(identity) or is_nil(identity.where) -> query
2075+
query -> Ash.Query.do_filter(query, identity.where)
2076+
end)
2077+
|> Ash.Query.set_tenant(changeset.tenant)
2078+
2079+
skipped_upserts =
2080+
with {:ok, ecto_query} <- Ash.Query.data_layer_query(ash_query),
2081+
{:ok, results} <- run_query(ecto_query, resource) do
2082+
results
2083+
|> Enum.map(fn result ->
2084+
Ash.Resource.put_metadata(result, :upsert_skipped, true)
2085+
end)
2086+
|> Enum.reduce(%{}, fn r, acc ->
2087+
Map.put(acc, Map.take(r, keys), r)
2088+
end)
2089+
end
2090+
2091+
results =
2092+
changesets
2093+
|> Enum.map(fn changeset ->
2094+
identity =
2095+
changeset.attributes
2096+
|> Map.take(keys)
2097+
2098+
Map.get(results_by_identity, identity, Map.get(skipped_upserts, identity))
2099+
end)
2100+
|> Enum.filter(& &1)
2101+
2102+
{length(results), results}
2103+
else
2104+
result
2105+
end
2106+
20382107
case result do
20392108
{_, nil} ->
20402109
:ok
@@ -2045,25 +2114,43 @@ defmodule AshPostgres.DataLayer do
20452114

20462115
{:ok, results}
20472116
else
2048-
{:ok,
2049-
Stream.zip_with(results, changesets, fn result, changeset ->
2050-
if !opts[:upsert?] do
2051-
maybe_create_tenant!(resource, result)
2052-
end
2053-
2054-
case get_bulk_operation_metadata(changeset) do
2055-
{index, metadata_key} ->
2056-
Ash.Resource.put_metadata(result, metadata_key, index)
2057-
2058-
nil ->
2059-
# Compatibility fallback
2060-
Ash.Resource.put_metadata(
2061-
result,
2062-
:bulk_create_index,
2063-
changeset.context[:bulk_create][:index]
2064-
)
2065-
end
2066-
end)}
2117+
results_by_identity =
2118+
results
2119+
|> Enum.into(%{}, fn r ->
2120+
{Map.take(r, keys), r}
2121+
end)
2122+
2123+
results =
2124+
changesets
2125+
|> Enum.map(fn changeset ->
2126+
identity =
2127+
changeset.attributes
2128+
|> Map.take(keys)
2129+
2130+
result_for_changeset = Map.get(results_by_identity, identity)
2131+
2132+
if result_for_changeset do
2133+
if !opts[:upsert?] do
2134+
maybe_create_tenant!(resource, result_for_changeset)
2135+
end
2136+
2137+
case get_bulk_operation_metadata(changeset) do
2138+
{index, metadata_key} ->
2139+
Ash.Resource.put_metadata(result_for_changeset, metadata_key, index)
2140+
2141+
nil ->
2142+
# Compatibility fallback
2143+
Ash.Resource.put_metadata(
2144+
result_for_changeset,
2145+
:bulk_create_index,
2146+
changeset.context[:bulk_create][:index]
2147+
)
2148+
end
2149+
end
2150+
end)
2151+
|> Enum.filter(& &1)
2152+
2153+
{:ok, results}
20672154
end
20682155
end
20692156
rescue

mix.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,10 @@ defmodule AshPostgres.MixProject do
177177
# Run "mix help deps" to learn about dependencies.
178178
defp deps do
179179
[
180-
{:ash, ash_version("~> 3.5 and >= 3.5.35")},
180+
{:ash, ash_version("~> 3.5 and >= 3.6.2")},
181181
{:spark, "~> 2.3 and >= 2.3.4"},
182182
{:ash_sql, ash_sql_version("~> 0.3 and >= 0.3.2")},
183-
{:igniter, "~> 0.6 and >= 0.6.14", optional: true},
183+
{:igniter, "~> 0.6 and >= 0.6.29", optional: true},
184184
{:ecto_sql, "~> 3.13"},
185185
{:ecto, "~> 3.13"},
186186
{:jason, "~> 1.0"},

test/bulk_create_test.exs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,67 @@ defmodule AshPostgres.BulkCreateTest do
176176
end)
177177
end
178178

179+
test "bulk upsert returns skipped records with return_skipped_upsert?" do
180+
assert [
181+
{:ok, %{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10}},
182+
{:ok, %{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20}},
183+
{:ok, %{title: "herbert", uniq_if_contains_foo: "3", price: 30}}
184+
] =
185+
Ash.bulk_create!(
186+
[
187+
%{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10},
188+
%{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20},
189+
%{title: "herbert", uniq_if_contains_foo: "3", price: 30}
190+
],
191+
Post,
192+
:create,
193+
return_stream?: true,
194+
return_records?: true
195+
)
196+
|> Enum.sort_by(fn {:ok, result} -> result.title end)
197+
198+
results =
199+
Ash.bulk_create!(
200+
[
201+
%{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10},
202+
%{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20_000},
203+
%{title: "herbert", uniq_if_contains_foo: "3", price: 30}
204+
],
205+
Post,
206+
:upsert_with_no_filter,
207+
return_stream?: true,
208+
upsert_condition: expr(price != upsert_conflict(:price)),
209+
return_errors?: true,
210+
return_records?: true,
211+
return_skipped_upsert?: true
212+
)
213+
|> Enum.sort_by(fn
214+
{:ok, result} ->
215+
result.title
216+
217+
_ ->
218+
nil
219+
end)
220+
221+
assert [
222+
{:ok, skipped},
223+
{:ok, updated},
224+
{:ok, no_conflict}
225+
] = results
226+
227+
assert skipped.title == "fredfoo"
228+
assert skipped.price == 10
229+
assert Ash.Resource.get_metadata(skipped, :upsert_skipped) == true
230+
231+
assert updated.title == "georgefoo"
232+
assert updated.price == 20_000
233+
refute Ash.Resource.get_metadata(updated, :upsert_skipped)
234+
235+
assert no_conflict.title == "herbert"
236+
assert no_conflict.price == 30
237+
refute Ash.Resource.get_metadata(no_conflict, :upsert_skipped)
238+
end
239+
179240
# confirmed that this doesn't work because it can't. An upsert must map to a potentially successful insert.
180241
# leaving this test here for posterity
181242
# test "bulk creates can upsert with id" do

0 commit comments

Comments
 (0)