Skip to content

Commit 9fdb62a

Browse files
authored
MOTOR-1136 Ability to create and manage Atlas search indexes in the database (#220)
1 parent 332360b commit 9fdb62a

File tree

9 files changed

+242
-34
lines changed

9 files changed

+242
-34
lines changed

motor/core.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,92 @@ def watch(
722722
show_expanded_events,
723723
)
724724

725+
async def _cursor_command(
726+
self,
727+
command,
728+
value=1,
729+
read_preference=None,
730+
codec_options=None,
731+
session=None,
732+
comment=None,
733+
max_await_time_ms=None,
734+
**kwargs,
735+
):
736+
"""Issue a MongoDB command and parse the response as a cursor.
737+
738+
If the response from the server does not include a cursor field, an error will be thrown.
739+
740+
Otherwise, behaves identically to issuing a normal MongoDB command.
741+
742+
:Parameters:
743+
- `command`: document representing the command to be issued,
744+
or the name of the command (for simple commands only).
745+
746+
.. note:: the order of keys in the `command` document is
747+
significant (the "verb" must come first), so commands
748+
which require multiple keys (e.g. `findandmodify`)
749+
should use an instance of :class:`~bson.son.SON` or
750+
a string and kwargs instead of a Python `dict`.
751+
752+
- `value` (optional): value to use for the command verb when
753+
`command` is passed as a string
754+
- `read_preference` (optional): The read preference for this
755+
operation. See :mod:`~pymongo.read_preferences` for options.
756+
If the provided `session` is in a transaction, defaults to the
757+
read preference configured for the transaction.
758+
Otherwise, defaults to
759+
:attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`.
760+
- `codec_options`: A :class:`~bson.codec_options.CodecOptions`
761+
instance.
762+
- `session` (optional): A
763+
:class:`MotorClientSession`.
764+
- `comment` (optional): A user-provided comment to attach to future getMores for this
765+
command.
766+
- `max_await_time_ms` (optional): The number of ms to wait for more data on future getMores for this command.
767+
- `**kwargs` (optional): additional keyword arguments will
768+
be added to the command document before it is sent
769+
770+
.. note:: :meth:`command` does **not** obey this Database's
771+
:attr:`read_preference` or :attr:`codec_options`. You must use the
772+
``read_preference`` and ``codec_options`` parameters instead.
773+
774+
.. note:: :meth:`command` does **not** apply any custom TypeDecoders
775+
when decoding the command response.
776+
777+
.. note:: If this client has been configured to use MongoDB Stable
778+
API (see :ref:`versioned-api-ref`), then :meth:`command` will
779+
automatically add API versioning options to the given command.
780+
Explicitly adding API versioning options in the command and
781+
declaring an API version on the client is not supported.
782+
783+
.. seealso:: The MongoDB documentation on `commands <https://dochub.mongodb.org/core/commands>`_.
784+
"""
785+
args = (command,)
786+
kwargs["value"] = value
787+
kwargs["read_preference"] = read_preference
788+
kwargs["codec_options"] = codec_options
789+
kwargs["session"] = session
790+
kwargs["comment"] = comment
791+
kwargs["max_await_time_ms"] = max_await_time_ms
792+
793+
def inner():
794+
return self.delegate.cursor_command(
795+
*unwrap_args_session(args), **unwrap_kwargs_session(kwargs)
796+
)
797+
798+
loop = self.get_io_loop()
799+
cursor = await self._framework.run_on_executor(loop, inner)
800+
801+
cursor_class = create_class_with_framework(
802+
AgnosticCommandCursor, self._framework, self.__module__
803+
)
804+
805+
return cursor_class(cursor, self)
806+
807+
# TODO: MOTOR-1169
808+
if hasattr(Database, "cursor_command"):
809+
cursor_command = _cursor_command
810+
725811
@property
726812
def client(self):
727813
"""This MotorDatabase's :class:`MotorClient`."""
@@ -805,8 +891,17 @@ class AgnosticCollection(AgnosticBaseProperties):
805891
replace_one = AsyncCommand(doc=docstrings.replace_one_doc)
806892
update_many = AsyncCommand(doc=docstrings.update_many_doc)
807893
update_one = AsyncCommand(doc=docstrings.update_one_doc)
894+
808895
with_options = DelegateMethod().wrap(Collection)
809896

897+
# TODO: MOTOR-1169
898+
if hasattr(Collection, "create_search_index"):
899+
create_search_index = AsyncCommand()
900+
create_search_indexes = AsyncCommand()
901+
drop_search_index = AsyncCommand()
902+
update_search_index = AsyncCommand()
903+
_async_list_search_indexes = AsyncRead(attr_name="list_search_indexes")
904+
810905
_async_aggregate = AsyncRead(attr_name="aggregate")
811906
_async_aggregate_raw_batches = AsyncRead(attr_name="aggregate_raw_batches")
812907
_async_list_indexes = AsyncRead(attr_name="list_indexes")
@@ -1224,6 +1319,19 @@ async def print_indexes():
12241319
# Latent cursor that will send initial command on first "async for".
12251320
return cursor_class(self, self._async_list_indexes, session=session, **kwargs)
12261321

1322+
def _list_search_indexes(self, session=None, **kwargs):
1323+
"""Return a cursor over search indexes for the current collection."""
1324+
cursor_class = create_class_with_framework(
1325+
AgnosticLatentCommandCursor, self._framework, self.__module__
1326+
)
1327+
1328+
# Latent cursor that will send initial command on first "async for".
1329+
return cursor_class(self, self._async_list_search_indexes, session=session, **kwargs)
1330+
1331+
# TODO: MOTOR-1169
1332+
if hasattr(Collection, "list_search_indexes"):
1333+
list_search_indexes = _list_search_indexes
1334+
12271335
def wrap(self, obj):
12281336
if obj.__class__ is Collection:
12291337
# Replace pymongo.collection.Collection with MotorCollection.
@@ -1670,6 +1778,32 @@ class AgnosticCommandCursor(AgnosticBaseCursor):
16701778

16711779
_CommandCursor__die = AsyncRead()
16721780

1781+
async def _try_next(self):
1782+
"""Advance the cursor without blocking indefinitely.
1783+
1784+
This method returns the next document without waiting
1785+
indefinitely for data.
1786+
1787+
If no document is cached locally then this method runs a single
1788+
getMore command. If the getMore yields any documents, the next
1789+
document is returned, otherwise, if the getMore returns no documents
1790+
(because there is no additional data) then ``None`` is returned.
1791+
1792+
:Returns:
1793+
The next document or ``None`` when no document is available
1794+
after running a single getMore or when the cursor is closed.
1795+
"""
1796+
1797+
def inner():
1798+
return self.delegate.try_next()
1799+
1800+
loop = self.get_io_loop()
1801+
return await self._framework.run_on_executor(loop, inner)
1802+
1803+
# TODO: MOTOR-1169
1804+
if hasattr(CommandCursor, "try_next"):
1805+
try_next = _try_next
1806+
16731807
def _query_flags(self):
16741808
return 0
16751809

synchro/__init__.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@
9696
from pymongo.periodic_executor import *
9797
from pymongo.periodic_executor import _EXECUTORS
9898
from pymongo.pool import *
99-
from pymongo.pool import _METADATA, Pool, SocketInfo, _PoolClosedError
99+
from pymongo.pool import _METADATA, Connection, Pool, _PoolClosedError
100100
from pymongo.read_concern import *
101101
from pymongo.read_preferences import *
102102
from pymongo.read_preferences import _ServerMode
@@ -468,6 +468,13 @@ def __init__(self, client, name, **kwargs):
468468
def client(self):
469469
return self._client
470470

471+
def cursor_command(self, *args, **kwargs):
472+
if "session" in kwargs:
473+
# Workaround for validation added in PYTHON-3228.
474+
kwargs["session"] = kwargs["session"].delegate
475+
cursor = self.synchronize(self.delegate.cursor_command)(*args, **kwargs)
476+
return CommandCursor(cursor)
477+
471478
def __getattr__(self, name):
472479
return Collection(self, name, delegate=getattr(self.delegate, name))
473480

@@ -483,6 +490,11 @@ class Collection(Synchro):
483490
aggregate = WrapOutgoing()
484491
aggregate_raw_batches = WrapOutgoing()
485492
list_indexes = WrapOutgoing()
493+
create_search_index = WrapOutgoing()
494+
create_search_indexes = WrapOutgoing()
495+
drop_search_index = WrapOutgoing()
496+
list_search_indexes = WrapOutgoing()
497+
update_search_index = WrapOutgoing()
486498
watch = WrapOutgoing()
487499
__bool__ = WrapOutgoing()
488500

@@ -618,6 +630,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
618630
class CommandCursor(Cursor):
619631
__delegate_class__ = motor.motor_tornado.MotorCommandCursor
620632

633+
try_next = Sync("try_next")
634+
621635

622636
class GridOutCursor(Cursor):
623637
__delegate_class__ = motor.motor_tornado.MotorGridOutCursor

test/asyncio_tests/test_asyncio_client.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,16 @@ async def test_reconnect_in_case_connection_closed_by_mongo(self):
107107
# lost, as result we should have AutoReconnect instead of
108108
# IncompleteReadError
109109
pool = get_primary_pool(cx)
110-
socket = pool.sockets.pop()
111-
socket.sock.close()
112-
pool.sockets.appendleft(socket)
110+
111+
# TODO: MOTOR-1169
112+
if hasattr(pool, "conns"):
113+
conn = pool.conns.pop()
114+
conn.conn.close()
115+
pool.conns.appendleft(conn)
116+
else:
117+
socket = pool.sockets.pop()
118+
socket.sock.close()
119+
pool.sockets.appendleft(socket)
113120

114121
with self.assertRaises(pymongo.errors.AutoReconnect):
115122
await cx.motor_test.test_collection.find_one()

test/asyncio_tests/test_asyncio_cursor.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -456,53 +456,58 @@ async def test_exhaust(self):
456456
# Ensure a pool.
457457
await client.db.collection.find_one()
458458

459-
socks = get_primary_pool(client).sockets
459+
# TODO: MOTOR-1169
460+
pool = get_primary_pool(client)
461+
if hasattr(pool, "conns"):
462+
conns = pool.conns
463+
else:
464+
conns = pool.sockets
460465

461466
# Make sure the socket is returned after exhaustion.
462467
cur = client[self.db.name].test.find(cursor_type=CursorType.EXHAUST)
463468
has_next = await cur.fetch_next
464469
self.assertTrue(has_next)
465-
self.assertEqual(0, len(socks))
470+
self.assertEqual(0, len(conns))
466471

467472
while await cur.fetch_next:
468473
cur.next_object()
469474

470-
self.assertEqual(1, len(socks))
475+
self.assertEqual(1, len(conns))
471476

472477
# Same as previous but with to_list instead of next_object.
473478
docs = await client[self.db.name].test.find(cursor_type=CursorType.EXHAUST).to_list(None)
474-
self.assertEqual(1, len(socks))
479+
self.assertEqual(1, len(conns))
475480
self.assertEqual((await self.db.test.count_documents({})), len(docs))
476481

477482
# If the Cursor instance is discarded before being
478483
# completely iterated we have to close and
479484
# discard the socket.
480-
sock = one(socks)
485+
conn = one(conns)
481486
cur = client[self.db.name].test.find(cursor_type=CursorType.EXHAUST).batch_size(1)
482487
await cur.fetch_next
483488
self.assertTrue(cur.next_object())
484489
# Run at least one getMore to initiate the OP_MSG exhaust protocol.
485490
if env.version.at_least(4, 2):
486491
await cur.fetch_next
487492
self.assertTrue(cur.next_object())
488-
self.assertEqual(0, len(socks))
493+
self.assertEqual(0, len(conns))
489494
if "PyPy" in sys.version:
490495
# Don't wait for GC or use gc.collect(), it's unreliable.
491496
await cur.close()
492497

493498
del cur
494499

495-
async def sock_closed():
496-
return sock not in socks and sock.closed
500+
async def conn_closed():
501+
return conn not in conns and conn.closed
497502

498503
await wait_until(
499-
sock_closed, "close exhaust cursor socket", timeout=get_async_test_timeout()
504+
conn_closed, "close exhaust cursor socket", timeout=get_async_test_timeout()
500505
)
501506

502507
# The exhaust cursor's socket was discarded, although another may
503508
# already have been opened to send OP_KILLCURSORS.
504-
self.assertNotIn(sock, socks)
505-
self.assertTrue(sock.closed)
509+
self.assertNotIn(conn, conns)
510+
self.assertTrue(conn.closed)
506511

507512
@asyncio_test
508513
async def test_close_with_docs_in_batch(self):

test/asyncio_tests/test_asyncio_database.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"""Test AsyncIOMotorDatabase."""
1616

1717
import unittest
18-
from test import env
18+
from test import SkipTest, env
1919
from test.asyncio_tests import AsyncIOTestCase, asyncio_test
2020

2121
import pymongo.database
@@ -156,6 +156,21 @@ def test_with_options(self):
156156
self.assertEqual(db.codec_options, db2.codec_options)
157157
self.assertEqual(db.write_concern, db2.write_concern)
158158

159+
@asyncio_test
160+
async def test_cursor_command(self):
161+
db = self.db
162+
if not hasattr(pymongo.database.Database, "cursor_command"):
163+
raise SkipTest("MOTOR-1169")
164+
await db.test.drop()
165+
166+
docs = [{"_id": i, "doc": i} for i in range(3)]
167+
await db.test.insert_many(docs)
168+
169+
cursor = await db.cursor_command("find", "test")
170+
for i in range(3):
171+
item = await cursor.try_next()
172+
self.assertEqual(item, docs[i])
173+
159174

160175
if __name__ == "__main__":
161176
unittest.main()

test/tornado_tests/test_motor_client.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,13 @@ async def test_timeout(self):
200200
client.close()
201201

202202

203+
def get_conns(pool):
204+
# TODO: MOTOR-1169
205+
if hasattr(pool, "conns"):
206+
return pool.conns
207+
return pool.sockets
208+
209+
203210
class MotorClientExhaustCursorTest(MotorMockServerTest):
204211
def primary_server(self):
205212
primary = self.server()
@@ -221,7 +228,8 @@ async def _test_exhaust_query_server_error(self, rs):
221228
client = motor.MotorClient(server.uri, maxPoolSize=1)
222229
await client.admin.command("ismaster")
223230
pool = get_primary_pool(client)
224-
sock_info = one(pool.sockets)
231+
conns = get_conns(pool)
232+
conn = one(conns)
225233
cursor = client.db.collection.find(cursor_type=CursorType.EXHAUST)
226234

227235
# With Tornado, simply accessing fetch_next starts the fetch.
@@ -232,8 +240,9 @@ async def _test_exhaust_query_server_error(self, rs):
232240
with self.assertRaises(pymongo.errors.OperationFailure):
233241
await fetch_next
234242

235-
self.assertFalse(sock_info.closed)
236-
self.assertEqual(sock_info, one(pool.sockets))
243+
self.assertFalse(conn.closed)
244+
conns = get_conns(pool)
245+
self.assertEqual(conn, one(conns))
237246

238247
@gen_test
239248
async def test_exhaust_query_server_error_standalone(self):
@@ -252,7 +261,8 @@ async def _test_exhaust_query_network_error(self, rs):
252261
await client.admin.command("ismaster")
253262
pool = get_primary_pool(client)
254263
pool._check_interval_seconds = None # Never check.
255-
sock_info = one(pool.sockets)
264+
conns = get_conns(pool)
265+
conn = one(conns)
256266

257267
cursor = client.db.collection.find(cursor_type=CursorType.EXHAUST)
258268

@@ -264,9 +274,10 @@ async def _test_exhaust_query_network_error(self, rs):
264274
with self.assertRaises(pymongo.errors.ConnectionFailure):
265275
await fetch_next
266276

267-
self.assertTrue(sock_info.closed)
277+
self.assertTrue(conn.closed)
268278
del cursor
269-
self.assertNotIn(sock_info, pool.sockets)
279+
conns = get_conns(pool)
280+
self.assertNotIn(conn, conns)
270281

271282
@gen_test
272283
async def test_exhaust_query_network_error_standalone(self):

0 commit comments

Comments
 (0)