diff --git a/.travis.yml b/.travis.yml index f568d758..84a820e9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,16 +2,12 @@ language: elixir env: - ELIXIR_ERL_OPTIONS="+T 9" elixir: - - 1.2.6 - 1.3.0 - 1.3.4 otp_release: - 18.3 - 19.1 -matrix: - include: - - elixir: 1.2.0 - otp_release: 18.2 + - 19.3 sudo: false script: - mix test.all diff --git a/integration_test/cases/consumer_test.exs b/integration_test/cases/consumer_test.exs new file mode 100644 index 00000000..0cd178e6 --- /dev/null +++ b/integration_test/cases/consumer_test.exs @@ -0,0 +1,35 @@ +defmodule ConsumerTest do + use ExUnit.Case, async: true + + alias TestPool, as: P + alias TestAgent, as: A + alias TestQuery, as: Q + alias TestResult, as: R + + test "start_link consumes events" do + stack = [ + {:ok, :state}, + {:ok, %R{}, :new_state}, + {:ok, %R{}, :newer_state}, + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self(), stage_transaction: false] + {:ok, pool} = P.start_link(opts) + {:ok, stage} = P.start_consumer(pool, &P.execute!(&1, %Q{}, &2, opts), opts) + mon = Process.monitor(stage) + + {:ok, _} = + [1, 2] + |> Flow.from_enumerable() + |> Flow.into_stages([{stage, [cancel: :transient, max_demand: 1]}]) + + assert_receive {:DOWN, ^mon, :process, ^stage, :normal} + + assert [ + connect: [_], + handle_execute: [%Q{}, [1], _, :state], + handle_execute: [%Q{}, [2], _, :new_state] + ] = A.record(agent) + end +end diff --git a/integration_test/cases/continuation_test.exs b/integration_test/cases/continuation_test.exs new file mode 100644 index 00000000..05336d26 --- /dev/null +++ b/integration_test/cases/continuation_test.exs @@ -0,0 +1,411 @@ +defmodule ContinuationTest do + use ExUnit.Case, async: true + + alias TestPool, as: P + alias TestAgent, as: A + alias TestQuery, as: Q + alias TestResult, as: R + + test "transaction commits after stream resource reduced" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, %R{}, :newer_state}, + {:ok, :committed, :newest_state} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + + start = fn() -> P.checkout_begin(pool, opts) end + next = fn(conn) -> + {:ok, res} = P.transaction(conn, fn(conn2) -> + P.execute!(conn2, %Q{}, [:param], opts) + end, opts) + {[res], conn} + end + stop = &P.commit_checkin/1 + + assert Stream.resource(start, next, stop) |> Enum.take(1) == [%R{}] + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_execute: [%Q{}, [:param], _, :new_state], + handle_commit: [_, :newer_state] + ] = A.record(agent) + end + + test "transaction commits per trigger inside Flow pipeline" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, %R{}, :newer_state}, + {:ok, :committed, :newest_state} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + + assert [[:param]] + |> Flow.from_enumerable() + |> Flow.partition(stages: 1) + |> Flow.reduce(fn() -> {[], P.checkout_begin(pool, opts)} end, + fn(params, {acc, conn}) -> + {:ok, res} = P.transaction(conn, fn(conn2) -> + P.execute!(conn2, %Q{}, params, opts) + end) + {[res | acc], conn} + end) + |> Flow.map_state(fn({acc, conn}) -> + P.commit_checkin(conn, opts) + Enum.reverse(acc) + end) + |> Enum.to_list() == [%R{}] + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_execute: [%Q{}, [:param], _, :new_state], + handle_commit: [_, :newer_state] + ] = A.record(agent) + end + + test "checkout_begin raises on checkin" do + stack = [ + fn(opts) -> + Process.link(opts[:parent]) + {:ok, :state} + end, + {:ok, :began, :new_state}, + {:ok, :state2} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + Process.flag(:trap_exit, true) + {:ok, pool} = P.start_link(opts) + + conn = P.checkout_begin(pool, opts) + + assert_raise RuntimeError, "inside transaction", + fn() -> P.checkin(conn, opts) end + + assert P.commit_checkin(conn, opts) == {:error, :rollback} + + assert_receive {:EXIT, _, {%DBConnection.ConnectionError{}, [_|_]}} + + assert [ + {:connect, [_]}, + {:handle_begin, [_, :state]} | _] = A.record(agent) + end + + test "transaction raises on checkin" do + stack = [ + fn(opts) -> + Process.link(opts[:parent]) + {:ok, :state} + end, + {:ok, :began, :new_state}, + {:ok, :state2} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + Process.flag(:trap_exit, true) + {:ok, pool} = P.start_link(opts) + + conn = P.checkout_begin(pool, opts) + + assert_raise RuntimeError, "inside transaction", + fn() -> P.checkin(conn, opts) end + + assert_receive {:EXIT, _, {%DBConnection.ConnectionError{}, [_|_]}} + + assert P.transaction(conn, fn(conn2) -> + assert_raise RuntimeError, "inside transaction", + fn() -> P.checkin(conn2, opts) end + :hello + end) == {:error, :rollback} + + assert P.commit_checkin(conn, opts) == {:error, :rollback} + + assert [ + {:connect, [_]}, + {:handle_begin, [_, :state]} | _] = A.record(agent) + end + + test "transaction runs inside transaction" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, %R{}, :newer_state}, + {:ok, :commited, :newest_state} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + + assert P.transaction(pool, fn(conn) -> + assert P.transaction(conn, &P.execute!(&1, %Q{}, [:param], opts), + opts) == {:ok, %R{}} + :hello + end) == {:ok, :hello} + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_execute: [%Q{}, [:param], _, :new_state], + handle_commit: [_, :newer_state] + ] = A.record(agent) + end + + test "transaction rolls back and returns error" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, :rolledback, :newest_state} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + + conn = P.checkout_begin(pool, opts) + + assert P.rollback_checkin(conn, :oops, opts) == {:error, :oops} + assert P.commit_checkin(conn, opts) == {:error, :rollback} + assert P.rollback_checkin(conn, :oops, opts) == {:error, :oops} + assert P.transaction(conn, fn(_) -> + flunk "should not fun" + end, opts) == {:error, :rollback} + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_rollback: [_, :new_state] + ] = A.record(agent) + end + + test "transaction runs inside resource_transaction" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, %R{}, :newer_state}, + {:ok, :commited, :newest_state} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + + conn = P.checkout_begin(pool, opts) + + assert P.transaction(conn, fn(conn2) -> + assert P.transaction(conn2, fn(conn3) -> + P.execute!(conn3, %Q{}, [:param], opts) + end, opts) == {:ok, %R{}} + :hello + end) == {:ok, :hello} + + assert P.commit_checkin(conn, opts) == :ok + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_execute: [%Q{}, [:param], _, :new_state], + handle_commit: [_, :newer_state] + ] = A.record(agent) + end + + test "checkout_begin logs error" do + err = RuntimeError.exception("oops") + stack = [ + {:ok, :state}, + {:error, err, :new_state} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + log = &send(parent, &1) + assert_raise RuntimeError, "oops", + fn() -> P.checkout_begin(pool, [log: log]) end + + assert_received %DBConnection.LogEntry{call: :checkout_begin} = entry + assert %{query: :begin, params: nil, result: {:error, ^err}} = entry + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) + + assert [ + connect: [_], + handle_begin: [ _, :state]] = A.record(agent) + end + + test "checkout_begin logs raises and rolls back" do + err = RuntimeError.exception("oops") + stack = [ + {:ok, :state}, + {:ok, :raise, :new_state}, + {:ok, :rolledback, :newer_state} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + log = fn + %DBConnection.LogEntry{result: {:ok, :raise}} -> raise err + entry -> send(parent, entry) + end + + assert_raise RuntimeError, "oops", + fn() -> P.checkout_begin(pool, [log: log]) end + + assert_received %DBConnection.LogEntry{call: :checkout_begin} = entry + assert %{query: :rollback, params: nil, result: {:ok, :rolledback}} = entry + assert is_nil(entry.pool_time) + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) + + assert [ + connect: [_], + handle_begin: [ _, :state], + handle_rollback: [_, :new_state] + ] = A.record(agent) + end + + test "transaction logs on rollback" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, :rolledback, :newer_state} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + conn = P.checkout_begin(pool, opts) + + log = &send(parent, &1) + assert P.transaction(conn, fn(conn2) -> + P.rollback(conn2, :oops) + end, [log: log]) == {:error, :oops} + + assert_received %DBConnection.LogEntry{call: :transaction} = entry + assert %{query: :rollback, params: nil, result: {:ok, :rolledback}} = entry + assert is_nil(entry.pool_time) + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) + + assert [ + connect: [_], + handle_begin: [ _, :state], + handle_rollback: [_, :new_state] + ] = A.record(agent) + end + + test "transaction rolls back on failed transaction" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, :rolledback, :newer_state} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + conn = P.checkout_begin(pool, opts) + + assert P.transaction(conn, fn(conn2) -> + assert P.transaction(conn2, &P.rollback(&1, :oops), opts) == {:error, :oops} + end, opts) == {:error, :rollback} + + assert P.transaction(conn, fn(_) -> + flunk "should not run" + end, opts) == {:error, :rollback} + + assert [ + connect: [_], + handle_begin: [ _, :state], + handle_rollback: [_, :new_state] + ] = A.record(agent) + end + + test "commit_checkin logs" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, :committed, :newer_state} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + conn = P.checkout_begin(pool, opts) + + log = &send(parent, &1) + assert P.commit_checkin(conn, [log: log]) == :ok + + assert_received %DBConnection.LogEntry{call: :commit_checkin} = entry + assert %{query: :commit, params: nil, result: {:ok, :committed}} = entry + assert is_nil(entry.pool_time) + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) + + assert [ + connect: [_], + handle_begin: [ _, :state], + handle_commit: [_, :new_state] + ] = A.record(agent) + end + + test "rollback_checkin logs" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, :rolledback, :newer_state} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + conn = P.checkout_begin(pool, opts) + + log = &send(parent, &1) + assert P.rollback_checkin(conn, :oops, [log: log]) == {:error, :oops} + + assert_received %DBConnection.LogEntry{call: :rollback_checkin} = entry + assert %{query: :rollback, params: nil, result: {:ok, :rolledback}} = entry + assert is_nil(entry.pool_time) + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) + + assert [ + connect: [_], + handle_begin: [ _, :state], + handle_rollback: [_, :new_state] + ] = A.record(agent) + end +end diff --git a/integration_test/cases/producer_test.exs b/integration_test/cases/producer_test.exs new file mode 100644 index 00000000..9244b94e --- /dev/null +++ b/integration_test/cases/producer_test.exs @@ -0,0 +1,351 @@ +defmodule ProducerTest do + use ExUnit.Case, async: true + + alias TestPool, as: P + alias TestAgent, as: A + alias TestQuery, as: Q + alias TestCursor, as: C + alias TestResult, as: R + + test "start_link produces result" do + stack = [ + {:ok, :state}, + {:ok, %C{}, :new_state}, + {:ok, %R{}, :newer_state}, + {:deallocate, %R{}, :newest_state}, + {:ok, :deallocated, :state2}, + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self(), stage_transaction: false] + {:ok, pool} = P.start_link(opts) + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], opts) + mon = Process.monitor(stage) + assert [{stage, [cancel: :transient]}] |> GenStage.stream() |> Enum.to_list() == [%R{}, %R{}] + + assert_receive {:DOWN, ^mon, :process, ^stage, :normal} + + assert [ + connect: [_], + handle_declare: [%Q{}, [:param], _, :state], + handle_first: [%Q{}, %C{}, _, :new_state], + handle_next: [%Q{}, %C{}, _, :newer_state], + handle_deallocate: [%Q{}, %C{}, _, :newest_state] + ] = A.record(agent) + end + + test "start_link with prepare: true produces result" do + stack = [ + {:ok, :state}, + {:ok, %Q{}, :new_state}, + {:ok, %C{}, :newer_state}, + {:ok, %R{}, :newest_state}, + {:deallocate, %R{}, :state2}, + {:ok, :deallocated, :new_state2}, + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self(), stage_transaction: false] + {:ok, pool} = P.start_link(opts) + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], [stage_prepare: true] ++ opts) + mon = Process.monitor(stage) + assert [{stage, [cancel: :transient]}] |> GenStage.stream() |> Enum.to_list() == [%R{}, %R{}] + + assert_receive {:DOWN, ^mon, :process, ^stage, :normal} + + assert [ + connect: [_], + handle_prepare: [%Q{}, _, :state], + handle_declare: [%Q{}, [:param], _, :new_state], + handle_first: [%Q{}, %C{}, _, :newer_state], + handle_next: [%Q{}, %C{}, _, :newest_state], + handle_deallocate: [%Q{}, %C{}, _, :state2] + ] = A.record(agent) + end + + test "stage stops normally after it's done" do + stack = [ + {:ok, :state}, + {:ok, %C{}, :new_state}, + {:deallocate, %R{}, :newer_state}, + {:ok, :deallocated, :newest_state}, + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent, stage_transaction: false] + {:ok, pool} = P.start_link(opts) + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], opts) + + ref = Process.monitor(stage) + send(stage, {:"$gen_producer", {parent, ref}, {:subscribe, nil, [cancel: :transient]}}) + sub = {stage, ref} + GenStage.ask(sub, 1000) + + assert_receive {:"$gen_consumer", ^sub, [%R{}]} + + assert_receive {:DOWN, ^ref, :process, ^stage, :normal} + refute_received {:"$gen_producer", ^sub, _} + + assert [ + connect: [_], + handle_declare: [%Q{}, [:param], _, :state], + handle_first: [%Q{}, %C{}, _, :new_state], + handle_deallocate: [%Q{}, %C{}, _, :newer_state], + ] = A.record(agent) + end + + test "stage with execute in stream_mapper" do + stack = [ + {:ok, :state}, + {:ok, %C{}, :new_state}, + {:ok, %R{}, :newer_state}, + {:ok, %R{}, :newest_state}, + {:deallocate, %R{}, :state2}, + {:ok, %R{}, :new_state2}, + {:ok, :deallocated, :newer_state2}, + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self(), stage_transaction: false] + {:ok, pool} = P.start_link(opts) + + mapper = fn(conn, res) -> [P.execute!(conn, %Q{}, res, opts), :mapped] end + opts = [stream_mapper: mapper] ++ opts + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], opts) + mon = Process.monitor(stage) + assert [{stage, [cancel: :transient]}] |> GenStage.stream() |> Enum.to_list() == [%R{}, :mapped, %R{}, :mapped] + + assert_receive {:DOWN, ^mon, :process, ^stage, :normal} + + assert [ + connect: [_], + handle_declare: [%Q{}, [:param], _, :state], + handle_first: [%Q{}, %C{}, _, :new_state], + handle_execute: [%Q{}, %R{}, _, :newer_state], + handle_next: [%Q{}, %C{}, _, :newest_state], + handle_execute: [%Q{}, %R{}, _, :state2], + handle_deallocate: [%Q{}, %C{}, _, :new_state2], + ] = A.record(agent) + end + + test "stage checks in on abnormal exit" do + stack = [ + {:ok, :state}, + {:ok, %C{}, :new_state}, + {:ok, :deallocated, :newer_state}, + {:ok, %R{}, :new_state2} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent, stage_transaction: false] + Process.flag(:trap_exit, true) + {:ok, pool} = P.start_link(opts) + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], opts) + + send(stage, {:"$gen_producer", {parent, make_ref()}, {:subscribe, nil, []}}) + + GenStage.stop(stage, :oops) + + assert P.execute!(pool, %Q{}, [:param], opts) == %R{} + + assert [ + connect: [_], + handle_declare: [%Q{}, [:param], _, :state], + handle_deallocate: [%Q{}, %C{}, _, :new_state], + handle_execute: [%Q{}, [:param], _, :newer_state] + ] = A.record(agent) + end + + test "stage declare disconnects" do + err = RuntimeError.exception("oops") + stack = [ + {:ok, :state}, + {:disconnect, err, :new_state}, + :ok, + fn(opts) -> + send(opts[:parent], :reconnected) + {:ok, :state2} + end + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent, stage_transaction: false] + Process.flag(:trap_exit, true) + {:ok, pool} = P.start_link(opts) + assert {:error, {^err, _}} = P.start_producer(pool, %Q{}, [:param], opts) + + assert_receive :reconnected + + assert [ + connect: [_], + handle_declare: [%Q{}, [:param], _, :state], + disconnect: [^err, :new_state], + connect: [_] + ] = A.record(agent) + end + + test "stage declare bad return raises and stops" do + stack = [ + fn(opts) -> + send(opts[:parent], {:hi, self()}) + Process.link(opts[:parent]) + {:ok, :state} + end, + :oops, + {:ok, :state2} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent, stage_transaction: false] + {:ok, pool} = P.start_link(opts) + assert_receive {:hi, conn} + + Process.flag(:trap_exit, true) + {:error, {%DBConnection.ConnectionError{}, _} = reason} = P.start_producer(pool, %Q{}, [:param], opts) + + assert_receive {:EXIT, stage, ^reason} + + prefix = "client #{inspect stage} stopped: " <> + "** (DBConnection.ConnectionError) bad return value: :oops" + len = byte_size(prefix) + assert_receive {:EXIT, ^conn, + {%DBConnection.ConnectionError{message: <<^prefix::binary-size(len), _::binary>>}, + [_|_]}} + + assert [ + {:connect, _}, + {:handle_declare, [%Q{}, [:param], _, :state]} | _] = A.record(agent) + end + + test "stage checks in if first errors" do + err = RuntimeError.exception("oops") + stack = [ + {:ok, :state}, + {:ok, %C{}, :new_state}, + {:error, err, :newer_state}, + {:ok, :deallocated, :newest_state}, + {:ok, %R{}, :state2} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent, stage_transaction: false] + {:ok, pool} = P.start_link(opts) + Process.flag(:trap_exit, true) + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], opts) + catch_exit([{stage, [cancel: :transient]}] |> GenStage.stream() |> Enum.to_list()) + assert_receive {:EXIT, ^stage, {^err, _}} + + assert P.execute!(pool, %Q{}, [:param], opts) == %R{} + + assert [ + connect: [_], + handle_declare: [%Q{}, [:param], _, :state], + handle_first: [%Q{}, %C{}, _, :new_state], + handle_deallocate: [%Q{}, %C{}, _, :newer_state], + handle_execute: [%Q{}, [:param], _, :newest_state] + ] = A.record(agent) + end + + test "stage first disconnects" do + err = RuntimeError.exception("oops") + stack = [ + {:ok, :state}, + {:ok, %C{}, :new_state}, + {:disconnect, err, :newer_state}, + :ok, + fn(opts) -> + send(opts[:parent], :reconnected) + {:ok, :state2} + end + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent, stage_transaction: false] + {:ok, pool} = P.start_link(opts) + Process.flag(:trap_exit, true) + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], opts) + catch_exit([{stage, [cancel: :transient]}] |> GenStage.stream() |> Enum.to_list()) + assert_receive {:EXIT, ^stage, {^err, _}} + + assert_receive :reconnected + + assert [ + connect: [_], + handle_declare: [%Q{}, [:param], _, :state], + handle_first: [%Q{}, %C{}, _, :new_state], + disconnect: [^err, :newer_state], + connect: [_] + ] = A.record(agent) + end + + test "stage checks in if deallocate errors" do + err = RuntimeError.exception("oops") + stack = [ + {:ok, :state}, + {:ok, %C{}, :new_state}, + {:deallocate, %R{}, :newer_state}, + {:error, err, :newest_state}, + {:ok, %R{}, :state2} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent, stage_transaction: false] + {:ok, pool} = P.start_link(opts) + Process.flag(:trap_exit, true) + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], opts) + catch_exit([{stage, [cancel: :transient]}] |> GenStage.stream() |> Enum.to_list()) + assert_receive {:EXIT, ^stage, {^err, _}} + + assert P.execute!(pool, %Q{}, [:param], opts) == %R{} + + assert [ + connect: [_], + handle_declare: [%Q{}, [:param], _, :state], + handle_first: [%Q{}, %C{}, _, :new_state], + handle_deallocate: [%Q{}, %C{}, _, :newer_state], + handle_execute: [%Q{}, [:param], _, :newest_state] + ] = A.record(agent) + end + + test "stage deallocate disconnects" do + err = RuntimeError.exception("oops") + stack = [ + {:ok, :state}, + {:ok, %C{}, :new_state}, + {:deallocate, %R{}, :newer_state}, + {:disconnect, err, :newest_state}, + :ok, + fn(opts) -> + send(opts[:parent], :reconnected) + {:ok, :new_state2} + end + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent, stage_transaction: false] + {:ok, pool} = P.start_link(opts) + Process.flag(:trap_exit, true) + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], opts) + catch_exit([{stage, [cancel: :transient]}] |> GenStage.stream() |> Enum.to_list()) + assert_receive {:EXIT, ^stage, {^err, _}} + + assert_receive :reconnected + + assert [ + connect: [_], + handle_declare: [%Q{}, [:param], _, :state], + handle_first: [%Q{}, %C{}, _, :new_state], + handle_deallocate: [%Q{}, %C{}, _, :newer_state], + disconnect: [^err, :newest_state], + connect: [_] + ] = A.record(agent) + end +end diff --git a/integration_test/cases/queue_test.exs b/integration_test/cases/queue_test.exs index 0383ace6..78ea4e72 100644 --- a/integration_test/cases/queue_test.exs +++ b/integration_test/cases/queue_test.exs @@ -3,8 +3,9 @@ defmodule QueueTest do alias TestPool, as: P alias TestAgent, as: A + alias TestQuery, as: Q - test "queue: false raises on busy" do + test "run queue: false raises on busy" do stack = [{:ok, :state}] {:ok, agent} = A.start_link(stack) @@ -22,6 +23,151 @@ defmodule QueueTest do end) end + test "execute queue: false raises on busy" do + stack = [{:ok, :state}] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + P.run(pool, fn(_) -> + {queue_time, _} = :timer.tc(fn() -> + opts = [queue: false] ++ opts + assert_raise DBConnection.ConnectionError, + "connection not available and queuing is disabled", + fn() -> P.execute!(pool, %Q{}, [:param], opts) end + end) + assert queue_time <= 1_000_000, "request was queued" + end) + end + + test "execute queue: false raises on busy and logs" do + stack = [{:ok, :state}] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + P.run(pool, fn(_) -> + {queue_time, _} = :timer.tc(fn() -> + log = &send(parent, &1) + opts = [queue: false, log: log] ++ opts + assert_raise DBConnection.ConnectionError, + "connection not available and queuing is disabled", + fn() -> P.execute!(pool, %Q{}, [:param], opts) end + end) + assert queue_time <= 1_000_000, "request was queued" + end) + + assert_received %DBConnection.LogEntry{call: :execute} = entry + assert %{query: %Q{}, params: [:param], result: result} = entry + assert {:error, %DBConnection.ConnectionError{}} = result + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 + assert is_nil(entry.connection_time) + assert is_nil(entry.decode_time) + end + + test "transaction queue: false raises on busy" do + stack = [{:ok, :state}] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + P.run(pool, fn(_) -> + {queue_time, _} = :timer.tc(fn() -> + opts = [queue: false] ++ opts + assert_raise DBConnection.ConnectionError, + "connection not available and queuing is disabled", + fn() -> + P.transaction(pool, fn() -> flunk("should not run") end, opts) + end + end) + assert queue_time <= 1_000_000, "request was queued" + end) + end + + test "transaction queue: false raises on busy and logs" do + stack = [{:ok, :state}] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + P.run(pool, fn(_) -> + {queue_time, _} = :timer.tc(fn() -> + log = &send(parent, &1) + opts = [queue: false, log: log] ++ opts + assert_raise DBConnection.ConnectionError, + "connection not available and queuing is disabled", + fn() -> + P.transaction(pool, fn() -> flunk("should not run") end, opts) + end + end) + assert queue_time <= 1_000_000, "request was queued" + end) + + assert_received %DBConnection.LogEntry{call: :transaction} = entry + assert %{query: :begin, params: nil, result: result} = entry + assert {:error, %DBConnection.ConnectionError{}} = result + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 + assert is_nil(entry.connection_time) + assert is_nil(entry.decode_time) + end + + test "checkout_begin queue: false raises on busy" do + stack = [{:ok, :state}] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + P.run(pool, fn(_) -> + {queue_time, _} = :timer.tc(fn() -> + opts = [queue: false] ++ opts + assert_raise DBConnection.ConnectionError, + "connection not available and queuing is disabled", + fn() -> P.checkout_begin(pool, opts) end + end) + assert queue_time <= 1_000_000, "request was queued" + end) + end + + test "checkout_begin queue: false raises on busy and logs" do + stack = [{:ok, :state}] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + P.run(pool, fn(_) -> + {queue_time, _} = :timer.tc(fn() -> + log = &send(parent, &1) + opts = [queue: false, log: log] ++ opts + assert_raise DBConnection.ConnectionError, + "connection not available and queuing is disabled", + fn() -> P.checkout_begin(pool, opts) end + end) + assert queue_time <= 1_000_000, "request was queued" + end) + + assert_received %DBConnection.LogEntry{call: :checkout_begin} = entry + assert %{query: :begin, params: nil, result: result} = entry + assert {:error, %DBConnection.ConnectionError{}} = result + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 + assert is_nil(entry.connection_time) + assert is_nil(entry.decode_time) + end + test "queue many async" do stack = [{:ok, :state}] {:ok, agent} = A.start_link(stack) diff --git a/integration_test/cases/stream_test.exs b/integration_test/cases/stream_test.exs index 4bb92e43..b2965e56 100644 --- a/integration_test/cases/stream_test.exs +++ b/integration_test/cases/stream_test.exs @@ -71,6 +71,43 @@ defmodule StreamTest do ] = A.record(agent) end + test "stream with execute in stream_mapper" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, %C{}, :newer_state}, + {:ok, %R{}, :newest_state}, + {:ok, %R{}, :state2}, + {:deallocate, %R{}, :new_state2}, + {:ok, %R{}, :newer_state2}, + {:ok, :deallocated, :newest_state2}, + {:ok, :commited, :state3} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + assert P.transaction(pool, fn(conn) -> + mapper = fn(conn, res) -> [P.execute!(conn, %Q{}, res, opts), :mapped] end + stream = P.stream(conn, %Q{}, [:param], [stream_mapper: mapper]) + assert %DBConnection.Stream{} = stream + assert Enum.to_list(stream) == [%R{}, :mapped, %R{}, :mapped] + :hi + end) == {:ok, :hi} + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_declare: [%Q{}, [:param], _, :new_state], + handle_first: [%Q{}, %C{}, _, :newer_state], + handle_execute: [%Q{}, %R{}, _, :newest_state], + handle_next: [%Q{}, %C{}, _, :state2], + handle_execute: [%Q{}, %R{}, _, :new_state2], + handle_deallocate: [%Q{}, %C{}, _, :newer_state2], + handle_commit: [_, :newest_state2] + ] = A.record(agent) + end + test "stream logs result" do stack = [ {:ok, :state}, diff --git a/integration_test/cases/transaction_producer_test.exs b/integration_test/cases/transaction_producer_test.exs new file mode 100644 index 00000000..06145014 --- /dev/null +++ b/integration_test/cases/transaction_producer_test.exs @@ -0,0 +1,375 @@ +defmodule TransactionProducerTest do + use ExUnit.Case, async: true + + alias TestPool, as: P + alias TestAgent, as: A + alias TestQuery, as: Q + alias TestCursor, as: C + alias TestResult, as: R + + test "start_link produces result" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, %C{}, :newer_state}, + {:ok, %R{}, :newest_state}, + {:deallocate, %R{}, :state2}, + {:ok, :deallocated, :new_state2}, + {:ok, :commited, :newer_state2} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], opts) + mon = Process.monitor(stage) + assert [{stage, [cancel: :transient]}] |> GenStage.stream() |> Enum.to_list() == [%R{}, %R{}] + + assert_receive {:DOWN, ^mon, :process, ^stage, :normal} + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_declare: [%Q{}, [:param], _, :new_state], + handle_first: [%Q{}, %C{}, _, :newer_state], + handle_next: [%Q{}, %C{}, _, :newest_state], + handle_deallocate: [%Q{}, %C{}, _, :state2], + handle_commit: [_, :new_state2] + ] = A.record(agent) + end + + test "start_link with prepare: true produces result" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, %Q{}, :newer_state}, + {:ok, %C{}, :newest_state}, + {:ok, %R{}, :state2}, + {:deallocate, %R{}, :new_state2}, + {:ok, :deallocated, :newer_state2}, + {:ok, :commited, :newest_state2} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], [stage_prepare: true] ++ opts) + mon = Process.monitor(stage) + assert [{stage, [cancel: :transient]}] |> GenStage.stream() |> Enum.to_list() == [%R{}, %R{}] + + assert_receive {:DOWN, ^mon, :process, ^stage, :normal} + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_prepare: [%Q{}, _, :new_state], + handle_declare: [%Q{}, [:param], _, :newer_state], + handle_first: [%Q{}, %C{}, _, :newest_state], + handle_next: [%Q{}, %C{}, _, :state2], + handle_deallocate: [%Q{}, %C{}, _, :new_state2], + handle_commit: [_, :newer_state2] + ] = A.record(agent) + end + + test "stage stops normally after it's done" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, %C{}, :newer_state}, + {:deallocate, %R{}, :state2}, + {:ok, :deallocated, :new_state2}, + {:ok, :commited, :newer_state2} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], opts) + + ref = Process.monitor(stage) + send(stage, {:"$gen_producer", {parent, ref}, {:subscribe, nil, [cancel: :transient]}}) + sub = {stage, ref} + GenStage.ask(sub, 1000) + + assert_receive {:"$gen_consumer", ^sub, [%R{}]} + + assert_receive {:DOWN, ^ref, :process, ^stage, :normal} + refute_received {:"$gen_producer", ^sub, _} + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_declare: [%Q{}, [:param], _, :new_state], + handle_first: [%Q{}, %C{}, _, :newer_state], + handle_deallocate: [%Q{}, %C{}, _, :state2], + handle_commit: [_, :new_state2] + ] = A.record(agent) + end + + test "stage with execute in stream_mapper" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, %C{}, :newer_state}, + {:ok, %R{}, :newest_state}, + {:ok, %R{}, :state2}, + {:deallocate, %R{}, :new_state2}, + {:ok, %R{}, :newer_state2}, + {:ok, :deallocated, :newest_state2}, + {:ok, :committed, :state3} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + + mapper = fn(conn, res) -> [P.execute!(conn, %Q{}, res, opts), :mapped] end + opts = [stream_mapper: mapper] ++ opts + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], opts) + mon = Process.monitor(stage) + assert [{stage, [cancel: :transient]}] |> GenStage.stream() |> Enum.to_list() == [%R{}, :mapped, %R{}, :mapped] + + assert_receive {:DOWN, ^mon, :process, ^stage, :normal} + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_declare: [%Q{}, [:param], _, :new_state], + handle_first: [%Q{}, %C{}, _, :newer_state], + handle_execute: [%Q{}, %R{}, _, :newest_state], + handle_next: [%Q{}, %C{}, _, :state2], + handle_execute: [%Q{}, %R{}, _, :new_state2], + handle_deallocate: [%Q{}, %C{}, _, :newer_state2], + handle_commit: [_, :newest_state2] + ] = A.record(agent) + end + + test "stage rolls back on abnormal exit" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, %C{}, :newer_state}, + {:ok, :deallocated, :newest_state}, + {:ok, :rolledback, :state2} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + Process.flag(:trap_exit, true) + {:ok, pool} = P.start_link(opts) + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], opts) + + send(stage, {:"$gen_producer", {parent, make_ref()}, {:subscribe, nil, []}}) + + GenStage.stop(stage, :oops) + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_declare: [%Q{}, [:param], _, :new_state], + handle_deallocate: [%Q{}, %C{}, _, :newer_state], + handle_rollback: [_, :newest_state] + ] = A.record(agent) + end + + test "stage declare disconnects" do + err = RuntimeError.exception("oops") + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:disconnect, err, :newer_state}, + :ok, + fn(opts) -> + send(opts[:parent], :reconnected) + {:ok, :state2} + end + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + Process.flag(:trap_exit, true) + {:ok, pool} = P.start_link(opts) + assert {:error, {^err, _}} = P.start_producer(pool, %Q{}, [:param], opts) + + assert_receive :reconnected + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_declare: [%Q{}, [:param], _, :new_state], + disconnect: [^err, :newer_state], + connect: [_] + ] = A.record(agent) + end + + test "stage declare bad return raises and stops" do + stack = [ + fn(opts) -> + send(opts[:parent], {:hi, self()}) + Process.link(opts[:parent]) + {:ok, :state} + end, + {:ok, :began, :new_state}, + :oops, + {:ok, :state2} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + assert_receive {:hi, conn} + + Process.flag(:trap_exit, true) + {:error, {%DBConnection.ConnectionError{}, _} = reason} = P.start_producer(pool, %Q{}, [:param], opts) + + assert_receive {:EXIT, stage, ^reason} + + prefix = "client #{inspect stage} stopped: " <> + "** (DBConnection.ConnectionError) bad return value: :oops" + len = byte_size(prefix) + assert_receive {:EXIT, ^conn, + {%DBConnection.ConnectionError{message: <<^prefix::binary-size(len), _::binary>>}, + [_|_]}} + + assert [ + {:connect, _}, + {:handle_begin, [_, :state]}, + {:handle_declare, [%Q{}, [:param], _, :new_state]} | _] = A.record(agent) + end + + test "stage rolls back if first errors" do + err = RuntimeError.exception("oops") + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, %C{}, :newer_state}, + {:error, err, :newest_state}, + {:ok, :deallocated, :state2}, + {:ok, :rolledback, :new_state2} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + Process.flag(:trap_exit, true) + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], opts) + catch_exit([{stage, [cancel: :transient]}] |> GenStage.stream() |> Enum.to_list()) + assert_receive {:EXIT, ^stage, {^err, _}} + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_declare: [%Q{}, [:param], _, :new_state], + handle_first: [%Q{}, %C{}, _, :newer_state], + handle_deallocate: [%Q{}, %C{}, _, :newest_state], + handle_rollback: [_, :state2] + ] = A.record(agent) + end + + test "stage first disconnects" do + err = RuntimeError.exception("oops") + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, %C{}, :newer_state}, + {:disconnect, err, :newest_state}, + :ok, + fn(opts) -> + send(opts[:parent], :reconnected) + {:ok, :state2} + end + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + Process.flag(:trap_exit, true) + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], opts) + catch_exit([{stage, [cancel: :transient]}] |> GenStage.stream() |> Enum.to_list()) + assert_receive {:EXIT, ^stage, {^err, _}} + + assert_receive :reconnected + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_declare: [%Q{}, [:param], _, :new_state], + handle_first: [%Q{}, %C{}, _, :newer_state], + disconnect: [^err, :newest_state], + connect: [_] + ] = A.record(agent) + end + + test "stage rolls back if deallocate errors" do + err = RuntimeError.exception("oops") + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, %C{}, :newer_state}, + {:deallocate, %R{}, :newest_state}, + {:error, err, :state2}, + {:ok, :rolledback, :new_state2} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + Process.flag(:trap_exit, true) + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], opts) + catch_exit([{stage, [cancel: :transient]}] |> GenStage.stream() |> Enum.to_list()) + assert_receive {:EXIT, ^stage, {^err, _}} + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_declare: [%Q{}, [:param], _, :new_state], + handle_first: [%Q{}, %C{}, _, :newer_state], + handle_deallocate: [%Q{}, %C{}, _, :newest_state], + handle_rollback: [_, :state2] + ] = A.record(agent) + end + + test "stage deallocate disconnects" do + err = RuntimeError.exception("oops") + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, %C{}, :newer_state}, + {:deallocate, %R{}, :newest_state}, + {:disconnect, err, :state2}, + :ok, + fn(opts) -> + send(opts[:parent], :reconnected) + {:ok, :new_state2} + end + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + Process.flag(:trap_exit, true) + {:ok, stage} = P.start_producer(pool, %Q{}, [:param], opts) + catch_exit([{stage, [cancel: :transient]}] |> GenStage.stream() |> Enum.to_list()) + assert_receive {:EXIT, ^stage, {^err, _}} + + assert_receive :reconnected + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_declare: [%Q{}, [:param], _, :new_state], + handle_first: [%Q{}, %C{}, _, :newer_state], + handle_deallocate: [%Q{}, %C{}, _, :newest_state], + disconnect: [^err, :state2], + connect: [_] + ] = A.record(agent) + end +end diff --git a/integration_test/cases/transaction_test.exs b/integration_test/cases/transaction_test.exs index 1576f2d5..69ff378e 100644 --- a/integration_test/cases/transaction_test.exs +++ b/integration_test/cases/transaction_test.exs @@ -135,8 +135,7 @@ defmodule TransactionTest do P.rollback(conn2, :oops) end) == {:error, :oops} - assert_raise DBConnection.ConnectionError, "transaction rolling back", - fn() -> P.transaction(conn, fn(_) -> nil end) end + assert P.transaction(conn, fn(_) -> nil end) == {:error, :rollback} end) == {:error, :rollback} assert P.transaction(pool, fn(_) -> :result end) == {:ok, :result} @@ -197,8 +196,7 @@ defmodule TransactionTest do assert_raise RuntimeError, "oops", fn() -> P.transaction(conn, fn(_) -> raise "oops" end) end - assert_raise DBConnection.ConnectionError, "transaction rolling back", - fn() -> P.transaction(conn, fn(_) -> nil end) end + assert P.transaction(conn, fn(_) -> nil end) == {:error, :rollback} end) == {:error, :rollback} assert P.transaction(pool, fn(_) -> :result end) == {:ok, :result} diff --git a/integration_test/tests.exs b/integration_test/tests.exs index f8ed4c1a..619b9620 100644 --- a/integration_test/tests.exs +++ b/integration_test/tests.exs @@ -2,6 +2,8 @@ Code.require_file "cases/after_connect_test.exs", __DIR__ Code.require_file "cases/backoff_test.exs", __DIR__ Code.require_file "cases/client_test.exs", __DIR__ Code.require_file "cases/close_test.exs", __DIR__ +Code.require_file "cases/consumer_test.exs", __DIR__ +Code.require_file "cases/continuation_test.exs", __DIR__ Code.require_file "cases/execute_test.exs", __DIR__ Code.require_file "cases/idle_test.exs", __DIR__ Code.require_file "cases/overflow_test.exs", __DIR__ @@ -9,6 +11,8 @@ Code.require_file "cases/prepare_execute_test.exs", __DIR__ Code.require_file "cases/prepare_stream_test.exs", __DIR__ Code.require_file "cases/prepare_test.exs", __DIR__ Code.require_file "cases/queue_test.exs", __DIR__ +Code.require_file "cases/producer_test.exs", __DIR__ Code.require_file "cases/stream_test.exs", __DIR__ Code.require_file "cases/transaction_execute_test.exs", __DIR__ +Code.require_file "cases/transaction_producer_test.exs", __DIR__ Code.require_file "cases/transaction_test.exs", __DIR__ diff --git a/lib/db_connection.ex b/lib/db_connection.ex index 4755e323..0118b3f5 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -739,9 +739,12 @@ defmodule DBConnection do fun.(conn) end def run(pool, fun, opts) do - {conn, conn_state} = checkout(pool, opts) - put_info(conn, :idle, conn_state) - run_begin(conn, fun, opts) + conn = checkout(pool, opts) + try do + fun.(conn) + after + checkin(conn, opts) + end end @doc """ @@ -756,10 +759,11 @@ defmodule DBConnection do `run/3` and `transaction/3` can be nested multiple times. If a transaction is rolled back or a nested transaction `fun` raises the transaction is marked as - failed. Any calls inside a failed transaction (except `rollback/2`) will raise - until the outer transaction call returns. All running `transaction/3` calls - will return `{:error, :rollback}` if the transaction failed or connection - closed and `rollback/2` is not called for that `transaction/3`. + failed. Any calls inside a failed transaction (except `rollback/2` and + `transaction/3`) will raise until the outer transaction call returns. All + running (and future) `transaction/3` calls will return `{:error, :rollback}` + if the transaction failed or connection closed and `rollback/2` is not + called for that `transaction/3`. ### Options @@ -817,11 +821,7 @@ defmodule DBConnection do case get_info(conn) do {transaction, _} when transaction in [:transaction, :failed] -> throw({:rollback, conn_ref, err}) - {transaction, _, _} when transaction in [:transaction, :failed] -> - throw({:rollback, conn_ref, err}) - {:idle, _} -> - raise "not inside transaction" - {:idle, _, _} -> + {idle, _} when idle in [:idle, :continuation] -> raise "not inside transaction" :closed -> raise DBConnection.ConnectionError, "connection is closed" @@ -834,6 +834,9 @@ defmodule DBConnection do ### Options + * `:stream_mapper` - A function to flat map the results of the query, either + a 2-arity fun, `{module, function, args}` with `DBConnection.t` and the + result prepended to `args` or `nil` (default: `nil`) * `:pool_timeout` - The maximum time to wait for a reply when making a synchronous call to the pool (default: `5_000`) * `:queue` - Whether to block waiting in an internal queue for the @@ -870,6 +873,9 @@ defmodule DBConnection do ### Options + * `:stream_mapper` - A function to flat map the results of the query, + either a 2-arity fun, `{module, function, args}` with `DBConnection.t` and + the result prepended to `args` or `nil` (default: `nil`) * `:pool_timeout` - The maximum time to wait for a reply when making a synchronous call to the pool (default: `5_000`) * `:queue` - Whether to block waiting in an internal queue for the @@ -890,7 +896,8 @@ defmodule DBConnection do {:ok, results} = DBConnection.transaction(conn, fn(conn) -> query = %Query{statement: "SELECT id FROM table"} query = DBConnection.prepare!(conn, query) - stream = DBConnection.stream(conn, query, []) + opts = [stream_mapper: &Map.fetch!(&1, :rows)] + stream = DBConnection.stream(conn, query, [], opts) Enum.to_list(stream) end) """ @@ -913,21 +920,219 @@ defmodule DBConnection do resource(conn, start, &fetch/3, &deallocate/3, opts).(acc, fun) end + @doc """ + Acquire a lock on a connection and return the connection struct for use with + `run/3` and/or `transaction/3` calls. + + `run/3` and `transaction/3` can be nested multiple times but a + `transaction/3` call inside another `transaction/3` will be treated + the same as `run/3`. + + ### Options + + * `:pool_timeout` - The maximum time to wait for a reply when making a + synchronous call to the pool (default: `5_000`) + * `:queue` - Whether to block waiting in an internal queue for the + connection's state (boolean, default: `true`) + * `:timeout` - The maximum time that the caller is allowed the + to hold the connection's state (default: `15_000`) + + The pool may support other options. + + ### Example + + conn = DBConnection.checkout(pool) + try do + DBConnection.execute!(conn, "SELECT id FROM table", []) + after + DBConnection.checkin(conn) + end + """ + @spec checkout(pool :: GenServer.server, opts :: Keyword.t) :: t + def checkout(pool, opts \\ []) do + case run_checkout(pool, opts) do + {:ok, conn, _} -> + conn + {:error, err} -> + raise err + end + end + + @doc """ + Release the lock on a connection. + + Returns `:ok`. + + The pool may support options. + + ### Example + + conn = DBConnection.checkout(pool) + try do + DBConnection.execute!(conn, "SELECT id FROM table", []) + after + DBConnection.checkin(conn) + end + """ + @spec checkin(conn :: t, opts :: Keyword.t) :: :ok + def checkin(%DBConnection{} = conn, opts \\ []) do + case delete_info(conn) do + {:idle, conn_state} -> + run_checkin(conn, conn_state, opts) + {status, conn_state} + when status in [:transaction, :failed, :continuation] -> + try do + raise "inside transaction" + catch + :error, reason -> + stack = System.stacktrace() + delete_stop(conn, conn_state, :error, reason, stack, opts) + :erlang.raise(:error, reason, stack) + end + :closed -> + :ok + end + end + + @doc """ + Acquire a lock on a connection and return the connection struct for use with + `transaction/3` calls. + + To use the locked connection call `transaction/3`. If the transaction is + rolled back the connection is checked in. To release the lock the connection + call `commit_checkin/2` or `rollback_checkin/3`. + + ### Options + + * `:pool_timeout` - The maximum time to wait for a reply when making a + synchronous call to the pool (default: `5_000`) + * `:queue` - Whether to block waiting in an internal queue for the + connection's state (boolean, default: `true`) + * `:timeout` - The maximum time that the caller is allowed the + to hold the connection's state (default: `15_000`) + * `:log` - A function to log information about begin, commit and rollback + calls made as part of the transaction, either a 1-arity fun, + `{module, function, args}` with `DBConnection.LogEntry.t` prepended to + `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) + + The pool and connection module may support other options. All options + are passed to `handle_begin/2`, and `handle_rollback/2`. + + ### Example + + conn = DBConnection.checkout_begin(pool) + {:ok, res} = DBConnection.transaction(conn, fn(conn) -> + DBConnection.execute!(conn, "SELECT id FROM table", []) + end) + DBConnection.commit_checkin(conn) + """ + @spec checkout_begin(pool :: GenServer.server, opts :: Keyword.t) :: t + def checkout_begin(pool, opts \\ []) do + {result, log_info} = checkout_begin_meter(pool, opts) + transaction_log(log_info, :checkout_begin) + case result do + {:ok, conn} -> + conn + {:raise, err} -> + raise err + {kind, reason, stack} -> + :erlang.raise(kind, reason, stack) + end + end + + @doc """ + Commit transaction and release the lock on a connection. + + Returns `:ok` on sucess, otherwise `{:error, :rollback}` if the + transaction failed and was rolled back or the connection is not available. + + Can only be called for a connection checked out with `checkout_begin/2` when + outside of `transaction/3`. + + ### Options + + * `:log` - A function to log information about begin, commit and rollback + calls made as part of the transaction, either a 1-arity fun, + `{module, function, args}` with `DBConnection.LogEntry.t` prepended to + `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) + + The pool and connection module may support other options. All options + are passed to `handle_commit/2`. + + ### Example + + conn = DBConnection.checkout_begin(pool) + DBConnection.commit_checkin(conn) + """ + @spec commit_checkin(t, Keyword.t) :: :ok | {:error, :rollback} + def commit_checkin(conn, opts \\ []) do + log = Keyword.get(opts, :log) + {result, log_info} = continuation_conclude(conn, &commit/4, log, opts, :ok) + transaction_log(log_info, :commit_checkin) + case result do + {:raise, err} -> + raise err + {kind, reason, stack} -> + :erlang.raise(kind, reason, stack) + other -> + other + end + end + + @doc """ + Rollback transaction and release the lock on a connection. + + Returns `{:error, reason}` if rolls back or connection not available. + + Can only be called for a connection checked out with `checkout_begin/2` when + outside of `transaction/3`. + + ### Options + + * `:log` - A function to log information about begin, commit and rollback + calls made as part of the transaction, either a 1-arity fun, + `{module, function, args}` with `DBConnection.LogEntry.t` prepended to + `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) + + The pool and connection module may support other options. All options + are passed to `handle_rollback/2`. + + ### Example + + conn = DBConnection.checkout_begin(pool) + DBConnection.rollback_checkin(conn, :oops) + """ + @spec rollback_checkin(t, reason, Keyword.t) :: + {:error, reason} when reason: var + def rollback_checkin(conn, reason, opts \\ []) do + {result, log_info} = run_continuation_rollback(conn, reason, opts) + transaction_log(log_info, :rollback_checkin) + case result do + {:raise, err} -> + raise err + {kind, reason, stack} -> + :erlang.raise(kind, reason, stack) + other -> + other + end + end + ## Helpers - defp checkout(pool, opts) do + defp run_checkout(pool, opts) do pool_mod = Keyword.get(opts, :pool, DBConnection.Connection) case apply(pool_mod, :checkout, [pool, opts]) do {:ok, pool_ref, conn_mod, conn_state} -> conn = %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref, conn_mod: conn_mod, conn_ref: make_ref()} - {conn, conn_state} - {:error, err} -> - raise err + put_info(conn, :idle, conn_state) + {:ok, conn, conn_state} + {:error, _} = error -> + error end end - defp checkin(conn, conn_state, opts) do + defp run_checkin(conn, conn_state, opts) do %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref} = conn _ = apply(pool_mod, :checkin, [pool_ref, conn_state, opts]) :ok @@ -1122,29 +1327,40 @@ defmodule DBConnection do defp run_meter(%DBConnection{} = conn, fun, opts) do case Keyword.get(opts, :log) do nil -> - {run(conn, fun, opts), nil} + {fun.(conn), nil} log -> - run_meter(conn, log, [], fun, opts) - end + start = time() + result = fun.(conn) + stop = time() + meter = {log, [stop: stop, start: start]} + {result, meter} + end end defp run_meter(pool, fun, opts) do case Keyword.get(opts, :log) do nil -> {run(pool, fun, opts), nil} log -> - run_meter(pool, log, [checkout: time()], fun, opts) + run_meter(pool, log, fun, opts) end end - defp run_meter(conn, log, times, fun, opts) do - fun = fn(conn2) -> - start = time() - result = fun.(conn2) - stop = time() - meter = {log, [stop: stop, start: start] ++ times} - {result, meter} + defp run_meter(pool, log, fun, opts) do + checkout = time() + case run_checkout(pool, opts) do + {:ok, conn, _} -> + try do + start = time() + result = fun.(conn) + stop = time() + meter = {log, [stop: stop, start: start, checkout: checkout]} + {result, meter} + after + checkin(conn, opts) + end + {:error, _} = error -> + {error, {log, [stop: time(), checkout: checkout]}} end - run(conn, fun, opts) end defp decode_log(_, _, _, nil, result), do: log_result(result) @@ -1152,11 +1368,13 @@ defmodule DBConnection do log(call, query, params, log, [decode: time()] ++ times, result) end - defp transaction_log(nil), do: :ok - defp transaction_log({log, times, callback, result}) do + defp transaction_log(log, fun \\ :transaction) + + defp transaction_log(nil, _fun), do: :ok + defp transaction_log({log, times, callback, result}, fun) do call = transaction_call(callback) result = transaction_result(result) - _ = log(:transaction, call, nil, log, times, result) + _ = log(fun, call, nil, log, times, result) :ok end @@ -1194,39 +1412,19 @@ defmodule DBConnection do end defp log_result(other), do: other - defp run_begin(conn, fun, opts) do - try do - fun.(conn) - after - run_end(conn, opts) - end - end - - defp run_end(conn, opts) do - case delete_info(conn) do - {:idle, conn_state} -> - checkin(conn, conn_state, opts) - {status, conn_state} when status in [:transaction, :failed] -> - try do - raise "connection run ended in transaction" - catch - :error, reason -> - stack = System.stacktrace() - delete_stop(conn, conn_state, :error, reason, stack, opts) - :erlang.raise(:error, reason, stack) - end - :closed -> - :ok - end - end - defp transaction_meter(%DBConnection{} = conn, fun, opts) do - case fetch_info(conn) do + case get_info(conn) do {:transaction, _} -> {transaction_nested(conn, fun), nil} + {:continuation, conn_state} -> + {transaction_continue(conn, conn_state, fun, opts), nil} {:idle, conn_state} -> log = Keyword.get(opts, :log) begin_meter(conn, conn_state, log, [], fun, opts) + {:failed, _} -> + {{:error, :rollback}, nil} + :closed -> + {{:error, :rollback}, nil} end end defp transaction_meter(pool, fun, opts) do @@ -1234,8 +1432,24 @@ defmodule DBConnection do nil -> run(pool, &begin(&1, nil, [], fun, opts), opts) log -> - times = [checkout: time()] - run(pool, &begin(&1, log, times, fun, opts), opts) + transaction_meter(pool, log, fun, opts) + end + end + + defp transaction_meter(pool, log, fun, opts) do + checkout = time() + case run_checkout(pool, opts) do + {:ok, conn, conn_state} -> + try do + begin_meter(conn, conn_state, log, [checkout: checkout], fun, opts) + after + checkin(conn, opts) + end + {:error, err} -> + times = [stop: time(), checkout: checkout] + result = {:raise, err} + log_info = {log, times, :handle_begin, result} + {result, log_info} end end @@ -1289,7 +1503,7 @@ defmodule DBConnection do defp commit(conn, log, opts, result) do case get_info(conn) do - {:transaction, conn_state} -> + {trans, conn_state} when trans in [:transaction, :continuation] -> conclude_meter(conn, conn_state, log, :handle_commit, opts, result) {:failed, conn_state} -> result = {:error, :rollback} @@ -1301,7 +1515,8 @@ defmodule DBConnection do defp rollback(conn, log, opts, result) do case get_info(conn) do - {trans, conn_state} when trans in [:transaction, :failed] -> + {trans, conn_state} + when trans in [:transaction, :failed, :continuation] -> conclude_meter(conn, conn_state, log, :handle_rollback, opts, result) :closed -> {result, nil} @@ -1396,7 +1611,62 @@ defmodule DBConnection do end end - defp prepare_declare(conn, query, params, opts) do + defp transaction_continue(conn, state, fun, opts) do + %DBConnection{conn_ref: conn_ref} = conn + try do + put_info(conn, :transaction, state) + fun.(conn) + catch + :throw, {:rollback, ^conn_ref, reason} -> + continue_failed(conn, reason, opts) + kind, reason -> + stack = System.stacktrace() + continue_failed(conn, :raise, opts) + :erlang.raise(kind, reason, stack) + else + result -> + continue_ok(conn, result, opts) + end + end + + defp continue_ok(conn, result, opts) do + case get_info(conn) do + {:transaction, conn_state} -> + put_info(conn, :continuation, conn_state) + {:ok, result} + {:failed, conn_state} -> + put_info(conn, :continuation, conn_state) + continue_rollback(conn, :rollback, opts) + _ -> + {:error, :rollback} + end + end + + defp continue_failed(conn, reason, opts) do + case get_info(conn) do + {trans, conn_state} when trans in [:transaction, :failed] -> + put_info(conn, :continuation, conn_state) + continue_rollback(conn, reason, opts) + :closed -> + {:error, reason} + end + end + + defp continue_rollback(conn, reason, opts) do + {result, log_info} = run_continuation_rollback(conn, reason, opts) + transaction_log(log_info) + case result do + {:raise, err} -> + raise err + {kind, reason, stack} -> + :erlang.raise(kind, reason, stack) + other -> + other + end + end + + @doc false + def prepare_declare(conn, query, params, opts) do query = parse(:prepare_declare, query, params, opts) case run_prepare_declare(conn, query, params, opts) do {{:ok, query, cursor}, meter} -> @@ -1432,7 +1702,8 @@ defmodule DBConnection do end end - defp declare(conn, query, params, opts) do + @doc false + def declare(conn, query, params, opts) do encoded = encode(:declare, query, params, opts) case run_declare(conn, query, encoded, opts) do {{:ok, cursor}, meter} -> @@ -1461,17 +1732,27 @@ defmodule DBConnection do end end - defp fetch(conn, {:first, query, cursor}, opts) do - fetch(conn, :handle_first, :first, query, cursor, opts) + @doc false + def fetch(conn, state, opts) do + case run_fetch(conn, state, opts) do + {:ok, result, state} -> + {fetch_map(conn, result, opts), state} + {:halt, _} = halt -> + halt + end + end + + defp run_fetch(conn, {:first, query, cursor}, opts) do + run_fetch(conn, :handle_first, :first, query, cursor, opts) end - defp fetch(conn, {:next, query, cursor}, opts) do - fetch(conn, :handle_next, :next, query, cursor, opts) + defp run_fetch(conn, {:next, query, cursor}, opts) do + run_fetch(conn, :handle_next, :next, query, cursor, opts) end - defp fetch(_, {:deallocate, _, _} = state, _) do + defp run_fetch(_, {:deallocate, _, _} = state, _) do {:halt, state} end - def fetch(conn, fun, call, query, cursor, opts) do + defp run_fetch(conn, fun, call, query, cursor, opts) do fetch = &handle(&1, fun, [query, cursor], opts) case run_meter(conn, fetch, opts) do {{:ok, result}, meter} -> @@ -1486,10 +1767,22 @@ defmodule DBConnection do defp fetch_decode(status, call, query, cursor, meter, result, opts) do {:ok, decoded} = decode(call, query, cursor, meter, result, opts) - {[decoded], {status, query, cursor}} + {:ok, decoded, {status, query, cursor}} end - defp deallocate(conn, {_, query, cursor}, opts) do + defp fetch_map(conn, result, opts) do + case Keyword.get(opts, :stream_mapper) do + map when is_function(map, 2) -> + map.(conn, result) + {mod, fun, args} -> + apply(mod, fun, [conn, result | args]) + nil -> + [result] + end + end + + @doc false + def deallocate(conn, {_, query, cursor}, opts) do case get_info(conn) do :closed -> :ok _ -> deallocate(conn, query, cursor, opts) @@ -1512,6 +1805,95 @@ defmodule DBConnection do Stream.resource(start, next, stop) end + defp checkout_begin_meter(pool, opts) do + case Keyword.get(opts, :log) do + nil -> + run_checkout_begin(pool, opts) + log -> + checkout_begin_meter(pool, log, opts) + end + end + + defp run_checkout_begin(pool, opts) do + case run_checkout(pool, opts) do + {:ok, conn, conn_state} -> + run_checkout_begin(conn, conn_state, opts) + {:error, err} -> + {{:raise, err}, nil} + end + end + + defp run_checkout_begin(conn, conn_state, opts) do + case handle(conn, conn_state, :handle_begin, opts, :continuation) do + {:ok, _} -> + {{:ok, conn}, nil} + error -> + checkin(conn, opts) + {error, nil} + end + end + + defp checkout_begin_meter(pool, log, opts) do + checkout = time() + case run_checkout(pool, opts) do + {:ok, conn, conn_state} -> + checkout_begin_meter(conn, conn_state, log, [checkout: checkout], opts) + {:error, err} -> + times = [stop: time(), checkout: checkout] + result = {:raise, err} + log_info = {log, times, :handle_begin, result} + {result, log_info} + end + end + + defp checkout_begin_meter(conn, conn_state, log, times, opts) do + start = time() + result = handle(conn, conn_state, :handle_begin, opts, :continuation) + times = [stop: time(), start: start] ++ times + log_info = {log, times, :handle_begin, result} + case result do + {:ok, _} -> + checkout_begin_log(conn, log_info, opts) + error -> + checkin(conn, opts) + {error, log_info} + end + end + + defp checkout_begin_log(conn, {log, _, _, _} = log_info, opts) do + try do + transaction_log(log_info, :checkout_begin) + catch + kind, reason -> + result = {kind, reason, System.stacktrace()} + continuation_conclude(conn, &rollback/4, log, opts, result) + else + _ -> + {{:ok, conn}, nil} + end + end + + defp run_continuation_rollback(conn, reason, opts) do + log = Keyword.get(opts, :log) + result = {:error, reason} + continuation_conclude(conn, &rollback/4, log, opts, result, result) + end + + defp continuation_conclude(conn, fun, log, opts, result, closed \\ {:error, :rollback}) do + case get_info(conn) do + {:continuation, _} -> + try do + fun.(conn, log, opts, result) + after + checkin(conn, opts) + end + {trans, _} when trans in [:transaction, :failed] -> + raise "inside transaction" + :closed -> + {closed, nil} + end + end + defp put_info(conn, status, conn_state) do _ = Process.put(key(conn), {status, conn_state}) :ok @@ -1521,6 +1903,8 @@ defmodule DBConnection do case get_info(conn) do {:failed, _} -> raise DBConnection.ConnectionError, "transaction rolling back" + {:continuation, _} -> + raise "not inside transaction" {_, _} = info -> info :closed -> diff --git a/lib/db_connection/consumer.ex b/lib/db_connection/consumer.ex new file mode 100644 index 00000000..ea14a056 --- /dev/null +++ b/lib/db_connection/consumer.ex @@ -0,0 +1,150 @@ +defmodule DBConnection.Consumer do + @moduledoc """ + A `GenStage` consumer that runs a fun for each batch of events, optionally + encapsulated in a transaction. + + ### Options + + * `:stage_transaction` - Whether the producer should encapsulate the query + in a transaction (default: `true`) + * `:pool_timeout` - The maximum time to wait for a reply when making a + synchronous call to the pool (default: `5_000`) + * `:queue` - Whether to block waiting in an internal queue for the + connection's state (boolean, default: `true`) + * `:timeout` - The maximum time that the caller is allowed the + to hold the connection's state (ignored when using a run/transaction + connection, default: `15_000`) + * `:log` - A function to log information about a call, either + a 1-arity fun or `{module, function, args}` with `DBConnection.LogEntry.t` + prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) + + The pool and connection module may support other options. All options + are passed to `handle_begin/2`, `handle_commit/2` and `handle_rollback/2`. + """ + alias __MODULE__, as: Consumer + + use GenStage + + @enforce_keys [:conn, :fun, :transaction?, :opts, :done?, :producers] + defstruct [:conn, :fun, :transaction?, :opts, :done?, :producers] + + @start_opts [:name, :spawn_opt, :debug] + @stage_opts [:subscribe_to] + + @doc """ + Start link a `GenStage` consumer thay will run an anonymous for each batch of + events, optionally inside a transaction. + + The transaction is rolled back if the process terminates with a reason other + than `:normal`. If the transaction is rolled back the process exits with the + same reason. + + For options see "Options" in the module documentation. + + ### Example + + fun = fn(conn, rows) -> + DBConnection.execute!(conn, statement, [rows]) + end + {:ok, stage} = DBConnection.Consumer.start_link(pool, fun, opts) + enum + |> Flow.from_enumerable() + |> Flow.map(&process_row/1) + |> Flow.into_stages([{stage, cancel: :transient}] + """ + def start_link(pool, fun, opts \\ []) when is_function(fun, 2) do + start_opts = Keyword.take(opts, @start_opts) + GenStage.start_link(__MODULE__, {pool, fun, opts}, start_opts) + end + + @doc false + def init({pool, fun, opts}) do + stage_opts = Keyword.take(opts, @stage_opts) + {:consumer, init(pool, fun, opts), stage_opts} + end + + @doc false + def handle_subscribe(:producer, _, {pid, ref}, consumer) do + case consumer do + %Consumer{done?: true} = consumer -> + GenStage.cancel({pid, ref}, :normal, [:noconnect]) + {:manual, consumer} + %Consumer{done?: false, producers: producers} = consumer -> + new_producers = Map.put(producers, ref, pid) + {:automatic, %Consumer{consumer | producers: new_producers}} + end + end + + @doc false + def handle_cancel(_, {_, ref}, consumer) do + %Consumer{producers: producers} = consumer + case Map.delete(producers, ref) do + new_producers when new_producers == %{} and producers != %{} -> + GenStage.async_info(self(), :stop) + {:noreply, [], %Consumer{consumer | done?: true, producers: %{}}} + new_producers -> + {:noreply, [], %Consumer{consumer | producers: new_producers}} + end + end + + @doc false + def handle_info(:stop, state) do + {:stop, :normal, state} + end + def handle_info(_msg, state) do + {:noreply, [], state} + end + + @doc false + def handle_events(events, _, %Consumer{transaction?: true} = consumer) do + %Consumer{conn: conn, fun: fun, opts: opts} = consumer + case DBConnection.transaction(conn, &fun.(&1, events), opts) do + {:ok, _} -> + {:noreply, [], consumer} + {:error, reason} -> + exit(reason) + end + end + def handle_events(events, _, %Consumer{transaction?: false} = consumer) do + %Consumer{conn: conn, fun: fun, opts: opts} = consumer + _ = DBConnection.run(conn, &fun.(&1, events), opts) + {:noreply, [], consumer} + end + + @doc false + def terminate(reason, %Consumer{transaction?: true} = stage) do + %Consumer{conn: conn, opts: opts} = stage + case DBConnection.transaction(conn, fn(_) -> reason end, opts) do + {:ok, :normal} -> + DBConnection.commit_checkin(conn, opts) + {:ok, reason} -> + DBConnection.rollback_checkin(conn, reason, opts) + {:error, :rollback} -> + :ok + end + end + def terminate(_, %Consumer{transaction?: false} = stage) do + %Consumer{conn: conn, opts: opts} = stage + DBConnection.checkin(conn, opts) + end + + ## Helpers + + defp init(pool, fun, opts) do + case Keyword.get(opts, :stage_transaction, true) do + true -> + pool + |> DBConnection.checkout_begin(opts) + |> init(true, fun, opts) + false -> + pool + |> DBConnection.checkout(opts) + |> init(false, fun, opts) + end + end + + defp init(conn, transaction?, fun, opts) do + %Consumer{conn: conn, transaction?: transaction?, fun: fun, opts: opts, + done?: false, producers: %{}} + end +end diff --git a/lib/db_connection/log_entry.ex b/lib/db_connection/log_entry.ex index a5356dec..131092fc 100644 --- a/lib/db_connection/log_entry.ex +++ b/lib/db_connection/log_entry.ex @@ -62,4 +62,7 @@ defmodule DBConnection.LogEntry do defp parse_time({:checkout, checkout} = time, {{:start, start}, entry}) do {time, %{entry | pool_time: diff(start, checkout)}} end + defp parse_time({:checkout, checkout} = time, {{:stop, stop}, entry}) do + {time, %{entry | pool_time: diff(stop, checkout)}} + end end diff --git a/lib/db_connection/producer.ex b/lib/db_connection/producer.ex new file mode 100644 index 00000000..2d27212e --- /dev/null +++ b/lib/db_connection/producer.ex @@ -0,0 +1,178 @@ +defmodule DBConnection.Producer do + @moduledoc """ + A `GenStage` producer that streams the result of a query, optionally + encapsulated in a transaction. + + ### Options + + * `:stream_mapper` - A function to flat map the results of the query, either + a 2-arity fun, `{module, function, args}` with `DBConnection.t` and the + result prepended to `args` or `nil` (default: `nil`) + * `:stage_prepare` - Whether the producer should prepare the query before + streaming it (default: `false`) + * `:stage_transaction` - Whether the producer should encapsulate the query + in a transaction (default: `true`) + * `:pool_timeout` - The maximum time to wait for a reply when making a + synchronous call to the pool (default: `5_000`) + * `:queue` - Whether to block waiting in an internal queue for the + connection's state (boolean, default: `true`) + * `:timeout` - The maximum time that the caller is allowed the + to hold the connection's state (ignored when using a run/transaction + connection, default: `15_000`) + * `:log` - A function to log information about a call, either + a 1-arity fun or `{module, function, args}` with `DBConnection.LogEntry.t` + prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) + + The pool and connection module may support other options. All options + are passed to `handle_begin/2`, `handle_prepare/3`, `handle_declare/4`, + `handle_first/4`, `handle_next/4`, `handle_deallocate/4`, `handle_commit/2` + and `handle_rollback/2`. In addition, the demand will be passed to + `handle_first/4` and `handle_next/4` by adding `fetch: demand` to the options. + """ + alias __MODULE__, as: Producer + + use GenStage + + @enforce_keys [:conn, :state, :transaction?, :opts] + defstruct [:conn, :state, :transaction?, :opts] + + @start_opts [:name, :spawn_opt, :debug] + @stage_opts [:demand, :buffer_size, :buffer_keep, :dispatcher] + + @doc """ + Start link a `GenStage` producer that will optionally prepare a query, execute + it and stream results using a cursor inside a transaction. + + The transaction is rolled back if the process terminates with a reason other + than `:normal`. + + For options see "Options" in the module documentation. + + ### Example + + query = %Query{statement: "SELECT id FROM table"} + opts = [stream_mapper: &Map.fetch!(&1, :rows)] + {:ok, stage} = DBConnection.Producer.start_link(pool, query, [], opts) + stage |> GenStage.stream() |> Enum.to_list() + """ + def start_link(pool, query, params, opts \\ []) do + start_opts = Keyword.take(opts, @start_opts) + args = {pool, query, params, opts} + GenStage.start_link(__MODULE__, args, start_opts) + end + + @doc false + def init({pool, query, params, opts}) do + stage_opts = Keyword.take(opts, @stage_opts) + stage = init(pool, opts) + state = run(&declare(&1, query, params, opts), opts, stage) + {:producer, %Producer{stage | state: state}, stage_opts} + end + + @doc false + def handle_info(:stop, stage) do + {:stop, :normal, stage} + end + def handle_info({:fetch, conn, pending}, %Producer{conn: conn} = stage) do + handle_demand(pending, stage) + end + def handle_info(_, stage) do + {:noreply, [], stage} + end + + @doc false + def handle_demand(demand, stage) do + %Producer{conn: conn, state: state, opts: opts} = stage + case run(&fetch(&1, demand, state, opts), opts, stage) do + {:halt, state} -> + GenStage.async_info(self(), :stop) + {:noreply, [], %Producer{stage | state: state}} + {events, state} -> + # stream_mapper may not produce the desired number of events, i.e. at + # the end of the results, so we can close the cursor as soon as + # possible. + pending = demand - length(events) + _ = if pending > 0, do: send(self(), {:fetch, conn, pending}) + {:noreply, events, %Producer{stage | state: state}} + end + end + + @doc false + def terminate(reason, %Producer{transaction?: true} = stage) do + %Producer{conn: conn, state: state, opts: opts} = stage + deallocate = &deallocate(&1, reason, state, opts) + case DBConnection.transaction(conn, deallocate, opts) do + {:ok, :normal} -> + DBConnection.commit_checkin(conn, opts) + {:ok, reason} -> + DBConnection.rollback_checkin(conn, reason, opts) + {:error, :rollback} -> + :ok + end + end + def terminate(reason, %Producer{transaction?: false} = stage) do + %Producer{conn: conn, state: state, opts: opts} = stage + try do + deallocate(conn, reason, state, opts) + after + DBConnection.checkin(conn, opts) + end + end + + ## Helpers + + defp init(pool, opts) do + case Keyword.get(opts, :stage_transaction, true) do + true -> + conn = DBConnection.checkout_begin(pool, opts) + %Producer{conn: conn, transaction?: true, state: :declare, opts: opts} + false -> + conn = DBConnection.checkout(pool, opts) + %Producer{conn: conn, transaction?: false, state: :declare, opts: opts} + end + end + + defp run(fun, opts, %Producer{conn: conn, transaction?: true}) do + case DBConnection.transaction(conn, fun, opts) do + {:ok, result} -> + result + {:error, reason} -> + exit(reason) + end + end + defp run(fun, opts, %Producer{conn: conn, transaction?: false}) do + try do + fun.(conn) + catch + kind, reason -> + stack = System.stacktrace() + DBConnection.checkin(conn, opts) + :erlang.raise(kind, reason, stack) + end + end + + defp declare(conn, query, params, opts) do + case Keyword.get(opts, :stage_prepare, false) do + true -> + DBConnection.prepare_declare(conn, query, params, opts) + false -> + DBConnection.declare(conn, query, params, opts) + end + end + + defp fetch(conn, demand, state, opts) do + try do + DBConnection.fetch(conn, state, [fetch: demand] ++ opts) + catch + kind, reason -> + stack = System.stacktrace() + DBConnection.deallocate(conn, state, opts) + :erlang.raise(kind, reason, stack) + end + end + + defp deallocate(conn, reason, state, opts) do + :ok = DBConnection.deallocate(conn, state, opts) + reason + end +end diff --git a/mix.exs b/mix.exs index 8844a7e1..4259fae4 100644 --- a/mix.exs +++ b/mix.exs @@ -7,7 +7,7 @@ defmodule DBConnection.Mixfile do def project do [app: :db_connection, version: @version, - elixir: "~> 1.2", + elixir: "~> 1.3", deps: deps(), docs: docs(), description: description(), @@ -30,6 +30,10 @@ defmodule DBConnection.Mixfile do [{:connection, "~> 1.0.2"}, {:poolboy, "~> 1.5", [optional: true]}, {:sbroker, "~> 1.0", [optional: true]}, + {:gen_stage, "~> 0.11", [optional: true, github: "elixir-lang/gen_stage", + branch: "jv-exit-signals"]}, + {:flow, "~> 0.11", [optional: true, github: "elixir-lang/flow", + branch: "jv-exit-signals"]}, {:ex_doc, "~> 0.12", only: :dev}] end diff --git a/mix.lock b/mix.lock index 725242b0..33473b27 100644 --- a/mix.lock +++ b/mix.lock @@ -1,5 +1,7 @@ %{"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], []}, "earmark": {:hex, :earmark, "0.2.1", "ba6d26ceb16106d069b289df66751734802777a3cbb6787026dd800ffeb850f3", [:mix], []}, "ex_doc": {:hex, :ex_doc, "0.12.0", "b774aabfede4af31c0301aece12371cbd25995a21bb3d71d66f5c2fe074c603f", [:mix], [{:earmark, "~> 0.2", [hex: :earmark, optional: false]}]}, + "flow": {:git, "https://github.com/elixir-lang/flow.git", "cfdc0d240aa990c5229c70e032595f2b063a1445", [branch: "jv-exit-signals"]}, + "gen_stage": {:git, "https://github.com/elixir-lang/gen_stage.git", "966d998c699dec58e0d183ddbaa2b783c46cd375", [branch: "jv-exit-signals"]}, "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], []}, "sbroker": {:hex, :sbroker, "1.0.0", "28ff1b5e58887c5098539f236307b36fe1d3edaa2acff9d6a3d17c2dcafebbd0", [:rebar3], []}} diff --git a/test/test_support.exs b/test/test_support.exs index cce1cd73..b23be059 100644 --- a/test/test_support.exs +++ b/test/test_support.exs @@ -63,6 +63,30 @@ defmodule TestConnection do DBConnection.close!(pool, query, opts2 ++ unquote(opts)) end + def start_producer(pool, query, params, opts2 \\ []) do + DBConnection.Producer.start_link(pool, query, params, opts2 ++ unquote(opts)) + end + + def start_consumer(pool, fun, opts2 \\ []) do + DBConnection.Consumer.start_link(pool, fun, opts2 ++ unquote(opts)) + end + + def checkout_begin(pool, opts2 \\ []) do + DBConnection.checkout_begin(pool, opts2 ++ unquote(opts)) + end + + def commit_checkin(conn, opts2 \\ []) do + DBConnection.commit_checkin(conn, opts2 ++ unquote(opts)) + end + + def rollback_checkin(conn, reason, opts2 \\ []) do + DBConnection.rollback_checkin(conn, reason, opts2 ++ unquote(opts)) + end + + def checkin(conn, opts2 \\ []) do + DBConnection.checkin(conn, opts2 ++ unquote(opts)) + end + defoverridable [start_link: 1] end end @@ -70,8 +94,6 @@ defmodule TestConnection do def start_link(opts), do: DBConnection.start_link(__MODULE__, opts) def connect(opts) do - agent = Keyword.fetch!(opts, :agent) - _ = Process.put(:agent, agent) TestAgent.eval(:connect, [opts]) end @@ -136,7 +158,6 @@ defmodule TestConnection do end end - defmodule TestQuery do defstruct [:state] end @@ -179,7 +200,11 @@ defmodule TestAgent do ok end - def eval(agent \\ Process.get(:agent), fun, args) do + def eval(fun, args) do + eval(get_agent(args), fun, args) + end + + def eval(agent, fun, args) do action = {fun, args} case Agent.get_and_update(agent, &get_and_update(&1, action)) do fun when is_function(fun) -> @@ -189,6 +214,23 @@ defmodule TestAgent do end end + defp get_agent(args) do + case Process.get(:agent) do + agent when is_pid(agent) -> + agent + nil -> + opts = get_opts(args) + agent = Keyword.fetch!(opts, :agent) + _ = Process.put(:agent, agent) + agent + end + end + + defp get_opts([opts]), do: opts + defp get_opts([opts, _]), do: opts + defp get_opts([_, opts, _]), do: opts + defp get_opts([_, _, opts, _]), do: opts + def record(agent) do Enum.reverse(Agent.get(agent, &elem(&1, 1))) end