Skip to content

Commit cf90836

Browse files
committed
Update Postgrex/DBConnection/GenStage/Flow
1 parent 57d2a19 commit cf90836

File tree

11 files changed

+173
-296
lines changed

11 files changed

+173
-296
lines changed

integration_test/pg/copy_test.exs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Ecto.Integration.CopyTest do
44
alias Ecto.Integration.TestRepo
55
alias Ecto.Integration.Post
66

7-
test "copy to and from table" do
7+
test "stream copy to and from table" do
88
read = Ecto.Adapters.SQL.stream(TestRepo, "COPY posts TO STDOUT")
99
write = Ecto.Adapters.SQL.stream(TestRepo, "COPY posts FROM STDIN")
1010

@@ -19,4 +19,29 @@ defmodule Ecto.Integration.CopyTest do
1919
assert TestRepo.all(Post) == [one, two]
2020
end
2121
end
22+
23+
test "stage copy to and from table" do
24+
one = TestRepo.insert!(%Post{title: "one"})
25+
two = TestRepo.insert!(%Post{title: "two"})
26+
27+
read = "COPY posts TO STDOUT"
28+
{:ok, producer} = Ecto.Adapters.SQL.start_stage(TestRepo, read, [], [])
29+
30+
data =
31+
[{producer, cancel: :transient}]
32+
|> GenStage.stream()
33+
|> Enum.to_list()
34+
35+
assert TestRepo.delete_all(Post) == {2, nil}
36+
37+
write = "COPY posts FROM STDIN"
38+
{:ok, consumer} = Ecto.Adapters.SQL.start_stage(TestRepo, write, [], [stage_module: Postgrex.CopyConsumer])
39+
40+
{:ok, _} =
41+
data
42+
|> Flow.from_enumerable()
43+
|> Flow.into_stages([consumer])
44+
45+
assert TestRepo.all(Post) == [one, two]
46+
end
2247
end

integration_test/pg/test_helper.exs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ defmodule Ecto.Integration.TestRepo do
3434
use Ecto.Integration.Repo, otp_app: :ecto
3535
end
3636

37+
defmodule Ecto.Integration.TestStage do
38+
use Ecto.Repo.Stage, repo: TestRepo
39+
end
40+
3741
# Pool repo for non-async tests
3842
alias Ecto.Integration.PoolRepo
3943

integration_test/sql/stage.exs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,16 @@ defmodule Ecto.Integration.StageTest do
22
use Ecto.Integration.Case, async: true
33

44
alias Ecto.Integration.TestRepo
5+
alias Ecto.Integration.TestStage
56
alias Ecto.Integration.Post
67
alias Ecto.Integration.Comment
7-
alias Ecto.Adapters.SQL.Stage
88
import Ecto.Query
99

1010
test "stream empty" do
11-
{:ok, stage} = Stage.stream(TestRepo, Post)
11+
{:ok, stage} = TestStage.start_link(Post)
1212
assert to_list(stage) === []
1313

14-
{:ok, stage} = Stage.stream(TestRepo, from p in Post)
14+
{:ok, stage} = TestStage.start_link(from p in Post)
1515
assert to_list(stage) == []
1616
end
1717

@@ -20,7 +20,7 @@ defmodule Ecto.Integration.StageTest do
2020
%Post{} = TestRepo.insert!(%Post{title: "title2"})
2121

2222
query = from(p in "posts", order_by: p.title, select: p.title)
23-
{:ok, stage} = Stage.stream(TestRepo, query)
23+
{:ok, stage} = TestStage.start_link(query)
2424

2525
assert to_list(stage) == ["title1", "title2"]
2626
end
@@ -31,7 +31,7 @@ defmodule Ecto.Integration.StageTest do
3131
%Comment{id: cid1} = TestRepo.insert!(%Comment{text: "1", post_id: p1.id})
3232
%Comment{id: cid2} = TestRepo.insert!(%Comment{text: "2", post_id: p1.id})
3333

34-
{:ok, stage} = Stage.stream(TestRepo, Ecto.assoc(p1, :comments))
34+
{:ok, stage} = TestStage.start_link(Ecto.assoc(p1, :comments))
3535
assert [c1, c2] = to_list(stage)
3636

3737
assert c1.id == cid1
@@ -49,7 +49,7 @@ defmodule Ecto.Integration.StageTest do
4949
%Comment{id: cid4} = TestRepo.insert!(%Comment{text: "4", post_id: p2.id})
5050

5151
query = from(p in Post, preload: [:comments], select: p)
52-
{:ok, stage} = Stage.stream(TestRepo, query, [max_rows: 2])
52+
{:ok, stage} = TestStage.start_link(query, [max_rows: 2])
5353

5454
assert [p1, p2, p3] = stage |> to_list() |> sort_by_id()
5555

lib/ecto/adapters/postgres/connection.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ if Code.ensure_loaded?(Postgrex) do
1818
|> Postgrex.child_spec()
1919
end
2020

21+
def stage_spec(pool, statement, params, opts) do
22+
stage_mod = Keyword.get(opts, :stage_module, Postgrex.Producer)
23+
Supervisor.Spec.worker(stage_mod, [pool, statement, params, opts])
24+
end
25+
2126
def to_constraints(%Postgrex.Error{postgres: %{code: :unique_violation, constraint: constraint}}),
2227
do: [unique: constraint]
2328
def to_constraints(%Postgrex.Error{postgres: %{code: :foreign_key_violation, constraint: constraint}}),

lib/ecto/adapters/sql.ex

Lines changed: 84 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ defmodule Ecto.Adapters.SQL do
6969
end
7070

7171
@doc false
72-
def stream(repo, meta, query, params, process, opts) do
73-
Ecto.Adapters.SQL.stream(repo, meta, query, params, process, opts)
72+
def stream(repo, meta, query, params, process, flat_map, opts) do
73+
Ecto.Adapters.SQL.stream(repo, meta, query, params, process, flat_map, opts)
7474
end
7575

7676
@doc false
@@ -136,8 +136,16 @@ defmodule Ecto.Adapters.SQL do
136136
:ok
137137
end
138138

139+
## GenStage
140+
141+
@doc false
142+
def stage_spec(repo, meta, query, params, process, flat_map, opts) do
143+
Ecto.Adapters.SQL.stage_spec(repo, meta, query, params, process, flat_map, opts)
144+
end
145+
139146
defoverridable [prepare: 2, execute: 6, insert: 6, update: 6, delete: 4, insert_all: 7,
140-
execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1, ensure_all_started: 2]
147+
execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1, ensure_all_started: 2,
148+
stage_spec: 7]
141149
end
142150
end
143151

@@ -477,55 +485,60 @@ defmodule Ecto.Adapters.SQL do
477485
"""
478486
@spec stream(Ecto.Repo.t, String.t, [term], Keyword.t) :: Enum.t
479487
def stream(repo, sql, params \\ [], opts \\ []) do
480-
Ecto.Adapters.SQL.Stream.__build__(repo, sql, params, fn x -> x end, opts)
488+
Ecto.Adapters.SQL.Stream.__build__(repo, sql, params, fn x -> x end, nil, opts)
481489
end
482490

483491
@doc false
484-
def stream(repo, meta, prepared, params, mapper, opts) do
485-
do_stream(repo, meta, prepared, params, mapper, put_source(opts, meta))
492+
def stream(repo, meta, prepared, params, mapper, flat_map, opts) do
493+
do_stream(repo, meta, prepared, params, mapper, flat_map, put_source(opts, meta))
486494
end
487495

488-
def do_stream(repo, _meta, {:cache, _, {_, prepared}}, params, nil, opts) do
489-
prepare_stream(repo, prepared, params, nil, opts)
496+
def do_stream(repo, _meta, {:cache, _, {_, prepared}}, params, nil, flat_map, opts) do
497+
prepare_stream(repo, prepared, params, nil, flat_map, opts)
490498
end
491499

492-
def do_stream(repo, %{fields: fields, sources: sources}, {:cache, _, {_, prepared}}, params, process, opts) do
500+
def do_stream(repo, %{fields: fields, sources: sources}, {:cache, _, {_, prepared}}, params, process, flat_map, opts) do
493501
mapper = &process_row(&1, process, fields, sources)
494-
prepare_stream(repo, prepared, params, mapper, opts)
502+
prepare_stream(repo, prepared, params, mapper, flat_map, opts)
495503
end
496504

497-
def do_stream(repo, _, {:cached, _, {_, cached}}, params, nil, opts) do
498-
prepare_stream(repo, String.Chars.to_string(cached), params, nil, opts)
505+
def do_stream(repo, _, {:cached, _, {_, cached}}, params, nil, flat_map, opts) do
506+
prepare_stream(repo, String.Chars.to_string(cached), params, nil, flat_map, opts)
499507
end
500508

501-
def do_stream(repo, %{fields: fields, sources: sources}, {:cached, _, {_, cached}}, params, process, opts) do
509+
def do_stream(repo, %{fields: fields, sources: sources}, {:cached, _, {_, cached}}, params, process, flat_map, opts) do
502510
mapper = &process_row(&1, process, fields, sources)
503-
prepare_stream(repo, String.Chars.to_string(cached), params, mapper, opts)
511+
prepare_stream(repo, String.Chars.to_string(cached), params, mapper, flat_map, opts)
504512
end
505513

506-
def do_stream(repo, _meta, {:nocache, {_id, prepared}}, params, nil, opts) do
507-
prepare_stream(repo, prepared, params, nil, opts)
514+
def do_stream(repo, _meta, {:nocache, {_id, prepared}}, params, nil, flat_map, opts) do
515+
prepare_stream(repo, prepared, params, nil, flat_map, opts)
508516
end
509517

510-
def do_stream(repo, %{fields: fields, sources: sources}, {:nocache, {_id, prepared}}, params, process, opts) do
518+
def do_stream(repo, %{fields: fields, sources: sources}, {:nocache, {_id, prepared}}, params, process, flat_map, opts) do
511519
mapper = &process_row(&1, process, fields, sources)
512-
prepare_stream(repo, prepared, params, mapper, opts)
520+
prepare_stream(repo, prepared, params, mapper, flat_map, opts)
513521
end
514522

515-
defp prepare_stream(repo, prepared, params, mapper, opts) do
516-
repo
517-
|> Ecto.Adapters.SQL.Stream.__build__(prepared, params, mapper, opts)
518-
|> Stream.map(fn(%{num_rows: nrows, rows: rows}) -> {nrows, rows} end)
523+
defp prepare_stream(repo, prepared, params, mapper, flat_map, opts) do
524+
Ecto.Adapters.SQL.Stream.__build__(repo, prepared, params, mapper, flat_map, opts)
525+
end
526+
527+
@doc false
528+
def stream_mapper(conn, %{num_rows: nrows, rows: rows}, conn, flat_map) do
529+
flat_map.({nrows, rows})
519530
end
520531

521532
@doc false
522-
def reduce(repo, statement, params, mapper, opts, acc, fun) do
533+
def reduce(repo, statement, params, mapper, flat_map, opts, acc, fun) do
523534
{repo_mod, pool, default_opts} = lookup_pool(repo)
524-
opts = [decode_mapper: mapper] ++ with_log(repo, params, opts ++ default_opts)
525535
case get_conn(pool) do
526536
nil ->
527537
raise "cannot reduce stream outside of transaction"
528538
conn ->
539+
stream_mapper = flat_map && {__MODULE__, :stream_mapper, [conn, flat_map]}
540+
map_opts = [decode_mapper: mapper, stream_mapper: stream_mapper]
541+
opts = map_opts ++ with_log(repo, params, opts ++ default_opts)
529542
apply(repo_mod.__sql__, :stream, [conn, statement, params, opts])
530543
|> Enumerable.reduce(acc, fun)
531544
end
@@ -627,29 +640,56 @@ defmodule Ecto.Adapters.SQL do
627640
end
628641
end
629642

643+
@doc """
644+
Return child specification for `GenStage` process running query.
645+
"""
646+
def stage_spec(repo, statement, params, opts) do
647+
stage_spec(repo, statement, params, fn x -> x end, nil, opts)
648+
end
649+
650+
@doc """
651+
Start link `GenStage` process running query.
652+
"""
653+
def start_stage(repo, statement, params, opts) do
654+
case stage_spec(repo, statement, params, opts) do
655+
{_, {mod, fun, args}, _, _, _, _} ->
656+
apply(mod, fun, args)
657+
%{start: {mod, fun, args}} ->
658+
apply(mod, fun, args)
659+
end
660+
end
661+
630662
@doc false
631-
def stage(fun, repo, start, handle, stop, opts) do
663+
def stage_spec(repo, meta, prepared, params, mapper, flat_map, opts) do
664+
stream = stream(repo, meta, prepared, params, mapper, flat_map, opts)
665+
%Ecto.Adapters.SQL.Stream{repo: repo, statement: statement, params: params,
666+
mapper: mapper, flat_map: flat_map, opts: opts} = stream
667+
stage_spec(repo, statement, params, mapper, flat_map, opts)
668+
end
669+
670+
defp stage_spec(repo, statement, params, mapper, flat_map, opts) do
632671
{repo_mod, pool, default_opts} = lookup_pool(repo)
633-
default_opts =
634-
default_opts
635-
|> Keyword.delete(:name)
636-
|> Keyword.put_new(:caller, self())
637-
opts = with_log(repo_mod, [], opts ++ default_opts)
638-
start =
639-
fn(conn) ->
640-
put_conn(pool, conn)
641-
start.()
642-
end
643-
handle = fn(_, arg, state) -> handle.(arg, state) end
644-
stop =
645-
fn(_, reason, state) ->
646-
try do
647-
stop.(reason, state)
648-
after
649-
delete_conn(pool)
650-
end
672+
stage_opts = Keyword.delete(default_opts, :name)
673+
stream_mapper = flat_map && {__MODULE__, :stage_mapper, [pool, flat_map]}
674+
map_opts = [decode_mapper: mapper, stream_mapper: stream_mapper]
675+
opts = map_opts ++ with_log(repo, params, opts ++ stage_opts)
676+
opts = Keyword.put_new(opts, :caller, self())
677+
apply(repo_mod.__sql__, :stage_spec, [pool, statement, params, opts])
678+
end
679+
680+
@doc false
681+
def stage_mapper(stream_conn, res, pool, flat_map) do
682+
cur_conn = get_conn(pool)
683+
try do
684+
put_conn(pool, stream_conn)
685+
stream_mapper(stream_conn, res, stream_conn, flat_map)
686+
after
687+
if cur_conn do
688+
put_conn(pool, cur_conn)
689+
else
690+
delete_conn(pool)
651691
end
652-
fun.(pool, start, handle, stop, opts)
692+
end
653693
end
654694

655695
## Log

lib/ecto/adapters/sql/connection.ex

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ defmodule Ecto.Adapters.SQL.Connection do
1515
"""
1616
@callback child_spec(options :: Keyword.t) :: {module, Keyword.t}
1717

18+
@doc """
19+
Receives `pool`, `statement`, `params` and `options` and returns `GenStage`
20+
child specification
21+
"""
22+
@callback stage_spec(pool :: GenServer.server, statement :: prepared, params :: [term], options :: Keyword.t) ::
23+
Supervisor.spec()
24+
1825
@doc """
1926
Prepares and executes the given query with `DBConnection`.
2027
"""

0 commit comments

Comments
 (0)