diff --git a/integration_test/pg/all_test.exs b/integration_test/pg/all_test.exs index 2184f29084..24a9398413 100644 --- a/integration_test/pg/all_test.exs +++ b/integration_test/pg/all_test.exs @@ -3,6 +3,7 @@ Code.require_file "../sql/lock.exs", __DIR__ Code.require_file "../sql/migration.exs", __DIR__ Code.require_file "../sql/sandbox.exs", __DIR__ Code.require_file "../sql/sql.exs", __DIR__ +Code.require_file "../sql/stage.exs", __DIR__ Code.require_file "../sql/stream.exs", __DIR__ Code.require_file "../sql/subquery.exs", __DIR__ Code.require_file "../sql/transaction.exs", __DIR__ diff --git a/integration_test/pg/copy_test.exs b/integration_test/pg/copy_test.exs index ec1d7b3bd0..fecb99e948 100644 --- a/integration_test/pg/copy_test.exs +++ b/integration_test/pg/copy_test.exs @@ -4,7 +4,7 @@ defmodule Ecto.Integration.CopyTest do alias Ecto.Integration.TestRepo alias Ecto.Integration.Post - test "copy to and from table" do + test "stream copy to and from table" do read = Ecto.Adapters.SQL.stream(TestRepo, "COPY posts TO STDOUT") write = Ecto.Adapters.SQL.stream(TestRepo, "COPY posts FROM STDIN") diff --git a/integration_test/sql/stage.exs b/integration_test/sql/stage.exs new file mode 100644 index 0000000000..c2899f34fb --- /dev/null +++ b/integration_test/sql/stage.exs @@ -0,0 +1,83 @@ +defmodule Ecto.Integration.StageTest do + use Ecto.Integration.Case, async: true + + alias Ecto.Integration.TestRepo + alias Ecto.Integration.Post + alias Ecto.Integration.Comment + import Ecto.Query + + test "stream empty" do + {:ok, stage} = TestRepo.start_producer(Post) + assert to_list(stage) === [] + + {:ok, stage} = TestRepo.start_producer(from p in Post) + assert to_list(stage) == [] + end + + test "stream without schema" do + %Post{} = TestRepo.insert!(%Post{title: "title1"}) + %Post{} = TestRepo.insert!(%Post{title: "title2"}) + + query = from(p in "posts", order_by: p.title, select: p.title) + {:ok, stage} = TestRepo.start_producer(query) + + assert to_list(stage) == ["title1", "title2"] + end + + test "stream with assoc" do + p1 = TestRepo.insert!(%Post{title: "1"}) + + %Comment{id: cid1} = TestRepo.insert!(%Comment{text: "1", post_id: p1.id}) + %Comment{id: cid2} = TestRepo.insert!(%Comment{text: "2", post_id: p1.id}) + + {:ok, stage} = TestRepo.start_producer(Ecto.assoc(p1, :comments)) + assert [c1, c2] = to_list(stage) + + assert c1.id == cid1 + assert c2.id == cid2 + end + + test "stream with preload" do + p1 = TestRepo.insert!(%Post{title: "1"}) + p2 = TestRepo.insert!(%Post{title: "2"}) + TestRepo.insert!(%Post{title: "3"}) + + %Comment{id: cid1} = TestRepo.insert!(%Comment{text: "1", post_id: p1.id}) + %Comment{id: cid2} = TestRepo.insert!(%Comment{text: "2", post_id: p1.id}) + %Comment{id: cid3} = TestRepo.insert!(%Comment{text: "3", post_id: p2.id}) + %Comment{id: cid4} = TestRepo.insert!(%Comment{text: "4", post_id: p2.id}) + + query = from(p in Post, preload: [:comments], select: p) + {:ok, stage} = TestRepo.start_producer(query, [max_rows: 2]) + + assert [p1, p2, p3] = stage |> to_list() |> sort_by_id() + + assert [%Comment{id: ^cid1}, %Comment{id: ^cid2}] = p1.comments |> sort_by_id + assert [%Comment{id: ^cid3}, %Comment{id: ^cid4}] = p2.comments |> sort_by_id + assert [] = p3.comments + end + + test "insert entries" do + {:ok, stage} = TestRepo.start_consumer(Post) + + mon = Process.monitor(stage) + + [[title: "hello"], [title: "world"]] + |> Flow.from_enumerable() + |> Flow.into_stages([{stage, [cancel: :transient, max_demand: 1]}]) + + assert_receive {:DOWN, ^mon, _, _, :normal} + + assert [%Post{title: "hello"}, %Post{title: "world"}] = TestRepo.all(Post) + end + + defp to_list(stage) do + stage + |> Flow.from_stage() + |> Enum.to_list() + end + + defp sort_by_id(values) do + Enum.sort_by(values, &(&1.id)) + end +end diff --git a/lib/ecto/adapter/stage.ex b/lib/ecto/adapter/stage.ex new file mode 100644 index 0000000000..cad90f2e9f --- /dev/null +++ b/lib/ecto/adapter/stage.ex @@ -0,0 +1,32 @@ +defmodule Ecto.Adapter.Stage do + @moduledoc """ + Specifies the adapter `GenStage` API. + """ + + @type flat_map :: ({integer, list | nil} -> list) + @type insert_all :: (list -> integer) + @type options :: Keyword.t + + @doc """ + Starts and links to a `GenStage` producer. + + The producer executes a query and returns the result to consumer(s). + + See `Ecto.Repo.start_producer/2`. + """ + @callback start_producer(repo :: Ecto.Repo.t, Ecto.Adapter.query_meta, query, params :: list(), Ecto.Adapter.process | nil, flat_map, options) :: + GenServer.on_start when + query: {:nocache, Ecto.Adapter.prepared} | + {:cached, (Ecto.Adapter.prepared -> :ok), Ecto.Adapter.cached} | + {:cache, (Ecto.Adapter.cached -> :ok), Ecto.Adapter.prepared} + + @doc """ + Starts and links to a `GenStage` consumer. + + The consumers inserts the entries it receives for the schema. + + See `Ecto.Repo.start_consumer/2` + """ + @callback start_consumer(repo :: Ecto.Repo.t, insert_all, options) :: + GenServer.on_start +end diff --git a/lib/ecto/adapters/postgres/connection.ex b/lib/ecto/adapters/postgres/connection.ex index 4b7ca3abf3..faf5633643 100644 --- a/lib/ecto/adapters/postgres/connection.ex +++ b/lib/ecto/adapters/postgres/connection.ex @@ -18,6 +18,14 @@ if Code.ensure_loaded?(Postgrex) do |> Postgrex.child_spec() end + def start_producer(pool, statement, params, opts) do + Postgrex.Producer.start_link(pool, statement, params, opts) + end + + def start_consumer(pool, fun, opts) do + Postgrex.Consumer.start_link(pool, fun, opts) + end + def to_constraints(%Postgrex.Error{postgres: %{code: :unique_violation, constraint: constraint}}), do: [unique: constraint] def to_constraints(%Postgrex.Error{postgres: %{code: :foreign_key_violation, constraint: constraint}}), diff --git a/lib/ecto/adapters/sql.ex b/lib/ecto/adapters/sql.ex index 0fd9988a7c..ee8da40541 100644 --- a/lib/ecto/adapters/sql.ex +++ b/lib/ecto/adapters/sql.ex @@ -17,6 +17,7 @@ defmodule Ecto.Adapters.SQL do @behaviour Ecto.Adapter @behaviour Ecto.Adapter.Migration @behaviour Ecto.Adapter.Transaction + @behaviour Ecto.Adapter.Stage @conn __MODULE__.Connection @adapter unquote(adapter) @@ -69,8 +70,8 @@ defmodule Ecto.Adapters.SQL do end @doc false - def stream(repo, meta, query, params, process, opts) do - Ecto.Adapters.SQL.stream(repo, meta, query, params, process, opts) + def stream(repo, meta, query, params, process, flat_map, opts) do + Ecto.Adapters.SQL.stream(repo, meta, query, params, process, flat_map, opts) end @doc false @@ -136,8 +137,21 @@ defmodule Ecto.Adapters.SQL do :ok end + ## GenStage + + @doc false + def start_producer(repo, meta, query, params, process, flat_map, opts) do + Ecto.Adapters.SQL.start_producer(repo, meta, query, params, process, flat_map, opts) + end + + @doc false + def start_consumer(repo, insert, opts) do + Ecto.Adapters.SQL.start_consumer(repo, insert, opts) + end + defoverridable [prepare: 2, execute: 6, insert: 6, update: 6, delete: 4, insert_all: 7, - execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1, ensure_all_started: 2] + execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1, ensure_all_started: 2, + start_producer: 7, start_consumer: 3] end end @@ -477,55 +491,60 @@ defmodule Ecto.Adapters.SQL do """ @spec stream(Ecto.Repo.t, String.t, [term], Keyword.t) :: Enum.t def stream(repo, sql, params \\ [], opts \\ []) do - Ecto.Adapters.SQL.Stream.__build__(repo, sql, params, fn x -> x end, opts) + Ecto.Adapters.SQL.Stream.__build__(repo, sql, params, fn x -> x end, nil, opts) end @doc false - def stream(repo, meta, prepared, params, mapper, opts) do - do_stream(repo, meta, prepared, params, mapper, put_source(opts, meta)) + def stream(repo, meta, prepared, params, mapper, flat_map, opts) do + do_stream(repo, meta, prepared, params, mapper, flat_map, put_source(opts, meta)) end - def do_stream(repo, _meta, {:cache, _, {_, prepared}}, params, nil, opts) do - prepare_stream(repo, prepared, params, nil, opts) + def do_stream(repo, _meta, {:cache, _, {_, prepared}}, params, nil, flat_map, opts) do + prepare_stream(repo, prepared, params, nil, flat_map, opts) end - def do_stream(repo, %{fields: fields, sources: sources}, {:cache, _, {_, prepared}}, params, process, opts) do + def do_stream(repo, %{fields: fields, sources: sources}, {:cache, _, {_, prepared}}, params, process, flat_map, opts) do mapper = &process_row(&1, process, fields, sources) - prepare_stream(repo, prepared, params, mapper, opts) + prepare_stream(repo, prepared, params, mapper, flat_map, opts) end - def do_stream(repo, _, {:cached, _, {_, cached}}, params, nil, opts) do - prepare_stream(repo, String.Chars.to_string(cached), params, nil, opts) + def do_stream(repo, _, {:cached, _, {_, cached}}, params, nil, flat_map, opts) do + prepare_stream(repo, String.Chars.to_string(cached), params, nil, flat_map, opts) end - def do_stream(repo, %{fields: fields, sources: sources}, {:cached, _, {_, cached}}, params, process, opts) do + def do_stream(repo, %{fields: fields, sources: sources}, {:cached, _, {_, cached}}, params, process, flat_map, opts) do mapper = &process_row(&1, process, fields, sources) - prepare_stream(repo, String.Chars.to_string(cached), params, mapper, opts) + prepare_stream(repo, String.Chars.to_string(cached), params, mapper, flat_map, opts) end - def do_stream(repo, _meta, {:nocache, {_id, prepared}}, params, nil, opts) do - prepare_stream(repo, prepared, params, nil, opts) + def do_stream(repo, _meta, {:nocache, {_id, prepared}}, params, nil, flat_map, opts) do + prepare_stream(repo, prepared, params, nil, flat_map, opts) end - def do_stream(repo, %{fields: fields, sources: sources}, {:nocache, {_id, prepared}}, params, process, opts) do + def do_stream(repo, %{fields: fields, sources: sources}, {:nocache, {_id, prepared}}, params, process, flat_map, opts) do mapper = &process_row(&1, process, fields, sources) - prepare_stream(repo, prepared, params, mapper, opts) + prepare_stream(repo, prepared, params, mapper, flat_map, opts) + end + + defp prepare_stream(repo, prepared, params, mapper, flat_map, opts) do + Ecto.Adapters.SQL.Stream.__build__(repo, prepared, params, mapper, flat_map, opts) end - defp prepare_stream(repo, prepared, params, mapper, opts) do - repo - |> Ecto.Adapters.SQL.Stream.__build__(prepared, params, mapper, opts) - |> Stream.map(fn(%{num_rows: nrows, rows: rows}) -> {nrows, rows} end) + @doc false + def stream_mapper(conn, %{num_rows: nrows, rows: rows}, conn, flat_map) do + flat_map.({nrows, rows}) end @doc false - def reduce(repo, statement, params, mapper, opts, acc, fun) do + def reduce(repo, statement, params, mapper, flat_map, opts, acc, fun) do {repo_mod, pool, default_opts} = lookup_pool(repo) - opts = [decode_mapper: mapper] ++ with_log(repo, params, opts ++ default_opts) case get_conn(pool) do nil -> raise "cannot reduce stream outside of transaction" conn -> + stream_mapper = flat_map && {__MODULE__, :stream_mapper, [conn, flat_map]} + map_opts = [decode_mapper: mapper, stream_mapper: stream_mapper] + opts = map_opts ++ with_log(repo, params, opts ++ default_opts) apply(repo_mod.__sql__, :stream, [conn, statement, params, opts]) |> Enumerable.reduce(acc, fun) end @@ -627,6 +646,65 @@ defmodule Ecto.Adapters.SQL do end end + @doc false + def start_producer(repo, meta, prepared, params, mapper, flat_map, opts) do + stream = stream(repo, meta, prepared, params, mapper, flat_map, opts) + %Ecto.Adapters.SQL.Stream{repo: repo, statement: statement, params: params, + mapper: mapper, flat_map: flat_map, opts: opts} = stream + start_producer(repo, statement, params, mapper, flat_map, opts) + end + + @doc """ + Start link a `GenStage` producer that streams the result of a query. + """ + def start_producer(repo, statement, params, opts \\ []) do + start_producer(repo, statement, params, fn x -> x end, nil, opts) + end + + defp start_producer(repo, statement, params, mapper, flat_map, opts) do + {repo_mod, pool, default_opts} = lookup_pool(repo) + stage_opts = Keyword.delete(default_opts, :name) + stream_mapper = flat_map && {__MODULE__, :stage_mapper, [pool, flat_map]} + map_opts = [decode_mapper: mapper, stream_mapper: stream_mapper] + opts = map_opts ++ with_log(repo, params, opts ++ stage_opts) + opts = Keyword.put_new(opts, :caller, self()) + repo_mod.__sql__.start_producer(pool, statement, params, opts) + end + + @doc false + def stage_mapper(stream_conn, res, pool, flat_map) do + cur_conn = get_conn(pool) + try do + put_conn(pool, stream_conn) + stream_mapper(stream_conn, res, stream_conn, flat_map) + after + if cur_conn do + put_conn(pool, cur_conn) + else + delete_conn(pool) + end + end + end + + @doc """ + Start link a `GenStage` consumers that runs a `transaction` for every batch of + events and maintains a state. + """ + def start_consumer(repo, fun, opts \\ []) when is_function(fun, 1) do + {repo_mod, pool, default_opts} = lookup_pool(repo) + + transaction = + fn(conn, rows) -> + put_conn(pool, conn) + fun.(rows) + end + + stage_opts = Keyword.delete(default_opts, :name) + opts = with_log(repo, [], opts ++ stage_opts) + opts = Keyword.put_new(opts, :caller, self()) + repo_mod.__sql__.start_consumer(pool, transaction, opts) + end + ## Log defp with_log(repo, params, opts) do diff --git a/lib/ecto/adapters/sql/connection.ex b/lib/ecto/adapters/sql/connection.ex index 9bd1dcce3c..17168e82c7 100644 --- a/lib/ecto/adapters/sql/connection.ex +++ b/lib/ecto/adapters/sql/connection.ex @@ -9,6 +9,9 @@ defmodule Ecto.Adapters.SQL.Connection do @typedoc "The cache query which is a DBConnection Query" @type cached :: map + @typedoc "The DBConection transaction fun" + @type transaction :: (DBConnection.t, list -> term) + @doc """ Receives options and returns `DBConnection` supervisor child specification. @@ -94,4 +97,18 @@ defmodule Ecto.Adapters.SQL.Connection do Receives a DDL command and returns a query that executes it. """ @callback execute_ddl(command :: Ecto.Adapter.Migration.command) :: String.t | [iodata] + + ## GenStage + + @doc """ + Start and link a `GenStage` producer that streams the result of a query. + """ + @callback start_producer(pool :: GenServer.server, statement :: prepared, params :: [term], options :: Keyword.t) :: + GenServer.on_start + + @doc """ + Start and link a `GenStage` consumer that runs a transaction for the events. + """ + @callback start_consumer(pool :: GenServer.server, transaction, options :: Keyword.t) :: + GenServer.on_start end diff --git a/lib/ecto/adapters/sql/stream.ex b/lib/ecto/adapters/sql/stream.ex index ad6be65234..b14637bcf6 100644 --- a/lib/ecto/adapters/sql/stream.ex +++ b/lib/ecto/adapters/sql/stream.ex @@ -1,11 +1,11 @@ defmodule Ecto.Adapters.SQL.Stream do @moduledoc false - defstruct [:repo, :statement, :params, :mapper, :opts] + defstruct [:repo, :statement, :params, :mapper, :flat_map, :opts] - def __build__(repo, statement, params, mapper, opts) do - %__MODULE__{repo: repo, statement: statement, params: params, mapper: mapper, - opts: opts} + def __build__(repo, statement, params, mapper, flat_map, opts) do + %__MODULE__{repo: repo, statement: statement, params: params, + mapper: mapper, flat_map: flat_map, opts: opts} end end @@ -16,8 +16,9 @@ defimpl Enumerable, for: Ecto.Adapters.SQL.Stream do def reduce(stream, acc, fun) do %Ecto.Adapters.SQL.Stream{repo: repo, statement: statement, params: params, - mapper: mapper, opts: opts} = stream - Ecto.Adapters.SQL.reduce(repo, statement, params, mapper, opts, acc, fun) + mapper: mapper, flat_map: flat_map, + opts: opts} = stream + Ecto.Adapters.SQL.reduce(repo, statement, params, mapper, flat_map, opts, acc, fun) end end diff --git a/lib/ecto/repo.ex b/lib/ecto/repo.ex index 871295eff1..a3feba6a9e 100644 --- a/lib/ecto/repo.ex +++ b/lib/ecto/repo.ex @@ -229,6 +229,16 @@ defmodule Ecto.Repo do def load(schema_or_types, data) do Ecto.Repo.Schema.load(@adapter, schema_or_types, data) end + + if function_exported?(@adapter, :start_producer, 7) do + def start_producer(queryable, opts \\ []) do + Ecto.Repo.Queryable.start_producer(__MODULE__, @adapter, queryable, opts) + end + + def start_consumer(schema_or_source, opts \\ []) do + Ecto.Repo.Schema.start_consumer(__MODULE__, @adapter, schema_or_source, opts) + end + end end end @@ -1009,4 +1019,63 @@ defmodule Ecto.Repo do """ @callback load(Ecto.Schema.t | map(), map() | Keyword.t | {list, list}) :: Ecto.Schema.t | map() + + @doc """ + Starts and links to a `GenStage` producer that executes a query. + + + Returns a `GenStage` that produces all entries from the data store to its + consumer(s). SQL adapters, such as Postgres and MySQL, will wrap the query + inside a transaction. + + May raise `Ecto.QueryError` if query validation fails. + + ## Options + + * `:prefix` - The prefix to run the query on (such as the schema path + in Postgres or the database in MySQL). This overrides the prefix set + in the query + + * `:max_rows` - The number of rows to load from the database as we stream. + It is supported at least by Postgres and MySQL and defaults to 500. + + See the "Shared options" section at the module documentation. + + ## Example + + # Fetch all post titles + query = from p in Post, + select: p.title + {:ok, producer} = MyRepo.start_producer(query) + [{producer, cancel: :transient}] + |> Flow.from_stages() + |> Flow.map(&IO.inspect/1) + |> Flow.run() + """ + @callback start_producer(queryable :: Ecto.Query.t, opts :: Keyword.t) :: + GenServer.on_start + @optional_callbacks [start_producer: 2] + + @doc """ + Starts and links to a `GenStage` consumer that inserts all entries it + consumes. + + Returns a `GenStage` consumer that uses `c:insert_all/3` to insert all + consumed entries in batches to the database. SQL adapters, such as Postgres + and MySQL, will wrap all the inserts inside a single transaction. + + See `c:insert_all/3` for options. + + ## Examples + + {:ok, consumer} = MyRepo.start_consumer(Post) + "posts" + |> File.stream!() + |> Flow.from_enumerable(stages: 1) + |> Flow.map(&MyDecoder.decode/1) + |> Flow.into_stages([{consumer, cancel: :transient}]) + """ + @callback start_consumer(schema_or_source :: binary | {binary, Ecto.Schema.t} | Ecto.Schema.t, opts :: Keyword.t) :: + GenServer.on_start + @optional_callbacks [start_consumer: 2] end diff --git a/lib/ecto/repo/queryable.ex b/lib/ecto/repo/queryable.ex index 0d8b71d414..bf432514ae 100644 --- a/lib/ecto/repo/queryable.ex +++ b/lib/ecto/repo/queryable.ex @@ -41,7 +41,7 @@ defmodule Ecto.Repo.Queryable do |> Ecto.Queryable.to_query |> Ecto.Query.Planner.returning(true) |> attach_prefix(opts) - stream(:all, repo, adapter, query, opts) + lazy(&adapter.stream/7, :all, repo, adapter, query, opts) end def get(repo, adapter, queryable, id, opts) do @@ -109,6 +109,15 @@ defmodule Ecto.Repo.Queryable do execute(:delete_all, repo, adapter, query, opts) end + def start_producer(repo, adapter, queryable, opts) when is_list(opts) do + query = + queryable + |> Ecto.Queryable.to_query + |> Ecto.Query.Planner.returning(true) + |> attach_prefix(opts) + lazy(&adapter.start_producer/7, :all, repo, adapter, query, opts) + end + ## Helpers defp attach_prefix(query, opts) do @@ -137,25 +146,27 @@ defmodule Ecto.Repo.Queryable do end end - defp stream(operation, repo, adapter, query, opts) do + defp lazy(fun, operation, repo, adapter, query, opts) do {meta, prepared, params} = Planner.query(query, operation, repo, adapter, 0) case meta do %{fields: nil} -> - adapter.stream(repo, meta, prepared, params, nil, opts) - |> Stream.flat_map(fn({_, nil}) -> [] end) + flat_map = fn({_, nil}) -> [] end + fun.(repo, meta, prepared, params, nil, flat_map, opts) %{select: select, fields: fields, prefix: prefix, take: take, sources: sources, assocs: assocs, preloads: preloads} -> preprocess = preprocess(prefix, sources, adapter) - stream = adapter.stream(repo, meta, prepared, params, preprocess, opts) + postprocess = postprocess(select, fields, take) {_, take_0} = Map.get(take, 0, {:any, %{}}) - Stream.flat_map(stream, fn({_, rows}) -> + flat_map = fn({_, rows}) -> rows |> Ecto.Repo.Assoc.query(assocs, sources) |> Ecto.Repo.Preloader.query(repo, preloads, take_0, postprocess, opts) - end) + end + + fun.(repo, meta, prepared, params, preprocess, flat_map, opts) end end diff --git a/lib/ecto/repo/schema.ex b/lib/ecto/repo/schema.ex index 4d95f5d071..331ba9b86e 100644 --- a/lib/ecto/repo/schema.ex +++ b/lib/ecto/repo/schema.ex @@ -50,6 +50,45 @@ defmodule Ecto.Repo.Schema do {count, postprocess(rows, fields, adapter, schema, source)} end + @doc """ + Implementation for `Ecto.Repo.start_consumer/2`. + """ + def start_consumer(repo, adapter, schema, opts) when is_atom(schema) do + start_consumer(repo, adapter, schema, schema.__schema__(:prefix), + schema.__schema__(:source), opts) + end + + def start_consumer(repo, adapter, table, opts) when is_binary(table) do + start_consumer(repo, adapter, nil, nil, table, opts) + end + + def start_consumer(repo, adapter, {source, schema}, opts) when is_atom(schema) do + start_consumer(repo, adapter, schema, schema.__schema__(:prefix), source, opts) + end + + defp start_consumer(repo, adapter, schema, prefix, source, opts) do + returning = opts[:returning] || false + autogen = schema && schema.__schema__(:autogenerate_id) + source = {Keyword.get(opts, :prefix, prefix), source} + fields = preprocess(returning, schema) + + insert_all = fn(rows) -> + {rows, header} = extract_header_and_fields(rows, schema, autogen, adapter) + counter = fn -> Enum.reduce(rows, 0, &length(&1) + &2) end + metadata = %{source: source, context: nil, schema: schema, autogenerate_id: autogen} + + {on_conflict, opts} = Keyword.pop(opts, :on_conflict, :raise) + {conflict_target, opts} = Keyword.pop(opts, :conflict_target, []) + on_conflict = on_conflict(on_conflict, conflict_target, metadata, counter, adapter) + + {count, _} = + adapter.insert_all(repo, metadata, Map.keys(header), rows, on_conflict, fields || [], opts) + count + end + + adapter.start_consumer(repo, insert_all, opts) + end + defp preprocess([_|_] = fields, _schema), do: fields defp preprocess([], _schema), diff --git a/mix.exs b/mix.exs index 4caf6af410..ddfdbd93fa 100644 --- a/mix.exs +++ b/mix.exs @@ -43,8 +43,10 @@ defmodule Ecto.Mixfile do {:decimal, "~> 1.2"}, # Drivers - {:db_connection, "~> 1.1", optional: true}, - {:postgrex, "~> 0.13.0", optional: true}, + {:db_connection, "~> 1.1", github: "elixir-ecto/db_connection", branch: "jf-stream_stage", override: true, optional: true}, + {:gen_stage, "~> 0.11", github: "elixir-lang/gen_stage", branch: "jv-exit-signals", optional: true}, + {:flow, "~> 0.11", github: "elixir-lang/flow", branch: "jv-exit-signals", optional: true}, + {:postgrex, "~> 0.13.0", github: "elixir-ecto/postgrex", branch: "jf-stage", optional: true}, {:mariaex, "~> 0.8.0", optional: true}, # Optional diff --git a/mix.lock b/mix.lock index 99e7bcf1d9..8a887a93a2 100644 --- a/mix.lock +++ b/mix.lock @@ -1,12 +1,14 @@ %{"backoff": {:hex, :backoff, "1.1.1"}, - "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], []}, - "db_connection": {:hex, :db_connection, "1.1.2", "2865c2a4bae0714e2213a0ce60a1b12d76a6efba0c51fbda59c9ab8d1accc7a8", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, optional: false]}, {:poolboy, "~> 1.5", [hex: :poolboy, optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, optional: true]}]}, - "decimal": {:hex, :decimal, "1.3.1", "157b3cedb2bfcb5359372a7766dd7a41091ad34578296e951f58a946fcab49c6", [:mix], []}, - "earmark": {:hex, :earmark, "1.0.3", "89bdbaf2aca8bbb5c97d8b3b55c5dd0cff517ecc78d417e87f1d0982e514557b", [:mix], []}, - "ex_doc": {:hex, :ex_doc, "0.14.5", "c0433c8117e948404d93ca69411dd575ec6be39b47802e81ca8d91017a0cf83c", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, optional: false]}]}, - "inch_ex": {:hex, :inch_ex, "0.5.5", "b63f57e281467bd3456461525fdbc9e158c8edbe603da6e3e4671befde796a3d", [:mix], [{:poison, "~> 1.5 or ~> 2.0 or ~> 3.0", [hex: :poison, optional: false]}]}, - "mariaex": {:hex, :mariaex, "0.8.2", "a9ee64a02fd72579f844934b4cbecca9566593e499125edf6032c58f9d50b5f9", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, optional: false]}]}, - "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], []}, - "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], []}, - "postgrex": {:hex, :postgrex, "0.13.0", "e101ab47d0725955c5c8830ae8812412992e02e4bd9db09e17abb0a5d82d09c7", [:mix], [{:connection, "~> 1.0", [hex: :connection, optional: false]}, {:db_connection, "~> 1.1", [hex: :db_connection, optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, optional: false]}]}, - "sbroker": {:hex, :sbroker, "1.0.0", "28ff1b5e58887c5098539f236307b36fe1d3edaa2acff9d6a3d17c2dcafebbd0", [:rebar3], []}} + "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [], [], "hexpm"}, + "db_connection": {:git, "https://github.com/elixir-ecto/db_connection.git", "845b46c234a3723bc4c1af1059a86bc9e9d85af0", [branch: "jf-stream_stage"]}, + "decimal": {:hex, :decimal, "1.3.1", "157b3cedb2bfcb5359372a7766dd7a41091ad34578296e951f58a946fcab49c6", [:mix], [], "hexpm"}, + "earmark": {:hex, :earmark, "1.0.3", "89bdbaf2aca8bbb5c97d8b3b55c5dd0cff517ecc78d417e87f1d0982e514557b", [:mix], [], "hexpm"}, + "ex_doc": {:hex, :ex_doc, "0.14.5", "c0433c8117e948404d93ca69411dd575ec6be39b47802e81ca8d91017a0cf83c", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"}, + "flow": {:git, "https://github.com/elixir-lang/flow.git", "cfdc0d240aa990c5229c70e032595f2b063a1445", [branch: "jv-exit-signals"]}, + "gen_stage": {:git, "https://github.com/elixir-lang/gen_stage.git", "966d998c699dec58e0d183ddbaa2b783c46cd375", [branch: "jv-exit-signals"]}, + "inch_ex": {:hex, :inch_ex, "0.5.5", "b63f57e281467bd3456461525fdbc9e158c8edbe603da6e3e4671befde796a3d", [:mix], [{:poison, "~> 1.5 or ~> 2.0 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"}, + "mariaex": {:hex, :mariaex, "0.8.2", "a9ee64a02fd72579f844934b4cbecca9566593e499125edf6032c58f9d50b5f9", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm"}, + "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, + "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [], [], "hexpm"}, + "postgrex": {:git, "https://github.com/elixir-ecto/postgrex.git", "b825546e8954f9623e256f2423483bf42a2c6596", [branch: "jf-stage"]}, + "sbroker": {:hex, :sbroker, "1.0.0", "28ff1b5e58887c5098539f236307b36fe1d3edaa2acff9d6a3d17c2dcafebbd0", [:rebar3], [], "hexpm"}}