Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,17 +229,27 @@ def execute_ddl statements

# Transaction

def transaction requires_new: nil, isolation: nil, joinable: true
def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs
commit_options = kwargs.delete :commit_options

if !requires_new && current_transaction.joinable?
return super
end

backoff = 0.2
begin
super
super do
# Once the transaction has been started by `super`, apply your custom options
# to the Spanner transaction object.
if commit_options && @connection.current_transaction
@connection.current_transaction.set_commit_options commit_options
end

yield
end
rescue ActiveRecord::StatementInvalid => err
if err.cause.is_a? Google::Cloud::AbortedError
sleep(delay_from_aborted(err) || backoff *= 1.3)
sleep(delay_from_aborted(err) || (backoff *= 1.3))
retry
end
raise
Expand Down
2 changes: 2 additions & 0 deletions lib/activerecord_spanner_adapter/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ def self.spanners config
# Call this method if you drop and recreate a database with the same name
# to prevent the cached information to be used for the new database.
def self.reset_information_schemas!
return unless @database

@information_schemas.each_value do |info_schema|
info_schema.connection.disconnect!
end
Expand Down
19 changes: 16 additions & 3 deletions lib/activerecord_spanner_adapter/transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@
module ActiveRecordSpannerAdapter
class Transaction
attr_reader :state
attr_reader :commit_options

def initialize connection, isolation


def initialize connection, isolation, commit_options = nil
@connection = connection
@isolation = isolation
@committable = ![:read_only, :pdml].include?(isolation) && !isolation.is_a?(Hash)
@state = :INITIALIZED
@sequence_number = 0
@mutations = []
@commit_options = commit_options
end

def active?
Expand Down Expand Up @@ -95,14 +99,23 @@ def next_sequence_number
@sequence_number += 1 if @committable
end

# Sets the commit options for this transaction.
# This is used to set the options for the commit RPC, such as return_commit_stats and max_commit_delay.
def set_commit_options options # rubocop:disable Naming/AccessorMethodName
@commit_options = options&.dup
end

def commit
raise "This transaction is not active" unless active?

begin
# Start a transaction with an explicit BeginTransaction RPC if the transaction only contains mutations.
force_begin_read_write if @committable && [email protected]? && !@grpc_transaction

@connection.session.commit_transaction @grpc_transaction, @mutations if @committable && @grpc_transaction
if @committable && @grpc_transaction
@connection.session.commit_transaction @grpc_transaction,
@mutations,
commit_options: commit_options
end
@state = :COMMITTED
rescue Google::Cloud::NotFoundError => e
if @connection.session_not_found? e
Expand Down
5 changes: 3 additions & 2 deletions lib/spanner_client_ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ def create_session instance_id, database_id, labels: nil
end

class Session
def commit_transaction transaction, mutations = []
def commit_transaction transaction, mutations = [], commit_options: nil
ensure_service!

resp = service.commit(
path,
mutations,
transaction_id: transaction.transaction_id
transaction_id: transaction.transaction_id,
commit_options: commit_options
)
@last_updated_at = Time.now
Convert.timestamp_to_time resp.commit_timestamp
Expand Down
10 changes: 10 additions & 0 deletions test/activerecord_spanner_adapter/transaction_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ def test_rollback
assert_equal :ROLLED_BACK, transaction.state
end

def test_commit_options
transaction.begin
transaction.set_commit_options return_commit_stats: true, max_commit_delay: 1000
transaction.commit
assert_equal :COMMITTED, transaction.state
commit_options = transaction.commit_options
assert commit_options[:return_commit_stats]
assert_equal 1000, commit_options[:max_commit_delay]
end

def test_no_nested_transactions
transaction.begin

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,25 @@ def test_read_write_transaction_uses_inlined_begin
}
end

def test_read_write_transaction_with_commit_options
insert_sql = register_insert_singer_result
options_to_test = { return_commit_stats: true, max_commit_delay: 1000 }
# Start a transaction, passing the commit_options.
ActiveRecord::Base.transaction commit_options: options_to_test do
Singer.create(first_name: "Test", last_name: "User")
end
# Find the CommitRequest sent to the mock server.
commit_requests = @mock.requests.select { |req| req.is_a?(Google::Cloud::Spanner::V1::CommitRequest) }
assert_equal 1, commit_requests.length
commit_request = commit_requests.first
refute_nil commit_request

# Assert that the commit_options are present and have the correct values.
assert_equal true, commit_request.return_commit_stats
refute_nil commit_request.max_commit_delay
assert_equal 1, commit_request.max_commit_delay.seconds
end

def test_read_write_transaction_aborted_dml_is_automatically_retried_with_inline_begin
insert_sql = register_insert_singer_result

Expand Down