diff --git a/integration_test/cases/close_test.exs b/integration_test/cases/close_test.exs index 0c2da101..af843fd0 100644 --- a/integration_test/cases/close_test.exs +++ b/integration_test/cases/close_test.exs @@ -154,4 +154,31 @@ defmodule CloseTest do connect: [_], handle_close: [%Q{}, _, :state]] = A.record(agent) end + + test "close in failed transaction" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, :result, :newer_state}, + {:ok, :rolledback, :newest_state} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + + assert P.transaction(pool, fn(conn) -> + assert P.transaction(conn, fn(conn2) -> + P.rollback(conn2, :oops) + end) == {:error, :oops} + + assert P.close(conn, %Q{}, opts) == {:ok, :result} + end) == {:error, :rollback} + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_close: [%Q{}, _, :new_state], + handle_rollback: [_, :newer_state]] = A.record(agent) + end end diff --git a/integration_test/cases/stream_test.exs b/integration_test/cases/stream_test.exs index 4bb92e43..54a23e49 100644 --- a/integration_test/cases/stream_test.exs +++ b/integration_test/cases/stream_test.exs @@ -241,6 +241,39 @@ defmodule StreamTest do ] = A.record(agent) end + test "stream deallocates after transaction failed" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, %C{}, :newer_state}, + {:ok, %R{}, :newest_state}, + {:ok, :deallocated, :state2}, + {:ok, :rolledback, :new_state2} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + assert P.transaction(pool, fn(conn) -> + stream = P.stream(conn, %Q{}, [:param], [log: &send(parent, &1)]) + catch_throw(Enum.map(stream, fn(%R{}) -> + {:error, :oops} = P.transaction(conn, &P.rollback(&1, :oops)) + throw(:escape) + end)) + end) == {:error, :rollback} + + assert [ + connect: [_], + handle_begin: [_, :state], + handle_declare: [%Q{}, [:param], _, :new_state], + handle_first: [%Q{}, %C{}, _, :newer_state], + handle_deallocate: [%Q{}, %C{}, _, :newest_state], + handle_rollback: [_, :state2] + ] = A.record(agent) + end + test "stream logs deallocate disconnect" do err = RuntimeError.exception("oops") stack = [ diff --git a/integration_test/cases/transaction_test.exs b/integration_test/cases/transaction_test.exs index 1576f2d5..371263aa 100644 --- a/integration_test/cases/transaction_test.exs +++ b/integration_test/cases/transaction_test.exs @@ -117,7 +117,7 @@ defmodule TransactionTest do handle_commit: [_, :newest_state]] = A.record(agent) end - test "inner transaction rollback returns error on outer transaction" do + test "inner transaction rollback returns error on other transactions" do stack = [ {:ok, :state}, {:ok, :began, :new_state}, @@ -135,8 +135,7 @@ defmodule TransactionTest do P.rollback(conn2, :oops) end) == {:error, :oops} - assert_raise DBConnection.ConnectionError, "transaction rolling back", - fn() -> P.transaction(conn, fn(_) -> nil end) end + assert P.transaction(conn, fn(_) -> nil end) == {:error, :rollback} end) == {:error, :rollback} assert P.transaction(pool, fn(_) -> :result end) == {:ok, :result} @@ -180,7 +179,7 @@ defmodule TransactionTest do handle_commit: [_, :newest_state]] = A.record(agent) end - test "inner transaction raise returns error on outer transaction" do + test "inner transaction raise returns error on other transactions" do stack = [ {:ok, :state}, {:ok, :began, :new_state}, @@ -197,8 +196,7 @@ defmodule TransactionTest do assert_raise RuntimeError, "oops", fn() -> P.transaction(conn, fn(_) -> raise "oops" end) end - assert_raise DBConnection.ConnectionError, "transaction rolling back", - fn() -> P.transaction(conn, fn(_) -> nil end) end + assert P.transaction(conn, fn(_) -> nil end) == {:error, :rollback} end) == {:error, :rollback} assert P.transaction(pool, fn(_) -> :result end) == {:ok, :result} diff --git a/lib/db_connection.ex b/lib/db_connection.ex index 4755e323..d1e55ff3 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -756,10 +756,11 @@ defmodule DBConnection do `run/3` and `transaction/3` can be nested multiple times. If a transaction is rolled back or a nested transaction `fun` raises the transaction is marked as - failed. Any calls inside a failed transaction (except `rollback/2`) will raise - until the outer transaction call returns. All running `transaction/3` calls - will return `{:error, :rollback}` if the transaction failed or connection - closed and `rollback/2` is not called for that `transaction/3`. + failed. All calls except `run/3`, `transaction/3`, `rollback/2`, `close/3` and + `close!/3` will raise an exception inside a failed transaction until the outer + transaction call returns. All `transaction/3` calls will return + `{:error, :rollback}` if the transaction failed or connection closed and + `rollback/2` is not called for that `transaction/3`. ### Options @@ -952,8 +953,17 @@ defmodule DBConnection do :ok end - defp handle(%DBConnection{conn_mod: conn_mod} = conn, fun, args, opts) do - {status, conn_state} = fetch_info(conn) + defp handle(conn, fun, args, opts) do + case fetch_info(conn) do + {:failed, _} -> + raise DBConnection.ConnectionError, "transaction rolling back" + {status, conn_state} -> + handle(conn, status, conn_state, fun, args, opts) + end + end + + defp handle(conn, status, conn_state, fun, args, opts) do + %DBConnection{conn_mod: conn_mod} = conn try do apply(conn_mod, fun, args ++ [opts, conn_state]) else @@ -1085,7 +1095,7 @@ defmodule DBConnection do end defp raised_close(conn, query, opts, raised) do - case handle(conn, :handle_close, [query], opts) do + case handle_close(conn, query, opts) do {:ok, _} -> raised {:error, _} -> @@ -1107,8 +1117,12 @@ defmodule DBConnection do end defp run_close(conn, query, opts) do - fun = &handle(&1, :handle_close, [query], opts) - run_meter(conn, fun, opts) + run_meter(conn, &handle_close(&1, query, opts), opts) + end + + defp handle_close(conn, query, opts) do + {status, conn_state} = fetch_info(conn) + handle(conn, status, conn_state, :handle_close, [query], opts) end defmacrop time() do @@ -1222,7 +1236,7 @@ defmodule DBConnection do defp transaction_meter(%DBConnection{} = conn, fun, opts) do case fetch_info(conn) do - {:transaction, _} -> + {trans, _} when trans in [:transaction, :failed] -> {transaction_nested(conn, fun), nil} {:idle, conn_state} -> log = Keyword.get(opts, :log) @@ -1490,14 +1504,21 @@ defmodule DBConnection do end defp deallocate(conn, {_, query, cursor}, opts) do + deallocate(conn, query, cursor, opts) + end + + defp deallocate(conn, query, cursor, opts) do case get_info(conn) do - :closed -> :ok - _ -> deallocate(conn, query, cursor, opts) + {status, conn_state} -> + deallocate(conn, status, conn_state, query, cursor, opts) + :closed -> + :ok end end - defp deallocate(conn, query, cursor, opts) do - close = &handle(&1, :handle_deallocate, [query, cursor], opts) + defp deallocate(conn, status, conn_state, query, cursor, opts) do + args = [query, cursor] + close = &handle(&1, status, conn_state, :handle_deallocate, args, opts) {result, meter} = run_meter(conn, close, opts) case log(:deallocate, query, cursor, meter, result) do {:ok, _} -> :ok @@ -1519,8 +1540,6 @@ defmodule DBConnection do defp fetch_info(conn) do case get_info(conn) do - {:failed, _} -> - raise DBConnection.ConnectionError, "transaction rolling back" {_, _} = info -> info :closed ->