Skip to content

Commit 4c851aa

Browse files
authored
PYTHON-3075 [Backport-3.13] bulk_write does not apply CodecOptions to upserted_ids result (#842)
1 parent 336f69d commit 4c851aa

File tree

5 files changed

+120
-14
lines changed

5 files changed

+120
-14
lines changed

pymongo/collection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,7 @@ def _legacy_write(self, sock_info, name, cmd, op_id,
531531
sock_info.service_id)
532532
start = datetime.datetime.now()
533533
try:
534-
result = sock_info.legacy_write(rqst_id, msg, max_size, False)
534+
result = sock_info.legacy_write(rqst_id, msg, max_size, False, self.codec_options)
535535
except Exception as exc:
536536
if publish:
537537
dur = (datetime.datetime.now() - start) + duration

pymongo/message.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
_decode_selective,
3535
_dict_to_bson,
3636
_make_c_string)
37+
3738
from bson.codec_options import DEFAULT_CODEC_OPTIONS
3839
from bson.int64 import Int64
3940
from bson.raw_bson import (_inflate_bson, DEFAULT_RAW_BSON_OPTIONS,
@@ -1008,7 +1009,7 @@ def legacy_write(self, cmd, request_id, msg, max_doc_size, acknowledged,
10081009
start = datetime.datetime.now()
10091010
try:
10101011
result = self.sock_info.legacy_write(
1011-
request_id, msg, max_doc_size, acknowledged)
1012+
request_id, msg, max_doc_size, acknowledged, self.codec)
10121013
if self.publish:
10131014
duration = (datetime.datetime.now() - start) + duration
10141015
if result is not None:
@@ -1041,7 +1042,7 @@ def write_command(self, cmd, request_id, msg, docs):
10411042
self._start(cmd, request_id, docs)
10421043
start = datetime.datetime.now()
10431044
try:
1044-
reply = self.sock_info.write_command(request_id, msg)
1045+
reply = self.sock_info.write_command(request_id, msg, self.codec)
10451046
if self.publish:
10461047
duration = (datetime.datetime.now() - start) + duration
10471048
self._succeed(request_id, reply, duration)
@@ -1110,7 +1111,7 @@ def execute(self, cmd, docs, client):
11101111
batched_cmd, to_send = self._batch_command(cmd, docs)
11111112
result = self.sock_info.command(
11121113
self.db_name, batched_cmd,
1113-
codec_options=_UNICODE_REPLACE_CODEC_OPTIONS,
1114+
codec_options=self.codec,
11141115
session=self.session, client=client)
11151116
return result, to_send
11161117

@@ -1602,9 +1603,9 @@ def unpack_response(self, cursor_id=None,
16021603
return bson._decode_all_selective(
16031604
self.documents, codec_options, user_fields)
16041605

1605-
def command_response(self):
1606+
def command_response(self, codec_options):
16061607
"""Unpack a command response."""
1607-
docs = self.unpack_response()
1608+
docs = self.unpack_response(codec_options=codec_options)
16081609
assert self.number_returned == 1
16091610
return docs[0]
16101611

@@ -1672,9 +1673,9 @@ def unpack_response(self, cursor_id=None,
16721673
return bson._decode_all_selective(
16731674
self.payload_document, codec_options, user_fields)
16741675

1675-
def command_response(self):
1676+
def command_response(self, codec_options):
16761677
"""Unpack a command response."""
1677-
return self.unpack_response()[0]
1678+
return self.unpack_response(codec_options=codec_options)[0]
16781679

16791680
def raw_command_response(self):
16801681
"""Return the bytes of the command response."""

pymongo/pool.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
HAS_SNI as _HAVE_SNI,
2828
IPADDR_SAFE as _IPADDR_SAFE)
2929

30-
from bson import DEFAULT_CODEC_OPTIONS
30+
from bson import DEFAULT_CODEC_OPTIONS, codec_options
3131
from bson.py3compat import imap, itervalues, _unicode
3232
from bson.son import SON
3333
from pymongo import auth, helpers, thread_util, __version__
@@ -761,7 +761,7 @@ def _raise_if_not_writable(self, unacknowledged):
761761
raise NotPrimaryError("not primary", {
762762
"ok": 0, "errmsg": "not primary", "code": 10107})
763763

764-
def legacy_write(self, request_id, msg, max_doc_size, with_last_error):
764+
def legacy_write(self, request_id, msg, max_doc_size, with_last_error, codec_options):
765765
"""Send OP_INSERT, etc., optionally returning response as a dict.
766766
767767
Can raise ConnectionFailure or OperationFailure.
@@ -778,10 +778,11 @@ def legacy_write(self, request_id, msg, max_doc_size, with_last_error):
778778
self.send_message(msg, max_doc_size)
779779
if with_last_error:
780780
reply = self.receive_message(request_id)
781-
return helpers._check_gle_response(reply.command_response(),
781+
response = reply.command_response(codec_options)
782+
return helpers._check_gle_response(response,
782783
self.max_wire_version)
783784

784-
def write_command(self, request_id, msg):
785+
def write_command(self, request_id, msg, codec_options):
785786
"""Send "insert" etc. command, returning response as a dict.
786787
787788
Can raise ConnectionFailure or OperationFailure.
@@ -792,7 +793,7 @@ def write_command(self, request_id, msg):
792793
"""
793794
self.send_message(msg, 0)
794795
reply = self.receive_message(request_id)
795-
result = reply.command_response()
796+
result = reply.command_response(codec_options)
796797

797798
# Raises NotPrimaryError or OperationFailure.
798799
helpers._check_command_response(result, self.max_wire_version)

test/test_bulk.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,13 @@
1515
"""Test the bulk API."""
1616

1717
import sys
18+
import uuid
19+
from bson.binary import UuidRepresentation
20+
from bson.codec_options import CodecOptions
1821

1922
sys.path[0:0] = [""]
2023

24+
from bson import Binary
2125
from bson.objectid import ObjectId
2226
from pymongo.operations import *
2327
from pymongo.errors import (ConfigurationError,
@@ -361,6 +365,75 @@ def test_bulk_write_invalid_arguments(self):
361365
self.coll.bulk_write([{}])
362366

363367

368+
def test_upsert_uuid_standard(self):
369+
options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD)
370+
coll = self.coll.with_options(codec_options=options)
371+
uuids = [uuid.uuid4() for _ in range(3)]
372+
result = coll.bulk_write([
373+
UpdateOne({'_id': uuids[0]}, {'$set': {'a': 0}}, upsert=True),
374+
ReplaceOne({'a': 1}, {'_id': uuids[1]}, upsert=True),
375+
# This is just here to make the counts right in all cases.
376+
ReplaceOne({'_id': uuids[2]}, {'_id': uuids[2]}, upsert=True),
377+
])
378+
self.assertEqualResponse(
379+
{'nMatched': 0,
380+
'nModified': 0,
381+
'nUpserted': 3,
382+
'nInserted': 0,
383+
'nRemoved': 0,
384+
'upserted': [{'index': 0, '_id': uuids[0]},
385+
{'index': 1, '_id': uuids[1]},
386+
{'index': 2, '_id': uuids[2]}]},
387+
result.bulk_api_result)
388+
389+
def test_upsert_uuid_unspecified(self):
390+
options = CodecOptions(uuid_representation=UuidRepresentation.UNSPECIFIED)
391+
coll = self.coll.with_options(codec_options=options)
392+
uuids = [Binary.from_uuid(uuid.uuid4()) for _ in range(3)]
393+
result = coll.bulk_write([
394+
UpdateOne({'_id': uuids[0]}, {'$set': {'a': 0}}, upsert=True),
395+
ReplaceOne({'a': 1}, {'_id': uuids[1]}, upsert=True),
396+
# This is just here to make the counts right in all cases.
397+
ReplaceOne({'_id': uuids[2]}, {'_id': uuids[2]}, upsert=True),
398+
])
399+
self.assertEqualResponse(
400+
{'nMatched': 0,
401+
'nModified': 0,
402+
'nUpserted': 3,
403+
'nInserted': 0,
404+
'nRemoved': 0,
405+
'upserted': [{'index': 0, '_id': uuids[0]},
406+
{'index': 1, '_id': uuids[1]},
407+
{'index': 2, '_id': uuids[2]}]},
408+
result.bulk_api_result)
409+
410+
def test_upsert_uuid_standard_subdocuments(self):
411+
options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD)
412+
coll = self.coll.with_options(codec_options=options)
413+
ids = [
414+
{'f': i, 'f2': uuid.uuid4()}
415+
for i in range(3)
416+
]
417+
418+
result = coll.bulk_write([
419+
UpdateOne({'_id': ids[0]}, {'$set': {'a': 0}}, upsert=True),
420+
ReplaceOne({'a': 1}, {'_id': ids[1]}, upsert=True),
421+
# This is just here to make the counts right in all cases.
422+
ReplaceOne({'_id': ids[2]}, {'_id': ids[2]}, upsert=True),
423+
])
424+
425+
self.assertEqualResponse(
426+
{'nMatched': 0,
427+
'nModified': 0,
428+
'nUpserted': 3,
429+
'nInserted': 0,
430+
'nRemoved': 0,
431+
'upserted': [{'index': 0, '_id': ids[0]},
432+
{'index': 1, '_id': ids[1]},
433+
{'index': 2, '_id': ids[2]}]},
434+
result.bulk_api_result)
435+
436+
364437
class BulkAuthorizationTestBase(BulkTestBase):
365438

366439
@classmethod

test/test_encryption.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
from bson import encode, json_util
2929
from bson.binary import (Binary,
30+
UuidRepresentation,
3031
JAVA_LEGACY,
3132
STANDARD,
3233
UUID_SUBTYPE)
@@ -49,14 +50,15 @@
4950
ServerSelectionTimeoutError,
5051
WriteError)
5152
from pymongo.mongo_client import MongoClient
52-
from pymongo.operations import InsertOne
53+
from pymongo.operations import InsertOne, ReplaceOne, UpdateOne
5354
from pymongo.write_concern import WriteConcern
5455
from test.test_ssl import CA_PEM
5556

5657
from test import (unittest,
5758
client_context,
5859
IntegrationTest,
5960
PyMongoTestCase)
61+
from test.test_bulk import BulkTestBase
6062
from test.utils import (TestCreator,
6163
camel_to_snake_args,
6264
OvertCommandListener,
@@ -272,6 +274,35 @@ def test_use_after_close(self):
272274
client.admin.command('ping')
273275

274276

277+
class TestEncryptedBulkWrite(BulkTestBase, EncryptionIntegrationTest):
278+
279+
def test_upsert_uuid_standard_encrypte(self):
280+
opts = AutoEncryptionOpts(KMS_PROVIDERS, 'keyvault.datakeys')
281+
client = rs_or_single_client(auto_encryption_opts=opts)
282+
self.addCleanup(client.close)
283+
284+
options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD)
285+
encrypted_coll = client.pymongo_test.test
286+
coll = encrypted_coll.with_options(codec_options=options)
287+
uuids = [uuid.uuid4() for _ in range(3)]
288+
result = coll.bulk_write([
289+
UpdateOne({'_id': uuids[0]}, {'$set': {'a': 0}}, upsert=True),
290+
ReplaceOne({'a': 1}, {'_id': uuids[1]}, upsert=True),
291+
# This is just here to make the counts right in all cases.
292+
ReplaceOne({'_id': uuids[2]}, {'_id': uuids[2]}, upsert=True),
293+
])
294+
self.assertEqualResponse(
295+
{'nMatched': 0,
296+
'nModified': 0,
297+
'nUpserted': 3,
298+
'nInserted': 0,
299+
'nRemoved': 0,
300+
'upserted': [{'index': 0, '_id': uuids[0]},
301+
{'index': 1, '_id': uuids[1]},
302+
{'index': 2, '_id': uuids[2]}]},
303+
result.bulk_api_result)
304+
305+
275306
class TestClientMaxWireVersion(IntegrationTest):
276307

277308
@classmethod

0 commit comments

Comments
 (0)