Skip to content

Commit 506d925

Browse files
author
Chris Rossi
authored
fix: refactor transactions to use their own event loops (#443)
Issue #426 ultimately turned out to be caused by falling out of the transaction scope and triggering a commit while work was still queued on the event loop — including, in this case, the tasklet that would eventually schedule the call to delete — so the delete never actually happened. The fix is to consume the event loop queues before scheduling the call to COMMIT. However, if other tasks are running in parallel, flushing the shared event loop disrupts the natural sequence of events in ways that can cause things to blow up. (All of the `parallel_transaction` tests in `tests/system/test_misc.py`, for instance, will fail.) The fix for that is to give each transaction its own event loop, so that when it calls `_eventloop.run` prior to commit, it flushes only the tasks that pertain to it. Fixes #426.
1 parent 0387288 commit 506d925

File tree

13 files changed

+271
-123
lines changed

13 files changed

+271
-123
lines changed

packages/google-cloud-ndb/google/cloud/ndb/_batch.py

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -14,8 +14,6 @@
1414

1515
"""Support for batching operations."""
1616

17-
from google.cloud.ndb import _eventloop
18-
1917

2018
def get_batch(batch_cls, options=None):
2119
"""Gets a data structure for storing batched calls to Datastore Lookup.
@@ -68,5 +66,5 @@ def idle():
6866
return idle
6967

7068
batches[options_key] = batch = batch_cls(options)
71-
_eventloop.add_idle(idler(batch))
69+
context.eventloop.add_idle(idler(batch))
7270
return batch

packages/google-cloud-ndb/google/cloud/ndb/_datastore_api.py

Lines changed: 17 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -519,6 +519,19 @@ def commit_callback(rpc):
519519
rpc.add_done_callback(commit_callback)
520520

521521

522+
def prepare_to_commit(transaction):
523+
"""Signal that we're ready to commit a transaction.
524+
525+
Currently just used to signal to the commit batch that we're not going to
526+
need to call `AllocateIds`, because we're ready to commit now.
527+
528+
Args:
529+
transaction (bytes): The transaction id about to be committed.
530+
"""
531+
batch = _get_commit_batch(transaction, _options.Options())
532+
batch.preparing_to_commit = True
533+
534+
522535
def commit(transaction, retries=None, timeout=None):
523536
"""Commit a transaction.
524537
@@ -605,6 +618,7 @@ def __init__(self, transaction, options):
605618
self.allocating_ids = []
606619
self.incomplete_mutations = []
607620
self.incomplete_futures = []
621+
self.preparing_to_commit = False
608622

609623
def put(self, entity_pb):
610624
"""Add an entity to batch to be stored.
@@ -657,8 +671,9 @@ def delete(self, key):
657671

658672
def idle_callback(self):
659673
"""Call AllocateIds on any incomplete keys in the batch."""
660-
if not self.incomplete_mutations:
661-
# This will happen if `commit` is called first.
674+
# If there are no incomplete mutations, or if we're already preparing
675+
# to commit, there's no need to allocate ids.
676+
if self.preparing_to_commit or not self.incomplete_mutations:
662677
return
663678

664679
# Signal to a future commit that there is an id allocation in
@@ -728,11 +743,6 @@ def commit(self, retries=None, timeout=None):
728743
if not future.done():
729744
yield future
730745

731-
# Head off making any more AllocateId calls. Any remaining incomplete
732-
# keys will get ids as part of the Commit call.
733-
self.incomplete_mutations = []
734-
self.incomplete_futures = []
735-
736746
future = tasklets.Future("Commit")
737747
futures = self.futures
738748

packages/google-cloud-ndb/google/cloud/ndb/_eventloop.py

Lines changed: 1 addition & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,6 @@
3434
"queue_call",
3535
"queue_rpc",
3636
"run",
37-
"run0",
3837
"run1",
3938
]
4039

@@ -396,13 +395,7 @@ def run():
396395
loop.run()
397396

398397

399-
def run0():
400-
"""Calls :method:`EventLoop.run0` on current event loop."""
401-
loop = get_event_loop()
402-
loop.run0()
403-
404-
405398
def run1():
406399
"""Calls :method:`EventLoop.run1` on current event loop."""
407400
loop = get_event_loop()
408-
loop.run1()
401+
return loop.run1()

packages/google-cloud-ndb/google/cloud/ndb/_transaction.py

Lines changed: 22 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -130,15 +130,36 @@ def _transaction_async(context, callback, read_only=False):
130130
tx_context = context.new(
131131
transaction=transaction_id,
132132
on_commit_callbacks=on_commit_callbacks,
133-
cache=None, # Use new, empty cache for transaction
133+
batches=None,
134+
commit_batches=None,
135+
cache=None,
136+
# We could just pass `None` here and let the `Context` constructor
137+
# instantiate a new event loop, but our unit tests inject a subclass of
138+
# `EventLoop` that makes testing a little easier. This makes sure the
139+
# new event loop is of the same type as the current one, to propagate
140+
# the event loop class used for testing.
141+
eventloop=type(context.eventloop)(),
134142
)
143+
144+
# The outer loop is dependent on the inner loop
145+
def run_inner_loop(inner_context):
146+
with inner_context.use():
147+
if inner_context.eventloop.run1():
148+
return True # schedule again
149+
150+
context.eventloop.add_idle(run_inner_loop, tx_context)
151+
135152
with tx_context.use():
136153
try:
137154
# Run the callback
138155
result = callback()
139156
if isinstance(result, tasklets.Future):
140157
result = yield result
141158

159+
# Make sure we've run everything we can run before calling commit
160+
_datastore_api.prepare_to_commit(transaction_id)
161+
tx_context.eventloop.run()
162+
142163
# Commit the transaction
143164
yield _datastore_api.commit(transaction_id, retries=0)
144165

packages/google-cloud-ndb/google/cloud/ndb/tasklets.py

Lines changed: 15 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -123,7 +123,10 @@ def wait(self):
123123
after a call to this method.
124124
"""
125125
while not self._done:
126-
_eventloop.run1()
126+
if not _eventloop.run1():
127+
raise RuntimeError(
128+
"Eventloop is exhausted with unfinished futures."
129+
)
127130

128131
def check_success(self):
129132
"""Check whether a future has completed without raising an exception.
@@ -348,16 +351,20 @@ def done_callback(yielded):
348351

349352
error = yielded.exception()
350353
if error:
351-
_eventloop.call_soon(self._advance_tasklet, error=error)
354+
self.context.eventloop.call_soon(
355+
self._advance_tasklet, error=error
356+
)
352357
else:
353-
_eventloop.call_soon(self._advance_tasklet, yielded.result())
358+
self.context.eventloop.call_soon(
359+
self._advance_tasklet, yielded.result()
360+
)
354361

355362
if isinstance(yielded, Future):
356363
yielded.add_done_callback(done_callback)
357364
self.waiting_on = yielded
358365

359366
elif isinstance(yielded, _remote.RemoteCall):
360-
_eventloop.queue_rpc(yielded, done_callback)
367+
self.context.eventloop.queue_rpc(yielded, done_callback)
361368
self.waiting_on = yielded
362369

363370
elif isinstance(yielded, (list, tuple)):
@@ -515,7 +522,10 @@ def wait_any(futures):
515522
if future.done():
516523
return future
517524

518-
_eventloop.run1()
525+
if not _eventloop.run1():
526+
raise RuntimeError(
527+
"Eventloop is exhausted with unfinished futures."
528+
)
519529

520530

521531
def wait_all(futures):

packages/google-cloud-ndb/tests/system/__init__.py

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import functools
16+
import operator
1517
import time
1618

1719
KIND = "SomeKind"
@@ -61,3 +63,23 @@ def eventually(f, predicate, timeout=120, interval=2):
6163
time.sleep(interval)
6264

6365
assert predicate(value)
66+
67+
68+
def length_equals(n):
69+
"""Returns predicate that returns True if passed a sequence of length `n`.
70+
71+
For use with `eventually`.
72+
"""
73+
74+
def predicate(sequence):
75+
return len(sequence) == n
76+
77+
return predicate
78+
79+
80+
def equals(n):
81+
"""Returns predicate that returns True if passed `n`.
82+
83+
For use with `eventually`.
84+
"""
85+
return functools.partial(operator.eq, n)

packages/google-cloud-ndb/tests/system/test_crud.py

Lines changed: 25 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -16,8 +16,6 @@
1616
System tests for Create, Update, Delete. (CRUD)
1717
"""
1818
import datetime
19-
import functools
20-
import operator
2119
import os
2220
import pickle
2321
import random
@@ -37,15 +35,11 @@
3735
from google.cloud.ndb import _cache
3836
from google.cloud.ndb import global_cache as global_cache_module
3937

40-
from tests.system import KIND, eventually
38+
from tests.system import KIND, eventually, equals
4139

4240
USE_REDIS_CACHE = bool(os.environ.get("REDIS_CACHE_URL"))
4341

4442

45-
def _equals(n):
46-
return functools.partial(operator.eq, n)
47-
48-
4943
@pytest.mark.usefixtures("client_context")
5044
def test_retrieve_entity(ds_entity):
5145
entity_id = test_utils.system.unique_resource_id()
@@ -526,7 +520,7 @@ class SomeKind(ndb.Model):
526520
# Sneaky. Delete entity out from under cache so we know we're getting
527521
# cached copy.
528522
key.delete()
529-
eventually(key.get, _equals(None))
523+
eventually(key.get, equals(None))
530524

531525
retrieved = key.get()
532526
assert retrieved.foo == 42
@@ -772,6 +766,29 @@ def delete_entity():
772766
assert key.get() is None
773767

774768

769+
def test_delete_entity_in_transaction_with_global_cache(
770+
client_context, ds_entity
771+
):
772+
"""Regression test for #426
773+
774+
https://github.com/googleapis/python-ndb/issues/426
775+
"""
776+
777+
class SomeKind(ndb.Model):
778+
foo = ndb.IntegerProperty()
779+
780+
entity_id = test_utils.system.unique_resource_id()
781+
ds_entity(KIND, entity_id, foo=42)
782+
783+
global_cache = global_cache_module._InProcessGlobalCache()
784+
with client_context.new(global_cache=global_cache).use():
785+
key = ndb.Key(KIND, entity_id)
786+
assert key.get().foo == 42
787+
788+
ndb.transaction(key.delete)
789+
assert key.get() is None
790+
791+
775792
@pytest.mark.usefixtures("client_context")
776793
def test_delete_entity_in_transaction_then_rollback(ds_entity):
777794
entity_id = test_utils.system.unique_resource_id()

packages/google-cloud-ndb/tests/system/test_misc.py

Lines changed: 24 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,8 @@
2222

2323
from google.cloud import ndb
2424

25+
from tests.system import eventually, length_equals
26+
2527
USE_REDIS_CACHE = bool(os.environ.get("REDIS_CACHE_URL"))
2628

2729

@@ -271,3 +273,25 @@ def update(id, add, fail=False):
271273

272274
entity = SomeKind.get_by_id(id)
273275
assert entity.foo == 142
276+
277+
278+
@pytest.mark.usefixtures("client_context")
279+
def test_insert_entity_in_transaction_without_preallocating_id(dispose_of):
280+
class SomeKind(ndb.Model):
281+
foo = ndb.IntegerProperty()
282+
bar = ndb.StringProperty()
283+
284+
def save_entity():
285+
# By not waiting on the Future, we don't force a call to AllocateIds
286+
# before the transaction is committed.
287+
SomeKind(foo=42, bar="none").put_async()
288+
289+
ndb.transaction(save_entity)
290+
291+
query = SomeKind.query()
292+
eventually(query.fetch, length_equals(1))
293+
retrieved = query.fetch()[0]
294+
dispose_of(retrieved._key._key)
295+
296+
assert retrieved.foo == 42
297+
assert retrieved.bar == "none"

0 commit comments

Comments
 (0)