Skip to content

Commit 018cc65

Browse files
committed
Introduce Ecto.Adapters.SQL.Stage
1 parent a91d7da commit 018cc65

File tree

6 files changed

+266
-2
lines changed

6 files changed

+266
-2
lines changed

integration_test/pg/all_test.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ Code.require_file "../sql/lock.exs", __DIR__
33
Code.require_file "../sql/migration.exs", __DIR__
44
Code.require_file "../sql/sandbox.exs", __DIR__
55
Code.require_file "../sql/sql.exs", __DIR__
6+
Code.require_file "../sql/stage.exs", __DIR__
67
Code.require_file "../sql/stream.exs", __DIR__
78
Code.require_file "../sql/subquery.exs", __DIR__
89
Code.require_file "../sql/transaction.exs", __DIR__

integration_test/sql/stage.exs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
defmodule Ecto.Integration.StageTest do
2+
use Ecto.Integration.Case, async: true
3+
4+
alias Ecto.Integration.TestRepo
5+
alias Ecto.Integration.Post
6+
alias Ecto.Integration.Comment
7+
alias Ecto.Adapters.SQL.Stage
8+
import Ecto.Query
9+
10+
test "stream empty" do
11+
{:ok, stage} = Stage.stream(TestRepo, Post)
12+
assert to_list(stage) === []
13+
14+
{:ok, stage} = Stage.stream(TestRepo, from p in Post)
15+
assert to_list(stage) == []
16+
end
17+
18+
test "stream without schema" do
19+
%Post{} = TestRepo.insert!(%Post{title: "title1"})
20+
%Post{} = TestRepo.insert!(%Post{title: "title2"})
21+
22+
query = from(p in "posts", order_by: p.title, select: p.title)
23+
{:ok, stage} = Stage.stream(TestRepo, query)
24+
25+
assert to_list(stage) == ["title1", "title2"]
26+
end
27+
28+
test "stream with assoc" do
29+
p1 = TestRepo.insert!(%Post{title: "1"})
30+
31+
%Comment{id: cid1} = TestRepo.insert!(%Comment{text: "1", post_id: p1.id})
32+
%Comment{id: cid2} = TestRepo.insert!(%Comment{text: "2", post_id: p1.id})
33+
34+
{:ok, stage} = Stage.stream(TestRepo, Ecto.assoc(p1, :comments))
35+
assert [c1, c2] = to_list(stage)
36+
37+
assert c1.id == cid1
38+
assert c2.id == cid2
39+
end
40+
41+
test "stream with preload" do
42+
p1 = TestRepo.insert!(%Post{title: "1"})
43+
p2 = TestRepo.insert!(%Post{title: "2"})
44+
TestRepo.insert!(%Post{title: "3"})
45+
46+
%Comment{id: cid1} = TestRepo.insert!(%Comment{text: "1", post_id: p1.id})
47+
%Comment{id: cid2} = TestRepo.insert!(%Comment{text: "2", post_id: p1.id})
48+
%Comment{id: cid3} = TestRepo.insert!(%Comment{text: "3", post_id: p2.id})
49+
%Comment{id: cid4} = TestRepo.insert!(%Comment{text: "4", post_id: p2.id})
50+
51+
query = from(p in Post, preload: [:comments], select: p)
52+
{:ok, stage} = Stage.stream(TestRepo, query, [max_rows: 2])
53+
54+
assert [p1, p2, p3] = stage |> to_list() |> sort_by_id()
55+
56+
assert [%Comment{id: ^cid1}, %Comment{id: ^cid2}] = p1.comments |> sort_by_id
57+
assert [%Comment{id: ^cid3}, %Comment{id: ^cid4}] = p2.comments |> sort_by_id
58+
assert [] = p3.comments
59+
end
60+
61+
defp to_list(stage) do
62+
stage
63+
|> Flow.from_stage()
64+
|> Enum.to_list()
65+
end
66+
67+
defp sort_by_id(values) do
68+
Enum.sort_by(values, &(&1.id))
69+
end
70+
end

lib/ecto/adapters/sql.ex

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,31 @@ defmodule Ecto.Adapters.SQL do
627627
end
628628
end
629629

630+
@doc false
631+
def stage(repo, type, start, handle, stop, opts \\ []) do
632+
{repo_mod, pool, default_opts} = lookup_pool(repo)
633+
default_opts =
634+
default_opts
635+
|> Keyword.delete(:name)
636+
|> Keyword.put_new(:caller, self())
637+
opts = with_log(repo_mod, [], opts ++ default_opts)
638+
start =
639+
fn(conn) ->
640+
put_conn(pool, conn)
641+
start.()
642+
end
643+
handle = fn(_, arg, state) -> handle.(arg, state) end
644+
stop =
645+
fn(_, reason, state) ->
646+
try do
647+
stop.(reason, state)
648+
after
649+
delete_conn(pool)
650+
end
651+
end
652+
DBConnection.Stage.start_link(pool, type, start, handle, stop, opts)
653+
end
654+
630655
## Log
631656

632657
defp with_log(repo, params, opts) do

lib/ecto/adapters/sql/stage.ex

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
defmodule Ecto.Adapters.SQL.Stage do
2+
@moduledoc """
3+
A `GenStage` process that encapsulates a SQL transaction.
4+
"""
5+
6+
@doc """
7+
Start link a `GenStage` process that will run a transaction for its duration.
8+
9+
The first argument is the repo, the second argument is the `GenStage` type,
10+
the third argument is the start function, the fourth argument is the handle
11+
function, the fifth argument is the stop function and the optional sixth
12+
argument is the options.
13+
14+
The start function is a 0-arity anonymous function. This is called after the
15+
transaction begins but before `start_link/6` returns. It should return the
16+
`state` or call `MyRepo.rollback/1` to stop the `GenStage`.
17+
18+
The handle function is a 2-arity anonymous function. If the `GenStage` type is
19+
a `:producer`, then the first argument is the `demand` from a `GenStage`
20+
`handle_demand` callback. Otherwise the first argument is the events from a
21+
`GenStage` `handle_events` callback. The second argument is the state. This
22+
function returns a 2-tuple, with first element as events (empty list for
23+
`:consumer`) and second element as the `state`. This function can roll back
24+
and stop the `GenStage` using `MyRepo.rollback/1`.
25+
26+
The stop function is a 2-arity anonymous function. The first argument is the
27+
terminate reason and the second argument is the `state`. This function will
28+
only be called if the connection is alive and the transaction has not been rolled
29+
back. If this function returns, the transaction is committed. This function can
30+
roll back and stop the `GenStage` using `MyRepo.rollback/1`.
31+
32+
The `GenStage` process will behave like a `Flow` stage:
33+
34+
* It will stop with reason `:normal` when the last consumer cancels
35+
* It will notify consumers that it is done when all producers have cancelled
36+
or notified that they are done or halted
37+
* It will cancel new and remaining producers when all producers have
38+
notified that they are done or halted and it is a `:consumer`
39+
* It will not send demand to new producers when all producers have notified
40+
that they are done or halted and it is a `:producer_consumer`
41+
42+
### Options
43+
44+
* `:name` - A name to register the started process (see the `:name` option
45+
in `GenServer.start_link/3`)
46+
47+
See the "Shared options" section at the `Ecto.Repo` documentation. All options
48+
are passed to the `GenStage` on init.
49+
50+
### Example
51+
52+
start = fn() -> Post end
53+
handle =
54+
fn(entries, schema) ->
55+
MyRepo.insert_all(schema, entries)
56+
{[], schema}
57+
end
58+
stop =
59+
fn
60+
:normal, _ -> :ok
61+
reason, _ -> MyRepo.rollback(reason)
62+
end
63+
Ecto.Adapters.SQL.Stage.start_link(MyRepo, :consumer, start, handle, stop)
64+
"""
65+
@spec start_link(repo :: module, :producer,
66+
start :: (() -> state),
67+
handle_demand :: ((demand :: pos_integer, state) -> {[any], state}),
68+
stop :: ((reason :: any, state) -> any), opts :: Keyword.t) ::
69+
GenServer.on_start when state: var
70+
@spec start_link(repo :: module, :producer_consumer,
71+
start :: (() -> state),
72+
handle_events :: (([any], state) -> {[any], state}),
73+
stop :: ((reason :: any, state) -> any), opts :: Keyword.t) ::
74+
GenServer.on_start when state: var
75+
@spec start_link(repo :: module, :consumer,
76+
start :: (() -> state),
77+
handle_events :: (([any], state) -> {[], state}),
78+
stop :: ((reason :: any, state) -> any), opts :: Keyword.t) ::
79+
GenServer.on_start when state: var
80+
def start_link(repo, type, start, handle, stop, opts \\ []) do
81+
Ecto.Adapters.SQL.stage(repo, type, start, handle, stop, opts)
82+
end
83+
84+
@doc """
85+
Starts a `GenStage` producer that emits all entries from the data store
86+
matching the given query. SQL adapters, such as Postgres and MySQL, will use
87+
a separate transaction to enumerate the stream.
88+
89+
May raise `Ecto.QueryError` if query validation fails.
90+
91+
## Options
92+
93+
* `:prefix` - The prefix to run the query on (such as the schema path
94+
in Postgres or the database in MySQL). This overrides the prefix set
95+
in the query
96+
97+
* `:max_rows` - The number of rows to load from the database as we stream.
98+
It is supported at least by Postgres and MySQL and defaults to 500.
99+
100+
See the "Shared options" section at the `Ecto.Repo` documentation.
101+
102+
## Example
103+
104+
# Print all post titles
105+
query = from p in Post,
106+
select: p.title
107+
{:ok, stage} = Ecto.Adapters.SQL.Stage.stream(MyRepo, query)
108+
stage
109+
|> Flow.from_stage()
110+
|> Flow.each(&IO.inspect/1)
111+
|> Flow.start_link()
112+
"""
113+
114+
@callback stream(repo :: module, queryable :: Ecto.Query.t, opts :: Keyword.t) ::
115+
GenServer.on_start()
116+
def stream(repo, queryable, opts \\ []) do
117+
stream = apply(repo, :stream, [queryable, opts])
118+
start =
119+
fn() ->
120+
acc = {:suspend, {0, []}}
121+
{:suspended, _, cont} = Enumerable.reduce(stream, acc, &stream_reduce/2)
122+
{repo, :cont, cont}
123+
end
124+
start_link(repo, :producer, start, &stream_handle/2, &stream_stop/2, opts)
125+
end
126+
127+
## Helpers
128+
129+
defp stream_reduce(v, {1, acc}) do
130+
{:suspend, {0, [v | acc]}}
131+
end
132+
defp stream_reduce(v, {n, acc}) do
133+
{:cont, {n-1, [v | acc]}}
134+
end
135+
136+
defp stream_handle(n, {repo, :cont, cont}) when n > 0 do
137+
case cont.({:cont, {n, []}}) do
138+
{:suspended, {0, acc}, cont} ->
139+
{Enum.reverse(acc), {repo, :cont, cont}}
140+
{status, {_, acc}} when status in [:halted, :done] ->
141+
GenStage.async_notify(self(), {:producer, status})
142+
{Enum.reverse(acc), {repo, status}}
143+
end
144+
end
145+
defp stream_handle(_, {_repo, status} = state) do
146+
GenStage.async_notify(self(), {:producer, status})
147+
{[], state}
148+
end
149+
150+
defp stream_stop(reason, {repo, :cont, cont}) do
151+
_ = cont.({:halt, {0, []}})
152+
stream_stop(repo, reason)
153+
end
154+
defp stream_stop(reason, {repo, status}) when status in [:halted, :done] do
155+
stream_stop(repo, reason)
156+
end
157+
158+
defp stream_stop(_, :normal) do
159+
:ok
160+
end
161+
defp stream_stop(repo, reason) do
162+
apply(repo, :rollback, [reason])
163+
end
164+
end

mix.exs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ defmodule Ecto.Mixfile do
4343
{:decimal, "~> 1.2"},
4444

4545
# Drivers
46-
{:db_connection, "~> 1.1", optional: true},
46+
{:db_connection, "~> 1.1", github: "elixir-ecto/db_connection", branch: "jf-stream_stage", override: true, optional: true},
47+
{:gen_stage, "~> 0.11", optional: true},
48+
{:flow, "~> 0.11", optional: true},
4749
{:postgrex, "~> 0.13.0", optional: true},
4850
{:mariaex, "~> 0.8.0", optional: true},
4951

mix.lock

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
%{"backoff": {:hex, :backoff, "1.1.1"},
22
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], []},
3-
"db_connection": {:hex, :db_connection, "1.1.2", "2865c2a4bae0714e2213a0ce60a1b12d76a6efba0c51fbda59c9ab8d1accc7a8", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, optional: false]}, {:poolboy, "~> 1.5", [hex: :poolboy, optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, optional: true]}]},
3+
"db_connection": {:git, "https://github.com/elixir-ecto/db_connection.git", "392affacc4a730d748bf95d7261de9e8a9709fe2", [branch: "jf-stream_stage"]},
44
"decimal": {:hex, :decimal, "1.3.1", "157b3cedb2bfcb5359372a7766dd7a41091ad34578296e951f58a946fcab49c6", [:mix], []},
55
"earmark": {:hex, :earmark, "1.0.3", "89bdbaf2aca8bbb5c97d8b3b55c5dd0cff517ecc78d417e87f1d0982e514557b", [:mix], []},
66
"ex_doc": {:hex, :ex_doc, "0.14.5", "c0433c8117e948404d93ca69411dd575ec6be39b47802e81ca8d91017a0cf83c", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, optional: false]}]},
7+
"flow": {:hex, :flow, "0.11.1", "cbc35a0236520cc5fec7b5863cd8431cb1e77297c5c9119055676355eb1fb5a6", [:mix], [{:gen_stage, "~> 0.11.0", [hex: :gen_stage, optional: false]}]},
8+
"gen_stage": {:hex, :gen_stage, "0.11.0", "943bdfa85c75fa624e0a36a9d135baad20a523be040178f5a215444b45c66ea4", [:mix], []},
79
"inch_ex": {:hex, :inch_ex, "0.5.5", "b63f57e281467bd3456461525fdbc9e158c8edbe603da6e3e4671befde796a3d", [:mix], [{:poison, "~> 1.5 or ~> 2.0 or ~> 3.0", [hex: :poison, optional: false]}]},
810
"mariaex": {:hex, :mariaex, "0.8.2", "a9ee64a02fd72579f844934b4cbecca9566593e499125edf6032c58f9d50b5f9", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, optional: false]}]},
911
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], []},

0 commit comments

Comments
 (0)