Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions acceptance/cases/transactions/read_write_transactions_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,35 @@ def test_create_record_with_sequence_using_mutations
TableWithSequence.connection.use_client_side_id_for_mutations = reset_value
end
end

def test_single_dml_succeeds_with_fallback_to_pdml_enabled
# This test verifies that a normal, successful DML statement works as
# expected when the fallback isolation is enabled. Because no mutation
# limit error occurs, the fallback to PDML should NOT be triggered.
create_test_records
assert_equal 1, Author.count

Author.transaction isolation: :fallback_to_pdml do
Author.where(name: "David").delete_all
end

assert_equal 0, Author.count, "The record should have been deleted"
end

def test_other_errors_do_not_trigger_fallback
# This test ensures that if a transaction with fallback enabled fails
# for a reason OTHER than the mutation limit, it fails normally and
# does not attempt to fall back to PDML.
create_test_records
initial_author_count = Author.count

assert_raises ActiveRecord::StatementInvalid do
Author.transaction isolation: :fallback_to_pdml do
Author.create! name: nil
end
end
assert_equal initial_author_count, Author.count, "Transaction should have rolled back"
end
end
end
end
31 changes: 31 additions & 0 deletions examples/snippets/partitioned-dml/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,37 @@ def self.run
puts "Deleted #{count} albums"
end

puts ""
puts "Deleting all singers in the database using normal Read-Write transaction with PDML fallback"
#
# This example demonstrates using `isolation: :fallback_to_pdml`.
#
# --- HOW IT WORKS ---
# 1. Initial Attempt: The transaction starts as a normal, atomic, read-write transaction.
#
# 2. The Trigger: If that transaction fails with a `TransactionMutationLimitExceededError`,
# the adapter automatically catches the error.
#
# 3. The Fallback: The adapter then retries the ENTIRE code block in a new,
# non-atomic Partitioned DML (PDML) transaction.
#
# --- USAGE REQUIREMENTS ---
# This implementation retries the whole transaction block without checking its contents.
# The user of this feature is responsible for ensuring the following:
#
# 1. SINGLE DML STATEMENT: The block should contain only ONE DML statement.
# If it contains more, the PDML retry will fail with a low-level `seqno` error.
#
# 2. IDEMPOTENCY: The DML statement must be idempotent. See https://cloud.google.com/spanner/docs/dml-partitioned#partitionable-idempotent for more information. # rubocop:disable Layout/LineLength
#
# 3. NON-ATOMIC: The retried PDML transaction is NOT atomic. Do not use this
# for multi-step operations that must all succeed or fail together.
#
Singer.transaction isolation: :fallback_to_pdml do
count = Singer.delete_all
puts "Deleted #{count} singers"
end

puts ""
puts "Deleting all singers in the database using Partitioned DML"
Singer.transaction isolation: :pdml do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
# frozen_string_literal: true

require "active_record/gem_version"
require "active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error"

module ActiveRecord
module ConnectionAdapters
module Spanner
module DatabaseStatements
VERSION_7_1_0 = Gem::Version.create "7.1.0"
RequestOptions = Google::Cloud::Spanner::V1::RequestOptions
TransactionMutationLimitExceededError = Google::Cloud::Spanner::Errors::TransactionMutationLimitExceededError

# DDL, DML and DQL Statements

Expand Down Expand Up @@ -72,10 +74,16 @@ def execute_query_or_dml statement_type, sql, name, binds
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
if transaction_required
transaction do
@connection.execute_query sql, params: params, types: types, request_options: request_options
@connection.execute_query sql,
params: params,
types: types,
request_options: request_options
end
else
@connection.execute_query sql, params: params, types: types, single_use_selector: selector,
@connection.execute_query sql,
params: params,
types: types,
single_use_selector: selector,
request_options: request_options
end
end
Expand Down Expand Up @@ -229,7 +237,7 @@ def execute_ddl statements

# Transaction

def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs
def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs, &block # rubocop:disable Metrics/PerceivedComplexity
commit_options = kwargs.delete :commit_options
exclude_from_streams = kwargs.delete :exclude_txn_from_change_streams
@_spanner_begin_transaction_options = {
Expand All @@ -254,8 +262,14 @@ def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs
if err.cause.is_a? Google::Cloud::AbortedError
sleep(delay_from_aborted(err) || (backoff *= 1.3))
retry
elsif TransactionMutationLimitExceededError.is_mutation_limit_error? err.cause
is_fallback_enabled = isolation == :fallback_to_pdml
raise unless is_fallback_enabled
@_spanner_begin_transaction_options[:isolation] = :pdml
retry
else
raise
end
raise
ensure
# Clean up the instance variable to avoid leaking options.
@_spanner_begin_transaction_options = nil
Expand All @@ -272,7 +286,8 @@ def transaction_isolation_levels
# These are not really isolation levels, but it is the only (best) way to pass in additional
# transaction options to the connection.
read_only: "READ_ONLY",
buffered_mutations: "BUFFERED_MUTATIONS"
buffered_mutations: "BUFFERED_MUTATIONS",
fallback_to_pdml: "FALLBACK_TO_PDML"
}
end

Expand Down Expand Up @@ -302,19 +317,22 @@ def begin_db_transaction
# (this is the same as :read_only)
#
def begin_isolated_db_transaction isolation
if isolation.is_a? Hash
raise "Unsupported isolation level: #{isolation}" unless
isolation[:timestamp] || isolation[:staleness] || isolation[:strong]
opts = @_spanner_begin_transaction_options || {}
# If isolation level is specified in the options, use that instead of the default isolation level.
isolation_option = opts[:isolation] || isolation
if isolation_option.is_a? Hash
raise "Unsupported isolation level: #{isolation_option}" unless
isolation_option[:timestamp] || isolation_option[:staleness] || isolation_option[:strong]
raise "Only one option is supported. It must be one of `timestamp`, `staleness` or `strong`." \
if isolation.count != 1
if isolation_option.count != 1
else
raise "Unsupported isolation level: #{isolation}" unless
[:serializable, :repeatable_read, :read_only, :buffered_mutations, :pdml].include? isolation
raise "Unsupported isolation level: #{isolation_option}" unless
[:serializable, :repeatable_read, :read_only, :buffered_mutations, :pdml,
:fallback_to_pdml].include? isolation_option
end

log "BEGIN #{isolation}" do
opts = @_spanner_begin_transaction_options || {}
@connection.begin_transaction isolation, **opts
log "BEGIN #{isolation_option}" do
@connection.begin_transaction isolation_option, **opts.except(:isolation)
end
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2025 Google LLC
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.

module Google
module Cloud
module Spanner
module Errors
# Custom exception raised when a transaction exceeds the mutation limit in Google Cloud Spanner.
# This provides a specific error class for a common, recoverable scenario.
class TransactionMutationLimitExceededError < ActiveRecord::StatementInvalid
ERROR_MESSAGE = "The transaction contains too many mutations".freeze

def self.is_mutation_limit_error? error
return false if error.nil?
error.is_a?(Google::Cloud::InvalidArgumentError) &&
error.message&.include?(ERROR_MESSAGE)
end
end
end
end
end
end
6 changes: 6 additions & 0 deletions lib/activerecord_spanner_adapter/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
require "google/cloud/spanner"
require "spanner_client_ext"
require "activerecord_spanner_adapter/information_schema"
require_relative "../active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error"

module ActiveRecordSpannerAdapter
TransactionMutationLimitExceededError = Google::Cloud::Spanner::Errors::TransactionMutationLimitExceededError

class Connection
attr_reader :instance_id
attr_reader :database_id
Expand Down Expand Up @@ -252,6 +255,9 @@ def execute_sql_request sql, converted_params, types, selector, request_options
end
raise
rescue Google::Cloud::Error => e
if TransactionMutationLimitExceededError.is_mutation_limit_error? e
raise
end
# Check if it was the first statement in a transaction that included a BeginTransaction
# option in the request. If so, execute an explicit BeginTransaction and then retry the
# request without the BeginTransaction option.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
module MockServerTests
CommitRequest = Google::Cloud::Spanner::V1::CommitRequest
ExecuteSqlRequest = Google::Cloud::Spanner::V1::ExecuteSqlRequest
BeginTransactionRequest = Google::Cloud::Spanner::V1::BeginTransactionRequest

class SpannerActiveRecordMockServerTest < BaseSpannerMockServerTest
VERSION_7_1_0 = Gem::Version.create('7.1.0')
Expand Down Expand Up @@ -192,6 +193,75 @@ def test_selects_one_singer_without_transaction
end
end

def test_update_all_on_table_with_sequence_falls_back_to_pdml
update_sql = "UPDATE `table_with_sequence` SET `name` = @p1 WHERE `table_with_sequence`.`id` = @p2"

mutation_limit_error = GRPC::InvalidArgument.new("The transaction contains too many mutations")

@mock.push_error(update_sql, mutation_limit_error)
@mock.put_statement_result(update_sql, StatementResult.new(1))

TableWithSequence.transaction isolation: :fallback_to_pdml do
TableWithSequence.where(id: 1).update_all(name: "New Foo Name")
end

# The first attempt should have failed with a TransactionMutationLimitExceededError.
# The second attempt should have succeeded with a PDML transaction.
# So we should have two requests for the same DML statement.
update_requests = @mock.requests.select { |req| req.is_a?(ExecuteSqlRequest) && req.sql == update_sql }
assert_equal 2, update_requests.length, "Should have been two attempts for the UPDATE DML"

# A PDML transaction should have been started for the final, successful attempt.
pdml_begin_request = @mock.requests.find { |req| req.is_a?(Google::Cloud::Spanner::V1::BeginTransactionRequest) && req.options&.partitioned_dml }
refute_nil pdml_begin_request, "A BeginTransactionRequest for PDML should have been sent"

fallback_request = update_requests[1]
assert fallback_request.transaction&.id, "Fallback DML should run within a transaction that has an ID (created by the PDML transaction)"
assert_nil fallback_request.transaction&.begin, "Fallback DML should use the existing PDML transaction, not begin a new one"
end

def test_no_fallback_to_pdml_on_table_with_sequence_when_disabled
update_sql = "UPDATE `table_with_sequence` SET `name` = @p1 WHERE `table_with_sequence`.`id` = @p2"

mutation_limit_error = GRPC::InvalidArgument.new("The transaction contains too many mutations")
@mock.push_error(update_sql, mutation_limit_error)

err = assert_raises ActiveRecord::StatementInvalid do
TableWithSequence.transaction do
TableWithSequence.where(id: 1).update_all(name: "This name will not be updated")
end
end

assert_kind_of Google::Cloud::InvalidArgumentError, err.cause


update_requests = @mock.requests.select { |req| req.is_a?(ExecuteSqlRequest) && req.sql == update_sql }
assert_equal 1, update_requests.length, "Should only have been two attempts for the UPDATE DML"

pdml_begin_request = @mock.requests.find { |req| req.is_a?(BeginTransactionRequest) && req.options&.partitioned_dml }
assert_nil pdml_begin_request, "No PDML transaction should have been started"
end

def test_no_fallback_to_pdml_on_table_with_sequence_when_error_is_not_valid

update_sql_regex = /UPDATE `table_with_sequence`/
other_error = GRPC::AlreadyExists.new("This is some other database error")
@mock.push_error(update_sql_regex, other_error)

err = assert_raises ActiveRecord::StatementInvalid do
TableWithSequence.transaction isolation: :fallback_to_pdml do
TableWithSequence.where(id: 1).update_all(name: "This name will not be updated")
end
end

assert_kind_of Google::Cloud::InvalidArgumentError, err.cause
update_requests = @mock.requests.select { |req| req.is_a?(ExecuteSqlRequest) && req.sql.match(update_sql_regex) }
assert_equal 2, update_requests.length, "Should only have been one attempt for the UPDATE DML"

pdml_begin_request = @mock.requests.find { |req| req.is_a?(BeginTransactionRequest) && req.options&.partitioned_dml }
assert_nil pdml_begin_request, "No PDML transaction should have been started"
end

def test_selects_singers_with_condition
# This query does not use query parameters because the where clause is specified as a string.
# ActiveRecord sees that as a SQL fragment that will disable the usage of prepared statements.
Expand Down