Skip to content

PYTHON-3075 [Backport-3.13] bulk_write does not apply CodecOptions to upserted_ids result #842

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pymongo/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ def _legacy_write(self, sock_info, name, cmd, op_id,
sock_info.service_id)
start = datetime.datetime.now()
try:
result = sock_info.legacy_write(rqst_id, msg, max_size, False)
result = sock_info.legacy_write(rqst_id, msg, max_size, False, self.codec_options)
except Exception as exc:
if publish:
dur = (datetime.datetime.now() - start) + duration
Expand Down
15 changes: 8 additions & 7 deletions pymongo/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
_decode_selective,
_dict_to_bson,
_make_c_string)

from bson.codec_options import DEFAULT_CODEC_OPTIONS
from bson.int64 import Int64
from bson.raw_bson import (_inflate_bson, DEFAULT_RAW_BSON_OPTIONS,
Expand Down Expand Up @@ -1008,7 +1009,7 @@ def legacy_write(self, cmd, request_id, msg, max_doc_size, acknowledged,
start = datetime.datetime.now()
try:
result = self.sock_info.legacy_write(
request_id, msg, max_doc_size, acknowledged)
request_id, msg, max_doc_size, acknowledged, self.codec)
if self.publish:
duration = (datetime.datetime.now() - start) + duration
if result is not None:
Expand Down Expand Up @@ -1041,7 +1042,7 @@ def write_command(self, cmd, request_id, msg, docs):
self._start(cmd, request_id, docs)
start = datetime.datetime.now()
try:
reply = self.sock_info.write_command(request_id, msg)
reply = self.sock_info.write_command(request_id, msg, self.codec)
if self.publish:
duration = (datetime.datetime.now() - start) + duration
self._succeed(request_id, reply, duration)
Expand Down Expand Up @@ -1110,7 +1111,7 @@ def execute(self, cmd, docs, client):
batched_cmd, to_send = self._batch_command(cmd, docs)
result = self.sock_info.command(
self.db_name, batched_cmd,
codec_options=_UNICODE_REPLACE_CODEC_OPTIONS,
codec_options=self.codec,
session=self.session, client=client)
return result, to_send

Expand Down Expand Up @@ -1602,9 +1603,9 @@ def unpack_response(self, cursor_id=None,
return bson._decode_all_selective(
self.documents, codec_options, user_fields)

def command_response(self):
def command_response(self, codec_options):
"""Unpack a command response."""
docs = self.unpack_response()
docs = self.unpack_response(codec_options=codec_options)
assert self.number_returned == 1
return docs[0]

Expand Down Expand Up @@ -1672,9 +1673,9 @@ def unpack_response(self, cursor_id=None,
return bson._decode_all_selective(
self.payload_document, codec_options, user_fields)

def command_response(self):
def command_response(self, codec_options):
"""Unpack a command response."""
return self.unpack_response()[0]
return self.unpack_response(codec_options=codec_options)[0]

def raw_command_response(self):
"""Return the bytes of the command response."""
Expand Down
11 changes: 6 additions & 5 deletions pymongo/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
HAS_SNI as _HAVE_SNI,
IPADDR_SAFE as _IPADDR_SAFE)

from bson import DEFAULT_CODEC_OPTIONS
from bson import DEFAULT_CODEC_OPTIONS, codec_options
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Importing codec_options is not needed. Is your editor adding this automatically by accident?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I just disabled it

from bson.py3compat import imap, itervalues, _unicode
from bson.son import SON
from pymongo import auth, helpers, thread_util, __version__
Expand Down Expand Up @@ -761,7 +761,7 @@ def _raise_if_not_writable(self, unacknowledged):
raise NotPrimaryError("not primary", {
"ok": 0, "errmsg": "not primary", "code": 10107})

def legacy_write(self, request_id, msg, max_doc_size, with_last_error):
def legacy_write(self, request_id, msg, max_doc_size, with_last_error, codec_options):
"""Send OP_INSERT, etc., optionally returning response as a dict.

Can raise ConnectionFailure or OperationFailure.
Expand All @@ -778,10 +778,11 @@ def legacy_write(self, request_id, msg, max_doc_size, with_last_error):
self.send_message(msg, max_doc_size)
if with_last_error:
reply = self.receive_message(request_id)
return helpers._check_gle_response(reply.command_response(),
response = reply.command_response(codec_options)
return helpers._check_gle_response(response,
self.max_wire_version)

def write_command(self, request_id, msg):
def write_command(self, request_id, msg, codec_options):
"""Send "insert" etc. command, returning response as a dict.

Can raise ConnectionFailure or OperationFailure.
Expand All @@ -792,7 +793,7 @@ def write_command(self, request_id, msg):
"""
self.send_message(msg, 0)
reply = self.receive_message(request_id)
result = reply.command_response()
result = reply.command_response(codec_options)

# Raises NotPrimaryError or OperationFailure.
helpers._check_command_response(result, self.max_wire_version)
Expand Down
73 changes: 73 additions & 0 deletions test/test_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
"""Test the bulk API."""

import sys
import uuid
from bson.binary import UuidRepresentation
from bson.codec_options import CodecOptions

sys.path[0:0] = [""]

from bson import Binary
from bson.objectid import ObjectId
from pymongo.operations import *
from pymongo.errors import (ConfigurationError,
Expand Down Expand Up @@ -361,6 +365,75 @@ def test_bulk_write_invalid_arguments(self):
self.coll.bulk_write([{}])


def test_upsert_uuid_standard(self):
options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD)
coll = self.coll.with_options(codec_options=options)
uuids = [uuid.uuid4() for _ in range(3)]
result = coll.bulk_write([
UpdateOne({'_id': uuids[0]}, {'$set': {'a': 0}}, upsert=True),
ReplaceOne({'a': 1}, {'_id': uuids[1]}, upsert=True),
# This is just here to make the counts right in all cases.
ReplaceOne({'_id': uuids[2]}, {'_id': uuids[2]}, upsert=True),
])
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 3,
'nInserted': 0,
'nRemoved': 0,
'upserted': [{'index': 0, '_id': uuids[0]},
{'index': 1, '_id': uuids[1]},
{'index': 2, '_id': uuids[2]}]},
result.bulk_api_result)

def test_upsert_uuid_unspecified(self):
options = CodecOptions(uuid_representation=UuidRepresentation.UNSPECIFIED)
coll = self.coll.with_options(codec_options=options)
uuids = [Binary.from_uuid(uuid.uuid4()) for _ in range(3)]
result = coll.bulk_write([
UpdateOne({'_id': uuids[0]}, {'$set': {'a': 0}}, upsert=True),
ReplaceOne({'a': 1}, {'_id': uuids[1]}, upsert=True),
# This is just here to make the counts right in all cases.
ReplaceOne({'_id': uuids[2]}, {'_id': uuids[2]}, upsert=True),
])
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 3,
'nInserted': 0,
'nRemoved': 0,
'upserted': [{'index': 0, '_id': uuids[0]},
{'index': 1, '_id': uuids[1]},
{'index': 2, '_id': uuids[2]}]},
result.bulk_api_result)

def test_upsert_uuid_standard_subdocuments(self):
options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD)
coll = self.coll.with_options(codec_options=options)
ids = [
{'f': i, 'f2': uuid.uuid4()}
for i in range(3)
]

result = coll.bulk_write([
UpdateOne({'_id': ids[0]}, {'$set': {'a': 0}}, upsert=True),
ReplaceOne({'a': 1}, {'_id': ids[1]}, upsert=True),
# This is just here to make the counts right in all cases.
ReplaceOne({'_id': ids[2]}, {'_id': ids[2]}, upsert=True),
])

self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 3,
'nInserted': 0,
'nRemoved': 0,
'upserted': [{'index': 0, '_id': ids[0]},
{'index': 1, '_id': ids[1]},
{'index': 2, '_id': ids[2]}]},
result.bulk_api_result)


class BulkAuthorizationTestBase(BulkTestBase):

@classmethod
Expand Down
33 changes: 32 additions & 1 deletion test/test_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from bson import encode, json_util
from bson.binary import (Binary,
UuidRepresentation,
JAVA_LEGACY,
STANDARD,
UUID_SUBTYPE)
Expand All @@ -49,14 +50,15 @@
ServerSelectionTimeoutError,
WriteError)
from pymongo.mongo_client import MongoClient
from pymongo.operations import InsertOne
from pymongo.operations import InsertOne, ReplaceOne, UpdateOne
from pymongo.write_concern import WriteConcern
from test.test_ssl import CA_PEM

from test import (unittest,
client_context,
IntegrationTest,
PyMongoTestCase)
from test.test_bulk import BulkTestBase
from test.utils import (TestCreator,
camel_to_snake_args,
OvertCommandListener,
Expand Down Expand Up @@ -272,6 +274,35 @@ def test_use_after_close(self):
client.admin.command('ping')


class TestEncryptedBulkWrite(BulkTestBase, EncryptionIntegrationTest):

def test_upsert_uuid_standard_encrypte(self):
opts = AutoEncryptionOpts(KMS_PROVIDERS, 'keyvault.datakeys')
client = rs_or_single_client(auto_encryption_opts=opts)
self.addCleanup(client.close)

options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD)
encrypted_coll = client.pymongo_test.test
coll = encrypted_coll.with_options(codec_options=options)
uuids = [uuid.uuid4() for _ in range(3)]
result = coll.bulk_write([
UpdateOne({'_id': uuids[0]}, {'$set': {'a': 0}}, upsert=True),
ReplaceOne({'a': 1}, {'_id': uuids[1]}, upsert=True),
# This is just here to make the counts right in all cases.
ReplaceOne({'_id': uuids[2]}, {'_id': uuids[2]}, upsert=True),
])
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 3,
'nInserted': 0,
'nRemoved': 0,
'upserted': [{'index': 0, '_id': uuids[0]},
{'index': 1, '_id': uuids[1]},
{'index': 2, '_id': uuids[2]}]},
result.bulk_api_result)


class TestClientMaxWireVersion(IntegrationTest):

@classmethod
Expand Down