Skip to content

Commit 664d962

Browse files
authored
Introduce declare/fetch/deallocate (#89)
* Add declare(!)/3,4 * Add fetch(!)/3,4 * Add deallocate(!)/3,4 Explicitly defining handle_first/4 and handle_next/4 is deprecated because callback implementations that are required to differentiate also need to track cursors in their state. A single fetch/4 is cleaner to use and these callbacks forward to handle_fetch/4 on `use DBConnection`. For first/next/fetch the return value is `{:cont | :halt, result, state}` where `:cont` is continue (same as `:ok`) and `:halt` means both that the cursor has finished enumerating results and the cursor is deallocated. Therefore `:halt` does not require a `deallocate` call. This API is chosen to avoid an extra roundtrip that current adapters do. handle_first/4 and handle_next/4 still support `:ok` and `:deallocate` tuples. A callback implementation may need to deallocate if the transaction ends without a cursor being deallocated (via `fetch/4` or `deallocate/4`).
1 parent ab041a1 commit 664d962

File tree

6 files changed

+510
-108
lines changed

6 files changed

+510
-108
lines changed
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
defmodule CursorTest do
2+
use ExUnit.Case, async: true
3+
4+
alias TestPool, as: P
5+
alias TestAgent, as: A
6+
alias TestQuery, as: Q
7+
alias TestCursor, as: C
8+
alias TestResult, as: R
9+
10+
test "declare/fetch/deallocate return result" do
11+
stack = [
12+
{:ok, :state},
13+
{:ok, %C{}, :new_state},
14+
{:ok, %C{}, :newer_state},
15+
{:cont, %R{}, :newest_state},
16+
{:halt, %R{}, :state2},
17+
{:ok, :deallocated, :new_state2},
18+
{:ok, :deallocated, :newer_state2}
19+
]
20+
{:ok, agent} = A.start_link(stack)
21+
22+
opts = [agent: agent, parent: self()]
23+
{:ok, pool} = P.start_link(opts)
24+
assert P.declare(pool, %Q{}, [:param]) == {:ok, %C{}}
25+
assert P.declare!(pool, %Q{}, [:param], [key: :value]) == %C{}
26+
27+
assert P.fetch(pool, %Q{}, %C{}) == {:cont, %R{}}
28+
assert P.fetch!(pool, %Q{}, %C{}, [key: :value]) == {:halt, %R{}}
29+
30+
assert P.deallocate(pool, %Q{}, %C{}) == {:ok, :deallocated}
31+
assert P.deallocate!(pool, %Q{}, %C{}, [key: :value]) == :deallocated
32+
33+
assert [
34+
connect: [_],
35+
handle_declare: [%Q{}, [:param], _, :state],
36+
handle_declare: [%Q{}, [:param], [{:key, :value} | _], :new_state],
37+
handle_fetch: [%Q{}, %C{}, _, :newer_state],
38+
handle_fetch: [%Q{}, %C{}, [{:key, :value} | _], :newest_state],
39+
handle_deallocate: [%Q{}, %C{}, _, :state2],
40+
handle_deallocate: [%Q{}, %C{}, [{:key, :value} | _], :new_state2]
41+
] = A.record(agent)
42+
end
43+
44+
test "declare encodes params" do
45+
stack = [
46+
{:ok, :state},
47+
{:ok, %C{}, :new_state},
48+
]
49+
{:ok, agent} = A.start_link(stack)
50+
51+
opts = [agent: agent, parent: self()]
52+
{:ok, pool} = P.start_link(opts)
53+
54+
opts2 = [encode: fn([:param]) -> :encoded end]
55+
assert P.declare(pool, %Q{}, [:param], opts2) == {:ok, %C{}}
56+
57+
assert [
58+
connect: [_],
59+
handle_declare: [%Q{}, :encoded, _, :state]] = A.record(agent)
60+
end
61+
62+
test "fetch decodes result" do
63+
stack = [
64+
{:ok, :state},
65+
{:cont, %R{}, :new_state},
66+
{:halt, %R{}, :newer_state}
67+
]
68+
{:ok, agent} = A.start_link(stack)
69+
70+
opts = [agent: agent, parent: self()]
71+
{:ok, pool} = P.start_link(opts)
72+
73+
opts2 = [decode: fn(%R{}) -> :decoded end]
74+
assert P.fetch(pool, %Q{}, %C{}, opts2) == {:cont, :decoded}
75+
assert P.fetch(pool, %Q{}, %C{}, opts2) == {:halt, :decoded}
76+
77+
assert [
78+
connect: [_],
79+
handle_fetch: [%Q{}, %C{}, _, :state],
80+
handle_fetch: [%Q{}, %C{}, _, :new_state]
81+
] = A.record(agent)
82+
end
83+
84+
test "declare/fetch/deallocate logs result" do
85+
stack = [
86+
{:ok, :state},
87+
{:ok, %C{}, :new_state},
88+
{:cont, %R{}, :newer_state},
89+
{:halt, %R{}, :newest_state},
90+
{:ok, :deallocated, :state2}
91+
]
92+
{:ok, agent} = A.start_link(stack)
93+
94+
parent = self()
95+
opts = [agent: agent, parent: parent]
96+
{:ok, pool} = P.start_link(opts)
97+
98+
log = &send(parent, &1)
99+
assert P.declare(pool, %Q{}, [:param], [log: log]) == {:ok, %C{}}
100+
101+
assert_receive %DBConnection.LogEntry{call: :declare, query: %Q{},
102+
params: [:param], result: {:ok, %C{}}} = entry
103+
assert is_integer(entry.pool_time)
104+
assert entry.pool_time >= 0
105+
assert is_integer(entry.connection_time)
106+
assert entry.connection_time >= 0
107+
assert is_nil(entry.decode_time)
108+
109+
assert P.fetch(pool, %Q{}, %C{}, [log: log]) == {:cont, %R{}}
110+
111+
assert_receive %DBConnection.LogEntry{call: :fetch, query: %Q{},
112+
params: %C{}, result: {:ok, %R{}}} = entry
113+
assert is_integer(entry.pool_time)
114+
assert entry.pool_time >= 0
115+
assert is_integer(entry.connection_time)
116+
assert entry.connection_time >= 0
117+
assert is_integer(entry.decode_time)
118+
assert entry.decode_time >= 0
119+
120+
assert P.fetch(pool, %Q{}, %C{}, [log: log]) == {:halt, %R{}}
121+
122+
assert_receive %DBConnection.LogEntry{call: :fetch, query: %Q{},
123+
params: %C{}, result: {:ok, %R{}}} = entry
124+
assert is_integer(entry.pool_time)
125+
assert entry.pool_time >= 0
126+
assert is_integer(entry.connection_time)
127+
assert entry.connection_time >= 0
128+
assert is_integer(entry.decode_time)
129+
assert entry.decode_time >= 0
130+
131+
assert P.deallocate(pool, %Q{}, %C{}, [log: log]) == {:ok, :deallocated}
132+
133+
assert_receive %DBConnection.LogEntry{call: :deallocate, query: %Q{},
134+
params: %C{}, result: {:ok, :deallocated}} = entry
135+
assert is_integer(entry.pool_time)
136+
assert entry.pool_time >= 0
137+
assert is_integer(entry.connection_time)
138+
assert entry.connection_time >= 0
139+
assert is_nil(entry.decode_time)
140+
141+
assert [
142+
connect: [_],
143+
handle_declare: [%Q{}, [:param], _, :state],
144+
handle_fetch: [%Q{}, %C{}, _, :new_state],
145+
handle_fetch: [%Q{}, %C{}, _, :newer_state],
146+
handle_deallocate: [%Q{}, %C{}, _, :newest_state]
147+
] = A.record(agent)
148+
end
149+
150+
test "declare/fetch/deallocate error returns error" do
151+
err = RuntimeError.exception("oops")
152+
stack = [
153+
{:ok, :state},
154+
{:error, err, :new_state},
155+
{:error, err, :newer_state},
156+
{:error, err, :newesr_state}
157+
]
158+
{:ok, agent} = A.start_link(stack)
159+
160+
opts = [agent: agent, parent: self()]
161+
{:ok, pool} = P.start_link(opts)
162+
assert P.declare(pool, %Q{}, [:param]) == {:error, err}
163+
assert P.fetch(pool, %Q{}, %C{}) == {:error, err}
164+
assert P.deallocate(pool, %Q{}, %C{}) == {:error, err}
165+
166+
assert [
167+
connect: [_],
168+
handle_declare: [%Q{}, [:param], _, :state],
169+
handle_fetch: [%Q{}, %C{}, _, :new_state],
170+
handle_deallocate: [%Q{}, %C{}, _, :newer_state]
171+
] = A.record(agent)
172+
end
173+
end

integration_test/cases/prepare_stream_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ defmodule PrepareStreamTest do
105105
assert entry.connection_time >= 0
106106
assert is_nil(entry.decode_time)
107107

108-
assert_received %DBConnection.LogEntry{call: :first} = entry
108+
assert_received %DBConnection.LogEntry{call: :fetch} = entry
109109
assert %{query: %Q{}, params: %C{}, result: {:ok, %R{}}} = entry
110110
assert is_nil(entry.pool_time)
111111
assert is_integer(entry.connection_time)

integration_test/cases/stream_test.exs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ defmodule StreamTest do
9999
assert entry.connection_time >= 0
100100
assert is_nil(entry.decode_time)
101101

102-
assert_received %DBConnection.LogEntry{call: :first} = entry
102+
assert_received %DBConnection.LogEntry{call: :fetch} = entry
103103
assert %{query: %Q{}, params: %C{}, result: {:ok, %R{}}} = entry
104104
assert is_nil(entry.pool_time)
105105
assert is_integer(entry.connection_time)
@@ -221,14 +221,19 @@ defmodule StreamTest do
221221

222222
assert_received %DBConnection.LogEntry{call: :declare}
223223

224-
assert_received %DBConnection.LogEntry{call: :first} = entry
224+
assert_received %DBConnection.LogEntry{call: :fetch} = entry
225225
assert %{query: %Q{}, params: %C{}, result: {:error, ^err}} = entry
226226
assert is_nil(entry.pool_time)
227227
assert is_integer(entry.connection_time)
228228
assert entry.connection_time >= 0
229229
assert is_nil(entry.decode_time)
230230

231-
refute_received %DBConnection.LogEntry{call: :deallocate}
231+
assert_received %DBConnection.LogEntry{call: :deallocate} = entry
232+
closed = DBConnection.ConnectionError.exception("connection is closed")
233+
assert %{query: %Q{}, params: %C{}, result: {:error, ^closed}} = entry
234+
assert is_nil(entry.pool_time)
235+
assert is_nil(entry.connection_time)
236+
assert is_nil(entry.decode_time)
232237

233238
assert_receive :reconnected
234239

@@ -297,12 +302,12 @@ defmodule StreamTest do
297302

298303
assert P.transaction(pool, fn(conn) ->
299304
stream = P.stream(conn, %Q{}, [:param], [log: &send(parent, &1)])
300-
assert_raise RuntimeError, "oops", fn() -> Enum.take(stream, 1) end
305+
assert Enum.take(stream, 1) == [%R{}]
301306
:hi
302307
end) == {:error, :rollback}
303308

304309
assert_received %DBConnection.LogEntry{call: :declare}
305-
assert_received %DBConnection.LogEntry{call: :first}
310+
assert_received %DBConnection.LogEntry{call: :fetch}
306311

307312
assert_received %DBConnection.LogEntry{call: :deallocate} = entry
308313
assert %{query: %Q{}, params: %C{}, result: {:error, ^err}} = entry

integration_test/tests.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ Code.require_file "cases/after_connect_test.exs", __DIR__
22
Code.require_file "cases/backoff_test.exs", __DIR__
33
Code.require_file "cases/client_test.exs", __DIR__
44
Code.require_file "cases/close_test.exs", __DIR__
5+
Code.require_file "cases/cursor_test.exs", __DIR__
56
Code.require_file "cases/execute_test.exs", __DIR__
67
Code.require_file "cases/idle_test.exs", __DIR__
78
Code.require_file "cases/overflow_test.exs", __DIR__

0 commit comments

Comments
 (0)