Skip to content

Commit 2879f85

Browse files
committed
Move GenStage API to Ecto.Repo (via Ecto.Adapter.Stage)
1 parent cf90836 commit 2879f85

File tree

8 files changed

+74
-73
lines changed

8 files changed

+74
-73
lines changed

integration_test/pg/copy_test.exs

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,4 @@ defmodule Ecto.Integration.CopyTest do
1919
assert TestRepo.all(Post) == [one, two]
2020
end
2121
end
22-
23-
test "stage copy to and from table" do
24-
one = TestRepo.insert!(%Post{title: "one"})
25-
two = TestRepo.insert!(%Post{title: "two"})
26-
27-
read = "COPY posts TO STDOUT"
28-
{:ok, producer} = Ecto.Adapters.SQL.start_stage(TestRepo, read, [], [])
29-
30-
data =
31-
[{producer, cancel: :transient}]
32-
|> GenStage.stream()
33-
|> Enum.to_list()
34-
35-
assert TestRepo.delete_all(Post) == {2, nil}
36-
37-
write = "COPY posts FROM STDIN"
38-
{:ok, consumer} = Ecto.Adapters.SQL.start_stage(TestRepo, write, [], [stage_module: Postgrex.CopyConsumer])
39-
40-
{:ok, _} =
41-
data
42-
|> Flow.from_enumerable()
43-
|> Flow.into_stages([consumer])
44-
45-
assert TestRepo.all(Post) == [one, two]
46-
end
4722
end

integration_test/pg/test_helper.exs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,6 @@ defmodule Ecto.Integration.TestRepo do
3434
use Ecto.Integration.Repo, otp_app: :ecto
3535
end
3636

37-
defmodule Ecto.Integration.TestStage do
38-
use Ecto.Repo.Stage, repo: TestRepo
39-
end
40-
4137
# Pool repo for non-async tests
4238
alias Ecto.Integration.PoolRepo
4339

integration_test/sql/stage.exs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,15 @@ defmodule Ecto.Integration.StageTest do
22
use Ecto.Integration.Case, async: true
33

44
alias Ecto.Integration.TestRepo
5-
alias Ecto.Integration.TestStage
65
alias Ecto.Integration.Post
76
alias Ecto.Integration.Comment
87
import Ecto.Query
98

109
test "stream empty" do
11-
{:ok, stage} = TestStage.start_link(Post)
10+
{:ok, stage} = TestRepo.start_producer(Post)
1211
assert to_list(stage) === []
1312

14-
{:ok, stage} = TestStage.start_link(from p in Post)
13+
{:ok, stage} = TestRepo.start_producer(from p in Post)
1514
assert to_list(stage) == []
1615
end
1716

@@ -20,7 +19,7 @@ defmodule Ecto.Integration.StageTest do
2019
%Post{} = TestRepo.insert!(%Post{title: "title2"})
2120

2221
query = from(p in "posts", order_by: p.title, select: p.title)
23-
{:ok, stage} = TestStage.start_link(query)
22+
{:ok, stage} = TestRepo.start_producer(query)
2423

2524
assert to_list(stage) == ["title1", "title2"]
2625
end
@@ -31,7 +30,7 @@ defmodule Ecto.Integration.StageTest do
3130
%Comment{id: cid1} = TestRepo.insert!(%Comment{text: "1", post_id: p1.id})
3231
%Comment{id: cid2} = TestRepo.insert!(%Comment{text: "2", post_id: p1.id})
3332

34-
{:ok, stage} = TestStage.start_link(Ecto.assoc(p1, :comments))
33+
{:ok, stage} = TestRepo.start_producer(Ecto.assoc(p1, :comments))
3534
assert [c1, c2] = to_list(stage)
3635

3736
assert c1.id == cid1
@@ -49,7 +48,7 @@ defmodule Ecto.Integration.StageTest do
4948
%Comment{id: cid4} = TestRepo.insert!(%Comment{text: "4", post_id: p2.id})
5049

5150
query = from(p in Post, preload: [:comments], select: p)
52-
{:ok, stage} = TestStage.start_link(query, [max_rows: 2])
51+
{:ok, stage} = TestRepo.start_producer(query, [max_rows: 2])
5352

5453
assert [p1, p2, p3] = stage |> to_list() |> sort_by_id()
5554

lib/ecto/adapters/postgres/connection.ex

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@ if Code.ensure_loaded?(Postgrex) do
1818
|> Postgrex.child_spec()
1919
end
2020

21-
def stage_spec(pool, statement, params, opts) do
22-
stage_mod = Keyword.get(opts, :stage_module, Postgrex.Producer)
23-
Supervisor.Spec.worker(stage_mod, [pool, statement, params, opts])
21+
def start_producer(pool, statement, params, opts) do
22+
Postgrex.Producer.start_link(pool, statement, params, opts)
2423
end
2524

2625
def to_constraints(%Postgrex.Error{postgres: %{code: :unique_violation, constraint: constraint}}),

lib/ecto/adapters/sql.ex

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ defmodule Ecto.Adapters.SQL do
1717
@behaviour Ecto.Adapter
1818
@behaviour Ecto.Adapter.Migration
1919
@behaviour Ecto.Adapter.Transaction
20+
@behaviour Ecto.Adapter.Stage
2021

2122
@conn __MODULE__.Connection
2223
@adapter unquote(adapter)
@@ -139,13 +140,13 @@ defmodule Ecto.Adapters.SQL do
139140
## GenStage
140141

141142
@doc false
142-
def stage_spec(repo, meta, query, params, process, flat_map, opts) do
143-
Ecto.Adapters.SQL.stage_spec(repo, meta, query, params, process, flat_map, opts)
143+
def start_producer(repo, meta, query, params, process, flat_map, opts) do
144+
Ecto.Adapters.SQL.start_producer(repo, meta, query, params, process, flat_map, opts)
144145
end
145146

146147
defoverridable [prepare: 2, execute: 6, insert: 6, update: 6, delete: 4, insert_all: 7,
147148
execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1, ensure_all_started: 2,
148-
stage_spec: 7]
149+
start_producer: 7]
149150
end
150151
end
151152

@@ -640,41 +641,29 @@ defmodule Ecto.Adapters.SQL do
640641
end
641642
end
642643

643-
@doc """
644-
Return child specification for `GenStage` process running query.
645-
"""
646-
def stage_spec(repo, statement, params, opts) do
647-
stage_spec(repo, statement, params, fn x -> x end, nil, opts)
648-
end
649-
650-
@doc """
651-
Start link `GenStage` process running query.
652-
"""
653-
def start_stage(repo, statement, params, opts) do
654-
case stage_spec(repo, statement, params, opts) do
655-
{_, {mod, fun, args}, _, _, _, _} ->
656-
apply(mod, fun, args)
657-
%{start: {mod, fun, args}} ->
658-
apply(mod, fun, args)
659-
end
660-
end
661-
662644
@doc false
663-
def stage_spec(repo, meta, prepared, params, mapper, flat_map, opts) do
645+
def start_producer(repo, meta, prepared, params, mapper, flat_map, opts) do
664646
stream = stream(repo, meta, prepared, params, mapper, flat_map, opts)
665647
%Ecto.Adapters.SQL.Stream{repo: repo, statement: statement, params: params,
666648
mapper: mapper, flat_map: flat_map, opts: opts} = stream
667-
stage_spec(repo, statement, params, mapper, flat_map, opts)
649+
start_producer(repo, statement, params, mapper, flat_map, opts)
650+
end
651+
652+
@doc """
653+
Start link a `GenStage` producer that streams the result of a query.
654+
"""
655+
def start_producer(repo, statement, params, opts) do
656+
start_producer(repo, statement, params, fn x -> x end, nil, opts)
668657
end
669658

670-
defp stage_spec(repo, statement, params, mapper, flat_map, opts) do
659+
defp start_producer(repo, statement, params, mapper, flat_map, opts) do
671660
{repo_mod, pool, default_opts} = lookup_pool(repo)
672661
stage_opts = Keyword.delete(default_opts, :name)
673662
stream_mapper = flat_map && {__MODULE__, :stage_mapper, [pool, flat_map]}
674663
map_opts = [decode_mapper: mapper, stream_mapper: stream_mapper]
675664
opts = map_opts ++ with_log(repo, params, opts ++ stage_opts)
676665
opts = Keyword.put_new(opts, :caller, self())
677-
apply(repo_mod.__sql__, :stage_spec, [pool, statement, params, opts])
666+
repo_mod.__sql__.start_producer(pool, statement, params, opts)
678667
end
679668

680669
@doc false

lib/ecto/adapters/sql/connection.ex

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,6 @@ defmodule Ecto.Adapters.SQL.Connection do
1515
"""
1616
@callback child_spec(options :: Keyword.t) :: {module, Keyword.t}
1717

18-
@doc """
19-
Receives `pool`, `statement`, `params` and `options` and returns `GenStage`
20-
child specification
21-
"""
22-
@callback stage_spec(pool :: GenServer.server, statement :: prepared, params :: [term], options :: Keyword.t) ::
23-
Supervisor.spec()
24-
2518
@doc """
2619
Prepares and executes the given query with `DBConnection`.
2720
"""
@@ -101,4 +94,12 @@ defmodule Ecto.Adapters.SQL.Connection do
10194
Receives a DDL command and returns a query that executes it.
10295
"""
10396
@callback execute_ddl(command :: Ecto.Adapter.Migration.command) :: String.t | [iodata]
97+
98+
## GenStage
99+
100+
@doc """
101+
Start and link a `GenStage` producer that streams the result of a query.
102+
"""
103+
@callback start_producer(pool :: GenServer.server, statement :: prepared, params :: [term], options :: Keyword.t) ::
104+
GenServer.on_start
104105
end

lib/ecto/repo.ex

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,12 @@ defmodule Ecto.Repo do
229229
def load(schema_or_types, data) do
230230
Ecto.Repo.Schema.load(@adapter, schema_or_types, data)
231231
end
232+
233+
if function_exported?(@adapter, :start_producer, 7) do
234+
def start_producer(queryable, opts \\ []) do
235+
Ecto.Repo.Queryable.start_producer(__MODULE__, @adapter, queryable, opts)
236+
end
237+
end
232238
end
233239
end
234240

@@ -1009,4 +1015,40 @@ defmodule Ecto.Repo do
10091015
10101016
"""
10111017
@callback load(Ecto.Schema.t | map(), map() | Keyword.t | {list, list}) :: Ecto.Schema.t | map()
1018+
1019+
@doc """
1020+
Starts and links to a `GenStage` producer that executes a query.
1021+
1022+
1023+
Returns a `GenStage` that produces all entries from the data store to its
1024+
consumer(s). SQL adapters, such as Postgres and MySQL, will wrap the query
1025+
inside a transaction.
1026+
1027+
May raise `Ecto.QueryError` if query validation fails.
1028+
1029+
## Options
1030+
1031+
* `:prefix` - The prefix to run the query on (such as the schema path
1032+
in Postgres or the database in MySQL). This overrides the prefix set
1033+
in the query
1034+
1035+
* `:max_rows` - The number of rows to load from the database as we stream.
1036+
It is supported at least by Postgres and MySQL and defaults to 500.
1037+
1038+
See the "Shared options" section at the module documentation.
1039+
1040+
## Example
1041+
1042+
# Fetch all post titles
1043+
query = from p in Post,
1044+
select: p.title
1045+
{:ok, producer} = MyRepo.start_producer(query)
1046+
[{producer, cancel: :transient}]
1047+
|> Flow.from_stages()
1048+
|> Flow.map(&IO.inspect/1)
1049+
|> Flow.run()
1050+
"""
1051+
@callback start_producer(queryable :: Ecto.Query.t, opts :: Keyword.t) ::
1052+
GenServer.on_start
1053+
@optional_callbacks [start_producer: 2]
10121054
end

lib/ecto/repo/queryable.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,13 +109,13 @@ defmodule Ecto.Repo.Queryable do
109109
execute(:delete_all, repo, adapter, query, opts)
110110
end
111111

112-
def stage_spec(repo, adapter, queryable, opts) when is_list(opts) do
112+
def start_producer(repo, adapter, queryable, opts) when is_list(opts) do
113113
query =
114114
queryable
115115
|> Ecto.Queryable.to_query
116116
|> Ecto.Query.Planner.returning(true)
117117
|> attach_prefix(opts)
118-
lazy(&adapter.stage_spec/7, :all, repo, adapter, query, opts)
118+
lazy(&adapter.start_producer/7, :all, repo, adapter, query, opts)
119119
end
120120

121121
## Helpers

0 commit comments

Comments
 (0)