Skip to content

Commit 56df49d

Browse files
committed
Update DBConnection/Postgrex and introduce start_consumer
1 parent 2879f85 commit 56df49d

File tree

8 files changed

+153
-4
lines changed

8 files changed

+153
-4
lines changed

integration_test/sql/stage.exs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,20 @@ defmodule Ecto.Integration.StageTest do
5757
assert [] = p3.comments
5858
end
5959

60+
test "insert entries" do
61+
{:ok, stage} = TestRepo.start_consumer(Post)
62+
63+
mon = Process.monitor(stage)
64+
65+
[[title: "hello"], [title: "world"]]
66+
|> Flow.from_enumerable()
67+
|> Flow.into_stages([{stage, [cancel: :transient, max_demand: 1]}])
68+
69+
assert_receive {:DOWN, ^mon, _, _, :normal}
70+
71+
assert [%Post{title: "hello"}, %Post{title: "world"}] = TestRepo.all(Post)
72+
end
73+
6074
defp to_list(stage) do
6175
stage
6276
|> Flow.from_stage()

lib/ecto/adapter/stage.ex

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
defmodule Ecto.Adapter.Stage do
2+
@moduledoc """
3+
Specifies the adapter `GenStage` API.
4+
"""
5+
6+
@type flat_map :: ({integer, list | nil} -> list)
7+
@type insert_all :: (list -> integer)
8+
@type options :: Keyword.t
9+
10+
@doc """
11+
Starts and links to a `GenStage` producer.
12+
13+
The producer executes a query and returns the result to consumer(s).
14+
15+
See `Ecto.Repo.start_producer/2`.
16+
"""
17+
@callback start_producer(repo :: Ecto.Repo.t, Ecto.Adapter.query_meta, query, params :: list(), Ecto.Adapter.process | nil, flat_map, options) ::
18+
GenServer.on_start when
19+
query: {:nocache, Ecto.Adapter.prepared} |
20+
{:cached, (Ecto.Adapter.prepared -> :ok), Ecto.Adapter.cached} |
21+
{:cache, (Ecto.Adapter.cached -> :ok), Ecto.Adapter.prepared}
22+
23+
@doc """
24+
Starts and links to a `GenStage` consumer.
25+
26+
The consumers inserts the entries it receives for the schema.
27+
28+
See `Ecto.Repo.start_consumer/2`
29+
"""
30+
@callback start_consumer(repo :: Ecto.Repo.t, insert_all, options) ::
31+
GenServer.on_start
32+
end

lib/ecto/adapters/postgres/connection.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ if Code.ensure_loaded?(Postgrex) do
2222
Postgrex.Producer.start_link(pool, statement, params, opts)
2323
end
2424

25+
def start_consumer(pool, fun, opts) do
26+
Postgrex.Consumer.start_link(pool, fun, opts)
27+
end
28+
2529
def to_constraints(%Postgrex.Error{postgres: %{code: :unique_violation, constraint: constraint}}),
2630
do: [unique: constraint]
2731
def to_constraints(%Postgrex.Error{postgres: %{code: :foreign_key_violation, constraint: constraint}}),

lib/ecto/adapters/sql.ex

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,14 @@ defmodule Ecto.Adapters.SQL do
144144
Ecto.Adapters.SQL.start_producer(repo, meta, query, params, process, flat_map, opts)
145145
end
146146

147+
@doc false
148+
def start_consumer(repo, insert, opts) do
149+
Ecto.Adapters.SQL.start_consumer(repo, insert, opts)
150+
end
151+
147152
defoverridable [prepare: 2, execute: 6, insert: 6, update: 6, delete: 4, insert_all: 7,
148153
execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1, ensure_all_started: 2,
149-
start_producer: 7]
154+
start_producer: 7, start_consumer: 3]
150155
end
151156
end
152157

@@ -652,7 +657,7 @@ defmodule Ecto.Adapters.SQL do
652657
@doc """
653658
Start link a `GenStage` producer that streams the result of a query.
654659
"""
655-
def start_producer(repo, statement, params, opts) do
660+
def start_producer(repo, statement, params, opts \\ []) do
656661
start_producer(repo, statement, params, fn x -> x end, nil, opts)
657662
end
658663

@@ -681,6 +686,25 @@ defmodule Ecto.Adapters.SQL do
681686
end
682687
end
683688

689+
@doc """
690+
Start link a `GenStage` consumers that runs a `transaction` for every batch of
691+
events and maintains a state.
692+
"""
693+
def start_consumer(repo, fun, opts \\ []) when is_function(fun, 1) do
694+
{repo_mod, pool, default_opts} = lookup_pool(repo)
695+
696+
transaction =
697+
fn(conn, rows) ->
698+
put_conn(pool, conn)
699+
fun.(rows)
700+
end
701+
702+
stage_opts = Keyword.delete(default_opts, :name)
703+
opts = with_log(repo, [], opts ++ stage_opts)
704+
opts = Keyword.put_new(opts, :caller, self())
705+
repo_mod.__sql__.start_consumer(pool, transaction, opts)
706+
end
707+
684708
## Log
685709

686710
defp with_log(repo, params, opts) do

lib/ecto/adapters/sql/connection.ex

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ defmodule Ecto.Adapters.SQL.Connection do
99
@typedoc "The cache query which is a DBConnection Query"
1010
@type cached :: map
1111

12+
@typedoc "The DBConection transaction fun"
13+
@type transaction :: (DBConnection.t, list -> term)
14+
1215
@doc """
1316
Receives options and returns `DBConnection` supervisor child
1417
specification.
@@ -102,4 +105,10 @@ defmodule Ecto.Adapters.SQL.Connection do
102105
"""
103106
@callback start_producer(pool :: GenServer.server, statement :: prepared, params :: [term], options :: Keyword.t) ::
104107
GenServer.on_start
108+
109+
@doc """
110+
Start and link a `GenStage` consumer that runs a transaction for the events.
111+
"""
112+
@callback start_consumer(pool :: GenServer.server, transaction, options :: Keyword.t) ::
113+
GenServer.on_start
105114
end

lib/ecto/repo.ex

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,10 @@ defmodule Ecto.Repo do
234234
def start_producer(queryable, opts \\ []) do
235235
Ecto.Repo.Queryable.start_producer(__MODULE__, @adapter, queryable, opts)
236236
end
237+
238+
def start_consumer(schema_or_source, opts \\ []) do
239+
Ecto.Repo.Schema.start_consumer(__MODULE__, @adapter, schema_or_source, opts)
240+
end
237241
end
238242
end
239243
end
@@ -1051,4 +1055,27 @@ defmodule Ecto.Repo do
10511055
@callback start_producer(queryable :: Ecto.Query.t, opts :: Keyword.t) ::
10521056
GenServer.on_start
10531057
@optional_callbacks [start_producer: 2]
1058+
1059+
@doc """
1060+
Starts and links to a `GenStage` consumer that inserts all entries it
1061+
consumes.
1062+
1063+
Returns a `GenStage` consumer that uses `c:insert_all/3` to insert all
1064+
consumed entries in batches to the database. SQL adapters, such as Postgres
1065+
and MySQL, will wrap all the inserts inside a single transaction.
1066+
1067+
See `c:insert_all/3` for options.
1068+
1069+
## Examples
1070+
1071+
{:ok, consumer} = MyRepo.start_consumer(Post)
1072+
"posts"
1073+
|> File.stream!()
1074+
|> Flow.from_enumerable(stages: 1)
1075+
|> Flow.map(&MyDecoder.decode/1)
1076+
|> Flow.into_stages([{consumer, cancel: :transient}])
1077+
"""
1078+
@callback start_consumer(schema_or_source :: binary | {binary, Ecto.Schema.t} | Ecto.Schema.t, opts :: Keyword.t) ::
1079+
GenServer.on_start
1080+
@optional_callbacks [start_consumer: 2]
10541081
end

lib/ecto/repo/schema.ex

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,45 @@ defmodule Ecto.Repo.Schema do
5050
{count, postprocess(rows, fields, adapter, schema, source)}
5151
end
5252

53+
@doc """
54+
Implementation for `Ecto.Repo.start_consumer/2`.
55+
"""
56+
def start_consumer(repo, adapter, schema, opts) when is_atom(schema) do
57+
start_consumer(repo, adapter, schema, schema.__schema__(:prefix),
58+
schema.__schema__(:source), opts)
59+
end
60+
61+
def start_consumer(repo, adapter, table, opts) when is_binary(table) do
62+
start_consumer(repo, adapter, nil, nil, table, opts)
63+
end
64+
65+
def start_consumer(repo, adapter, {source, schema}, opts) when is_atom(schema) do
66+
start_consumer(repo, adapter, schema, schema.__schema__(:prefix), source, opts)
67+
end
68+
69+
defp start_consumer(repo, adapter, schema, prefix, source, opts) do
70+
returning = opts[:returning] || false
71+
autogen = schema && schema.__schema__(:autogenerate_id)
72+
source = {Keyword.get(opts, :prefix, prefix), source}
73+
fields = preprocess(returning, schema)
74+
75+
insert_all = fn(rows) ->
76+
{rows, header} = extract_header_and_fields(rows, schema, autogen, adapter)
77+
counter = fn -> Enum.reduce(rows, 0, &length(&1) + &2) end
78+
metadata = %{source: source, context: nil, schema: schema, autogenerate_id: autogen}
79+
80+
{on_conflict, opts} = Keyword.pop(opts, :on_conflict, :raise)
81+
{conflict_target, opts} = Keyword.pop(opts, :conflict_target, [])
82+
on_conflict = on_conflict(on_conflict, conflict_target, metadata, counter, adapter)
83+
84+
{count, _} =
85+
adapter.insert_all(repo, metadata, Map.keys(header), rows, on_conflict, fields || [], opts)
86+
count
87+
end
88+
89+
adapter.start_consumer(repo, insert_all, opts)
90+
end
91+
5392
defp preprocess([_|_] = fields, _schema),
5493
do: fields
5594
defp preprocess([], _schema),

mix.lock

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
%{"backoff": {:hex, :backoff, "1.1.1"},
22
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [], [], "hexpm"},
3-
"db_connection": {:git, "https://github.com/elixir-ecto/db_connection.git", "9c6e4945112554bf40c240244c033f6dcc9c4a13", [branch: "jf-stream_stage"]},
3+
"db_connection": {:git, "https://github.com/elixir-ecto/db_connection.git", "845b46c234a3723bc4c1af1059a86bc9e9d85af0", [branch: "jf-stream_stage"]},
44
"decimal": {:hex, :decimal, "1.3.1", "157b3cedb2bfcb5359372a7766dd7a41091ad34578296e951f58a946fcab49c6", [:mix], [], "hexpm"},
55
"earmark": {:hex, :earmark, "1.0.3", "89bdbaf2aca8bbb5c97d8b3b55c5dd0cff517ecc78d417e87f1d0982e514557b", [:mix], [], "hexpm"},
66
"ex_doc": {:hex, :ex_doc, "0.14.5", "c0433c8117e948404d93ca69411dd575ec6be39b47802e81ca8d91017a0cf83c", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
@@ -10,5 +10,5 @@
1010
"mariaex": {:hex, :mariaex, "0.8.2", "a9ee64a02fd72579f844934b4cbecca9566593e499125edf6032c58f9d50b5f9", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm"},
1111
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
1212
"poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [], [], "hexpm"},
13-
"postgrex": {:git, "https://github.com/elixir-ecto/postgrex.git", "f9f2ae1ca374d462ff86622951fbd216f28d74f0", [branch: "jf-stage"]},
13+
"postgrex": {:git, "https://github.com/elixir-ecto/postgrex.git", "b825546e8954f9623e256f2423483bf42a2c6596", [branch: "jf-stage"]},
1414
"sbroker": {:hex, :sbroker, "1.0.0", "28ff1b5e58887c5098539f236307b36fe1d3edaa2acff9d6a3d17c2dcafebbd0", [:rebar3], [], "hexpm"}}

0 commit comments

Comments
 (0)