Skip to content

Commit 035be29

Browse files
committed
Merge pull request #139 from scrapinghub/strategy-improvements
crawling strategy interface improvements
2 parents 99e56f7 + 496c03a commit 035be29

File tree

11 files changed

+227
-175
lines changed

11 files changed

+227
-175
lines changed

docs/source/topics/own_crawling_strategy.rst

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@ Crawling strategy
55
Use ``frontera.worker.strategies.bfs`` module for reference. In general, you need to write a crawling strategy class
66
implementing the interface:
77

8-
.. autoclass:: frontera.core.components.BaseCrawlingStrategy
8+
.. autoclass:: frontera.worker.strategies.BaseCrawlingStrategy
99

1010
**Methods**
1111

12-
.. automethod:: frontera.core.components.BaseCrawlingStrategy.from_worker
13-
.. automethod:: frontera.core.components.BaseCrawlingStrategy.add_seeds
14-
.. automethod:: frontera.core.components.BaseCrawlingStrategy.page_crawled
15-
.. automethod:: frontera.core.components.BaseCrawlingStrategy.page_error
16-
.. automethod:: frontera.core.components.BaseCrawlingStrategy.finished
17-
.. automethod:: frontera.core.components.BaseCrawlingStrategy.close
12+
.. automethod:: frontera.worker.strategies.BaseCrawlingStrategy.from_worker
13+
.. automethod:: frontera.worker.strategies.BaseCrawlingStrategy.add_seeds
14+
.. automethod:: frontera.worker.strategies.BaseCrawlingStrategy.page_crawled
15+
.. automethod:: frontera.worker.strategies.BaseCrawlingStrategy.page_error
16+
.. automethod:: frontera.worker.strategies.BaseCrawlingStrategy.finished
17+
.. automethod:: frontera.worker.strategies.BaseCrawlingStrategy.close
1818

1919

2020
The class can be put in any module and passed to :term:`strategy worker` using command line option or

frontera/contrib/backends/hbase.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,16 +261,15 @@ def update_cache(self, objs):
261261
objs = objs if type(objs) in [list, tuple] else [objs]
262262

263263
def put(obj):
264-
if obj.meta['state'] is not None:
265-
self._state_cache[obj.meta['fingerprint']] = obj.meta['state']
264+
self._state_cache[obj.meta['fingerprint']] = obj.meta['state']
266265
map(put, objs)
267266

268267
def set_states(self, objs):
269268
objs = objs if type(objs) in [list, tuple] else [objs]
270269

271270
def get(obj):
272271
fprint = obj.meta['fingerprint']
273-
obj.meta['state'] = self._state_cache[fprint] if fprint in self._state_cache else None
272+
obj.meta['state'] = self._state_cache[fprint] if fprint in self._state_cache else States.DEFAULT
274273
map(get, objs)
275274

276275
def flush(self, force_clear):

frontera/contrib/backends/memory/__init__.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,11 @@ def __init__(self, cache_size_limit):
120120
self.logger = logging.getLogger("frontera.contrib.backends.memory.MemoryStates")
121121

122122
def _put(self, obj):
123-
if obj.meta['state'] is not None:
124-
self._cache[obj.meta['fingerprint']] = obj.meta['state']
123+
self._cache[obj.meta['fingerprint']] = obj.meta['state']
125124

126125
def _get(self, obj):
127126
fprint = obj.meta['fingerprint']
128-
obj.meta['state'] = self._cache[fprint] if fprint in self._cache else None
127+
obj.meta['state'] = self._cache[fprint] if fprint in self._cache else States.DEFAULT
129128

130129
def update_cache(self, objs):
131130
objs = objs if type(objs) in [list, tuple] else [objs]

frontera/contrib/backends/sqlalchemy/revisiting.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def _schedule(self, requests):
104104
batch = []
105105
queue_incr = 0
106106
for request in requests:
107-
if request.meta['state'] in [States.NOT_CRAWLED, None]:
107+
if request.meta['state'] in [States.NOT_CRAWLED]:
108108
schedule_at = datetime.utcnow()
109109
elif request.meta['state'] in [States.CRAWLED, States.ERROR]:
110110
schedule_at = datetime.utcnow() + self.interval

frontera/contrib/middlewares/domain.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
from frontera.core.components import Middleware
44
from frontera.utils.url import parse_domain_from_url_fast, parse_domain_from_url
55

6+
# TODO: Why not put the whole url_parse result here in meta?
7+
68

79
class DomainMiddleware(Middleware):
810
"""

frontera/contrib/middlewares/fingerprint.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ class DomainFingerprintMiddleware(BaseFingerprintMiddleware):
106106
fingerprint_function_name = 'DOMAIN_FINGERPRINT_FUNCTION'
107107

108108
def _add_fingerprint(self, obj):
109-
if 'domain' in obj.meta:
109+
if 'domain' in obj.meta and 'name' in obj.meta['domain']:
110110
obj.meta['domain']['fingerprint'] = self.fingerprint_function(obj.meta['domain']['name'])
111111
if 'redirect_domains' in obj.meta:
112112
for domain in obj.meta['redirect_domains']:

frontera/core/components.py

Lines changed: 1 addition & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ class States(StartStopMixin):
9696
QUEUED = 1
9797
CRAWLED = 2
9898
ERROR = 3
99+
DEFAULT = NOT_CRAWLED
99100

100101
@abstractmethod
101102
def update_cache(self, objs):
@@ -272,75 +273,3 @@ def partition(self, key, partitions=None):
272273
raise NotImplementedError('partition function has to be implemented')
273274

274275

275-
class BaseCrawlingStrategy(object):
276-
"""
277-
Interface definition for a crawling strategy.
278-
279-
Before calling these methods strategy worker is adding 'state' key to meta field in every
280-
:class:`Request <frontera.core.models.Request>` with state of the URL. Pleases refer for the states to HBaseBackend
281-
implementation.
282-
283-
After exiting from all of these methods states from meta field are passed back and stored in the backend.
284-
"""
285-
__metaclass__ = ABCMeta
286-
287-
@classmethod
288-
def from_worker(cls, settings):
289-
"""
290-
Called on instantiation in strategy worker.
291-
292-
:param settings: :class:`Settings <frontera.settings.Settings>` instance
293-
:return: new instance
294-
"""
295-
raise NotImplementedError
296-
297-
@abstractmethod
298-
def add_seeds(self, seeds):
299-
"""
300-
Called when add_seeds event is received from spider log.
301-
302-
:param list seeds: A list of :class:`Request <frontera.core.models.Request>` objects.
303-
:return: dict with keys as fingerprints (as hex string) and values as float scores, if no scheduling is needed,
304-
no fingerprint should be returned
305-
"""
306-
return {}
307-
308-
@abstractmethod
309-
def page_crawled(self, response, links):
310-
"""
311-
Called every time document was successfully crawled, and receiving page_crawled event from spider log.
312-
313-
:param object response: The :class:`Response <frontera.core.models.Response>` object for the crawled page.
314-
:param list links: A list of :class:`Request <frontera.core.models.Request>` objects generated from \
315-
the links extracted for the crawled page.
316-
:return: dict with keys as fingerprints (as hex string) and values as float scores, if no scheduling is needed,
317-
no fingerprint should be returned
318-
"""
319-
return {}
320-
321-
@abstractmethod
322-
def page_error(self, request, error):
323-
"""
324-
Called every time there was error during page downloading.
325-
326-
:param object request: The fetched with error :class:`Request <frontera.core.models.Request>` object.
327-
:param str error: A string identifier for the error.
328-
:return: dict with one key as fingerprint (as hex string) and value as float score, if no scheduling is needed,
329-
empty dict should be returned
330-
"""
331-
return {}
332-
333-
def finished(self):
334-
"""
335-
Called by Strategy worker, after finishing processing each cycle of spider log. If this method returns true,
336-
then Strategy worker reports that crawling goal is achieved, stops and exits.
337-
338-
:return: bool
339-
"""
340-
return False
341-
342-
def close(self):
343-
"""
344-
Called when strategy worker is about to close crawling strategy.
345-
"""
346-
pass

frontera/core/models.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@ def body(self):
7474
return self._body
7575

7676
def __str__(self):
77-
return "<%s at 0x%0x %s>" % (type(self).__name__, id(self), self.url)
77+
return "<%s at 0x%0x %s meta=%s body=%s... cookies=%s, headers=%s>" % (type(self).__name__, id(self), self.url,
78+
str(self.meta), str(self.body[:20]),
79+
str(self.cookies), str(self.headers))
7880

7981
__repr__ = __str__
8082

@@ -149,6 +151,9 @@ def meta(self):
149151
"is not tied to any request")
150152

151153
def __str__(self):
152-
return "<%s at 0x%0x %s %s>" % (type(self).__name__, id(self), self.status_code, self.url)
154+
return "<%s at 0x%0x %s %s meta=%s body=%s... headers=%s>" % (type(self).__name__,
155+
id(self), self.status_code,
156+
self.url, str(self.meta),
157+
str(self.body[:20]), str(self.headers))
153158

154159
__repr__ = __str__
frontera/worker/strategies/__init__.py

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,107 @@
1-
# -*- coding: utf-8 -*-
1+
# -*- coding: utf-8 -*-
2+
from frontera.core.models import Request
3+
from frontera.contrib.middlewares.fingerprint import UrlFingerprintMiddleware
4+
5+
from abc import ABCMeta, abstractmethod
6+
7+
8+
class BaseCrawlingStrategy(object):
9+
"""
10+
Interface definition for a crawling strategy.
11+
12+
Before calling these methods strategy worker is adding 'state' key to meta field in every
13+
:class:`Request <frontera.core.models.Request>` with state of the URL. For the states, please refer to the HBaseBackend
14+
implementation.
15+
16+
After exiting from all of these methods states from meta field are passed back and stored in the backend.
17+
"""
18+
__metaclass__ = ABCMeta
19+
20+
def __init__(self, manager, mb_stream, states_context):
21+
self._mb_stream = mb_stream
22+
self._states_context = states_context
23+
self.url_mw = UrlFingerprintMiddleware(manager)
24+
25+
@classmethod
26+
def from_worker(cls, manager, mb_stream, states_context):
27+
"""
28+
Called on instantiation in strategy worker.
29+
30+
:param manager: :class:`FrontierManager <frontera.core.manager.FrontierManager>` instance
31+
:param mb_stream: :class:`UpdateScoreStream <frontera.worker.strategy.UpdateScoreStream>` instance
32+
:return: new instance
33+
"""
34+
raise cls(manager, mb_stream, states_context)
35+
36+
@abstractmethod
37+
def add_seeds(self, seeds):
38+
"""
39+
Called when add_seeds event is received from spider log.
40+
41+
:param list seeds: A list of :class:`Request <frontera.core.models.Request>` objects.
42+
"""
43+
return {}
44+
45+
@abstractmethod
46+
def page_crawled(self, response, links):
47+
"""
48+
Called every time a document is successfully crawled and a page_crawled event is received from the spider log.
49+
50+
:param object response: The :class:`Response <frontera.core.models.Response>` object for the crawled page.
51+
:param list links: A list of :class:`Request <frontera.core.models.Request>` objects generated from \
52+
the links extracted for the crawled page.
53+
"""
54+
return {}
55+
56+
@abstractmethod
57+
def page_error(self, request, error):
58+
"""
59+
Called every time there was error during page downloading.
60+
61+
:param object request: The :class:`Request <frontera.core.models.Request>` object that was fetched with an error.
62+
:param str error: A string identifier for the error.
63+
"""
64+
return {}
65+
66+
def finished(self):
67+
"""
68+
Called by Strategy worker, after finishing processing each cycle of spider log. If this method returns true,
69+
then Strategy worker reports that crawling goal is achieved, stops and exits.
70+
71+
:return: bool
72+
"""
73+
return False
74+
75+
def close(self):
76+
"""
77+
Called when strategy worker is about to close crawling strategy.
78+
"""
79+
self._mb_stream.flush()
80+
self._states_context.release()
81+
82+
def schedule(self, request, score=1.0, dont_queue=False):
83+
"""
84+
Schedule document for crawling with specified score.
85+
86+
:param request: A :class:`Request <frontera.core.models.Request>` object.
87+
:param score: float from 0.0 to 1.0
88+
:param dont_queue: bool, True - if no need to schedule, only update the score
89+
"""
90+
self._mb_stream.send(request.url, request.meta['fingerprint'], score, dont_queue)
91+
92+
def create_request(self, url, method='GET', headers=None, cookies=None, meta=None, body=''):
93+
"""
94+
Creates request with specified fields, with state fetched from backend.
95+
96+
:param url: str
97+
:param method: str
98+
:param headers: dict
99+
:param cookies: dict
100+
:param meta: dict
101+
:param body: str
102+
:return: :class:`Request <frontera.core.models.Request>`
103+
"""
104+
r = Request(url, method=method, headers=headers, cookies=cookies, meta=meta, body=body)
105+
self.url_mw._add_fingerprint(r)
106+
self._states_context.refresh_and_keep(r)
107+
return r

frontera/worker/strategies/bfs.py

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,27 @@
11
# -*- coding: utf-8 -*-
22
from urlparse import urlparse
3-
from frontera.core.components import States, BaseCrawlingStrategy
3+
from frontera.core.components import States
4+
from frontera.worker.strategies import BaseCrawlingStrategy
45

56

67
class CrawlingStrategy(BaseCrawlingStrategy):
78

8-
@classmethod
9-
def from_worker(cls, settings):
10-
return cls()
11-
129
def add_seeds(self, seeds):
13-
scores = {}
1410
for seed in seeds:
15-
if seed.meta['state'] is None:
16-
scores[seed.meta['fingerprint']] = 1.0
11+
if seed.meta['state'] is States.NOT_CRAWLED:
1712
seed.meta['state'] = States.QUEUED
18-
return scores
13+
self.schedule(seed)
1914

2015
def page_crawled(self, response, links):
21-
scores = {}
2216
response.meta['state'] = States.CRAWLED
2317
for link in links:
24-
if link.meta['state'] is None:
25-
scores[link.meta['fingerprint']] = self.get_score(link.url)
18+
if link.meta['state'] is States.NOT_CRAWLED:
2619
link.meta['state'] = States.QUEUED
27-
return scores
20+
self.schedule(link, self.get_score(link.url))
2821

2922
def page_error(self, request, error):
3023
request.meta['state'] = States.ERROR
31-
return {request.meta['fingerprint']: 0.0}
24+
self.schedule(request, score=0.0, dont_queue=True)
3225

3326
def get_score(self, url):
3427
url_parts = urlparse(url)

0 commit comments

Comments
 (0)