From 4ea6bba5b2d3d4026e8b7ee50dbf742c502a69fa Mon Sep 17 00:00:00 2001 From: James Fish Date: Wed, 5 Jul 2017 00:09:38 +0100 Subject: [PATCH 1/2] Switch to non-closure based transactions Requiring a fun for a transaction and non-local return for a rollback is a very limiting API. With this breaking change the API becomes more flexible, in particular it allows transaction to continue over multiple OTP behaviour callbacks and use of (potentially nested) savepoints. Removes: * DBConnection.transaction/3 * DBConnection.rollback/2 Adds: * DBConnection.checkout/2 * DBConnection.checkin/2 * DBConnection.begin/2 * DBConnection.commit/2 * DBConnection.rollback/2 * DBConnection.checkout_begin/2 * DBConnection.commit_checkin/2 * DBConnection.rollback_checkin/2 --- integration_test/cases/close_test.exs | 27 - .../cases/prepare_stream_test.exs | 165 ++-- ...nsaction_execute_test.exs => run_test.exs} | 169 +--- integration_test/cases/stream_test.exs | 221 ++--- integration_test/cases/transaction_test.exs | 837 +++++++++-------- integration_test/tests.exs | 2 +- lib/db_connection.ex | 889 +++++++++++------- test/test_support.exs | 56 +- 8 files changed, 1274 insertions(+), 1092 deletions(-) rename integration_test/cases/{transaction_execute_test.exs => run_test.exs} (51%) diff --git a/integration_test/cases/close_test.exs b/integration_test/cases/close_test.exs index af843fd0..0c2da101 100644 --- a/integration_test/cases/close_test.exs +++ b/integration_test/cases/close_test.exs @@ -154,31 +154,4 @@ defmodule CloseTest do connect: [_], handle_close: [%Q{}, _, :state]] = A.record(agent) end - - test "close in failed transaction" do - stack = [ - {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, :result, :newer_state}, - {:ok, :rolledback, :newest_state} - ] - {:ok, agent} = A.start_link(stack) - - opts = [agent: agent, parent: self()] - {:ok, pool} = P.start_link(opts) - - assert P.transaction(pool, fn(conn) -> - assert P.transaction(conn, fn(conn2) -> - P.rollback(conn2, :oops) - end) == {:error, :oops} - - assert P.close(conn, %Q{}, opts) == {:ok, :result} - end) == {:error, :rollback} - - assert [ - connect: [_], - handle_begin: [_, :state], - handle_close: [%Q{}, _, :new_state], - handle_rollback: [_, :newer_state]] = A.record(agent) - end end diff --git a/integration_test/cases/prepare_stream_test.exs b/integration_test/cases/prepare_stream_test.exs index 84590dcb..5f5916b9 100644 --- a/integration_test/cases/prepare_stream_test.exs +++ b/integration_test/cases/prepare_stream_test.exs @@ -10,53 +10,47 @@ defmodule PrepareStreamTest do test "prepare_stream returns result" do stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, %Q{}, :newer_state}, - {:ok, %C{}, :newest_state}, - {:ok, %R{}, :state2}, - {:deallocate, %R{}, :new_state2}, - {:ok, :deallocated, :newer_state2}, - {:ok, :committed, :newest_state2} + {:ok, %Q{}, :new_state}, + {:ok, %C{}, :newer_state}, + {:ok, %R{}, :newest_state}, + {:deallocate, %R{}, :state2}, + {:ok, :deallocated, :new_state2} ] {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.prepare_stream(conn, %Q{}, [:param]) assert %DBConnection.PrepareStream{} = stream assert Enum.to_list(stream) == [%R{}, %R{}] :hi - end) == {:ok, :hi} + end) == :hi assert [ connect: [_], - handle_begin: [_, :state], - handle_prepare: [%Q{}, _, :new_state], - handle_declare: [%Q{}, [:param], _, :newer_state], - handle_first: [%Q{}, %C{}, _, :newest_state], - handle_next: [%Q{}, %C{}, _, :state2], - handle_deallocate: [%Q{}, %C{}, _, :new_state2], - handle_commit: [_, :newer_state2] + handle_prepare: [%Q{}, _, :state], + handle_declare: [%Q{}, [:param], _, :new_state], + handle_first: [%Q{}, %C{}, _, :newer_state], + handle_next: [%Q{}, %C{}, _, :newest_state], + handle_deallocate: [%Q{}, %C{}, _, :state2], ] = A.record(agent) end test "prepare_stream parses/describes/encodes/decodes" do stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, %Q{state: :prepared}, :newer_state}, - {:ok, %C{}, :newest_state}, - {:deallocate, %R{}, :state2}, - {:ok, :deallocated, :new_state2}, - {:ok, :committed, :newer_state2} + {:ok, %Q{state: :prepared}, :new_state}, + {:ok, %C{}, :newer_state}, + {:deallocate, %R{}, :newest_state}, + {:ok, :deallocated, :state2} ] {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> opts2 = [parse: fn(%Q{state: nil}) -> %Q{state: :parsed} end, describe: fn(%Q{state: :prepared}) -> %Q{state: :described} end, encode: fn([:param]) -> :encoded end, @@ -64,39 +58,35 @@ defmodule PrepareStreamTest do stream = P.prepare_stream(conn, %Q{}, [:param], opts2) assert Enum.to_list(stream) == [:decoded] :hi - end) == {:ok, :hi} + end) == :hi assert [ connect: [_], - handle_begin: [_, :state], - handle_prepare: [%Q{state: :parsed}, _, :new_state], - handle_declare: [%Q{state: :described}, :encoded, _, :newer_state], - handle_first: [%Q{state: :described}, %C{}, _, :newest_state], - handle_deallocate: [%Q{}, %C{}, _, :state2], - handle_commit: [_, :new_state2] + handle_prepare: [%Q{state: :parsed}, _, :state], + handle_declare: [%Q{state: :described}, :encoded, _, :new_state], + handle_first: [%Q{state: :described}, %C{}, _, :newer_state], + handle_deallocate: [%Q{}, %C{}, _, :newest_state] ] = A.record(agent) end test "prepare_stream logs result" do stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, %Q{}, :newer_state}, - {:ok, %C{}, :newest_state}, - {:ok, %R{}, :state2}, - {:ok, :deallocated, :new_state2}, - {:ok, :committed, :newest_state2} + {:ok, %Q{}, :new_state}, + {:ok, %C{}, :newer_state}, + {:ok, %R{}, :newest_state}, + {:ok, :deallocated, :state2} ] {:ok, agent} = A.start_link(stack) parent = self() opts = [agent: agent, parent: parent] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.prepare_stream(conn, %Q{}, [:param], [log: &send(parent, &1)]) assert Enum.take(stream, 1) == [%R{}] :hi - end) == {:ok, :hi} + end) == :hi assert_received %DBConnection.LogEntry{call: :prepare_declare} = entry assert %{query: %Q{}, params: [:param], result: {:ok, %Q{}, %C{}}} = entry @@ -122,12 +112,10 @@ defmodule PrepareStreamTest do assert [ connect: [_], - handle_begin: [_, :state], - handle_prepare: [%Q{}, _, :new_state], - handle_declare: [%Q{}, [:param], _, :newer_state], - handle_first: [%Q{}, %C{}, _, :newest_state], - handle_deallocate: [%Q{}, %C{}, _, :state2], - handle_commit: [_, :new_state2] + handle_prepare: [%Q{}, _, :state], + handle_declare: [%Q{}, [:param], _, :new_state], + handle_first: [%Q{}, %C{}, _, :newer_state], + handle_deallocate: [%Q{}, %C{}, _, :newest_state] ] = A.record(agent) end @@ -135,9 +123,7 @@ defmodule PrepareStreamTest do err = RuntimeError.exception("oops") stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:error, err, :newer_state}, - {:ok, :comitted, :newest_state} + {:error, err, :new_state} ] {:ok, agent} = A.start_link(stack) @@ -145,11 +131,11 @@ defmodule PrepareStreamTest do opts = [agent: agent, parent: parent] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.prepare_stream(conn, %Q{}, [:param], [log: &send(parent, &1)]) assert_raise RuntimeError, "oops", fn() -> Enum.take(stream, 1) end :hi - end) == {:ok, :hi} + end) == :hi assert_received %DBConnection.LogEntry{call: :prepare_declare} = entry assert %{query: %Q{}, params: [:param], result: {:error, ^err}} = entry @@ -160,9 +146,7 @@ defmodule PrepareStreamTest do assert [ connect: [_], - handle_begin: [_, :state], - handle_prepare: [%Q{}, _, :new_state], - handle_commit: [_, :newer_state], + handle_prepare: [%Q{}, _, :state] ] = A.record(agent) end @@ -170,10 +154,8 @@ defmodule PrepareStreamTest do err = RuntimeError.exception("oops") stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, %Q{}, :newer_state}, - {:error, err, :newest_state}, - {:ok, :comitted, :state2} + {:ok, %Q{}, :new_state}, + {:error, err, :newer_state}, ] {:ok, agent} = A.start_link(stack) @@ -181,11 +163,11 @@ defmodule PrepareStreamTest do opts = [agent: agent, parent: parent] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.prepare_stream(conn, %Q{}, [:param], [log: &send(parent, &1)]) assert_raise RuntimeError, "oops", fn() -> Enum.take(stream, 1) end :hi - end) == {:ok, :hi} + end) == :hi assert_received %DBConnection.LogEntry{call: :prepare_declare} = entry assert %{query: %Q{}, params: [:param], result: {:error, ^err}} = entry @@ -196,10 +178,8 @@ defmodule PrepareStreamTest do assert [ connect: [_], - handle_begin: [_, :state], - handle_prepare: [%Q{}, _, :new_state], - handle_declare: [%Q{}, [:param], _, :newer_state], - handle_commit: [_, :newest_state], + handle_prepare: [%Q{}, _, :state], + handle_declare: [%Q{}, [:param], _, :new_state] ] = A.record(agent) end @@ -207,9 +187,8 @@ defmodule PrepareStreamTest do err = RuntimeError.exception("oops") stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, %Q{}, :newer_state}, - {:disconnect, err, :newest_state}, + {:ok, %Q{}, :new_state}, + {:disconnect, err, :newer_state}, :ok, fn(opts) -> send(opts[:parent], :reconnected) @@ -222,20 +201,19 @@ defmodule PrepareStreamTest do opts = [agent: agent, parent: parent] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.prepare_stream(conn, %Q{}, [:param]) assert_raise RuntimeError, "oops", fn() -> Enum.take(stream, 1) end :hi - end) == {:error, :rollback} + end) == :hi assert_receive :reconnected assert [ connect: [_], - handle_begin: [_, :state], - handle_prepare: [%Q{}, _, :new_state], - handle_declare: [%Q{}, [:param], _, :newer_state], - disconnect: [^err, :newest_state], + handle_prepare: [%Q{}, _, :state], + handle_declare: [%Q{}, [:param], _, :new_state], + disconnect: [^err, :newer_state], connect: [_] ] = A.record(agent) end @@ -247,7 +225,6 @@ defmodule PrepareStreamTest do Process.link(opts[:parent]) {:ok, :state} end, - {:ok, :began, :new_state}, :oops, {:ok, :state2} ] @@ -259,12 +236,12 @@ defmodule PrepareStreamTest do assert_receive {:hi, conn} Process.flag(:trap_exit, true) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.prepare_stream(conn, %Q{}, [:param]) assert_raise DBConnection.ConnectionError, "bad return value: :oops", fn() -> Enum.to_list(stream) end :hi - end) == {:error, :rollback} + end) == :hi prefix = "client #{inspect self()} stopped: " <> "** (DBConnection.ConnectionError) bad return value: :oops" @@ -275,8 +252,7 @@ defmodule PrepareStreamTest do assert [ {:connect, _}, - {:handle_begin, [_, :state]}, - {:handle_prepare, [%Q{}, _, :new_state]} | _] = A.record(agent) + {:handle_prepare, [%Q{}, _, :state]} | _] = A.record(agent) end test "prepare_stream declare raise raises and stops connection" do @@ -286,8 +262,7 @@ defmodule PrepareStreamTest do Process.link(opts[:parent]) {:ok, :state} end, - {:ok, :began, :new_state}, - {:ok, %Q{}, :newer_state}, + {:ok, %Q{}, :new_state}, fn(_, _, _, _) -> raise "oops" end, @@ -301,11 +276,11 @@ defmodule PrepareStreamTest do assert_receive {:hi, conn} Process.flag(:trap_exit, true) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.prepare_stream(conn, %Q{}, [:param]) assert_raise RuntimeError, "oops", fn() -> Enum.to_list(stream) end :hi - end) == {:error, :rollback} + end) == :hi prefix = "client #{inspect self()} stopped: ** (RuntimeError) oops" len = byte_size(prefix) @@ -315,20 +290,17 @@ defmodule PrepareStreamTest do assert [ {:connect, _}, - {:handle_begin, [_, :state]}, - {:handle_prepare, [%Q{}, _, :new_state]}, - {:handle_declare, [%Q{}, [:param], _, :newer_state]} | _] = A.record(agent) + {:handle_prepare, [%Q{}, _, :state]}, + {:handle_declare, [%Q{}, [:param], _, :new_state]} | _] = A.record(agent) end test "prepare_stream describe or encode raises and closes query" do stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, %Q{}, :newer_state}, - {:ok, :closed, :newest_state}, - {:ok, %Q{}, :state2}, - {:ok, :result, :new_state2}, - {:ok, :committed, :newer_state2} + {:ok, %Q{}, :new_state}, + {:ok, :closed, :newer_state}, + {:ok, %Q{}, :newest_state}, + {:ok, :result, :state2} ] {:ok, agent} = A.start_link(stack) @@ -336,7 +308,7 @@ defmodule PrepareStreamTest do opts = [agent: agent, parent: parent] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> describe = fn(%Q{}) -> raise "oops" end stream = P.prepare_stream(conn, %Q{}, [:param], [describe: describe]) assert_raise RuntimeError, "oops", fn() -> Enum.to_list(stream) end @@ -346,15 +318,14 @@ defmodule PrepareStreamTest do assert_raise RuntimeError, "oops", fn() -> Enum.to_list(stream) end :hi - end) == {:ok, :hi} + end) == :hi assert [ connect: [_], - handle_begin: [_, :state], - handle_prepare: [%Q{}, _, :new_state], - handle_close: [%Q{}, _, :newer_state], - handle_prepare: [%Q{}, _, :newest_state], - handle_close: [%Q{}, _, :state2], - handle_commit: [_, :new_state2]] = A.record(agent) + handle_prepare: [%Q{}, _, :state], + handle_close: [%Q{}, _, :new_state], + handle_prepare: [%Q{}, _, :newer_state], + handle_close: [%Q{}, _, :newest_state] + ] = A.record(agent) end end diff --git a/integration_test/cases/transaction_execute_test.exs b/integration_test/cases/run_test.exs similarity index 51% rename from integration_test/cases/transaction_execute_test.exs rename to integration_test/cases/run_test.exs index 69293d43..1722865c 100644 --- a/integration_test/cases/transaction_execute_test.exs +++ b/integration_test/cases/run_test.exs @@ -1,4 +1,4 @@ -defmodule TransactionExecuteTest do +defmodule RunTest do use ExUnit.Case, async: true alias TestPool, as: P @@ -6,39 +6,33 @@ defmodule TransactionExecuteTest do alias TestQuery, as: Q alias TestResult, as: R - test "execute returns result" do + test "run execute returns result" do stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, %R{}, :newer_state}, - {:ok, %R{}, :newest_state}, - {:ok, :committed, :newest_state} + {:ok, %R{}, :new_state}, + {:ok, %R{}, :newer_state} ] {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> assert P.execute(conn, %Q{}, [:param]) == {:ok, %R{}} assert P.execute(conn, %Q{}, [:param], [key: :value]) == {:ok, %R{}} :hi - end) == {:ok, :hi} + end) == :hi assert [ connect: [_], - handle_begin: [_, :state], - handle_execute: [%Q{}, [:param], _, :new_state], - handle_execute: [%Q{}, [:param], - [{:key, :value} | _], :newer_state], - handle_commit: [_, :newest_state]] = A.record(agent) + handle_execute: [%Q{}, [:param], _, :state], + handle_execute: [%Q{}, [:param], [{:key, :value} | _], :new_state] + ] = A.record(agent) end - test "execute logs result" do + test "run execute logs result" do stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, %R{}, :newer_state}, - {:ok, :committed, :newest_state} + {:ok, %R{}, :new_state} ] {:ok, agent} = A.start_link(stack) @@ -57,84 +51,76 @@ defmodule TransactionExecuteTest do assert entry.decode_time >= 0 send(parent, :logged) end - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> assert P.execute(conn, %Q{}, [:param], [log: log]) == {:ok, %R{}} :hi - end) == {:ok, :hi} + end) == :hi assert_received :logged assert [ connect: [_], - handle_begin: [_, :state], - handle_execute: [%Q{}, [:param], _, :new_state], - handle_commit: [_, :newer_state]] = A.record(agent) + handle_execute: [%Q{}, [:param], _, :state] + ] = A.record(agent) end - test "execute error returns error" do + test "run execute error returns error" do err = RuntimeError.exception("oops") stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:error, err, :newer_state}, - {:ok, :committed, :newest_state} + {:error, err, :newer_state} ] {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> assert P.execute(conn, %Q{}, [:param]) == {:error, err} :hi - end) == {:ok, :hi} + end) == :hi assert [ connect: [_], - handle_begin: [_, :state], - handle_execute: [%Q{}, [:param], _, :new_state], - handle_commit: [_, :newer_state]] = A.record(agent) + handle_execute: [%Q{}, [:param], _, :state] + ] = A.record(agent) end - test "execute! error raises error" do + test "run execute! error raises error" do err = RuntimeError.exception("oops") stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:error, err, :newer_state}, - {:ok, %R{}, :newest_state}, - {:ok, :committed, :newest_state} + {:error, err, :new_state}, + {:ok, %R{}, :newer_state}, ] {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> assert_raise RuntimeError, "oops", fn() -> P.execute!(conn, %Q{}, [:param]) end assert P.execute(conn, %Q{}, [:param]) == {:ok, %R{}} :hi - end) == {:ok, :hi} + end) == :hi assert [ connect: [_], - handle_begin: [_, :state], + handle_execute: [%Q{}, [:param], _, :state], handle_execute: [%Q{}, [:param], _, :new_state], - handle_execute: [%Q{}, [:param], _, :newer_state], - handle_commit: [_, :newest_state]] = A.record(agent) + ] = A.record(agent) end - test "execute disconnect returns error" do + test "run execute disconnect returns error" do err = RuntimeError.exception("oops") stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:disconnect, err, :newer_state}, + {:disconnect, err, :new_state}, :ok, fn(opts) -> send(opts[:parent], :reconnected) - {:ok, :state} + {:ok, :state2} end ] {:ok, agent} = A.start_link(stack) @@ -142,33 +128,31 @@ defmodule TransactionExecuteTest do opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> assert P.execute(conn, %Q{}, [:param]) == {:error, err} assert_raise DBConnection.ConnectionError, "connection is closed", - fn() -> P.execute(conn, %Q{}, [:param]) end + fn() -> P.execute!(conn, %Q{}, [:param]) end :closed - end) == {:error, :rollback} + end) == :closed assert_receive :reconnected assert [ connect: [opts2], - handle_begin: [_, :state], - handle_execute: [%Q{}, [:param], _, :new_state], - disconnect: [^err, :newer_state], + handle_execute: [%Q{}, [:param], _, :state], + disconnect: [^err, :new_state], connect: [opts2]] = A.record(agent) end - test "execute bad return raises DBConnection.ConnectionError and stops" do + test "run execute bad return raises DBConnection.ConnectionError and stops" do stack = [ fn(opts) -> send(opts[:parent], {:hi, self()}) Process.link(opts[:parent]) {:ok, :state} end, - {:ok, :began, :new_state}, :oops, {:ok, :state} ] @@ -180,15 +164,15 @@ defmodule TransactionExecuteTest do Process.flag(:trap_exit, true) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> assert_raise DBConnection.ConnectionError, "bad return value: :oops", fn() -> P.execute(conn, %Q{}, [:param]) end assert_raise DBConnection.ConnectionError, "connection is closed", - fn() -> P.execute(conn, %Q{}, [:param]) end + fn() -> P.execute!(conn, %Q{}, [:param]) end :closed - end) == {:error, :rollback} + end) == :closed prefix = "client #{inspect self()} stopped: " <> "** (DBConnection.ConnectionError) bad return value: :oops" @@ -199,18 +183,16 @@ defmodule TransactionExecuteTest do assert [ {:connect, _}, - {:handle_begin, [_, :state]}, - {:handle_execute, [%Q{}, [:param], _, :new_state]} | _] = A.record(agent) + {:handle_execute, [%Q{}, [:param], _, :state]} | _] = A.record(agent) end - test "execute raise raises and stops connection" do + test "run execute raise raises and stops connection" do stack = [ fn(opts) -> send(opts[:parent], {:hi, self()}) Process.link(opts[:parent]) {:ok, :state} end, - {:ok, :began, :new_state}, fn(_, _, _, _) -> raise "oops" end, @@ -224,15 +206,15 @@ defmodule TransactionExecuteTest do Process.flag(:trap_exit, true) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> assert_raise RuntimeError, "oops", fn() -> P.execute(conn, %Q{}, [:param]) end assert_raise DBConnection.ConnectionError, "connection is closed", - fn() -> P.execute(conn, %Q{}, [:param]) end + fn() -> P.execute!(conn, %Q{}, [:param]) end :closed - end) == {:error, :rollback} + end) == :closed prefix = "client #{inspect self()} stopped: ** (RuntimeError) oops" @@ -243,65 +225,6 @@ defmodule TransactionExecuteTest do assert [ {:connect, _}, - {:handle_begin, [_, :state]}, - {:handle_execute, [%Q{}, [:param], _, :new_state]}| _] = A.record(agent) - end - - test "execute raises after inner transaction rollback" do - stack = [ - {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, :rolledback, :newer_state} - ] - {:ok, agent} = A.start_link(stack) - - opts = [agent: agent, parent: self()] - {:ok, pool} = P.start_link(opts) - - assert P.transaction(pool, fn(conn) -> - assert P.transaction(conn, fn(conn2) -> - P.rollback(conn2, :oops) - end) == {:error, :oops} - - assert_raise DBConnection.ConnectionError, "transaction rolling back", - fn() -> P.execute!(conn, %Q{}, [:param]) end - end) == {:error, :rollback} - - assert [ - connect: [_], - handle_begin: [ _, :state], - handle_rollback: [_, :new_state]] = A.record(agent) - end - - test "transaction does not log commit if closed" do - stack = [ - {:ok, :state}, - {:ok, :began, :new_state}, - :oops, - {:ok, :state2} - ] - {:ok, agent} = A.start_link(stack) - - parent = self() - opts = [agent: agent, parent: parent] - Process.flag(:trap_exit, true) - {:ok, pool} = P.start_link(opts) - - log = &send(parent, &1) - - P.transaction(pool, fn(conn) -> - assert_received %DBConnection.LogEntry{call: :transaction, - query: :begin} - assert_raise DBConnection.ConnectionError, - fn() -> P.execute(conn, %Q{}, [:param]) end - end, [log: log]) - - refute_received %DBConnection.LogEntry{call: :transaction} - - assert [ - {:connect, [_]}, - {:handle_begin, [_, :state]}, - {:handle_execute, [%Q{}, [:param], _, :new_state]} | - _] = A.record(agent) + {:handle_execute, [%Q{}, [:param], _, :state]}| _] = A.record(agent) end end diff --git a/integration_test/cases/stream_test.exs b/integration_test/cases/stream_test.exs index 0a9e1020..ac931f51 100644 --- a/integration_test/cases/stream_test.exs +++ b/integration_test/cases/stream_test.exs @@ -11,86 +11,76 @@ defmodule StreamTest do test "stream returns result" do stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, %C{}, :newer_state}, - {:ok, %R{}, :newest_state}, - {:deallocate, %R{}, :state2}, - {:ok, :deallocated, :new_state2}, - {:ok, :commited, :newer_state2} + {:ok, %C{}, :new_state}, + {:ok, %R{}, :newer_state}, + {:deallocate, %R{}, :newest_state}, + {:ok, :deallocated, :state2}, ] {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.stream(conn, %Q{}, [:param]) assert %DBConnection.Stream{} = stream assert Enum.to_list(stream) == [%R{}, %R{}] :hi - end) == {:ok, :hi} + end) == :hi assert [ connect: [_], - handle_begin: [_, :state], - handle_declare: [%Q{}, [:param], _, :new_state], - handle_first: [%Q{}, %C{}, _, :newer_state], - handle_next: [%Q{}, %C{}, _, :newest_state], - handle_deallocate: [%Q{}, %C{}, _, :state2], - handle_commit: [_, :new_state2] + handle_declare: [%Q{}, [:param], _, :state], + handle_first: [%Q{}, %C{}, _, :new_state], + handle_next: [%Q{}, %C{}, _, :newer_state], + handle_deallocate: [%Q{}, %C{}, _, :newest_state] ] = A.record(agent) end test "stream encodes params and decodes result" do stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, %C{}, :newer_state}, - {:deallocate, %R{}, :newest_state}, - {:ok, :deallocated, :state2}, - {:ok, :committed, :new_state2} + {:ok, %C{}, :new_state}, + {:deallocate, %R{}, :newer_state}, + {:ok, :deallocated, :newest_state} ] {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> opts2 = [encode: fn([:param]) -> :encoded end, decode: fn(%R{}) -> :decoded end] stream = P.stream(conn, %Q{}, [:param], opts2) assert Enum.to_list(stream) == [:decoded] :hi - end) == {:ok, :hi} + end) == :hi assert [ connect: [_], - handle_begin: [_, :state], - handle_declare: [_, :encoded, _, :new_state], - handle_first: [%Q{}, %C{}, _, :newer_state], - handle_deallocate: [%Q{}, %C{}, _, :newest_state], - handle_commit: [_, :state2] + handle_declare: [_, :encoded, _, :state], + handle_first: [%Q{}, %C{}, _, :new_state], + handle_deallocate: [%Q{}, %C{}, _, :newer_state] ] = A.record(agent) end test "stream logs result" do stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, %C{}, :newer_state}, - {:ok, %R{}, :newest_state}, - {:ok, :result, :state2}, - {:ok, :committed, :new_state2} + {:ok, %C{}, :new_state}, + {:ok, %R{}, :newer_state}, + {:ok, :result, :newest_state} ] {:ok, agent} = A.start_link(stack) parent = self() opts = [agent: agent, parent: parent] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.stream(conn, %Q{}, [:param], [log: &send(parent, &1)]) assert Enum.take(stream, 1) == [%R{}] :hi - end) == {:ok, :hi} + end) == :hi assert_received %DBConnection.LogEntry{call: :declare} = entry assert %{query: %Q{}, params: [:param], result: {:ok, %C{}}} = entry @@ -116,11 +106,9 @@ defmodule StreamTest do assert [ connect: [_], - handle_begin: [_, :state], - handle_declare: [%Q{}, [:param], _, :new_state], - handle_first: [%Q{}, %C{}, _, :newer_state], - handle_deallocate: [%Q{}, %C{}, _, :newest_state], - handle_commit: [_, :state2] + handle_declare: [%Q{}, [:param], _, :state], + handle_first: [%Q{}, %C{}, _, :new_state], + handle_deallocate: [%Q{}, %C{}, _, :newer_state] ] = A.record(agent) end @@ -128,9 +116,7 @@ defmodule StreamTest do err = RuntimeError.exception("oops") stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:error, err, :newer_state}, - {:ok, :comitted, :newest_state} + {:error, err, :new_state} ] {:ok, agent} = A.start_link(stack) @@ -138,11 +124,11 @@ defmodule StreamTest do opts = [agent: agent, parent: parent] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.stream(conn, %Q{}, [:param], [log: &send(parent, &1)]) assert_raise RuntimeError, "oops", fn() -> Enum.take(stream, 1) end :hi - end) == {:ok, :hi} + end) == :hi assert_received %DBConnection.LogEntry{call: :declare} = entry assert %{query: %Q{}, params: [:param], result: {:error, ^err}} = entry @@ -153,9 +139,7 @@ defmodule StreamTest do assert [ connect: [_], - handle_begin: [_, :state], - handle_declare: [%Q{}, [:param], _, :new_state], - handle_commit: [_, :newer_state], + handle_declare: [%Q{}, [:param], _, :state] ] = A.record(agent) end @@ -163,8 +147,7 @@ defmodule StreamTest do err = RuntimeError.exception("oops") stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:disconnect, err, :newer_state}, + {:disconnect, err, :new_state}, :ok, fn(opts) -> send(opts[:parent], :reconnected) @@ -177,19 +160,18 @@ defmodule StreamTest do opts = [agent: agent, parent: parent] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.stream(conn, %Q{}, [:param]) assert_raise RuntimeError, "oops", fn() -> Enum.take(stream, 1) end :hi - end) == {:error, :rollback} + end) == :hi assert_receive :reconnected assert [ connect: [_], - handle_begin: [_, :state], - handle_declare: [%Q{}, [:param], _, :new_state], - disconnect: [^err, :newer_state], + handle_declare: [%Q{}, [:param], _, :state], + disconnect: [^err, :new_state], connect: [_] ] = A.record(agent) end @@ -198,9 +180,8 @@ defmodule StreamTest do err = RuntimeError.exception("oops") stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, %C{}, :newer_state}, - {:disconnect, err, :newest_state}, + {:ok, %C{}, :new_state}, + {:disconnect, err, :newer_state}, :ok, fn(opts) -> send(opts[:parent], :reconnected) @@ -213,11 +194,11 @@ defmodule StreamTest do opts = [agent: agent, parent: parent] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.stream(conn, %Q{}, [:param], [log: &send(parent, &1)]) assert_raise RuntimeError, "oops", fn() -> Enum.take(stream, 1) end :hi - end) == {:error, :rollback} + end) == :hi assert_received %DBConnection.LogEntry{call: :declare} @@ -234,55 +215,20 @@ defmodule StreamTest do assert [ connect: [_], - handle_begin: [_, :state], - handle_declare: [%Q{}, [:param], _, :new_state], - handle_first: [%Q{}, %C{}, _, :newer_state], - disconnect: [^err, :newest_state], + handle_declare: [%Q{}, [:param], _, :state], + handle_first: [%Q{}, %C{}, _, :new_state], + disconnect: [^err, :newer_state], connect: [_] ] = A.record(agent) end - test "stream deallocates after transaction failed" do - stack = [ - {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, %C{}, :newer_state}, - {:ok, %R{}, :newest_state}, - {:ok, :deallocated, :state2}, - {:ok, :rolledback, :new_state2} - ] - {:ok, agent} = A.start_link(stack) - - parent = self() - opts = [agent: agent, parent: parent] - {:ok, pool} = P.start_link(opts) - - assert P.transaction(pool, fn(conn) -> - stream = P.stream(conn, %Q{}, [:param], [log: &send(parent, &1)]) - catch_throw(Enum.map(stream, fn(%R{}) -> - {:error, :oops} = P.transaction(conn, &P.rollback(&1, :oops)) - throw(:escape) - end)) - end) == {:error, :rollback} - - assert [ - connect: [_], - handle_begin: [_, :state], - handle_declare: [%Q{}, [:param], _, :new_state], - handle_first: [%Q{}, %C{}, _, :newer_state], - handle_deallocate: [%Q{}, %C{}, _, :newest_state], - handle_rollback: [_, :state2] - ] = A.record(agent) - end - test "stream logs deallocate disconnect" do err = RuntimeError.exception("oops") stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, %C{}, :newer_state}, - {:ok, %R{}, :newest_state}, - {:disconnect, err, :state2}, + {:ok, %C{}, :new_state}, + {:ok, %R{}, :newer_state}, + {:disconnect, err, :newest_state}, :ok, fn(opts) -> send(opts[:parent], :reconnected) @@ -295,11 +241,11 @@ defmodule StreamTest do opts = [agent: agent, parent: parent] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.stream(conn, %Q{}, [:param], [log: &send(parent, &1)]) assert_raise RuntimeError, "oops", fn() -> Enum.take(stream, 1) end :hi - end) == {:error, :rollback} + end) == :hi assert_received %DBConnection.LogEntry{call: :declare} assert_received %DBConnection.LogEntry{call: :first} @@ -315,11 +261,10 @@ defmodule StreamTest do assert [ connect: [_], - handle_begin: [_, :state], - handle_declare: [%Q{}, [:param], _, :new_state], - handle_first: [%Q{}, %C{}, _, :newer_state], - handle_deallocate: [%Q{}, %C{}, _, :newest_state], - disconnect: [^err, :state2], + handle_declare: [%Q{}, [:param], _, :state], + handle_first: [%Q{}, %C{}, _, :new_state], + handle_deallocate: [%Q{}, %C{}, _, :newer_state], + disconnect: [^err, :newest_state], connect: [_] ] = A.record(agent) end @@ -331,7 +276,6 @@ defmodule StreamTest do Process.link(opts[:parent]) {:ok, :state} end, - {:ok, :began, :new_state}, :oops, {:ok, :state2} ] @@ -343,12 +287,12 @@ defmodule StreamTest do assert_receive {:hi, conn} Process.flag(:trap_exit, true) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.stream(conn, %Q{}, [:param]) assert_raise DBConnection.ConnectionError, "bad return value: :oops", fn() -> Enum.to_list(stream) end :hi - end) == {:error, :rollback} + end) == :hi prefix = "client #{inspect self()} stopped: " <> "** (DBConnection.ConnectionError) bad return value: :oops" @@ -359,8 +303,7 @@ defmodule StreamTest do assert [ {:connect, _}, - {:handle_begin, [_, :state]}, - {:handle_declare, [%Q{}, [:param], _, :new_state]} | _] = A.record(agent) + {:handle_declare, [%Q{}, [:param], _, :state]} | _] = A.record(agent) end test "stream declare raise raises and stops connection" do @@ -370,7 +313,6 @@ defmodule StreamTest do Process.link(opts[:parent]) {:ok, :state} end, - {:ok, :began, :new_state}, fn(_, _, _, _) -> raise "oops" end, @@ -384,11 +326,11 @@ defmodule StreamTest do assert_receive {:hi, conn} Process.flag(:trap_exit, true) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.stream(conn, %Q{}, [:param]) assert_raise RuntimeError, "oops", fn() -> Enum.to_list(stream) end :hi - end) == {:error, :rollback} + end) == :hi prefix = "client #{inspect self()} stopped: ** (RuntimeError) oops" len = byte_size(prefix) @@ -398,18 +340,15 @@ defmodule StreamTest do assert [ {:connect, _}, - {:handle_begin, [_, :state]}, - {:handle_declare, [%Q{}, [:param], _, :new_state]} | _] = A.record(agent) + {:handle_declare, [%Q{}, [:param], _, :state]} | _] = A.record(agent) end test "stream declare log raises and continues" do stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, %C{}, :newer_state}, - {:deallocate, %R{}, :newest_state}, - {:ok, :deallocated, :state2}, - {:ok, :commited, :new_state2} + {:ok, %C{}, :new_state}, + {:deallocate, %R{}, :newer_state}, + {:ok, :deallocated, :newest_state}, ] {:ok, agent} = A.start_link(stack) @@ -417,22 +356,20 @@ defmodule StreamTest do opts = [agent: agent, parent: parent] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> opts2 = [log: fn(_) -> raise "logging oops" end] stream = P.stream(conn, %Q{}, [:param], opts2) assert capture_log(fn() -> assert Enum.to_list(stream) == [%R{}] end) =~ "logging oops" :hi - end) == {:ok, :hi} + end) == :hi assert [ connect: [_], - handle_begin: [_, :state], - handle_declare: [%Q{}, [:param], _, :new_state], - handle_first: [%Q{}, %C{}, _, :newer_state], - handle_deallocate: [%Q{}, %C{}, _, :newest_state], - handle_commit: [_, :state2] + handle_declare: [%Q{}, [:param], _, :state], + handle_first: [%Q{}, %C{}, _, :new_state], + handle_deallocate: [%Q{}, %C{}, _, :newer_state] ] = A.record(agent) end @@ -443,8 +380,7 @@ defmodule StreamTest do Process.link(opts[:parent]) {:ok, :state} end, - {:ok, :began, :new_state}, - {:ok, %C{}, :newer_state}, + {:ok, %C{}, :new_state}, :oops, {:ok, :state2} ] @@ -456,12 +392,12 @@ defmodule StreamTest do assert_receive {:hi, conn} Process.flag(:trap_exit, true) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.stream(conn, %Q{}, [:param]) assert_raise DBConnection.ConnectionError, "bad return value: :oops", fn() -> Enum.to_list(stream) end :hi - end) == {:error, :rollback} + end) == :hi prefix = "client #{inspect self()} stopped: " <> "** (DBConnection.ConnectionError) bad return value: :oops" @@ -472,9 +408,8 @@ defmodule StreamTest do assert [ {:connect, _}, - {:handle_begin, [_, :state]}, - {:handle_declare, [%Q{}, [:param], _, :new_state]}, - {:handle_first, [%Q{}, %C{}, _, :newer_state]} | _] = A.record(agent) + {:handle_declare, [%Q{}, [:param], _, :state]}, + {:handle_first, [%Q{}, %C{}, _, :new_state]} | _] = A.record(agent) end test "stream deallocate raise raises and stops connection" do @@ -484,9 +419,8 @@ defmodule StreamTest do Process.link(opts[:parent]) {:ok, :state} end, - {:ok, :began, :new_state}, - {:ok, %C{}, :newer_state}, - {:ok, %R{}, :newest_state}, + {:ok, %C{}, :new_state}, + {:ok, %R{}, :newer_state}, fn(_, _, _, _) -> raise "oops" end, @@ -500,11 +434,11 @@ defmodule StreamTest do assert_receive {:hi, conn} Process.flag(:trap_exit, true) - assert P.transaction(pool, fn(conn) -> + assert P.run(pool, fn(conn) -> stream = P.stream(conn, %Q{}, [:param]) assert_raise RuntimeError, "oops", fn() -> Enum.take(stream, 1) end :hi - end) == {:error, :rollback} + end) == :hi prefix = "client #{inspect self()} stopped: ** (RuntimeError) oops" len = byte_size(prefix) @@ -514,9 +448,8 @@ defmodule StreamTest do assert [ {:connect, _}, - {:handle_begin, [_, :state]}, - {:handle_declare, [%Q{}, [:param], _, :new_state]}, - {:handle_first, [%Q{}, %C{}, _, :newer_state]}, - {:handle_deallocate, [%Q{}, %C{}, _, :newest_state]} | _] = A.record(agent) + {:handle_declare, [%Q{}, [:param], _, :state]}, + {:handle_first, [%Q{}, %C{}, _, :new_state]}, + {:handle_deallocate, [%Q{}, %C{}, _, :newer_state]} | _] = A.record(agent) end end diff --git a/integration_test/cases/transaction_test.exs b/integration_test/cases/transaction_test.exs index 371263aa..63e80783 100644 --- a/integration_test/cases/transaction_test.exs +++ b/integration_test/cases/transaction_test.exs @@ -4,43 +4,46 @@ defmodule TransactionTest do alias TestPool, as: P alias TestAgent, as: A - test "transaction returns result" do + test "begin/commit/rollback return result" do stack = [ {:ok, :state}, {:ok, :began, :new_state}, - {:ok, :comitted, :newer_state}, - {:ok, :began, :newest_state}, - {:ok, :committed, :newest_state} + {:ok, :committed, :newer_state}, + {:ok, :rolledback, :newest_state}, + {:ok, :began, :state2}, + {:ok, :committed, :new_state2}, + {:ok, :rolledback, :newer_state2} ] {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> - assert %DBConnection{} = conn - :result - end) == {:ok, :result} + assert P.begin(pool, opts) == {:ok, :began} + assert P.commit(pool, opts) == {:ok, :committed} + assert P.rollback(pool, opts) == {:ok, :rolledback} - assert P.transaction(pool, fn(conn) -> - assert %DBConnection{} = conn - :result - end, [key: :value]) == {:ok, :result} + opts2 = [key: :value] ++ opts + assert P.begin(pool, opts2) == {:ok, :began} + assert P.commit(pool, opts2) == {:ok, :committed} + assert P.rollback(pool, opts2) == {:ok, :rolledback} assert [ connect: [_], handle_begin: [ _, :state], handle_commit: [_, :new_state], - handle_begin: [[{:key, :value} | _], :newer_state], - handle_commit: [[{:key, :value} | _], :newest_state]] = A.record(agent) + handle_rollback: [_, :newer_state], + handle_begin: [[{:key, :value} | _], :newest_state], + handle_commit: [[{:key, :value} | _], :state2], + handle_rollback: [[{:key, :value} | _], :new_state2] + ] = A.record(agent) end - test "transaction logs begin/commit/rollback" do + test "begin/commit/rollback log results" do stack = [ {:ok, :state}, {:ok, :began, :new_state}, {:ok, :committed, :newer_state}, - {:ok, :began, :newest_state}, {:ok, :rolledback, :newest_state} ] {:ok, agent} = A.start_link(stack) @@ -51,33 +54,32 @@ defmodule TransactionTest do log = &send(parent, &1) - assert P.transaction(pool, fn(_) -> - assert_received %DBConnection.LogEntry{call: :transaction} = entry - assert %{query: :begin, params: nil, result: {:ok, :began}} = entry - assert is_integer(entry.pool_time) - assert entry.pool_time >= 0 - assert is_integer(entry.connection_time) - assert entry.connection_time >= 0 - assert is_nil(entry.decode_time) + assert P.begin(pool, [log: log] ++ opts) == {:ok, :began} + + assert_received %DBConnection.LogEntry{call: :begin} = entry + assert %{query: :begin, params: nil, result: {:ok, :began}} = entry + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) - :result - end, [log: log]) == {:ok, :result} + assert P.commit(pool, [log: log] ++ opts) == {:ok, :committed} - assert_received %DBConnection.LogEntry{call: :transaction} = entry + assert_received %DBConnection.LogEntry{call: :commit} = entry assert %{query: :commit, params: nil, result: {:ok, :committed}} = entry - assert is_nil(entry.pool_time) + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 assert is_integer(entry.connection_time) assert entry.connection_time >= 0 assert is_nil(entry.decode_time) - assert P.transaction(pool, fn(conn) -> - assert_received %DBConnection.LogEntry{} - P.rollback(conn, :result) - end, [log: log]) == {:error, :result} + assert P.rollback(pool, [log: log] ++ opts) == {:ok, :rolledback} - assert_received %DBConnection.LogEntry{call: :transaction} = entry + assert_received %DBConnection.LogEntry{call: :rollback} = entry assert %{query: :rollback, params: nil, result: {:ok, :rolledback}} = entry - assert is_nil(entry.pool_time) + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 assert is_integer(entry.connection_time) assert entry.connection_time >= 0 assert is_nil(entry.decode_time) @@ -86,211 +88,216 @@ defmodule TransactionTest do connect: [_], handle_begin: [ _, :state], handle_commit: [_, :new_state], - handle_begin: [_, :newer_state], - handle_rollback: [_, :newest_state]] = A.record(agent) + handle_rollback: [_, :newer_state]] = A.record(agent) end - test "transaction rollback returns error" do + test "begin!/commit!/rollback! error raises error" do + err = RuntimeError.exception("oops") stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, :rolledback, :newer_state}, - {:ok, :began, :newest_state}, - {:ok, :rolledback, :newest_state} + {:error, err, :new_state}, + {:error, err, :newer_state}, + {:error, err, :newest_state} ] {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> - P.rollback(conn, :oops) - end) == {:error, :oops} - - assert P.transaction(pool, fn(_) -> :result end) == {:ok, :result} + assert_raise RuntimeError, "oops", fn() -> P.begin!(pool, opts) end + assert_raise RuntimeError, "oops", fn() -> P.commit!(pool, opts) end + assert_raise RuntimeError, "oops", fn() -> P.rollback!(pool, opts) end assert [ connect: [_], handle_begin: [ _, :state], - handle_rollback: [_, :new_state], - handle_begin: [_, :newer_state], - handle_commit: [_, :newest_state]] = A.record(agent) + handle_commit: [_, :new_state], + handle_rollback: [_, :newer_state] + ] = A.record(agent) end - test "inner transaction rollback returns error on other transactions" do + test "begin/commit/rollback logs error" do + err = RuntimeError.exception("oops") stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, :rolledback, :newer_state}, - {:ok, :began, :newest_state}, - {:ok, :comittted, :newest_state} + {:error, err, :new_state}, + {:error, err, :newer_state}, + {:error, err, :newest_state} ] {:ok, agent} = A.start_link(stack) - opts = [agent: agent, parent: self()] + parent = self() + opts = [agent: agent, parent: parent] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> - assert P.transaction(conn, fn(conn2) -> - P.rollback(conn2, :oops) - end) == {:error, :oops} - - assert P.transaction(conn, fn(_) -> nil end) == {:error, :rollback} - end) == {:error, :rollback} - - assert P.transaction(pool, fn(_) -> :result end) == {:ok, :result} + log = &send(parent, &1) - assert [ - connect: [_], - handle_begin: [ _, :state], - handle_rollback: [_, :new_state], - handle_begin: [_, :newer_state], - handle_commit: [_, :newest_state]] = A.record(agent) - end + assert P.begin(pool, [log: log] ++ opts) == {:error, err} - test "outer transaction rolls back after inner rollback" do - stack = [ - {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, :rolledback, :newer_state}, - {:ok, :began, :newest_state}, - {:ok, :committed, :newest_state} - ] - {:ok, agent} = A.start_link(stack) + assert_received %DBConnection.LogEntry{call: :begin} = entry + assert %{query: :begin, params: nil, result: {:error, ^err}} = entry + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) - opts = [agent: agent, parent: self()] - {:ok, pool} = P.start_link(opts) + assert P.commit(pool, [log: log] ++ opts) == {:error, err} - assert P.transaction(pool, fn(conn) -> - assert P.transaction(conn, fn(conn2) -> - P.rollback(conn2, :oops) - end) == {:error, :oops} + assert_received %DBConnection.LogEntry{call: :commit} = entry + assert %{query: :commit, params: nil, result: {:error, ^err}} = entry + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) - P.rollback(conn, :oops2) - end) == {:error, :oops2} + assert P.rollback(pool, [log: log] ++ opts) == {:error, err} - assert P.transaction(pool, fn(_) -> :result end) == {:ok, :result} + assert_received %DBConnection.LogEntry{call: :rollback} = entry + assert %{query: :rollback, params: nil, result: {:error, ^err}} = entry + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) assert [ connect: [_], - handle_begin: [ _, :state], - handle_rollback: [_, :new_state], - handle_begin: [_, :newer_state], - handle_commit: [_, :newest_state]] = A.record(agent) + handle_begin: [_, :state], + handle_commit: [_, :new_state], + handle_rollback: [_, :newer_state] + ] = A.record(agent) end - test "inner transaction raise returns error on other transactions" do + test "begin! disconnect raises error" do + err = RuntimeError.exception("oops") stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, :rolledback, :newer_state}, - {:ok, :began, :newest_state}, - {:ok, :committed, :newest_state} + {:disconnect, err, :new_state}, + :ok, + fn(opts) -> + send(opts[:parent], :reconnected) + {:ok, :newest_state} + end ] {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert P.transaction(pool, fn(conn) -> - assert_raise RuntimeError, "oops", - fn() -> P.transaction(conn, fn(_) -> raise "oops" end) end - - assert P.transaction(conn, fn(_) -> nil end) == {:error, :rollback} - end) == {:error, :rollback} + assert_raise RuntimeError, "oops", fn() -> P.begin!(pool, opts) end - assert P.transaction(pool, fn(_) -> :result end) == {:ok, :result} + assert_receive :reconnected assert [ connect: [_], - handle_begin: [ _, :state], - handle_rollback: [_, :new_state], - handle_begin: [_, :newer_state], - handle_commit: [_, :newest_state]] = A.record(agent) + handle_begin: [_, :state], + disconnect: [_, :new_state], + connect: [_]] = A.record(agent) end - test "transaction and transaction returns result" do + test "begin bad return raises and stops connection" do stack = [ - {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, :committed, :newer_state} + fn(opts) -> + send(opts[:parent], {:hi, self()}) + Process.link(opts[:parent]) + {:ok, :state} + end, + :oops, + {:ok, :state} ] {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) + assert_receive {:hi, pid} + + Process.flag(:trap_exit, true) + assert_raise DBConnection.ConnectionError, "bad return value: :oops", + fn() -> P.begin(pool, opts) end - assert P.transaction(pool, fn(conn) -> - assert P.transaction(conn, fn(conn2) -> - assert %DBConnection{} = conn2 - assert conn == conn2 - :result - end) == {:ok, :result} - :result - end) == {:ok, :result} + prefix = "client #{inspect self()} stopped: " <> + "** (DBConnection.ConnectionError) bad return value: :oops" + len = byte_size(prefix) + assert_receive {:EXIT, ^pid, + {%DBConnection.ConnectionError{message: <<^prefix::binary-size(len), _::binary>>}, + [_|_]}} assert [ - connect: [_], - handle_begin: [ _, :state], - handle_commit: [_, :new_state]] = A.record(agent) + {:connect, _}, + {:handle_begin, [_, :state]}| _] = A.record(agent) end - test "transaction and run returns result" do + test "begin raise raises and stops connection" do stack = [ - {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, :committed, :newer_state} + fn(opts) -> + send(opts[:parent], {:hi, self()}) + Process.link(opts[:parent]) + {:ok, :state} + end, + fn(_, _) -> + raise "oops" + end, + {:ok, :state} ] {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) + assert_receive {:hi, pid} - assert P.transaction(pool, fn(conn) -> - assert P.run(conn, fn(conn2) -> - assert %DBConnection{} = conn2 - assert conn == conn2 - :result - end) == :result - :result - end) == {:ok, :result} + Process.flag(:trap_exit, true) + assert_raise RuntimeError, "oops", + fn() -> P.begin(pool, opts) end + + prefix = "client #{inspect self()} stopped: ** (RuntimeError) oops" + len = byte_size(prefix) + assert_receive {:EXIT, ^pid, + {%DBConnection.ConnectionError{message: <<^prefix::binary-size(len), _::binary>>}, + [_|_]}} assert [ - connect: [_], - handle_begin: [ _, :state], - handle_commit: [_, :new_state]] = A.record(agent) + {:connect, _}, + {:handle_begin, [_, :state]} | _] = A.record(agent) end - test "transaction begin error raises error" do + test "run begin!/commit!/rollback! error raises error" do err = RuntimeError.exception("oops") stack = [ {:ok, :state}, {:error, err, :new_state}, - {:ok, :began, :newer_state}, - {:ok, :committed, :newest_state} + {:error, err, :newer_state}, + {:error, err, :newest_state} ] {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert_raise RuntimeError, "oops", - fn() -> P.transaction(pool, fn(_) -> flunk("transaction ran") end) end + assert P.run(pool, fn(conn) -> + assert_raise RuntimeError, "oops", fn() -> P.begin!(conn, opts) end + assert_raise RuntimeError, "oops", fn() -> P.commit!(conn, opts) end + assert_raise RuntimeError, "oops", fn() -> P.rollback!(conn, opts) end - assert P.transaction(pool, fn(_) -> :result end) == {:ok, :result} + :hi + end) == :hi assert [ connect: [_], handle_begin: [ _, :state], - handle_begin: [_, :new_state], - handle_commit: [_, :newer_state]] = A.record(agent) + handle_commit: [_, :new_state], + handle_rollback: [_, :newer_state] + ] = A.record(agent) end - test "transaction logs begin error" do + test "run begin/commit/rollback logs error" do err = RuntimeError.exception("oops") stack = [ {:ok, :state}, - {:error, err, :new_state} + {:error, err, :new_state}, + {:error, err, :newer_state}, + {:error, err, :newest_state} ] {:ok, agent} = A.start_link(stack) @@ -299,64 +306,55 @@ defmodule TransactionTest do {:ok, pool} = P.start_link(opts) log = &send(parent, &1) - assert_raise RuntimeError, "oops", - fn() -> - P.transaction(pool, fn(_) -> flunk("transaction ran") end, [log: log]) - end - assert_received %DBConnection.LogEntry{call: :transaction} = entry + assert P.run(pool, fn(conn) -> + assert P.begin(conn, [log: log] ++ opts) == {:error, err} + + :hi + end) == :hi + + assert_received %DBConnection.LogEntry{call: :begin} = entry assert %{query: :begin, params: nil, result: {:error, ^err}} = entry - assert is_integer(entry.pool_time) - assert entry.pool_time >= 0 + assert is_nil(entry.pool_time) assert is_integer(entry.connection_time) assert entry.connection_time >= 0 assert is_nil(entry.decode_time) - assert [ - connect: [_], - handle_begin: [ _, :state]] = A.record(agent) - end + assert P.run(pool, fn(conn) -> + assert P.commit(conn, [log: log] ++ opts) == {:error, err} - test "transaction logs begin raise" do - stack = [ - fn(opts) -> - Process.link(opts[:parent]) - {:ok, :state} - end, - fn(_, _) -> - raise "oops" - end, - {:ok, :state2} - ] - {:ok, agent} = A.start_link(stack) + :hi + end) == :hi - parent = self() - opts = [agent: agent, parent: parent] - _ = Process.flag(:trap_exit, true) - {:ok, pool} = P.start_link(opts) + assert_received %DBConnection.LogEntry{call: :commit} = entry + assert %{query: :commit, params: nil, result: {:error, ^err}} = entry + assert is_nil(entry.pool_time) + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) - log = &send(parent, &1) - assert_raise RuntimeError, "oops", - fn() -> - P.transaction(pool, fn(_) -> flunk("transaction ran") end, [log: log]) - end + assert P.run(pool, fn(conn) -> + assert P.rollback(conn, [log: log] ++ opts) == {:error, err} - assert_received %DBConnection.LogEntry{call: :transaction} = entry - assert %{query: :begin, params: nil, result: {:error, err}} = entry - assert %DBConnection.ConnectionError{message: "an exception was raised: ** (RuntimeError) oops" <> _} = err - assert is_integer(entry.pool_time) - assert entry.pool_time >= 0 + :hi + end) == :hi + + assert_received %DBConnection.LogEntry{call: :rollback} = entry + assert %{query: :rollback, params: nil, result: {:error, ^err}} = entry + assert is_nil(entry.pool_time) assert is_integer(entry.connection_time) assert entry.connection_time >= 0 assert is_nil(entry.decode_time) - assert_receive {:EXIT, _, {%DBConnection.ConnectionError{}, [_|_]}} assert [ - {:connect, [_]}, - {:handle_begin, [ _, :state]} | _] = A.record(agent) + connect: [_], + handle_begin: [_, :state], + handle_commit: [_, :new_state], + handle_rollback: [_, :newer_state] + ] = A.record(agent) end - test "transaction begin disconnect raises error" do + test "run begin! disconnect raises error" do err = RuntimeError.exception("oops") stack = [ {:ok, :state}, @@ -372,8 +370,11 @@ defmodule TransactionTest do opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert_raise RuntimeError, "oops", - fn() -> P.transaction(pool, fn(_) -> flunk("transaction ran") end) end + assert P.run(pool, fn(conn) -> + assert_raise RuntimeError, "oops", fn() -> P.begin!(conn, opts) end + + :hi + end) == :hi assert_receive :reconnected @@ -384,7 +385,7 @@ defmodule TransactionTest do connect: [_]] = A.record(agent) end - test "transaction begin bad return raises and stops connection" do + test "run begin bad return raises and stops connection" do stack = [ fn(opts) -> send(opts[:parent], {:hi, self()}) @@ -398,16 +399,21 @@ defmodule TransactionTest do opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert_receive {:hi, conn} + assert_receive {:hi, pid} Process.flag(:trap_exit, true) - assert_raise DBConnection.ConnectionError, "bad return value: :oops", - fn() -> P.transaction(pool, fn(_) -> flunk("transaction ran") end) end + + assert P.run(pool, fn(conn) -> + assert_raise DBConnection.ConnectionError, "bad return value: :oops", + fn() -> P.begin(conn, opts) end + + :hi + end) == :hi prefix = "client #{inspect self()} stopped: " <> "** (DBConnection.ConnectionError) bad return value: :oops" len = byte_size(prefix) - assert_receive {:EXIT, ^conn, + assert_receive {:EXIT, ^pid, {%DBConnection.ConnectionError{message: <<^prefix::binary-size(len), _::binary>>}, [_|_]}} @@ -416,7 +422,7 @@ defmodule TransactionTest do {:handle_begin, [_, :state]}| _] = A.record(agent) end - test "transaction begin raise raises and stops connection" do + test "run begin raise raises and stops connection" do stack = [ fn(opts) -> send(opts[:parent], {:hi, self()}) @@ -432,15 +438,20 @@ defmodule TransactionTest do opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert_receive {:hi, conn} + assert_receive {:hi, pid} Process.flag(:trap_exit, true) - assert_raise RuntimeError, "oops", - fn() -> P.transaction(pool, fn(_) -> flunk("transaction ran") end) end + + assert P.run(pool, fn(conn) -> + assert_raise RuntimeError, "oops", + fn() -> P.begin(conn, opts) end + + :hi + end) == :hi prefix = "client #{inspect self()} stopped: ** (RuntimeError) oops" len = byte_size(prefix) - assert_receive {:EXIT, ^conn, + assert_receive {:EXIT, ^pid, {%DBConnection.ConnectionError{message: <<^prefix::binary-size(len), _::binary>>}, [_|_]}} @@ -449,74 +460,225 @@ defmodule TransactionTest do {:handle_begin, [_, :state]} | _] = A.record(agent) end - test "transaction commit error raises error" do - err = RuntimeError.exception("oops") + test "checkout_begin returns result and conn" do stack = [ {:ok, :state}, {:ok, :began, :new_state}, + {:ok, :began, :newer_state} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + + assert {:ok, conn, :began} = P.checkout_begin(pool, opts) + assert P.checkin(conn, opts) == :ok + + + opts2 = [key: :value] ++ opts + assert {:ok, conn, :began} = P.checkout_begin(pool, opts2) + assert P.checkin(conn, opts) == :ok + + assert [ + connect: [_], + handle_begin: [ _, :state], + handle_begin: [[{:key, :value} | _], :new_state], + ] = A.record(agent) + end + + test "commit_checkin returns result" do + stack = [ + {:ok, :state}, + {:ok, :committed, :new_state}, + {:ok, :committed, :newer_state} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + + assert {:ok, conn} = P.checkout(pool, opts) + assert P.commit_checkin(conn, opts) == {:ok, :committed} + + + opts2 = [key: :value] ++ opts + assert {:ok, conn} = P.checkout(pool, opts2) + assert P.commit_checkin(conn, opts2) == {:ok, :committed} + + assert [ + connect: [_], + handle_commit: [ _, :state], + handle_commit: [[{:key, :value} | _], :new_state], + ] = A.record(agent) + end + + test "rollback_checkin returns result" do + stack = [ + {:ok, :state}, + {:ok, :rolledback, :new_state}, + {:ok, :rolledback, :newer_state} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + + assert {:ok, conn} = P.checkout(pool, opts) + assert P.rollback_checkin(conn, opts) == {:ok, :rolledback} + + + opts2 = [key: :value] ++ opts + assert {:ok, conn} = P.checkout(pool, opts2) + assert P.rollback_checkin(conn, opts2) == {:ok, :rolledback} + + assert [ + connect: [_], + handle_rollback: [ _, :state], + handle_rollback: [[{:key, :value} | _], :new_state], + ] = A.record(agent) + end + + test "checkout_begin/commit_checkin/rollback_checkin log results" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, :committed, :newer_state}, + {:ok, :rolledback, :newest_state} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + log = &send(parent, &1) + + assert {:ok, conn, :began} = P.checkout_begin(pool, [log: log] ++ opts) + + assert_received %DBConnection.LogEntry{call: :checkout_begin} = entry + assert %{query: :begin, params: nil, result: {:ok, ^conn, :began}} = entry + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) + + assert P.commit_checkin(conn, [log: log] ++ opts) == {:ok, :committed} + + assert_received %DBConnection.LogEntry{call: :commit_checkin} = entry + assert %{query: :commit, params: nil, result: {:ok, :committed}} = entry + assert is_nil(entry.pool_time) + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) + + assert {:ok, conn} = P.checkout(pool, opts) + + assert P.rollback_checkin(conn, [log: log] ++ opts) == {:ok, :rolledback} + + assert_received %DBConnection.LogEntry{call: :rollback_checkin} = entry + assert %{query: :rollback, params: nil, result: {:ok, :rolledback}} = entry + assert is_nil(entry.pool_time) + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) + + assert [ + connect: [_], + handle_begin: [ _, :state], + handle_commit: [_, :new_state], + handle_rollback: [_, :newer_state]] = A.record(agent) + end + + test "checkout_begin!/commit_checkin!/rollback_checkin! error raises error" do + err = RuntimeError.exception("oops") + stack = [ + {:ok, :state}, + {:error, err, :new_state}, {:error, err, :newer_state}, - {:ok, :began, :newest_state}, - {:ok, :committed, :newest_state} + {:error, err, :newest_state} ] {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert_raise RuntimeError, "oops", - fn() -> P.transaction(pool, fn(_) -> :ok end) end + assert_raise RuntimeError, "oops", fn() -> P.checkout_begin!(pool, opts) end + + assert {:ok, conn} = P.checkout(pool, opts) + assert_raise RuntimeError, "oops", fn() -> P.commit!(conn, opts) end + assert_raise RuntimeError, "oops", fn() -> P.rollback!(conn, opts) end - assert P.transaction(pool, fn(_) -> :result end) == {:ok, :result} + assert :ok = P.checkin(conn, opts) assert [ connect: [_], - handle_begin: [_, :state], + handle_begin: [ _, :state], handle_commit: [_, :new_state], - handle_begin: [_, :newer_state], - handle_commit: [_, :newest_state]] = A.record(agent) + handle_rollback: [_, :newer_state] + ] = A.record(agent) end - test "transaction logs commit error" do + test "checkout_begin/checkout_commit/checkout_rollback logs error" do err = RuntimeError.exception("oops") stack = [ {:ok, :state}, - {:ok, :began, :new_state}, + {:error, err, :new_state}, {:error, err, :newer_state}, + {:error, err, :newest_state} ] {:ok, agent} = A.start_link(stack) parent = self() - opts = [agent: agent, parent: self()] + opts = [agent: agent, parent: parent] {:ok, pool} = P.start_link(opts) log = &send(parent, &1) - assert_raise RuntimeError, "oops", - fn() -> - P.transaction(pool, fn(_) -> - assert_received %DBConnection.LogEntry{} - end, [log: log]) - end + assert P.checkout_begin(pool, [log: log] ++ opts) == {:error, err} + + assert_received %DBConnection.LogEntry{call: :checkout_begin} = entry + assert %{query: :begin, params: nil, result: {:error, ^err}} = entry + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) - assert_received %DBConnection.LogEntry{call: :transaction} = entry + assert {:ok, conn} = P.checkout(pool, opts) + assert P.commit_checkin(conn, [log: log] ++ opts) == {:error, err} + + assert_received %DBConnection.LogEntry{call: :commit_checkin} = entry assert %{query: :commit, params: nil, result: {:error, ^err}} = entry assert is_nil(entry.pool_time) assert is_integer(entry.connection_time) assert entry.connection_time >= 0 assert is_nil(entry.decode_time) + assert P.rollback_checkin(conn, [log: log] ++ opts) == {:error, err} + + assert_received %DBConnection.LogEntry{call: :rollback_checkin} = entry + assert %{query: :rollback, params: nil, result: {:error, ^err}} = entry + assert is_nil(entry.pool_time) + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) + + assert :ok = P.checkin(conn, opts) + assert [ connect: [_], handle_begin: [_, :state], - handle_commit: [_, :new_state]] = A.record(agent) + handle_commit: [_, :new_state], + handle_rollback: [_, :newer_state] + ] = A.record(agent) end - test "transaction commit disconnect raises error" do + test "checkout_begin! disconnect raises error" do err = RuntimeError.exception("oops") stack = [ {:ok, :state}, - {:ok, :began, :new_state}, - {:disconnect, err, :newer_state}, + {:disconnect, err, :new_state}, :ok, fn(opts) -> send(opts[:parent], :reconnected) @@ -528,27 +690,52 @@ defmodule TransactionTest do opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert_raise RuntimeError, "oops", - fn() -> P.transaction(pool, fn(_) -> :result end) end + assert_raise RuntimeError, "oops", fn() -> P.checkout_begin!(pool, opts) end assert_receive :reconnected assert [ connect: [_], handle_begin: [_, :state], - handle_commit: [_, :new_state], - disconnect: [_, :newer_state], + disconnect: [_, :new_state], + connect: [_]] = A.record(agent) + end + + test "commit_checkin! disconnect raises error" do + err = RuntimeError.exception("oops") + stack = [ + {:ok, :state}, + {:disconnect, err, :new_state}, + :ok, + fn(opts) -> + send(opts[:parent], :reconnected) + {:ok, :newest_state} + end + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + + {:ok, conn} = P.checkout(pool, opts) + assert_raise RuntimeError, "oops", fn() -> P.commit_checkin!(conn, opts) end + + assert_receive :reconnected + + assert [ + connect: [_], + handle_commit: [_, :state], + disconnect: [_, :new_state], connect: [_]] = A.record(agent) end - test "transaction commit bad return raises and stops connection" do + test "checkout_begin bad return raises and stops connection" do stack = [ fn(opts) -> send(opts[:parent], {:hi, self()}) Process.link(opts[:parent]) {:ok, :state} end, - {:ok, :began, :new_state}, :oops, {:ok, :state} ] @@ -556,221 +743,121 @@ defmodule TransactionTest do opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert_receive {:hi, conn} + assert_receive {:hi, pid} Process.flag(:trap_exit, true) assert_raise DBConnection.ConnectionError, "bad return value: :oops", - fn() -> P.transaction(pool, fn(_) -> :result end) end + fn() -> P.checkout_begin(pool, opts) end prefix = "client #{inspect self()} stopped: " <> "** (DBConnection.ConnectionError) bad return value: :oops" len = byte_size(prefix) - assert_receive {:EXIT, ^conn, + assert_receive {:EXIT, ^pid, {%DBConnection.ConnectionError{message: <<^prefix::binary-size(len), _::binary>>}, [_|_]}} assert [ {:connect, _}, - {:handle_begin, [_, :state]}, - {:handle_commit, [_, :new_state]} | _] = A.record(agent) + {:handle_begin, [_, :state]}| _] = A.record(agent) end - test "transaction commit raise raises and stops connection" do + test "rollback_checkin bad return raises and stops connection" do stack = [ fn(opts) -> send(opts[:parent], {:hi, self()}) Process.link(opts[:parent]) {:ok, :state} end, - {:ok, :began, :new_state}, - fn(_, _) -> - raise "oops" - end, + :oops, {:ok, :state} ] {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) - assert_receive {:hi, conn} + assert_receive {:hi, pid} + assert {:ok, conn} = P.checkout(pool, opts) Process.flag(:trap_exit, true) - assert_raise RuntimeError, "oops", - fn() -> P.transaction(pool, fn(_) -> :result end) end + assert_raise DBConnection.ConnectionError, "bad return value: :oops", + fn() -> P.rollback_checkin(conn, opts) end - prefix = "client #{inspect self()} stopped: ** (RuntimeError) oops" + prefix = "client #{inspect self()} stopped: " <> + "** (DBConnection.ConnectionError) bad return value: :oops" len = byte_size(prefix) - assert_receive {:EXIT, ^conn, + assert_receive {:EXIT, ^pid, {%DBConnection.ConnectionError{message: <<^prefix::binary-size(len), _::binary>>}, - [_|_]}} + [_|_]}} assert [ {:connect, _}, - {:handle_begin, [_, :state]}, - {:handle_commit, [_, :new_state]} | _] = A.record(agent) + {:handle_rollback, [_, :state]}| _] = A.record(agent) end - test "transaction logs commit raise" do + test "checkout_begin raise raises and stops connection" do stack = [ fn(opts) -> + send(opts[:parent], {:hi, self()}) Process.link(opts[:parent]) {:ok, :state} end, - {:ok, :began, :new_state}, fn(_, _) -> raise "oops" end, - {:ok, :state2} - ] - {:ok, agent} = A.start_link(stack) - - parent = self() - opts = [agent: agent, parent: parent] - _ = Process.flag(:trap_exit, true) - {:ok, pool} = P.start_link(opts) - - log = &send(parent, &1) - assert_raise RuntimeError, "oops", - fn() -> - P.transaction(pool, fn(_) -> :ok end, [log: log]) - end - - assert_received %DBConnection.LogEntry{call: :transaction} = entry - assert %{query: :begin, params: nil, result: {:ok, :began}} = entry - - assert_received %DBConnection.LogEntry{call: :transaction} = entry - assert %{query: :commit, params: nil, result: {:error, err}} = entry - assert %DBConnection.ConnectionError{message: "an exception was raised: ** (RuntimeError) oops" <> _} = err - assert is_nil(entry.pool_time) - assert is_integer(entry.connection_time) - assert entry.connection_time >= 0 - assert is_nil(entry.decode_time) - assert_receive {:EXIT, _, {%DBConnection.ConnectionError{}, [_|_]}} - - assert [ - {:connect, [_]}, - {:handle_begin, [_, :state]}, - {:handle_commit, [_, :new_state]} | _] = A.record(agent) - end - - test "transaction rollback error raises error" do - err = RuntimeError.exception("oops") - stack = [ - {:ok, :state}, - {:ok, :began, :new_state}, - {:error, err, :newer_state}, - {:ok, :began, :newest_state}, - {:ok, :rolledback, :newest_state} + {:ok, :state} ] {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) + assert_receive {:hi, pid} + Process.flag(:trap_exit, true) assert_raise RuntimeError, "oops", - fn() -> P.transaction(pool, &P.rollback(&1, :oops)) end - - assert P.transaction(pool, fn(_) -> :result end) == {:ok, :result} + fn() -> P.checkout_begin(pool, opts) end - assert [ - connect: [_], - handle_begin: [_, :state], - handle_rollback: [_, :new_state], - handle_begin: [_, :newer_state], - handle_commit: [_, :newest_state]] = A.record(agent) - end - - test "transaction fun raise rolls back and re-raises" do - stack = [ - {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, :rolledback, :newer_state}, - ] - {:ok, agent} = A.start_link(stack) - - opts = [agent: agent, parent: self()] - {:ok, pool} = P.start_link(opts) - - assert_raise RuntimeError, "oops", - fn() -> P.transaction(pool, fn(_) -> raise "oops"end) end + prefix = "client #{inspect self()} stopped: ** (RuntimeError) oops" + len = byte_size(prefix) + assert_receive {:EXIT, ^pid, + {%DBConnection.ConnectionError{message: <<^prefix::binary-size(len), _::binary>>}, + [_|_]}} assert [ - connect: [_], - handle_begin: [_, :state], - handle_rollback: [_, :new_state]] = A.record(agent) + {:connect, _}, + {:handle_begin, [_, :state]} | _] = A.record(agent) end - test "transaction logs rollback raise" do + test "commit_checkin raise raises and stops connection" do stack = [ fn(opts) -> + send(opts[:parent], {:hi, self()}) Process.link(opts[:parent]) {:ok, :state} end, - {:ok, :began, :new_state}, fn(_, _) -> raise "oops" end, - {:ok, :state2} - ] - {:ok, agent} = A.start_link(stack) - - parent = self() - opts = [agent: agent, parent: parent] - _ = Process.flag(:trap_exit, true) - {:ok, pool} = P.start_link(opts) - - log = &send(parent, &1) - assert_raise RuntimeError, "oops", - fn() -> - P.transaction(pool, &P.rollback(&1, :oops), [log: log]) - end - - assert_received %DBConnection.LogEntry{call: :transaction} = entry - assert %{query: :begin, params: nil, result: {:ok, :began}} = entry - - assert_received %DBConnection.LogEntry{call: :transaction} = entry - assert %{query: :rollback, params: nil, result: {:error, err}} = entry - assert %DBConnection.ConnectionError{message: "an exception was raised: ** (RuntimeError) oops" <> _} = err - assert is_nil(entry.pool_time) - assert is_integer(entry.connection_time) - assert entry.connection_time >= 0 - assert is_nil(entry.decode_time) - assert_receive {:EXIT, _, {%DBConnection.ConnectionError{}, [_|_]}} - - assert [ - {:connect, [_]}, - {:handle_begin, [_, :state]}, - {:handle_rollback, [_, :new_state]} | _] = A.record(agent) - end - - test "transaction logs on fun raise" do - stack = [ - {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, :rolledback, :newer_state}, + {:ok, :state} ] {:ok, agent} = A.start_link(stack) - parent = self() - opts = [agent: agent, parent: parent] + opts = [agent: agent, parent: self()] {:ok, pool} = P.start_link(opts) + assert_receive {:hi, pid} - log = &send(parent, &1) - + assert {:ok, conn} = P.checkout(pool, opts) + Process.flag(:trap_exit, true) assert_raise RuntimeError, "oops", - fn() -> - P.transaction(pool, fn(_) -> - assert_received %DBConnection.LogEntry{call: :transaction, - query: :begin} - raise "oops" - end, [log: log]) - end + fn() -> P.commit_checkin(conn, opts) end - assert_received %DBConnection.LogEntry{call: :transaction, query: :rollback} + prefix = "client #{inspect self()} stopped: ** (RuntimeError) oops" + len = byte_size(prefix) + assert_receive {:EXIT, ^pid, + {%DBConnection.ConnectionError{message: <<^prefix::binary-size(len), _::binary>>}, + [_|_]}} assert [ - connect: [_], - handle_begin: [_, :state], - handle_rollback: [_, :new_state]] = A.record(agent) + {:connect, _}, + {:handle_commit, [_, :state]} | _] = A.record(agent) end end diff --git a/integration_test/tests.exs b/integration_test/tests.exs index f8ed4c1a..7b77389e 100644 --- a/integration_test/tests.exs +++ b/integration_test/tests.exs @@ -9,6 +9,6 @@ Code.require_file "cases/prepare_execute_test.exs", __DIR__ Code.require_file "cases/prepare_stream_test.exs", __DIR__ Code.require_file "cases/prepare_test.exs", __DIR__ Code.require_file "cases/queue_test.exs", __DIR__ +Code.require_file "cases/run_test.exs", __DIR__ Code.require_file "cases/stream_test.exs", __DIR__ -Code.require_file "cases/transaction_execute_test.exs", __DIR__ Code.require_file "cases/transaction_test.exs", __DIR__ diff --git a/lib/db_connection.ex b/lib/db_connection.ex index ebeb4f0a..557058f1 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -69,15 +69,15 @@ defmodule DBConnection do By default the `DBConnection` provides a single connection. However the `:pool` option can be set to use a pool of connections. If a pool is used the module must be passed as an option - unless inside a - `run/3` or `transaction/3` fun and using the run/transaction - connection reference (`t`). + `run/3` fun or checked out with `checkout/2` or `checkout_begin/2` + and using the connection reference (`t`). """ require Logger defstruct [:pool_mod, :pool_ref, :conn_mod, :conn_ref] @typedoc """ - Run or transaction connection reference. + Locked connection reference. """ @type t :: %__MODULE__{pool_mod: module, pool_ref: any, @@ -508,8 +508,8 @@ defmodule DBConnection do * `:queue` - Whether to block waiting in an internal queue for the connection's state (boolean, default: `true`) * `:timeout` - The maximum time that the caller is allowed the - to hold the connection's state (ignored when using a run/transaction - connection, default: `15_000`) + to hold the connection's state (ignored when using a locked connection, + default: `15_000`) * `:log` - A function to log information about a call, either a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) @@ -564,8 +564,8 @@ defmodule DBConnection do * `:queue` - Whether to block waiting in an internal queue for the connection's state (boolean, default: `true`) * `:timeout` - The maximum time that the caller is allowed the - to hold the connection's state (ignored when using a run/transaction - connection, default: `15_000`) + to hold the connection's state (ignored when using a locked connection, + default: `15_000`) * `:log` - A function to log information about a call, either a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) @@ -620,8 +620,8 @@ defmodule DBConnection do * `:queue` - Whether to block waiting in an internal queue for the connection's state (boolean, default: `true`) * `:timeout` - The maximum time that the caller is allowed the - to hold the connection's state (ignored when using a run/transaction - connection, default: `15_000`) + to hold the connection's state (ignored when using a locked connection, + default: `15_000`) * `:log` - A function to log information about a call, either a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) @@ -671,8 +671,8 @@ defmodule DBConnection do * `:queue` - Whether to block waiting in an internal queue for the connection's state (boolean, default: `true`) * `:timeout` - The maximum time that the caller is allowed the - to hold the connection's state (ignored when using a run/transaction - connection, default: `15_000`) + to hold the connection's state (ignored when using a locked connection, + default: `15_000`) * `:log` - A function to log information about a call, either a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) @@ -714,9 +714,7 @@ defmodule DBConnection do connection disconnects all future calls using that connection reference will fail. - `run/3` and `transaction/3` can be nested multiple times but a - `transaction/3` call inside another `transaction/3` will be treated - the same as `run/3`. + `run/3` can be nested multiple times. ### Options @@ -738,45 +736,134 @@ defmodule DBConnection do @spec run(conn, (t -> result), opts :: Keyword.t) :: result when result: var def run(conn, fun, opts \\ []) def run(%DBConnection{} = conn, fun, _) do - case get_info(conn) do - {_, _} -> + case fetch_conn(conn, nil) do + {:ok, _, _} -> fun.(conn) - :closed -> - raise DBConnection.ConnectionError, "connection is closed" + {:error, err, _} -> + raise err end end def run(pool, fun, opts) do + conn = checkout!(pool, opts) + try do + fun.(conn) + after + checkin(conn, opts) + end + end + @doc """ + Acquire a lock on a connection. + + Returns `{:ok, conn}` on success or `{;error, exception}` if there is an + error. + + To use the locked connection call requests with the connection + reference returned. If the connection disconnects all future calls using that + connection reference will fail. + + Use `checkin/2`, `commit_checkin/2` or `rollback_checkin/2` to release the + lock and return the connection to the pool. + + ### Options + + * `:pool_timeout` - The maximum time to wait for a reply when making a + synchronous call to the pool (default: `5_000`) + * `:queue` - Whether to block waiting in an internal queue for the + connection's state (boolean, default: `true`) + * `:timeout` - The maximum time that the caller is allowed the + to hold the connection's state (default: `15_000`) + + The pool may support other options. + + ### Example + + {:ok, conn} = DBConnection.checkout(pool) + try do + DBConnection.execute(conn, "SELECT * FROM table", []) + after + DBConnection.checkin(conn) + end + """ + @spec checkout(pool :: GenServer.t, opts :: Keyword.t) :: + {:ok, t} | {:error, Exception.t} + def checkout(pool, opts \\ []) do case checkout(pool, nil, opts) do - {:ok, conn, _, _} -> - try do - fun.(conn) - after - checkin(conn, opts) - end - {:error, err, _} -> + {:ok, conn, _conn_state, _meter} -> + {:ok, conn} + {:error, err, _meter} -> + {:error, err} + {kind, reason, stack, _meter} -> + :erlang.raise(kind, reason, stack) + end + end + + @doc """ + Acquire a lock on a connection. + + Returns the connection reference on success, and raises an exception on error. + See `checkout/2`. + """ + @spec checkout!(pool :: GenServer.t, opts :: Keyword.t) :: t + def checkout!(pool, opts \\ []) do + case checkout(pool, nil, opts) do + {:ok, conn, _conn_state, _meter} -> + conn + {:error, err, _meter} -> raise err - {kind, reason, stack, _} -> + {kind, reason, stack, _meter} -> :erlang.raise(kind, reason, stack) end end + @doc """ - Acquire a lock on a connection and run a series of requests inside a - transaction. The result of the transaction fun is return inside an `:ok` - tuple: `{:ok, result}`. + Release a lock on a connection. - To use the locked connection call the request with the connection - reference passed as the single argument to the `fun`. If the - connection disconnects all future calls using that connection - reference will fail. + Returns `:ok` on success, otherwise `{:error, exception}` on error. + + The pool may support options. + + ### Example + + {:ok, conn} = DBConnection.checkout(pool) + try do + DBConnection.execute(conn, "SELECT * FROM table", []) + after + DBConnection.checkin(conn) + end + """ + @spec checkin(t, opts :: Keyword.t) :: :ok | {:error, Exception.t} + def checkin(%DBConnection{} = conn, opts \\ []) do + case delete_conn(conn) do + {:ok, conn_state} -> + checkin(conn, conn_state, opts) + {:error, _} = error -> + error + end + end + + @doc """ + Release a lock on a connection. - `run/3` and `transaction/3` can be nested multiple times. If a transaction is - rolled back or a nested transaction `fun` raises the transaction is marked as - failed. All calls except `run/3`, `transaction/3`, `rollback/2`, `close/3` and - `close!/3` will raise an exception inside a failed transaction until the outer - transaction call returns. All `transaction/3` calls will return - `{:error, :rollback}` if the transaction failed or connection closed and - `rollback/2` is not called for that `transaction/3`. + Returns `:ok` on sucess, otherwsie raises an exception on error. + + See `checkin/2`. + """ + @spec checkin!(t, opts :: Keyword.t) :: :ok + def checkin!(conn, opts \\ []) do + case checkin(conn, opts) do + :ok -> + :ok + {:error, err} -> + raise err + end + end + + @doc """ + Begin a transaction. + + Return `{:ok, result}` on sucess or `{:error, exception}` if there was an + error. ### Options @@ -785,65 +872,342 @@ defmodule DBConnection do * `:queue` - Whether to block waiting in an internal queue for the connection's state (boolean, default: `true`) * `:timeout` - The maximum time that the caller is allowed the - to hold the connection's state (default: `15_000`) - * `:log` - A function to log information about begin, commit and rollback - calls made as part of the transaction, either a 1-arity fun, - `{module, function, args}` with `DBConnection.LogEntry.t` prepended to - `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) + to hold the connection's state (ignored when using a locked connection, + default: `15_000`) + * `:log` - A function to log information about a call, either + a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` + prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) The pool and connection module may support other options. All options - are passed to `handle_begin/2`, `handle_commit/2` and - `handle_rollback/2`. + are passed to `handle_begin/2`. + + See `commit/2` and `rollback/2`. ### Example - {:ok, res} = DBConnection.transaction(conn, fn(conn) -> - DBConnection.execute!(conn, "SELECT id FROM table", []) - end) + {:ok, result} = DBConnection.begin(conn) + try do + DBConnection.execute(conn, "SELECT * FROM table", []) + after + DBConnection.commit(conn) + end """ - @spec transaction(conn, (conn -> result), opts :: Keyword.t) :: - {:ok, result} | {:error, reason :: any} when result: var - def transaction(conn, fun, opts) do - case run(conn, &run_transaction/5, fun, meter(opts), opts) do - {:commit, {:ok, res, return, meter}} -> - log(meter, :transaction, :commit, nil, {:ok, res}) - {:ok, return} - {:rollback, {:ok, res, reason, meter}} -> - log(meter, :transaction, :rollback, nil, {:ok, res}) - {:error, reason} - {:rollback, {:ok, res, kind, reason, stack, meter}} -> - log(meter, :transaction, :rollback, nil, {:ok, res}) - :erlang.raise(kind, reason, stack) - {query, {:error, err, meter}} when query != nil -> - log(meter, :transaction, query, nil, {:error, err}) + @spec begin(conn, opts :: Keyword.t) :: {:ok, result} | {:error, Exception.t} + def begin(conn, opts \\ []) do + conn + |> run(&run_begin/4, meter(opts), opts) + |> log(:begin, :begin, nil) + end + + @doc """ + Begin a transaction. + + Returns `result` on success, otherwise raises an exception on error. + + See `begin/2`. + """ + @spec begin!(conn, opts :: Keyword.t) :: result + def begin!(conn, opts \\ []) do + case begin(conn, opts) do + {:ok, result} -> + result + {:error, err} -> raise err - {query, return} -> - log(return, :transaction, query, nil) end end @doc """ - Rollback a transaction, does not return. + Acquire a lock on a connection and begin a transaction. + + Return `{:ok, conn, result}` on success or `{:error, exception}` if there was + an error. On error the connection is released back to the pool. + + ### Options + + * `:pool_timeout` - The maximum time to wait for a reply when making a + synchronous call to the pool (default: `5_000`) + * `:queue` - Whether to block waiting in an internal queue for the + connection's state (boolean, default: `true`) + * `:timeout` - The maximum time that the caller is allowed the + to hold the connection's state (ignored when using a locked connection, + default: `15_000`) + * `:log` - A function to log information about a call, either + a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` + prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) - Aborts the current transaction fun. If inside `transaction/3` bubbles - up to the top level. + The pool and connection module may support other options. All options + are passed to `handle_begin/2`. + + See `commit_checkin/2` and `rollback_checkin/2`. ### Example - {:error, :bar} = DBConnection.transaction(conn, fn(conn) -> - DBConnection.rollback(conn, :bar) - IO.puts "never reaches here!" - end) + {:ok, conn, result} = DBConnection.checkout_begin(pool) + try do + DBConnection.execute(conn, "SELECT * FROM table", []) + after + DBConnection.commit_checkin(conn) + end + """ + @spec checkout_begin(GenServer.server, opts :: Keyword.t) :: + {:ok, conn, result} | {:error, Exception.t} + def checkout_begin(pool, opts \\ []) do + meter = meter(opts) + return = + with {:ok, conn, conn_state, meter} <- checkout(pool, meter, opts) do + case run_begin(conn, conn_state, meter, opts) do + {:ok, result, meter} -> + {:ok, conn, result, meter} + return -> + checkin(conn, opts) + return + end + end + log(return, :checkout_begin, :begin, nil) + end + + @doc """ + Acquire a lock on a connection and begin a transaction. + + Return `{:ok, conn, result}` on success or raise an exception if there was + an error. + + See `checkout_begin/2`. + """ + @spec checkout_begin!(GenServer.server, opts :: Keyword.t) :: {t, result} + def checkout_begin!(pool, opts \\ []) do + case checkout_begin(pool, opts) do + {:ok, conn, result} -> + {conn, result} + {:error, err} -> + raise err + end + end + + @doc """ + Commit a transaction. + + Return `{:ok, result}` on sucess or `{:error, exception}` if there was an + error. + + ### Options + + * `:pool_timeout` - The maximum time to wait for a reply when making a + synchronous call to the pool (default: `5_000`) + * `:queue` - Whether to block waiting in an internal queue for the + connection's state (boolean, default: `true`) + * `:timeout` - The maximum time that the caller is allowed the + to hold the connection's state (ignored when using a locked connection, + default: `15_000`) + * `:log` - A function to log information about a call, either + a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` + prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) + + The pool and connection module may support other options. All options + are passed to `handle_commit/2`. + + See `begin/2` and `rollback/2`. + + ### Example + + {:ok, result} = DBConnection.begin(conn) + try do + DBConnection.execute(conn, "SELECT * FROM table", []) + after + DBConnection.commit(conn) + end + """ + @spec commit(conn, opts :: Keyword.t) :: {:ok, result} | {:error, Exception.t} + def commit(conn, opts \\ []) do + conn + |> run(&run_commit/4, meter(opts), opts) + |> log(:commit, :commit, nil) + end + + @doc """ + Commit a transaction. + + Returns `result` on success, otherwise raises an exception on error. + + See `commit/2`. + """ + @spec commit!(conn, opts :: Keyword.t) :: result + def commit!(conn, opts \\ []) do + case commit(conn, opts) do + {:ok, result} -> + result + {:error, err} -> + raise err + end + end + + @doc """ + Commit a transaction and release lock on a connection. + + Return `{:ok, result}` on success or `{:error, exception}` if there was an + error. On error the connection is not released back to the pool. + + ### Options + + * `:log` - A function to log information about a call, either + a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` + prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) + + The pool and connection module may support other options. All options + are passed to `handle_commit/2`. + + See `checkout_begin/2` and `rollback_checkin/2`. + + ### Example + + {:ok, conn, result} = DBConnection.checkout_begin(pool) + try do + DBConnection.execute(conn, "SELECT * FROM table", []) + after + DBConnection.commit_checkin(conn) + end + """ + @spec commit_checkin(t, opts :: Keyword.t) :: + {:ok, result} | {:error, Exceptiont.t} + def commit_checkin(%DBConnection{} = conn, opts) do + return = + with {:ok, result, meter} + <- run(conn, &run_commit/4, meter(opts), opts) do + checkin(conn, opts) + {:ok, result, meter} + end + log(return, :commit_checkin, :commit, nil) + end + + @doc """ + Commit a transaction and release lock on a connection. + + Return `result` on success or raises an exception if there was an + error. On error the connection is not released back to the pool. + + See `commit_checkin/2`. + """ + @spec commit_checkin!(t, opts :: Keyword.t) :: result + def commit_checkin!(conn, opts \\ []) do + case commit_checkin(conn, opts) do + {:ok, result} -> + result + {:error, err} -> + raise err + end + end + + @doc """ + Rollback a transaction. + + Return `{:ok, result}` on sucess or `{:error, exception}` if there was an + error. + + ### Options + + * `:pool_timeout` - The maximum time to wait for a reply when making a + synchronous call to the pool (default: `5_000`) + * `:queue` - Whether to block waiting in an internal queue for the + connection's state (boolean, default: `true`) + * `:timeout` - The maximum time that the caller is allowed the + to hold the connection's state (ignored when using a locked connection, + default: `15_000`) + * `:log` - A function to log information about a call, either + a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` + prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) + + The pool and connection module may support other options. All options + are passed to `handle_rollback/2`. + + See `begin/2` and `rollback/2`. + + ### Example + + {:ok, result} = DBConnection.begin(conn) + try do + DBConnection.execute(conn, "SELECT * FROM table", []) + after + DBConnection.rollback(conn) + end """ - @spec rollback(t, reason :: any) :: no_return - def rollback(%DBConnection{conn_ref: conn_ref} = conn, err) do - case get_info(conn) do - {transaction, _} when transaction in [:transaction, :failed] -> - throw({:rollback, conn_ref, err}) - {:idle, _} -> - raise "not inside transaction" - :closed -> - raise DBConnection.ConnectionError, "connection is closed" + @spec rollback(conn, opts :: Keyword.t) :: + {:ok, result} | {:error, Exception.t} + def rollback(conn, opts \\ []) do + conn + |> run(&run_rollback/4, meter(opts), opts) + |> log(:rollback, :rollback, nil) + end + + @doc """ + Rollback a transaction. + + Returns `result` on success, otherwise raises an exception on error. + + See `rollback/2`. + """ + @spec rollback!(conn, opts :: Keyword.t) :: result + def rollback!(conn, opts \\ []) do + case rollback(conn, opts) do + {:ok, result} -> + result + {:error, err} -> + raise err + end + end + + @doc """ + Rollback a transaction and release lock on a connection. + + Return `{:ok, result}` on success or `{:error, exception}` if there was an + error. On error the connection is not released back to the pool. + + ### Options + + * `:log` - A function to log information about a call, either + a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` + prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) + + The pool and connection module may support other options. All options + are passed to `handle_rollback/2`. + + See `checkout_begin/2` and `commit_checkin/2`. + + ### Example + + {:ok, conn, result} = DBConnection.checkout_begin(pool) + try do + DBConnection.execute(conn, "SELECT * FROM table", []) + after + DBConnection.rollback_checkin(conn) + end + """ + @spec rollback_checkin(t, opts :: Keyword.t) :: + {:ok, result} | {:error, Exceptiont.t} + def rollback_checkin(%DBConnection{} = conn, opts) do + return = + with {:ok, result, meter} + <- run(conn, &run_rollback/4, meter(opts), opts) do + checkin(conn, opts) + {:ok, result, meter} + end + log(return, :rollback_checkin, :rollback, nil) + end + + @doc """ + Rollback a transaction and release lock on a connection. + + Return `result` on success or raises an exception if there was an + error. On error the connection is not released back to the pool. + + See `rollback_checkin/2`. + """ + @spec rollback_checkin!(conn, opts :: Keyword.t) :: result + def rollback_checkin!(conn, opts \\ []) do + case rollback_checkin(conn, opts) do + {:ok, result} -> + result + {:error, err} -> + raise err end end @@ -858,8 +1222,8 @@ defmodule DBConnection do * `:queue` - Whether to block waiting in an internal queue for the connection's state (boolean, default: `true`) * `:timeout` - The maximum time that the caller is allowed the - to hold the connection's state (ignored when using a run/transaction - connection, default: `15_000`) + to hold the connection's state (ignored when using a locked connection, + default: `15_000`) * `:log` - A function to log information about a call, either a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) @@ -870,7 +1234,7 @@ defmodule DBConnection do ### Example - {:ok, results} = DBConnection.transaction(conn, fn(conn) -> + DBConnection.run(conn, fn(conn) -> query = %Query{statement: "SELECT id FROM table"} stream = DBConnection.prepare_stream(conn, query, []) Enum.to_list(stream) @@ -894,8 +1258,8 @@ defmodule DBConnection do * `:queue` - Whether to block waiting in an internal queue for the connection's state (boolean, default: `true`) * `:timeout` - The maximum time that the caller is allowed the - to hold the connection's state (ignored when using a run/transaction - connection, default: `15_000`) + to hold the connection's state (ignored when using a locked connection, + default: `15_000`) * `:log` - A function to log information about a call, either a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) @@ -906,7 +1270,7 @@ defmodule DBConnection do ### Example - {:ok, results} = DBConnection.transaction(conn, fn(conn) -> + DBConnection.run(conn, fn(conn) -> query = %Query{statement: "SELECT id FROM table"} query = DBConnection.prepare!(conn, query) stream = DBConnection.stream(conn, query, []) @@ -935,6 +1299,7 @@ defmodule DBConnection do ## Helpers defp checkout(pool, meter, opts) do + meter = event(meter, :checkout) pool_mod = Keyword.get(opts, :pool, DBConnection.Connection) try do apply(pool_mod, :checkout, [pool, opts]) @@ -946,38 +1311,29 @@ defmodule DBConnection do {:ok, pool_ref, conn_mod, conn_state} -> conn = %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref, conn_mod: conn_mod, conn_ref: make_ref()} - put_info(conn, :idle, conn_state) - {:ok, conn, {:idle, conn_state}, meter} + put_conn(conn, conn_state) + {:ok, conn, conn_state, meter} {:error, err} -> {:error, err, meter} end end - defp checkin(conn, opts) do - case delete_info(conn) do - {:idle, conn_state} -> - %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref} = conn - _ = apply(pool_mod, :checkin, [pool_ref, conn_state, opts]) - :ok - {trans, _} = conn_info when trans in [:transaction, :failed] -> - msg = "check in during transaction" - err = DBConnection.ConnectionError.exception(msg) - delete_disconnect(conn, conn_info, err, opts) - :closed -> - :ok - end + defp checkin(conn, conn_state, opts) do + %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref} = conn + _ = apply(pool_mod, :checkin, [pool_ref, conn_state, opts]) + :ok end - defp delete_disconnect(conn, {_, conn_state}, err, opts) do - _ = delete_info(conn) + defp delete_disconnect(conn, conn_state, err, opts) do + _ = delete_conn(conn) %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref} = conn args = [pool_ref, err, conn_state, opts] _ = apply(pool_mod, :disconnect, args) :ok end - defp delete_stop(conn, {_, conn_state}, kind, reason, stack, opts) do - _ = delete_info(conn) + defp delete_stop(conn, conn_state, kind, reason, stack, opts) do + _ = delete_conn(conn) msg = "client #{inspect self()} stopped: " <> Exception.format(kind, reason, stack) exception = DBConnection.ConnectionError.exception(msg) @@ -987,39 +1343,23 @@ defmodule DBConnection do :ok end - defp handle(_conn, :closed, _fun, _args, meter, _opts) do - err = DBConnection.ConnectionError.exception("connection is closed") - {:error, err, meter} - end - defp handle(_conn, {:failed, _}, _fun, _args, meter, _opts) do - err = DBConnection.ConnectionError.exception("transaction rolling back") - {:error, err, meter} - end - defp handle(conn, {status, _} = conn_info, fun, args, meter, opts) do - handle(conn, conn_info, fun, args, meter, opts, status) - end - - defp handle(conn, conn_info, fun, args, meter, opts, next) do + defp handle(conn, conn_state, fun, args, meter, opts) do %DBConnection{conn_mod: conn_mod} = conn - {status, conn_state} = conn_info try do apply(conn_mod, fun, args ++ [opts, conn_state]) else {:ok, result, conn_state} -> - put_info(conn, next, conn_state) + put_conn(conn, conn_state) {:ok, result, meter} {:deallocate, result, conn_state} when fun in [:handle_first, :handle_next] -> - put_info(conn, next, conn_state) + put_conn(conn, conn_state) {:deallocate, result, meter} - {:error, err, conn_state} when fun == :handle_begin -> - put_info(conn, status, conn_state) - {:error, err, meter} - {:error, err, conn_state} when fun != :handle_begin -> - put_info(conn, next, conn_state) + {:error, err, conn_state} -> + put_conn(conn, conn_state) {:error, err, meter} {:disconnect, err, conn_state} -> - delete_disconnect(conn, {status, conn_state}, err, opts) + delete_disconnect(conn, conn_state, err, opts) {:error, err, meter} other -> try do @@ -1027,13 +1367,13 @@ defmodule DBConnection do catch :error, reason -> stack = System.stacktrace() - delete_stop(conn, conn_info, :error, reason, stack, opts) + delete_stop(conn, conn_state, :error, reason, stack, opts) {:error, reason, stack, meter} end catch kind, reason -> stack = System.stacktrace() - delete_stop(conn, conn_info, kind, reason, stack, opts) + delete_stop(conn, conn_state, kind, reason, stack, opts) {kind, reason, stack, meter} end end @@ -1104,64 +1444,69 @@ defmodule DBConnection do end end - defp run_prepare(conn, conn_info, query, meter, opts) do - with {:ok, query, meter} <- prepare(conn, conn_info, query, meter, opts) do + defp run_prepare(conn, conn_state, query, meter, opts) do + with {:ok, query, meter} <- prepare(conn, conn_state, query, meter, opts) do describe(conn, query, meter, opts) end end - defp prepare(conn, conn_info, query, meter, opts) do + defp prepare(conn, conn_state, query, meter, opts) do meter = event(meter, :prepare) - handle(conn, conn_info, :handle_prepare, [query], meter, opts) + handle(conn, conn_state, :handle_prepare, [query], meter, opts) end - defp run_prepare_execute(conn, conn_info, query, params, meter, opts) do - with {:ok, query, meter} <- prepare(conn, conn_info, query, meter, opts), - {:ok, query, meter} <- describe(conn, query, meter, opts), + defp run_prepare_execute(conn, conn_state, query, params, meter, opts) do + with {:ok, query, meter} + <- run_prepare(conn, conn_state, query, meter, opts), {:ok, params, meter} <- encode(conn, query, params, meter, opts), - {:ok, result, meter} <- execute(conn, query, params, meter, opts) do + {:ok, conn_state, meter} <- fetch_conn(conn, meter), + {:ok, result, meter} + <- run_execute(conn, conn_state, query, params, meter, opts) do {:ok, query, result, meter} end end - defp execute(conn, query, params, meter, opts) do - run_execute(conn, get_info(conn), query, params, meter, opts) - end - - defp run_execute(conn, conn_info, query, params, meter, opts) do + defp run_execute(conn, conn_state, query, params, meter, opts) do meter = event(meter, :execute) - handle(conn, conn_info, :handle_execute, [query, params], meter, opts) + handle(conn, conn_state, :handle_execute, [query, params], meter, opts) end defp raised_close(conn, query, meter, opts, kind, reason, stack) do - conn_info = get_info(conn) - with {:ok, _, meter} <- run_close(conn, conn_info, query, meter, opts) do + with {:ok, conn_state, meter} <- fetch_conn(conn, meter), + {:ok, _, meter} <- run_close(conn, conn_state, query, meter, opts) do {kind, reason, stack, meter} end end - defp run_close(conn, conn_info, query, meter, opts) do + defp run_close(conn, conn_state, query, meter, opts) do meter = event(meter, :close) - handle_cleanup(conn, conn_info, :handle_close, [query], meter, opts) + handle(conn, conn_state, :handle_close, [query], meter, opts) end - defp handle_cleanup(_conn, :closed, _callback, _args, meter, _opts) do - err = DBConnection.ConnectionError.exception("connection is closed") - {:error, err, meter} + defp run(%DBConnection{} = conn, fun, meter, opts) do + with {:ok, conn_state, meter} <- fetch_conn(conn, meter) do + fun.(conn, conn_state, meter, opts) + end end - defp handle_cleanup(conn, conn_info, callback, args, meter, opts) do - {status, _} = conn_info - handle(conn, conn_info, callback, args, meter, opts, status) + defp run(pool, fun, meter, opts) do + with {:ok, conn, conn_state, meter} <- checkout(pool, meter, opts) do + try do + fun.(conn, conn_state, meter, opts) + after + checkin(conn, opts) + end + end end defp run(%DBConnection{} = conn, fun, arg, meter, opts) do - fun.(conn, get_info(conn), arg, meter, opts) + with {:ok, conn_state, meter} <- fetch_conn(conn, meter) do + fun.(conn, conn_state, arg, meter, opts) + end end defp run(pool, fun, arg, meter, opts) do - meter = event(meter, :checkout) - with {:ok, conn, conn_info, meter} <- checkout(pool, meter, opts) do + with {:ok, conn, conn_state, meter} <- checkout(pool, meter, opts) do try do - fun.(conn, conn_info, arg, meter, opts) + fun.(conn, conn_state, arg, meter, opts) after checkin(conn, opts) end @@ -1169,13 +1514,14 @@ defmodule DBConnection do end defp run(%DBConnection{} = conn, fun, arg1, arg2, meter, opts) do - fun.(conn, get_info(conn), arg1, arg2, meter, opts) + with {:ok, conn_state, meter} <- fetch_conn(conn, meter) do + fun.(conn, conn_state, arg1, arg2, meter, opts) + end end defp run(pool, fun, arg1, arg2, meter, opts) do - meter = event(meter, :checkout) - with {:ok, conn, conn_info, meter} <- checkout(pool, meter, opts) do + with {:ok, conn, conn_state, meter} <- checkout(pool, meter, opts) do try do - fun.(conn, conn_info, arg1, arg2, meter, opts) + fun.(conn, conn_state, arg1, arg2, meter, opts) after checkin(conn, opts) end @@ -1245,135 +1591,19 @@ defmodule DBConnection do :ok end - defp run_transaction(conn, {:transaction, _}, fun, _meter, _opts) do - %DBConnection{conn_ref: conn_ref} = conn - try do - fun.(conn) - catch - :throw, {:rollback, ^conn_ref, reason} -> - fail(conn) - {nil, {:error, reason, nil}} - kind, reason -> - stack = System.stacktrace() - fail(conn) - {nil, {kind, reason, stack, nil}} - else - result -> - {nil, conclude(conn, result)} - end - end - defp run_transaction(conn, {:failed, _}, fun, _meter, _opts) do - %DBConnection{conn_ref: conn_ref} = conn - try do - fun.(conn) - catch - :throw, {:rollback, ^conn_ref, reason} -> - {nil, {:error, reason, nil}} - kind, reason -> - stack = System.stacktrace() - {nil, {kind, reason, stack, nil}} - else - _result -> - {nil, {:error, :rollback, nil}} - end - end - defp run_transaction(conn, conn_info, fun, meter, opts) do - case run_begin(conn, conn_info, meter, opts) do - {:ok, res, meter} -> - %DBConnection{conn_ref: conn_ref} = conn - try do - log(meter, :transaction, :begin, nil, {:ok, res}) - fun.(conn) - catch - :throw, {:rollback, ^conn_ref, reason} -> - run_rollback(conn, get_info(conn), reason, meter(opts), opts) - kind, reason -> - stack = System.stacktrace() - raised_rollback(conn, opts, kind, reason, stack) - else - result -> - run_commit(conn, get_info(conn), result, meter(opts), opts) - end - {:error, err, meter} -> - {:begin, {:error, err, meter}} - {kind, reason, stack, meter} -> - {:begin, {kind, reason, stack, meter}} - end - end - - defp fail(conn) do - case get_info(conn) do - {:transaction, conn_state} -> - put_info(conn, :failed, conn_state) - _ -> - :ok - end - end - - defp conclude(conn, result) do - case get_info(conn) do - {:transaction, _} -> - {:ok, result, nil} - _ -> - {:error, :rollback, nil} - end - end - - defp run_begin(conn, conn_info, meter, opts) do + defp run_begin(conn, conn_state, meter, opts) do meter = event(meter, :begin) - handle_trans(conn, conn_info, :handle_begin, meter, opts, :transaction) - end - - defp run_rollback(_conn, :closed, reason, _meter, _opts) do - {nil, {:error, reason, nil}} - end - defp run_rollback(conn, conn_info, reason, meter, opts) do - meter = event(meter, :rollback) - case handle_trans(conn, conn_info, :handle_rollback, meter, opts, :idle) do - {:ok, res, meter} -> - {:rollback, {:ok, res, reason, meter}} - return -> - {:rollback, return} - end - end - - defp raised_rollback(conn, opts, kind, reason, stack) do - case run_rollback(conn, get_info(conn), :rollback, meter(opts), opts) do - {:rollback, {:ok, res, _, meter}} -> - {:rollback, {:ok, res, kind, reason, stack, meter}} - other -> - other - end + handle(conn, conn_state, :handle_begin, [], meter, opts) end - defp run_commit(conn, {:transaction, _} = conn_info, return, meter, opts) do + defp run_commit(conn, conn_state, meter, opts) do meter = event(meter, :commit) - case handle_trans(conn, conn_info, :handle_commit, meter, opts, :idle) do - {:ok, res, meter} -> - {:commit, {:ok, res, return, meter}} - return -> - {:commit, return} - end - end - defp run_commit(conn, conn_info, _return, meter, opts) do - run_rollback(conn, conn_info, :rollback, meter, opts) + handle(conn, conn_state, :handle_commit, [], meter, opts) end - defp handle_trans(_conn, :closed, _callback, meter, _opts, _status) do - err = DBConnection.ConnectionError.exception("connection is closed") - {:error, err, meter} - end - defp handle_trans(_conn, {status, _}, _callback, meter, _opts, :transaction) - when status in [:transaction, :failed] do - err = DBConnection.ConnectionError.exception("inside transaction") - {:error, err, meter} - end - defp handle_trans(_conn, {:idle, _}, _callback, meter, _opts, :idle) do - err = DBConnection.ConnectionError.exception("outside transaction") - {:error, err, meter} - end - defp handle_trans(conn, conn_info, callback, meter, opts, status) do - handle(conn, conn_info, callback, [], meter, opts, status) + defp run_rollback(conn, conn_state, meter, opts) do + meter = event(meter, :rollback) + handle(conn, conn_state, :handle_rollback, [], meter, opts) end defp prepare_declare(conn, query, params, opts) do @@ -1389,11 +1619,13 @@ defmodule DBConnection do end end - defp run_prepare_declare(conn, conn_info, query, params, meter, opts) do - with {:ok, query, meter} <- prepare(conn, conn_info, query, meter, opts), - {:ok, query, meter} <- describe(conn, query, meter, opts), + defp run_prepare_declare(conn, conn_state, query, params, meter, opts) do + with {:ok, query, meter} + <- run_prepare(conn, conn_state, query, meter, opts), {:ok, params, meter} <- encode(conn, query, params, meter, opts), - {:ok, cursor, meter} <- declare(conn, query, params, meter, opts) do + {:ok, conn_state, meter} <- fetch_conn(conn, meter), + {:ok, cursor, meter} + <- run_declare(conn, conn_state, query, params, meter, opts) do {:ok, query, cursor, meter} end end @@ -1411,13 +1643,9 @@ defmodule DBConnection do end end - defp declare(conn, query, params, meter, opts) do - run_declare(conn, get_info(conn), query, params, meter, opts) - end - - defp run_declare(conn, conn_info, query, params, meter, opts) do + defp run_declare(conn, conn_state, query, params, meter, opts) do meter = event(meter, :declare) - handle(conn, conn_info, :handle_declare, [query, params], meter, opts) + handle(conn, conn_state, :handle_declare, [query, params], meter, opts) end defp fetch(conn, {:first, query, cursor}, opts) do @@ -1431,11 +1659,14 @@ defmodule DBConnection do end defp fetch(conn, fun, call, query, cursor, opts) do + args = [query, cursor] result = - conn - |> run(&run_fetch/6, fun, [query, cursor], meter(opts), opts) - |> log(call, query, cursor) - case result do + with {ok, result, meter} when ok in [:ok, :deallocate] + <- run(conn, &run_fetch/6, {fun, call}, args, meter(opts), opts), + {:ok, result, meter} <- decode(query, result, meter, opts) do + {ok, result, meter} + end + case log(result, call, query, cursor) do {:ok, result} -> {[result], {:next, query, cursor}} {:deallocate, result} -> @@ -1445,32 +1676,32 @@ defmodule DBConnection do end end - defp run_fetch(conn, conn_info, fun, [query, _] = args, meter, opts) do - meter = event(meter, :fetch) - with {ok, result, meter} when ok in [:ok, :deallocate] - <- handle(conn, conn_info, fun, args, meter, opts), - {:ok, result, meter} <- decode(query, result, meter, opts) do - {ok, result, meter} - end + defp run_fetch(conn, conn_state, {fun, call}, args, meter, opts) do + meter = event(meter, call) + handle(conn, conn_state, fun, args, meter, opts) end defp deallocate(conn, {_, query, cursor}, opts) do - result = - conn - |> run(&run_deallocate/6, query, cursor, meter(opts), opts) - |> log(:deallocate, query, cursor) - case result do + case fetch_conn(conn, nil) do + {:ok, _conn_state, _meter} -> + deallocate(conn, query, cursor, opts) + {:error, _err, _meter} -> + :ok + end + end + + defp deallocate(conn, query, cursor, opts) do + result = run(conn, &run_deallocate/6, query, cursor, meter(opts), opts) + case log(result, :deallocate, query, cursor) do {:ok, _} -> :ok {:error, err} -> raise err end end - defp run_deallocate(_conn, :closed, _query, _cursor, _meter, _opts), - do: {:ok, nil, nil} - defp run_deallocate(conn, conn_info, query, cursor, meter, opts) do + defp run_deallocate(conn, conn_state, query, cursor, meter, opts) do meter = event(meter, :deallocate) args = [query, cursor] - handle_cleanup(conn, conn_info, :handle_deallocate, args, meter, opts) + handle(conn, conn_state, :handle_deallocate, args, meter, opts) end defp resource(%DBConnection{} = conn, start, next, stop, opts) do @@ -1480,15 +1711,29 @@ defmodule DBConnection do Stream.resource(start, next, stop) end - defp put_info(conn, status, conn_state) do - _ = Process.put(key(conn), {status, conn_state}) + defp put_conn(conn, conn_state) do + _ = Process.put(key(conn), {:ok, conn_state}) :ok end - defp get_info(conn), do: Process.get(key(conn), :closed) + defp fetch_conn(conn, meter) do + case Process.get(key(conn)) do + {:ok, conn_state} -> + {:ok, conn_state, meter} + nil -> + msg = "connection is closed" + {:error, DBConnection.ConnectionError.exception(msg), meter} + end + end - defp delete_info(conn) do - Process.delete(key(conn)) || :closed + defp delete_conn(conn) do + case Process.delete(key(conn)) do + {:ok, _conn_state} = ok -> + ok + nil -> + msg = "connection is closed" + {:error, DBConnection.ConnectionError.exception(msg)} + end end defp key(%DBConnection{conn_ref: conn_ref}), do: {__MODULE__, conn_ref} diff --git a/test/test_support.exs b/test/test_support.exs index cce1cd73..1ba72a4d 100644 --- a/test/test_support.exs +++ b/test/test_support.exs @@ -15,11 +15,13 @@ defmodule TestConnection do DBConnection.run(pool, fun, opts2 ++ unquote(opts)) end - def transaction(pool, fun, opts2 \\ []) do - DBConnection.transaction(pool, fun, opts2 ++ unquote(opts)) + def checkout(pool, opts2 \\ []) do + DBConnection.checkout(pool, opts2 ++ unquote(opts)) end - defdelegate rollback(conn, reason), to: DBConnection + def checkin(conn, opts2 \\ []) do + DBConnection.checkin(conn, opts2 ++ unquote(opts)) + end def prepare(pool, query, opts2 \\ []) do DBConnection.prepare(pool, query, opts2 ++ unquote(opts)) @@ -63,6 +65,54 @@ defmodule TestConnection do DBConnection.close!(pool, query, opts2 ++ unquote(opts)) end + def begin(pool, opts2 \\ []) do + DBConnection.begin(pool, opts2 ++ unquote(opts)) + end + + def begin!(pool, opts2 \\ []) do + DBConnection.begin!(pool, opts2 ++ unquote(opts)) + end + + def commit(pool, opts2 \\ []) do + DBConnection.commit(pool, opts2 ++ unquote(opts)) + end + + def commit!(pool, opts2 \\ []) do + DBConnection.commit!(pool, opts2 ++ unquote(opts)) + end + + def rollback(pool, opts2 \\ []) do + DBConnection.rollback(pool, opts2 ++ unquote(opts)) + end + + def rollback!(pool, opts2 \\ []) do + DBConnection.rollback!(pool, opts2 ++ unquote(opts)) + end + + def checkout_begin(pool, opts2 \\ []) do + DBConnection.checkout_begin(pool, opts2 ++ unquote(opts)) + end + + def checkout_begin!(pool, opts2 \\ []) do + DBConnection.checkout_begin!(pool, opts2 ++ unquote(opts)) + end + + def commit_checkin(pool, opts2 \\ []) do + DBConnection.commit_checkin(pool, opts2 ++ unquote(opts)) + end + + def commit_checkin!(pool, opts2 \\ []) do + DBConnection.commit_checkin!(pool, opts2 ++ unquote(opts)) + end + + def rollback_checkin(pool, opts2 \\ []) do + DBConnection.rollback_checkin(pool, opts2 ++ unquote(opts)) + end + + def rollback_checkin!(pool, opts2 \\ []) do + DBConnection.rollback_checkin!(pool, opts2 ++ unquote(opts)) + end + defoverridable [start_link: 1] end end From 8930aac569d086dcf60bb80b413b86922044ae10 Mon Sep 17 00:00:00 2001 From: James Fish Date: Wed, 5 Jul 2017 11:10:17 +0100 Subject: [PATCH 2/2] Improve pool/transaction function docs --- lib/db_connection.ex | 106 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 84 insertions(+), 22 deletions(-) diff --git a/lib/db_connection.ex b/lib/db_connection.ex index 557058f1..7ada896f 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -751,15 +751,21 @@ defmodule DBConnection do checkin(conn, opts) end end + @doc """ Acquire a lock on a connection. - Returns `{:ok, conn}` on success or `{;error, exception}` if there is an + Returns `{:ok, conn}` on success or `{:error, exception}` if there is an error. - To use the locked connection call requests with the connection - reference returned. If the connection disconnects all future calls using that - connection reference will fail. + The `conn` is a connection reference that can be used as the first argument to + other requests (except `checkout/2` and `checkout_begin/2`). If the connection + disconnects all future calls using that connection reference will return (or + raise if bang(!) variant) a `%DBConnection.ConnectionError{}` error. + + Multiple `checkout/2` (and `checkout_begin/2`) calls on the same pool will + checkout multiple connections. The caller is responsible for checking all + connections in. Use `checkin/2`, `commit_checkin/2` or `rollback_checkin/2` to release the lock and return the connection to the pool. @@ -783,6 +789,10 @@ defmodule DBConnection do after DBConnection.checkin(conn) end + + {:ok, conn1} = DBConnection.checkout(pool) + {:ok, conn2} = DBConnection.checkout(pool) + true = (conn1 != conn2) """ @spec checkout(pool :: GenServer.t, opts :: Keyword.t) :: {:ok, t} | {:error, Exception.t} @@ -819,7 +829,12 @@ defmodule DBConnection do @doc """ Release a lock on a connection. - Returns `:ok` on success, otherwise `{:error, exception}` on error. + Returns `:ok` if the connection is still connected and was checked in, + otherwise if the connection is disconnected returns + `{:error, %DBConnection.ConnectionError{}}`. If a connection has already been + checked in, or previously disconnected, then the lock has been released and it + will be treated as disconnected to the caller. Other exceptions may be + returned in the error tuple in future. The pool may support options. @@ -831,6 +846,7 @@ defmodule DBConnection do after DBConnection.checkin(conn) end + {:error, %DBConnection.ConnectionError{}} = DBConnection.checkin(conn) """ @spec checkin(t, opts :: Keyword.t) :: :ok | {:error, Exception.t} def checkin(%DBConnection{} = conn, opts \\ []) do @@ -845,7 +861,9 @@ defmodule DBConnection do @doc """ Release a lock on a connection. - Returns `:ok` on sucess, otherwsie raises an exception on error. + Returns `:ok` if the connection is still connected and was checked in, + otherwise if the connection is disconnected raises a + `DBConnection.ConnectionError`. Other exceptions may be raised in future. See `checkin/2`. """ @@ -860,11 +878,15 @@ defmodule DBConnection do end @doc """ - Begin a transaction. + Execute a begin request. - Return `{:ok, result}` on sucess or `{:error, exception}` if there was an + Return `{:ok, result}` on success or `{:error, exception}` if there was an error. + It is possible to issue multiple begin requests without a commit (`commit/2` + or `commit_checkin/2`) or rollback (`rollback/2` and `rollback_checkin/2`). + The semantics are left to the callback implementation + ### Options * `:pool_timeout` - The maximum time to wait for a reply when making a @@ -900,9 +922,9 @@ defmodule DBConnection do end @doc """ - Begin a transaction. + Execute a begin request. - Returns `result` on success, otherwise raises an exception on error. + Returns the `result` on success, otherwise raises an exception on error. See `begin/2`. """ @@ -917,10 +939,26 @@ defmodule DBConnection do end @doc """ - Acquire a lock on a connection and begin a transaction. + Acquire a lock on a connection and execute a begin request. Return `{:ok, conn, result}` on success or `{:error, exception}` if there was - an error. On error the connection is released back to the pool. + an error. If an error is returned the connection is released back to the pool. + + The `conn` is a connection reference that can be used as the first argument to + other requests (except `checkout/2` and `checkout_begin/2`). If the connection + disconnects all future calls using that connection reference will return (or + raise if bang(!) variant) a `%DBConnection.ConnectionError{}` error. + + Use `checkin/2`, `commit_checkin/2` or `rollback_checkin/2` to release the + lock and return the connection to the pool. + + Multiple `checkout_begin/2` (and `checkout/2`) calls on the same pool will + checkout multiple connections. The caller is responsible for checking all + connections in. + + It is possible to issue multiple begin requests without a commit (`commit/2` + or `commit_checkin/2`) or rollback (`rollback/2` and `rollback_checkin/2`). + The semantics are left to the callback implementation. ### Options @@ -985,11 +1023,15 @@ defmodule DBConnection do end @doc """ - Commit a transaction. + Execute a commit request. - Return `{:ok, result}` on sucess or `{:error, exception}` if there was an + Return `{:ok, result}` on success or `{:error, exception}` if there was an error. + It is possible to issue multiple commit requests without a begin (`begin/2` + or `checkout_begin/2`). The semantics are left to the callback + implementation. + ### Options * `:pool_timeout` - The maximum time to wait for a reply when making a @@ -1042,11 +1084,15 @@ defmodule DBConnection do end @doc """ - Commit a transaction and release lock on a connection. + Execute a commit request, and if successful release the lock on connection. Return `{:ok, result}` on success or `{:error, exception}` if there was an error. On error the connection is not released back to the pool. + It is possible to issue multiple commit requests without a begin (`begin/2` + or `checkout_begin/2`). The semantics are left to the callback + implementation. + ### Options * `:log` - A function to log information about a call, either @@ -1080,11 +1126,15 @@ defmodule DBConnection do end @doc """ - Commit a transaction and release lock on a connection. + Execute a commit request, and if successful release the lock on connection. Return `result` on success or raises an exception if there was an error. On error the connection is not released back to the pool. + It is possible to issue multiple commit requests without a begin (`begin/2` + or `checkout_begin/2`). The semantics are left to the callback + implementation. + See `commit_checkin/2`. """ @spec commit_checkin!(t, opts :: Keyword.t) :: result @@ -1098,11 +1148,15 @@ defmodule DBConnection do end @doc """ - Rollback a transaction. + Execute a rollback request. - Return `{:ok, result}` on sucess or `{:error, exception}` if there was an + Return `{:ok, result}` on success or `{:error, exception}` if there was an error. + It is possible to issue multiple rollbacks requests without a begin (`begin/2` + or `checkout_begin/2`). The semantics are left to the callback + implementation. + ### Options * `:pool_timeout` - The maximum time to wait for a reply when making a @@ -1119,7 +1173,7 @@ defmodule DBConnection do The pool and connection module may support other options. All options are passed to `handle_rollback/2`. - See `begin/2` and `rollback/2`. + See `begin/2` and `commit/2`. ### Example @@ -1143,7 +1197,7 @@ defmodule DBConnection do Returns `result` on success, otherwise raises an exception on error. - See `rollback/2`. + See `rollback2`. """ @spec rollback!(conn, opts :: Keyword.t) :: result def rollback!(conn, opts \\ []) do @@ -1156,11 +1210,15 @@ defmodule DBConnection do end @doc """ - Rollback a transaction and release lock on a connection. + Execute a rollback request, and if successful release the lock on connection. Return `{:ok, result}` on success or `{:error, exception}` if there was an error. On error the connection is not released back to the pool. + It is possible to issue multiple rollback requests without a begin (`begin/2` + or `checkout_begin/2`). The semantics are left to the callback + implementation. + ### Options * `:log` - A function to log information about a call, either @@ -1194,11 +1252,15 @@ defmodule DBConnection do end @doc """ - Rollback a transaction and release lock on a connection. + Execute a rollback request, and if successful release the lock on connection. Return `result` on success or raises an exception if there was an error. On error the connection is not released back to the pool. + It is possible to issue multiple rollback requests without a begin (`begin/2` + or `checkout_begin/2`). The semantics are left to the callback + implementation. + See `rollback_checkin/2`. """ @spec rollback_checkin!(conn, opts :: Keyword.t) :: result