Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 0 additions & 27 deletions integration_test/cases/close_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -154,31 +154,4 @@ defmodule CloseTest do
connect: [_],
handle_close: [%Q{}, _, :state]] = A.record(agent)
end

test "close in failed transaction" do
stack = [
{:ok, :state},
{:ok, :began, :new_state},
{:ok, :result, :newer_state},
{:ok, :rolledback, :newest_state}
]
{:ok, agent} = A.start_link(stack)

opts = [agent: agent, parent: self()]
{:ok, pool} = P.start_link(opts)

assert P.transaction(pool, fn(conn) ->
assert P.transaction(conn, fn(conn2) ->
P.rollback(conn2, :oops)
end) == {:error, :oops}

assert P.close(conn, %Q{}, opts) == {:ok, :result}
end) == {:error, :rollback}

assert [
connect: [_],
handle_begin: [_, :state],
handle_close: [%Q{}, _, :new_state],
handle_rollback: [_, :newer_state]] = A.record(agent)
end
end
165 changes: 68 additions & 97 deletions integration_test/cases/prepare_stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,93 +10,83 @@ defmodule PrepareStreamTest do
test "prepare_stream returns result" do
stack = [
{:ok, :state},
{:ok, :began, :new_state},
{:ok, %Q{}, :newer_state},
{:ok, %C{}, :newest_state},
{:ok, %R{}, :state2},
{:deallocate, %R{}, :new_state2},
{:ok, :deallocated, :newer_state2},
{:ok, :committed, :newest_state2}
{:ok, %Q{}, :new_state},
{:ok, %C{}, :newer_state},
{:ok, %R{}, :newest_state},
{:deallocate, %R{}, :state2},
{:ok, :deallocated, :new_state2}
]
{:ok, agent} = A.start_link(stack)

opts = [agent: agent, parent: self()]
{:ok, pool} = P.start_link(opts)
assert P.transaction(pool, fn(conn) ->
assert P.run(pool, fn(conn) ->
stream = P.prepare_stream(conn, %Q{}, [:param])
assert %DBConnection.PrepareStream{} = stream
assert Enum.to_list(stream) == [%R{}, %R{}]
:hi
end) == {:ok, :hi}
end) == :hi

assert [
connect: [_],
handle_begin: [_, :state],
handle_prepare: [%Q{}, _, :new_state],
handle_declare: [%Q{}, [:param], _, :newer_state],
handle_first: [%Q{}, %C{}, _, :newest_state],
handle_next: [%Q{}, %C{}, _, :state2],
handle_deallocate: [%Q{}, %C{}, _, :new_state2],
handle_commit: [_, :newer_state2]
handle_prepare: [%Q{}, _, :state],
handle_declare: [%Q{}, [:param], _, :new_state],
handle_first: [%Q{}, %C{}, _, :newer_state],
handle_next: [%Q{}, %C{}, _, :newest_state],
handle_deallocate: [%Q{}, %C{}, _, :state2],
] = A.record(agent)
end

test "prepare_stream parses/describes/encodes/decodes" do
stack = [
{:ok, :state},
{:ok, :began, :new_state},
{:ok, %Q{state: :prepared}, :newer_state},
{:ok, %C{}, :newest_state},
{:deallocate, %R{}, :state2},
{:ok, :deallocated, :new_state2},
{:ok, :committed, :newer_state2}
{:ok, %Q{state: :prepared}, :new_state},
{:ok, %C{}, :newer_state},
{:deallocate, %R{}, :newest_state},
{:ok, :deallocated, :state2}
]
{:ok, agent} = A.start_link(stack)

opts = [agent: agent, parent: self()]
{:ok, pool} = P.start_link(opts)

assert P.transaction(pool, fn(conn) ->
assert P.run(pool, fn(conn) ->
opts2 = [parse: fn(%Q{state: nil}) -> %Q{state: :parsed} end,
describe: fn(%Q{state: :prepared}) -> %Q{state: :described} end,
encode: fn([:param]) -> :encoded end,
decode: fn(%R{}) -> :decoded end]
stream = P.prepare_stream(conn, %Q{}, [:param], opts2)
assert Enum.to_list(stream) == [:decoded]
:hi
end) == {:ok, :hi}
end) == :hi

assert [
connect: [_],
handle_begin: [_, :state],
handle_prepare: [%Q{state: :parsed}, _, :new_state],
handle_declare: [%Q{state: :described}, :encoded, _, :newer_state],
handle_first: [%Q{state: :described}, %C{}, _, :newest_state],
handle_deallocate: [%Q{}, %C{}, _, :state2],
handle_commit: [_, :new_state2]
handle_prepare: [%Q{state: :parsed}, _, :state],
handle_declare: [%Q{state: :described}, :encoded, _, :new_state],
handle_first: [%Q{state: :described}, %C{}, _, :newer_state],
handle_deallocate: [%Q{}, %C{}, _, :newest_state]
] = A.record(agent)
end

test "prepare_stream logs result" do
stack = [
{:ok, :state},
{:ok, :began, :new_state},
{:ok, %Q{}, :newer_state},
{:ok, %C{}, :newest_state},
{:ok, %R{}, :state2},
{:ok, :deallocated, :new_state2},
{:ok, :committed, :newest_state2}
{:ok, %Q{}, :new_state},
{:ok, %C{}, :newer_state},
{:ok, %R{}, :newest_state},
{:ok, :deallocated, :state2}
]
{:ok, agent} = A.start_link(stack)

parent = self()
opts = [agent: agent, parent: parent]
{:ok, pool} = P.start_link(opts)
assert P.transaction(pool, fn(conn) ->
assert P.run(pool, fn(conn) ->
stream = P.prepare_stream(conn, %Q{}, [:param], [log: &send(parent, &1)])
assert Enum.take(stream, 1) == [%R{}]
:hi
end) == {:ok, :hi}
end) == :hi

assert_received %DBConnection.LogEntry{call: :prepare_declare} = entry
assert %{query: %Q{}, params: [:param], result: {:ok, %Q{}, %C{}}} = entry
Expand All @@ -122,34 +112,30 @@ defmodule PrepareStreamTest do

assert [
connect: [_],
handle_begin: [_, :state],
handle_prepare: [%Q{}, _, :new_state],
handle_declare: [%Q{}, [:param], _, :newer_state],
handle_first: [%Q{}, %C{}, _, :newest_state],
handle_deallocate: [%Q{}, %C{}, _, :state2],
handle_commit: [_, :new_state2]
handle_prepare: [%Q{}, _, :state],
handle_declare: [%Q{}, [:param], _, :new_state],
handle_first: [%Q{}, %C{}, _, :newer_state],
handle_deallocate: [%Q{}, %C{}, _, :newest_state]
] = A.record(agent)
end

test "prepare_stream logs prepare error" do
err = RuntimeError.exception("oops")
stack = [
{:ok, :state},
{:ok, :began, :new_state},
{:error, err, :newer_state},
{:ok, :comitted, :newest_state}
{:error, err, :new_state}
]
{:ok, agent} = A.start_link(stack)

parent = self()
opts = [agent: agent, parent: parent]
{:ok, pool} = P.start_link(opts)

assert P.transaction(pool, fn(conn) ->
assert P.run(pool, fn(conn) ->
stream = P.prepare_stream(conn, %Q{}, [:param], [log: &send(parent, &1)])
assert_raise RuntimeError, "oops", fn() -> Enum.take(stream, 1) end
:hi
end) == {:ok, :hi}
end) == :hi

assert_received %DBConnection.LogEntry{call: :prepare_declare} = entry
assert %{query: %Q{}, params: [:param], result: {:error, ^err}} = entry
Expand All @@ -160,32 +146,28 @@ defmodule PrepareStreamTest do

assert [
connect: [_],
handle_begin: [_, :state],
handle_prepare: [%Q{}, _, :new_state],
handle_commit: [_, :newer_state],
handle_prepare: [%Q{}, _, :state]
] = A.record(agent)
end

test "prepare_stream logs declare error" do
err = RuntimeError.exception("oops")
stack = [
{:ok, :state},
{:ok, :began, :new_state},
{:ok, %Q{}, :newer_state},
{:error, err, :newest_state},
{:ok, :comitted, :state2}
{:ok, %Q{}, :new_state},
{:error, err, :newer_state},
]
{:ok, agent} = A.start_link(stack)

parent = self()
opts = [agent: agent, parent: parent]
{:ok, pool} = P.start_link(opts)

assert P.transaction(pool, fn(conn) ->
assert P.run(pool, fn(conn) ->
stream = P.prepare_stream(conn, %Q{}, [:param], [log: &send(parent, &1)])
assert_raise RuntimeError, "oops", fn() -> Enum.take(stream, 1) end
:hi
end) == {:ok, :hi}
end) == :hi

assert_received %DBConnection.LogEntry{call: :prepare_declare} = entry
assert %{query: %Q{}, params: [:param], result: {:error, ^err}} = entry
Expand All @@ -196,20 +178,17 @@ defmodule PrepareStreamTest do

assert [
connect: [_],
handle_begin: [_, :state],
handle_prepare: [%Q{}, _, :new_state],
handle_declare: [%Q{}, [:param], _, :newer_state],
handle_commit: [_, :newest_state],
handle_prepare: [%Q{}, _, :state],
handle_declare: [%Q{}, [:param], _, :new_state]
] = A.record(agent)
end

test "prepare_stream declare disconnects" do
err = RuntimeError.exception("oops")
stack = [
{:ok, :state},
{:ok, :began, :new_state},
{:ok, %Q{}, :newer_state},
{:disconnect, err, :newest_state},
{:ok, %Q{}, :new_state},
{:disconnect, err, :newer_state},
:ok,
fn(opts) ->
send(opts[:parent], :reconnected)
Expand All @@ -222,20 +201,19 @@ defmodule PrepareStreamTest do
opts = [agent: agent, parent: parent]
{:ok, pool} = P.start_link(opts)

assert P.transaction(pool, fn(conn) ->
assert P.run(pool, fn(conn) ->
stream = P.prepare_stream(conn, %Q{}, [:param])
assert_raise RuntimeError, "oops", fn() -> Enum.take(stream, 1) end
:hi
end) == {:error, :rollback}
end) == :hi

assert_receive :reconnected

assert [
connect: [_],
handle_begin: [_, :state],
handle_prepare: [%Q{}, _, :new_state],
handle_declare: [%Q{}, [:param], _, :newer_state],
disconnect: [^err, :newest_state],
handle_prepare: [%Q{}, _, :state],
handle_declare: [%Q{}, [:param], _, :new_state],
disconnect: [^err, :newer_state],
connect: [_]
] = A.record(agent)
end
Expand All @@ -247,7 +225,6 @@ defmodule PrepareStreamTest do
Process.link(opts[:parent])
{:ok, :state}
end,
{:ok, :began, :new_state},
:oops,
{:ok, :state2}
]
Expand All @@ -259,12 +236,12 @@ defmodule PrepareStreamTest do
assert_receive {:hi, conn}

Process.flag(:trap_exit, true)
assert P.transaction(pool, fn(conn) ->
assert P.run(pool, fn(conn) ->
stream = P.prepare_stream(conn, %Q{}, [:param])
assert_raise DBConnection.ConnectionError, "bad return value: :oops",
fn() -> Enum.to_list(stream) end
:hi
end) == {:error, :rollback}
end) == :hi

prefix = "client #{inspect self()} stopped: " <>
"** (DBConnection.ConnectionError) bad return value: :oops"
Expand All @@ -275,8 +252,7 @@ defmodule PrepareStreamTest do

assert [
{:connect, _},
{:handle_begin, [_, :state]},
{:handle_prepare, [%Q{}, _, :new_state]} | _] = A.record(agent)
{:handle_prepare, [%Q{}, _, :state]} | _] = A.record(agent)
end

test "prepare_stream declare raise raises and stops connection" do
Expand All @@ -286,8 +262,7 @@ defmodule PrepareStreamTest do
Process.link(opts[:parent])
{:ok, :state}
end,
{:ok, :began, :new_state},
{:ok, %Q{}, :newer_state},
{:ok, %Q{}, :new_state},
fn(_, _, _, _) ->
raise "oops"
end,
Expand All @@ -301,11 +276,11 @@ defmodule PrepareStreamTest do
assert_receive {:hi, conn}

Process.flag(:trap_exit, true)
assert P.transaction(pool, fn(conn) ->
assert P.run(pool, fn(conn) ->
stream = P.prepare_stream(conn, %Q{}, [:param])
assert_raise RuntimeError, "oops", fn() -> Enum.to_list(stream) end
:hi
end) == {:error, :rollback}
end) == :hi

prefix = "client #{inspect self()} stopped: ** (RuntimeError) oops"
len = byte_size(prefix)
Expand All @@ -315,28 +290,25 @@ defmodule PrepareStreamTest do

assert [
{:connect, _},
{:handle_begin, [_, :state]},
{:handle_prepare, [%Q{}, _, :new_state]},
{:handle_declare, [%Q{}, [:param], _, :newer_state]} | _] = A.record(agent)
{:handle_prepare, [%Q{}, _, :state]},
{:handle_declare, [%Q{}, [:param], _, :new_state]} | _] = A.record(agent)
end

test "prepare_stream describe or encode raises and closes query" do
stack = [
{:ok, :state},
{:ok, :began, :new_state},
{:ok, %Q{}, :newer_state},
{:ok, :closed, :newest_state},
{:ok, %Q{}, :state2},
{:ok, :result, :new_state2},
{:ok, :committed, :newer_state2}
{:ok, %Q{}, :new_state},
{:ok, :closed, :newer_state},
{:ok, %Q{}, :newest_state},
{:ok, :result, :state2}
]
{:ok, agent} = A.start_link(stack)

parent = self()
opts = [agent: agent, parent: parent]
{:ok, pool} = P.start_link(opts)

assert P.transaction(pool, fn(conn) ->
assert P.run(pool, fn(conn) ->
describe = fn(%Q{}) -> raise "oops" end
stream = P.prepare_stream(conn, %Q{}, [:param], [describe: describe])
assert_raise RuntimeError, "oops", fn() -> Enum.to_list(stream) end
Expand All @@ -346,15 +318,14 @@ defmodule PrepareStreamTest do
assert_raise RuntimeError, "oops", fn() -> Enum.to_list(stream) end

:hi
end) == {:ok, :hi}
end) == :hi

assert [
connect: [_],
handle_begin: [_, :state],
handle_prepare: [%Q{}, _, :new_state],
handle_close: [%Q{}, _, :newer_state],
handle_prepare: [%Q{}, _, :newest_state],
handle_close: [%Q{}, _, :state2],
handle_commit: [_, :new_state2]] = A.record(agent)
handle_prepare: [%Q{}, _, :state],
handle_close: [%Q{}, _, :new_state],
handle_prepare: [%Q{}, _, :newer_state],
handle_close: [%Q{}, _, :newest_state]
] = A.record(agent)
end
end
Loading