From cb74848822679ba2bbfc7873a1c6cc9f804a52ef Mon Sep 17 00:00:00 2001 From: voith Date: Sat, 10 Dec 2016 22:03:21 +0530 Subject: [PATCH 1/2] fixed msgpack codec --- .../contrib/backends/remote/codecs/msgpack.py | 43 ++++++------------- 1 file changed, 12 insertions(+), 31 deletions(-) diff --git a/frontera/contrib/backends/remote/codecs/msgpack.py b/frontera/contrib/backends/remote/codecs/msgpack.py index 701f61732..9ee99c039 100644 --- a/frontera/contrib/backends/remote/codecs/msgpack.py +++ b/frontera/contrib/backends/remote/codecs/msgpack.py @@ -6,31 +6,11 @@ from msgpack import packb, unpackb from frontera.core.codec import BaseDecoder, BaseEncoder -import six from w3lib.util import to_native_str def _prepare_request_message(request): - def serialize(obj): - """Recursively walk object's hierarchy.""" - if isinstance(obj, six.text_type): - return obj.encode('utf8') - if isinstance(obj, (bool, six.integer_types, float, six.binary_type)): - return obj - elif isinstance(obj, dict): - obj = obj.copy() - for key in obj: - obj[key] = serialize(obj[key]) - return obj - elif isinstance(obj, list): - return [serialize(item) for item in obj] - elif isinstance(obj, tuple): - return tuple(serialize([item for item in obj])) - elif hasattr(obj, '__dict__'): - return serialize(obj.__dict__) - else: - return None - return [request.url, request.method, request.headers, request.cookies, serialize(request.meta)] + return [request.url, request.method, request.headers, request.cookies, request.meta] def _prepare_response_message(response, send_body): @@ -42,28 +22,29 @@ def __init__(self, request_model, *a, **kw): self.send_body = True if 'send_body' in kw and kw['send_body'] else False def encode_add_seeds(self, seeds): - return packb([b'as', [_prepare_request_message(seed) for seed in seeds]]) + return packb([b'as', [_prepare_request_message(seed) for seed in seeds]], use_bin_type=True, encoding="utf-8") def encode_page_crawled(self, response): - return packb([b'pc', _prepare_response_message(response, self.send_body)]) + return packb([b'pc', _prepare_response_message(response, self.send_body)], use_bin_type=True, encoding="utf-8") def encode_links_extracted(self, request, links): - return packb([b'le', _prepare_request_message(request), [_prepare_request_message(link) for link in links]]) + return packb([b'le', _prepare_request_message(request), [_prepare_request_message(link) for link in links]], + use_bin_type=True, encoding="utf-8") def encode_request_error(self, request, error): - return packb([b're', _prepare_request_message(request), str(error)]) + return packb([b're', _prepare_request_message(request), str(error)], use_bin_type=True, encoding="utf-8") def encode_request(self, request): - return packb(_prepare_request_message(request)) + return packb(_prepare_request_message(request), use_bin_type=True, encoding="utf-8") def encode_update_score(self, request, score, schedule): - return packb([b'us', _prepare_request_message(request), score, schedule]) + return packb([b'us', _prepare_request_message(request), score, schedule], use_bin_type=True, encoding="utf-8") def encode_new_job_id(self, job_id): - return packb([b'njid', int(job_id)]) + return packb([b'njid', int(job_id)], use_bin_type=True, encoding="utf-8") def encode_offset(self, partition_id, offset): - return packb([b'of', int(partition_id), int(offset)]) + return packb([b'of', int(partition_id), int(offset)], use_bin_type=True, encoding="utf-8") class Decoder(BaseDecoder): @@ -87,7 +68,7 @@ def _request_from_object(self, obj): meta=obj[4]) def decode(self, buffer): - obj = unpackb(buffer) + obj = unpackb(buffer, encoding="utf-8") if obj[0] == b'pc': return ('page_crawled', self._response_from_object(obj[1])) @@ -108,4 +89,4 @@ def decode(self, buffer): return TypeError('Unknown message type') def decode_request(self, buffer): - return self._request_from_object(unpackb(buffer)) + return self._request_from_object(unpackb(buffer, encoding="utf-8")) From 9c1b87397fe28f03241f39708474ef8ab46dd0f0 Mon Sep 17 00:00:00 2001 From: voith Date: Mon, 12 Dec 2016 23:01:22 +0530 Subject: [PATCH 2/2] fixed json codec added test for testing encoded unicode values in meta using msgpack codec fixed message_bus_backend_test added tests for _convert, _reconvert make msgpack requirement >=0.4 made suggested changes added warning message for non serializable objects and added an early return renamed method names for encode-decode to convert-revert and added docstring to these methods renamed methdod _revert_from_saved_type to _convert_from_saved_type replaced revert with restores in a doc string added assertion to check object length in _convert_from_saved_type resolved merge conflicts --- .../contrib/backends/remote/codecs/json.py | 135 ++++++++++++------ .../contrib/backends/remote/codecs/msgpack.py | 46 ++++-- requirements/tests.txt | 2 +- setup.py | 2 +- tests/test_codecs.py | 76 +++++++++- tests/test_message_bus_backend.py | 2 +- 6 files changed, 201 insertions(+), 62 deletions(-) diff --git a/frontera/contrib/backends/remote/codecs/json.py b/frontera/contrib/backends/remote/codecs/json.py index 7df267894..ef4aa538d 100644 --- a/frontera/contrib/backends/remote/codecs/json.py +++ b/frontera/contrib/backends/remote/codecs/json.py @@ -3,18 +3,64 @@ """ from __future__ import absolute_import import json +import six from base64 import b64decode, b64encode from frontera.core.codec import BaseDecoder, BaseEncoder -from w3lib.util import to_unicode, to_native_str -from frontera.utils.misc import dict_to_unicode, dict_to_bytes +from w3lib.util import to_unicode, to_bytes + + +def _convert_and_save_type(obj): + """ + :param obj: dict object + + The purpose of this method is to transform the given dict + into a form that would be able to serialize with JSONEncoder. + In order to implement this, this method converts all byte strings + inside a dict to unicode and saves their type for reverting to its + original state. The type and the value are stored as a tuple in the + following format: (original_type, converted value). All other objects + like dict, tuple, list are converted to the same format for the sake + of serialization and for the ease of reverting. + Refer `https://github.com/scrapinghub/frontera/pull/233#discussion_r97432868` + for the detailed explanation about the design. + """ + if isinstance(obj, bytes): + return 'bytes', to_unicode(obj) + elif isinstance(obj, dict): + return 'dict', [(_convert_and_save_type(k), _convert_and_save_type(v)) for k, v in six.iteritems(obj)] + elif isinstance(obj, (list, tuple)): + return type(obj).__name__, [_convert_and_save_type(item) for item in obj] + return 'other', obj + + +def _convert_from_saved_type(obj): + """ + :param obj: object returned by `_convert_and_save_type` + + Restores the original state of the object converted + earlier by `_convert_and_save_type`. This method considers every + first element of the nested tuple as the original type information and + the second value to be the converted value. It applies the original type + recursively on the object to retrieve the original form of the object. + """ + assert len(obj) == 2 + obj_type, obj_value = obj + if obj_type == 'bytes': + return to_bytes(obj_value) + elif obj_type == 'dict': + return dict([(_convert_from_saved_type(k), _convert_from_saved_type(v)) for k, v in obj_value]) + elif obj_type in ['list', 'tuple']: + _type = list if obj_type == 'list' else tuple + return _type([_convert_from_saved_type(item) for item in obj_value]) + return obj_value def _prepare_request_message(request): - return {'url': to_unicode(request.url), - 'method': to_unicode(request.method), - 'headers': dict_to_unicode(request.headers), - 'cookies': dict_to_unicode(request.cookies), - 'meta': dict_to_unicode(request.meta)} + return {'url': request.url, + 'method': request.method, + 'headers': request.headers, + 'cookies': request.cookies, + 'meta': request.meta} def _prepare_links_message(links): @@ -22,10 +68,10 @@ def _prepare_links_message(links): def _prepare_response_message(response, send_body): - return {'url': to_unicode(response.url), + return {'url': response.url, 'status_code': response.status_code, - 'meta': dict_to_unicode(response.meta), - 'body': to_unicode(b64encode(response.body)) if send_body else None} + 'meta': response.meta, + 'body': b64encode(response.body) if send_body else None} class CrawlFrontierJSONEncoder(json.JSONEncoder): @@ -45,6 +91,10 @@ def __init__(self, request_model, *a, **kw): self.send_body = kw.pop('send_body', False) super(Encoder, self).__init__(request_model, *a, **kw) + def encode(self, obj): + encoded = _convert_and_save_type(obj) + return super(Encoder, self).encode(encoded) + def encode_add_seeds(self, seeds): return self.encode({ 'type': 'add_seeds', @@ -101,52 +151,51 @@ def __init__(self, request_model, response_model, *a, **kw): super(Decoder, self).__init__(*a, **kw) def _response_from_object(self, obj): - url = to_native_str(obj[b'url']) + url = obj['url'] request = self._request_model(url=url, - meta=obj[b'meta']) + meta=obj['meta']) return self._response_model(url=url, - status_code=obj[b'status_code'], - body=b64decode(obj[b'body']) if obj[b'body'] is not None else None, + status_code=obj['status_code'], + body=b64decode(obj['body']) if obj['body'] is not None else None, request=request) def _request_from_object(self, obj): - return self._request_model(url=to_native_str(obj[b'url']), - method=obj[b'method'], - headers=obj[b'headers'], - cookies=obj[b'cookies'], - meta=obj[b'meta']) + return self._request_model(url=obj['url'], + method=obj['method'], + headers=obj['headers'], + cookies=obj['cookies'], + meta=obj['meta']) def decode(self, message): - message = dict_to_bytes(super(Decoder, self).decode(message)) - if message[b'type'] == b'links_extracted': - request = self._request_from_object(message[b'r']) - links = [self._request_from_object(link) for link in message[b'links']] + message = _convert_from_saved_type(super(Decoder, self).decode(message)) + if message['type'] == 'links_extracted': + request = self._request_from_object(message['r']) + links = [self._request_from_object(link) for link in message['links']] return ('links_extracted', request, links) - if message[b'type'] == b'page_crawled': - response = self._response_from_object(message[b'r']) + if message['type'] == 'page_crawled': + response = self._response_from_object(message['r']) return ('page_crawled', response) - if message[b'type'] == b'request_error': - request = self._request_from_object(message[b'r']) - return ('request_error', request, to_native_str(message[b'error'])) - if message[b'type'] == b'update_score': - return ('update_score', self._request_from_object(message[b'r']), message[b'score'], message[b'schedule']) - if message[b'type'] == b'add_seeds': + if message['type'] == 'request_error': + request = self._request_from_object(message['r']) + return ('request_error', request, message['error']) + if message['type'] == 'update_score': + return ('update_score', self._request_from_object(message['r']), message['score'], message['schedule']) + if message['type'] == 'add_seeds': seeds = [] - for seed in message[b'seeds']: + for seed in message['seeds']: request = self._request_from_object(seed) seeds.append(request) return ('add_seeds', seeds) - if message[b'type'] == b'new_job_id': - return ('new_job_id', int(message[b'job_id'])) - if message[b'type'] == b'offset': - return ('offset', int(message[b'partition_id']), int(message[b'offset'])) + if message['type'] == 'new_job_id': + return ('new_job_id', int(message['job_id'])) + if message['type'] == 'offset': + return ('offset', int(message['partition_id']), int(message['offset'])) return TypeError('Unknown message type') def decode_request(self, message): - obj = dict_to_bytes(super(Decoder, self).decode(message)) - return self._request_model(url=to_native_str(obj[b'url']), - method=obj[b'method'], - headers=obj[b'headers'], - cookies=obj[b'cookies'], - meta=obj[b'meta']) - + obj = _convert_from_saved_type(super(Decoder, self).decode(message)) + return self._request_model(url=obj['url'], + method=obj['method'], + headers=obj['headers'], + cookies=obj['cookies'], + meta=obj['meta']) diff --git a/frontera/contrib/backends/remote/codecs/msgpack.py b/frontera/contrib/backends/remote/codecs/msgpack.py index 9ee99c039..6be589dae 100644 --- a/frontera/contrib/backends/remote/codecs/msgpack.py +++ b/frontera/contrib/backends/remote/codecs/msgpack.py @@ -2,15 +2,37 @@ """ A MsgPack codec for Frontera. Implemented using native msgpack-python library. """ from __future__ import absolute_import - +import logging from msgpack import packb, unpackb from frontera.core.codec import BaseDecoder, BaseEncoder +import six from w3lib.util import to_native_str +logger = logging.getLogger(__name__) + + def _prepare_request_message(request): - return [request.url, request.method, request.headers, request.cookies, request.meta] + def serialize(obj): + """Recursively walk object's hierarchy.""" + if isinstance(obj, (bool, six.integer_types, float, six.binary_type, six.text_type)): + return obj + elif isinstance(obj, dict): + obj = obj.copy() + for key in obj: + obj[key] = serialize(obj[key]) + return obj + elif isinstance(obj, list): + return [serialize(item) for item in obj] + elif isinstance(obj, tuple): + return tuple(serialize([item for item in obj])) + elif hasattr(obj, '__dict__'): + return serialize(obj.__dict__) + else: + logger.warning('unable to serialize object: {}'.format(obj)) + return None + return [request.url, request.method, request.headers, request.cookies, serialize(request.meta)] def _prepare_response_message(response, send_body): @@ -22,29 +44,29 @@ def __init__(self, request_model, *a, **kw): self.send_body = True if 'send_body' in kw and kw['send_body'] else False def encode_add_seeds(self, seeds): - return packb([b'as', [_prepare_request_message(seed) for seed in seeds]], use_bin_type=True, encoding="utf-8") + return packb([b'as', [_prepare_request_message(seed) for seed in seeds]], use_bin_type=True) def encode_page_crawled(self, response): - return packb([b'pc', _prepare_response_message(response, self.send_body)], use_bin_type=True, encoding="utf-8") + return packb([b'pc', _prepare_response_message(response, self.send_body)], use_bin_type=True) def encode_links_extracted(self, request, links): return packb([b'le', _prepare_request_message(request), [_prepare_request_message(link) for link in links]], - use_bin_type=True, encoding="utf-8") + use_bin_type=True) def encode_request_error(self, request, error): - return packb([b're', _prepare_request_message(request), str(error)], use_bin_type=True, encoding="utf-8") + return packb([b're', _prepare_request_message(request), str(error)], use_bin_type=True) def encode_request(self, request): - return packb(_prepare_request_message(request), use_bin_type=True, encoding="utf-8") + return packb(_prepare_request_message(request), use_bin_type=True) def encode_update_score(self, request, score, schedule): - return packb([b'us', _prepare_request_message(request), score, schedule], use_bin_type=True, encoding="utf-8") + return packb([b'us', _prepare_request_message(request), score, schedule], use_bin_type=True) def encode_new_job_id(self, job_id): - return packb([b'njid', int(job_id)], use_bin_type=True, encoding="utf-8") + return packb([b'njid', int(job_id)], use_bin_type=True) def encode_offset(self, partition_id, offset): - return packb([b'of', int(partition_id), int(offset)], use_bin_type=True, encoding="utf-8") + return packb([b'of', int(partition_id), int(offset)], use_bin_type=True) class Decoder(BaseDecoder): @@ -68,7 +90,7 @@ def _request_from_object(self, obj): meta=obj[4]) def decode(self, buffer): - obj = unpackb(buffer, encoding="utf-8") + obj = unpackb(buffer, encoding='utf-8') if obj[0] == b'pc': return ('page_crawled', self._response_from_object(obj[1])) @@ -89,4 +111,4 @@ def decode(self, buffer): return TypeError('Unknown message type') def decode_request(self, buffer): - return self._request_from_object(unpackb(buffer, encoding="utf-8")) + return self._request_from_object(unpackb(buffer, encoding='utf-8')) diff --git a/requirements/tests.txt b/requirements/tests.txt index 0ac170f54..455cd0c35 100644 --- a/requirements/tests.txt +++ b/requirements/tests.txt @@ -6,7 +6,7 @@ scrapy>=0.24 SQLAlchemy>=1.0.0 cachetools pyzmq -msgpack-python +msgpack-python>=0.4 kafka-python>=1.0.0 pytest-cov happybase>=1.0.0 diff --git a/setup.py b/setup.py index e498c97b0..5f305a258 100644 --- a/setup.py +++ b/setup.py @@ -64,7 +64,7 @@ ], 'zeromq': [ 'pyzmq', - 'msgpack-python' + 'msgpack-python>=0.4' ], 'kafka': [ 'kafka-python>=1.0.0' diff --git a/tests/test_codecs.py b/tests/test_codecs.py index b887e24a9..82136f14b 100644 --- a/tests/test_codecs.py +++ b/tests/test_codecs.py @@ -1,12 +1,44 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import -from frontera.contrib.backends.remote.codecs.json import Encoder as JsonEncoder, Decoder as JsonDecoder +import json +import unittest +from frontera.contrib.backends.remote.codecs.json import (Encoder as JsonEncoder, Decoder as JsonDecoder, + _convert_and_save_type, _convert_from_saved_type) from frontera.contrib.backends.remote.codecs.msgpack import Encoder as MsgPackEncoder, Decoder as MsgPackDecoder from frontera.core.models import Request, Response import pytest +def _compare_dicts(dict1, dict2): + """ + Compares two dicts + :return: True if both dicts are equal else False + """ + if dict1 == None or dict2 == None: + return False + + if type(dict1) is not dict or type(dict2) is not dict: + return False + + shared_keys = set(dict2.keys()) & set(dict2.keys()) + + if not (len(shared_keys) == len(dict1.keys()) and len(shared_keys) == len(dict2.keys())): + return False + + dicts_are_equal = True + for key in dict1.keys(): + if type(dict1[key]) is dict: + dicts_are_equal = _compare_dicts(dict1[key], dict2[key]) + else: + dicts_are_equal = (dict1[key] == dict2[key]) and (type(dict1[key]) == type(dict2[key])) + + if not dicts_are_equal: + return False + + return dicts_are_equal + + @pytest.mark.parametrize('send_body', [True, False]) @pytest.mark.parametrize( ('encoder', 'decoder'), [ @@ -16,12 +48,13 @@ ) def test_codec(encoder, decoder, send_body): def check_request(req1, req2): - assert req1.url == req2.url and req1.meta == req2.meta and req1.headers == req2.headers \ - and req1.method == req2.method + assert req1.url == req2.url and _compare_dicts(req1.meta, req2.meta) == True and \ + _compare_dicts(req1.headers, req2.headers) == True and req1.method == req2.method enc = encoder(Request, send_body=send_body) dec = decoder(Request, Response) - req = Request(url="http://www.yandex.ru",method=b'GET', meta={b"test": b"shmest"}, headers={b'reqhdr': b'value'}) + req = Request(url="http://www.yandex.ru", method=b'GET', + meta={b'test': b'shmest', b'scrapy_meta': {'rule': 0, 'key': 'value'}}, headers={b'reqhdr': b'value'}) req2 = Request(url="http://www.yandex.ru/search") msgs = [ enc.encode_add_seeds([req]), @@ -85,3 +118,38 @@ def check_request(req1, req2): o = dec.decode_request(next(it)) check_request(o, req) + + +class TestEncodeDecodeJson(unittest.TestCase): + """ + Test for testing methods `_encode_recursively` and `_decode_recursively` used in json codec + """ + + def test_encode_decode_json_recursively(self): + _int = 1 + _bytes = b'bytes' + _unicode = u'unicode' + _bool = True + _none = None + simple_dict = {'key': 'value'} + simple_list = ['item', 1] + simple_tuple = ('str', 2) + mixed_type_dict = {b'k1': 'v1', 'k2': b'v2', 'int': 1, b'none': None, 'bool': False} + mixed_type_list = [b'i1', 'i2', 23, None, True] + mixed_type_tuple = [b'i1', 'i2', 23, None, True] + nested_dict = {'k1': b'v1', 'lst': [b'i1', 1, ('str', 1, {'k2': b'v1', 'tup': (1, None)})]} + nested_list = [True, None, (1, 2, 3), {b'k1': b'v1', 'tup': ('a', b'b', [None, False])}] + nested_tuple = (1, None, ['a', 'b', True, {b'k1': 'v2', 'lst': ['a', False, (2, 3, 5)]}]) + msgs = [_int, _bytes, _unicode, _bool, _none, simple_dict, simple_list, simple_tuple, + mixed_type_dict, mixed_type_list, mixed_type_tuple, nested_dict, nested_list, nested_tuple] + encoder = json.JSONEncoder() + decoder = json.JSONDecoder() + for original_msg in msgs: + encoded_msg_1 = _convert_and_save_type(original_msg) + encoded_msg_2 = encoder.encode(encoded_msg_1) + decoded_msg_2 = decoder.decode(encoded_msg_2) + decoded_msg_1 = _convert_from_saved_type(decoded_msg_2) + if isinstance(decoded_msg_1, dict): + self.assertDictEqual(decoded_msg_1, original_msg) + elif isinstance(decoded_msg_1, (list, tuple)): + self.assertSequenceEqual(decoded_msg_1, original_msg) diff --git a/tests/test_message_bus_backend.py b/tests/test_message_bus_backend.py index d4753c52e..68278d133 100644 --- a/tests/test_message_bus_backend.py +++ b/tests/test_message_bus_backend.py @@ -49,7 +49,7 @@ def test_page_crawled(self): resp = Response(r1.url, body='body', request=r1) mbb.page_crawled(resp) page = mbb._decoder.decode(mbb.spider_log_producer.messages[0])[1] - self.assertEqual((page.request.url, page.body), (resp.request.url, b'body')) + self.assertEqual((page.request.url, page.body), (resp.request.url, 'body')) def test_links_extracted(self): mbb = self.mbb_setup()