Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 92 additions & 43 deletions frontera/contrib/backends/remote/codecs/json.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -3,29 +3,75 @@
""" """
from __future__ import absolute_import from __future__ import absolute_import
import json import json
import six
from base64 import b64decode, b64encode from base64 import b64decode, b64encode
from frontera.core.codec import BaseDecoder, BaseEncoder from frontera.core.codec import BaseDecoder, BaseEncoder
from w3lib.util import to_unicode, to_native_str from w3lib.util import to_unicode, to_bytes
from frontera.utils.misc import dict_to_unicode, dict_to_bytes

def _convert_and_save_type(obj):
"""
:param obj: dict object

The purpose of this method is to transform the given dict
into a form that would be able to serialize with JSONEncoder.
In order to implement this, this method converts all byte strings
inside a dict to unicode and saves their type for reverting to its
original state. The type and the value are stored as a tuple in the
following format: (original_type, converted value). All other objects
like dict, tuple, list are converted to the same format for the sake
of serialization and for the ease of reverting.
Refer `https://github.com/scrapinghub/frontera/pull/233#discussion_r97432868`
for the detailed explanation about the design.
"""
if isinstance(obj, bytes):
return 'bytes', to_unicode(obj)
elif isinstance(obj, dict):
return 'dict', [(_convert_and_save_type(k), _convert_and_save_type(v)) for k, v in six.iteritems(obj)]
elif isinstance(obj, (list, tuple)):
return type(obj).__name__, [_convert_and_save_type(item) for item in obj]
return 'other', obj


def _convert_from_saved_type(obj):
"""
:param obj: object returned by `_convert_and_save_type`

Restores the original state of the object converted
earlier by `_convert_and_save_type`. This method considers every
first element of the nested tuple as the original type information and
the second value to be the converted value. It applies the original type
recursively on the object to retrieve the original form of the object.
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is super minor, but I would maybe add a check that obj has exactly two items, e.g. with obj_type, obj_value = obj, just in case some list is passed instead of an object returned by _convert_and_save_type

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lopuhin thanks for the review. Please see if this looks good to you

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, thanks @voith! 👍

assert len(obj) == 2
obj_type, obj_value = obj
if obj_type == 'bytes':
return to_bytes(obj_value)
elif obj_type == 'dict':
return dict([(_convert_from_saved_type(k), _convert_from_saved_type(v)) for k, v in obj_value])
elif obj_type in ['list', 'tuple']:
_type = list if obj_type == 'list' else tuple
return _type([_convert_from_saved_type(item) for item in obj_value])
return obj_value




def _prepare_request_message(request): def _prepare_request_message(request):
return {'url': to_unicode(request.url), return {'url': request.url,
'method': to_unicode(request.method), 'method': request.method,
'headers': dict_to_unicode(request.headers), 'headers': request.headers,
'cookies': dict_to_unicode(request.cookies), 'cookies': request.cookies,
'meta': dict_to_unicode(request.meta)} 'meta': request.meta}




def _prepare_links_message(links): def _prepare_links_message(links):
return [_prepare_request_message(link) for link in links] return [_prepare_request_message(link) for link in links]




def _prepare_response_message(response, send_body): def _prepare_response_message(response, send_body):
return {'url': to_unicode(response.url), return {'url': response.url,
'status_code': response.status_code, 'status_code': response.status_code,
'meta': dict_to_unicode(response.meta), 'meta': response.meta,
'body': to_unicode(b64encode(response.body)) if send_body else None} 'body': b64encode(response.body) if send_body else None}




class CrawlFrontierJSONEncoder(json.JSONEncoder): class CrawlFrontierJSONEncoder(json.JSONEncoder):
Expand All @@ -45,6 +91,10 @@ def __init__(self, request_model, *a, **kw):
self.send_body = kw.pop('send_body', False) self.send_body = kw.pop('send_body', False)
super(Encoder, self).__init__(request_model, *a, **kw) super(Encoder, self).__init__(request_model, *a, **kw)


def encode(self, obj):
encoded = _convert_and_save_type(obj)
return super(Encoder, self).encode(encoded)

def encode_add_seeds(self, seeds): def encode_add_seeds(self, seeds):
return self.encode({ return self.encode({
'type': 'add_seeds', 'type': 'add_seeds',
Expand Down Expand Up @@ -101,52 +151,51 @@ def __init__(self, request_model, response_model, *a, **kw):
super(Decoder, self).__init__(*a, **kw) super(Decoder, self).__init__(*a, **kw)


def _response_from_object(self, obj): def _response_from_object(self, obj):
url = to_native_str(obj[b'url']) url = obj['url']
request = self._request_model(url=url, request = self._request_model(url=url,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the keys guaranteed to be native strings for this and all the following obj accesses?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have a look at this line https://github.com/scrapinghub/frontera/pull/233/files?diff=split#diff-6cd48fdd254557ffc1b50d60a0323364R154. The previous method called in the decoder was dict_to_bytes which was changing the native_string types to bytes. But the encoder was never changed and it used native_Strings. Since the encoder takes care that it uses native strings the decoder need not worry about any native_string conversion.

Copy link
Member

@sibiryakov sibiryakov Jan 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If url had type 'bytes' then after decoding it will have type bytes, and in Frontera URLs have to be native strings (when in process). so maybe we still need a preventive conversion to native strings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If url had type 'bytes' then after decoding it will have type bytes

You are right about the fact that this PR will save the type of the object it was originally given to the codec and on on decoding it will return the original type of value.

and in Frontera URLs have to be native strings

If this is the case than such type checking and conversion should be done somewhere else, Not In a codec.

so maybe we still need a preventive conversion to native strings

A codec should never change the type of the object it was originally inputed with. Forcing such a mechanism in codec is a hacky way of getting around this problem.

Copy link
Member

@sibiryakov sibiryakov Jan 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A codec should never change the type of the object it was originally inputed with. Forcing such a mechanism in codec is a hacky way of getting around this problem.

Agree, it's not a codec's responsibility. But then we would need to check the rest of the code and make sure we're using native strings for URLs. URLs could appear in crawling strategy (sw), backend, message bus or converters. Because such a change could break the code relying on URLs present as native strings.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, URLs shouldn't be a problem, they are guaranteed to be native strings. @voith I understand. The reason for using dict_to_bytes was that the JSON encoder returns unicode strings and we wanted meta, headers, etc to be bytes. But it seems the new function takes care of that.

meta=obj[b'meta']) meta=obj['meta'])
return self._response_model(url=url, return self._response_model(url=url,
status_code=obj[b'status_code'], status_code=obj['status_code'],
body=b64decode(obj[b'body']) if obj[b'body'] is not None else None, body=b64decode(obj['body']) if obj['body'] is not None else None,
request=request) request=request)


def _request_from_object(self, obj): def _request_from_object(self, obj):
return self._request_model(url=to_native_str(obj[b'url']), return self._request_model(url=obj['url'],
method=obj[b'method'], method=obj['method'],
headers=obj[b'headers'], headers=obj['headers'],
cookies=obj[b'cookies'], cookies=obj['cookies'],
meta=obj[b'meta']) meta=obj['meta'])


def decode(self, message): def decode(self, message):
message = dict_to_bytes(super(Decoder, self).decode(message)) message = _convert_from_saved_type(super(Decoder, self).decode(message))
if message[b'type'] == b'links_extracted': if message['type'] == 'links_extracted':
request = self._request_from_object(message[b'r']) request = self._request_from_object(message['r'])
links = [self._request_from_object(link) for link in message[b'links']] links = [self._request_from_object(link) for link in message['links']]
return ('links_extracted', request, links) return ('links_extracted', request, links)
if message[b'type'] == b'page_crawled': if message['type'] == 'page_crawled':
response = self._response_from_object(message[b'r']) response = self._response_from_object(message['r'])
return ('page_crawled', response) return ('page_crawled', response)
if message[b'type'] == b'request_error': if message['type'] == 'request_error':
request = self._request_from_object(message[b'r']) request = self._request_from_object(message['r'])
return ('request_error', request, to_native_str(message[b'error'])) return ('request_error', request, message['error'])
if message[b'type'] == b'update_score': if message['type'] == 'update_score':
return ('update_score', self._request_from_object(message[b'r']), message[b'score'], message[b'schedule']) return ('update_score', self._request_from_object(message['r']), message['score'], message['schedule'])
if message[b'type'] == b'add_seeds': if message['type'] == 'add_seeds':
seeds = [] seeds = []
for seed in message[b'seeds']: for seed in message['seeds']:
request = self._request_from_object(seed) request = self._request_from_object(seed)
seeds.append(request) seeds.append(request)
return ('add_seeds', seeds) return ('add_seeds', seeds)
if message[b'type'] == b'new_job_id': if message['type'] == 'new_job_id':
return ('new_job_id', int(message[b'job_id'])) return ('new_job_id', int(message['job_id']))
if message[b'type'] == b'offset': if message['type'] == 'offset':
return ('offset', int(message[b'partition_id']), int(message[b'offset'])) return ('offset', int(message['partition_id']), int(message['offset']))
return TypeError('Unknown message type') return TypeError('Unknown message type')


def decode_request(self, message): def decode_request(self, message):
obj = dict_to_bytes(super(Decoder, self).decode(message)) obj = _convert_from_saved_type(super(Decoder, self).decode(message))
return self._request_model(url=to_native_str(obj[b'url']), return self._request_model(url=obj['url'],
method=obj[b'method'], method=obj['method'],
headers=obj[b'headers'], headers=obj['headers'],
cookies=obj[b'cookies'], cookies=obj['cookies'],
meta=obj[b'meta']) meta=obj['meta'])

31 changes: 17 additions & 14 deletions frontera/contrib/backends/remote/codecs/msgpack.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@
""" A MsgPack codec for Frontera. Implemented using native msgpack-python library. """ A MsgPack codec for Frontera. Implemented using native msgpack-python library.
""" """
from __future__ import absolute_import from __future__ import absolute_import

import logging
from msgpack import packb, unpackb from msgpack import packb, unpackb


from frontera.core.codec import BaseDecoder, BaseEncoder from frontera.core.codec import BaseDecoder, BaseEncoder
import six import six
from w3lib.util import to_native_str from w3lib.util import to_native_str




logger = logging.getLogger(__name__)


def _prepare_request_message(request): def _prepare_request_message(request):
def serialize(obj): def serialize(obj):
"""Recursively walk object's hierarchy.""" """Recursively walk object's hierarchy."""
if isinstance(obj, six.text_type): if isinstance(obj, (bool, six.integer_types, float, six.binary_type, six.text_type)):
return obj.encode('utf8')
if isinstance(obj, (bool, six.integer_types, float, six.binary_type)):
return obj return obj
elif isinstance(obj, dict): elif isinstance(obj, dict):
obj = obj.copy() obj = obj.copy()
Expand All @@ -29,6 +30,7 @@ def serialize(obj):
elif hasattr(obj, '__dict__'): elif hasattr(obj, '__dict__'):
return serialize(obj.__dict__) return serialize(obj.__dict__)
else: else:
logger.warning('unable to serialize object: {}'.format(obj))
return None return None
return [request.url, request.method, request.headers, request.cookies, serialize(request.meta)] return [request.url, request.method, request.headers, request.cookies, serialize(request.meta)]


Expand All @@ -42,28 +44,29 @@ def __init__(self, request_model, *a, **kw):
self.send_body = True if 'send_body' in kw and kw['send_body'] else False self.send_body = True if 'send_body' in kw and kw['send_body'] else False


def encode_add_seeds(self, seeds): def encode_add_seeds(self, seeds):
return packb([b'as', [_prepare_request_message(seed) for seed in seeds]]) return packb([b'as', [_prepare_request_message(seed) for seed in seeds]], use_bin_type=True)


def encode_page_crawled(self, response): def encode_page_crawled(self, response):
return packb([b'pc', _prepare_response_message(response, self.send_body)]) return packb([b'pc', _prepare_response_message(response, self.send_body)], use_bin_type=True)


def encode_links_extracted(self, request, links): def encode_links_extracted(self, request, links):
return packb([b'le', _prepare_request_message(request), [_prepare_request_message(link) for link in links]]) return packb([b'le', _prepare_request_message(request), [_prepare_request_message(link) for link in links]],
use_bin_type=True)


def encode_request_error(self, request, error): def encode_request_error(self, request, error):
return packb([b're', _prepare_request_message(request), str(error)]) return packb([b're', _prepare_request_message(request), str(error)], use_bin_type=True)


def encode_request(self, request): def encode_request(self, request):
return packb(_prepare_request_message(request)) return packb(_prepare_request_message(request), use_bin_type=True)


def encode_update_score(self, request, score, schedule): def encode_update_score(self, request, score, schedule):
return packb([b'us', _prepare_request_message(request), score, schedule]) return packb([b'us', _prepare_request_message(request), score, schedule], use_bin_type=True)


def encode_new_job_id(self, job_id): def encode_new_job_id(self, job_id):
return packb([b'njid', int(job_id)]) return packb([b'njid', int(job_id)], use_bin_type=True)


def encode_offset(self, partition_id, offset): def encode_offset(self, partition_id, offset):
return packb([b'of', int(partition_id), int(offset)]) return packb([b'of', int(partition_id), int(offset)], use_bin_type=True)




class Decoder(BaseDecoder): class Decoder(BaseDecoder):
Expand All @@ -87,7 +90,7 @@ def _request_from_object(self, obj):
meta=obj[4]) meta=obj[4])


def decode(self, buffer): def decode(self, buffer):
obj = unpackb(buffer) obj = unpackb(buffer, encoding='utf-8')
if obj[0] == b'pc': if obj[0] == b'pc':
return ('page_crawled', return ('page_crawled',
self._response_from_object(obj[1])) self._response_from_object(obj[1]))
Expand All @@ -108,4 +111,4 @@ def decode(self, buffer):
return TypeError('Unknown message type') return TypeError('Unknown message type')


def decode_request(self, buffer): def decode_request(self, buffer):
return self._request_from_object(unpackb(buffer)) return self._request_from_object(unpackb(buffer, encoding='utf-8'))
2 changes: 1 addition & 1 deletion requirements/tests.txt
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ scrapy>=0.24
SQLAlchemy>=1.0.0 SQLAlchemy>=1.0.0
cachetools cachetools
pyzmq pyzmq
msgpack-python msgpack-python>=0.4
kafka-python>=1.0.0 kafka-python>=1.0.0
pytest-cov pytest-cov
happybase>=1.0.0 happybase>=1.0.0
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
], ],
'zeromq': [ 'zeromq': [
'pyzmq', 'pyzmq',
'msgpack-python' 'msgpack-python>=0.4'
], ],
'kafka': [ 'kafka': [
'kafka-python>=1.0.0' 'kafka-python>=1.0.0'
Expand Down
76 changes: 72 additions & 4 deletions tests/test_codecs.py
Original file line number Original file line Diff line number Diff line change
@@ -1,12 +1,44 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-


from __future__ import absolute_import from __future__ import absolute_import
from frontera.contrib.backends.remote.codecs.json import Encoder as JsonEncoder, Decoder as JsonDecoder import json
import unittest
from frontera.contrib.backends.remote.codecs.json import (Encoder as JsonEncoder, Decoder as JsonDecoder,
_convert_and_save_type, _convert_from_saved_type)
from frontera.contrib.backends.remote.codecs.msgpack import Encoder as MsgPackEncoder, Decoder as MsgPackDecoder from frontera.contrib.backends.remote.codecs.msgpack import Encoder as MsgPackEncoder, Decoder as MsgPackDecoder
from frontera.core.models import Request, Response from frontera.core.models import Request, Response
import pytest import pytest




def _compare_dicts(dict1, dict2):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any reason to have such complex machinery for comparing dicts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wanted to be a little bit precise

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me it was not immediately obvious, but I think the main difference from built-in dict comparison is this line https://github.com/scrapinghub/frontera/pull/233/files#diff-252cc4202700183e8f0c782fb28dbfc2R34, so it will catch e.g. python 2 dicts with equal str and unicode values, int/float, etc.

"""
Compares two dicts
:return: True if both dicts are equal else False
"""
if dict1 == None or dict2 == None:
return False

if type(dict1) is not dict or type(dict2) is not dict:
return False

shared_keys = set(dict2.keys()) & set(dict2.keys())

if not (len(shared_keys) == len(dict1.keys()) and len(shared_keys) == len(dict2.keys())):
return False

dicts_are_equal = True
for key in dict1.keys():
if type(dict1[key]) is dict:
dicts_are_equal = _compare_dicts(dict1[key], dict2[key])
else:
dicts_are_equal = (dict1[key] == dict2[key]) and (type(dict1[key]) == type(dict2[key]))

if not dicts_are_equal:
return False

return dicts_are_equal


@pytest.mark.parametrize('send_body', [True, False]) @pytest.mark.parametrize('send_body', [True, False])
@pytest.mark.parametrize( @pytest.mark.parametrize(
('encoder', 'decoder'), [ ('encoder', 'decoder'), [
Expand All @@ -16,12 +48,13 @@
) )
def test_codec(encoder, decoder, send_body): def test_codec(encoder, decoder, send_body):
def check_request(req1, req2): def check_request(req1, req2):
assert req1.url == req2.url and req1.meta == req2.meta and req1.headers == req2.headers \ assert req1.url == req2.url and _compare_dicts(req1.meta, req2.meta) == True and \
and req1.method == req2.method _compare_dicts(req1.headers, req2.headers) == True and req1.method == req2.method


enc = encoder(Request, send_body=send_body) enc = encoder(Request, send_body=send_body)
dec = decoder(Request, Response) dec = decoder(Request, Response)
req = Request(url="http://www.yandex.ru",method=b'GET', meta={b"test": b"shmest"}, headers={b'reqhdr': b'value'}) req = Request(url="http://www.yandex.ru", method=b'GET',
meta={b'test': b'shmest', b'scrapy_meta': {'rule': 0, 'key': 'value'}}, headers={b'reqhdr': b'value'})
req2 = Request(url="http://www.yandex.ru/search") req2 = Request(url="http://www.yandex.ru/search")
msgs = [ msgs = [
enc.encode_add_seeds([req]), enc.encode_add_seeds([req]),
Expand Down Expand Up @@ -85,3 +118,38 @@ def check_request(req1, req2):


o = dec.decode_request(next(it)) o = dec.decode_request(next(it))
check_request(o, req) check_request(o, req)


class TestEncodeDecodeJson(unittest.TestCase):
"""
Test for testing methods `_encode_recursively` and `_decode_recursively` used in json codec
"""

def test_encode_decode_json_recursively(self):
_int = 1
_bytes = b'bytes'
_unicode = u'unicode'
_bool = True
_none = None
simple_dict = {'key': 'value'}
simple_list = ['item', 1]
simple_tuple = ('str', 2)
mixed_type_dict = {b'k1': 'v1', 'k2': b'v2', 'int': 1, b'none': None, 'bool': False}
mixed_type_list = [b'i1', 'i2', 23, None, True]
mixed_type_tuple = [b'i1', 'i2', 23, None, True]
nested_dict = {'k1': b'v1', 'lst': [b'i1', 1, ('str', 1, {'k2': b'v1', 'tup': (1, None)})]}
nested_list = [True, None, (1, 2, 3), {b'k1': b'v1', 'tup': ('a', b'b', [None, False])}]
nested_tuple = (1, None, ['a', 'b', True, {b'k1': 'v2', 'lst': ['a', False, (2, 3, 5)]}])
msgs = [_int, _bytes, _unicode, _bool, _none, simple_dict, simple_list, simple_tuple,
mixed_type_dict, mixed_type_list, mixed_type_tuple, nested_dict, nested_list, nested_tuple]
encoder = json.JSONEncoder()
decoder = json.JSONDecoder()
for original_msg in msgs:
encoded_msg_1 = _convert_and_save_type(original_msg)
encoded_msg_2 = encoder.encode(encoded_msg_1)
decoded_msg_2 = decoder.decode(encoded_msg_2)
decoded_msg_1 = _convert_from_saved_type(decoded_msg_2)
if isinstance(decoded_msg_1, dict):
self.assertDictEqual(decoded_msg_1, original_msg)
elif isinstance(decoded_msg_1, (list, tuple)):
self.assertSequenceEqual(decoded_msg_1, original_msg)
2 changes: 1 addition & 1 deletion tests/test_message_bus_backend.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def test_page_crawled(self):
resp = Response(r1.url, body='body', request=r1) resp = Response(r1.url, body='body', request=r1)
mbb.page_crawled(resp) mbb.page_crawled(resp)
page = mbb._decoder.decode(mbb.spider_log_producer.messages[0])[1] page = mbb._decoder.decode(mbb.spider_log_producer.messages[0])[1]
self.assertEqual((page.request.url, page.body), (resp.request.url, b'body')) self.assertEqual((page.request.url, page.body), (resp.request.url, 'body'))


def test_links_extracted(self): def test_links_extracted(self):
mbb = self.mbb_setup() mbb = self.mbb_setup()
Expand Down