Commit 83618b2

refactor: add new message handler in ruby wrapper
1 parent df8fcac commit 83618b2

8 files changed: 248 additions & 76 deletions


spannerlib/wrappers/spannerlib-ruby/Rakefile

Lines changed: 0 additions & 1 deletion
@@ -42,4 +42,3 @@ task :compile do
 end
 
 task default: %i[compile spec rubocop]
-

spannerlib/wrappers/spannerlib-ruby/lib/spannerlib/connection.rb

Lines changed: 6 additions & 21 deletions
@@ -15,6 +15,7 @@
 # frozen_string_literal: true
 
 require_relative "ffi"
+require_relative "rows"
 
 class Connection
   attr_reader :pool_id, :conn_id
@@ -35,8 +36,7 @@ def write_mutations(mutation_group)
                 else
                   mutation_group.to_s
                 end
-
-    SpannerLib.write_mutations(@pool_id, @conn_id, req_bytes)
+    SpannerLib.write_mutations(@pool_id, @conn_id, req_bytes, proto_klass: Google::Cloud::Spanner::V1::CommitResponse)
   end
 
   # Begin a read/write transaction on this connection. Accepts TransactionOptions proto or bytes.
@@ -52,7 +52,7 @@ def begin_transaction(transaction_options = nil)
 
   # Commit the current transaction. Returns CommitResponse bytes or nil.
   def commit
-    SpannerLib.commit(@pool_id, @conn_id)
+    SpannerLib.commit(@pool_id, @conn_id, proto_klass: Google::Cloud::Spanner::V1::CommitResponse)
   end
 
   # Rollback the current transaction.
@@ -68,7 +68,8 @@ def execute(request)
            else
              request.is_a?(String) ? request : request.to_s
            end
-    SpannerLib.execute(@pool_id, @conn_id, bytes)
+    rows_id = SpannerLib.execute(@pool_id, @conn_id, bytes)
+    SpannerLib::Rows.new(self, rows_id)
   end
 
   # Execute batch DML/DDL request. Returns ExecuteBatchDmlResponse bytes (or nil).
@@ -78,24 +79,8 @@ def execute_batch(request)
            else
              request.is_a?(String) ? request : request.to_s
            end
-    SpannerLib.execute_batch(@pool_id, @conn_id, bytes)
-  end
-
-  # Rows helpers — return raw message bytes (caller should parse them).
-  def metadata(rows_id)
-    SpannerLib.metadata(@pool_id, @conn_id, rows_id)
-  end
-
-  def next_rows(rows_id, num_rows, encoding = 0)
-    SpannerLib.next(@pool_id, @conn_id, rows_id, num_rows, encoding)
-  end
-
-  def result_set_stats(rows_id)
-    SpannerLib.result_set_stats(@pool_id, @conn_id, rows_id)
-  end
 
-  def close_rows(rows_id)
-    SpannerLib.close_rows(@pool_id, @conn_id, rows_id)
+    SpannerLib.execute_batch(@pool_id, @conn_id, bytes, proto_klass: Google::Cloud::Spanner::V1::ExecuteBatchDmlResponse)
   end
 
   # Closes this connection. Any active transaction on the connection is rolled back.
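
For orientation, here is a minimal usage sketch of the Connection API after this change. It is not part of the commit; the DSN and table name are borrowed from the integration spec further down, and the decoded return type follows from the proto_klass wiring shown above.

# Sketch only: assumes the emulator DSN and test_table from the integration spec below.
require "spannerlib/pool"
require "google/cloud/spanner/v1"

pool = Pool.create_pool("projects/your-project-id/instances/test-instance/databases/test-database?autoConfigEmulator=true")
conn = pool.create_connection

mutation_group = Google::Cloud::Spanner::V1::BatchWriteRequest::MutationGroup.new(
  mutations: [
    Google::Cloud::Spanner::V1::Mutation.new(
      delete: Google::Cloud::Spanner::V1::Mutation::Delete.new(
        table: "test_table",
        key_set: Google::Cloud::Spanner::V1::KeySet.new(all: true)
      )
    )
  ]
)

# With proto_klass wired through, write_mutations returns a decoded
# Google::Cloud::Spanner::V1::CommitResponse instead of raw bytes.
commit_response = conn.write_mutations(mutation_group)

conn.close
pool.close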

spannerlib/wrappers/spannerlib-ruby/lib/spannerlib/ffi.rb

Lines changed: 14 additions & 33 deletions
@@ -23,6 +23,7 @@
 require "google/rpc/status_pb"
 
 require "ffi"
+require_relative "message_handler"
 
 module SpannerLib
   extend FFI::Library
@@ -128,45 +129,24 @@ def self.ensure_release(message)
     end
   end
 
-  def self.handle_object_id_response(message, func_name)
+  def self.handle_object_id_response(message, _func_name)
     ensure_release(message) do
-      if message[:code] != 0
-        error_msg = read_error_message(message)
-        raise "#{func_name} failed with code #{message[:code]}: #{error_msg}"
-      end
-      message[:objectId]
+      MessageHandler.new(message).object_id
     end
   end
 
-  def self.handle_status_response(message, func_name)
+  def self.handle_status_response(message, _func_name)
     ensure_release(message) do
-      if message[:code] != 0
-        error_msg = read_error_message(message)
-        raise "#{func_name} failed with code #{message[:code]}: #{error_msg}"
-      end
+      MessageHandler.new(message).throw_if_error!
    end
    nil
  end
 
-  # rubocop:disable Metrics/MethodLength
-  def self.handle_data_response(message, func_name)
+  def self.handle_data_response(message, _func_name, proto_klass: nil)
     ensure_release(message) do
-      if message[:code] != 0
-        error_msg = read_error_message(message)
-        raise "#{func_name} failed with code #{message[:code]}: #{error_msg}"
-      end
-
-      len = message[:length]
-      ptr = message[:pointer]
-
-      if len.positive? && !ptr.null?
-        ptr.read_bytes(len)
-      else
-        ""
-      end
+      MessageHandler.new(message).data(proto_klass: proto_klass)
     end
   end
-  # rubocop:enable Metrics/MethodLength
 
   # rubocop:disable Metrics/MethodLength
   def self.read_error_message(message)
@@ -187,10 +167,10 @@ def self.read_error_message(message)
   end
   # rubocop:enable Metrics/MethodLength
 
-  def self.write_mutations(pool_id, conn_id, proto_bytes)
+  def self.write_mutations(pool_id, conn_id, proto_bytes, proto_klass: nil)
     with_gobytes(proto_bytes) do |gobytes|
       message = WriteMutations(pool_id, conn_id, gobytes)
-      handle_data_response(message, "WriteMutations")
+      handle_data_response(message, "WriteMutations", proto_klass: proto_klass)
     end
   end
 
@@ -201,9 +181,9 @@ def self.begin_transaction(pool_id, conn_id, proto_bytes)
     end
   end
 
-  def self.commit(pool_id, conn_id)
+  def self.commit(pool_id, conn_id, proto_klass: nil)
     message = Commit(pool_id, conn_id)
-    handle_data_response(message, "Commit")
+    handle_data_response(message, "Commit", proto_klass: proto_klass)
   end
 
   def self.rollback(pool_id, conn_id)
@@ -218,10 +198,11 @@ def self.execute(pool_id, conn_id, proto_bytes)
     end
   end
 
-  def self.execute_batch(pool_id, conn_id, proto_bytes)
+  def self.execute_batch(pool_id, conn_id, proto_bytes, options = {})
+    proto_klass = options[:proto_klass]
     with_gobytes(proto_bytes) do |gobytes|
       message = ExecuteBatch(pool_id, conn_id, gobytes)
-      handle_data_response(message, "ExecuteBatch")
+      handle_data_response(message, "ExecuteBatch", proto_klass: proto_klass)
     end
   end
 
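A hedged sketch of the resulting call shapes at the FFI layer. The pool_id, conn_id, and request_bytes values are placeholders (they come from earlier SpannerLib calls not shown in this hunk); only the argument forms are taken from the diff above.

# Sketch only: pool_id, conn_id and request_bytes are placeholders.
require "google/cloud/spanner/v1"

# commit and write_mutations accept proto_klass as a keyword argument:
commit_resp = SpannerLib.commit(pool_id, conn_id,
                                proto_klass: Google::Cloud::Spanner::V1::CommitResponse)

# execute_batch reads it from a trailing options Hash; a keyword at the call
# site is folded into that Hash because the method declares no keyword params.
batch_resp = SpannerLib.execute_batch(pool_id, conn_id, request_bytes,
                                      proto_klass: Google::Cloud::Spanner::V1::ExecuteBatchDmlResponse)

# Without proto_klass, handle_data_response still returns the raw bytes.
raw = SpannerLib.commit(pool_id, conn_id)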

spannerlib/wrappers/spannerlib-ruby/lib/spannerlib/message_handler.rb

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+# frozen_string_literal: true
+
+# lib/spannerlib/message_handler.rb
+
+require "spannerlib/exceptions"
+
+module SpannerLib
+  class MessageHandler
+    def initialize(message)
+      @message = message
+    end
+
+    def object_id
+      throw_if_error!
+      @message[:objectId]
+    end
+
+    # Returns the data payload from the message.
+    # If a proto_klass is provided, it decodes the bytes into a Protobuf object.
+    # Otherwise, it returns the raw bytes as a string.
+    def data(proto_klass: nil)
+      throw_if_error!
+
+      len = @message[:length]
+      ptr = @message[:pointer]
+
+      return (proto_klass ? proto_klass.new : "") unless len.positive? && !ptr.null?
+
+      bytes = ptr.read_string(len)
+
+      proto_klass ? proto_klass.decode(bytes) : bytes
+    end
+
+    def throw_if_error!
+      code = @message[:code]
+      return if code.zero?
+
+      error_msg = SpannerLib.read_error_message(@message)
+      raise SpannerLibException, "Call failed with code #{code}: #{error_msg}"
+    end
+  end
+end
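
To illustrate the decode path of MessageHandler in isolation, here is a hedged sketch that feeds it a hand-built message. A plain Hash stands in for the FFI message struct (the class only reads :code, :objectId, :length and :pointer), and the payload pointer is built with FFI::MemoryPointer; real messages come from the Go library.

# Sketch only: a Hash stands in for the FFI struct; assumes the gem's lib dir is on the load path.
require "ffi"
require "google/cloud/spanner/v1"
require "spannerlib/message_handler"

# Serialize a CommitResponse and park the bytes behind an FFI pointer.
payload = Google::Cloud::Spanner::V1::CommitResponse.new(
  commit_timestamp: Google::Protobuf::Timestamp.new(seconds: 1)
).to_proto
ptr = FFI::MemoryPointer.new(:uint8, payload.bytesize)
ptr.put_bytes(0, payload)

message = { code: 0, objectId: 0, length: payload.bytesize, pointer: ptr }
handler = SpannerLib::MessageHandler.new(message)

handler.data                                                           # raw bytes (String)
handler.data(proto_klass: Google::Cloud::Spanner::V1::CommitResponse)  # decoded CommitResponse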

spannerlib/wrappers/spannerlib-ruby/lib/spannerlib/rows.rb

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
+# frozen_string_literal: true
+
+module SpannerLib
+  class Rows
+    include Enumerable
+
+    attr_reader :id, :connection
+
+    def initialize(connection, rows_id)
+      @connection = connection
+      @id = rows_id
+      @closed = false
+    end
+
+    def each
+      return enum_for(:each) unless block_given?
+
+      while (row = self.next)
+        yield row
+      end
+    ensure
+      close
+    end
+
+    def next
+      return nil if @closed
+
+      row_data = SpannerLib.next(connection.pool_id, connection.conn_id, id, 1, 0)
+
+      if row_data.nil? || row_data.empty? || (row_data.respond_to?(:values) && row_data.values.empty?)
+        close
+        return nil
+      end
+
+      row_data
+    end
+
+    def metadata
+      SpannerLib.metadata(connection.pool_id, connection.conn_id, id)
+    end
+
+    def result_set_stats
+      SpannerLib.result_set_stats(connection.pool_id, connection.conn_id, id)
+    end
+
+    def close
+      return if @closed
+
+      SpannerLib.close_rows(connection.pool_id, connection.conn_id, id)
+      @closed = true
+    end
+  end
+end
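
A short, hedged sketch of iterating the new Rows object. Here `conn` is a Connection as set up in the integration spec below, and the per-row decoding as Google::Protobuf::ListValue mirrors what that spec does.

# Sketch only: `conn` is a Connection created as in the integration spec below.
request = Google::Cloud::Spanner::V1::ExecuteSqlRequest.new(
  sql: "SELECT id, name FROM test_table ORDER BY id"
)
rows = conn.execute(request) # Connection#execute now returns a SpannerLib::Rows

rows.each do |row_bytes|
  row = Google::Protobuf::ListValue.decode(row_bytes)
  puts row.values.map(&:string_value).join(", ")
end
# The ensure clause in Rows#each closes the rows object once iteration ends.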

Lines changed: 126 additions & 0 deletions
@@ -0,0 +1,126 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# frozen_string_literal: true
+
+require "spec_helper"
+require "google/cloud/spanner/v1"
+
+RSpec.describe "Batch API test", :integration do
+  before(:all) do
+    @emulator_host = ENV.fetch("SPANNER_EMULATOR_HOST", nil)
+    skip "SPANNER_EMULATOR_HOST not set" unless @emulator_host && !@emulator_host.empty?
+
+    require "spannerlib/pool"
+    @dsn = "projects/your-project-id/instances/test-instance/databases/test-database?autoConfigEmulator=true"
+
+    pool = Pool.create_pool(@dsn)
+    conn = pool.create_connection
+    ddl_batch_req = Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest.new(
+      statements: [
+        Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest::Statement.new(sql: "DROP TABLE IF EXISTS test_table"),
+        Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest::Statement.new(
+          sql: "CREATE TABLE test_table (id INT64 NOT NULL, name STRING(100)) PRIMARY KEY(id)"
+        )
+      ]
+    )
+    conn.execute_batch(ddl_batch_req)
+    conn.close
+    pool.close
+  end
+
+  before do
+    @pool = Pool.create_pool(@dsn)
+    @conn = @pool.create_connection
+    delete_req = Google::Cloud::Spanner::V1::BatchWriteRequest::MutationGroup.new(
+      mutations: [
+        Google::Cloud::Spanner::V1::Mutation.new(
+          delete: Google::Cloud::Spanner::V1::Mutation::Delete.new(
+            table: "test_table",
+            key_set: Google::Cloud::Spanner::V1::KeySet.new(all: true)
+          )
+        )
+      ]
+    )
+    @conn.write_mutations(delete_req)
+  end
+
+  after do
+    @conn.close
+    @pool.close
+  end
+
+  it "tests a batch DML request" do
+    dml_batch_req = Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest.new(
+      statements: [
+        Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest::Statement.new(
+          sql: "INSERT INTO test_table (id, name) VALUES (1, 'name1')"
+        ),
+        Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest::Statement.new(
+          sql: "INSERT INTO test_table (id, name) VALUES (2, 'name2')"
+        ),
+        Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest::Statement.new(
+          sql: "UPDATE test_table SET name='name3' WHERE id=1"
+        )
+      ]
+    )
+    resp = @conn.execute_batch(dml_batch_req)
+    expect(resp.result_sets.length).to eq 3
+
+    select_req = Google::Cloud::Spanner::V1::ExecuteSqlRequest.new(
+      sql: "SELECT id, name FROM test_table ORDER BY id"
+    )
+    rows = @conn.execute(select_req)
+    all_rows = rows.map { |row_bytes| Google::Protobuf::ListValue.decode(row_bytes) }
+
+    expect(all_rows.length).to eq 2
+    expect(all_rows[0].values[0].string_value).to eq "1"
+    expect(all_rows[0].values[1].string_value).to eq "name3"
+    expect(all_rows[1].values[0].string_value).to eq "2"
+    expect(all_rows[1].values[1].string_value).to eq "name2"
+  end
+
+  it "tests a batch DDL request" do
+    ddl_batch_req = Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest.new(
+      statements: [
+        Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest::Statement.new(
+          sql: "DROP TABLE IF EXISTS test_table"
+        ),
+        Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest::Statement.new(
+          sql: "CREATE TABLE test_table (key INT64 NOT NULL, data STRING(MAX)) PRIMARY KEY(key)"
+        )
+      ]
+    )
+
+    expect { @conn.execute_batch(ddl_batch_req) }.not_to raise_error
+
+    insert_req = Google::Cloud::Spanner::V1::BatchWriteRequest::MutationGroup.new(
+      mutations: [
+        Google::Cloud::Spanner::V1::Mutation.new(
+          insert: Google::Cloud::Spanner::V1::Mutation::Write.new(
+            table: "test_table",
+            columns: %w[key data],
+            values: [
+              Google::Protobuf::ListValue.new(values: [
+                Google::Protobuf::Value.new(string_value: "101"),
+                Google::Protobuf::Value.new(string_value: "VerificationData")
+              ])
+            ]
+          )
+        )
+      ]
+    )
+    expect { @conn.write_mutations(insert_req) }.not_to raise_error
+  end
+end
