diff --git a/integration_test/cases/transaction_test.exs b/integration_test/cases/transaction_test.exs index e5564da..6735e2f 100644 --- a/integration_test/cases/transaction_test.exs +++ b/integration_test/cases/transaction_test.exs @@ -776,8 +776,8 @@ defmodule TransactionTest do test "transaction logs begin :commit and :rollback" do stack = [ {:ok, :state}, - {:commit, :new_state}, - {:rollback, :newer_state}, + {:transaction, :new_state}, + {:error, :newer_state}, ] {:ok, agent} = A.start_link(stack) @@ -791,7 +791,7 @@ defmodule TransactionTest do [log: log]) == {:error, :rollback} assert_received %DBConnection.LogEntry{call: :begin} = entry - err = DBConnection.TransactionError.exception(:commit) + err = DBConnection.TransactionError.exception(:transaction) assert %{query: :begin, params: nil, result: {:error, ^err}} = entry assert is_integer(entry.pool_time) assert entry.pool_time >= 0 @@ -805,7 +805,7 @@ defmodule TransactionTest do [log: log]) == {:error, :rollback} assert_received %DBConnection.LogEntry{call: :begin} = entry - err = DBConnection.TransactionError.exception(:rollback) + err = DBConnection.TransactionError.exception(:error) assert %{query: :begin, params: nil, result: {:error, ^err}} = entry assert is_integer(entry.pool_time) assert entry.pool_time >= 0 @@ -825,9 +825,9 @@ defmodule TransactionTest do stack = [ {:ok, :state}, {:ok, :began, :new_state}, - {:begin, :newer_state}, + {:idle, :newer_state}, {:ok, :began, :newest_state}, - {:rollback, :state2}, + {:error, :state2}, {:ok, :rolledback, :new_state2} ] {:ok, agent} = A.start_link(stack) @@ -843,7 +843,7 @@ defmodule TransactionTest do [log: log]) == {:error, :rollback} assert_received %DBConnection.LogEntry{call: :commit} = entry - err = DBConnection.TransactionError.exception(:begin) + err = DBConnection.TransactionError.exception(:idle) assert %{query: :commit, params: nil, result: {:error, ^err}} = entry assert is_nil(entry.pool_time) assert is_integer(entry.connection_time) @@ -878,7 +878,7 @@ defmodule TransactionTest do stack = [ {:ok, :state}, {:ok, :began, :new_state}, - {:begin, :newer_state} + {:idle, :newer_state} ] {:ok, agent} = A.start_link(stack) @@ -896,7 +896,7 @@ defmodule TransactionTest do [log: log]) == {:error, :oops} assert_received %DBConnection.LogEntry{call: :rollback} = entry - err = DBConnection.TransactionError.exception(:begin) + err = DBConnection.TransactionError.exception(:idle) assert %{query: :rollback, params: nil, result: {:error, ^err}} = entry assert is_nil(entry.pool_time) assert is_integer(entry.connection_time) @@ -953,4 +953,44 @@ defmodule TransactionTest do handle_begin: [_, :state2], handle_rollback: [_, :new_state2]] = A.record(agent) end + + test "status returns result" do + err = RuntimeError.exception("oops") + stack = [ + {:ok, :state}, + {:idle, :new_state}, + {:transaction, :newer_state}, + {:error, :newest_state}, + {:disconnect, err, :state2}, + :ok, + fn(opts) -> + send(opts[:parent], :reconnected) + {:ok, :new_state2} + end, + ] + {:ok, agent} = A.start_link(stack) + + opts = [agent: agent, parent: self()] + {:ok, pool} = P.start_link(opts) + + assert P.status(pool, opts) == :idle + assert P.status(pool, opts) == :transaction + assert P.run(pool, fn(conn) -> + assert P.status(pool, [queue: false] ++ opts) == :error + assert P.status(conn, opts) + end, opts) + assert P.status(pool, opts) == :error + + assert_receive :reconnected + + assert [ + connect: [_], + handle_status: [ _, :state], + handle_status: [_, :new_state], + handle_status: [_, :newer_state], + handle_status: [_, :newest_state], + disconnect: [^err, :state2], + connect: [_] + ] = A.record(agent) + end end diff --git a/lib/db_connection.ex b/lib/db_connection.ex index d68b02e..1396cc1 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -77,14 +77,14 @@ defmodule DBConnection do defstruct [:pool_mod, :pool_ref, :conn_mod, :conn_ref, :conn_mode] defmodule TransactionError do - defexception [:next, :message] + defexception [:status, :message] - def exception(:begin), - do: %__MODULE__{next: :begin, message: "transaction is not started"} - def exception(:commit), - do: %__MODULE__{next: :commit, message: "transaction is already started"} - def exception(:rollback), - do: %__MODULE__{next: :rollback, message: "transaction is aborted"} + def exception(:idle), + do: %__MODULE__{status: :idle, message: "transaction is not started"} + def exception(:transaction), + do: %__MODULE__{status: :transaction, message: "transaction is already started"} + def exception(:error), + do: %__MODULE__{status: :error, message: "transaction is aborted"} end @typedoc """ @@ -159,57 +159,70 @@ defmodule DBConnection do @doc """ Handle the beginning of a transaction. - Return `{:ok, result, state}` to continue, `{:commit, state}` to notify caller - to commit open transaction before continuing, `{:rollback, state}` to notify + Return `{:ok, result, state}` to continue, `{:transaction, state}` to notify caller + to commit open transaction before continuing, `{:error, state}` to notify caller to rollback aborted transaction before continuing, - `{:error, exception, state}` to error without beginning the transaction, or - `{:disconnect, exception, state}` to error and disconnect. + `{:error, exception, state}` (deprecated) to error without beginning the + transaction, or `{:disconnect, exception, state}` to error and disconnect. - A callback implementation should only return `:commit` or `:rollback` if it + A callback implementation should only return `:transaction` or `:error` if it can determine the database's transaction status without side effect. This callback is called in the client process. """ @callback handle_begin(opts :: Keyword.t, state :: any) :: {:ok, result, new_state :: any} | - {:commit | :rollback, new_state :: any} | + {:transaction | :error, new_state :: any} | {:error | :disconnect, Exception.t, new_state :: any} @doc """ Handle committing a transaction. Return `{:ok, result, state}` on successfully - committing transaction, `{:begin, state}` to notify caller to begin - transaction before continuing, `{:rollback, state}` to notify caller to + committing transaction, `{:idle, state}` to notify caller to begin + transaction before continuing, `{:error, state}` to notify caller to rollback aborted transaction before continuing, `{:error, exception, state}` - to error and no longer be inside transaction, or + (deprecated) to error and no longer be inside transaction, or `{:disconnect, exception, state}` to error and disconnect. - A callback implementation should only return `:begin` or `:rollback` if it + A callback implementation should only return `:idle` or `:error` if it can determine the database's transaction status without side effect. This callback is called in the client process. """ @callback handle_commit(opts :: Keyword.t, state :: any) :: {:ok, result, new_state :: any} | - {:begin | :rollback, new_state :: any} | + {:idle | :error, new_state :: any} | {:error | :disconnect, Exception.t, new_state :: any} @doc """ Handle committing a transaction. Return `{:ok, result, state}` on successfully - committing transaction, `{:begin, state}` to notify caller to begin - transaction before continuing, `{:error, exception, state}` to error and no - longer be inside transaction, or `{:disconnect, exception, state}` to error - and disconnect. + committing transaction, `{:idle, state}` to notify caller to begin + transaction before continuing, `{:error, exception, state}` (deprecated) to + error and no longer be inside transaction, or + `{:disconnect, exception, state}` to error and disconnect. - A callback implementation should only return `:begin` if it + A callback implementation should only return `:idle` if it can determine the database' transaction status without side effect. This callback is called in the client and connection process. """ @callback handle_rollback(opts :: Keyword.t, state :: any) :: {:ok, result, new_state :: any} | - {:begin, new_state :: any} | + {:idle, new_state :: any} | {:error | :disconnect, Exception.t, new_state :: any} + @doc """ + Handle getting the transaction status. Return `{:idle, state}` if outside a + transaction, `{:transaction, state}` if inside a transaction, + `{:error, state}` if inside an aborted transaction, or + `{:disconnect, exception, state}` to error and disconnect. + + If the callback returns a `:disconnect` tuples then `status/2` will return + `:error`. + """ + @callback handle_status(opts :: Keyword.t, state :: any) :: + {:idle | :transaction | :error, new_state :: any} | + {:disconnect, Exception.t, new_state :: any} + @doc """ Prepare a query with the database. Return `{:ok, query, state}` where `query` is a query to pass to `execute/4` or `close/3`, @@ -404,7 +417,7 @@ defmodule DBConnection do message = "handle_begin/2 not implemented" case :erlang.phash2(1, 1) do 0 -> raise message - 1 -> {:error, RuntimeError.exception(message), state} + 1 -> {:disconnect, RuntimeError.exception(message), state} end end @@ -412,7 +425,7 @@ defmodule DBConnection do message = "handle_commit/2 not implemented" case :erlang.phash2(1, 1) do 0 -> raise message - 1 -> {:error, RuntimeError.exception(message), state} + 1 -> {:disconnect, RuntimeError.exception(message), state} end end @@ -420,7 +433,15 @@ defmodule DBConnection do message = "handle_rollback/2 not implemented" case :erlang.phash2(1, 1) do 0 -> raise message - 1 -> {:error, RuntimeError.exception(message), state} + 1 -> {:disconnect, RuntimeError.exception(message), state} + end + end + + def handle_status(_, state) do + message = "handle_status/2 not implemented" + case :erlang.phash2(1, 1) do + 0 -> raise message + 1 -> {:disconnect, RuntimeError.exception(message), state} end end @@ -1119,6 +1140,54 @@ defmodule DBConnection do end end + @doc """ + Return the transaction status of a connection. + + The callback implementation should return the transaction status according to + the database, and not make assumption based. + + This function will raise a `DBConnection.ConnectionError` when called inside a + deprecated `transaction/3`. + + ### Options + + See module documentation. The pool and connection module may support other + options. All options are passed to `handle_status/2`. + + ### Example + + {:ok, conn, _result} = DBConnection.begin(pool) + try do + fun.(conn) + after + case DBConnection.status(conn) do + :transaction -> + DBConnection.commit(conn) + :error -> + DBConnection.rollback(conn) + raise "transaction is aborted!" + :idle -> + raise "transaction is not started!" + end + end + """ + @spec status(conn, opts :: Keyword.t) :: :idle | :transaction | :error + def status(conn, opts \\ []) + def status(%DBConnection{conn_mode: :transaction}, _opts) do + raise DBConnection.ConnectionError, + "can not get status inside legacy transaction" + end + def status(conn, opts) do + case run(conn, &run_status/4, nil, opts) do + {status, _meter} -> + status + {:error, _err, _meter} -> + :error + {kind, reason, stack, _meter} -> + :erlang.raise(kind, reason, stack) + end + end + @doc """ Create a stream that will prepare a query, execute it and stream results using a cursor. @@ -1511,17 +1580,17 @@ defmodule DBConnection do when fun in [:handle_first, :handle_next] -> put_info(conn, conn_state) {:deallocate, result, meter} - {:begin, conn_state} + {:idle, conn_state} when fun in [:handle_commit, :handle_rollback] -> put_info(conn, conn_state) - {:begin, meter} - {:commit, conn_state} when fun == :handle_begin -> + {:idle, meter} + {:transaction, conn_state} when fun == :handle_begin -> put_info(conn, conn_state) - {:commit, meter} - {:rollback, conn_state} + {:transaction, meter} + {:error, conn_state} when fun in [:handle_begin, :handle_commit] -> put_info(conn, conn_state) - {:rollback, meter} + {:error, meter} {:error, err, conn_state} -> put_info(conn, conn_state) {:error, err, meter} @@ -1889,8 +1958,8 @@ defmodule DBConnection do end defp begin(conn, run, opts) do case run.(conn, &run_begin/4, meter(opts), opts) do - {next, meter} -> - err = DBConnection.TransactionError.exception(next) + {status, meter} -> + err = DBConnection.TransactionError.exception(status) log(meter, :begin, :begin, nil, {:error, err}) other -> log(other, :begin, :begin, nil) @@ -1907,8 +1976,8 @@ defmodule DBConnection do end defp rollback(conn, run, opts) do case run.(conn, &run_rollback/4, meter(opts), opts) do - {next, meter} -> - err = DBConnection.TransactionError.exception(next) + {status, meter} -> + err = DBConnection.TransactionError.exception(status) log(meter, :rollback, :rollback, nil, {:error, err}) other -> log(other, :rollback, :rollback, nil) @@ -1927,10 +1996,10 @@ defmodule DBConnection do case run.(conn, &run_commit/4, meter(opts), opts) do {:rollback, {:ok, result, meter}} -> log(meter, :commit, :rollback, nil, {:ok, result}) - err = DBConnection.TransactionError.exception(:rollback) + err = DBConnection.TransactionError.exception(:error) {:error, err} - {query, {next, meter}} -> - err = DBConnection.TransactionError.exception(next) + {query, {status, meter}} -> + err = DBConnection.TransactionError.exception(status) log(meter, :commit, query, nil, {:error, err}) {query, other} -> log(other, :commit, query, nil) @@ -1944,7 +2013,7 @@ defmodule DBConnection do defp run_commit(conn, conn_state, meter, opts) do meter = event(meter, :commit) case handle(conn, conn_state, :handle_commit, [], meter, opts) do - {:rollback, meter} -> + {:error, meter} -> # conn_state must valid as just put there in previous call {:ok, conn_state, meter} = fetch_info(conn, meter) {:rollback, run_rollback(conn, conn_state, meter, opts)} @@ -1953,6 +2022,34 @@ defmodule DBConnection do end end + defp run_status(conn, conn_state, meter, opts) do + %DBConnection{conn_mod: conn_mod} = conn + try do + apply(conn_mod, :handle_status, [opts, conn_state]) + else + {status, conn_state} when status in [:idle, :transaction, :error] -> + put_info(conn, conn_state) + {status, conn_state} + {:disconnect, err, conn_state} -> + delete_disconnect(conn, conn_state, err, opts) + {:error, meter} + other -> + try do + raise DBConnection.ConnectionError, "bad return value: #{inspect other}" + catch + :error, reason -> + stack = System.stacktrace() + delete_stop(conn, conn_state, :error, reason, stack, opts) + {:error, reason, stack, meter} + end + catch + kind, reason -> + stack = System.stacktrace() + delete_stop(conn, conn_state, kind, reason, stack, opts) + {kind, reason, stack, meter} + end + end + defp run_prepare_declare(conn, conn_info, query, params, meter, opts) do with {:ok, query, meter} <- prepare(conn, conn_info, query, meter, opts), {:ok, query, meter} <- describe(conn, query, meter, opts), diff --git a/test/test_support.exs b/test/test_support.exs index 5af4cd3..ce8655f 100644 --- a/test/test_support.exs +++ b/test/test_support.exs @@ -99,6 +99,10 @@ defmodule TestConnection do DBConnection.commit!(pool, opts2 ++ unquote(opts)) end + def status(pool, opts2 \\ []) do + DBConnection.status(pool, opts2 ++ unquote(opts)) + end + defoverridable [start_link: 1] end end @@ -139,6 +143,10 @@ defmodule TestConnection do TestAgent.eval(:handle_rollback, [opts, state]) end + def handle_status(opts, state) do + TestAgent.eval(:handle_status, [opts, state]) + end + def handle_prepare(query, opts, state) do TestAgent.eval(:handle_prepare, [query, opts, state]) end