From 018cc65e711ec799588e97a4847b1d4423e426a9 Mon Sep 17 00:00:00 2001 From: James Fish Date: Mon, 10 Apr 2017 23:00:01 +0100 Subject: [PATCH 1/5] Introduce Ecto.Adapters.SQL.Stage --- integration_test/pg/all_test.exs | 1 + integration_test/sql/stage.exs | 70 +++++++++++++ lib/ecto/adapters/sql.ex | 25 +++++ lib/ecto/adapters/sql/stage.ex | 164 +++++++++++++++++++++++++++++++ mix.exs | 4 +- mix.lock | 4 +- 6 files changed, 266 insertions(+), 2 deletions(-) create mode 100644 integration_test/sql/stage.exs create mode 100644 lib/ecto/adapters/sql/stage.ex diff --git a/integration_test/pg/all_test.exs b/integration_test/pg/all_test.exs index 2184f29084..24a9398413 100644 --- a/integration_test/pg/all_test.exs +++ b/integration_test/pg/all_test.exs @@ -3,6 +3,7 @@ Code.require_file "../sql/lock.exs", __DIR__ Code.require_file "../sql/migration.exs", __DIR__ Code.require_file "../sql/sandbox.exs", __DIR__ Code.require_file "../sql/sql.exs", __DIR__ +Code.require_file "../sql/stage.exs", __DIR__ Code.require_file "../sql/stream.exs", __DIR__ Code.require_file "../sql/subquery.exs", __DIR__ Code.require_file "../sql/transaction.exs", __DIR__ diff --git a/integration_test/sql/stage.exs b/integration_test/sql/stage.exs new file mode 100644 index 0000000000..5eb6f9c389 --- /dev/null +++ b/integration_test/sql/stage.exs @@ -0,0 +1,70 @@ +defmodule Ecto.Integration.StageTest do + use Ecto.Integration.Case, async: true + + alias Ecto.Integration.TestRepo + alias Ecto.Integration.Post + alias Ecto.Integration.Comment + alias Ecto.Adapters.SQL.Stage + import Ecto.Query + + test "stream empty" do + {:ok, stage} = Stage.stream(TestRepo, Post) + assert to_list(stage) === [] + + {:ok, stage} = Stage.stream(TestRepo, from p in Post) + assert to_list(stage) == [] + end + + test "stream without schema" do + %Post{} = TestRepo.insert!(%Post{title: "title1"}) + %Post{} = TestRepo.insert!(%Post{title: "title2"}) + + query = from(p in "posts", order_by: p.title, select: p.title) + {:ok, 
stage} = Stage.stream(TestRepo, query) + + assert to_list(stage) == ["title1", "title2"] + end + + test "stream with assoc" do + p1 = TestRepo.insert!(%Post{title: "1"}) + + %Comment{id: cid1} = TestRepo.insert!(%Comment{text: "1", post_id: p1.id}) + %Comment{id: cid2} = TestRepo.insert!(%Comment{text: "2", post_id: p1.id}) + + {:ok, stage} = Stage.stream(TestRepo, Ecto.assoc(p1, :comments)) + assert [c1, c2] = to_list(stage) + + assert c1.id == cid1 + assert c2.id == cid2 + end + + test "stream with preload" do + p1 = TestRepo.insert!(%Post{title: "1"}) + p2 = TestRepo.insert!(%Post{title: "2"}) + TestRepo.insert!(%Post{title: "3"}) + + %Comment{id: cid1} = TestRepo.insert!(%Comment{text: "1", post_id: p1.id}) + %Comment{id: cid2} = TestRepo.insert!(%Comment{text: "2", post_id: p1.id}) + %Comment{id: cid3} = TestRepo.insert!(%Comment{text: "3", post_id: p2.id}) + %Comment{id: cid4} = TestRepo.insert!(%Comment{text: "4", post_id: p2.id}) + + query = from(p in Post, preload: [:comments], select: p) + {:ok, stage} = Stage.stream(TestRepo, query, [max_rows: 2]) + + assert [p1, p2, p3] = stage |> to_list() |> sort_by_id() + + assert [%Comment{id: ^cid1}, %Comment{id: ^cid2}] = p1.comments |> sort_by_id + assert [%Comment{id: ^cid3}, %Comment{id: ^cid4}] = p2.comments |> sort_by_id + assert [] = p3.comments + end + + defp to_list(stage) do + stage + |> Flow.from_stage() + |> Enum.to_list() + end + + defp sort_by_id(values) do + Enum.sort_by(values, &(&1.id)) + end +end diff --git a/lib/ecto/adapters/sql.ex b/lib/ecto/adapters/sql.ex index 0fd9988a7c..82d90a71fb 100644 --- a/lib/ecto/adapters/sql.ex +++ b/lib/ecto/adapters/sql.ex @@ -627,6 +627,31 @@ defmodule Ecto.Adapters.SQL do end end + @doc false + def stage(repo, type, start, handle, stop, opts \\ []) do + {repo_mod, pool, default_opts} = lookup_pool(repo) + default_opts = + default_opts + |> Keyword.delete(:name) + |> Keyword.put_new(:caller, self()) + opts = with_log(repo_mod, [], opts ++ default_opts) + start = 
+ fn(conn) -> + put_conn(pool, conn) + start.() + end + handle = fn(_, arg, state) -> handle.(arg, state) end + stop = + fn(_, reason, state) -> + try do + stop.(reason, state) + after + delete_conn(pool) + end + end + DBConnection.Stage.start_link(pool, type, start, handle, stop, opts) + end + ## Log defp with_log(repo, params, opts) do diff --git a/lib/ecto/adapters/sql/stage.ex b/lib/ecto/adapters/sql/stage.ex new file mode 100644 index 0000000000..71b7d8e350 --- /dev/null +++ b/lib/ecto/adapters/sql/stage.ex @@ -0,0 +1,164 @@ +defmodule Ecto.Adapters.SQL.Stage do + @moduledoc """ + A `GenStage` process that encapsulates a SQL transaction. + """ + + @doc """ + Start link a `GenStage` process that will run a transaction for its duration. + + The first argument is the pool, the second argument is the `GenStage` type, + the third argument is the start function, the fourth argument is the handle + function, the fifth argument is the stop function and the optional sixth + argument are the options. + + The start function is a o-arity anonymous function. This is called after the + transaction begins but before `start_link/6` returns. It should return the + `state` or call `MyRepo.rollback/1` to stop the `GenStage`. + + The handle function is a 2-arity anonymous function. If the `GenStage` type is + a `:producer`, then the first argument is the `demand` from a `GenStage` + `handle_demand` callback. Otherwise the first argument is the events from a + `GenStage` `handle_events` callback. The second argument is the state. This + function returns a 2-tuple, with first element as events (empty list for + `:consumer`) and second element as the `state`. This function can roll back + and stop the `GenStage` using `MyRepo.rollback/1`. + + The stop function is a 2-arity anonymous function. The first argument is the + terminate reason and the second argument is the `state`. This function will + only be called if connection is alive and the transaction has not been rolled + back. 
If this function returns the transaction is committed. This function can + roll back and stop the `GenStage` using `MyRepo.rollback/1`. + + The `GenStage` process will behave like a `Flow` stage: + + * It will stop with reason `:normal` when the last consumer cancels + * It will notify consumers that it is done when all producers have cancelled + or notified that they are done or halted + * It will cancel new and remaining producers when all producers have + notified that they are done or halted and it is a `:consumer` + * It will not send demand to new producers when all producers have notified + that they are done or halted and it is a `:consumer_producer` + + ### Options + + * `:name` - A name to register the started process (see the `:name` option + in `GenServer.start_link/3`) + + See the "Shared options" section at the `Ecto.Repo` documentation. All options + are passed to the `GenStage` on init. + + ### Example + + start = fn() -> Post end + handle = + fn(entries, schema) -> + MyRepo.insert_all(schema, entries) + {[], schema} + end + stop = + fn + :normal, _ -> :ok + reason, _ -> MyRepo.rollback(reason) + end + Ecto.Adapters.SQL.Stage.start_link(MyRepo, :consumer, start, handle, stop) + """ + @spec start_link(repo :: module, :producer, + start :: (() -> state), + handle_demand :: ((demand :: pos_integer, state) -> {[any], state}), + stop :: ((reason :: any, state) -> any), opts :: Keyword.t) :: + GenServer.on_start when state: var + @spec start_link(repo :: module, :producer_consumer, + start :: (() -> state), + handle_events :: (([any], state) -> {[any], state}), + stop :: ((reason :: any, state) -> any), opts :: Keyword.t) :: + GenServer.on_start when state: var + @spec start_link(repo :: module, :consumer, + start :: (() -> state), + handle_events :: (([any], state) -> {[], state}), + stop :: ((reason :: any, state) -> any), opts :: Keyword.t) :: + GenServer.on_start when state: var + def start_link(repo, type, start, handle, stop, opts \\ []) do + 
Ecto.Adapters.SQL.stage(repo, type, start, handle, stop, opts) + end + + @doc """ + Starts a `GenStage` producer that emits all entries from the data store + matching the given query. SQL adapters, such as Postgres and MySQL, will use + a separate transaction to enumerate the stream. + + May raise `Ecto.QueryError` if query validation fails. + + ## Options + + * `:prefix` - The prefix to run the query on (such as the schema path + in Postgres or the database in MySQL). This overrides the prefix set + in the query + + * `:max_rows` - The number of rows to load from the database as we stream. + It is supported at least by Postgres and MySQL and defaults to 500. + + See the "Shared options" section at the `Ecto.Repo` documentation. + + ## Example + + # Print all post titles + query = from p in Post, + select: p.title + {:ok, stage} = Ecto.Adapters.SQL.stream(MyRepo, query) + stage + |> Flow.from_stage() + |> Flow.each(&IO.inspect/1) + |> Flow.start_link() + """ + + @callback stream(repo :: module, queryable :: Ecto.Query.t, opts :: Keyword.t) :: + GenServer.on_start() + def stream(repo, queryable, opts \\ []) do + stream = apply(repo, :stream, [queryable, opts]) + start = + fn() -> + acc = {:suspend, {0, []}} + {:suspended, _, cont} = Enumerable.reduce(stream, acc, &stream_reduce/2) + {repo, :cont, cont} + end + start_link(repo, :producer, start, &stream_handle/2, &stream_stop/2, opts) + end + + ## Helpers + + defp stream_reduce(v, {1, acc}) do + {:suspend, {0, [v | acc]}} + end + defp stream_reduce(v, {n, acc}) do + {:cont, {n-1, [v | acc]}} + end + + defp stream_handle(n, {repo, :cont, cont}) when n > 0 do + case cont.({:cont, {n, []}}) do + {:suspended, {0, acc}, cont} -> + {Enum.reverse(acc), {repo, :cont, cont}} + {status, {_, acc}} when status in [:halted, :done] -> + GenStage.async_notify(self(), {:producer, status}) + {Enum.reverse(acc), {repo, status}} + end + end + defp stream_handle(_, {_repo, status} = state) do + GenStage.async_notify(self(), {:producer, 
status}) + {[], state} + end + + defp stream_stop(reason, {repo, :cont, cont}) do + _ = cont.({:halt, {0, []}}) + stream_stop(repo, reason) + end + defp stream_stop(reason, {repo, status}) when status in [:halted, :done] do + stream_stop(repo, reason) + end + + defp stream_stop(_, :normal) do + :ok + end + defp stream_stop(repo, reason) do + apply(repo, :rollback, [reason]) + end +end diff --git a/mix.exs b/mix.exs index 4caf6af410..225b657418 100644 --- a/mix.exs +++ b/mix.exs @@ -43,7 +43,9 @@ defmodule Ecto.Mixfile do {:decimal, "~> 1.2"}, # Drivers - {:db_connection, "~> 1.1", optional: true}, + {:db_connection, "~> 1.1", github: "elixir-ecto/db_connection", branch: "jf-stream_stage", override: true, optional: true}, + {:gen_stage, "~> 0.11", optional: true}, + {:flow, "~> 0.11", optional: true}, {:postgrex, "~> 0.13.0", optional: true}, {:mariaex, "~> 0.8.0", optional: true}, diff --git a/mix.lock b/mix.lock index 99e7bcf1d9..dd78ea41f6 100644 --- a/mix.lock +++ b/mix.lock @@ -1,9 +1,11 @@ %{"backoff": {:hex, :backoff, "1.1.1"}, "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], []}, - "db_connection": {:hex, :db_connection, "1.1.2", "2865c2a4bae0714e2213a0ce60a1b12d76a6efba0c51fbda59c9ab8d1accc7a8", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, optional: false]}, {:poolboy, "~> 1.5", [hex: :poolboy, optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, optional: true]}]}, + "db_connection": {:git, "https://github.com/elixir-ecto/db_connection.git", "392affacc4a730d748bf95d7261de9e8a9709fe2", [branch: "jf-stream_stage"]}, "decimal": {:hex, :decimal, "1.3.1", "157b3cedb2bfcb5359372a7766dd7a41091ad34578296e951f58a946fcab49c6", [:mix], []}, "earmark": {:hex, :earmark, "1.0.3", "89bdbaf2aca8bbb5c97d8b3b55c5dd0cff517ecc78d417e87f1d0982e514557b", [:mix], []}, "ex_doc": {:hex, :ex_doc, "0.14.5", "c0433c8117e948404d93ca69411dd575ec6be39b47802e81ca8d91017a0cf83c", [:mix], [{:earmark, "~> 1.0", 
[hex: :earmark, optional: false]}]}, + "flow": {:hex, :flow, "0.11.1", "cbc35a0236520cc5fec7b5863cd8431cb1e77297c5c9119055676355eb1fb5a6", [:mix], [{:gen_stage, "~> 0.11.0", [hex: :gen_stage, optional: false]}]}, + "gen_stage": {:hex, :gen_stage, "0.11.0", "943bdfa85c75fa624e0a36a9d135baad20a523be040178f5a215444b45c66ea4", [:mix], []}, "inch_ex": {:hex, :inch_ex, "0.5.5", "b63f57e281467bd3456461525fdbc9e158c8edbe603da6e3e4671befde796a3d", [:mix], [{:poison, "~> 1.5 or ~> 2.0 or ~> 3.0", [hex: :poison, optional: false]}]}, "mariaex": {:hex, :mariaex, "0.8.2", "a9ee64a02fd72579f844934b4cbecca9566593e499125edf6032c58f9d50b5f9", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, optional: false]}]}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], []}, From 57d2a19d32f36affc7976073a2acf888f8f12aa9 Mon Sep 17 00:00:00 2001 From: James Fish Date: Wed, 12 Apr 2017 01:00:38 +0100 Subject: [PATCH 2/5] Split SQL stage start_link to 3 functions --- lib/ecto/adapters/sql.ex | 4 +- lib/ecto/adapters/sql/stage.ex | 172 +++++++++++++++++++++------------ mix.lock | 2 +- 3 files changed, 115 insertions(+), 63 deletions(-) diff --git a/lib/ecto/adapters/sql.ex b/lib/ecto/adapters/sql.ex index 82d90a71fb..8a834e7aba 100644 --- a/lib/ecto/adapters/sql.ex +++ b/lib/ecto/adapters/sql.ex @@ -628,7 +628,7 @@ defmodule Ecto.Adapters.SQL do end @doc false - def stage(repo, type, start, handle, stop, opts \\ []) do + def stage(fun, repo, start, handle, stop, opts) do {repo_mod, pool, default_opts} = lookup_pool(repo) default_opts = default_opts @@ -649,7 +649,7 @@ defmodule Ecto.Adapters.SQL do delete_conn(pool) end end - DBConnection.Stage.start_link(pool, type, start, handle, stop, opts) + fun.(pool, start, handle, stop, opts) end ## Log diff --git a/lib/ecto/adapters/sql/stage.ex b/lib/ecto/adapters/sql/stage.ex index 71b7d8e350..8740ce0555 100644 --- 
a/lib/ecto/adapters/sql/stage.ex +++ b/lib/ecto/adapters/sql/stage.ex @@ -1,84 +1,137 @@ defmodule Ecto.Adapters.SQL.Stage do @moduledoc """ A `GenStage` process that encapsulates a SQL transaction. + + ### Options + + * `:name` - A name to register the started process (see the `:name` option + in `GenServer.start_link/3`) + + See the "Shared options" section at the `Ecto.Repo` documentation. All options + are passed to the `GenStage` on init. """ @doc """ - Start link a `GenStage` process that will run a transaction for its duration. + Start link a `GenStage` producer that will run a transaction for its duration. - The first argument is the pool, the second argument is the `GenStage` type, - the third argument is the start function, the fourth argument is the handle - function, the fifth argument is the stop function and the optional sixth - argument are the options. + The first argument is the repo, the second argument is the start function, + the third argument is the handle demand function, the fourth argument is the + stop function and the optional fiftth argument are the options. - The start function is a o-arity anonymous function. This is called after the - transaction begins but before `start_link/6` returns. It should return the - `state` or call `MyRepo.rollback/1` to stop the `GenStage`. + The start function is a 0-arity anonymous function. This is called after the + transaction begins but before `producer/5` returns. It should return the + accumulator or call `repo.rollback/1` to stop the `GenStage`. - The handle function is a 2-arity anonymous function. If the `GenStage` type is - a `:producer`, then the first argument is the `demand` from a `GenStage` - `handle_demand` callback. Otherwise the first argument is the events from a - `GenStage` `handle_events` callback. The second argument is the state. This - function returns a 2-tuple, with first element as events (empty list for - `:consumer`) and second element as the `state`. 
This function can roll back - and stop the `GenStage` using `MyRepo.rollback/1`. + The handle demand function is a 2-arity anonymous function. The first argument + is the `demand`, and the second argument is the accumulator. This function + returns a 2-tuple, with first element as list of events to fulfil the demand + and second element as the accumulator. If the producer has emitted all events + (and so not fulfilled demand) it should call + `GenStage.async_notify(self(), {:producer, :done | :halted}` to signal to + consumers that it has finished. Also this function can rollback and stop the + `GenStage` using `repo.rollback/1`. The stop function is a 2-arity anonymous function. The first argument is the - terminate reason and the second argument is the `state`. This function will + terminate reason and the third argument is the accumulator. This function will only be called if connection is alive and the transaction has not been rolled back. If this function returns the transaction is committed. This function can - roll back and stop the `GenStage` using `MyRepo.rollback/1`. + rollback and stop the `GenStage` using `repo.rollback/1`. + + For options see "Options" in the module documentation. + + The `GenStage` process will behave like a `Flow` stage: + + * It will stop with reason `:normal` when the last consumer cancels + """ + @spec producer(module, start :: (() -> acc), + handle_demand :: ((demand :: pos_integer, acc) -> {[any], acc}), + stop :: ((reason :: any, acc) -> any), Keyword.t) :: + GenServer.on_start when acc: var + def producer(repo, start, handle_demand, stop, opts \\ []) do + fun = &DBConnection.Stage.producer/5 + Ecto.Adapters.SQL.stage(fun, repo, start, handle_demand, stop, opts) + end + + @doc """ + Start link a `GenStage` producer consumer that will run a transaction for its + duration. 
+ + The first argument is the repo, the second argument is the start function, + the third argument is the handle events function, the fourth argument is the + stop function and the optional fiftth argument are the options. + + The start function is a 0-arity anonymous function. This is called after the + transaction begins but before `consumer_producer/5` returns. It should return + the accumulator or call `repo.rollback/1` to stop the `GenStage`. + + The handle events function is a 2-arity anonymous function. The first argument + is a list of incoming events, and the second argument is the accumulator. This + function returns a 2-tuple, with first element as list of outgoing events and + second element as the accumulator. Also this function can rollback and stop + the `GenStage` using `repo.rollback/1`. + + The stop function is a 2-arity anonymous function. The first argument is the + terminate reason and the second argument is the accumulator. This function + will only be called if connection is alive and the transaction has not been + rolled back. If this function returns the transaction is committed. This + function can rollback and stop the `GenStage` using `repo.rollback/1`. + + For options see "Options" in the module documentation. 
The `GenStage` process will behave like a `Flow` stage: * It will stop with reason `:normal` when the last consumer cancels * It will notify consumers that it is done when all producers have cancelled or notified that they are done or halted - * It will cancel new and remaining producers when all producers have - notified that they are done or halted and it is a `:consumer` * It will not send demand to new producers when all producers have notified - that they are done or halted and it is a `:consumer_producer` + that they are done or halted + """ + @spec producer_consumer(repo :: module, start :: (() -> acc), + handle_events :: ((events_in :: [any], acc) -> {events_out :: [any], acc}), + stop :: ((reason :: any, acc) -> any), Keyword.t) :: + GenServer.on_start when acc: var + def producer_consumer(repo, start, handle_events, stop, opts \\ []) do + fun = &DBConnection.Stage.producer_consumer/5 + Ecto.Adapters.SQL.stage(fun, repo, start, handle_events, stop, opts) + end - ### Options + @doc """ + Start link a `GenStage` consumer that will run a transaction for its duration. - * `:name` - A name to register the started process (see the `:name` option - in `GenServer.start_link/3`) + The first argument is the repo, the second argument is the start function, + the third argument is the handle events function, the fourth argument is the + stop function and the optional fiftth argument are the options. - See the "Shared options" section at the `Ecto.Repo` documentation. All options - are passed to the `GenStage` on init. + The start function is a 0-arity anonymous function. This is called after the + transaction begins but before `consumer/5` returns. It should return the + accumulator or call `repo.rollback/1` to stop the `GenStage`. 
- ### Example - - start = fn() -> Post end - handle = - fn(entries, schema) -> - MyRepo.insert_all(schema, entries) - {[], schema} - end - stop = - fn - :normal, _ -> :ok - reason, _ -> MyRepo.rollback(reason) - end - Ecto.Adapters.SQL.Stage.start_link(MyRepo, :consumer, start, handle, stop) + The handle events function is a 2-arity anonymous function. The first argument + is the list of events, and the second argument is the accumulator. This + function returns a 2-tuple, with first element is an empty list (as no + outgoing events) and second element as the accumulator. Also this function can + rollback and stop the `GenStage` using `repo.rollback/1`. + + The stop function is a 2-arity anonymous function. The first argument is the + terminate reason and the second argument is the accumulator. This function + will only be called if connection is alive and the transaction has not been + rolled back. If this function returns the transaction is committed. This + function can rollback and stop the `GenStage` using `repo.rollback/1`. + + See the "Shared options" section at the `Ecto.Repo` documentation. 
+ + The `GenStage` process will behave like a `Flow` stage: + + * It will cancel new and remaining producers when all producers have + notified that they are done or halted and it is a `:consumer` """ - @spec start_link(repo :: module, :producer, - start :: (() -> state), - handle_demand :: ((demand :: pos_integer, state) -> {[any], state}), - stop :: ((reason :: any, state) -> any), opts :: Keyword.t) :: - GenServer.on_start when state: var - @spec start_link(repo :: module, :producer_consumer, - start :: (() -> state), - handle_events :: (([any], state) -> {[any], state}), - stop :: ((reason :: any, state) -> any), opts :: Keyword.t) :: - GenServer.on_start when state: var - @spec start_link(repo :: module, :consumer, - start :: (() -> state), - handle_events :: (([any], state) -> {[], state}), - stop :: ((reason :: any, state) -> any), opts :: Keyword.t) :: - GenServer.on_start when state: var - def start_link(repo, type, start, handle, stop, opts \\ []) do - Ecto.Adapters.SQL.stage(repo, type, start, handle, stop, opts) + @spec consumer(repo :: module, start :: (() -> acc), + handle_events :: ((events_in :: [any], acc) -> {[], acc}), + stop :: ((reason :: any, acc) -> any), Keyword.t) :: + GenServer.on_start when acc: var + def consumer(pool, start, handle_events, stop, opts \\ []) do + fun = &DBConnection.Stage.consumer/5 + Ecto.Adapters.SQL.stage(fun, pool, start, handle_events, stop, opts) end @doc """ @@ -97,7 +150,7 @@ defmodule Ecto.Adapters.SQL.Stage do * `:max_rows` - The number of rows to load from the database as we stream. It is supported at least by Postgres and MySQL and defaults to 500. - See the "Shared options" section at the `Ecto.Repo` documentation. + For more options see "Options" in the module documentation. 
## Example @@ -110,8 +163,7 @@ defmodule Ecto.Adapters.SQL.Stage do |> Flow.each(&IO.inspect/1) |> Flow.start_link() """ - - @callback stream(repo :: module, queryable :: Ecto.Query.t, opts :: Keyword.t) :: + @spec stream(repo :: module, queryable :: Ecto.Query.t, opts :: Keyword.t) :: GenServer.on_start() def stream(repo, queryable, opts \\ []) do stream = apply(repo, :stream, [queryable, opts]) @@ -121,7 +173,7 @@ defmodule Ecto.Adapters.SQL.Stage do {:suspended, _, cont} = Enumerable.reduce(stream, acc, &stream_reduce/2) {repo, :cont, cont} end - start_link(repo, :producer, start, &stream_handle/2, &stream_stop/2, opts) + producer(repo, start, &stream_handle/2, &stream_stop/2, opts) end ## Helpers diff --git a/mix.lock b/mix.lock index dd78ea41f6..14e48d5993 100644 --- a/mix.lock +++ b/mix.lock @@ -1,6 +1,6 @@ %{"backoff": {:hex, :backoff, "1.1.1"}, "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], []}, - "db_connection": {:git, "https://github.com/elixir-ecto/db_connection.git", "392affacc4a730d748bf95d7261de9e8a9709fe2", [branch: "jf-stream_stage"]}, + "db_connection": {:git, "https://github.com/elixir-ecto/db_connection.git", "73238baf9778b1263e108a0df9198aade2921bea", [branch: "jf-stream_stage"]}, "decimal": {:hex, :decimal, "1.3.1", "157b3cedb2bfcb5359372a7766dd7a41091ad34578296e951f58a946fcab49c6", [:mix], []}, "earmark": {:hex, :earmark, "1.0.3", "89bdbaf2aca8bbb5c97d8b3b55c5dd0cff517ecc78d417e87f1d0982e514557b", [:mix], []}, "ex_doc": {:hex, :ex_doc, "0.14.5", "c0433c8117e948404d93ca69411dd575ec6be39b47802e81ca8d91017a0cf83c", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, optional: false]}]}, From cf908360ea16eccebbe6e291fb57e065fd7fb1a7 Mon Sep 17 00:00:00 2001 From: James Fish Date: Sun, 28 May 2017 22:49:23 +0100 Subject: [PATCH 3/5] Update Postgrex/DBConnection/GenStage/Flow --- integration_test/pg/copy_test.exs | 27 ++- integration_test/pg/test_helper.exs | 4 + 
integration_test/sql/stage.exs | 12 +- lib/ecto/adapters/postgres/connection.ex | 5 + lib/ecto/adapters/sql.ex | 128 +++++++++----- lib/ecto/adapters/sql/connection.ex | 7 + lib/ecto/adapters/sql/stage.ex | 216 ----------------------- lib/ecto/adapters/sql/stream.ex | 13 +- lib/ecto/repo/queryable.ex | 25 ++- mix.exs | 6 +- mix.lock | 26 +-- 11 files changed, 173 insertions(+), 296 deletions(-) delete mode 100644 lib/ecto/adapters/sql/stage.ex diff --git a/integration_test/pg/copy_test.exs b/integration_test/pg/copy_test.exs index ec1d7b3bd0..171569c247 100644 --- a/integration_test/pg/copy_test.exs +++ b/integration_test/pg/copy_test.exs @@ -4,7 +4,7 @@ defmodule Ecto.Integration.CopyTest do alias Ecto.Integration.TestRepo alias Ecto.Integration.Post - test "copy to and from table" do + test "stream copy to and from table" do read = Ecto.Adapters.SQL.stream(TestRepo, "COPY posts TO STDOUT") write = Ecto.Adapters.SQL.stream(TestRepo, "COPY posts FROM STDIN") @@ -19,4 +19,29 @@ defmodule Ecto.Integration.CopyTest do assert TestRepo.all(Post) == [one, two] end end + + test "stage copy to and from table" do + one = TestRepo.insert!(%Post{title: "one"}) + two = TestRepo.insert!(%Post{title: "two"}) + + read = "COPY posts TO STDOUT" + {:ok, producer} = Ecto.Adapters.SQL.start_stage(TestRepo, read, [], []) + + data = + [{producer, cancel: :transient}] + |> GenStage.stream() + |> Enum.to_list() + + assert TestRepo.delete_all(Post) == {2, nil} + + write = "COPY posts FROM STDIN" + {:ok, consumer} = Ecto.Adapters.SQL.start_stage(TestRepo, write, [], [stage_module: Postgrex.CopyConsumer]) + + {:ok, _} = + data + |> Flow.from_enumerable() + |> Flow.into_stages([consumer]) + + assert TestRepo.all(Post) == [one, two] + end end diff --git a/integration_test/pg/test_helper.exs b/integration_test/pg/test_helper.exs index 13871962f7..b6efcf4503 100644 --- a/integration_test/pg/test_helper.exs +++ b/integration_test/pg/test_helper.exs @@ -34,6 +34,10 @@ defmodule 
Ecto.Integration.TestRepo do use Ecto.Integration.Repo, otp_app: :ecto end +defmodule Ecto.Integration.TestStage do + use Ecto.Repo.Stage, repo: TestRepo +end + # Pool repo for non-async tests alias Ecto.Integration.PoolRepo diff --git a/integration_test/sql/stage.exs b/integration_test/sql/stage.exs index 5eb6f9c389..bdcb476323 100644 --- a/integration_test/sql/stage.exs +++ b/integration_test/sql/stage.exs @@ -2,16 +2,16 @@ defmodule Ecto.Integration.StageTest do use Ecto.Integration.Case, async: true alias Ecto.Integration.TestRepo + alias Ecto.Integration.TestStage alias Ecto.Integration.Post alias Ecto.Integration.Comment - alias Ecto.Adapters.SQL.Stage import Ecto.Query test "stream empty" do - {:ok, stage} = Stage.stream(TestRepo, Post) + {:ok, stage} = TestStage.start_link(Post) assert to_list(stage) === [] - {:ok, stage} = Stage.stream(TestRepo, from p in Post) + {:ok, stage} = TestStage.start_link(from p in Post) assert to_list(stage) == [] end @@ -20,7 +20,7 @@ defmodule Ecto.Integration.StageTest do %Post{} = TestRepo.insert!(%Post{title: "title2"}) query = from(p in "posts", order_by: p.title, select: p.title) - {:ok, stage} = Stage.stream(TestRepo, query) + {:ok, stage} = TestStage.start_link(query) assert to_list(stage) == ["title1", "title2"] end @@ -31,7 +31,7 @@ defmodule Ecto.Integration.StageTest do %Comment{id: cid1} = TestRepo.insert!(%Comment{text: "1", post_id: p1.id}) %Comment{id: cid2} = TestRepo.insert!(%Comment{text: "2", post_id: p1.id}) - {:ok, stage} = Stage.stream(TestRepo, Ecto.assoc(p1, :comments)) + {:ok, stage} = TestStage.start_link(Ecto.assoc(p1, :comments)) assert [c1, c2] = to_list(stage) assert c1.id == cid1 @@ -49,7 +49,7 @@ defmodule Ecto.Integration.StageTest do %Comment{id: cid4} = TestRepo.insert!(%Comment{text: "4", post_id: p2.id}) query = from(p in Post, preload: [:comments], select: p) - {:ok, stage} = Stage.stream(TestRepo, query, [max_rows: 2]) + {:ok, stage} = TestStage.start_link(query, [max_rows: 2]) assert 
[p1, p2, p3] = stage |> to_list() |> sort_by_id() diff --git a/lib/ecto/adapters/postgres/connection.ex b/lib/ecto/adapters/postgres/connection.ex index 4b7ca3abf3..3eb90a0d4a 100644 --- a/lib/ecto/adapters/postgres/connection.ex +++ b/lib/ecto/adapters/postgres/connection.ex @@ -18,6 +18,11 @@ if Code.ensure_loaded?(Postgrex) do |> Postgrex.child_spec() end + def stage_spec(pool, statement, params, opts) do + stage_mod = Keyword.get(opts, :stage_module, Postgrex.Producer) + Supervisor.Spec.worker(stage_mod, [pool, statement, params, opts]) + end + def to_constraints(%Postgrex.Error{postgres: %{code: :unique_violation, constraint: constraint}}), do: [unique: constraint] def to_constraints(%Postgrex.Error{postgres: %{code: :foreign_key_violation, constraint: constraint}}), diff --git a/lib/ecto/adapters/sql.ex b/lib/ecto/adapters/sql.ex index 8a834e7aba..119b90685d 100644 --- a/lib/ecto/adapters/sql.ex +++ b/lib/ecto/adapters/sql.ex @@ -69,8 +69,8 @@ defmodule Ecto.Adapters.SQL do end @doc false - def stream(repo, meta, query, params, process, opts) do - Ecto.Adapters.SQL.stream(repo, meta, query, params, process, opts) + def stream(repo, meta, query, params, process, flat_map, opts) do + Ecto.Adapters.SQL.stream(repo, meta, query, params, process, flat_map, opts) end @doc false @@ -136,8 +136,16 @@ defmodule Ecto.Adapters.SQL do :ok end + ## GenStage + + @doc false + def stage_spec(repo, meta, query, params, process, flat_map, opts) do + Ecto.Adapters.SQL.stage_spec(repo, meta, query, params, process, flat_map, opts) + end + defoverridable [prepare: 2, execute: 6, insert: 6, update: 6, delete: 4, insert_all: 7, - execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1, ensure_all_started: 2] + execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1, ensure_all_started: 2, + stage_spec: 7] end end @@ -477,55 +485,60 @@ defmodule Ecto.Adapters.SQL do """ @spec stream(Ecto.Repo.t, String.t, [term], Keyword.t) :: Enum.t def stream(repo, sql, params \\ [], opts \\ []) 
do - Ecto.Adapters.SQL.Stream.__build__(repo, sql, params, fn x -> x end, opts) + Ecto.Adapters.SQL.Stream.__build__(repo, sql, params, fn x -> x end, nil, opts) end @doc false - def stream(repo, meta, prepared, params, mapper, opts) do - do_stream(repo, meta, prepared, params, mapper, put_source(opts, meta)) + def stream(repo, meta, prepared, params, mapper, flat_map, opts) do + do_stream(repo, meta, prepared, params, mapper, flat_map, put_source(opts, meta)) end - def do_stream(repo, _meta, {:cache, _, {_, prepared}}, params, nil, opts) do - prepare_stream(repo, prepared, params, nil, opts) + def do_stream(repo, _meta, {:cache, _, {_, prepared}}, params, nil, flat_map, opts) do + prepare_stream(repo, prepared, params, nil, flat_map, opts) end - def do_stream(repo, %{fields: fields, sources: sources}, {:cache, _, {_, prepared}}, params, process, opts) do + def do_stream(repo, %{fields: fields, sources: sources}, {:cache, _, {_, prepared}}, params, process, flat_map, opts) do mapper = &process_row(&1, process, fields, sources) - prepare_stream(repo, prepared, params, mapper, opts) + prepare_stream(repo, prepared, params, mapper, flat_map, opts) end - def do_stream(repo, _, {:cached, _, {_, cached}}, params, nil, opts) do - prepare_stream(repo, String.Chars.to_string(cached), params, nil, opts) + def do_stream(repo, _, {:cached, _, {_, cached}}, params, nil, flat_map, opts) do + prepare_stream(repo, String.Chars.to_string(cached), params, nil, flat_map, opts) end - def do_stream(repo, %{fields: fields, sources: sources}, {:cached, _, {_, cached}}, params, process, opts) do + def do_stream(repo, %{fields: fields, sources: sources}, {:cached, _, {_, cached}}, params, process, flat_map, opts) do mapper = &process_row(&1, process, fields, sources) - prepare_stream(repo, String.Chars.to_string(cached), params, mapper, opts) + prepare_stream(repo, String.Chars.to_string(cached), params, mapper, flat_map, opts) end - def do_stream(repo, _meta, {:nocache, {_id, prepared}}, 
params, nil, opts) do - prepare_stream(repo, prepared, params, nil, opts) + def do_stream(repo, _meta, {:nocache, {_id, prepared}}, params, nil, flat_map, opts) do + prepare_stream(repo, prepared, params, nil, flat_map, opts) end - def do_stream(repo, %{fields: fields, sources: sources}, {:nocache, {_id, prepared}}, params, process, opts) do + def do_stream(repo, %{fields: fields, sources: sources}, {:nocache, {_id, prepared}}, params, process, flat_map, opts) do mapper = &process_row(&1, process, fields, sources) - prepare_stream(repo, prepared, params, mapper, opts) + prepare_stream(repo, prepared, params, mapper, flat_map, opts) end - defp prepare_stream(repo, prepared, params, mapper, opts) do - repo - |> Ecto.Adapters.SQL.Stream.__build__(prepared, params, mapper, opts) - |> Stream.map(fn(%{num_rows: nrows, rows: rows}) -> {nrows, rows} end) + defp prepare_stream(repo, prepared, params, mapper, flat_map, opts) do + Ecto.Adapters.SQL.Stream.__build__(repo, prepared, params, mapper, flat_map, opts) + end + + @doc false + def stream_mapper(conn, %{num_rows: nrows, rows: rows}, conn, flat_map) do + flat_map.({nrows, rows}) end @doc false - def reduce(repo, statement, params, mapper, opts, acc, fun) do + def reduce(repo, statement, params, mapper, flat_map, opts, acc, fun) do {repo_mod, pool, default_opts} = lookup_pool(repo) - opts = [decode_mapper: mapper] ++ with_log(repo, params, opts ++ default_opts) case get_conn(pool) do nil -> raise "cannot reduce stream outside of transaction" conn -> + stream_mapper = flat_map && {__MODULE__, :stream_mapper, [conn, flat_map]} + map_opts = [decode_mapper: mapper, stream_mapper: stream_mapper] + opts = map_opts ++ with_log(repo, params, opts ++ default_opts) apply(repo_mod.__sql__, :stream, [conn, statement, params, opts]) |> Enumerable.reduce(acc, fun) end @@ -627,29 +640,56 @@ defmodule Ecto.Adapters.SQL do end end + @doc """ + Return child specification for `GenStage` process running query. 
+ """ + def stage_spec(repo, statement, params, opts) do + stage_spec(repo, statement, params, fn x -> x end, nil, opts) + end + + @doc """ + Start link `GenStage` process running query. + """ + def start_stage(repo, statement, params, opts) do + case stage_spec(repo, statement, params, opts) do + {_, {mod, fun, args}, _, _, _, _} -> + apply(mod, fun, args) + %{start: {mod, fun, args}} -> + apply(mod, fun, args) + end + end + @doc false - def stage(fun, repo, start, handle, stop, opts) do + def stage_spec(repo, meta, prepared, params, mapper, flat_map, opts) do + stream = stream(repo, meta, prepared, params, mapper, flat_map, opts) + %Ecto.Adapters.SQL.Stream{repo: repo, statement: statement, params: params, + mapper: mapper, flat_map: flat_map, opts: opts} = stream + stage_spec(repo, statement, params, mapper, flat_map, opts) + end + + defp stage_spec(repo, statement, params, mapper, flat_map, opts) do {repo_mod, pool, default_opts} = lookup_pool(repo) - default_opts = - default_opts - |> Keyword.delete(:name) - |> Keyword.put_new(:caller, self()) - opts = with_log(repo_mod, [], opts ++ default_opts) - start = - fn(conn) -> - put_conn(pool, conn) - start.() - end - handle = fn(_, arg, state) -> handle.(arg, state) end - stop = - fn(_, reason, state) -> - try do - stop.(reason, state) - after - delete_conn(pool) - end + stage_opts = Keyword.delete(default_opts, :name) + stream_mapper = flat_map && {__MODULE__, :stage_mapper, [pool, flat_map]} + map_opts = [decode_mapper: mapper, stream_mapper: stream_mapper] + opts = map_opts ++ with_log(repo, params, opts ++ stage_opts) + opts = Keyword.put_new(opts, :caller, self()) + apply(repo_mod.__sql__, :stage_spec, [pool, statement, params, opts]) + end + + @doc false + def stage_mapper(stream_conn, res, pool, flat_map) do + cur_conn = get_conn(pool) + try do + put_conn(pool, stream_conn) + stream_mapper(stream_conn, res, stream_conn, flat_map) + after + if cur_conn do + put_conn(pool, cur_conn) + else + delete_conn(pool) 
end - fun.(pool, start, handle, stop, opts) + end end ## Log diff --git a/lib/ecto/adapters/sql/connection.ex b/lib/ecto/adapters/sql/connection.ex index 9bd1dcce3c..25d441ea9e 100644 --- a/lib/ecto/adapters/sql/connection.ex +++ b/lib/ecto/adapters/sql/connection.ex @@ -15,6 +15,13 @@ defmodule Ecto.Adapters.SQL.Connection do """ @callback child_spec(options :: Keyword.t) :: {module, Keyword.t} + @doc """ + Receives `pool`, `statement`, `params` and `options` and returns `GenStage` + child specification + """ + @callback stage_spec(pool :: GenServer.server, statement :: prepared, params :: [term], options :: Keyword.t) :: + Supervisor.spec() + @doc """ Prepares and executes the given query with `DBConnection`. """ diff --git a/lib/ecto/adapters/sql/stage.ex b/lib/ecto/adapters/sql/stage.ex deleted file mode 100644 index 8740ce0555..0000000000 --- a/lib/ecto/adapters/sql/stage.ex +++ /dev/null @@ -1,216 +0,0 @@ -defmodule Ecto.Adapters.SQL.Stage do - @moduledoc """ - A `GenStage` process that encapsulates a SQL transaction. - - ### Options - - * `:name` - A name to register the started process (see the `:name` option - in `GenServer.start_link/3`) - - See the "Shared options" section at the `Ecto.Repo` documentation. All options - are passed to the `GenStage` on init. - """ - - @doc """ - Start link a `GenStage` producer that will run a transaction for its duration. - - The first argument is the repo, the second argument is the start function, - the third argument is the handle demand function, the fourth argument is the - stop function and the optional fiftth argument are the options. - - The start function is a 0-arity anonymous function. This is called after the - transaction begins but before `producer/5` returns. It should return the - accumulator or call `repo.rollback/1` to stop the `GenStage`. - - The handle demand function is a 2-arity anonymous function. The first argument - is the `demand`, and the second argument is the accumulator. 
This function - returns a 2-tuple, with first element as list of events to fulfil the demand - and second element as the accumulator. If the producer has emitted all events - (and so not fulfilled demand) it should call - `GenStage.async_notify(self(), {:producer, :done | :halted}` to signal to - consumers that it has finished. Also this function can rollback and stop the - `GenStage` using `repo.rollback/1`. - - The stop function is a 2-arity anonymous function. The first argument is the - terminate reason and the third argument is the accumulator. This function will - only be called if connection is alive and the transaction has not been rolled - back. If this function returns the transaction is committed. This function can - rollback and stop the `GenStage` using `repo.rollback/1`. - - For options see "Options" in the module documentation. - - The `GenStage` process will behave like a `Flow` stage: - - * It will stop with reason `:normal` when the last consumer cancels - """ - @spec producer(module, start :: (() -> acc), - handle_demand :: ((demand :: pos_integer, acc) -> {[any], acc}), - stop :: ((reason :: any, acc) -> any), Keyword.t) :: - GenServer.on_start when acc: var - def producer(repo, start, handle_demand, stop, opts \\ []) do - fun = &DBConnection.Stage.producer/5 - Ecto.Adapters.SQL.stage(fun, repo, start, handle_demand, stop, opts) - end - - @doc """ - Start link a `GenStage` producer consumer that will run a transaction for its - duration. - - The first argument is the repo, the second argument is the start function, - the third argument is the handle events function, the fourth argument is the - stop function and the optional fiftth argument are the options. - - The start function is a 0-arity anonymous function. This is called after the - transaction begins but before `consumer_producer/5` returns. It should return - the accumulator or call `repo.rollback/1` to stop the `GenStage`. - - The handle events function is a 2-arity anonymous function. 
The first argument - is a list of incoming events, and the second argument is the accumulator. This - function returns a 2-tuple, with first element as list of outgoing events and - second element as the accumulator. Also this function can rollback and stop - the `GenStage` using `repo.rollback/1`. - - The stop function is a 2-arity anonymous function. The first argument is the - terminate reason and the second argument is the accumulator. This function - will only be called if connection is alive and the transaction has not been - rolled back. If this function returns the transaction is committed. This - function can rollback and stop the `GenStage` using `repo.rollback/1`. - - For options see "Options" in the module documentation. - - The `GenStage` process will behave like a `Flow` stage: - - * It will stop with reason `:normal` when the last consumer cancels - * It will notify consumers that it is done when all producers have cancelled - or notified that they are done or halted - * It will not send demand to new producers when all producers have notified - that they are done or halted - """ - @spec producer_consumer(repo :: module, start :: (() -> acc), - handle_events :: ((events_in :: [any], acc) -> {events_out :: [any], acc}), - stop :: ((reason :: any, acc) -> any), Keyword.t) :: - GenServer.on_start when acc: var - def producer_consumer(repo, start, handle_events, stop, opts \\ []) do - fun = &DBConnection.Stage.producer_consumer/5 - Ecto.Adapters.SQL.stage(fun, repo, start, handle_events, stop, opts) - end - - @doc """ - Start link a `GenStage` consumer that will run a transaction for its duration. - - The first argument is the repo, the second argument is the start function, - the third argument is the handle events function, the fourth argument is the - stop function and the optional fiftth argument are the options. - - The start function is a 0-arity anonymous function. This is called after the - transaction begins but before `consumer/5` returns. 
It should return the - accumulator or call `repo.rollback/1` to stop the `GenStage`. - - The handle events function is a 2-arity anonymous function. The first argument - is the list of events, and the second argument is the accumulator. This - function returns a 2-tuple, with first element is an empty list (as no - outgoing events) and second element as the accumulator. Also this function can - rollback and stop the `GenStage` using `repo.rollback/1`. - - The stop function is a 2-arity anonymous function. The first argument is the - terminate reason and the second argument is the accumulator. This function - will only be called if connection is alive and the transaction has not been - rolled back. If this function returns the transaction is committed. This - function can rollback and stop the `GenStage` using `repo.rollback/1`. - - See the "Shared options" section at the `Ecto.Repo` documentation. - - The `GenStage` process will behave like a `Flow` stage: - - * It will cancel new and remaining producers when all producers have - notified that they are done or halted and it is a `:consumer` - """ - @spec consumer(repo :: module, start :: (() -> acc), - handle_events :: ((events_in :: [any], acc) -> {[], acc}), - stop :: ((reason :: any, acc) -> any), Keyword.t) :: - GenServer.on_start when acc: var - def consumer(pool, start, handle_events, stop, opts \\ []) do - fun = &DBConnection.Stage.consumer/5 - Ecto.Adapters.SQL.stage(fun, pool, start, handle_events, stop, opts) - end - - @doc """ - Starts a `GenStage` producer that emits all entries from the data store - matching the given query. SQL adapters, such as Postgres and MySQL, will use - a separate transaction to enumerate the stream. - - May raise `Ecto.QueryError` if query validation fails. - - ## Options - - * `:prefix` - The prefix to run the query on (such as the schema path - in Postgres or the database in MySQL). 
This overrides the prefix set - in the query - - * `:max_rows` - The number of rows to load from the database as we stream. - It is supported at least by Postgres and MySQL and defaults to 500. - - For more options see "Options" in the module documentation. - - ## Example - - # Print all post titles - query = from p in Post, - select: p.title - {:ok, stage} = Ecto.Adapters.SQL.stream(MyRepo, query) - stage - |> Flow.from_stage() - |> Flow.each(&IO.inspect/1) - |> Flow.start_link() - """ - @spec stream(repo :: module, queryable :: Ecto.Query.t, opts :: Keyword.t) :: - GenServer.on_start() - def stream(repo, queryable, opts \\ []) do - stream = apply(repo, :stream, [queryable, opts]) - start = - fn() -> - acc = {:suspend, {0, []}} - {:suspended, _, cont} = Enumerable.reduce(stream, acc, &stream_reduce/2) - {repo, :cont, cont} - end - producer(repo, start, &stream_handle/2, &stream_stop/2, opts) - end - - ## Helpers - - defp stream_reduce(v, {1, acc}) do - {:suspend, {0, [v | acc]}} - end - defp stream_reduce(v, {n, acc}) do - {:cont, {n-1, [v | acc]}} - end - - defp stream_handle(n, {repo, :cont, cont}) when n > 0 do - case cont.({:cont, {n, []}}) do - {:suspended, {0, acc}, cont} -> - {Enum.reverse(acc), {repo, :cont, cont}} - {status, {_, acc}} when status in [:halted, :done] -> - GenStage.async_notify(self(), {:producer, status}) - {Enum.reverse(acc), {repo, status}} - end - end - defp stream_handle(_, {_repo, status} = state) do - GenStage.async_notify(self(), {:producer, status}) - {[], state} - end - - defp stream_stop(reason, {repo, :cont, cont}) do - _ = cont.({:halt, {0, []}}) - stream_stop(repo, reason) - end - defp stream_stop(reason, {repo, status}) when status in [:halted, :done] do - stream_stop(repo, reason) - end - - defp stream_stop(_, :normal) do - :ok - end - defp stream_stop(repo, reason) do - apply(repo, :rollback, [reason]) - end -end diff --git a/lib/ecto/adapters/sql/stream.ex b/lib/ecto/adapters/sql/stream.ex index ad6be65234..b14637bcf6 
100644 --- a/lib/ecto/adapters/sql/stream.ex +++ b/lib/ecto/adapters/sql/stream.ex @@ -1,11 +1,11 @@ defmodule Ecto.Adapters.SQL.Stream do @moduledoc false - defstruct [:repo, :statement, :params, :mapper, :opts] + defstruct [:repo, :statement, :params, :mapper, :flat_map, :opts] - def __build__(repo, statement, params, mapper, opts) do - %__MODULE__{repo: repo, statement: statement, params: params, mapper: mapper, - opts: opts} + def __build__(repo, statement, params, mapper, flat_map, opts) do + %__MODULE__{repo: repo, statement: statement, params: params, + mapper: mapper, flat_map: flat_map, opts: opts} end end @@ -16,8 +16,9 @@ defimpl Enumerable, for: Ecto.Adapters.SQL.Stream do def reduce(stream, acc, fun) do %Ecto.Adapters.SQL.Stream{repo: repo, statement: statement, params: params, - mapper: mapper, opts: opts} = stream - Ecto.Adapters.SQL.reduce(repo, statement, params, mapper, opts, acc, fun) + mapper: mapper, flat_map: flat_map, + opts: opts} = stream + Ecto.Adapters.SQL.reduce(repo, statement, params, mapper, flat_map, opts, acc, fun) end end diff --git a/lib/ecto/repo/queryable.ex b/lib/ecto/repo/queryable.ex index 0d8b71d414..cca20c29ae 100644 --- a/lib/ecto/repo/queryable.ex +++ b/lib/ecto/repo/queryable.ex @@ -41,7 +41,7 @@ defmodule Ecto.Repo.Queryable do |> Ecto.Queryable.to_query |> Ecto.Query.Planner.returning(true) |> attach_prefix(opts) - stream(:all, repo, adapter, query, opts) + lazy(&adapter.stream/7, :all, repo, adapter, query, opts) end def get(repo, adapter, queryable, id, opts) do @@ -109,6 +109,15 @@ defmodule Ecto.Repo.Queryable do execute(:delete_all, repo, adapter, query, opts) end + def stage_spec(repo, adapter, queryable, opts) when is_list(opts) do + query = + queryable + |> Ecto.Queryable.to_query + |> Ecto.Query.Planner.returning(true) + |> attach_prefix(opts) + lazy(&adapter.stage_spec/7, :all, repo, adapter, query, opts) + end + ## Helpers defp attach_prefix(query, opts) do @@ -137,25 +146,27 @@ defmodule Ecto.Repo.Queryable 
do end end - defp stream(operation, repo, adapter, query, opts) do + defp lazy(fun, operation, repo, adapter, query, opts) do {meta, prepared, params} = Planner.query(query, operation, repo, adapter, 0) case meta do %{fields: nil} -> - adapter.stream(repo, meta, prepared, params, nil, opts) - |> Stream.flat_map(fn({_, nil}) -> [] end) + flat_map = fn({_, nil}) -> [] end + fun.(repo, meta, prepared, params, nil, flat_map, opts) %{select: select, fields: fields, prefix: prefix, take: take, sources: sources, assocs: assocs, preloads: preloads} -> preprocess = preprocess(prefix, sources, adapter) - stream = adapter.stream(repo, meta, prepared, params, preprocess, opts) + postprocess = postprocess(select, fields, take) {_, take_0} = Map.get(take, 0, {:any, %{}}) - Stream.flat_map(stream, fn({_, rows}) -> + flat_map = fn({_, rows}) -> rows |> Ecto.Repo.Assoc.query(assocs, sources) |> Ecto.Repo.Preloader.query(repo, preloads, take_0, postprocess, opts) - end) + end + + fun.(repo, meta, prepared, params, preprocess, flat_map, opts) end end diff --git a/mix.exs b/mix.exs index 225b657418..ddfdbd93fa 100644 --- a/mix.exs +++ b/mix.exs @@ -44,9 +44,9 @@ defmodule Ecto.Mixfile do # Drivers {:db_connection, "~> 1.1", github: "elixir-ecto/db_connection", branch: "jf-stream_stage", override: true, optional: true}, - {:gen_stage, "~> 0.11", optional: true}, - {:flow, "~> 0.11", optional: true}, - {:postgrex, "~> 0.13.0", optional: true}, + {:gen_stage, "~> 0.11", github: "elixir-lang/gen_stage", branch: "jv-exit-signals", optional: true}, + {:flow, "~> 0.11", github: "elixir-lang/flow", branch: "jv-exit-signals", optional: true}, + {:postgrex, "~> 0.13.0", github: "elixir-ecto/postgrex", branch: "jf-stage", optional: true}, {:mariaex, "~> 0.8.0", optional: true}, # Optional diff --git a/mix.lock b/mix.lock index 14e48d5993..481ebbea29 100644 --- a/mix.lock +++ b/mix.lock @@ -1,14 +1,14 @@ %{"backoff": {:hex, :backoff, "1.1.1"}, - "connection": {:hex, :connection, "1.0.4", 
"a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], []}, - "db_connection": {:git, "https://github.com/elixir-ecto/db_connection.git", "73238baf9778b1263e108a0df9198aade2921bea", [branch: "jf-stream_stage"]}, - "decimal": {:hex, :decimal, "1.3.1", "157b3cedb2bfcb5359372a7766dd7a41091ad34578296e951f58a946fcab49c6", [:mix], []}, - "earmark": {:hex, :earmark, "1.0.3", "89bdbaf2aca8bbb5c97d8b3b55c5dd0cff517ecc78d417e87f1d0982e514557b", [:mix], []}, - "ex_doc": {:hex, :ex_doc, "0.14.5", "c0433c8117e948404d93ca69411dd575ec6be39b47802e81ca8d91017a0cf83c", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, optional: false]}]}, - "flow": {:hex, :flow, "0.11.1", "cbc35a0236520cc5fec7b5863cd8431cb1e77297c5c9119055676355eb1fb5a6", [:mix], [{:gen_stage, "~> 0.11.0", [hex: :gen_stage, optional: false]}]}, - "gen_stage": {:hex, :gen_stage, "0.11.0", "943bdfa85c75fa624e0a36a9d135baad20a523be040178f5a215444b45c66ea4", [:mix], []}, - "inch_ex": {:hex, :inch_ex, "0.5.5", "b63f57e281467bd3456461525fdbc9e158c8edbe603da6e3e4671befde796a3d", [:mix], [{:poison, "~> 1.5 or ~> 2.0 or ~> 3.0", [hex: :poison, optional: false]}]}, - "mariaex": {:hex, :mariaex, "0.8.2", "a9ee64a02fd72579f844934b4cbecca9566593e499125edf6032c58f9d50b5f9", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, optional: false]}]}, - "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], []}, - "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], []}, - "postgrex": {:hex, :postgrex, "0.13.0", "e101ab47d0725955c5c8830ae8812412992e02e4bd9db09e17abb0a5d82d09c7", [:mix], [{:connection, "~> 1.0", [hex: :connection, optional: false]}, {:db_connection, "~> 1.1", [hex: :db_connection, optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, optional: false]}]}, - "sbroker": {:hex, :sbroker, "1.0.0", 
"28ff1b5e58887c5098539f236307b36fe1d3edaa2acff9d6a3d17c2dcafebbd0", [:rebar3], []}} + "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [], [], "hexpm"}, + "db_connection": {:git, "https://github.com/elixir-ecto/db_connection.git", "9c6e4945112554bf40c240244c033f6dcc9c4a13", [branch: "jf-stream_stage"]}, + "decimal": {:hex, :decimal, "1.3.1", "157b3cedb2bfcb5359372a7766dd7a41091ad34578296e951f58a946fcab49c6", [:mix], [], "hexpm"}, + "earmark": {:hex, :earmark, "1.0.3", "89bdbaf2aca8bbb5c97d8b3b55c5dd0cff517ecc78d417e87f1d0982e514557b", [:mix], [], "hexpm"}, + "ex_doc": {:hex, :ex_doc, "0.14.5", "c0433c8117e948404d93ca69411dd575ec6be39b47802e81ca8d91017a0cf83c", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"}, + "flow": {:git, "https://github.com/elixir-lang/flow.git", "cfdc0d240aa990c5229c70e032595f2b063a1445", [branch: "jv-exit-signals"]}, + "gen_stage": {:git, "https://github.com/elixir-lang/gen_stage.git", "966d998c699dec58e0d183ddbaa2b783c46cd375", [branch: "jv-exit-signals"]}, + "inch_ex": {:hex, :inch_ex, "0.5.5", "b63f57e281467bd3456461525fdbc9e158c8edbe603da6e3e4671befde796a3d", [:mix], [{:poison, "~> 1.5 or ~> 2.0 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"}, + "mariaex": {:hex, :mariaex, "0.8.2", "a9ee64a02fd72579f844934b4cbecca9566593e499125edf6032c58f9d50b5f9", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm"}, + "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, + "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [], [], "hexpm"}, + "postgrex": {:git, "https://github.com/elixir-ecto/postgrex.git", "f9f2ae1ca374d462ff86622951fbd216f28d74f0", [branch: "jf-stage"]}, + "sbroker": 
{:hex, :sbroker, "1.0.0", "28ff1b5e58887c5098539f236307b36fe1d3edaa2acff9d6a3d17c2dcafebbd0", [:rebar3], [], "hexpm"}} From 2879f85babf9af42302deaa22b992f1151785e8c Mon Sep 17 00:00:00 2001 From: James Fish Date: Mon, 29 May 2017 00:27:59 +0100 Subject: [PATCH 4/5] Move GenStage API to Ecto.Repo (via Ecto.Adapter.Stage) --- integration_test/pg/copy_test.exs | 25 -------------- integration_test/pg/test_helper.exs | 4 --- integration_test/sql/stage.exs | 11 +++---- lib/ecto/adapters/postgres/connection.ex | 5 ++- lib/ecto/adapters/sql.ex | 41 +++++++++-------------- lib/ecto/adapters/sql/connection.ex | 15 +++++---- lib/ecto/repo.ex | 42 ++++++++++++++++++++++++ lib/ecto/repo/queryable.ex | 4 +-- 8 files changed, 74 insertions(+), 73 deletions(-) diff --git a/integration_test/pg/copy_test.exs b/integration_test/pg/copy_test.exs index 171569c247..fecb99e948 100644 --- a/integration_test/pg/copy_test.exs +++ b/integration_test/pg/copy_test.exs @@ -19,29 +19,4 @@ defmodule Ecto.Integration.CopyTest do assert TestRepo.all(Post) == [one, two] end end - - test "stage copy to and from table" do - one = TestRepo.insert!(%Post{title: "one"}) - two = TestRepo.insert!(%Post{title: "two"}) - - read = "COPY posts TO STDOUT" - {:ok, producer} = Ecto.Adapters.SQL.start_stage(TestRepo, read, [], []) - - data = - [{producer, cancel: :transient}] - |> GenStage.stream() - |> Enum.to_list() - - assert TestRepo.delete_all(Post) == {2, nil} - - write = "COPY posts FROM STDIN" - {:ok, consumer} = Ecto.Adapters.SQL.start_stage(TestRepo, write, [], [stage_module: Postgrex.CopyConsumer]) - - {:ok, _} = - data - |> Flow.from_enumerable() - |> Flow.into_stages([consumer]) - - assert TestRepo.all(Post) == [one, two] - end end diff --git a/integration_test/pg/test_helper.exs b/integration_test/pg/test_helper.exs index b6efcf4503..13871962f7 100644 --- a/integration_test/pg/test_helper.exs +++ b/integration_test/pg/test_helper.exs @@ -34,10 +34,6 @@ defmodule Ecto.Integration.TestRepo do use 
Ecto.Integration.Repo, otp_app: :ecto end -defmodule Ecto.Integration.TestStage do - use Ecto.Repo.Stage, repo: TestRepo -end - # Pool repo for non-async tests alias Ecto.Integration.PoolRepo diff --git a/integration_test/sql/stage.exs b/integration_test/sql/stage.exs index bdcb476323..252724ce84 100644 --- a/integration_test/sql/stage.exs +++ b/integration_test/sql/stage.exs @@ -2,16 +2,15 @@ defmodule Ecto.Integration.StageTest do use Ecto.Integration.Case, async: true alias Ecto.Integration.TestRepo - alias Ecto.Integration.TestStage alias Ecto.Integration.Post alias Ecto.Integration.Comment import Ecto.Query test "stream empty" do - {:ok, stage} = TestStage.start_link(Post) + {:ok, stage} = TestRepo.start_producer(Post) assert to_list(stage) === [] - {:ok, stage} = TestStage.start_link(from p in Post) + {:ok, stage} = TestRepo.start_producer(from p in Post) assert to_list(stage) == [] end @@ -20,7 +19,7 @@ defmodule Ecto.Integration.StageTest do %Post{} = TestRepo.insert!(%Post{title: "title2"}) query = from(p in "posts", order_by: p.title, select: p.title) - {:ok, stage} = TestStage.start_link(query) + {:ok, stage} = TestRepo.start_producer(query) assert to_list(stage) == ["title1", "title2"] end @@ -31,7 +30,7 @@ defmodule Ecto.Integration.StageTest do %Comment{id: cid1} = TestRepo.insert!(%Comment{text: "1", post_id: p1.id}) %Comment{id: cid2} = TestRepo.insert!(%Comment{text: "2", post_id: p1.id}) - {:ok, stage} = TestStage.start_link(Ecto.assoc(p1, :comments)) + {:ok, stage} = TestRepo.start_producer(Ecto.assoc(p1, :comments)) assert [c1, c2] = to_list(stage) assert c1.id == cid1 @@ -49,7 +48,7 @@ defmodule Ecto.Integration.StageTest do %Comment{id: cid4} = TestRepo.insert!(%Comment{text: "4", post_id: p2.id}) query = from(p in Post, preload: [:comments], select: p) - {:ok, stage} = TestStage.start_link(query, [max_rows: 2]) + {:ok, stage} = TestRepo.start_producer(query, [max_rows: 2]) assert [p1, p2, p3] = stage |> to_list() |> sort_by_id() diff --git 
a/lib/ecto/adapters/postgres/connection.ex b/lib/ecto/adapters/postgres/connection.ex index 3eb90a0d4a..19428a7d29 100644 --- a/lib/ecto/adapters/postgres/connection.ex +++ b/lib/ecto/adapters/postgres/connection.ex @@ -18,9 +18,8 @@ if Code.ensure_loaded?(Postgrex) do |> Postgrex.child_spec() end - def stage_spec(pool, statement, params, opts) do - stage_mod = Keyword.get(opts, :stage_module, Postgrex.Producer) - Supervisor.Spec.worker(stage_mod, [pool, statement, params, opts]) + def start_producer(pool, statement, params, opts) do + Postgrex.Producer.start_link(pool, statement, params, opts) end def to_constraints(%Postgrex.Error{postgres: %{code: :unique_violation, constraint: constraint}}), diff --git a/lib/ecto/adapters/sql.ex b/lib/ecto/adapters/sql.ex index 119b90685d..266640ddf5 100644 --- a/lib/ecto/adapters/sql.ex +++ b/lib/ecto/adapters/sql.ex @@ -17,6 +17,7 @@ defmodule Ecto.Adapters.SQL do @behaviour Ecto.Adapter @behaviour Ecto.Adapter.Migration @behaviour Ecto.Adapter.Transaction + @behaviour Ecto.Adapter.Stage @conn __MODULE__.Connection @adapter unquote(adapter) @@ -139,13 +140,13 @@ defmodule Ecto.Adapters.SQL do ## GenStage @doc false - def stage_spec(repo, meta, query, params, process, flat_map, opts) do - Ecto.Adapters.SQL.stage_spec(repo, meta, query, params, process, flat_map, opts) + def start_producer(repo, meta, query, params, process, flat_map, opts) do + Ecto.Adapters.SQL.start_producer(repo, meta, query, params, process, flat_map, opts) end defoverridable [prepare: 2, execute: 6, insert: 6, update: 6, delete: 4, insert_all: 7, execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1, ensure_all_started: 2, - stage_spec: 7] + start_producer: 7] end end @@ -640,41 +641,29 @@ defmodule Ecto.Adapters.SQL do end end - @doc """ - Return child specification for `GenStage` process running query. 
- """ - def stage_spec(repo, statement, params, opts) do - stage_spec(repo, statement, params, fn x -> x end, nil, opts) - end - - @doc """ - Start link `GenStage` process running query. - """ - def start_stage(repo, statement, params, opts) do - case stage_spec(repo, statement, params, opts) do - {_, {mod, fun, args}, _, _, _, _} -> - apply(mod, fun, args) - %{start: {mod, fun, args}} -> - apply(mod, fun, args) - end - end - @doc false - def stage_spec(repo, meta, prepared, params, mapper, flat_map, opts) do + def start_producer(repo, meta, prepared, params, mapper, flat_map, opts) do stream = stream(repo, meta, prepared, params, mapper, flat_map, opts) %Ecto.Adapters.SQL.Stream{repo: repo, statement: statement, params: params, mapper: mapper, flat_map: flat_map, opts: opts} = stream - stage_spec(repo, statement, params, mapper, flat_map, opts) + start_producer(repo, statement, params, mapper, flat_map, opts) + end + + @doc """ + Start link a `GenStage` producer that streams the result of a query. 
+ """ + def start_producer(repo, statement, params, opts) do + start_producer(repo, statement, params, fn x -> x end, nil, opts) end - defp stage_spec(repo, statement, params, mapper, flat_map, opts) do + defp start_producer(repo, statement, params, mapper, flat_map, opts) do {repo_mod, pool, default_opts} = lookup_pool(repo) stage_opts = Keyword.delete(default_opts, :name) stream_mapper = flat_map && {__MODULE__, :stage_mapper, [pool, flat_map]} map_opts = [decode_mapper: mapper, stream_mapper: stream_mapper] opts = map_opts ++ with_log(repo, params, opts ++ stage_opts) opts = Keyword.put_new(opts, :caller, self()) - apply(repo_mod.__sql__, :stage_spec, [pool, statement, params, opts]) + repo_mod.__sql__.start_producer(pool, statement, params, opts) end @doc false diff --git a/lib/ecto/adapters/sql/connection.ex b/lib/ecto/adapters/sql/connection.ex index 25d441ea9e..fa0543b92a 100644 --- a/lib/ecto/adapters/sql/connection.ex +++ b/lib/ecto/adapters/sql/connection.ex @@ -15,13 +15,6 @@ defmodule Ecto.Adapters.SQL.Connection do """ @callback child_spec(options :: Keyword.t) :: {module, Keyword.t} - @doc """ - Receives `pool`, `statement`, `params` and `options` and returns `GenStage` - child specification - """ - @callback stage_spec(pool :: GenServer.server, statement :: prepared, params :: [term], options :: Keyword.t) :: - Supervisor.spec() - @doc """ Prepares and executes the given query with `DBConnection`. """ @@ -101,4 +94,12 @@ defmodule Ecto.Adapters.SQL.Connection do Receives a DDL command and returns a query that executes it. """ @callback execute_ddl(command :: Ecto.Adapter.Migration.command) :: String.t | [iodata] + + ## GenStage + + @doc """ + Start and link a `GenStage` producer that streams the result of a query. 
+ """ + @callback start_producer(pool :: GenServer.server, statement :: prepared, params :: [term], options :: Keyword.t) :: + GenServer.on_start end diff --git a/lib/ecto/repo.ex b/lib/ecto/repo.ex index 871295eff1..a9cb312c05 100644 --- a/lib/ecto/repo.ex +++ b/lib/ecto/repo.ex @@ -229,6 +229,12 @@ defmodule Ecto.Repo do def load(schema_or_types, data) do Ecto.Repo.Schema.load(@adapter, schema_or_types, data) end + + if function_exported?(@adapter, :start_producer, 7) do + def start_producer(queryable, opts \\ []) do + Ecto.Repo.Queryable.start_producer(__MODULE__, @adapter, queryable, opts) + end + end end end @@ -1009,4 +1015,40 @@ defmodule Ecto.Repo do """ @callback load(Ecto.Schema.t | map(), map() | Keyword.t | {list, list}) :: Ecto.Schema.t | map() + + @doc """ + Starts and links to a `GenStage` producer that executes a query. + + + Returns a `GenStage` that produces all entries from the data store to its + consumer(s). SQL adapters, such as Postgres and MySQL, will wrap the query + inside a transaction. + + May raise `Ecto.QueryError` if query validation fails. + + ## Options + + * `:prefix` - The prefix to run the query on (such as the schema path + in Postgres or the database in MySQL). This overrides the prefix set + in the query + + * `:max_rows` - The number of rows to load from the database as we stream. + It is supported at least by Postgres and MySQL and defaults to 500. + + See the "Shared options" section at the module documentation. 
+ + ## Example + + # Fetch all post titles + query = from p in Post, + select: p.title + {:ok, producer} = MyRepo.start_producer(query) + [{producer, cancel: :transient}] + |> Flow.from_stages() + |> Flow.map(&IO.inspect/1) + |> Flow.run() + """ + @callback start_producer(queryable :: Ecto.Query.t, opts :: Keyword.t) :: + GenServer.on_start + @optional_callbacks [start_producer: 2] end diff --git a/lib/ecto/repo/queryable.ex b/lib/ecto/repo/queryable.ex index cca20c29ae..bf432514ae 100644 --- a/lib/ecto/repo/queryable.ex +++ b/lib/ecto/repo/queryable.ex @@ -109,13 +109,13 @@ defmodule Ecto.Repo.Queryable do execute(:delete_all, repo, adapter, query, opts) end - def stage_spec(repo, adapter, queryable, opts) when is_list(opts) do + def start_producer(repo, adapter, queryable, opts) when is_list(opts) do query = queryable |> Ecto.Queryable.to_query |> Ecto.Query.Planner.returning(true) |> attach_prefix(opts) - lazy(&adapter.stage_spec/7, :all, repo, adapter, query, opts) + lazy(&adapter.start_producer/7, :all, repo, adapter, query, opts) end ## Helpers From 56df49d75c89d85b739d24b12d438186e76883ac Mon Sep 17 00:00:00 2001 From: James Fish Date: Mon, 29 May 2017 02:25:03 +0100 Subject: [PATCH 5/5] Update DBConnection/Postgrex and introduce start_consumer --- integration_test/sql/stage.exs | 14 +++++++++ lib/ecto/adapter/stage.ex | 32 +++++++++++++++++++ lib/ecto/adapters/postgres/connection.ex | 4 +++ lib/ecto/adapters/sql.ex | 28 +++++++++++++++-- lib/ecto/adapters/sql/connection.ex | 9 ++++++ lib/ecto/repo.ex | 27 ++++++++++++++++ lib/ecto/repo/schema.ex | 39 ++++++++++++++++++++++++ mix.lock | 4 +-- 8 files changed, 153 insertions(+), 4 deletions(-) create mode 100644 lib/ecto/adapter/stage.ex diff --git a/integration_test/sql/stage.exs b/integration_test/sql/stage.exs index 252724ce84..c2899f34fb 100644 --- a/integration_test/sql/stage.exs +++ b/integration_test/sql/stage.exs @@ -57,6 +57,20 @@ defmodule Ecto.Integration.StageTest do assert [] = p3.comments end + 
test "insert entries" do + {:ok, stage} = TestRepo.start_consumer(Post) + + mon = Process.monitor(stage) + + [[title: "hello"], [title: "world"]] + |> Flow.from_enumerable() + |> Flow.into_stages([{stage, [cancel: :transient, max_demand: 1]}]) + + assert_receive {:DOWN, ^mon, _, _, :normal} + + assert [%Post{title: "hello"}, %Post{title: "world"}] = TestRepo.all(Post) + end + defp to_list(stage) do stage |> Flow.from_stage() diff --git a/lib/ecto/adapter/stage.ex b/lib/ecto/adapter/stage.ex new file mode 100644 index 0000000000..cad90f2e9f --- /dev/null +++ b/lib/ecto/adapter/stage.ex @@ -0,0 +1,32 @@ +defmodule Ecto.Adapter.Stage do + @moduledoc """ + Specifies the adapter `GenStage` API. + """ + + @type flat_map :: ({integer, list | nil} -> list) + @type insert_all :: (list -> integer) + @type options :: Keyword.t + + @doc """ + Starts and links to a `GenStage` producer. + + The producer executes a query and returns the result to consumer(s). + + See `Ecto.Repo.start_producer/2`. + """ + @callback start_producer(repo :: Ecto.Repo.t, Ecto.Adapter.query_meta, query, params :: list(), Ecto.Adapter.process | nil, flat_map, options) :: + GenServer.on_start when + query: {:nocache, Ecto.Adapter.prepared} | + {:cached, (Ecto.Adapter.prepared -> :ok), Ecto.Adapter.cached} | + {:cache, (Ecto.Adapter.cached -> :ok), Ecto.Adapter.prepared} + + @doc """ + Starts and links to a `GenStage` consumer. + + The consumer inserts the entries it receives for the schema.
+ + See `Ecto.Repo.start_consumer/2`. + """ + @callback start_consumer(repo :: Ecto.Repo.t, insert_all, options) :: + GenServer.on_start +end diff --git a/lib/ecto/adapters/postgres/connection.ex b/lib/ecto/adapters/postgres/connection.ex index 19428a7d29..faf5633643 100644 --- a/lib/ecto/adapters/postgres/connection.ex +++ b/lib/ecto/adapters/postgres/connection.ex @@ -22,6 +22,10 @@ if Code.ensure_loaded?(Postgrex) do Postgrex.Producer.start_link(pool, statement, params, opts) end + def start_consumer(pool, fun, opts) do + Postgrex.Consumer.start_link(pool, fun, opts) + end + def to_constraints(%Postgrex.Error{postgres: %{code: :unique_violation, constraint: constraint}}), do: [unique: constraint] def to_constraints(%Postgrex.Error{postgres: %{code: :foreign_key_violation, constraint: constraint}}), diff --git a/lib/ecto/adapters/sql.ex b/lib/ecto/adapters/sql.ex index 266640ddf5..ee8da40541 100644 --- a/lib/ecto/adapters/sql.ex +++ b/lib/ecto/adapters/sql.ex @@ -144,9 +144,14 @@ defmodule Ecto.Adapters.SQL do Ecto.Adapters.SQL.start_producer(repo, meta, query, params, process, flat_map, opts) end + @doc false + def start_consumer(repo, insert, opts) do + Ecto.Adapters.SQL.start_consumer(repo, insert, opts) + end + defoverridable [prepare: 2, execute: 6, insert: 6, update: 6, delete: 4, insert_all: 7, execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1, ensure_all_started: 2, - start_producer: 7] + start_producer: 7, start_consumer: 3] end end @@ -652,7 +657,7 @@ defmodule Ecto.Adapters.SQL do @doc """ Start link a `GenStage` producer that streams the result of a query. """ - def start_producer(repo, statement, params, opts) do + def start_producer(repo, statement, params, opts \\ []) do start_producer(repo, statement, params, fn x -> x end, nil, opts) end @@ -681,6 +686,25 @@ defmodule Ecto.Adapters.SQL do end end + @doc """ + Starts and links a `GenStage` consumer that runs a `transaction` for every batch of + events and maintains a state.
+ """ + def start_consumer(repo, fun, opts \\ []) when is_function(fun, 1) do + {repo_mod, pool, default_opts} = lookup_pool(repo) + + transaction = + fn(conn, rows) -> + put_conn(pool, conn) + fun.(rows) + end + + stage_opts = Keyword.delete(default_opts, :name) + opts = with_log(repo, [], opts ++ stage_opts) + opts = Keyword.put_new(opts, :caller, self()) + repo_mod.__sql__.start_consumer(pool, transaction, opts) + end + ## Log defp with_log(repo, params, opts) do diff --git a/lib/ecto/adapters/sql/connection.ex b/lib/ecto/adapters/sql/connection.ex index fa0543b92a..17168e82c7 100644 --- a/lib/ecto/adapters/sql/connection.ex +++ b/lib/ecto/adapters/sql/connection.ex @@ -9,6 +9,9 @@ defmodule Ecto.Adapters.SQL.Connection do @typedoc "The cache query which is a DBConnection Query" @type cached :: map + @typedoc "The DBConnection transaction fun" + @type transaction :: (DBConnection.t, list -> term) + @doc """ Receives options and returns `DBConnection` supervisor child specification. @@ -102,4 +105,10 @@ defmodule Ecto.Adapters.SQL.Connection do """ @callback start_producer(pool :: GenServer.server, statement :: prepared, params :: [term], options :: Keyword.t) :: GenServer.on_start + + @doc """ + Start and link a `GenStage` consumer that runs a transaction for the events.
+ """ + @callback start_consumer(pool :: GenServer.server, transaction, options :: Keyword.t) :: + GenServer.on_start end diff --git a/lib/ecto/repo.ex b/lib/ecto/repo.ex index a9cb312c05..a3feba6a9e 100644 --- a/lib/ecto/repo.ex +++ b/lib/ecto/repo.ex @@ -234,6 +234,10 @@ defmodule Ecto.Repo do def start_producer(queryable, opts \\ []) do Ecto.Repo.Queryable.start_producer(__MODULE__, @adapter, queryable, opts) end + + def start_consumer(schema_or_source, opts \\ []) do + Ecto.Repo.Schema.start_consumer(__MODULE__, @adapter, schema_or_source, opts) + end end end end @@ -1051,4 +1055,27 @@ defmodule Ecto.Repo do @callback start_producer(queryable :: Ecto.Query.t, opts :: Keyword.t) :: GenServer.on_start @optional_callbacks [start_producer: 2] + + @doc """ + Starts and links to a `GenStage` consumer that inserts all entries it + consumes. + + Returns a `GenStage` consumer that uses `c:insert_all/3` to insert all + consumed entries in batches to the database. SQL adapters, such as Postgres + and MySQL, will wrap all the inserts inside a single transaction. + + See `c:insert_all/3` for options. + + ## Examples + + {:ok, consumer} = MyRepo.start_consumer(Post) + "posts" + |> File.stream!() + |> Flow.from_enumerable(stages: 1) + |> Flow.map(&MyDecoder.decode/1) + |> Flow.into_stages([{consumer, cancel: :transient}]) + """ + @callback start_consumer(schema_or_source :: binary | {binary, Ecto.Schema.t} | Ecto.Schema.t, opts :: Keyword.t) :: + GenServer.on_start + @optional_callbacks [start_consumer: 2] end diff --git a/lib/ecto/repo/schema.ex b/lib/ecto/repo/schema.ex index 4d95f5d071..331ba9b86e 100644 --- a/lib/ecto/repo/schema.ex +++ b/lib/ecto/repo/schema.ex @@ -50,6 +50,45 @@ defmodule Ecto.Repo.Schema do {count, postprocess(rows, fields, adapter, schema, source)} end + @doc """ + Implementation for `Ecto.Repo.start_consumer/2`. 
+ """ + def start_consumer(repo, adapter, schema, opts) when is_atom(schema) do + start_consumer(repo, adapter, schema, schema.__schema__(:prefix), + schema.__schema__(:source), opts) + end + + def start_consumer(repo, adapter, table, opts) when is_binary(table) do + start_consumer(repo, adapter, nil, nil, table, opts) + end + + def start_consumer(repo, adapter, {source, schema}, opts) when is_atom(schema) do + start_consumer(repo, adapter, schema, schema.__schema__(:prefix), source, opts) + end + + defp start_consumer(repo, adapter, schema, prefix, source, opts) do + returning = opts[:returning] || false + autogen = schema && schema.__schema__(:autogenerate_id) + source = {Keyword.get(opts, :prefix, prefix), source} + fields = preprocess(returning, schema) + + insert_all = fn(rows) -> + {rows, header} = extract_header_and_fields(rows, schema, autogen, adapter) + counter = fn -> Enum.reduce(rows, 0, &length(&1) + &2) end + metadata = %{source: source, context: nil, schema: schema, autogenerate_id: autogen} + + {on_conflict, opts} = Keyword.pop(opts, :on_conflict, :raise) + {conflict_target, opts} = Keyword.pop(opts, :conflict_target, []) + on_conflict = on_conflict(on_conflict, conflict_target, metadata, counter, adapter) + + {count, _} = + adapter.insert_all(repo, metadata, Map.keys(header), rows, on_conflict, fields || [], opts) + count + end + + adapter.start_consumer(repo, insert_all, opts) + end + defp preprocess([_|_] = fields, _schema), do: fields defp preprocess([], _schema), diff --git a/mix.lock b/mix.lock index 481ebbea29..8a887a93a2 100644 --- a/mix.lock +++ b/mix.lock @@ -1,6 +1,6 @@ %{"backoff": {:hex, :backoff, "1.1.1"}, "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [], [], "hexpm"}, - "db_connection": {:git, "https://github.com/elixir-ecto/db_connection.git", "9c6e4945112554bf40c240244c033f6dcc9c4a13", [branch: "jf-stream_stage"]}, + "db_connection": {:git, 
"https://github.com/elixir-ecto/db_connection.git", "845b46c234a3723bc4c1af1059a86bc9e9d85af0", [branch: "jf-stream_stage"]}, "decimal": {:hex, :decimal, "1.3.1", "157b3cedb2bfcb5359372a7766dd7a41091ad34578296e951f58a946fcab49c6", [:mix], [], "hexpm"}, "earmark": {:hex, :earmark, "1.0.3", "89bdbaf2aca8bbb5c97d8b3b55c5dd0cff517ecc78d417e87f1d0982e514557b", [:mix], [], "hexpm"}, "ex_doc": {:hex, :ex_doc, "0.14.5", "c0433c8117e948404d93ca69411dd575ec6be39b47802e81ca8d91017a0cf83c", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"}, @@ -10,5 +10,5 @@ "mariaex": {:hex, :mariaex, "0.8.2", "a9ee64a02fd72579f844934b4cbecca9566593e499125edf6032c58f9d50b5f9", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm"}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [], [], "hexpm"}, - "postgrex": {:git, "https://github.com/elixir-ecto/postgrex.git", "f9f2ae1ca374d462ff86622951fbd216f28d74f0", [branch: "jf-stage"]}, + "postgrex": {:git, "https://github.com/elixir-ecto/postgrex.git", "b825546e8954f9623e256f2423483bf42a2c6596", [branch: "jf-stage"]}, "sbroker": {:hex, :sbroker, "1.0.0", "28ff1b5e58887c5098539f236307b36fe1d3edaa2acff9d6a3d17c2dcafebbd0", [:rebar3], [], "hexpm"}}