Skip to content

Commit 9c1b873

Browse files
committed
fixed json codec
added test for testing encoded unicode values in meta using msgpack codec fixed message_bus_backend_test added tests for _convert, _reconvert make msgpack requirement >=0.4 made suggested changes added warning message for non serializable objects and added an early return renamed method names for encode-decode to convert-revert and added docstring to these methods renamed methdod _revert_from_saved_type to _convert_from_saved_type replaced revert with restores in a doc string added assertion to check object length in _convert_from_saved_type resolved merge conflicts
1 parent cb74848 commit 9c1b873

File tree

6 files changed

+201
-62
lines changed

6 files changed

+201
-62
lines changed

frontera/contrib/backends/remote/codecs/json.py

Lines changed: 92 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,29 +3,75 @@
33
"""
44
from __future__ import absolute_import
55
import json
6+
import six
67
from base64 import b64decode, b64encode
78
from frontera.core.codec import BaseDecoder, BaseEncoder
8-
from w3lib.util import to_unicode, to_native_str
9-
from frontera.utils.misc import dict_to_unicode, dict_to_bytes
9+
from w3lib.util import to_unicode, to_bytes
10+
11+
12+
def _convert_and_save_type(obj):
13+
"""
14+
:param obj: dict object
15+
16+
The purpose of this method is to transform the given dict
17+
into a form that would be able to serialize with JSONEncoder.
18+
In order to implement this, this method converts all byte strings
19+
inside a dict to unicode and saves their type for reverting to its
20+
original state. The type and the value are stored as a tuple in the
21+
following format: (original_type, converted value). All other objects
22+
like dict, tuple, list are converted to the same format for the sake
23+
of serialization and for the ease of reverting.
24+
Refer `https://github.com/scrapinghub/frontera/pull/233#discussion_r97432868`
25+
for the detailed explanation about the design.
26+
"""
27+
if isinstance(obj, bytes):
28+
return 'bytes', to_unicode(obj)
29+
elif isinstance(obj, dict):
30+
return 'dict', [(_convert_and_save_type(k), _convert_and_save_type(v)) for k, v in six.iteritems(obj)]
31+
elif isinstance(obj, (list, tuple)):
32+
return type(obj).__name__, [_convert_and_save_type(item) for item in obj]
33+
return 'other', obj
34+
35+
36+
def _convert_from_saved_type(obj):
37+
"""
38+
:param obj: object returned by `_convert_and_save_type`
39+
40+
Restores the original state of the object converted
41+
earlier by `_convert_and_save_type`. This method considers every
42+
first element of the nested tuple as the original type information and
43+
the second value to be the converted value. It applies the original type
44+
recursively on the object to retrieve the original form of the object.
45+
"""
46+
assert len(obj) == 2
47+
obj_type, obj_value = obj
48+
if obj_type == 'bytes':
49+
return to_bytes(obj_value)
50+
elif obj_type == 'dict':
51+
return dict([(_convert_from_saved_type(k), _convert_from_saved_type(v)) for k, v in obj_value])
52+
elif obj_type in ['list', 'tuple']:
53+
_type = list if obj_type == 'list' else tuple
54+
return _type([_convert_from_saved_type(item) for item in obj_value])
55+
return obj_value
1056

1157

1258
def _prepare_request_message(request):
13-
return {'url': to_unicode(request.url),
14-
'method': to_unicode(request.method),
15-
'headers': dict_to_unicode(request.headers),
16-
'cookies': dict_to_unicode(request.cookies),
17-
'meta': dict_to_unicode(request.meta)}
59+
return {'url': request.url,
60+
'method': request.method,
61+
'headers': request.headers,
62+
'cookies': request.cookies,
63+
'meta': request.meta}
1864

1965

2066
def _prepare_links_message(links):
2167
return [_prepare_request_message(link) for link in links]
2268

2369

2470
def _prepare_response_message(response, send_body):
25-
return {'url': to_unicode(response.url),
71+
return {'url': response.url,
2672
'status_code': response.status_code,
27-
'meta': dict_to_unicode(response.meta),
28-
'body': to_unicode(b64encode(response.body)) if send_body else None}
73+
'meta': response.meta,
74+
'body': b64encode(response.body) if send_body else None}
2975

3076

3177
class CrawlFrontierJSONEncoder(json.JSONEncoder):
@@ -45,6 +91,10 @@ def __init__(self, request_model, *a, **kw):
4591
self.send_body = kw.pop('send_body', False)
4692
super(Encoder, self).__init__(request_model, *a, **kw)
4793

94+
def encode(self, obj):
95+
encoded = _convert_and_save_type(obj)
96+
return super(Encoder, self).encode(encoded)
97+
4898
def encode_add_seeds(self, seeds):
4999
return self.encode({
50100
'type': 'add_seeds',
@@ -101,52 +151,51 @@ def __init__(self, request_model, response_model, *a, **kw):
101151
super(Decoder, self).__init__(*a, **kw)
102152

103153
def _response_from_object(self, obj):
104-
url = to_native_str(obj[b'url'])
154+
url = obj['url']
105155
request = self._request_model(url=url,
106-
meta=obj[b'meta'])
156+
meta=obj['meta'])
107157
return self._response_model(url=url,
108-
status_code=obj[b'status_code'],
109-
body=b64decode(obj[b'body']) if obj[b'body'] is not None else None,
158+
status_code=obj['status_code'],
159+
body=b64decode(obj['body']) if obj['body'] is not None else None,
110160
request=request)
111161

112162
def _request_from_object(self, obj):
113-
return self._request_model(url=to_native_str(obj[b'url']),
114-
method=obj[b'method'],
115-
headers=obj[b'headers'],
116-
cookies=obj[b'cookies'],
117-
meta=obj[b'meta'])
163+
return self._request_model(url=obj['url'],
164+
method=obj['method'],
165+
headers=obj['headers'],
166+
cookies=obj['cookies'],
167+
meta=obj['meta'])
118168

119169
def decode(self, message):
120-
message = dict_to_bytes(super(Decoder, self).decode(message))
121-
if message[b'type'] == b'links_extracted':
122-
request = self._request_from_object(message[b'r'])
123-
links = [self._request_from_object(link) for link in message[b'links']]
170+
message = _convert_from_saved_type(super(Decoder, self).decode(message))
171+
if message['type'] == 'links_extracted':
172+
request = self._request_from_object(message['r'])
173+
links = [self._request_from_object(link) for link in message['links']]
124174
return ('links_extracted', request, links)
125-
if message[b'type'] == b'page_crawled':
126-
response = self._response_from_object(message[b'r'])
175+
if message['type'] == 'page_crawled':
176+
response = self._response_from_object(message['r'])
127177
return ('page_crawled', response)
128-
if message[b'type'] == b'request_error':
129-
request = self._request_from_object(message[b'r'])
130-
return ('request_error', request, to_native_str(message[b'error']))
131-
if message[b'type'] == b'update_score':
132-
return ('update_score', self._request_from_object(message[b'r']), message[b'score'], message[b'schedule'])
133-
if message[b'type'] == b'add_seeds':
178+
if message['type'] == 'request_error':
179+
request = self._request_from_object(message['r'])
180+
return ('request_error', request, message['error'])
181+
if message['type'] == 'update_score':
182+
return ('update_score', self._request_from_object(message['r']), message['score'], message['schedule'])
183+
if message['type'] == 'add_seeds':
134184
seeds = []
135-
for seed in message[b'seeds']:
185+
for seed in message['seeds']:
136186
request = self._request_from_object(seed)
137187
seeds.append(request)
138188
return ('add_seeds', seeds)
139-
if message[b'type'] == b'new_job_id':
140-
return ('new_job_id', int(message[b'job_id']))
141-
if message[b'type'] == b'offset':
142-
return ('offset', int(message[b'partition_id']), int(message[b'offset']))
189+
if message['type'] == 'new_job_id':
190+
return ('new_job_id', int(message['job_id']))
191+
if message['type'] == 'offset':
192+
return ('offset', int(message['partition_id']), int(message['offset']))
143193
return TypeError('Unknown message type')
144194

145195
def decode_request(self, message):
146-
obj = dict_to_bytes(super(Decoder, self).decode(message))
147-
return self._request_model(url=to_native_str(obj[b'url']),
148-
method=obj[b'method'],
149-
headers=obj[b'headers'],
150-
cookies=obj[b'cookies'],
151-
meta=obj[b'meta'])
152-
196+
obj = _convert_from_saved_type(super(Decoder, self).decode(message))
197+
return self._request_model(url=obj['url'],
198+
method=obj['method'],
199+
headers=obj['headers'],
200+
cookies=obj['cookies'],
201+
meta=obj['meta'])

frontera/contrib/backends/remote/codecs/msgpack.py

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,37 @@
22
""" A MsgPack codec for Frontera. Implemented using native msgpack-python library.
33
"""
44
from __future__ import absolute_import
5-
5+
import logging
66
from msgpack import packb, unpackb
77

88
from frontera.core.codec import BaseDecoder, BaseEncoder
9+
import six
910
from w3lib.util import to_native_str
1011

1112

13+
logger = logging.getLogger(__name__)
14+
15+
1216
def _prepare_request_message(request):
13-
return [request.url, request.method, request.headers, request.cookies, request.meta]
17+
def serialize(obj):
18+
"""Recursively walk object's hierarchy."""
19+
if isinstance(obj, (bool, six.integer_types, float, six.binary_type, six.text_type)):
20+
return obj
21+
elif isinstance(obj, dict):
22+
obj = obj.copy()
23+
for key in obj:
24+
obj[key] = serialize(obj[key])
25+
return obj
26+
elif isinstance(obj, list):
27+
return [serialize(item) for item in obj]
28+
elif isinstance(obj, tuple):
29+
return tuple(serialize([item for item in obj]))
30+
elif hasattr(obj, '__dict__'):
31+
return serialize(obj.__dict__)
32+
else:
33+
logger.warning('unable to serialize object: {}'.format(obj))
34+
return None
35+
return [request.url, request.method, request.headers, request.cookies, serialize(request.meta)]
1436

1537

1638
def _prepare_response_message(response, send_body):
@@ -22,29 +44,29 @@ def __init__(self, request_model, *a, **kw):
2244
self.send_body = True if 'send_body' in kw and kw['send_body'] else False
2345

2446
def encode_add_seeds(self, seeds):
25-
return packb([b'as', [_prepare_request_message(seed) for seed in seeds]], use_bin_type=True, encoding="utf-8")
47+
return packb([b'as', [_prepare_request_message(seed) for seed in seeds]], use_bin_type=True)
2648

2749
def encode_page_crawled(self, response):
28-
return packb([b'pc', _prepare_response_message(response, self.send_body)], use_bin_type=True, encoding="utf-8")
50+
return packb([b'pc', _prepare_response_message(response, self.send_body)], use_bin_type=True)
2951

3052
def encode_links_extracted(self, request, links):
3153
return packb([b'le', _prepare_request_message(request), [_prepare_request_message(link) for link in links]],
32-
use_bin_type=True, encoding="utf-8")
54+
use_bin_type=True)
3355

3456
def encode_request_error(self, request, error):
35-
return packb([b're', _prepare_request_message(request), str(error)], use_bin_type=True, encoding="utf-8")
57+
return packb([b're', _prepare_request_message(request), str(error)], use_bin_type=True)
3658

3759
def encode_request(self, request):
38-
return packb(_prepare_request_message(request), use_bin_type=True, encoding="utf-8")
60+
return packb(_prepare_request_message(request), use_bin_type=True)
3961

4062
def encode_update_score(self, request, score, schedule):
41-
return packb([b'us', _prepare_request_message(request), score, schedule], use_bin_type=True, encoding="utf-8")
63+
return packb([b'us', _prepare_request_message(request), score, schedule], use_bin_type=True)
4264

4365
def encode_new_job_id(self, job_id):
44-
return packb([b'njid', int(job_id)], use_bin_type=True, encoding="utf-8")
66+
return packb([b'njid', int(job_id)], use_bin_type=True)
4567

4668
def encode_offset(self, partition_id, offset):
47-
return packb([b'of', int(partition_id), int(offset)], use_bin_type=True, encoding="utf-8")
69+
return packb([b'of', int(partition_id), int(offset)], use_bin_type=True)
4870

4971

5072
class Decoder(BaseDecoder):
@@ -68,7 +90,7 @@ def _request_from_object(self, obj):
6890
meta=obj[4])
6991

7092
def decode(self, buffer):
71-
obj = unpackb(buffer, encoding="utf-8")
93+
obj = unpackb(buffer, encoding='utf-8')
7294
if obj[0] == b'pc':
7395
return ('page_crawled',
7496
self._response_from_object(obj[1]))
@@ -89,4 +111,4 @@ def decode(self, buffer):
89111
return TypeError('Unknown message type')
90112

91113
def decode_request(self, buffer):
92-
return self._request_from_object(unpackb(buffer, encoding="utf-8"))
114+
return self._request_from_object(unpackb(buffer, encoding='utf-8'))

requirements/tests.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ scrapy>=0.24
66
SQLAlchemy>=1.0.0
77
cachetools
88
pyzmq
9-
msgpack-python
9+
msgpack-python>=0.4
1010
kafka-python>=1.0.0
1111
pytest-cov
1212
happybase>=1.0.0

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
],
6565
'zeromq': [
6666
'pyzmq',
67-
'msgpack-python'
67+
'msgpack-python>=0.4'
6868
],
6969
'kafka': [
7070
'kafka-python>=1.0.0'

tests/test_codecs.py

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,44 @@
11
# -*- coding: utf-8 -*-
22

33
from __future__ import absolute_import
4-
from frontera.contrib.backends.remote.codecs.json import Encoder as JsonEncoder, Decoder as JsonDecoder
4+
import json
5+
import unittest
6+
from frontera.contrib.backends.remote.codecs.json import (Encoder as JsonEncoder, Decoder as JsonDecoder,
7+
_convert_and_save_type, _convert_from_saved_type)
58
from frontera.contrib.backends.remote.codecs.msgpack import Encoder as MsgPackEncoder, Decoder as MsgPackDecoder
69
from frontera.core.models import Request, Response
710
import pytest
811

912

13+
def _compare_dicts(dict1, dict2):
14+
"""
15+
Compares two dicts
16+
:return: True if both dicts are equal else False
17+
"""
18+
if dict1 == None or dict2 == None:
19+
return False
20+
21+
if type(dict1) is not dict or type(dict2) is not dict:
22+
return False
23+
24+
shared_keys = set(dict2.keys()) & set(dict2.keys())
25+
26+
if not (len(shared_keys) == len(dict1.keys()) and len(shared_keys) == len(dict2.keys())):
27+
return False
28+
29+
dicts_are_equal = True
30+
for key in dict1.keys():
31+
if type(dict1[key]) is dict:
32+
dicts_are_equal = _compare_dicts(dict1[key], dict2[key])
33+
else:
34+
dicts_are_equal = (dict1[key] == dict2[key]) and (type(dict1[key]) == type(dict2[key]))
35+
36+
if not dicts_are_equal:
37+
return False
38+
39+
return dicts_are_equal
40+
41+
1042
@pytest.mark.parametrize('send_body', [True, False])
1143
@pytest.mark.parametrize(
1244
('encoder', 'decoder'), [
@@ -16,12 +48,13 @@
1648
)
1749
def test_codec(encoder, decoder, send_body):
1850
def check_request(req1, req2):
19-
assert req1.url == req2.url and req1.meta == req2.meta and req1.headers == req2.headers \
20-
and req1.method == req2.method
51+
assert req1.url == req2.url and _compare_dicts(req1.meta, req2.meta) == True and \
52+
_compare_dicts(req1.headers, req2.headers) == True and req1.method == req2.method
2153

2254
enc = encoder(Request, send_body=send_body)
2355
dec = decoder(Request, Response)
24-
req = Request(url="http://www.yandex.ru",method=b'GET', meta={b"test": b"shmest"}, headers={b'reqhdr': b'value'})
56+
req = Request(url="http://www.yandex.ru", method=b'GET',
57+
meta={b'test': b'shmest', b'scrapy_meta': {'rule': 0, 'key': 'value'}}, headers={b'reqhdr': b'value'})
2558
req2 = Request(url="http://www.yandex.ru/search")
2659
msgs = [
2760
enc.encode_add_seeds([req]),
@@ -85,3 +118,38 @@ def check_request(req1, req2):
85118

86119
o = dec.decode_request(next(it))
87120
check_request(o, req)
121+
122+
123+
class TestEncodeDecodeJson(unittest.TestCase):
124+
"""
125+
Test for testing methods `_encode_recursively` and `_decode_recursively` used in json codec
126+
"""
127+
128+
def test_encode_decode_json_recursively(self):
129+
_int = 1
130+
_bytes = b'bytes'
131+
_unicode = u'unicode'
132+
_bool = True
133+
_none = None
134+
simple_dict = {'key': 'value'}
135+
simple_list = ['item', 1]
136+
simple_tuple = ('str', 2)
137+
mixed_type_dict = {b'k1': 'v1', 'k2': b'v2', 'int': 1, b'none': None, 'bool': False}
138+
mixed_type_list = [b'i1', 'i2', 23, None, True]
139+
mixed_type_tuple = [b'i1', 'i2', 23, None, True]
140+
nested_dict = {'k1': b'v1', 'lst': [b'i1', 1, ('str', 1, {'k2': b'v1', 'tup': (1, None)})]}
141+
nested_list = [True, None, (1, 2, 3), {b'k1': b'v1', 'tup': ('a', b'b', [None, False])}]
142+
nested_tuple = (1, None, ['a', 'b', True, {b'k1': 'v2', 'lst': ['a', False, (2, 3, 5)]}])
143+
msgs = [_int, _bytes, _unicode, _bool, _none, simple_dict, simple_list, simple_tuple,
144+
mixed_type_dict, mixed_type_list, mixed_type_tuple, nested_dict, nested_list, nested_tuple]
145+
encoder = json.JSONEncoder()
146+
decoder = json.JSONDecoder()
147+
for original_msg in msgs:
148+
encoded_msg_1 = _convert_and_save_type(original_msg)
149+
encoded_msg_2 = encoder.encode(encoded_msg_1)
150+
decoded_msg_2 = decoder.decode(encoded_msg_2)
151+
decoded_msg_1 = _convert_from_saved_type(decoded_msg_2)
152+
if isinstance(decoded_msg_1, dict):
153+
self.assertDictEqual(decoded_msg_1, original_msg)
154+
elif isinstance(decoded_msg_1, (list, tuple)):
155+
self.assertSequenceEqual(decoded_msg_1, original_msg)

0 commit comments

Comments
 (0)