From 3c78af5e1145e25248b6554c4576e89c4e08746a Mon Sep 17 00:00:00 2001 From: James Fish Date: Tue, 11 Jul 2017 15:50:16 +0100 Subject: [PATCH] Introduce declare/fetch/deallocate * Add declare(!)/3,4 * Add fetch(!)/3,4 * Add deallocate(!)/3,4 Explicitly defining handle_first/4 and handle_next/4 is deprecated because callback implementations that are required to differentiate also need to track cursors in their state. A single fetch/4 is cleaner to use and these callbacks forward to handle_fetch/4 on `use DBConnection`. For first/next/fetch the return value is `{:cont | :halt, result, state}` where `:cont` is continue (same as :ok`) and `:halt` means both that the cursor has finished enumerating results and the cursor is deallocated. Therefore `:halt` does not require a `deallocate` call. This API is chosen to avoid an extra roundtrip that current adapters do. handle_first/4 and handle_next/4 still support `:ok` and `:deallocate` tuples. A callback implementation may need to deallocate if the transaction ends without a cursor being deallocated (via `fetch/4` or `deallocate/4`). --- integration_test/cases/cursor_test.exs | 173 ++++++++ .../cases/prepare_stream_test.exs | 2 +- integration_test/cases/stream_test.exs | 15 +- integration_test/tests.exs | 1 + lib/db_connection.ex | 399 +++++++++++++----- test/test_support.exs | 28 ++ 6 files changed, 510 insertions(+), 108 deletions(-) create mode 100644 integration_test/cases/cursor_test.exs diff --git a/integration_test/cases/cursor_test.exs b/integration_test/cases/cursor_test.exs new file mode 100644 index 00000000..17959237 --- /dev/null +++ b/integration_test/cases/cursor_test.exs @@ -0,0 +1,173 @@ +defmodule CursorTest do + use ExUnit.Case, async: true + + alias TestPool, as: P + alias TestAgent, as: A + alias TestQuery, as: Q + alias TestCursor, as: C + alias TestResult, as: R + + test "declare/fetch/deallocate return result" do + stack = [ + {:ok, :state}, + {:ok, %C{}, :new_state}, + {:ok, %C{}, :newer_state}, + {:cont, %R{}, :newest_state}, + {:halt, %R{}, :state2}, + {:ok, :deallocated, :new_state2}, + {:ok, :deallocated, :newer_state2} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + assert P.declare(pool, %Q{}, [:param]) == {:ok, %C{}} + assert P.declare!(pool, %Q{}, [:param], [key: :value]) == %C{} + + assert P.fetch(pool, %Q{}, %C{}) == {:cont, %R{}} + assert P.fetch!(pool, %Q{}, %C{}, [key: :value]) == {:halt, %R{}} + + assert P.deallocate(pool, %Q{}, %C{}) == {:ok, :deallocated} + assert P.deallocate!(pool, %Q{}, %C{}, [key: :value]) == :deallocated + + assert [ + connect: [_], + handle_declare: [%Q{}, [:param], _, :state], + handle_declare: [%Q{}, [:param], [{:key, :value} | _], :new_state], + handle_fetch: [%Q{}, %C{}, _, :newer_state], + handle_fetch: [%Q{}, %C{}, [{:key, :value} | _], :newest_state], + handle_deallocate: [%Q{}, %C{}, _, :state2], + handle_deallocate: [%Q{}, %C{}, [{:key, :value} | _], :new_state2] + ] = A.record(agent) + end + + test "declare encodes params" do + stack = [ + {:ok, :state}, + {:ok, %C{}, :new_state}, + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + + opts2 = [encode: fn([:param]) -> :encoded end] + assert P.declare(pool, %Q{}, [:param], opts2) == {:ok, %C{}} + + assert [ + connect: [_], + handle_declare: [%Q{}, :encoded, _, :state]] = A.record(agent) + end + + test "fetch decodes result" do + stack = [ + {:ok, :state}, + {:cont, %R{}, :new_state}, + {:halt, %R{}, :newer_state} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + + opts2 = [decode: fn(%R{}) -> :decoded end] + assert P.fetch(pool, %Q{}, %C{}, opts2) == {:cont, :decoded} + assert P.fetch(pool, %Q{}, %C{}, opts2) == {:halt, :decoded} + + assert [ + connect: [_], + handle_fetch: [%Q{}, %C{}, _, :state], + handle_fetch: [%Q{}, %C{}, _, :new_state] + ] = A.record(agent) + end + + test "declare/fetch/deallocate logs result" do + stack = [ + {:ok, :state}, + {:ok, %C{}, :new_state}, + {:cont, %R{}, :newer_state}, + {:halt, %R{}, :newest_state}, + {:ok, :deallocated, :state2} + ] + {:ok, agent} = A.start_link(stack) + + parent = self() + opts = [agent: agent, parent: parent] + {:ok, pool} = P.start_link(opts) + + log = &send(parent, &1) + assert P.declare(pool, %Q{}, [:param], [log: log]) == {:ok, %C{}} + + assert_receive %DBConnection.LogEntry{call: :declare, query: %Q{}, + params: [:param], result: {:ok, %C{}}} = entry + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) + + assert P.fetch(pool, %Q{}, %C{}, [log: log]) == {:cont, %R{}} + + assert_receive %DBConnection.LogEntry{call: :fetch, query: %Q{}, + params: %C{}, result: {:ok, %R{}}} = entry + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_integer(entry.decode_time) + assert entry.decode_time >= 0 + + assert P.fetch(pool, %Q{}, %C{}, [log: log]) == {:halt, %R{}} + + assert_receive %DBConnection.LogEntry{call: :fetch, query: %Q{}, + params: %C{}, result: {:ok, %R{}}} = entry + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_integer(entry.decode_time) + assert entry.decode_time >= 0 + + assert P.deallocate(pool, %Q{}, %C{}, [log: log]) == {:ok, :deallocated} + + assert_receive %DBConnection.LogEntry{call: :deallocate, query: %Q{}, + params: %C{}, result: {:ok, :deallocated}} = entry + assert is_integer(entry.pool_time) + assert entry.pool_time >= 0 + assert is_integer(entry.connection_time) + assert entry.connection_time >= 0 + assert is_nil(entry.decode_time) + + assert [ + connect: [_], + handle_declare: [%Q{}, [:param], _, :state], + handle_fetch: [%Q{}, %C{}, _, :new_state], + handle_fetch: [%Q{}, %C{}, _, :newer_state], + handle_deallocate: [%Q{}, %C{}, _, :newest_state] + ] = A.record(agent) + end + + test "declare/fetch/deallocate error returns error" do + err = RuntimeError.exception("oops") + stack = [ + {:ok, :state}, + {:error, err, :new_state}, + {:error, err, :newer_state}, + {:error, err, :newesr_state} + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + assert P.declare(pool, %Q{}, [:param]) == {:error, err} + assert P.fetch(pool, %Q{}, %C{}) == {:error, err} + assert P.deallocate(pool, %Q{}, %C{}) == {:error, err} + + assert [ + connect: [_], + handle_declare: [%Q{}, [:param], _, :state], + handle_fetch: [%Q{}, %C{}, _, :new_state], + handle_deallocate: [%Q{}, %C{}, _, :newer_state] + ] = A.record(agent) + end +end diff --git a/integration_test/cases/prepare_stream_test.exs b/integration_test/cases/prepare_stream_test.exs index 84590dcb..d80c7e39 100644 --- a/integration_test/cases/prepare_stream_test.exs +++ b/integration_test/cases/prepare_stream_test.exs @@ -105,7 +105,7 @@ defmodule PrepareStreamTest do assert entry.connection_time >= 0 assert is_nil(entry.decode_time) - assert_received %DBConnection.LogEntry{call: :first} = entry + assert_received %DBConnection.LogEntry{call: :fetch} = entry assert %{query: %Q{}, params: %C{}, result: {:ok, %R{}}} = entry assert is_nil(entry.pool_time) assert is_integer(entry.connection_time) diff --git a/integration_test/cases/stream_test.exs b/integration_test/cases/stream_test.exs index 0a9e1020..5ab952fb 100644 --- a/integration_test/cases/stream_test.exs +++ b/integration_test/cases/stream_test.exs @@ -99,7 +99,7 @@ defmodule StreamTest do assert entry.connection_time >= 0 assert is_nil(entry.decode_time) - assert_received %DBConnection.LogEntry{call: :first} = entry + assert_received %DBConnection.LogEntry{call: :fetch} = entry assert %{query: %Q{}, params: %C{}, result: {:ok, %R{}}} = entry assert is_nil(entry.pool_time) assert is_integer(entry.connection_time) @@ -221,14 +221,19 @@ defmodule StreamTest do assert_received %DBConnection.LogEntry{call: :declare} - assert_received %DBConnection.LogEntry{call: :first} = entry + assert_received %DBConnection.LogEntry{call: :fetch} = entry assert %{query: %Q{}, params: %C{}, result: {:error, ^err}} = entry assert is_nil(entry.pool_time) assert is_integer(entry.connection_time) assert entry.connection_time >= 0 assert is_nil(entry.decode_time) - refute_received %DBConnection.LogEntry{call: :deallocate} + assert_received %DBConnection.LogEntry{call: :deallocate} = entry + closed = DBConnection.ConnectionError.exception("connection is closed") + assert %{query: %Q{}, params: %C{}, result: {:error, ^closed}} = entry + assert is_nil(entry.pool_time) + assert is_nil(entry.connection_time) + assert is_nil(entry.decode_time) assert_receive :reconnected @@ -297,12 +302,12 @@ defmodule StreamTest do assert P.transaction(pool, fn(conn) -> stream = P.stream(conn, %Q{}, [:param], [log: &send(parent, &1)]) - assert_raise RuntimeError, "oops", fn() -> Enum.take(stream, 1) end + assert Enum.take(stream, 1) == [%R{}] :hi end) == {:error, :rollback} assert_received %DBConnection.LogEntry{call: :declare} - assert_received %DBConnection.LogEntry{call: :first} + assert_received %DBConnection.LogEntry{call: :fetch} assert_received %DBConnection.LogEntry{call: :deallocate} = entry assert %{query: %Q{}, params: %C{}, result: {:error, ^err}} = entry diff --git a/integration_test/tests.exs b/integration_test/tests.exs index f8ed4c1a..04b9e6dc 100644 --- a/integration_test/tests.exs +++ b/integration_test/tests.exs @@ -2,6 +2,7 @@ Code.require_file "cases/after_connect_test.exs", __DIR__ Code.require_file "cases/backoff_test.exs", __DIR__ Code.require_file "cases/client_test.exs", __DIR__ Code.require_file "cases/close_test.exs", __DIR__ +Code.require_file "cases/cursor_test.exs", __DIR__ Code.require_file "cases/execute_test.exs", __DIR__ Code.require_file "cases/idle_test.exs", __DIR__ Code.require_file "cases/overflow_test.exs", __DIR__ diff --git a/lib/db_connection.ex b/lib/db_connection.ex index ce74b197..d68b02e6 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -262,30 +262,59 @@ defmodule DBConnection do {:ok, cursor, new_state :: any} | {:error | :disconnect, Exception.t, new_state :: any} + @doc """ + Fetch the next result from a cursor declared by `handle_declare/4`. Return + `{:cont, result, state}` to return the result `result` and continue using + cursor, `{:halt, result, state}` to return the result `result` and close the + cursor, `{:error, exception, state}` to return an error and close the + cursor, `{:disconnect, exception, state}` to return an error and disconnect. + + This callback is called in the client process. + """ + @callback handle_fetch(query, cursor, opts :: Keyword.t, state :: any) :: + {:cont | :halt, result, new_state :: any} | + {:error | :disconnect, Exception.t, new_state :: any} + @doc """ Fetch the first result from a cursor declared by `handle_declare/4`. Return - `{:ok, result, state}` to return the result `result` and continue, - `{:deallocate, result, state}` to return the result `result` and deallocate, - `{:error, exception, state}` to return an error and close the cursor, - `{:disconnect, exception, state}` to return an error and disconnect. + `{:cont | :ok, result, state}` to return the result `result` and continue + using cursor, `{:halt, result, state}` to return the result `result` and close + the cursor, `{:deallocate, result, state}` to return the result `result` and + require cursor to be deallocated, `{:error, exception, state}` to return an + error and close the cursor, `{:disconnect, exception, state}` to return an + error and disconnect. + + This callback is called when fetching the first result using `stream/4` and + `prepare_stream/4`. `use DBConnection` will add a default implementation for + this callback to call `hande_fetch/4`. Explicitly defining this callback is + deprecated but it is still called for backwards compatibility. `fetch/4` will + only use `handle_fetch/4`. This callback is called in the client process. """ @callback handle_first(query, cursor, opts :: Keyword.t, state :: any) :: - {:ok | :deallocate, result, new_state :: any} | + {:cont | :ok | :halt | :deallocate, result, new_state :: any} | {:error | :disconnect, Exception.t, new_state :: any} @doc """ Fetch the next result from a cursor declared by `handle_declare/4`. Return - `{:ok, result, state}` to return the result `result` and continue, - `{:deallocate, result, state}` to return the result `result` and deallocate, - `{:error, exception, state}` to return an error and close the cursor, - `{:disconnect, exception, state}` to return an error and disconnect. + `{:cont | :ok, result, state}` to return the result `result` and continue + using cursor, `{:halt, result, state}` to return the result `result` and close + the cursor, `{:deallocate, result, state}` to return the result `result` and + require cursor to be deallocated, `{:error, exception, state}` to return an + error and close the cursor, `{:disconnect, exception, state}` to return an + error and disconnect. + + This callback is called when fetching the first result using `stream/4` and + `prepare_stream/4`. `use DBConnection` will add a default implementation for + this callback to call `hande_fetch/4`. Explicitly defining this callback is + deprecated but it is still called for backwards compatibility. `fetch/4` will + only use `handle_fetch/4`. This callback is called in the client process. """ @callback handle_next(query, cursor, opts :: Keyword.t, state :: any) :: - {:ok | :deallocate, result, new_state :: any} | + {:cont | :ok | :halt | :deallocate, result, new_state :: any} | {:error | :disconnect, Exception.t, new_state :: any} @doc """ @@ -427,20 +456,20 @@ defmodule DBConnection do end end - def handle_first(_, _, _, state) do - message = "handle_first/4 not implemented" + def handle_fetch(_, _, _, state) do + message = "handle_fetch/4 not implemented" case :erlang.phash2(1, 1) do 0 -> raise message 1 -> {:error, RuntimeError.exception(message), state} end end - def handle_next(_, _, _, state) do - message = "handle_next/4 not implemented" - case :erlang.phash2(1, 1) do - 0 -> raise message - 1 -> {:error, RuntimeError.exception(message), state} - end + def handle_first(query, cursor, opts, state) do + handle_fetch(query, cursor, opts, state) + end + + def handle_next(query, cursor, opts, state) do + handle_fetch(query, cursor, opts, state) end def handle_deallocate(_, _, _, state) do @@ -719,7 +748,7 @@ defmodule DBConnection do {:ok, result} | {:error, Exception.t} def close(conn, query, opts \\ []) do conn - |> cleanup(&run_close/6, query, meter(opts), opts) + |> cleanup(&run_close/6, [query], meter(opts), opts) |> log(:close, query, nil) end @@ -1174,18 +1203,207 @@ defmodule DBConnection do %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts} end + @doc """ + Prepare a query and declare a cursor. + + Returns `{:ok, query, cursor}` on success or `{:error, exception}` if there + was an error. + + ### Options + + See module documentation. The pool and connection module may support other + options. All options are passed to `handle_prepare/3`, handle_declare/4` and + `handle_close/3`. + + ### Example + + query = "SELECT id FROM table" + {:ok, query, cursor} = DBConnection.prepare_declare(conn, query, []) + try do + {:cont, result} = DBConnection.fetch!(conn, query, cursor, opts) + {:halt, result} = DBConnection.fetch!(conn, query, cursor, opts) + after + DBConnection.deallocate(conn, query, cursor, opts) + end + """ + @spec prepare_declare(conn, query, params, opts :: Keyword.t) :: + {:ok, cursor} | {:error, Exception.t} + def prepare_declare(conn, query, params, opts) do + result = + with {:ok, query, meter} <- parse(query, meter(opts), opts) do + run(conn, &run_prepare_declare/6, query, params, meter, opts) + end + log(result, :prepare_declare, query, params) + end + + @spec prepare_declare!(conn, query, params, opts :: Keyword.t) :: cursor + def prepare_declare!(conn, query, params, opts) do + case prepare_declare(conn, query, params, opts) do + {:ok, query, cursor} -> + {query, cursor} + {:error, err} -> + raise err + end + end + + @doc """ + Declare a cursor. + + Returns `{:ok, cursor}` on success or `{:error, exception}` if there was + an error. + + ### Options + + See module documentation. The pool and connection module may support other + options. All options are passed to `handle_declare/4`. + + ### Example + + query = "SELECT id FROM table" + {:ok, cursor} = DBConnection.declare(conn, query, []) + try do + {:cont, result} = DBConnection.fetch!(conn, query, cursor, opts) + {:halt, result} = DBConnection.fetch!(conn, query, cursor, opts) + after + DBConnection.deallocate(conn, query, cursor, opts) + end + """ + @spec declare(conn, query, params, opts :: Keyword.t) :: + {:ok, cursor} | {:error, Exception.t} + def declare(conn, query, params, opts) do + result = + with {:ok, params, meter} <- encode(query, params, meter(opts), opts) do + run(conn, &run_declare/6, query, params, meter, opts) + end + log(result, :declare, query, params) + end + + @doc """ + Declare a cursor. + + Returns `cursor` on success or `{:error, exception}` if there was an error. + + See `declare/4`. + """ + @spec declare!(conn, query, params, opts :: Keyword.t) :: cursor + def declare!(conn, query, params, opts) do + case declare(conn, query, params, opts) do + {:ok, cursor} -> + cursor + {:error, err} -> + raise err + end + end + + @doc """ + Fetch using a cursor. + + Returns `{:cont, result}` on success and the cursor can be used to fetch + again, `{:halt, result}` on success and the cursor is closed, or + `{:error, exception}` if there was an error. + + On `:halt` tuple the cursor does not need to be deallocated but the callback + implementation may allow it. + + ### Options + + See module documentation. The pool and connection module may support other + options. All options are passed to `handle_fetch/4`. + + See `declare/4`. + """ + @spec fetch(conn, query, cursor, opts :: Keyword.t) :: + {:cont | :halt, result} | {:error, Exception.t} + def fetch(conn, query, cursor, opts) do + fun = :handle_fetch + args = [query, cursor] + result = + with {ok, result, meter} when ok in [:cont, :halt] + <- run(conn, &run_fetch/6, fun, args, meter(opts), opts), + {:ok, result, meter} <- decode(query, result, meter, opts) do + {ok, result, meter} + end + log(result, :fetch, query, cursor) + end + + @doc """ + Fetch using a cursor. + + Returns `{:cont, result}` on success and the cursor can be used to fetch + again, `{:halt, result}` on success and the cursor is closed, or raises an + exception if there was an error. + + See `fetch/4`. + """ + @spec fetch!(conn, query, cursor, opts :: Keyword.t) :: + {:cont | :halt, result} + def fetch!(conn, query, cursor, opts) do + case fetch(conn, query, cursor, opts) do + {:cont, _} = cont -> + cont + {:halt, _} = halt -> + halt + {:error, err} -> + raise err + end + end + + @doc """ + Deallocate a cursor. + + Returns `{:ok, result}` on success or `{:error, exception}` if there was an + error. + + ### Options + + See module documentation. The pool and connection module may support other + options. All options are passed to `handle_deallocate/4`. + + See `declare/4`. + """ + @spec deallocate(conn, query, cursor, opts :: Keyword.t) :: + {:ok, result} | {:error, Exception.t} + def deallocate(conn, query, cursor, opts) do + conn + |> cleanup(&run_deallocate/6, [query, cursor], meter(opts), opts) + |> log(:deallocate, query, cursor) + end + + @doc """ + Deallocate a cursor. + + Returns `result` on success or raises an exception if there was an error. + + See `deallocate/4`. + """ + @spec deallocate!(conn, query, cursor, opts :: Keyword.t) :: result + def deallocate!(conn, query, cursor, opts) do + case deallocate(conn, query, cursor, opts) do + {:ok, result} -> + result + {:error, err} -> + raise err + end + end + @doc false def reduce(%DBConnection.PrepareStream{} = stream, acc, fun) do %DBConnection.PrepareStream{conn: conn, query: query, params: params, opts: opts} = stream - declare = &prepare_declare(&1, query, params, &2) - resource(conn, declare, &fetch/3, &deallocate/3, opts).(acc, fun) + declare = + fn(conn, opts) -> + {query, cursor} = prepare_declare!(conn, query, params, opts) + {:first, query, cursor} + end + enum = resource(conn, declare, &stream_fetch/3, &stream_deallocate/3, opts) + enum.(acc, fun) end def reduce(%DBConnection.Stream{} = stream, acc, fun) do %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts} = stream - declare = &declare(&1, query, params, &2) - resource(conn, declare, &fetch/3, &deallocate/3, opts).(acc, fun) + declare = &{:first, query, declare!(&1, query, params, &2)} + enum = resource(conn, declare, &stream_fetch/3, &stream_deallocate/3, opts) + enum.(acc, fun) end ## Helpers @@ -1274,9 +1492,21 @@ defmodule DBConnection do try do apply(conn_mod, fun, args ++ [opts, conn_state]) else + {:ok, result, conn_state} + when fun in [:handle_first, :handle_next] -> + put_info(conn, conn_state) + {:cont, result, meter} {:ok, result, conn_state} -> put_info(conn, conn_state) {:ok, result, meter} + {:cont, result, conn_state} + when fun in [:handle_fetch, :handle_first, :handle_next] -> + put_info(conn, conn_state) + {:cont, result, meter} + {:halt, result, conn_state} + when fun in [:handle_fetch, :handle_first, :handle_next] -> + put_info(conn, conn_state) + {:halt, result, meter} {:deallocate, result, conn_state} when fun in [:handle_first, :handle_next] -> put_info(conn, conn_state) @@ -1412,26 +1642,26 @@ defmodule DBConnection do with {status, conn_state, meter} when status in [:ok, :failed] <- get_info(conn, meter), {:ok, _, meter} - <- run_close(conn, status, conn_state, query, meter, opts) do + <- run_close(conn, status, conn_state, [query], meter, opts) do {kind, reason, stack, meter} end end - defp run_close(conn, status, conn_state, query, meter, opts) do + defp run_close(conn, status, conn_state, args, meter, opts) do meter = event(meter, :close) - cleanup(conn, status, conn_state, :handle_close, [query], meter, opts) + cleanup(conn, status, conn_state, :handle_close, args, meter, opts) end - defp cleanup(%DBConnection{} = conn, fun, arg, meter, opts) do + defp cleanup(%DBConnection{} = conn, fun, args, meter, opts) do with {status, conn_state, meter} when status in [:ok, :failed] <- get_info(conn, meter) do - fun.(conn, status, conn_state, arg, meter, opts) + fun.(conn, status, conn_state, args, meter, opts) end end - defp cleanup(pool, fun, arg, meter, opts) do + defp cleanup(pool, fun, args, meter, opts) do with {:ok, conn, conn_state, meter} <- checkout(pool, meter, opts) do try do - fun.(conn, :ok, conn_state, arg, meter, opts) + fun.(conn, :ok, conn_state, args, meter, opts) after checkin(conn, opts) end @@ -1530,8 +1760,8 @@ defmodule DBConnection do do: log(meter, call, query, params, {:ok, res}) defp log({:ok, res1, res2, meter}, call, query, params), do: log(meter, call, query, params, {:ok, res1, res2}) - defp log({:deallocate, res, meter}, call, query, params), - do: log(meter, call, query, params, {:deallocate, res}) + defp log({ok, res, meter}, call, query, cursor) when ok in [:cont, :halt], + do: log(meter, call, query, cursor, {ok, res}) defp log({:error, err, meter}, call, query, params), do: log(meter, call, query, params, {:error, err}) defp log({kind, reason, stack, meter}, call, query, params), @@ -1556,7 +1786,7 @@ defmodule DBConnection do msg = "an exception was raised: " <> Exception.format(kind, reason, stack) {:error, %DBConnection.ConnectionError{message: msg}} end - defp entry_result({:deallocate, res}), + defp entry_result({ok, res}) when ok in [:cont, :halt], do: {:ok, res} defp entry_result(other), do: other @@ -1723,111 +1953,77 @@ defmodule DBConnection do end end - defp prepare_declare(conn, query, params, opts) do - result = - with {:ok, query, meter} <- parse(query, meter(opts), opts) do - run(conn, &run_prepare_declare/6, query, params, meter, opts) - end - case log(result, :prepare_declare, query, params) do - {:ok, query, cursor} -> - {:first, query, cursor} - {:error, err} -> - raise err - end - end - defp run_prepare_declare(conn, conn_info, query, params, meter, opts) do with {:ok, query, meter} <- prepare(conn, conn_info, query, meter, opts), {:ok, query, meter} <- describe(conn, query, meter, opts), {:ok, params, meter} <- encode(conn, query, params, meter, opts), - {:ok, cursor, meter} <- declare(conn, query, params, meter, opts) do + {:ok, conn_state, meter} <- fetch_info(conn, meter), + {:ok, cursor, meter} + <- run_declare(conn, conn_state, query, params, meter, opts) do {:ok, query, cursor, meter} end end - defp declare(conn, query, params, opts) do - result = - with {:ok, params, meter} <- encode(query, params, meter(opts), opts) do - run(conn, &run_declare/6, query, params, meter, opts) - end - case log(result, :declare, query, params) do - {:ok, cursor} -> - {:first, query, cursor} - {:error, err} -> - raise err - end - end - - defp declare(conn, query, params, meter, opts) do - with {:ok, conn_state, meter} <- fetch_info(conn, meter) do - run_declare(conn, conn_state, query, params, meter, opts) - end - end - defp run_declare(conn, conn_state, query, params, meter, opts) do meter = event(meter, :declare) handle(conn, conn_state, :handle_declare, [query, params], meter, opts) end - defp fetch(conn, {:first, query, cursor}, opts) do - fetch(conn, :handle_first, :first, query, cursor,opts) + defp stream_fetch(conn, {:first, query, cursor}, opts) do + stream_fetch(conn, :handle_first, query, cursor,opts) end - defp fetch(conn, {:next, query, cursor}, opts) do - fetch(conn, :handle_next, :next, query, cursor, opts) + defp stream_fetch(conn, {:cont, query, cursor}, opts) do + stream_fetch(conn, :handle_next, query, cursor, opts) end - defp fetch(_, {:deallocate, _, _} = state, _) do + defp stream_fetch(_, {:halt, _, _} = state, _) do {:halt, state} end - defp fetch(conn, fun, call, query, cursor, opts) do + defp stream_fetch(conn, fun, query, cursor, opts) do result = conn - |> run(&run_fetch/6, fun, [query, cursor], meter(opts), opts) - |> log(call, query, cursor) + |> run(&run_stream_fetch/6, fun, [query, cursor], meter(opts), opts) + |> log(:fetch, query, cursor) case result do - {:ok, result} -> - {[result], {:next, query, cursor}} - {:deallocate, result} -> - {[result], {:deallocate, query, cursor}} + {ok, result} when ok in [:cont, :halt] -> + {[result], {ok, query, cursor}} {:error, err} -> raise err end end - defp run_fetch(conn, conn_info, fun, [query, _] = args, meter, opts) do - meter = event(meter, :fetch) - with {ok, result, meter} when ok in [:ok, :deallocate] - <- handle(conn, conn_info, fun, args, meter, opts), + defp run_stream_fetch(conn, conn_state, fun, args, meter, opts) do + [query, _] = args + with {ok, result, meter} when ok in [:cont, :halt] + <- run_fetch(conn, conn_state, fun, args, meter, opts), {:ok, result, meter} <- decode(query, result, meter, opts) do {ok, result, meter} end end - defp deallocate(conn, {_, query, cursor}, opts) do - case get_info(conn, meter(opts)) do - {status, conn_state, meter} when status in [:ok, :failed] -> - deallocate(conn, status, conn_state, query, cursor, meter, opts) - {:error, err, _meter} -> - {:error, err} + defp run_fetch(conn, conn_state, fun, args, meter, opts) do + with {:deallocate, result, meter} + <- fetch(conn, conn_state, fun, args, meter, opts), + {status, conn_state, meter} when status in [:ok, :failed] + <- get_info(conn, meter), + {:ok, _, meter} + <- run_deallocate(conn, status, conn_state, args, meter, opts) do + {:halt, result, meter} end end - defp deallocate(conn, status, conn_state, query, cursor, meter, opts) do - result = - conn - |> run_deallocate(status, conn_state, query, cursor, meter, opts) - |> log(:deallocate, query, cursor) - case result do - {:ok, _} -> - :ok - {:error, err} -> - raise err - end + defp fetch(conn, conn_state, fun, args, meter, opts) do + meter = event(meter, :fetch) + handle(conn, conn_state, fun, args, meter, opts) end - defp run_deallocate(conn, status, conn_state, query, cursor, meter, opts) do + defp stream_deallocate(_conn, {:halt, _query, _cursor}, _opts), + do: :ok + defp stream_deallocate(conn, {_cont, query, cursor}, opts), + do: deallocate(conn, query, cursor, opts) + + defp run_deallocate(conn, status, conn_state, args, meter, opts) do meter = event(meter, :deallocate) - args = [query, cursor] cleanup(conn, status, conn_state, :handle_deallocate, args, meter, opts) end @@ -1838,7 +2034,6 @@ defmodule DBConnection do Stream.resource(start, next, stop) end - defp put_info(conn, status \\ :ok, conn_state) do _ = Process.put(key(conn), {status, conn_state}) :ok diff --git a/test/test_support.exs b/test/test_support.exs index 76f74be1..5af4cd3d 100644 --- a/test/test_support.exs +++ b/test/test_support.exs @@ -55,6 +55,30 @@ defmodule TestConnection do DBConnection.prepare_stream(conn, query, params, opts2 ++ unquote(opts)) end + def declare(pool, query, params, opts2 \\ []) do + DBConnection.declare(pool, query, params, opts2 ++ unquote(opts)) + end + + def declare!(pool, query, params, opts2 \\ []) do + DBConnection.declare!(pool, query, params, opts2 ++ unquote(opts)) + end + + def fetch(pool, query, cursor, opts2 \\ []) do + DBConnection.fetch(pool, query, cursor, opts2 ++ unquote(opts)) + end + + def fetch!(pool, query, cursor, opts2 \\ []) do + DBConnection.fetch!(pool, query, cursor, opts2 ++ unquote(opts)) + end + + def deallocate(pool, query, cursor, opts2 \\ []) do + DBConnection.deallocate(pool, query, cursor, opts2 ++ unquote(opts)) + end + + def deallocate!(pool, query, cursor, opts2 \\ []) do + DBConnection.deallocate!(pool, query, cursor, opts2 ++ unquote(opts)) + end + def close(pool, query, opts2 \\ []) do DBConnection.close(pool, query, opts2 ++ unquote(opts)) end @@ -131,6 +155,10 @@ defmodule TestConnection do TestAgent.eval(:handle_declare, [query, params, opts, state]) end + def handle_fetch(query, cursor, opts, state) do + TestAgent.eval(:handle_fetch, [query, cursor, opts, state]) + end + def handle_first(query, cursor, opts, state) do TestAgent.eval(:handle_first, [query, cursor, opts, state]) end