Skip to content
This repository was archived by the owner on Mar 19, 2021. It is now read-only.

with_transaction #2 #99

Merged
merged 13 commits into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions lib/sqlitex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ defmodule Sqlitex do
Sqlitex.query(db, "select * from mytable", db_chunk_size: 500)
```
in this case all rows will be passed from native sqlite OS thread to the erlang process in two passes.
Each pass will contain 500 rows.
Each pass will contain 500 rows.
This parameter decrease overhead of transmitting rows from native OS sqlite thread to the erlang process by
chunking list of result rows.
Please, decrease this value if rows are heavy. Default value is 5000.
chunking list of result rows.
Please, decrease this value if rows are heavy. Default value is 5000.
If you in doubt what to do with this parameter, please, do nothing. Default value is ok.
```
config :sqlitex, db_chunk_size: 500 # if most of the database rows are heavy
Expand Down Expand Up @@ -150,9 +150,46 @@ defmodule Sqlitex do
exec(db, stmt, call_opts)
end

@doc """
Runs `fun` inside a transaction. If `fun` returns without raising an exception,
the transaction will be commited via `commit`. Otherwise, `rollback` will be called.

## Examples
iex> {:ok, db} = Sqlitex.open(":memory:")
iex> Sqlitex.with_transaction(db, fn(db) ->
...> Sqlitex.exec(db, "create table foo(id integer)")
...> Sqlitex.exec(db, "insert into foo (id) values(42)")
...> end)
iex> Sqlitex.query(db, "select * from foo")
{:ok, [[{:id, 42}]]}
"""
@spec with_transaction(Sqlitex.connection, (Sqlitex.connection -> any()), Keyword.t) :: any
def with_transaction(db, fun, opts \\ []) do
with :ok <- exec(db, "begin", opts),
{:ok, result} <- apply_rescuing(fun, [db]),
:ok <- exec(db, "commit", opts)
do
{:ok, result}
else
err ->
:ok = exec(db, "rollback", opts)
err
end
end

if Version.compare(System.version, "1.3.0") == :lt do
defp string_to_charlist(string), do: String.to_char_list(string)
else
defp string_to_charlist(string), do: String.to_charlist(string)
end

## Private Helpers

defp apply_rescuing(fun, args) do
try do
{:ok, apply(fun, args)}
rescue
error -> {:rescued, error, __STACKTRACE__}
end
end
end
66 changes: 60 additions & 6 deletions lib/sqlitex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ defmodule Sqlitex.Server do
{:reply, result, {db, stmt_cache, config}}
end

def handle_call({:with_transaction, fun}, _from, {db, _stmt_cache, _config} = state) do
pid = self()
Process.put({:state, pid}, state)
result = Sqlitex.with_transaction(db, fn _db -> fun.(pid) end)
{:reply, result, Process.delete({:state, pid})}
end

def handle_cast(:stop, {db, stmt_cache, config}) do
{:stop, :normal, {db, stmt_cache, config}}
end
Expand All @@ -136,7 +143,7 @@ defmodule Sqlitex.Server do
Returns the results otherwise.
"""
def exec(pid, sql, opts \\ []) do
GenServer.call(pid, {:exec, sql}, Config.call_timeout(opts))
call(pid, {:exec, sql}, opts)
end

@doc """
Expand All @@ -145,7 +152,7 @@ defmodule Sqlitex.Server do
Returns the results otherwise.
"""
def query(pid, sql, opts \\ []) do
GenServer.call(pid, {:query, sql, opts}, Config.call_timeout(opts))
call(pid, {:query, sql, opts}, opts)
end

@doc """
Expand All @@ -154,11 +161,11 @@ defmodule Sqlitex.Server do
Returns the results otherwise.
"""
def query_rows(pid, sql, opts \\ []) do
GenServer.call(pid, {:query_rows, sql, opts}, Config.call_timeout(opts))
call(pid, {:query_rows, sql, opts}, opts)
end

def set_update_hook(server_pid, notification_pid, opts \\ []) do
GenServer.call(server_pid, {:set_update_hook, notification_pid, opts}, Config.call_timeout(opts))
call(server_pid, {:set_update_hook, notification_pid, opts}, opts)
end

@doc """
Expand All @@ -180,18 +187,65 @@ defmodule Sqlitex.Server do
could not be prepared.
"""
def prepare(pid, sql, opts \\ []) do
GenServer.call(pid, {:prepare, sql}, Config.call_timeout(opts))
call(pid, {:prepare, sql}, opts)
end

def create_table(pid, name, table_opts \\ [], cols) do
GenServer.call(pid, {:create_table, name, table_opts, cols})
call(pid, {:create_table, name, table_opts, cols}, [])
end

def stop(pid) do
GenServer.cast(pid, :stop)
end

@doc """
Runs `fun` inside a transaction. If `fun` returns without raising an exception,
the transaction will be commited via `commit`. Otherwise, `rollback` will be called.

Be careful if `fun` might take a long time to run. The function is executed in the
context of the server and therefore blocks other requests until it's finished.

## Examples
iex> {:ok, server} = Sqlitex.Server.start_link(":memory:")
iex> Sqlitex.Server.with_transaction(server, fn(db) ->
...> Sqlitex.Server.exec(db, "create table foo(id integer)")
...> Sqlitex.Server.exec(db, "insert into foo (id) values(42)")
...> end)
iex> Sqlitex.Server.query(server, "select * from foo")
{:ok, [[{:id, 42}]]}
"""
def with_transaction(pid, fun, opts \\ []) do
case call(pid, {:with_transaction, fun}, opts) do
{:rescued, error, trace} ->
Kernel.reraise(error, trace)

other ->
other
end
end

## Helpers
defp call(atom, command, opts) when is_atom(atom) do
call(Process.whereis(atom), command, opts)
end

defp call(pid, command, opts) when is_pid(pid) do
if pid == self() do
key = {:state, pid}
state = Process.get(key)
case command do
{:with_transaction, fun} ->
{db, _stmt_cache, _config} = state
{:ok, fun.(db)}
_other ->
{:reply, result, state} = handle_call(command, nil, state)
Process.put(key, state)
result
end
else
GenServer.call(pid, command, Config.call_timeout(opts))
end
end

defp query_impl(sql, stmt_cache, opts) do
with {%Cache{} = new_cache, stmt} <- Cache.prepare(stmt_cache, sql, opts),
Expand Down
31 changes: 31 additions & 0 deletions test/server_test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,35 @@
defmodule Sqlitex.ServerTest do
use ExUnit.Case
doctest Sqlitex.Server

test "with_transaction commit" do
alias Sqlitex.Server

{:ok, server} = Server.start_link(':memory:')
:ok = Server.exec(server, "create table foo(id integer)")

Server.with_transaction(server, fn db ->
:ok = Server.exec(db, "insert into foo (id) values (42)")
end)

assert Server.query(server, "select * from foo") == {:ok, [[{:id, 42}]]}
end

test "with_transaction rollback" do
alias Sqlitex.Server

{:ok, server} = Server.start_link(':memory:')
:ok = Server.exec(server, "create table foo(id integer)")

try do
Server.with_transaction(server, fn db ->
:ok = Server.exec(db, "insert into foo (id) values (42)")
raise "Error to roll back transaction"
end)
rescue
_ -> nil
end

assert Server.query(server, "select * from foo") == {:ok, []}
end
end
27 changes: 27 additions & 0 deletions test/sqlitex_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -250,4 +250,31 @@ defmodule Sqlitex.Test do
assert row[:b] == nil
assert row[:c] == nil
end

test "with_transaction commit" do
{:ok, db} = Sqlitex.open(":memory:")
:ok = Sqlitex.exec(db, "create table foo(id integer)")

Sqlitex.with_transaction(db, fn db ->
:ok = Sqlitex.exec(db, "insert into foo (id) values (42)")
end)

assert Sqlitex.query(db, "select * from foo") == {:ok, [[{:id, 42}]]}
end

test "with_transaction rollback" do
{:ok, db} = Sqlitex.open(':memory:')
:ok = Sqlitex.exec(db, "create table foo(id integer)")

try do
Sqlitex.with_transaction(db, fn db ->
:ok = Sqlitex.exec(db, "insert into foo (id) values (42)")
raise "Error to roll back transaction"
end)
rescue
_ -> nil
end

assert Sqlitex.query(db, "select * from foo") == {:ok, []}
end
end