From 7bae3139537c59943a27bfb83a60e87b64e357b8 Mon Sep 17 00:00:00 2001 From: James Fish Date: Sun, 2 Jul 2017 22:47:42 +0100 Subject: [PATCH 1/2] No longer raise if logging raises --- .../cases/prepare_stream_test.exs | 34 ------------------- integration_test/cases/stream_test.exs | 20 ++++++----- lib/db_connection.ex | 21 +++++++++++- 3 files changed, 32 insertions(+), 43 deletions(-) diff --git a/integration_test/cases/prepare_stream_test.exs b/integration_test/cases/prepare_stream_test.exs index 132c03a2..84590dcb 100644 --- a/integration_test/cases/prepare_stream_test.exs +++ b/integration_test/cases/prepare_stream_test.exs @@ -357,38 +357,4 @@ defmodule PrepareStreamTest do handle_close: [%Q{}, _, :state2], handle_commit: [_, :new_state2]] = A.record(agent) end - - test "prepare_stream declare log raises and deallocates" do - stack = [ - {:ok, :state}, - {:ok, :began, :new_state}, - {:ok, %Q{}, :newer_state}, - {:ok, %C{}, :newest_state}, - {:ok, :deallocated, :state2}, - {:ok, :commited, :new_state2} - ] - {:ok, agent} = A.start_link(stack) - - parent = self() - opts = [agent: agent, parent: parent] - {:ok, pool} = P.start_link(opts) - - Process.flag(:trap_exit, true) - assert P.transaction(pool, fn(conn) -> - opts2 = [log: fn(_) -> raise "oops" end] - stream = P.prepare_stream(conn, %Q{}, [:param], opts2) - assert_raise RuntimeError, "oops", fn() -> Enum.to_list(stream) end - :hi - end) == {:ok, :hi} - - assert [ - connect: [_], - handle_begin: [_, :state], - handle_prepare: [%Q{}, _, :new_state], - handle_declare: [%Q{}, [:param], _, :newer_state], - handle_deallocate: [%Q{}, %C{}, _, :newest_state], - handle_commit: [_, :state2] - ] = A.record(agent) - end - end diff --git a/integration_test/cases/stream_test.exs b/integration_test/cases/stream_test.exs index 54a23e49..0a9e1020 100644 --- a/integration_test/cases/stream_test.exs +++ b/integration_test/cases/stream_test.exs @@ -1,5 +1,6 @@ defmodule StreamTest do use ExUnit.Case, async: true + import ExUnit.CaptureLog alias TestPool, as: P alias TestAgent, as: A @@ -401,13 +402,14 @@ defmodule StreamTest do {:handle_declare, [%Q{}, [:param], _, :new_state]} | _] = A.record(agent) end - test "stream declare log raises and deallocates" do + test "stream declare log raises and continues" do stack = [ {:ok, :state}, {:ok, :began, :new_state}, {:ok, %C{}, :newer_state}, - {:ok, :deallocated, :newest_state}, - {:ok, :commited, :state2} + {:deallocate, %R{}, :newest_state}, + {:ok, :deallocated, :state2}, + {:ok, :commited, :new_state2} ] {:ok, agent} = A.start_link(stack) @@ -415,11 +417,12 @@ defmodule StreamTest do opts = [agent: agent, parent: parent] {:ok, pool} = P.start_link(opts) - Process.flag(:trap_exit, true) assert P.transaction(pool, fn(conn) -> - opts2 = [log: fn(_) -> raise "oops" end] + opts2 = [log: fn(_) -> raise "logging oops" end] stream = P.stream(conn, %Q{}, [:param], opts2) - assert_raise RuntimeError, "oops", fn() -> Enum.to_list(stream) end + assert capture_log(fn() -> + assert Enum.to_list(stream) == [%R{}] + end) =~ "logging oops" :hi end) == {:ok, :hi} @@ -427,8 +430,9 @@ defmodule StreamTest do connect: [_], handle_begin: [_, :state], handle_declare: [%Q{}, [:param], _, :new_state], - handle_deallocate: [%Q{}, %C{}, _, :newer_state], - handle_commit: [_, :newest_state] + handle_first: [%Q{}, %C{}, _, :newer_state], + handle_deallocate: [%Q{}, %C{}, _, :newest_state], + handle_commit: [_, :state2] ] = A.record(agent) end diff --git a/lib/db_connection.ex b/lib/db_connection.ex index d1e55ff3..0f943b52 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -72,6 +72,8 @@ defmodule DBConnection do `run/3` or `transaction/3` fun and using the run/transaction connection reference (`t`). """ + require Logger + defstruct [:pool_mod, :pool_ref, :conn_mod, :conn_ref] @typedoc """ @@ -1189,7 +1191,13 @@ defmodule DBConnection do defp log(call, query, params, log, times, result) do entry = DBConnection.LogEntry.new(call, query, params, times, entry_result(result)) - log(log, entry) + try do + log(log, entry) + catch + kind, reason -> + stack = System.stacktrace() + log_failed(entry, kind, reason, stack) + end log_result(result) end @@ -1208,6 +1216,17 @@ defmodule DBConnection do end defp log_result(other), do: other + defp log_failed(entry, kind, reason, stack) do + try do + Logger.error(fn() -> + "an exception was raised logging #{inspect entry}: " <> Exception.format(kind, reason, stack) + end) + catch + _, _ -> + :ok + end + end + defp run_begin(conn, fun, opts) do try do fun.(conn) From 44e9e89695b1aca86fffccf511f85ed695874cff Mon Sep 17 00:00:00 2001 From: James Fish Date: Mon, 3 Jul 2017 23:21:57 +0100 Subject: [PATCH 2/2] Refactor log/metrics handling --- .../cases/transaction_execute_test.exs | 2 +- lib/db_connection.ex | 861 ++++++++---------- lib/db_connection/log_entry.ex | 33 +- 3 files changed, 405 insertions(+), 491 deletions(-) diff --git a/integration_test/cases/transaction_execute_test.exs b/integration_test/cases/transaction_execute_test.exs index ef67fae2..69293d43 100644 --- a/integration_test/cases/transaction_execute_test.exs +++ b/integration_test/cases/transaction_execute_test.exs @@ -264,7 +264,7 @@ defmodule TransactionExecuteTest do end) == {:error, :oops} assert_raise DBConnection.ConnectionError, "transaction rolling back", - fn() -> P.execute(conn, %Q{}, [:param]) end + fn() -> P.execute!(conn, %Q{}, [:param]) end end) == {:error, :rollback} assert [ diff --git a/lib/db_connection.ex b/lib/db_connection.ex index 0f943b52..ebeb4f0a 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -528,13 +528,12 @@ defmodule DBConnection do @spec prepare(conn, query, opts :: Keyword.t) :: {:ok, query} | {:error, Exception.t} def prepare(conn, query, opts \\ []) do - query = parse(:prepare, query, nil, opts) - case run_prepare(conn, query, opts) do - {{:ok, query} = ok, meter} -> - log(:prepare, query, nil, meter, ok) - {error, meter} -> - log(:prepare, query, nil, meter, error) - end + meter = meter(opts) + result = + with {:ok, query, meter} <- parse(query, meter, opts) do + run(conn, &run_prepare/5, query, meter, opts) + end + log(result, :prepare, query, nil) end @doc """ @@ -582,13 +581,14 @@ defmodule DBConnection do {:ok, query, result} | {:error, Exception.t} def prepare_execute(conn, query, params, opts \\ []) do - query = parse(:prepare_execute, query, params, opts) - case run_prepare_execute(conn, query, params, opts) do - {{:ok, query, result}, meter} -> - decode(:prepare_execute, query, params, meter, result, opts) - {error, meter} -> - log(:prepare_execute, query, params, meter, error) - end + result = + with {:ok, query, meter} <- parse(query, meter(opts), opts), + {:ok, query, result, meter} + <- run(conn, &run_prepare_execute/6, query, params, meter, opts), + {:ok, result, meter} <- decode(query, result, meter, opts) do + {:ok, query, result, meter} + end + log(result, :prepare_execute, query, params) end @doc """ @@ -634,13 +634,13 @@ defmodule DBConnection do @spec execute(conn, query, params, opts :: Keyword.t) :: {:ok, result} | {:error, Exception.t} def execute(conn, query, params, opts \\ []) do - encoded = encode(:execute, query, params, opts) - case run_execute(conn, query, encoded, opts) do - {{:ok, query, result}, meter} -> - decode(:execute, query, params, meter, result, opts) - {error, meter} -> - log(:execute, query, params, meter, error) - end + result = + with {:ok, params, meter} <- encode(query, params, meter(opts), opts), + {:ok, result, meter} + <- run(conn, &run_execute/6, query, params, meter, opts) do + decode(query, result, meter, opts) + end + log(result, :execute, query, params) end @doc """ @@ -685,8 +685,9 @@ defmodule DBConnection do @spec close(conn, query, opts :: Keyword.t) :: {:ok, result} | {:error, Exception.t} def close(conn, query, opts \\ []) do - {result, meter} = run_close(conn, query, opts) - log(:close, query, nil, meter, result) + conn + |> run(&run_close/5, query, meter(opts), opts) + |> log(:close, query, nil) end @doc """ @@ -737,13 +738,26 @@ defmodule DBConnection do @spec run(conn, (t -> result), opts :: Keyword.t) :: result when result: var def run(conn, fun, opts \\ []) def run(%DBConnection{} = conn, fun, _) do - _ = fetch_info(conn) - fun.(conn) + case get_info(conn) do + {_, _} -> + fun.(conn) + :closed -> + raise DBConnection.ConnectionError, "connection is closed" + end end def run(pool, fun, opts) do - {conn, conn_state} = checkout(pool, opts) - put_info(conn, :idle, conn_state) - run_begin(conn, fun, opts) + case checkout(pool, nil, opts) do + {:ok, conn, _, _} -> + try do + fun.(conn) + after + checkin(conn, opts) + end + {:error, err, _} -> + raise err + {kind, reason, stack, _} -> + :erlang.raise(kind, reason, stack) + end end @doc """ @@ -789,16 +803,22 @@ defmodule DBConnection do """ @spec transaction(conn, (conn -> result), opts :: Keyword.t) :: {:ok, result} | {:error, reason :: any} when result: var - def transaction(conn, fun, opts \\ []) do - {result, log_info} = transaction_meter(conn, fun, opts) - transaction_log(log_info) - case result do - {:raise, err} -> - raise err - {kind, reason, stack} -> + def transaction(conn, fun, opts) do + case run(conn, &run_transaction/5, fun, meter(opts), opts) do + {:commit, {:ok, res, return, meter}} -> + log(meter, :transaction, :commit, nil, {:ok, res}) + {:ok, return} + {:rollback, {:ok, res, reason, meter}} -> + log(meter, :transaction, :rollback, nil, {:ok, res}) + {:error, reason} + {:rollback, {:ok, res, kind, reason, stack, meter}} -> + log(meter, :transaction, :rollback, nil, {:ok, res}) :erlang.raise(kind, reason, stack) - other -> - other + {query, {:error, err, meter}} when query != nil -> + log(meter, :transaction, query, nil, {:error, err}) + raise err + {query, return} -> + log(return, :transaction, query, nil) end end @@ -820,12 +840,8 @@ defmodule DBConnection do case get_info(conn) do {transaction, _} when transaction in [:transaction, :failed] -> throw({:rollback, conn_ref, err}) - {transaction, _, _} when transaction in [:transaction, :failed] -> - throw({:rollback, conn_ref, err}) {:idle, _} -> raise "not inside transaction" - {:idle, _, _} -> - raise "not inside transaction" :closed -> raise DBConnection.ConnectionError, "connection is closed" end @@ -906,37 +922,53 @@ defmodule DBConnection do def reduce(%DBConnection.PrepareStream{} = stream, acc, fun) do %DBConnection.PrepareStream{conn: conn, query: query, params: params, opts: opts} = stream - start = &prepare_declare(&1, query, params, &2) - resource(conn, start, &fetch/3, &deallocate/3, opts).(acc, fun) + declare = &prepare_declare(&1, query, params, &2) + resource(conn, declare, &fetch/3, &deallocate/3, opts).(acc, fun) end def reduce(%DBConnection.Stream{} = stream, acc, fun) do %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts} = stream - start = &declare(&1, query, params, &2) - resource(conn, start, &fetch/3, &deallocate/3, opts).(acc, fun) + declare = &declare(&1, query, params, &2) + resource(conn, declare, &fetch/3, &deallocate/3, opts).(acc, fun) end ## Helpers - defp checkout(pool, opts) do + defp checkout(pool, meter, opts) do pool_mod = Keyword.get(opts, :pool, DBConnection.Connection) - case apply(pool_mod, :checkout, [pool, opts]) do + try do + apply(pool_mod, :checkout, [pool, opts]) + catch + kind, reason -> + stack = System.stacktrace() + {kind, reason, stack, meter} + else {:ok, pool_ref, conn_mod, conn_state} -> conn = %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref, conn_mod: conn_mod, conn_ref: make_ref()} - {conn, conn_state} + put_info(conn, :idle, conn_state) + {:ok, conn, {:idle, conn_state}, meter} {:error, err} -> - raise err + {:error, err, meter} end end - defp checkin(conn, conn_state, opts) do - %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref} = conn - _ = apply(pool_mod, :checkin, [pool_ref, conn_state, opts]) - :ok + defp checkin(conn, opts) do + case delete_info(conn) do + {:idle, conn_state} -> + %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref} = conn + _ = apply(pool_mod, :checkin, [pool_ref, conn_state, opts]) + :ok + {trans, _} = conn_info when trans in [:transaction, :failed] -> + msg = "check in during transaction" + err = DBConnection.ConnectionError.exception(msg) + delete_disconnect(conn, conn_info, err, opts) + :closed -> + :ok + end end - defp delete_disconnect(conn, conn_state, err, opts) do + defp delete_disconnect(conn, {_, conn_state}, err, opts) do _ = delete_info(conn) %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref} = conn args = [pool_ref, err, conn_state, opts] @@ -944,7 +976,7 @@ defmodule DBConnection do :ok end - defp delete_stop(conn, conn_state, kind, reason, stack, opts) do + defp delete_stop(conn, {_, conn_state}, kind, reason, stack, opts) do _ = delete_info(conn) msg = "client #{inspect self()} stopped: " <> Exception.format(kind, reason, stack) @@ -955,248 +987,234 @@ defmodule DBConnection do :ok end - defp handle(conn, fun, args, opts) do - case fetch_info(conn) do - {:failed, _} -> - raise DBConnection.ConnectionError, "transaction rolling back" - {status, conn_state} -> - handle(conn, status, conn_state, fun, args, opts) - end + defp handle(_conn, :closed, _fun, _args, meter, _opts) do + err = DBConnection.ConnectionError.exception("connection is closed") + {:error, err, meter} + end + defp handle(_conn, {:failed, _}, _fun, _args, meter, _opts) do + err = DBConnection.ConnectionError.exception("transaction rolling back") + {:error, err, meter} + end + defp handle(conn, {status, _} = conn_info, fun, args, meter, opts) do + handle(conn, conn_info, fun, args, meter, opts, status) end - defp handle(conn, status, conn_state, fun, args, opts) do + defp handle(conn, conn_info, fun, args, meter, opts, next) do %DBConnection{conn_mod: conn_mod} = conn + {status, conn_state} = conn_info try do apply(conn_mod, fun, args ++ [opts, conn_state]) else {:ok, result, conn_state} -> - put_info(conn, status, conn_state) - {:ok, result} - {:deallocate, _, conn_state} = deallocate + put_info(conn, next, conn_state) + {:ok, result, meter} + {:deallocate, result, conn_state} when fun in [:handle_first, :handle_next] -> + put_info(conn, next, conn_state) + {:deallocate, result, meter} + {:error, err, conn_state} when fun == :handle_begin -> put_info(conn, status, conn_state) - Tuple.delete_at(deallocate, 2) - {:error, _, conn_state} = error -> - put_info(conn, status, conn_state) - Tuple.delete_at(error, 2) + {:error, err, meter} + {:error, err, conn_state} when fun != :handle_begin -> + put_info(conn, next, conn_state) + {:error, err, meter} {:disconnect, err, conn_state} -> - delete_disconnect(conn, conn_state, err, opts) - {:error, err} + delete_disconnect(conn, {status, conn_state}, err, opts) + {:error, err, meter} other -> try do raise DBConnection.ConnectionError, "bad return value: #{inspect other}" catch :error, reason -> stack = System.stacktrace() - delete_stop(conn, conn_state, :error, reason, stack, opts) - {:error, reason, stack} + delete_stop(conn, conn_info, :error, reason, stack, opts) + {:error, reason, stack, meter} end catch kind, reason -> stack = System.stacktrace() - delete_stop(conn, conn_state, kind, reason, stack, opts) - {kind, reason, stack} + delete_stop(conn, conn_info, kind, reason, stack, opts) + {kind, reason, stack, meter} end end - defp parse(call, query, params, opts) do + defp parse(query, meter, opts) do try do DBConnection.Query.parse(query, opts) catch kind, reason -> - pre_log(call, query, params, opts, kind, reason, System.stacktrace()) + stack = System.stacktrace() + {kind, reason, stack, meter} + else + query -> + {:ok, query, meter} end end - defp encode(call, query, params, opts) do + defp describe(conn, query, meter, opts) do try do - DBConnection.Query.encode(query, params, opts) + DBConnection.Query.describe(query, opts) catch kind, reason -> stack = System.stacktrace() - pre_log(call, query, params, opts, kind, reason, stack) + raised_close(conn, query, meter, opts, kind, reason, stack) + else + query -> + {:ok, query, meter} end end - defp decode(call, query, params, meter, result, opts) do + defp encode(conn, query, params, meter, opts) do try do - DBConnection.Query.decode(query, result, opts) + DBConnection.Query.encode(query, params, opts) catch kind, reason -> - raised = {kind, reason, System.stacktrace()} - decode_log(call, query, params, meter, raised) + stack = System.stacktrace() + raised_close(conn, query, meter, opts, kind, reason, stack) else - result when call == :prepare_execute -> - ok = {:ok, query, result} - decode_log(call, query, params, meter, ok) - result when call in [:execute, :first, :next] -> - ok = {:ok, result} - decode_log(call, query, params, meter, ok) + params -> + {:ok, params, meter} end end - defp pre_log(call, query, params, opts, kind, reason, stack) do - case Keyword.get(opts, :log) do - nil -> :erlang.raise(kind, reason, stack) - log -> log(call, query, params, {log, []}, {kind, reason, stack}) - end - end - - defp run_prepare(conn, query, opts) do - run_meter(conn, fn(conn2) -> - case handle(conn2, :handle_prepare, [query], opts) do - {:ok, query} -> - describe(conn2, query, opts) - other -> - other - end - end, opts) - end - - defp describe(conn, query, opts) do + defp encode(query, params, meter, opts) do try do - DBConnection.Query.describe(query, opts) + DBConnection.Query.encode(query, params, opts) catch kind, reason -> - raised = {kind, reason, System.stacktrace()} - raised_close(conn, query, opts, raised) + stack = System.stacktrace() + {kind, reason, stack, meter} else - query -> - {:ok, query} + params -> + {:ok, params, meter} end end - defp run_prepare_execute(conn, query, params, opts) do - run_meter(conn, fn(conn2) -> - case handle(conn2, :handle_prepare, [query], opts) do - {:ok, query} -> - describe_run(conn2, :handle_execute, query, params, opts) - other -> - other - end - end, opts) - end - - defp describe_run(conn, fun, query, params, opts) do + defp decode(query, result, meter, opts) do + meter = event(meter, :decode) try do - query = DBConnection.Query.describe(query, opts) - [query, DBConnection.Query.encode(query, params, opts)] + DBConnection.Query.decode(query, result, opts) catch kind, reason -> - raised = {kind, reason, System.stacktrace()} - raised_close(conn, query, opts, raised) + stack = System.stacktrace() + {kind, reason, stack, meter} else - [query, _params] = args -> - case handle(conn, fun, args, opts) do - {:ok, result} -> - {:ok, query, result} - other -> - other - end + result -> + {:ok, result, meter} end end - defp raised_close(conn, query, opts, raised) do - case handle_close(conn, query, opts) do - {:ok, _} -> - raised - {:error, _} -> - raised - {_kind, _reason, _stack} = raised -> - raised + defp run_prepare(conn, conn_info, query, meter, opts) do + with {:ok, query, meter} <- prepare(conn, conn_info, query, meter, opts) do + describe(conn, query, meter, opts) end end - defp run_execute(conn, query, params, opts) do - run_meter(conn, fn(conn2) -> - case handle(conn2, :handle_execute, [query, params], opts) do - {:ok, result} -> - {:ok, query, result} - other -> - other - end - end, opts) + defp prepare(conn, conn_info, query, meter, opts) do + meter = event(meter, :prepare) + handle(conn, conn_info, :handle_prepare, [query], meter, opts) end - defp run_close(conn, query, opts) do - run_meter(conn, &handle_close(&1, query, opts), opts) + defp run_prepare_execute(conn, conn_info, query, params, meter, opts) do + with {:ok, query, meter} <- prepare(conn, conn_info, query, meter, opts), + {:ok, query, meter} <- describe(conn, query, meter, opts), + {:ok, params, meter} <- encode(conn, query, params, meter, opts), + {:ok, result, meter} <- execute(conn, query, params, meter, opts) do + {:ok, query, result, meter} + end end - defp handle_close(conn, query, opts) do - {status, conn_state} = fetch_info(conn) - handle(conn, status, conn_state, :handle_close, [query], opts) + defp execute(conn, query, params, meter, opts) do + run_execute(conn, get_info(conn), query, params, meter, opts) end - defmacrop time() do - if function_exported?(:erlang, :monotonic_time, 0) do - quote do: :erlang.monotonic_time() - else - quote do: :os.timestamp() - end + defp run_execute(conn, conn_info, query, params, meter, opts) do + meter = event(meter, :execute) + handle(conn, conn_info, :handle_execute, [query, params], meter, opts) end - defp run_meter(%DBConnection{} = conn, fun, opts) do - case Keyword.get(opts, :log) do - nil -> - {run(conn, fun, opts), nil} - log -> - run_meter(conn, log, [], fun, opts) - end - end - defp run_meter(pool, fun, opts) do - case Keyword.get(opts, :log) do - nil -> - {run(pool, fun, opts), nil} - log -> - run_meter(pool, log, [checkout: time()], fun, opts) + defp raised_close(conn, query, meter, opts, kind, reason, stack) do + conn_info = get_info(conn) + with {:ok, _, meter} <- run_close(conn, conn_info, query, meter, opts) do + {kind, reason, stack, meter} end end - defp run_meter(conn, log, times, fun, opts) do - fun = fn(conn2) -> - start = time() - result = fun.(conn2) - stop = time() - meter = {log, [stop: stop, start: start] ++ times} - {result, meter} - end - run(conn, fun, opts) + defp run_close(conn, conn_info, query, meter, opts) do + meter = event(meter, :close) + handle_cleanup(conn, conn_info, :handle_close, [query], meter, opts) end - defp decode_log(_, _, _, nil, result), do: log_result(result) - defp decode_log(call, query, params, {log, times}, result) do - log(call, query, params, log, [decode: time()] ++ times, result) + defp handle_cleanup(_conn, :closed, _callback, _args, meter, _opts) do + err = DBConnection.ConnectionError.exception("connection is closed") + {:error, err, meter} end - - defp transaction_log(nil), do: :ok - defp transaction_log({log, times, callback, result}) do - call = transaction_call(callback) - result = transaction_result(result) - _ = log(:transaction, call, nil, log, times, result) - :ok + defp handle_cleanup(conn, conn_info, callback, args, meter, opts) do + {status, _} = conn_info + handle(conn, conn_info, callback, args, meter, opts, status) end - defp transaction_call(:handle_begin), do: :begin - defp transaction_call(:handle_commit), do: :commit - defp transaction_call(:handle_rollback), do: :rollback + defp run(%DBConnection{} = conn, fun, arg, meter, opts) do + fun.(conn, get_info(conn), arg, meter, opts) + end + defp run(pool, fun, arg, meter, opts) do + meter = event(meter, :checkout) + with {:ok, conn, conn_info, meter} <- checkout(pool, meter, opts) do + try do + fun.(conn, conn_info, arg, meter, opts) + after + checkin(conn, opts) + end + end + end - defp transaction_result({:ok, _} = ok), do: ok - defp transaction_result({:raise, err}), do: {:error, err} - defp transaction_result({_kind, _reason, _stack} = raised), do: raised + defp run(%DBConnection{} = conn, fun, arg1, arg2, meter, opts) do + fun.(conn, get_info(conn), arg1, arg2, meter, opts) + end + defp run(pool, fun, arg1, arg2, meter, opts) do + meter = event(meter, :checkout) + with {:ok, conn, conn_info, meter} <- checkout(pool, meter, opts) do + try do + fun.(conn, conn_info, arg1, arg2, meter, opts) + after + checkin(conn, opts) + end + end + end - defp log(_, _, _, nil, result), do: log_result(result) - defp log(call, query, params, {log, times}, result) do - log(call, query, params, log, times, result) + defp meter(opts) do + case Keyword.get(opts, :log) do + nil -> nil + log -> {log, []} + end end - defp log(call, query, params, log, times, result) do + defp event(nil, _), + do: nil + defp event({log, events}, event), + do: {log, [{event, :erlang.monotonic_time()} | events]} + + defp log({:ok, res, meter}, call, query, params), + do: log(meter, call, query, params, {:ok, res}) + defp log({:ok, res1, res2, meter}, call, query, params), + do: log(meter, call, query, params, {:ok, res1, res2}) + defp log({:deallocate, res, meter}, call, query, params), + do: log(meter, call, query, params, {:deallocate, res}) + defp log({:error, err, meter}, call, query, params), + do: log(meter, call, query, params, {:error, err}) + defp log({kind, reason, stack, meter}, call, query, params), + do: log(meter, call, query, params, {kind, reason, stack}) + + defp log(nil, _, _, _, result), + do: log_result(result) + defp log({log, times}, call, query, params, result) do entry = DBConnection.LogEntry.new(call, query, params, times, entry_result(result)) try do log(log, entry) catch kind, reason -> stack = System.stacktrace() - log_failed(entry, kind, reason, stack) + log_raised(entry, kind, reason, stack) end log_result(result) end @@ -1206,6 +1224,8 @@ defmodule DBConnection do msg = "an exception was raised: " <> Exception.format(kind, reason, stack) {:error, %DBConnection.ConnectionError{message: msg}} end + defp entry_result({:deallocate, res}), + do: {:ok, res} defp entry_result(other), do: other defp log({mod, fun, args}, entry), do: apply(mod, fun, [entry | args]) @@ -1216,286 +1236,192 @@ defmodule DBConnection do end defp log_result(other), do: other - defp log_failed(entry, kind, reason, stack) do - try do - Logger.error(fn() -> - "an exception was raised logging #{inspect entry}: " <> Exception.format(kind, reason, stack) - end) - catch - _, _ -> - :ok - end + defp log_raised(entry, kind, reason, stack) do + Logger.error(fn() -> + "an exception was raised logging #{inspect entry}: " <> Exception.format(kind, reason, stack) + end) + catch + _, _ -> + :ok end - defp run_begin(conn, fun, opts) do + defp run_transaction(conn, {:transaction, _}, fun, _meter, _opts) do + %DBConnection{conn_ref: conn_ref} = conn try do fun.(conn) - after - run_end(conn, opts) - end - end - - defp run_end(conn, opts) do - case delete_info(conn) do - {:idle, conn_state} -> - checkin(conn, conn_state, opts) - {status, conn_state} when status in [:transaction, :failed] -> - try do - raise "connection run ended in transaction" - catch - :error, reason -> - stack = System.stacktrace() - delete_stop(conn, conn_state, :error, reason, stack, opts) - :erlang.raise(:error, reason, stack) - end - :closed -> - :ok - end - end - - defp transaction_meter(%DBConnection{} = conn, fun, opts) do - case fetch_info(conn) do - {trans, _} when trans in [:transaction, :failed] -> - {transaction_nested(conn, fun), nil} - {:idle, conn_state} -> - log = Keyword.get(opts, :log) - begin_meter(conn, conn_state, log, [], fun, opts) - end - end - defp transaction_meter(pool, fun, opts) do - case Keyword.get(opts, :log) do - nil -> - run(pool, &begin(&1, nil, [], fun, opts), opts) - log -> - times = [checkout: time()] - run(pool, &begin(&1, log, times, fun, opts), opts) - end - end - - defp begin(conn, log, times, fun, opts) do - {:idle, conn_state} = get_info(conn) - begin_meter(conn, conn_state, log, times, fun, opts) - end - - defp begin_meter(conn, conn_state, nil, [], fun, opts) do - case handle(conn, conn_state, :handle_begin, opts, :transaction) do - {:ok, _} -> - transaction_run(conn, nil, fun, opts) - error -> - {error, nil} - end - end - defp begin_meter(conn, conn_state, log, times, fun, opts) do - start = time() - result = handle(conn, conn_state, :handle_begin, opts, :transaction) - stop = time() - log_info = {log, [stop: stop, start: start] ++ times, :handle_begin, result} - case result do - {:ok, _} -> - fun = fn(conn2) -> - transaction_log(log_info) - fun.(conn2) - end - transaction_run(conn, log, fun, opts) - error -> - {error, log_info} + catch + :throw, {:rollback, ^conn_ref, reason} -> + fail(conn) + {nil, {:error, reason, nil}} + kind, reason -> + stack = System.stacktrace() + fail(conn) + {nil, {kind, reason, stack, nil}} + else + result -> + {nil, conclude(conn, result)} end end - - defp transaction_run(conn, log, fun, opts) do + defp run_transaction(conn, {:failed, _}, fun, _meter, _opts) do %DBConnection{conn_ref: conn_ref} = conn try do fun.(conn) - else - result -> - result = {:ok, result} - commit(conn, log, opts, result) catch :throw, {:rollback, ^conn_ref, reason} -> - result = {:error, reason} - rollback(conn, log, opts, result) + {nil, {:error, reason, nil}} kind, reason -> - result = {kind, reason, System.stacktrace()} - rollback(conn, log, opts, result) + stack = System.stacktrace() + {nil, {kind, reason, stack, nil}} + else + _result -> + {nil, {:error, :rollback, nil}} + end + end + defp run_transaction(conn, conn_info, fun, meter, opts) do + case run_begin(conn, conn_info, meter, opts) do + {:ok, res, meter} -> + %DBConnection{conn_ref: conn_ref} = conn + try do + log(meter, :transaction, :begin, nil, {:ok, res}) + fun.(conn) + catch + :throw, {:rollback, ^conn_ref, reason} -> + run_rollback(conn, get_info(conn), reason, meter(opts), opts) + kind, reason -> + stack = System.stacktrace() + raised_rollback(conn, opts, kind, reason, stack) + else + result -> + run_commit(conn, get_info(conn), result, meter(opts), opts) + end + {:error, err, meter} -> + {:begin, {:error, err, meter}} + {kind, reason, stack, meter} -> + {:begin, {kind, reason, stack, meter}} end end - defp commit(conn, log, opts, result) do + defp fail(conn) do case get_info(conn) do {:transaction, conn_state} -> - conclude_meter(conn, conn_state, log, :handle_commit, opts, result) - {:failed, conn_state} -> - result = {:error, :rollback} - conclude_meter(conn, conn_state, log, :handle_rollback, opts, result) - :closed -> - {{:error, :rollback}, nil} + put_info(conn, :failed, conn_state) + _ -> + :ok end end - defp rollback(conn, log, opts, result) do + defp conclude(conn, result) do case get_info(conn) do - {trans, conn_state} when trans in [:transaction, :failed] -> - conclude_meter(conn, conn_state, log, :handle_rollback, opts, result) - :closed -> - {result, nil} + {:transaction, _} -> + {:ok, result, nil} + _ -> + {:error, :rollback, nil} end end - defp conclude_meter(conn, conn_state, nil, callback, opts, result) do - case handle(conn, conn_state, callback, opts, :idle) do - {:ok, _} -> - {result, nil} - error -> - {error, nil} - end + defp run_begin(conn, conn_info, meter, opts) do + meter = event(meter, :begin) + handle_trans(conn, conn_info, :handle_begin, meter, opts, :transaction) + end + + defp run_rollback(_conn, :closed, reason, _meter, _opts) do + {nil, {:error, reason, nil}} end - defp conclude_meter(conn, conn_state, log, callback, opts, result) do - start = time() - cb_result = handle(conn, conn_state, callback, opts, :idle) - stop = time() - times = [stop: stop, start: start] - case cb_result do - {:ok, _} -> - {result, {log, times, callback, cb_result}} - _error -> - {cb_result, {log, times, callback, cb_result}} + defp run_rollback(conn, conn_info, reason, meter, opts) do + meter = event(meter, :rollback) + case handle_trans(conn, conn_info, :handle_rollback, meter, opts, :idle) do + {:ok, res, meter} -> + {:rollback, {:ok, res, reason, meter}} + return -> + {:rollback, return} end end - defp handle(conn, conn_state, callback, opts, status) do - %DBConnection{conn_mod: conn_mod} = conn - try do - apply(conn_mod, callback, [opts, conn_state]) - else - {:ok, result, conn_state} -> - put_info(conn, status, conn_state) - {:ok, result} - {:error, err, conn_state} -> - put_info(conn, :idle, conn_state) - {:raise, err} - {:disconnect, err, conn_state} -> - delete_disconnect(conn, conn_state, err, opts) - {:raise, err} + defp raised_rollback(conn, opts, kind, reason, stack) do + case run_rollback(conn, get_info(conn), :rollback, meter(opts), opts) do + {:rollback, {:ok, res, _, meter}} -> + {:rollback, {:ok, res, kind, reason, stack, meter}} other -> - try do - raise DBConnection.ConnectionError, "bad return value: #{inspect other}" - catch - :error, reason -> - stack = System.stacktrace() - delete_stop(conn, conn_state, :error, reason, stack, opts) - {:error, reason, stack} - end - catch - kind, reason -> - stack = System.stacktrace() - delete_stop(conn, conn_state, kind, reason, stack, opts) - {kind, reason, stack} + other end end - defp transaction_nested(conn, fun) do - %DBConnection{conn_ref: conn_ref} = conn - try do - fun.(conn) - else - result -> - transaction_ok(conn, {:ok, result}) - catch - :throw, {:rollback, ^conn_ref, reason} -> - transaction_failed(conn) - {:error, reason} - kind, reason -> - stack = System.stacktrace() - transaction_failed(conn) - :erlang.raise(kind, reason, stack) + defp run_commit(conn, {:transaction, _} = conn_info, return, meter, opts) do + meter = event(meter, :commit) + case handle_trans(conn, conn_info, :handle_commit, meter, opts, :idle) do + {:ok, res, meter} -> + {:commit, {:ok, res, return, meter}} + return -> + {:commit, return} end end - - defp transaction_ok(conn, result) do - case get_info(conn) do - {:failed, _} -> - {:error, :rollback} - _ -> - result - end + defp run_commit(conn, conn_info, _return, meter, opts) do + run_rollback(conn, conn_info, :rollback, meter, opts) end - defp transaction_failed(conn) do - case get_info(conn) do - {:transaction, conn_state} -> - put_info(conn, :failed, conn_state) - _ -> - :ok - end + defp handle_trans(_conn, :closed, _callback, meter, _opts, _status) do + err = DBConnection.ConnectionError.exception("connection is closed") + {:error, err, meter} + end + defp handle_trans(_conn, {status, _}, _callback, meter, _opts, :transaction) + when status in [:transaction, :failed] do + err = DBConnection.ConnectionError.exception("inside transaction") + {:error, err, meter} + end + defp handle_trans(_conn, {:idle, _}, _callback, meter, _opts, :idle) do + err = DBConnection.ConnectionError.exception("outside transaction") + {:error, err, meter} + end + defp handle_trans(conn, conn_info, callback, meter, opts, status) do + handle(conn, conn_info, callback, [], meter, opts, status) end defp prepare_declare(conn, query, params, opts) do - query = parse(:prepare_declare, query, params, opts) - case run_prepare_declare(conn, query, params, opts) do - {{:ok, query, cursor}, meter} -> - prepare_declare_log(conn, query, params, meter, cursor, opts) - {error, meter} -> - {:error, err} = log(:prepare_declare, query, params, meter, error) + result = + with {:ok, query, meter} <- parse(query, meter(opts), opts) do + run(conn, &run_prepare_declare/6, query, params, meter, opts) + end + case log(result, :prepare_declare, query, params) do + {:ok, query, cursor} -> + {:first, query, cursor} + {:error, err} -> raise err end end - defp run_prepare_declare(conn, query, params, opts) do - run_meter(conn, fn(conn2) -> - case handle(conn2, :handle_prepare, [query], opts) do - {:ok, query} -> - describe_run(conn2, :handle_declare, query, params, opts) - other -> - other - end - end, opts) - end - - defp prepare_declare_log(conn, query, params, meter, cursor, opts) do - try do - log(:prepare_declare, query, params, meter, {:ok, query, cursor}) - catch - kind, reason -> - stack = System.stacktrace() - deallocate(conn, query, cursor, opts) - :erlang.raise(kind, reason, stack) - else - {:ok, query, cursor} -> - {:first, query, cursor} + defp run_prepare_declare(conn, conn_info, query, params, meter, opts) do + with {:ok, query, meter} <- prepare(conn, conn_info, query, meter, opts), + {:ok, query, meter} <- describe(conn, query, meter, opts), + {:ok, params, meter} <- encode(conn, query, params, meter, opts), + {:ok, cursor, meter} <- declare(conn, query, params, meter, opts) do + {:ok, query, cursor, meter} end end defp declare(conn, query, params, opts) do - encoded = encode(:declare, query, params, opts) - case run_declare(conn, query, encoded, opts) do - {{:ok, cursor}, meter} -> - declare_log(conn, query, params, meter, cursor, opts) - {error, meter} -> - {:error, err} = log(:declare, query, params, meter, error) + result = + with {:ok, params, meter} <- encode(query, params, meter(opts), opts) do + run(conn, &run_declare/6, query, params, meter, opts) + end + case log(result, :declare, query, params) do + {:ok, cursor} -> + {:first, query, cursor} + {:error, err} -> raise err end end - defp run_declare(conn, query, params, opts) do - run_meter(conn, &handle(&1, :handle_declare, [query, params], opts), opts) + defp declare(conn, query, params, meter, opts) do + run_declare(conn, get_info(conn), query, params, meter, opts) end - defp declare_log(conn, query, params, meter, cursor, opts) do - try do - log(:declare, query, params, meter, {:ok, cursor}) - catch - kind, reason -> - stack = System.stacktrace() - deallocate(conn, query, cursor, opts) - :erlang.raise(kind, reason, stack) - else - {:ok, cursor} -> - {:first, query, cursor} - end + defp run_declare(conn, conn_info, query, params, meter, opts) do + meter = event(meter, :declare) + handle(conn, conn_info, :handle_declare, [query, params], meter, opts) end defp fetch(conn, {:first, query, cursor}, opts) do - fetch(conn, :handle_first, :first, query, cursor, opts) + fetch(conn, :handle_first, :first, query, cursor,opts) end defp fetch(conn, {:next, query, cursor}, opts) do fetch(conn, :handle_next, :next, query, cursor, opts) @@ -1504,45 +1430,47 @@ defmodule DBConnection do {:halt, state} end - def fetch(conn, fun, call, query, cursor, opts) do - fetch = &handle(&1, fun, [query, cursor], opts) - case run_meter(conn, fetch, opts) do - {{:ok, result}, meter} -> - fetch_decode(:next, call, query, cursor, meter, result, opts) - {{:deallocate, result}, meter} -> - fetch_decode(:deallocate, call, query, cursor, meter, result, opts) - {error, meter} -> - {:error, err} = log(call, query, cursor, meter, error) + defp fetch(conn, fun, call, query, cursor, opts) do + result = + conn + |> run(&run_fetch/6, fun, [query, cursor], meter(opts), opts) + |> log(call, query, cursor) + case result do + {:ok, result} -> + {[result], {:next, query, cursor}} + {:deallocate, result} -> + {[result], {:deallocate, query, cursor}} + {:error, err} -> raise err end end - defp fetch_decode(status, call, query, cursor, meter, result, opts) do - {:ok, decoded} = decode(call, query, cursor, meter, result, opts) - {[decoded], {status, query, cursor}} + defp run_fetch(conn, conn_info, fun, [query, _] = args, meter, opts) do + meter = event(meter, :fetch) + with {ok, result, meter} when ok in [:ok, :deallocate] + <- handle(conn, conn_info, fun, args, meter, opts), + {:ok, result, meter} <- decode(query, result, meter, opts) do + {ok, result, meter} + end end defp deallocate(conn, {_, query, cursor}, opts) do - deallocate(conn, query, cursor, opts) - end - - defp deallocate(conn, query, cursor, opts) do - case get_info(conn) do - {status, conn_state} -> - deallocate(conn, status, conn_state, query, cursor, opts) - :closed -> - :ok + result = + conn + |> run(&run_deallocate/6, query, cursor, meter(opts), opts) + |> log(:deallocate, query, cursor) + case result do + {:ok, _} -> :ok + {:error, err} -> raise err end end - defp deallocate(conn, status, conn_state, query, cursor, opts) do + defp run_deallocate(_conn, :closed, _query, _cursor, _meter, _opts), + do: {:ok, nil, nil} + defp run_deallocate(conn, conn_info, query, cursor, meter, opts) do + meter = event(meter, :deallocate) args = [query, cursor] - close = &handle(&1, status, conn_state, :handle_deallocate, args, opts) - {result, meter} = run_meter(conn, close, opts) - case log(:deallocate, query, cursor, meter, result) do - {:ok, _} -> :ok - {:error, err} -> raise err - end + handle_cleanup(conn, conn_info, :handle_deallocate, args, meter, opts) end defp resource(%DBConnection{} = conn, start, next, stop, opts) do @@ -1557,15 +1485,6 @@ defmodule DBConnection do :ok end - defp fetch_info(conn) do - case get_info(conn) do - {_, _} = info -> - info - :closed -> - raise DBConnection.ConnectionError, "connection is closed" - end - end - defp get_info(conn), do: Process.get(key(conn), :closed) defp delete_info(conn) do diff --git a/lib/db_connection/log_entry.ex b/lib/db_connection/log_entry.ex index a5356dec..f5cd7c04 100644 --- a/lib/db_connection/log_entry.ex +++ b/lib/db_connection/log_entry.ex @@ -15,19 +15,20 @@ defmodule DBConnection.LogEntry do * `:result` - The result of the call * `:pool_time` - The length of time awaiting a connection from the pool (if the connection was not already checked out) - * `:connection_time` - The length of time using the connection + * `:connection_time` - The length of time using the connection (if a + connection was used) * `:decode_time` - The length of time decoding the result (if decoded the result using `DBConnection.Query.decode/3`) All times are in the native time units of the VM, see - `System.monotonic_time/0`. Falls back to `:os.timestamp/0`. + `System.monotonic_time/0`. """ @type t :: %__MODULE__{call: atom, query: any, params: any, result: {:ok, any} | {:ok, any, any} | {:error, Exception.t}, pool_time: non_neg_integer | nil, - connection_time: non_neg_integer, + connection_time: non_neg_integer | nil, decode_time: non_neg_integer | nil} @doc false @@ -40,26 +41,20 @@ defmodule DBConnection.LogEntry do ## Helpers defp parse_times([], entry), do: entry - defp parse_times([first | times], entry) do - {_, entry} = Enum.reduce(times, {first, entry}, &parse_time/2) + defp parse_times(times, entry) do + stop = :erlang.monotonic_time() + {_, entry} = Enum.reduce(times, {stop, entry}, &parse_time/2) entry end - defmacrop diff(to, from) do - if function_exported?(:erlang, :monotonic_time, 0) do - quote do: unquote(to) - unquote(from) - else - quote do: max(:timer.now_diff(unquote(to), unquote(from)), 0) - end + defp parse_time({:decode, start}, {stop, entry}) do + {start, %{entry | decode_time: stop - start}} end - - defp parse_time({:stop, stop} = time, {{:decode, decode}, entry}) do - {time, %{entry | decode_time: diff(decode, stop)}} - end - defp parse_time({:start, start} = time, {{:stop, stop}, entry}) do - {time, %{entry | connection_time: diff(stop, start)}} + defp parse_time({:checkout, start}, {stop, entry}) do + {start, %{entry | pool_time: stop - start}} end - defp parse_time({:checkout, checkout} = time, {{:start, start}, entry}) do - {time, %{entry | pool_time: diff(start, checkout)}} + defp parse_time({_, start}, {stop, entry}) do + %{connection_time: connection_time} = entry + {start, %{entry | connection_time: (connection_time || 0) + (stop - start)}} end end