From 3ea2434dce4afdc3b7f9ed26378cd751668734d7 Mon Sep 17 00:00:00 2001 From: James Fish Date: Mon, 10 Jul 2017 00:47:49 +0100 Subject: [PATCH 1/3] Introduce begin, rollback and commit * Add begin(!)/1,2 to checkout and begin * Add rollback(!)/1,2 to rollback and checkin * Add commit(!)/1,2 to commit (or rollback) and checkin rollback/2 is overloaded and has old behaviour inside transaction/3 for backwards compatibility. However transaction/3 is deprecated because is makes assumptions about database status that may not hold. New return values from handle_begin/2, handle_rollback/2 and handle_commit/2 allow to signal mistaken database status. --- .../cases/transaction_execute_test.exs | 10 +- integration_test/cases/transaction_test.exs | 173 +++++- lib/db_connection.ex | 493 ++++++++++++++---- 3 files changed, 556 insertions(+), 120 deletions(-) diff --git a/integration_test/cases/transaction_execute_test.exs b/integration_test/cases/transaction_execute_test.exs index 7b0473f4..eb288329 100644 --- a/integration_test/cases/transaction_execute_test.exs +++ b/integration_test/cases/transaction_execute_test.exs @@ -263,7 +263,8 @@ defmodule TransactionExecuteTest do P.rollback(conn2, :oops) end) == {:error, :oops} - assert_raise DBConnection.ConnectionError, "transaction rolling back", + assert_raise DBConnection.ConnectionError, + "legacy transaction rolling back", fn() -> P.execute!(conn, %Q{}, [:param]) end end) == {:error, :rollback} @@ -273,7 +274,7 @@ defmodule TransactionExecuteTest do handle_rollback: [_, :new_state]] = A.record(agent) end - test "transaction does not log commit if closed" do + test "transaction logs rollback if closed" do stack = [ {:ok, :state}, {:ok, :began, :new_state}, @@ -290,13 +291,12 @@ defmodule TransactionExecuteTest do log = &send(parent, &1) P.transaction(pool, fn(conn) -> - assert_received %DBConnection.LogEntry{call: :transaction, - query: :begin} + assert_received %DBConnection.LogEntry{call: :begin, query: :begin} assert_raise DBConnection.ConnectionError, fn() -> P.execute(conn, %Q{}, [:param]) end end, [log: log]) - refute_received %DBConnection.LogEntry{call: :transaction} + assert_received %DBConnection.LogEntry{call: :rollback, query: :rollback} assert [ {:connect, [_]}, diff --git a/integration_test/cases/transaction_test.exs b/integration_test/cases/transaction_test.exs index 371263aa..2dd7e250 100644 --- a/integration_test/cases/transaction_test.exs +++ b/integration_test/cases/transaction_test.exs @@ -52,8 +52,8 @@ defmodule TransactionTest do log = &send(parent, &1) assert P.transaction(pool, fn(_) -> - assert_received %DBConnection.LogEntry{call: :transaction} = entry - assert %{query: :begin, params: nil, result: {:ok, :began}} = entry + assert_received %DBConnection.LogEntry{call: :begin} = entry + assert %{query: :begin, params: nil, result: {:ok, _, :began}} = entry assert is_integer(entry.pool_time) assert entry.pool_time >= 0 assert is_integer(entry.connection_time) @@ -63,7 +63,7 @@ defmodule TransactionTest do :result end, [log: log]) == {:ok, :result} - assert_received %DBConnection.LogEntry{call: :transaction} = entry + assert_received %DBConnection.LogEntry{call: :commit} = entry assert %{query: :commit, params: nil, result: {:ok, :committed}} = entry assert is_nil(entry.pool_time) assert is_integer(entry.connection_time) @@ -71,11 +71,11 @@ defmodule TransactionTest do assert is_nil(entry.decode_time) assert P.transaction(pool, fn(conn) -> - assert_received %DBConnection.LogEntry{} + assert_received %DBConnection.LogEntry{call: :begin} P.rollback(conn, :result) end, [log: log]) == {:error, :result} - assert_received %DBConnection.LogEntry{call: :transaction} = entry + assert_received %DBConnection.LogEntry{call: :rollback} = entry assert %{query: :rollback, params: nil, result: {:ok, :rolledback}} = entry assert is_nil(entry.pool_time) assert is_integer(entry.connection_time) @@ -304,7 +304,7 @@ defmodule TransactionTest do P.transaction(pool, fn(_) -> flunk("transaction ran") end, [log: log]) end - assert_received %DBConnection.LogEntry{call: :transaction} = entry + assert_received %DBConnection.LogEntry{call: :begin} = entry assert %{query: :begin, params: nil, result: {:error, ^err}} = entry assert is_integer(entry.pool_time) assert entry.pool_time >= 0 @@ -341,7 +341,7 @@ defmodule TransactionTest do P.transaction(pool, fn(_) -> flunk("transaction ran") end, [log: log]) end - assert_received %DBConnection.LogEntry{call: :transaction} = entry + assert_received %DBConnection.LogEntry{call: :begin} = entry assert %{query: :begin, params: nil, result: {:error, err}} = entry assert %DBConnection.ConnectionError{message: "an exception was raised: ** (RuntimeError) oops" <> _} = err assert is_integer(entry.pool_time) @@ -494,11 +494,11 @@ defmodule TransactionTest do assert_raise RuntimeError, "oops", fn() -> P.transaction(pool, fn(_) -> - assert_received %DBConnection.LogEntry{} + assert_received %DBConnection.LogEntry{call: :begin} end, [log: log]) end - assert_received %DBConnection.LogEntry{call: :transaction} = entry + assert_received %DBConnection.LogEntry{call: :commit} = entry assert %{query: :commit, params: nil, result: {:error, ^err}} = entry assert is_nil(entry.pool_time) assert is_integer(entry.connection_time) @@ -635,10 +635,10 @@ defmodule TransactionTest do P.transaction(pool, fn(_) -> :ok end, [log: log]) end - assert_received %DBConnection.LogEntry{call: :transaction} = entry - assert %{query: :begin, params: nil, result: {:ok, :began}} = entry + assert_received %DBConnection.LogEntry{call: :begin} = entry + assert %{query: :begin, params: nil, result: {:ok, _, :began}} = entry - assert_received %DBConnection.LogEntry{call: :transaction} = entry + assert_received %DBConnection.LogEntry{call: :commit} = entry assert %{query: :commit, params: nil, result: {:error, err}} = entry assert %DBConnection.ConnectionError{message: "an exception was raised: ** (RuntimeError) oops" <> _} = err assert is_nil(entry.pool_time) @@ -725,10 +725,10 @@ defmodule TransactionTest do P.transaction(pool, &P.rollback(&1, :oops), [log: log]) end - assert_received %DBConnection.LogEntry{call: :transaction} = entry - assert %{query: :begin, params: nil, result: {:ok, :began}} = entry + assert_received %DBConnection.LogEntry{call: :begin} = entry + assert %{query: :begin, params: nil, result: {:ok, _, :began}} = entry - assert_received %DBConnection.LogEntry{call: :transaction} = entry + assert_received %DBConnection.LogEntry{call: :rollback} = entry assert %{query: :rollback, params: nil, result: {:error, err}} = entry assert %DBConnection.ConnectionError{message: "an exception was raised: ** (RuntimeError) oops" <> _} = err assert is_nil(entry.pool_time) @@ -760,17 +760,154 @@ defmodule TransactionTest do assert_raise RuntimeError, "oops", fn() -> P.transaction(pool, fn(_) -> - assert_received %DBConnection.LogEntry{call: :transaction, - query: :begin} + assert_received %DBConnection.LogEntry{call: :begin, query: :begin} raise "oops" end, [log: log]) end - assert_received %DBConnection.LogEntry{call: :transaction, query: :rollback} + assert_received %DBConnection.LogEntry{call: :rollback, query: :rollback} assert [ connect: [_], handle_begin: [_, :state], handle_rollback: [_, :new_state]] = A.record(agent) end + + test "transaction logs begin :commit and :rollback" do + stack = [ + {:ok, :state}, + {:commit, :new_state}, + {:rollback, :newer_state}, + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + log = &send(parent, &1) + + assert P.transaction(pool, fn(_) -> flunk("transaction ran") end, + [log: log]) == {:error, :rollback} + + assert_received %DBConnection.LogEntry{call: :begin} = entry + err = DBConnection.TransactionError.exception(:commit) + assert %{query: :begin, params: nil, result: {:error, ^err}} = entry + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) + + refute_received %DBConnection.LogEntry{} + + assert P.transaction(pool, fn(_) -> flunk("transaction ran") end, + [log: log]) == {:error, :rollback} + + assert_received %DBConnection.LogEntry{call: :begin} = entry + err = DBConnection.TransactionError.exception(:rollback) + assert %{query: :begin, params: nil, result: {:error, ^err}} = entry + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) + + refute_received %DBConnection.LogEntry{} + + assert [ + connect: [_], + handle_begin: [ _, :state], + handle_begin: [_, :new_state]] = A.record(agent) + end + + test "transaction logs commit :begin and :rollback" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:begin, :newer_state}, + {:ok, :began, :newest_state}, + {:rollback, :state2}, + {:ok, :rolledback, :new_state2} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + log = &send(parent, &1) + + assert P.transaction(pool, + fn(_) -> assert_receive %DBConnection.LogEntry{call: :begin} end, + [log: log]) == {:error, :rollback} + + assert_received %DBConnection.LogEntry{call: :commit} = entry + err = DBConnection.TransactionError.exception(:begin) + assert %{query: :commit, params: nil, result: {:error, ^err}} = entry + assert is_nil(entry.pool_time) + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) + + refute_received %DBConnection.LogEntry{} + + assert P.transaction(pool, + fn(_) -> assert_receive %DBConnection.LogEntry{call: :begin} end, + [log: log]) == {:error, :rollback} + + assert_received %DBConnection.LogEntry{call: :commit} = entry + assert %{query: :rollback, params: nil, result: {:ok, :rolledback}} = entry + assert is_nil(entry.pool_time) + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) + + refute_received %DBConnection.LogEntry{} + + assert [ + connect: [_], + handle_begin: [ _, :state], + handle_commit: [_, :new_state], + handle_begin: [_, :newer_state], + handle_commit: [_, :newest_state], + handle_rollback: [_, :state2]] = A.record(agent) + end + + test "transaction logs rollback :begin" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:begin, :newer_state} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + log = &send(parent, &1) + + assert P.transaction(pool, + fn(conn) -> + assert_receive %DBConnection.LogEntry{call: :begin} + P.rollback(conn, :oops) + end, + [log: log]) == {:error, :oops} + + assert_received %DBConnection.LogEntry{call: :rollback} = entry + err = DBConnection.TransactionError.exception(:begin) + assert %{query: :rollback, params: nil, result: {:error, ^err}} = entry + assert is_nil(entry.pool_time) + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) + + refute_received %DBConnection.LogEntry{} + + assert [ + connect: [_], + handle_begin: [ _, :state], + handle_rollback: [_, :new_state]] = A.record(agent) + end end diff --git a/lib/db_connection.ex b/lib/db_connection.ex index 0175760c..e8b092c2 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -76,6 +76,17 @@ defmodule DBConnection do defstruct [:pool_mod, :pool_ref, :conn_mod, :conn_ref, :conn_mode] + defmodule TransactionError do + defexception [:next, :message] + + def exception(:begin), + do: %__MODULE__{next: :begin, message: "transaction is not started"} + def exception(:commit), + do: %__MODULE__{next: :commit, message: "transaction is already started"} + def exception(:rollback), + do: %__MODULE__{next: :rollback, message: "transaction is aborted"} + end + @typedoc """ Run or transaction connection reference. """ @@ -146,42 +157,56 @@ defmodule DBConnection do {:ok, new_state :: any} | {:disconnect, Exception.t, new_state :: any} @doc """ - Handle the beginning of a transaction. Return `{:ok, result, state}` to - continue, `{:error, exception, state}` to abort the transaction and + Handle the beginning of a transaction. + + Return `{:ok, result, state}` to continue, `{:commit, state}` to notify caller + to commit open transaction before continuing, `{:rollback, state}` to notify + caller to rollback aborted transaction before continuing, + `{:error, exception, state}` to abort the transaction and continue or `{:disconnect, exception, state}` to abort the transaction and disconnect. + A callback implementation should only return `:commit` or `:rollback` if it + can determine the database's transaction status without side effect. + This callback is called in the client process. """ @callback handle_begin(opts :: Keyword.t, state :: any) :: {:ok, result, new_state :: any} | + {:commit | :rollback, new_state :: any} | {:error | :disconnect, Exception.t, new_state :: any} @doc """ Handle committing a transaction. Return `{:ok, result, state}` on success and - to continue, `{:error, exception, state}` to abort the transaction and - continue or `{:disconnect, exception, state}` to abort the transaction - and disconnect. + to continue, `{:begin, state}` to open transaction before continuing, + `{:rollback, state}` to rollback aborted transaction, + `{:error, exception, state}` to abort the transaction and continue or + `{:disconnect, exception, state}` to abort the transaction and disconnect. + + A callback implementation should only return `:begin` or `:rollback` if it + can determine the database's transaction status without side effect. This callback is called in the client process. """ @callback handle_commit(opts :: Keyword.t, state :: any) :: {:ok, result, new_state :: any} | + {:begin | :rollback, new_state :: any} | {:error | :disconnect, Exception.t, new_state :: any} @doc """ Handle rolling back a transaction. Return `{:ok, result, state}` on success - and to continue, `{:error, exception, state}` to abort the transaction - and continue or `{:disconnect, exception, state}` to abort the - transaction and disconnect. + and to continue, `{:begin, state}` to open transaction before continuing, + `{:error, exception, state}` to abort the transaction and continue or + `{:disconnect, exception, state}` to abort the transaction and disconnect. - A transaction will be rolled back if an exception occurs or - `rollback/2` is called. + A callback implementation should only return `:begin` if it + can determine the database' transaction status without side effect. - This callback is called in the client process. + This callback is called in the client and connection process. """ @callback handle_rollback(opts :: Keyword.t, state :: any) :: {:ok, result, new_state :: any} | + {:begin, new_state :: any} | {:error | :disconnect, Exception.t, new_state :: any} @doc """ @@ -806,7 +831,8 @@ defmodule DBConnection do def transaction(%DBConnection{conn_mode: :transaction} = conn, fun, _opts) do %DBConnection{conn_ref: conn_ref} = conn try do - fun.(conn) + result = fun.(conn) + conclude(conn, result) catch :throw, {__MODULE__, ^conn_ref, reason} -> fail(conn) @@ -817,48 +843,216 @@ defmodule DBConnection do :erlang.raise(kind, reason, stack) else result -> - conclude(conn, result) + {:ok, result} end end - def transaction(conn, fun, opts) do - case run(conn, &run_transaction/5, fun, meter(opts), opts) do - {query, {:ok, res, {kind, reason, stack}, meter}} -> - log(meter, :transaction, query, nil, {:ok, res}) - :erlang.raise(kind, reason, stack) - {query, {:ok, res, return, meter}} -> - log(meter, :transaction, query, nil, {:ok, res}) - return - {query, {:error, err, meter}} when query != nil -> - log(meter, :transaction, query, nil, {:error, err}) + def transaction(%DBConnection{} = conn, fun, opts) do + case begin(conn, &run/4, opts) do + {:ok, conn, _} -> + run_transaction(conn, fun, &run/4, opts) + {:error, %DBConnection.TransactionError{}} -> + {:error, :rollback} + {:error, err} -> + raise err + end + end + def transaction(pool, fun, opts) do + case begin(pool, &checkout/4, opts) do + {:ok, conn, _} -> + run_transaction(conn, fun, &checkin/4, opts) + {:error, %DBConnection.TransactionError{}} -> + {:error, :rollback} + {:error, err} -> raise err - {query, return} -> - log(return, :transaction, query, nil) end end @doc """ - Rollback a transaction, does not return. + Acquire a lock on a connection and begin a database transaction. + + Return `{:ok, conn, result}` on success or `{:error, exception}` if there was + an error. If a new lock was acquired and there is an error, it is released. + + The callback implementation should determine (using transaction status of the + database) the state of a transaction. If a transaction is already started, + according to the callback implementation, then an error tuple with a + `DBConnection.TransactionError` is returned. - Aborts the current transaction fun. If inside `transaction/3` bubbles - up to the top level. + This function will return a error tuple with a `DBConnection.ConnectionError` + when called inside a deprecated `transaction/3`. + + ### Options + + See module documentation. The pool and connection module may support other + options. All options are passed to `handle_begin/2`. + + See `commit/2` and `rollback/2`. ### Example - {:error, :bar} = DBConnection.transaction(conn, fn(conn) -> - DBConnection.rollback(conn, :bar) - IO.puts "never reaches here!" + {:ok, conn, result} = DBConnection.begin(pool) + try do + DBConnection.execute!(conn, "SELECT * FROM table", []) + DBConnection.commit(conn) + after + DBConnection.rollback(conn) + end + """ + @spec begin(conn, opts :: Keyword.t) :: + {:ok, t, result} | {:error, Exception.t} + def begin(conn, opts \\ []) do + begin(conn, &checkout/4, opts) + end + + @doc """ + Acquire a lock on a connection and begin a database transaction. + + Returns `{conn, result}` on success, otherwise raises an exception on error. + + This function will raise a `DBConnection.ConnectionError` when called inside a + deprecated `transaction/3`. + + See `begin/2`. + """ + @spec begin!(conn, opts :: Keyword.t) :: {t, result} + def begin!(conn, opts \\ []) do + case begin(conn, opts) do + {:ok, conn, result} -> + {conn, result} + {:error, err} -> + raise err + end + end + + @doc """ + Rollback a database transaction and release lock on connection. + + When outside of a `transaction/3` call returns `{:ok, result}` on success or + `{:error, exception}` if there was an error. The lock on the connection is + always released. + + It is possible to issue multiple rollbacks requests without a begin (`begin/2` + or `checkout_begin/2`). The semantics are left to the callback + implementation. + + The callback implementation should determine (using transaction status of the + database) the state of a transaction If a transaction is not started, + according to callback implmentation, then an error tuple with a + `DBConnection.TransactionError` is returned. + + When inside of a `transaction/3` call does a non-local return, using a + `throw/1` to cause the transaction to enter a failed state and the + `transaction/3` call returns `{:error, reason}`. If `transaction/3` calls are + nested the connection is marked as failed until the outermost transaction call + does the database rollback. Note that `transaction/3` is deprecated. + + ### Options + + See module documentation. The pool and connection module may support other + options. All options are passed to `handle_rollback/2`. + + See `begin/2`, `commit/2`, and `transaction/3`. + + ### Example + + {:ok, result} = DBConnection.begin(conn) + try do + DBConnection.execute!(conn, "SELECT * FROM table", []) + DBConnection.commit!(conn) + after + DBConnection.rollback(conn) + end + + ### Deprecated Example + + {:error, :oops} = DBConnection.transaction(pool, fun(conn) -> + DBConnection.rollback(conn, :oops) end) """ - @spec rollback(t, reason :: any) :: no_return - def rollback(conn, reason) do - %DBConnection{conn_ref: conn_ref, conn_mode: mode} = conn - case get_info(conn, nil) do - {:error, err, _meter} -> + @spec rollback(conn, (opts :: Keyword.t) | reason :: term) :: + {:ok, result} | {:error, Exception.t} + def rollback(conn, opts \\ []) + def rollback(%DBConnection{conn_mode: :transaction} = conn, reason) do + %DBConnection{conn_ref: conn_ref} = conn + throw({__MODULE__, conn_ref, reason}) + end + def rollback(conn, opts) do + rollback(conn, &checkin/4, opts) + end + + @doc """ + Rollback a database transaction and release lock on connection. + + Returns `result` on success, otherwise raises an exception on error. + + This function will raise a `DBConnection.ConnectionError` when called inside a + deprecated `transaction/3`. + + See `rollback/2`. + """ + @spec rollback!(conn, opts :: Keyword.t) :: result + def rollback!(conn, opts \\ []) do + case rollback(conn, opts) do + {:ok, result} -> + result + {:error, err} -> + raise err + end + end + + @doc """ + Commit a database transaction and release lock on connection. + + Return `{:ok, result}` on success or `{:error, exception}` if there was an + error. + + The callback implementation should determine (using transaction status of the + database) the state of a transaction If the transaction is aborted or not + started, according to callback implmentation, then an error tuple with a + `DBConnection.TransactionError` is returned. In the aborted case a rollback + will be attempted and if it errors, that error will be returned instead. + + This function will return a error tuple with a `DBConnection.ConnectionError` + when called inside a deprecated `transaction/3`. + + ### Options + + See module documentation. The pool and connection module may support other + options. All options are passed to `handle_commit/2` and `handle_rollback/2`. + + See `begin/2` and `rollback/2`. + + ### Example + + {:ok, conn, result} = DBConnection.begin(conn) + try do + res = DBConnection.execute!(conn, "SELECT * FROM table", []) + after + DBConnection.commit(conn) + end + """ + @spec commit(conn, opts :: Keyword.t) :: {:ok, result} | {:error, Exception.t} + def commit(conn, opts \\ []) do + commit(conn, &checkin/4, opts) + end + + @doc """ + Commit a database transaction. + + Returns `result` on success, otherwise raises an exception on error. + + This function will raise a `DBConnection.ConnectionError` when called inside a + deprecated `transaction/3`. + + See `commit/2`. + """ + @spec commit!(conn, opts :: Keyword.t) :: result + def commit!(conn, opts \\ []) do + case commit(conn, opts) do + {:ok, result} -> + result + {:error, err} -> raise err - {_status, _conn_state, _meter} when mode == :transaction -> - throw({__MODULE__, conn_ref, reason}) - {_status, _conn_state, _meter} -> - raise "not inside transaction" end end @@ -969,6 +1163,24 @@ defmodule DBConnection do end end + defp checkout(%DBConnection{} = conn, fun, meter, opts) do + with {:ok, conn_state, meter} <- fetch_info(conn, meter), + {:ok, result, meter} <- fun.(conn, conn_state, meter, opts) do + {:ok, conn, result, meter} + end + end + defp checkout(pool, fun, meter, opts) do + with {:ok, conn, conn_state, meter} <- checkout(pool, meter, opts) do + case fun.(conn, conn_state, meter, opts) do + {:ok, result, meter} -> + {:ok, conn, result, meter} + error -> + checkin(conn, opts) + error + end + end + end + defp checkin(conn, opts) do case delete_info(conn) do {:ok, conn_state} -> @@ -980,6 +1192,17 @@ defmodule DBConnection do end end + defp checkin(%DBConnection{} = conn, fun, meter, opts) do + with {:ok, conn_state, meter} <- fetch_info(conn, meter) do + return = fun.(conn, conn_state, meter, opts) + checkin(conn, opts) + return + end + end + defp checkin(pool, fun, meter, opts) do + run(pool, fun, meter, opts) + end + defp delete_disconnect(conn, conn_state, err, opts) do _ = delete_info(conn) %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref} = conn @@ -1011,10 +1234,18 @@ defmodule DBConnection do when fun in [:handle_first, :handle_next] -> put_info(conn, conn_state) {:deallocate, result, meter} - {:error, err, conn_state} when fun == :handle_begin -> + {:begin, conn_state} + when fun in [:handle_commit, :handle_rollback] -> put_info(conn, conn_state) - {:error, err, meter} - {:error, err, conn_state} when fun != :handle_begin -> + {:begin, meter} + {:commit, conn_state} when fun == :handle_begin -> + put_info(conn, conn_state) + {:commit, meter} + {:rollback, conn_state} + when fun in [:handle_begin, :handle_commit] -> + put_info(conn, conn_state) + {:rollback, meter} + {:error, err, conn_state} -> put_info(conn, conn_state) {:error, err, meter} {:disconnect, err, conn_state} -> @@ -1191,6 +1422,21 @@ defmodule DBConnection do end end + defp run(%DBConnection{} = conn, fun, meter, opts) do + with {:ok, conn_state, meter} <- fetch_info(conn, meter) do + fun.(conn, conn_state, meter, opts) + end + end + defp run(pool, fun, meter, opts) do + with {:ok, conn, conn_state, meter} <- checkout(pool, meter, opts) do + try do + fun.(conn, conn_state, meter, opts) + after + checkin(conn, opts) + end + end + end + defp run(%DBConnection{} = conn, fun, arg, meter, opts) do with {:ok, conn_state, meter} <- fetch_info(conn, meter) do fun.(conn, conn_state, arg, meter, opts) @@ -1284,27 +1530,41 @@ defmodule DBConnection do :ok end - defp run_transaction(conn, conn_state, fun, meter, opts) do - case run_begin(conn, conn_state, meter, opts) do - {:ok, res, meter} -> - %DBConnection{conn_ref: conn_ref} = conn - try do - log(meter, :transaction, :begin, nil, {:ok, res}) - fun.(%DBConnection{conn | conn_mode: :transaction}) - catch - :throw, {__MODULE__, ^conn_ref, reason} -> - rollback(conn, {:error, reason}, opts) - kind, reason -> - stack = System.stacktrace() - rollback(conn, {kind, reason, stack}, opts) - else - result -> - commit(conn, result, opts) + defp run_transaction(conn, fun, run, opts) do + %DBConnection{conn_ref: conn_ref} = conn + try do + result = fun.(%DBConnection{conn | conn_mode: :transaction}) + conclude(conn, result) + catch + :throw, {__MODULE__, ^conn_ref, reason} -> + reset(conn) + case rollback(conn, run, opts) do + {:ok, _} -> + {:error, reason} + {:error, %DBConnection.TransactionError{}} -> + {:error, reason} + {:error, %DBConnection.ConnectionError{}} -> + {:error, reason} + {:error, err} -> + raise err end - {:error, err, meter} -> - {:begin, {:error, err, meter}} - {kind, reason, stack, meter} -> - {:begin, {kind, reason, stack, meter}} + kind, reason -> + stack = System.stacktrace() + reset(conn) + _ = rollback(conn, run, opts) + :erlang.raise(kind, reason, stack) + else + result -> + case commit(conn, run, opts) do + {:ok, _} -> + {:ok, result} + {:error, %DBConnection.TransactionError{}} -> + {:error, :rollback} + {:error, err} -> + raise err + end + after + reset(conn) end end @@ -1317,12 +1577,46 @@ defmodule DBConnection do end end - defp conclude(conn, result) do + defp conclude(%DBConnection{conn_ref: conn_ref} = conn, result) do case get_info(conn, nil) do {:ok, _conn_state, _meter} -> - {:ok, result} + result _ -> - {:error, :rollback} + throw({__MODULE__, conn_ref, :rollback}) + end + end + + defp reset(conn) do + case get_info(conn, nil) do + {:failed, conn_state, _meter} -> + put_info(conn, :ok, conn_state) + _ -> + :ok + end + end + + defp transaction_error(conn, query, opts) do + run(conn, &run_transaction_error/5, query, meter(opts), opts) + end + + defp run_transaction_error(conn, conn_state, query, meter, opts) do + meter = event(meter, query) + msg = "can not #{query} inside legacy transaction" + err = DBConnection.ConnectionError.exception(msg) + delete_disconnect(conn, conn_state, err, opts) + log(meter, query, query, nil, {:error, err}) + end + + defp begin(%DBConnection{conn_mode: :transaction} = conn, _, opts) do + transaction_error(conn, :begin, opts) + end + defp begin(conn, run, opts) do + case run.(conn, &run_begin/4, meter(opts), opts) do + {next, meter} -> + err = DBConnection.TransactionError.exception(next) + log(meter, :begin, :begin, nil, {:error, err}) + other -> + log(other, :begin, :begin, nil) end end @@ -1331,47 +1625,52 @@ defmodule DBConnection do handle(conn, conn_state, :handle_begin, [], meter, opts) end - defp rollback(conn, return, opts) do - case get_info(conn, meter(opts)) do - {status, conn_state, meter} when status in [:ok, :failed] -> - run_rollback(conn, status, conn_state, return, meter, opts) - {:error, _err, _meter} -> - case return do - {:error, reason} -> - {nil, {:error, reason, nil}} - {kind, stack, reason} -> - {nil, {kind, stack, reason, nil}} - end + defp rollback(%DBConnection{conn_mode: :transaction} = conn, _, opts) do + transaction_error(conn, :rollback, opts) + end + defp rollback(conn, run, opts) do + case run.(conn, &run_rollback/4, meter(opts), opts) do + {next, meter} -> + err = DBConnection.TransactionError.exception(next) + log(meter, :rollback, :rollback, nil, {:error, err}) + other -> + log(other, :rollback, :rollback, nil) end end - defp run_rollback(conn, status, conn_state, return, meter, opts) do + defp run_rollback(conn, conn_state, meter, opts) do meter = event(meter, :rollback) - case cleanup(conn, status, conn_state, :handle_rollback, [], meter, opts) do - {:ok, res, meter} -> - {:rollback, {:ok, res, return, meter}} - return -> - {:rollback, return} - end + handle(conn, conn_state, :handle_rollback, [], meter, opts) end - defp commit(conn, result, opts) do - case get_info(conn, meter(opts)) do - {:ok, conn_state, meter} -> - run_commit(conn, conn_state, {:ok, result}, meter, opts) - {:failed, conn_state, meter} -> - return = {:error, :rollback} - run_rollback(conn, :failed, conn_state, return, meter, opts) - {:error, _err, _meter} -> - {nil, {:error, :rollback, nil}} + defp commit(%DBConnection{conn_mode: :transaction} = conn, _, opts) do + transaction_error(conn, :commit, opts) + end + defp commit(conn, run, opts) do + case run.(conn, &run_commit/4, meter(opts), opts) do + {:rollback, {:ok, result, meter}} -> + log(meter, :commit, :rollback, nil, {:ok, result}) + err = DBConnection.TransactionError.exception(:rollback) + {:error, err} + {query, {next, meter}} -> + err = DBConnection.TransactionError.exception(next) + log(meter, :commit, query, nil, {:error, err}) + {query, other} -> + log(other, :commit, query, nil) + {:error, err, meter} -> + log(meter, :commit, :commit, nil, {:error, err}) + {kind, reason, stack, meter} -> + log(meter, :commit, :commit, nil, {kind, reason, stack}) end end - defp run_commit(conn, conn_state, return, meter, opts) do + defp run_commit(conn, conn_state, meter, opts) do meter = event(meter, :commit) case handle(conn, conn_state, :handle_commit, [], meter, opts) do - {:ok, res, meter} -> - {:commit, {:ok, res, return, meter}} + {:rollback, meter} -> + # conn_state must valid as just put there in previous call + {:ok, conn_state, meter} = fetch_info(conn, meter) + {:rollback, run_rollback(conn, conn_state, meter, opts)} return -> {:commit, return} end @@ -1503,7 +1802,7 @@ defmodule DBConnection do {:ok, conn_state} -> {:ok, conn_state, meter} {:failed, _conn_state} -> - msg = "transaction rolling back" + msg = "legacy transaction rolling back" {:error, DBConnection.ConnectionError.exception(msg), meter} nil -> msg = "connection is closed" From 5cbc6aca8d8578710e92c4ca88b2a28b04cc5518 Mon Sep 17 00:00:00 2001 From: James Fish Date: Mon, 10 Jul 2017 21:21:02 +0100 Subject: [PATCH 2/3] Improve docs --- lib/db_connection.ex | 107 ++++++++++++++++++++++++++++++------------- 1 file changed, 76 insertions(+), 31 deletions(-) diff --git a/lib/db_connection.ex b/lib/db_connection.ex index e8b092c2..7843b256 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -162,9 +162,8 @@ defmodule DBConnection do Return `{:ok, result, state}` to continue, `{:commit, state}` to notify caller to commit open transaction before continuing, `{:rollback, state}` to notify caller to rollback aborted transaction before continuing, - `{:error, exception, state}` to abort the transaction and - continue or `{:disconnect, exception, state}` to abort the transaction - and disconnect. + `{:error, exception, state}` to error without beginning the transaction, or + `{:disconnect, exception, state}` to error and disconnect. A callback implementation should only return `:commit` or `:rollback` if it can determine the database's transaction status without side effect. @@ -177,11 +176,12 @@ defmodule DBConnection do {:error | :disconnect, Exception.t, new_state :: any} @doc """ - Handle committing a transaction. Return `{:ok, result, state}` on success and - to continue, `{:begin, state}` to open transaction before continuing, - `{:rollback, state}` to rollback aborted transaction, - `{:error, exception, state}` to abort the transaction and continue or - `{:disconnect, exception, state}` to abort the transaction and disconnect. + Handle committing a transaction. Return `{:ok, result, state}` on successfully + committing transaction, `{:begin, state}` to notify caller to begin + transaction before continuing, `{:rollback, state}` to notify caller to + rollback aborted transaction before continuing, `{:error, exception, state}` + to error and no longer be inside transaction, or + `{:disconnect, exception, state}` to error and disconnect. A callback implementation should only return `:begin` or `:rollback` if it can determine the database's transaction status without side effect. @@ -194,10 +194,11 @@ defmodule DBConnection do {:error | :disconnect, Exception.t, new_state :: any} @doc """ - Handle rolling back a transaction. Return `{:ok, result, state}` on success - and to continue, `{:begin, state}` to open transaction before continuing, - `{:error, exception, state}` to abort the transaction and continue or - `{:disconnect, exception, state}` to abort the transaction and disconnect. + Handle committing a transaction. Return `{:ok, result, state}` on successfully + committing transaction, `{:begin, state}` to notify caller to begin + transaction before continuing, `{:error, exception, state}` to error and no + longer be inside transaction, or `{:disconnect, exception, state}` to error + and disconnect. A callback implementation should only return `:begin` if it can determine the database' transaction status without side effect. @@ -544,11 +545,18 @@ defmodule DBConnection do ### Example - query = %Query{statement: "SELECT id FROM table"} - {:ok, query} = DBConnection.prepare(conn, query) - {:ok, result} = DBConnection.execute(conn, query, []) - :ok = DBConnection.close(conn, query) - + {conn, _} = DBConnection.begin!(pool) + try do + query = DBConnection.prepare!(conn) + try do + DBConnection.execute!(conn, "SELECT * FROM table", []) + after + DBConnection.close(conn, query) + end + DBConnection.commit!(conn) + after + DBConnection.rollback(conn) + end """ @spec prepare(conn, query, opts :: Keyword.t) :: {:ok, query} | {:error, Exception.t} @@ -873,6 +881,22 @@ defmodule DBConnection do Return `{:ok, conn, result}` on success or `{:error, exception}` if there was an error. If a new lock was acquired and there is an error, it is released. + It is possible to issue begin requests without a later `commit/2` or + `rollback/2` as transaction might be concluded by other actions. The semantics + are left to the callback implementation. However it is strongly advised to + ensure a matching `rollback/2` call always occur, even if `commit/2` should + occur beforehand. This ensures that on failure that the transaction is rolled + back and the lock on connection released. If the lock has been released before + a rollback call then `rollback/2` will return an error tuple. + + {:ok, conn, _result} = DBConnection.begin(pool) + try do + # transaction goes here! + DBConnection.commit!(conn) + after + DBConnection.rollback(conn) + end + The callback implementation should determine (using transaction status of the database) the state of a transaction. If a transaction is already started, according to the callback implementation, then an error tuple with a @@ -892,8 +916,9 @@ defmodule DBConnection do {:ok, conn, result} = DBConnection.begin(pool) try do - DBConnection.execute!(conn, "SELECT * FROM table", []) + res =DBConnection.execute!(conn, "SELECT * FROM table", []) DBConnection.commit(conn) + res after DBConnection.rollback(conn) end @@ -931,8 +956,8 @@ defmodule DBConnection do `{:error, exception}` if there was an error. The lock on the connection is always released. - It is possible to issue multiple rollbacks requests without a begin (`begin/2` - or `checkout_begin/2`). The semantics are left to the callback + It is possible to issue rollbacks requests without a `begin/2` as transaction + might be started by other actions. The semantics are left to the callback implementation. The callback implementation should determine (using transaction status of the @@ -955,10 +980,11 @@ defmodule DBConnection do ### Example - {:ok, result} = DBConnection.begin(conn) + {:ok, conn, result} = DBConnection.begin(pool) try do - DBConnection.execute!(conn, "SELECT * FROM table", []) - DBConnection.commit!(conn) + res = DBConnection.execute!(conn, "SELECT * FROM table", []) + DBConnection.commit(conn) + res after DBConnection.rollback(conn) end @@ -1006,6 +1032,10 @@ defmodule DBConnection do Return `{:ok, result}` on success or `{:error, exception}` if there was an error. + It is possible to issue commit requests without a `begin/2` as transaction + might be started by other actions. The semantics are left to the callback + implementation. + The callback implementation should determine (using transaction status of the database) the state of a transaction If the transaction is aborted or not started, according to callback implmentation, then an error tuple with a @@ -1024,11 +1054,13 @@ defmodule DBConnection do ### Example - {:ok, conn, result} = DBConnection.begin(conn) + {:ok, conn, result} = DBConnection.begin(pool) try do res = DBConnection.execute!(conn, "SELECT * FROM table", []) - after DBConnection.commit(conn) + res + after + DBConnection.rollback(conn) end """ @spec commit(conn, opts :: Keyword.t) :: {:ok, result} | {:error, Exception.t} @@ -1115,12 +1147,25 @@ defmodule DBConnection do ### Example - {:ok, results} = DBConnection.transaction(conn, fn(conn) -> - query = %Query{statement: "SELECT id FROM table"} - query = DBConnection.prepare!(conn, query) - stream = DBConnection.stream(conn, query, []) - Enum.to_list(stream) - end) + {conn, _} = DBConnection.begin!(pool) + try do + + query = %Query{statement: "SELECT id FROM table"} + query = DBConnection.prepare!(conn, query) + try do + stream = DBConnection.stream(conn, query, []) + res = Enum.to_list(stream) + DBConnection.commit!(conn) + res + after + # Make sure query is closed! + DBConnection.close(conn, query) + end + + after + # Make sure transaction is rolled back if anything goes wrong! + DBConnection.rollback(coon) + end """ @spec stream(t, query, params, opts :: Keyword.t) :: DBConnection.Stream.t def stream(%DBConnection{} = conn, query, params, opts \\ []) do From b8d128147ac2afa0e29db4675b3809210ce63c9a Mon Sep 17 00:00:00 2001 From: James Fish Date: Mon, 10 Jul 2017 21:31:42 +0100 Subject: [PATCH 3/3] Fail transaction/3 on explicit begin/rollback/commit --- integration_test/cases/transaction_test.exs | 43 +++++++++++++++++++++ lib/db_connection.ex | 10 +++-- test/test_support.exs | 12 ++++++ 3 files changed, 61 insertions(+), 4 deletions(-) diff --git a/integration_test/cases/transaction_test.exs b/integration_test/cases/transaction_test.exs index 2dd7e250..e5564da3 100644 --- a/integration_test/cases/transaction_test.exs +++ b/integration_test/cases/transaction_test.exs @@ -910,4 +910,47 @@ defmodule TransactionTest do handle_begin: [ _, :state], handle_rollback: [_, :new_state]] = A.record(agent) end + + test "transaction with explicit begin/rollback/commit call causes rollback" do + stack = [ + {:ok, :state}, + {:ok, :began, :new_state}, + {:ok, :rolledback, :newer_state}, + {:ok, :began, :newest_state}, + {:ok, :rolledback, :state2}, + {:ok, :began, :new_state2}, + {:ok, :rolledback, :newer_state2} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + + assert P.transaction(pool, fn(conn) -> + assert_raise DBConnection.ConnectionError, + "can not begin inside legacy transaction", + fn -> P.begin!(conn) end + end) == {:error, :rollback} + + assert P.transaction(pool, fn(conn) -> + assert_raise DBConnection.ConnectionError, + "can not rollback inside legacy transaction", + fn -> P.rollback!(conn) end + end) == {:error, :rollback} + + assert P.transaction(pool, fn(conn) -> + assert_raise DBConnection.ConnectionError, + "can not commit inside legacy transaction", + fn -> P.commit!(conn) end + end) == {:error, :rollback} + + assert [ + connect: [_], + handle_begin: [ _, :state], + handle_rollback: [_, :new_state], + handle_begin: [ _, :newer_state], + handle_rollback: [_, :newest_state], + handle_begin: [_, :state2], + handle_rollback: [_, :new_state2]] = A.record(agent) + end end diff --git a/lib/db_connection.ex b/lib/db_connection.ex index 7843b256..ce74b197 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -997,7 +997,9 @@ defmodule DBConnection do """ @spec rollback(conn, (opts :: Keyword.t) | reason :: term) :: {:ok, result} | {:error, Exception.t} - def rollback(conn, opts \\ []) + def rollback(conn) do + rollback(conn, &checkin/4, []) + end def rollback(%DBConnection{conn_mode: :transaction} = conn, reason) do %DBConnection{conn_ref: conn_ref} = conn throw({__MODULE__, conn_ref, reason}) @@ -1018,7 +1020,7 @@ defmodule DBConnection do """ @spec rollback!(conn, opts :: Keyword.t) :: result def rollback!(conn, opts \\ []) do - case rollback(conn, opts) do + case rollback(conn, &checkin/4, opts) do {:ok, result} -> result {:error, err} -> @@ -1644,11 +1646,11 @@ defmodule DBConnection do run(conn, &run_transaction_error/5, query, meter(opts), opts) end - defp run_transaction_error(conn, conn_state, query, meter, opts) do + defp run_transaction_error(conn, _conn_state, query, meter, _opts) do meter = event(meter, query) msg = "can not #{query} inside legacy transaction" err = DBConnection.ConnectionError.exception(msg) - delete_disconnect(conn, conn_state, err, opts) + fail(conn) log(meter, query, query, nil, {:error, err}) end diff --git a/test/test_support.exs b/test/test_support.exs index cce1cd73..76f74be1 100644 --- a/test/test_support.exs +++ b/test/test_support.exs @@ -63,6 +63,18 @@ defmodule TestConnection do DBConnection.close!(pool, query, opts2 ++ unquote(opts)) end + def begin!(pool, opts2 \\ []) do + DBConnection.begin!(pool, opts2 ++ unquote(opts)) + end + + def rollback!(pool, opts2 \\ []) do + DBConnection.rollback!(pool, opts2 ++ unquote(opts)) + end + + def commit!(pool, opts2 \\ []) do + DBConnection.commit!(pool, opts2 ++ unquote(opts)) + end + defoverridable [start_link: 1] end end