Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions integration_test/pg/all_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ Code.require_file "../sql/lock.exs", __DIR__
Code.require_file "../sql/migration.exs", __DIR__
Code.require_file "../sql/sandbox.exs", __DIR__
Code.require_file "../sql/sql.exs", __DIR__
Code.require_file "../sql/stage.exs", __DIR__
Code.require_file "../sql/stream.exs", __DIR__
Code.require_file "../sql/subquery.exs", __DIR__
Code.require_file "../sql/transaction.exs", __DIR__
Expand Down
2 changes: 1 addition & 1 deletion integration_test/pg/copy_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Ecto.Integration.CopyTest do
alias Ecto.Integration.TestRepo
alias Ecto.Integration.Post

test "copy to and from table" do
test "stream copy to and from table" do
read = Ecto.Adapters.SQL.stream(TestRepo, "COPY posts TO STDOUT")
write = Ecto.Adapters.SQL.stream(TestRepo, "COPY posts FROM STDIN")

Expand Down
83 changes: 83 additions & 0 deletions integration_test/sql/stage.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
defmodule Ecto.Integration.StageTest do
use Ecto.Integration.Case, async: true

alias Ecto.Integration.TestRepo
alias Ecto.Integration.Post
alias Ecto.Integration.Comment
import Ecto.Query

test "stream empty" do
{:ok, stage} = TestRepo.start_producer(Post)
assert to_list(stage) === []

{:ok, stage} = TestRepo.start_producer(from p in Post)
assert to_list(stage) == []
end

test "stream without schema" do
%Post{} = TestRepo.insert!(%Post{title: "title1"})
%Post{} = TestRepo.insert!(%Post{title: "title2"})

query = from(p in "posts", order_by: p.title, select: p.title)
{:ok, stage} = TestRepo.start_producer(query)

assert to_list(stage) == ["title1", "title2"]
end

test "stream with assoc" do
p1 = TestRepo.insert!(%Post{title: "1"})

%Comment{id: cid1} = TestRepo.insert!(%Comment{text: "1", post_id: p1.id})
%Comment{id: cid2} = TestRepo.insert!(%Comment{text: "2", post_id: p1.id})

{:ok, stage} = TestRepo.start_producer(Ecto.assoc(p1, :comments))
assert [c1, c2] = to_list(stage)

assert c1.id == cid1
assert c2.id == cid2
end

test "stream with preload" do
p1 = TestRepo.insert!(%Post{title: "1"})
p2 = TestRepo.insert!(%Post{title: "2"})
TestRepo.insert!(%Post{title: "3"})

%Comment{id: cid1} = TestRepo.insert!(%Comment{text: "1", post_id: p1.id})
%Comment{id: cid2} = TestRepo.insert!(%Comment{text: "2", post_id: p1.id})
%Comment{id: cid3} = TestRepo.insert!(%Comment{text: "3", post_id: p2.id})
%Comment{id: cid4} = TestRepo.insert!(%Comment{text: "4", post_id: p2.id})

query = from(p in Post, preload: [:comments], select: p)
{:ok, stage} = TestRepo.start_producer(query, [max_rows: 2])

assert [p1, p2, p3] = stage |> to_list() |> sort_by_id()

assert [%Comment{id: ^cid1}, %Comment{id: ^cid2}] = p1.comments |> sort_by_id
assert [%Comment{id: ^cid3}, %Comment{id: ^cid4}] = p2.comments |> sort_by_id

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a function call when a pipeline is only one function long

assert [] = p3.comments
end

test "insert entries" do
{:ok, stage} = TestRepo.start_consumer(Post)

mon = Process.monitor(stage)

[[title: "hello"], [title: "world"]]
|> Flow.from_enumerable()
|> Flow.into_stages([{stage, [cancel: :transient, max_demand: 1]}])

assert_receive {:DOWN, ^mon, _, _, :normal}

assert [%Post{title: "hello"}, %Post{title: "world"}] = TestRepo.all(Post)
end

defp to_list(stage) do
stage
|> Flow.from_stage()
|> Enum.to_list()
end

defp sort_by_id(values) do
Enum.sort_by(values, &(&1.id))
end
end
32 changes: 32 additions & 0 deletions lib/ecto/adapter/stage.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule Ecto.Adapter.Stage do
@moduledoc """
Specifies the adapter `GenStage` API.
"""

@type flat_map :: ({integer, list | nil} -> list)
@type insert_all :: (list -> integer)
@type options :: Keyword.t

@doc """
Starts and links to a `GenStage` producer.

The producer executes a query and returns the result to consumer(s).

See `Ecto.Repo.start_producer/2`.
"""
@callback start_producer(repo :: Ecto.Repo.t, Ecto.Adapter.query_meta, query, params :: list(), Ecto.Adapter.process | nil, flat_map, options) ::
GenServer.on_start when
query: {:nocache, Ecto.Adapter.prepared} |
{:cached, (Ecto.Adapter.prepared -> :ok), Ecto.Adapter.cached} |
{:cache, (Ecto.Adapter.cached -> :ok), Ecto.Adapter.prepared}

@doc """
Starts and links to a `GenStage` consumer.

The consumers inserts the entries it receives for the schema.

See `Ecto.Repo.start_consumer/2`
"""
@callback start_consumer(repo :: Ecto.Repo.t, insert_all, options) ::
GenServer.on_start
end
8 changes: 8 additions & 0 deletions lib/ecto/adapters/postgres/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ if Code.ensure_loaded?(Postgrex) do
|> Postgrex.child_spec()
end

def start_producer(pool, statement, params, opts) do
Postgrex.Producer.start_link(pool, statement, params, opts)
end

def start_consumer(pool, fun, opts) do
Postgrex.Consumer.start_link(pool, fun, opts)
end

def to_constraints(%Postgrex.Error{postgres: %{code: :unique_violation, constraint: constraint}}),
do: [unique: constraint]
def to_constraints(%Postgrex.Error{postgres: %{code: :foreign_key_violation, constraint: constraint}}),
Expand Down
126 changes: 102 additions & 24 deletions lib/ecto/adapters/sql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ defmodule Ecto.Adapters.SQL do
@behaviour Ecto.Adapter
@behaviour Ecto.Adapter.Migration
@behaviour Ecto.Adapter.Transaction
@behaviour Ecto.Adapter.Stage

@conn __MODULE__.Connection
@adapter unquote(adapter)
Expand Down Expand Up @@ -69,8 +70,8 @@ defmodule Ecto.Adapters.SQL do
end

@doc false
def stream(repo, meta, query, params, process, opts) do
Ecto.Adapters.SQL.stream(repo, meta, query, params, process, opts)
def stream(repo, meta, query, params, process, flat_map, opts) do
Ecto.Adapters.SQL.stream(repo, meta, query, params, process, flat_map, opts)
end

@doc false
Expand Down Expand Up @@ -136,8 +137,21 @@ defmodule Ecto.Adapters.SQL do
:ok
end

## GenStage

@doc false
def start_producer(repo, meta, query, params, process, flat_map, opts) do
Ecto.Adapters.SQL.start_producer(repo, meta, query, params, process, flat_map, opts)
end

@doc false
def start_consumer(repo, insert, opts) do
Ecto.Adapters.SQL.start_consumer(repo, insert, opts)
end

defoverridable [prepare: 2, execute: 6, insert: 6, update: 6, delete: 4, insert_all: 7,
execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1, ensure_all_started: 2]
execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1, ensure_all_started: 2,
start_producer: 7, start_consumer: 3]
end
end

Expand Down Expand Up @@ -477,55 +491,60 @@ defmodule Ecto.Adapters.SQL do
"""
@spec stream(Ecto.Repo.t, String.t, [term], Keyword.t) :: Enum.t
def stream(repo, sql, params \\ [], opts \\ []) do
Ecto.Adapters.SQL.Stream.__build__(repo, sql, params, fn x -> x end, opts)
Ecto.Adapters.SQL.Stream.__build__(repo, sql, params, fn x -> x end, nil, opts)
end

@doc false
def stream(repo, meta, prepared, params, mapper, opts) do
do_stream(repo, meta, prepared, params, mapper, put_source(opts, meta))
def stream(repo, meta, prepared, params, mapper, flat_map, opts) do
do_stream(repo, meta, prepared, params, mapper, flat_map, put_source(opts, meta))
end

def do_stream(repo, _meta, {:cache, _, {_, prepared}}, params, nil, opts) do
prepare_stream(repo, prepared, params, nil, opts)
def do_stream(repo, _meta, {:cache, _, {_, prepared}}, params, nil, flat_map, opts) do
prepare_stream(repo, prepared, params, nil, flat_map, opts)
end

def do_stream(repo, %{fields: fields, sources: sources}, {:cache, _, {_, prepared}}, params, process, opts) do
def do_stream(repo, %{fields: fields, sources: sources}, {:cache, _, {_, prepared}}, params, process, flat_map, opts) do
mapper = &process_row(&1, process, fields, sources)
prepare_stream(repo, prepared, params, mapper, opts)
prepare_stream(repo, prepared, params, mapper, flat_map, opts)
end

def do_stream(repo, _, {:cached, _, {_, cached}}, params, nil, opts) do
prepare_stream(repo, String.Chars.to_string(cached), params, nil, opts)
def do_stream(repo, _, {:cached, _, {_, cached}}, params, nil, flat_map, opts) do
prepare_stream(repo, String.Chars.to_string(cached), params, nil, flat_map, opts)
end

def do_stream(repo, %{fields: fields, sources: sources}, {:cached, _, {_, cached}}, params, process, opts) do
def do_stream(repo, %{fields: fields, sources: sources}, {:cached, _, {_, cached}}, params, process, flat_map, opts) do
mapper = &process_row(&1, process, fields, sources)
prepare_stream(repo, String.Chars.to_string(cached), params, mapper, opts)
prepare_stream(repo, String.Chars.to_string(cached), params, mapper, flat_map, opts)
end

def do_stream(repo, _meta, {:nocache, {_id, prepared}}, params, nil, opts) do
prepare_stream(repo, prepared, params, nil, opts)
def do_stream(repo, _meta, {:nocache, {_id, prepared}}, params, nil, flat_map, opts) do
prepare_stream(repo, prepared, params, nil, flat_map, opts)
end

def do_stream(repo, %{fields: fields, sources: sources}, {:nocache, {_id, prepared}}, params, process, opts) do
def do_stream(repo, %{fields: fields, sources: sources}, {:nocache, {_id, prepared}}, params, process, flat_map, opts) do
mapper = &process_row(&1, process, fields, sources)
prepare_stream(repo, prepared, params, mapper, opts)
prepare_stream(repo, prepared, params, mapper, flat_map, opts)
end

defp prepare_stream(repo, prepared, params, mapper, flat_map, opts) do
Ecto.Adapters.SQL.Stream.__build__(repo, prepared, params, mapper, flat_map, opts)
end

defp prepare_stream(repo, prepared, params, mapper, opts) do
repo
|> Ecto.Adapters.SQL.Stream.__build__(prepared, params, mapper, opts)
|> Stream.map(fn(%{num_rows: nrows, rows: rows}) -> {nrows, rows} end)
@doc false
def stream_mapper(conn, %{num_rows: nrows, rows: rows}, conn, flat_map) do
flat_map.({nrows, rows})
end

@doc false
def reduce(repo, statement, params, mapper, opts, acc, fun) do
def reduce(repo, statement, params, mapper, flat_map, opts, acc, fun) do
{repo_mod, pool, default_opts} = lookup_pool(repo)
opts = [decode_mapper: mapper] ++ with_log(repo, params, opts ++ default_opts)
case get_conn(pool) do
nil ->
raise "cannot reduce stream outside of transaction"
conn ->
stream_mapper = flat_map && {__MODULE__, :stream_mapper, [conn, flat_map]}
map_opts = [decode_mapper: mapper, stream_mapper: stream_mapper]
opts = map_opts ++ with_log(repo, params, opts ++ default_opts)
apply(repo_mod.__sql__, :stream, [conn, statement, params, opts])
|> Enumerable.reduce(acc, fun)
end
Expand Down Expand Up @@ -627,6 +646,65 @@ defmodule Ecto.Adapters.SQL do
end
end

@doc false
def start_producer(repo, meta, prepared, params, mapper, flat_map, opts) do
stream = stream(repo, meta, prepared, params, mapper, flat_map, opts)
%Ecto.Adapters.SQL.Stream{repo: repo, statement: statement, params: params,
mapper: mapper, flat_map: flat_map, opts: opts} = stream
start_producer(repo, statement, params, mapper, flat_map, opts)
end

@doc """
Start link a `GenStage` producer that streams the result of a query.
"""
def start_producer(repo, statement, params, opts \\ []) do
start_producer(repo, statement, params, fn x -> x end, nil, opts)
end

defp start_producer(repo, statement, params, mapper, flat_map, opts) do
{repo_mod, pool, default_opts} = lookup_pool(repo)
stage_opts = Keyword.delete(default_opts, :name)
stream_mapper = flat_map && {__MODULE__, :stage_mapper, [pool, flat_map]}
map_opts = [decode_mapper: mapper, stream_mapper: stream_mapper]
opts = map_opts ++ with_log(repo, params, opts ++ stage_opts)
opts = Keyword.put_new(opts, :caller, self())
repo_mod.__sql__.start_producer(pool, statement, params, opts)
end

@doc false
def stage_mapper(stream_conn, res, pool, flat_map) do
cur_conn = get_conn(pool)
try do
put_conn(pool, stream_conn)
stream_mapper(stream_conn, res, stream_conn, flat_map)
after
if cur_conn do
put_conn(pool, cur_conn)
else
delete_conn(pool)
end
end
end

@doc """
Start link a `GenStage` consumers that runs a `transaction` for every batch of
events and maintains a state.
"""
def start_consumer(repo, fun, opts \\ []) when is_function(fun, 1) do
{repo_mod, pool, default_opts} = lookup_pool(repo)

transaction =
fn(conn, rows) ->
put_conn(pool, conn)
fun.(rows)
end

stage_opts = Keyword.delete(default_opts, :name)
opts = with_log(repo, [], opts ++ stage_opts)
opts = Keyword.put_new(opts, :caller, self())
repo_mod.__sql__.start_consumer(pool, transaction, opts)
end

## Log

defp with_log(repo, params, opts) do
Expand Down
17 changes: 17 additions & 0 deletions lib/ecto/adapters/sql/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ defmodule Ecto.Adapters.SQL.Connection do
@typedoc "The cache query which is a DBConnection Query"
@type cached :: map

@typedoc "The DBConection transaction fun"
@type transaction :: (DBConnection.t, list -> term)

@doc """
Receives options and returns `DBConnection` supervisor child
specification.
Expand Down Expand Up @@ -94,4 +97,18 @@ defmodule Ecto.Adapters.SQL.Connection do
Receives a DDL command and returns a query that executes it.
"""
@callback execute_ddl(command :: Ecto.Adapter.Migration.command) :: String.t | [iodata]

## GenStage

@doc """
Start and link a `GenStage` producer that streams the result of a query.
"""
@callback start_producer(pool :: GenServer.server, statement :: prepared, params :: [term], options :: Keyword.t) ::
GenServer.on_start

@doc """
Start and link a `GenStage` consumer that runs a transaction for the events.
"""
@callback start_consumer(pool :: GenServer.server, transaction, options :: Keyword.t) ::
GenServer.on_start
end
13 changes: 7 additions & 6 deletions lib/ecto/adapters/sql/stream.ex
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
defmodule Ecto.Adapters.SQL.Stream do
@moduledoc false

defstruct [:repo, :statement, :params, :mapper, :opts]
defstruct [:repo, :statement, :params, :mapper, :flat_map, :opts]

def __build__(repo, statement, params, mapper, opts) do
%__MODULE__{repo: repo, statement: statement, params: params, mapper: mapper,
opts: opts}
def __build__(repo, statement, params, mapper, flat_map, opts) do
%__MODULE__{repo: repo, statement: statement, params: params,
mapper: mapper, flat_map: flat_map, opts: opts}
end
end

Expand All @@ -16,8 +16,9 @@ defimpl Enumerable, for: Ecto.Adapters.SQL.Stream do

def reduce(stream, acc, fun) do
%Ecto.Adapters.SQL.Stream{repo: repo, statement: statement, params: params,
mapper: mapper, opts: opts} = stream
Ecto.Adapters.SQL.reduce(repo, statement, params, mapper, opts, acc, fun)
mapper: mapper, flat_map: flat_map,
opts: opts} = stream
Ecto.Adapters.SQL.reduce(repo, statement, params, mapper, flat_map, opts, acc, fun)
end
end

Expand Down
Loading