diff --git a/.travis.yml b/.travis.yml index 0f0d34f31..3a9f8062c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -47,6 +47,8 @@ before_script: - docker-compose --version - docker-compose --verbose -f tests/kafka/docker-compose.yml up -d - docker ps -a + - docker run --name cassandra -p 127.0.0.1:9042:9042 -d cassandra + - python tests/contrib/backends/cassandra/wait_for_cluster_up.py script: tox diff --git a/docs/source/topics/frontera-settings.rst b/docs/source/topics/frontera-settings.rst index 805fc52f7..deec37349 100644 --- a/docs/source/topics/frontera-settings.rst +++ b/docs/source/topics/frontera-settings.rst @@ -487,6 +487,97 @@ Default: ``timedelta(days=1)`` Time between document visits, expressed in ``datetime.timedelta`` objects. Changing of this setting will only affect documents scheduled after the change. All previously queued documents will be crawled with old periodicity. +.. _cassandra-settings: + +Cassandra +--------- + +.. setting:: CASSANDRABACKEND_CACHE_SIZE + +CASSANDRABACKEND_CACHE_SIZE +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Default: ``10000`` + +Cassandra Metadata LRU cache size. It is used for caching objects which are requested from the DB every time +already-known documents are crawled. This mainly saves DB throughput; increase it if you're experiencing problems with +too high a volume of SELECTs to the Metadata table, or decrease it if you need to save memory. + + +.. setting:: CASSANDRABACKEND_CLUSTER_HOSTS + +CASSANDRABACKEND_CLUSTER_HOSTS +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Default: ``['127.0.0.1']`` + +The list of contact points to try connecting to for cluster discovery. Not all contact points are required; the driver +discovers the rest. + +.. setting:: CASSANDRABACKEND_CLUSTER_PORT + +CASSANDRABACKEND_CLUSTER_PORT +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Default: ``9042`` + +The server-side port to open connections to Cassandra. + +.. 
setting:: CASSANDRABACKEND_DROP_ALL_TABLES + +CASSANDRABACKEND_DROP_ALL_TABLES +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Default: ``False`` + +Set to ``True`` to drop and create all DB tables on backend instantiation. + +.. setting:: CASSANDRABACKEND_KEYSPACE + +CASSANDRABACKEND_KEYSPACE +^^^^^^^^^^^^^^^^^^^^^^^^^ + +Default: ``crawler`` + +Sets the Cassandra keyspace. + +.. setting:: CASSANDRABACKEND_MODELS + +CASSANDRABACKEND_MODELS +^^^^^^^^^^^^^^^^^^^^^^^ + +Default:: + + { + 'MetadataModel': 'frontera.contrib.backends.cassandra.models.MetadataModel', + 'StateModel': 'frontera.contrib.backends.cassandra.models.StateModel', + 'QueueModel': 'frontera.contrib.backends.cassandra.models.QueueModel', + 'FifoOrLIfoQueueModel': 'frontera.contrib.backends.cassandra.models.FifoOrLIfoQueueModel', + } + +This is a mapping of the Cassandra models used by backends. It is mainly used for customization. + +.. setting:: CASSANDRABACKEND_REQUEST_TIMEOUT + +CASSANDRABACKEND_REQUEST_TIMEOUT +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Default: ``60`` + +Timeout in seconds for every request made by the Cassandra driver to Cassandra. + +Revisiting backend +------------------ + +.. setting:: CASSANDRABACKEND_REVISIT_INTERVAL + +CASSANDRABACKEND_REVISIT_INTERVAL +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Default: ``timedelta(days=1)`` + +Time between document visits, expressed in ``datetime.timedelta`` objects. Changing of this setting will only affect +documents scheduled after the change. All previously queued documents will be crawled with old periodicity. .. _hbase-settings: diff --git a/docs/source/topics/frontier-backends.rst b/docs/source/topics/frontier-backends.rst index 706da8377..cb6a715a0 100644 --- a/docs/source/topics/frontier-backends.rst +++ b/docs/source/topics/frontier-backends.rst @@ -254,13 +254,52 @@ For a complete list of all settings used for SQLAlchemy backends check the :doc: SQLAlchemy :class:`Backend ` implementation of a random selection algorithm. +.. 
_frontier-backends-cassandra: + +Cassandra backends +^^^^^^^^^^^^^^^^^^ + +This set of :class:`Backend ` objects will use Cassandra as storage for +:ref:`basic algorithms `. + +If you need to use your own `cassandra models`_, you can do it by using the +:setting:`CASSANDRABACKEND_MODELS` setting. + +This setting uses a dictionary where ``key`` represents the name of the model to define and ``value`` the model to use. + +For a complete list of all settings used for Cassandra backends check the :doc:`settings ` section. + +.. class:: frontera.contrib.backends.cassandra.BASE + + Base class for Cassandra :class:`Backend ` objects. + +.. class:: frontera.contrib.backends.cassandra.FIFO + + Cassandra :class:`Backend ` implementation of `FIFO`_ algorithm. + +.. class:: frontera.contrib.backends.cassandra.LIFO + + Cassandra :class:`Backend ` implementation of `LIFO`_ algorithm. + +.. class:: frontera.contrib.backends.cassandra.BFS + + Cassandra :class:`Backend ` implementation of `BFS`_ algorithm. + +.. class:: frontera.contrib.backends.cassandra.DFS + + Cassandra :class:`Backend ` implementation of `DFS`_ algorithm. + +.. class:: frontera.contrib.backends.cassandra.Distributed + + Cassandra :class:`Backend ` implementation of a distributed backend. Revisiting backend ^^^^^^^^^^^^^^^^^^ -Based on custom SQLAlchemy backend, and queue. Crawling starts with seeds. After seeds are crawled, every new -document will be scheduled for immediate crawling. On fetching every new document will be scheduled for recrawling -after fixed interval set by :setting:`SQLALCHEMYBACKEND_REVISIT_INTERVAL`. +There are two backends for Revisiting which are based on Cassandra and SqlAlchemy Backend and Queue. Crawling starts +with seeds. After seeds are crawled, every new document will be scheduled for immediate crawling. 
On fetching every new +document will be scheduled for recrawling after fixed interval set by :setting:`SQLALCHEMYBACKEND_REVISIT_INTERVAL` or +:setting:`CASSANDRABACKEND_REVISIT_INTERVAL`. Current implementation of revisiting backend has no prioritization. During long term runs spider could go idle, because there are no documents available for crawling, but there are documents waiting for their scheduled revisit time. @@ -270,6 +309,9 @@ there are no documents available for crawling, but there are documents waiting f Base class for SQLAlchemy :class:`Backend ` implementation of revisiting back-end. +.. class:: frontera.contrib.backends.cassandra.revisiting.Backend + + Base class for Cassandra :class:`Backend ` implementation of revisiting back-end. HBase backend ^^^^^^^^^^^^^ @@ -298,3 +340,4 @@ setting. .. _SQLAlchemy: http://www.sqlalchemy.org/ .. _any databases supported by SQLAlchemy: http://docs.sqlalchemy.org/en/latest/dialects/index.html .. _declarative sqlalchemy models: http://docs.sqlalchemy.org/en/latest/orm/extensions/declarative/index.html +.. 
_cassandra models: https://datastax.github.io/python-driver/cqlengine/models.html diff --git a/frontera/contrib/backends/__init__.py b/frontera/contrib/backends/__init__.py index 2dc89a1ee..f52b6aee2 100644 --- a/frontera/contrib/backends/__init__.py +++ b/frontera/contrib/backends/__init__.py @@ -1,9 +1,14 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import from collections import OrderedDict +from datetime import datetime from frontera import Backend -from frontera.core.components import States +from frontera.core.components import States, Queue as BaseQueue, DistributedBackend +from frontera.core.models import Request, Response +from frontera.utils.misc import utcnow_timestamp + +from w3lib.util import to_native_str class CommonBackend(Backend): @@ -84,3 +89,120 @@ def request_error(self, request, error): def finished(self): return self.queue_size == 0 + + +class CommonStorageBackend(CommonBackend): + + @property + def queue(self): + return self._queue + + @property + def metadata(self): + return self._metadata + + @property + def states(self): + return self._states + + +class CommonDistributedStorageBackend(DistributedBackend): + + @property + def queue(self): + return self._queue + + @property + def metadata(self): + return self._metadata + + @property + def states(self): + return self._states + + def frontier_start(self): + for component in [self.metadata, self.queue, self.states]: + if component: + component.frontier_start() + + def frontier_stop(self): + for component in [self.metadata, self.queue, self.states]: + if component: + component.frontier_stop() + + def add_seeds(self, seeds): + self.metadata.add_seeds(seeds) + + def get_next_requests(self, max_next_requests, **kwargs): + partitions = kwargs.pop('partitions', [0]) # TODO: Collect from all known partitions + batch = [] + for partition_id in partitions: + batch.extend(self.queue.get_next_requests(max_next_requests, partition_id, **kwargs)) + return batch + + def page_crawled(self, 
response): + self.metadata.page_crawled(response) + + def links_extracted(self, request, links): + self.metadata.links_extracted(request, links) + + def request_error(self, request, error): + self.metadata.request_error(request, error) + + def finished(self): + raise NotImplementedError + + +class CreateOrModifyPageMixin(object): + + def _create_page(self, obj): + db_page = self.model() + db_page.fingerprint = to_native_str(obj.meta[b'fingerprint']) + db_page.url = obj.url + db_page.created_at = datetime.utcnow() + db_page.meta = obj.meta + db_page.depth = 0 + + if isinstance(obj, Request): + db_page.headers = obj.headers + db_page.method = to_native_str(obj.method) + db_page.cookies = obj.cookies + elif isinstance(obj, Response): + db_page.headers = obj.request.headers + db_page.method = to_native_str(obj.request.method) + db_page.cookies = obj.request.cookies + db_page.status_code = obj.status_code + return db_page + + def _modify_page(self, obj): + db_page = self.cache[obj.meta[b'fingerprint']] + db_page.fetched_at = datetime.utcnow() + if isinstance(obj, Response): + db_page.headers = obj.request.headers + db_page.method = to_native_str(obj.request.method) + db_page.cookies = obj.request.cookies + db_page.status_code = obj.status_code + return db_page + + +class CommonRevisitingStorageBackendMixin(object): + + def _schedule(self, requests): + batch = [] + for request in requests: + if request.meta[b'state'] in [States.NOT_CRAWLED]: + request.meta[b'crawl_at'] = utcnow_timestamp() + elif request.meta[b'state'] in [States.CRAWLED, States.ERROR]: + request.meta[b'crawl_at'] = utcnow_timestamp() + self.interval + else: + continue # QUEUED + batch.append((request.meta[b'fingerprint'], self._get_score(request), request, True)) + self.queue.schedule(batch) + self.metadata.update_score(batch) + self.queue_size += len(batch) + + def page_crawled(self, response): + super(CommonRevisitingStorageBackendMixin, self).page_crawled(response) + 
self.states.set_states(response.request) + self._schedule([response.request]) + self.states.update_cache(response.request) diff --git a/frontera/contrib/backends/cassandra/__init__.py b/frontera/contrib/backends/cassandra/__init__.py new file mode 100644 index 000000000..ed952bd4a --- /dev/null +++ b/frontera/contrib/backends/cassandra/__init__.py @@ -0,0 +1,145 @@ +from __future__ import absolute_import + +import six +from cassandra.cqlengine import connection +from cassandra.cqlengine.management import drop_table + +from frontera.contrib.backends import (CommonDistributedStorageBackend, + CommonStorageBackend) +from frontera.contrib.backends.cassandra.components import (Metadata, + BroadCrawlingQueue, + Queue, States) +from frontera.utils.misc import load_object + + +class CassandraBackend(CommonStorageBackend): + + def __init__(self, manager): + self.manager = manager + settings = manager.settings + cluster_hosts = settings.get('CASSANDRABACKEND_CLUSTER_HOSTS') + cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT') + drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES') + models = settings.get('CASSANDRABACKEND_MODELS') + keyspace = settings.get('CASSANDRABACKEND_KEYSPACE') + + self.models = dict([(name, load_object(cls)) for name, cls in six.iteritems(models)]) + cluster_kwargs = { + 'port': cluster_port, + 'compression': True, + } + if not connection.cluster: + connection.setup(cluster_hosts, keyspace, **cluster_kwargs) + connection.session.default_timeout = settings.get('CASSANDRABACKEND_REQUEST_TIMEOUT') + + if drop_all_tables: + for name, table in six.iteritems(self.models): + drop_table(table) + + self._metadata = Metadata(self.models['MetadataModel'], settings.get('CASSANDRABACKEND_CACHE_SIZE')) + self._states = States(self.models['StateModel'], settings.get('STATE_CACHE_SIZE_LIMIT')) + self._queue = self._create_queue(settings) + + def frontier_stop(self): + self.states.flush() + connection.unregister_connection('default') + + def 
_create_queue(self, settings): + return Queue(self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS')) + + +class FIFOBackend(CassandraBackend): + component_name = 'Cassandra FIFO Backend' + + def _create_queue(self, settings): + return Queue(self.models['FifoOrLIfoQueueModel'], + settings.get('SPIDER_FEED_PARTITIONS'), + ordering='created') + + +class LIFOBackend(CassandraBackend): + component_name = 'Cassandra LIFO Backend' + + def _create_queue(self, settings): + return Queue(self.models['FifoOrLIfoQueueModel'], + settings.get('SPIDER_FEED_PARTITIONS'), + ordering='created_desc') + + +class DFSBackend(CassandraBackend): + component_name = 'Cassandra DFS Backend' + + def _create_queue(self, settings): + return Queue(self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS')) + + def _get_score(self, obj): + return -obj.meta[b'depth'] + + +class BFSBackend(CassandraBackend): + component_name = 'Cassandra BFS Backend' + + def _create_queue(self, settings): + return Queue(self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS')) + + def _get_score(self, obj): + return obj.meta[b'depth'] + + +BASE = CassandraBackend +LIFO = LIFOBackend +FIFO = FIFOBackend +DFS = DFSBackend +BFS = BFSBackend + + +class Distributed(CommonDistributedStorageBackend): + def __init__(self, manager): + self.manager = manager + settings = manager.settings + cluster_hosts = settings.get('CASSANDRABACKEND_CLUSTER_HOSTS') + cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT') + models = settings.get('CASSANDRABACKEND_MODELS') + keyspace = settings.get('CASSANDRABACKEND_KEYSPACE') + + self.models = dict([(name, load_object(cls)) for name, cls in six.iteritems(models)]) + cluster_kwargs = { + 'port': cluster_port, + 'compression': True, + } + if not connection.cluster: + connection.setup(cluster_hosts, keyspace, **cluster_kwargs) + connection.session.default_timeout = settings.get('CASSANDRABACKEND_REQUEST_TIMEOUT') + + self._metadata = None + self._queue = None 
+ self._states = None + + @classmethod + def strategy_worker(cls, manager): + b = cls(manager) + settings = manager.settings + drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES') + state_model = b.models['StateModel'] + if drop_all_tables: + drop_table(state_model) + b._states = States(state_model, settings.get('STATE_CACHE_SIZE_LIMIT')) + return b + + @classmethod + def db_worker(cls, manager): + b = cls(manager) + settings = manager.settings + drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES') + metadata_model = b.models['MetadataModel'] + queue_model = b.models['QueueModel'] + if drop_all_tables: + drop_table(metadata_model) + drop_table(queue_model) + b._metadata = Metadata(metadata_model, settings.get('CASSANDRABACKEND_CACHE_SIZE')) + b._queue = BroadCrawlingQueue(queue_model, settings.get('SPIDER_FEED_PARTITIONS')) + return b + + def frontier_stop(self): + super(Distributed, self).frontier_stop() + connection.unregister_connection('default') diff --git a/frontera/contrib/backends/cassandra/components.py b/frontera/contrib/backends/cassandra/components.py new file mode 100644 index 000000000..d832b2436 --- /dev/null +++ b/frontera/contrib/backends/cassandra/components.py @@ -0,0 +1,239 @@ +# -*- coding: utf-8 -*- +import logging +import uuid +from time import time + +import six +from cachetools import LRUCache +from cassandra.cqlengine.management import sync_table +from cassandra.cqlengine.query import BatchQuery +from w3lib.util import to_bytes, to_native_str + +from frontera.contrib.backends import CreateOrModifyPageMixin +from frontera.contrib.backends.memory import MemoryStates +from frontera.contrib.backends.partitioners import Crc32NamePartitioner +from frontera.core.components import Metadata as BaseMetadata +from frontera.core.components import Queue as BaseQueue +from frontera.core.models import Request +from frontera.utils.misc import chunks, get_crc32 +from frontera.utils.url import parse_domain_from_url_fast + + 
+class Metadata(BaseMetadata, CreateOrModifyPageMixin): + + def __init__(self, model_cls, cache_size): + self.model = model_cls + self.cache = LRUCache(cache_size) + self.batch = BatchQuery() + self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.Metadata") + sync_table(model_cls) + + def frontier_stop(self): + pass + + def add_seeds(self, seeds): + for seed in seeds: + page = self._create_page(seed) + self._add_to_batch_and_update_cache(page) + self.batch.execute() + + def request_error(self, page, error): + page = self._modify_page(page) if page.meta[b'fingerprint'] in self.cache else self._create_page(page) + page.error = error + self._add_to_batch_and_update_cache(page) + self.batch.execute() + + def page_crawled(self, response): + page = self._modify_page(response) \ + if response.meta[b'fingerprint'] in self.cache else self._create_page(response) + self._add_to_batch_and_update_cache(page) + self.batch.execute() + + def links_extracted(self, request, links): + for link in links: + if link.meta[b'fingerprint'] not in self.cache: + page = self._create_page(link) + self._add_to_batch_and_update_cache(page) + self.batch.execute() + + def update_score(self, batch): + if isinstance(batch, dict): + batch = [(fprint, score, url, schedule) for fprint, (score, url, schedule) in six.iteritems(batch)] + for fprint, score, url, schedule in batch: + page = self.cache[fprint] + page.fingerprint = to_native_str(fprint) + page.score = score + self._add_to_batch_and_update_cache(page) + self.batch.execute() + + def _add_to_batch_and_update_cache(self, page): + self.cache[to_bytes(page.fingerprint)] = page.batch(self.batch).save() + + +class States(MemoryStates): + + def __init__(self, model_cls, cache_size_limit): + super(States, self).__init__(cache_size_limit) + self.model = model_cls + self.batch = BatchQuery() + self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.States") + sync_table(model_cls) + + def 
frontier_stop(self): + self.flush() + + def fetch(self, fingerprints): + to_fetch = [to_native_str(f) for f in fingerprints if f not in self._cache] + self.logger.debug("cache size %s", len(self._cache)) + self.logger.debug("to fetch %d from %d", len(to_fetch), len(fingerprints)) + + for chunk in chunks(to_fetch, 128): + for state in self.model.objects.filter(fingerprint__in=chunk): + self._cache[to_bytes(state.fingerprint)] = state.state + + def flush(self, force_clear=False): + for fingerprint, state_val in six.iteritems(self._cache): + state = self.model(fingerprint=to_native_str(fingerprint), state=state_val) + state.batch(self.batch).save() + self.batch.execute() + self.logger.debug("State cache has been flushed.") + super(States, self).flush(force_clear) + + +class Queue(BaseQueue): + + def __init__(self, queue_cls, partitions, ordering='default'): + self.queue_model = queue_cls + self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.Queue") + self.partitions = [i for i in range(0, partitions)] + self.partitioner = Crc32NamePartitioner(self.partitions) + self.ordering = ordering + self.batch = BatchQuery() + sync_table(queue_cls) + + def frontier_stop(self): + pass + + def _order_by(self, query): + if self.ordering == 'created': + return query.order_by('created_at') + if self.ordering == 'created_desc': + return query.order_by('-created_at') + return query.order_by('score', 'created_at') # TODO: remove second parameter, + # it's not necessary for proper crawling, but needed for tests + + def get_next_requests(self, max_n_requests, partition_id, **kwargs): + """ + Dequeues new batch of requests for crawling. + + :param max_n_requests: maximum number of requests to return + :param partition_id: partition id + :return: list of :class:`Request ` objects. 
+ """ + results = [] + try: + for item in self._order_by(self.queue_model.filter(partition_id=partition_id).allow_filtering()).\ + limit(max_n_requests): + method = item.method or b'GET' + r = Request(item.url, method=method, meta=item.meta, headers=item.headers, cookies=item.cookies) + r.meta[b'fingerprint'] = to_bytes(item.fingerprint) + r.meta[b'score'] = item.score + results.append(r) + item.batch(self.batch).delete() + self.batch.execute() + except Exception as exc: + self.logger.exception(exc) + return results + + def schedule(self, batch): + for fprint, score, request, schedule in batch: + if schedule: + _, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url) + if not hostname: + self.logger.error("Can't get hostname for URL %s, fingerprint %s" % (request.url, fprint)) + partition_id = self.partitions[0] + host_crc32 = 0 + else: + partition_id = self.partitioner.partition(hostname, self.partitions) + host_crc32 = get_crc32(hostname) + q = self.queue_model(id=uuid.uuid4(), + fingerprint=to_native_str(fprint), + score=score, + url=request.url, + meta=request.meta, + headers=request.headers, + cookies=request.cookies, + method=to_native_str(request.method), + partition_id=partition_id, + host_crc32=host_crc32, + created_at=time() * 1E+6) + q.batch(self.batch).save() + request.meta[b'state'] = States.QUEUED + self.batch.execute() + + def count(self): + return self.queue_model.objects.count() + + +class BroadCrawlingQueue(Queue): + GET_RETRIES = 3 + + def get_next_requests(self, max_n_requests, partition_id, **kwargs): + """ + Dequeues new batch of requests for crawling. + + Priorities, from highest to lowest: + - max_requests_per_host + - max_n_requests + - min_hosts & min_requests + + :param max_n_requests: + :param partition_id: + :param kwargs: min_requests, min_hosts, max_requests_per_host + :return: list of :class:`Request ` objects. 
+ """ + min_requests = kwargs.pop("min_requests", None) + min_hosts = kwargs.pop("min_hosts", None) + max_requests_per_host = kwargs.pop("max_requests_per_host", None) + assert(max_n_requests > min_requests) + + queue = {} + limit = max_n_requests + tries = 0 + count = 0 + while tries < self.GET_RETRIES: + tries += 1 + limit *= 5.5 if tries > 1 else 1.0 + self.logger.debug("Try %d, limit %d, last attempt: requests %d, hosts %d", + tries, limit, count, len(queue.keys())) + queue.clear() + count = 0 + for item in self._order_by(self.queue_model.filter(partition_id=partition_id).allow_filtering()).\ + limit(max_n_requests): + if item.host_crc32 not in queue: + queue[item.host_crc32] = [] + if max_requests_per_host is not None and len(queue[item.host_crc32]) > max_requests_per_host: + continue + queue[item.host_crc32].append(item) + count += 1 + if count > max_n_requests: + break + if min_hosts is not None and len(queue.keys()) < min_hosts: + continue + if min_requests is not None and count < min_requests: + continue + break + self.logger.debug("Finished: tries %d, hosts %d, requests %d", tries, len(queue.keys()), count) + + results = [] + for items in six.itervalues(queue): + for item in items: + method = item.method or b'GET' + results.append(Request(item.url, + method=method, + meta=item.meta, + headers=item.headers, + cookies=item.cookies)) + item.batch(self.batch).delete() + self.batch.execute() + return results diff --git a/frontera/contrib/backends/cassandra/models.py b/frontera/contrib/backends/cassandra/models.py new file mode 100644 index 000000000..af20fdcbd --- /dev/null +++ b/frontera/contrib/backends/cassandra/models.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +import pickle +import six + +from cassandra.cqlengine.columns import (UUID, BigInt, Bytes, DateTime, Float, + Integer, SmallInt, Text) +from cassandra.cqlengine.models import Model + + +class PickleDict(Bytes): + """ + PickleDict applies Python's ``pickle.dumps()`` to incoming objects + if the 
value received is a dict, and ``pickle.loads()`` on the way out. + """ + + def to_database(self, value): + if value is None: + return + if isinstance(value, dict): + value = self._pickle_object(value) + return super(PickleDict, self).to_database(value) + + def to_python(self, value): + value = super(PickleDict, self).to_python(value) + if value is None: + return + if isinstance(value, dict): + return value + try: + return self._unpickle_object(value) + except TypeError: + return value + + def _pickle_object(self, obj): + pickled = pickle.dumps(obj) + return pickled.encode('hex') if six.PY2 else pickled + + def _unpickle_object(self, pickled_obj): + obj = pickled_obj.decode('hex') if six.PY2 else pickled_obj + return pickle.loads(obj) + + +class MetadataModel(Model): + __table_name__ = 'metadata' + + fingerprint = Text(primary_key=True) + url = Text(required=True) + depth = Integer(required=True) + created_at = DateTime(required=True) + fetched_at = DateTime() + status_code = Integer() + score = Float() + error = Text() + meta = PickleDict() + headers = PickleDict() + cookies = PickleDict() + method = Text() + + def __repr__(self): + return '' % (self.url, self.fingerprint) + + +class StateModel(Model): + __table_name__ = 'states' + + fingerprint = Text(primary_key=True) + state = SmallInt(required=True) + + def __repr__(self): + return '' % (self.fingerprint, self.state) + + +class BaseQueueModel(Model): + __abstract__ = True + + url = Text(required=True) + fingerprint = Text(required=True) + host_crc32 = Integer(required=True) + meta = PickleDict() + headers = PickleDict() + cookies = PickleDict() + method = Text() + depth = SmallInt() + + def __repr__(self): + return '' % (self.url, self.id) + + +class QueueModel(BaseQueueModel): + __abstract__ = False + __table_name__ = 'queue' + + partition_id = Integer(primary_key=True) + score = Float(primary_key=True) + created_at = BigInt(primary_key=True) + id = UUID(primary_key=True) + + +class 
FifoOrLIfoQueueModel(BaseQueueModel): + __abstract__ = False + __table_name__ = 'fifo_lifo_queue' + + partition_id = Integer(primary_key=True) + score = Float(required=True) + created_at = BigInt(primary_key=True) + id = UUID(primary_key=True) + + +class RevisitingQueueModel(BaseQueueModel): + __abstract__ = False + __table_name__ = 'revisiting_queue' + + partition_id = Integer(primary_key=True) + crawl_at = BigInt(primary_key=True) + id = UUID(primary_key=True) + score = Float(required=True) + created_at = BigInt(required=True) diff --git a/frontera/contrib/backends/cassandra/revisiting.py b/frontera/contrib/backends/cassandra/revisiting.py new file mode 100644 index 000000000..39f1400c3 --- /dev/null +++ b/frontera/contrib/backends/cassandra/revisiting.py @@ -0,0 +1,101 @@ +# -*- coding: utf-8 -*- +import logging +import uuid +from datetime import timedelta +from time import time + +from cassandra.cqlengine.management import sync_table +from cassandra.cqlengine.query import BatchQuery +from w3lib.util import to_native_str + +from frontera import Request +from frontera.contrib.backends import CommonRevisitingStorageBackendMixin +from frontera.contrib.backends.cassandra import CassandraBackend +from frontera.contrib.backends.cassandra.models import RevisitingQueueModel +from frontera.contrib.backends.partitioners import Crc32NamePartitioner +from frontera.core.components import Queue as BaseQueue +from frontera.core.components import States +from frontera.utils.misc import get_crc32, utcnow_timestamp +from frontera.utils.url import parse_domain_from_url_fast + + +class RevisitingQueue(BaseQueue): + def __init__(self, queue_cls, partitions): + self.queue_model = queue_cls + self.logger = logging.getLogger("frontera.contrib.backends.cassandra.revisiting.RevisitingQueue") + self.partitions = [i for i in range(0, partitions)] + self.partitioner = Crc32NamePartitioner(self.partitions) + self.batch = BatchQuery() + sync_table(queue_cls) + + def frontier_stop(self): + 
pass + + def get_next_requests(self, max_n_requests, partition_id, **kwargs): + results = [] + try: + for item in self.queue_model.objects.filter(partition_id=partition_id, + crawl_at__lte=utcnow_timestamp()).limit(max_n_requests): + method = 'GET' if not item.method else item.method + results.append(Request(item.url, method=method, meta=item.meta, headers=item.headers, + cookies=item.cookies)) + item.batch(self.batch).delete() + self.batch.execute() + except Exception as exc: + self.logger.exception(exc) + return results + + def schedule(self, batch): + for fprint, score, request, schedule in batch: + if schedule: + _, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url) + if not hostname: + self.logger.error("Can't get hostname for URL %s, fingerprint %s" % (request.url, fprint)) + partition_id = self.partitions[0] + host_crc32 = 0 + else: + partition_id = self.partitioner.partition(hostname, self.partitions) + host_crc32 = get_crc32(hostname) + schedule_at = request.meta[b'crawl_at'] if b'crawl_at' in request.meta else utcnow_timestamp() + q = self.queue_model(id=uuid.uuid4(), + fingerprint=to_native_str(fprint), + score=score, + url=request.url, + meta=request.meta, + headers=request.headers, + cookies=request.cookies, + method=to_native_str(request.method), + partition_id=partition_id, + host_crc32=host_crc32, + created_at=time() * 1E+6, + crawl_at=schedule_at) + q.batch(self.batch).save() + request.meta[b'state'] = States.QUEUED + self.batch.execute() + + def _create_queue_obj(self, fprint, score, request, partition_id, host_crc32, schedule_at): + q = self.queue_model(id=uuid.uuid4(), + fingerprint=to_native_str(fprint), + score=score, + url=request.url, + meta=request.meta, + headers=request.headers, + cookies=request.cookies, + method=to_native_str(request.method), + partition_id=partition_id, + host_crc32=host_crc32, + created_at=time() * 1E+6, + crawl_at=schedule_at) + return q + + def count(self): + return self.queue_model.objects.count() + + 
+class Backend(CommonRevisitingStorageBackendMixin, CassandraBackend): + + def _create_queue(self, settings): + self.interval = settings.get("CASSANDRABACKEND_REVISIT_INTERVAL") + assert isinstance(self.interval, timedelta) + self.interval = self.interval.total_seconds() + return RevisitingQueue(RevisitingQueueModel, settings.get('SPIDER_FEED_PARTITIONS')) diff --git a/frontera/contrib/backends/sqlalchemy/__init__.py b/frontera/contrib/backends/sqlalchemy/__init__.py index b8e7b8aa1..dcff0ca1d 100644 --- a/frontera/contrib/backends/sqlalchemy/__init__.py +++ b/frontera/contrib/backends/sqlalchemy/__init__.py @@ -4,14 +4,14 @@ from sqlalchemy.orm import sessionmaker from sqlalchemy.engine.reflection import Inspector -from frontera.core.components import DistributedBackend -from frontera.contrib.backends import CommonBackend +from frontera.contrib.backends import CommonBackend, CommonStorageBackend, CommonDistributedStorageBackend from frontera.contrib.backends.sqlalchemy.components import Metadata, Queue, States from frontera.contrib.backends.sqlalchemy.models import DeclarativeBase from frontera.utils.misc import load_object -class SQLAlchemyBackend(CommonBackend): +class SQLAlchemyBackend(CommonStorageBackend): + def __init__(self, manager): self.manager = manager settings = manager.settings @@ -47,19 +47,7 @@ def frontier_stop(self): self.engine.dispose() def _create_queue(self, settings): - return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS')) - - @property - def queue(self): - return self._queue - - @property - def metadata(self): - return self._metadata - - @property - def states(self): - return self._states + return Queue(self.session, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS')) class FIFOBackend(SQLAlchemyBackend): @@ -105,7 +93,7 @@ def _get_score(self, obj): BFS = BFSBackend -class Distributed(DistributedBackend): +class Distributed(CommonDistributedStorageBackend): def __init__(self, 
manager): self.manager = manager settings = manager.settings @@ -171,48 +159,3 @@ def db_worker(cls, manager): settings.get('SQLALCHEMYBACKEND_CACHE_SIZE')) b._queue = Queue(b.session_cls, queue_m, settings.get('SPIDER_FEED_PARTITIONS')) return b - - @property - def queue(self): - return self._queue - - @property - def metadata(self): - return self._metadata - - @property - def states(self): - return self._states - - def frontier_start(self): - for component in [self.metadata, self.queue, self.states]: - if component: - component.frontier_start() - - def frontier_stop(self): - for component in [self.metadata, self.queue, self.states]: - if component: - component.frontier_stop() - - def add_seeds(self, seeds): - self.metadata.add_seeds(seeds) - - def get_next_requests(self, max_next_requests, **kwargs): - partitions = kwargs.pop('partitions', [0]) # TODO: Collect from all known partitions - batch = [] - for partition_id in partitions: - batch.extend(self.queue.get_next_requests(max_next_requests, partition_id, **kwargs)) - return batch - - def page_crawled(self, response): - self.metadata.page_crawled(response) - - def links_extracted(self, request, links): - self.metadata.links_extracted(request, links) - - def request_error(self, request, error): - self.metadata.request_error(request, error) - - def finished(self): - raise NotImplementedError - diff --git a/frontera/contrib/backends/sqlalchemy/components.py b/frontera/contrib/backends/sqlalchemy/components.py index 8661ac576..a3123f4f5 100644 --- a/frontera/contrib/backends/sqlalchemy/components.py +++ b/frontera/contrib/backends/sqlalchemy/components.py @@ -1,15 +1,15 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import import logging -from datetime import datetime from time import time, sleep from cachetools import LRUCache +from frontera.contrib.backends import CreateOrModifyPageMixin from frontera.contrib.backends.partitioners import Crc32NamePartitioner from frontera.contrib.backends.memory 
import MemoryStates from frontera.contrib.backends.sqlalchemy.models import DeclarativeBase from frontera.core.components import Metadata as BaseMetadata, Queue as BaseQueue -from frontera.core.models import Request, Response +from frontera.core.models import Request from frontera.utils.misc import get_crc32, chunks from frontera.utils.url import parse_domain_from_url_fast import six @@ -36,7 +36,7 @@ def func_wrapper(self, *args, **kwargs): return func_wrapper -class Metadata(BaseMetadata): +class Metadata(BaseMetadata, CreateOrModifyPageMixin): def __init__(self, session_cls, model_cls, cache_size): self.session = session_cls(expire_on_commit=False) # FIXME: Should be explicitly mentioned in docs self.model = model_cls @@ -73,35 +73,6 @@ def links_extracted(self, request, links): self.cache[link.meta[b'fingerprint']] = self.session.merge(self._create_page(link)) self.session.commit() - def _modify_page(self, obj): - db_page = self.cache[obj.meta[b'fingerprint']] - db_page.fetched_at = datetime.utcnow() - if isinstance(obj, Response): - db_page.headers = obj.request.headers - db_page.method = to_native_str(obj.request.method) - db_page.cookies = obj.request.cookies - db_page.status_code = obj.status_code - return db_page - - def _create_page(self, obj): - db_page = self.model() - db_page.fingerprint = to_native_str(obj.meta[b'fingerprint']) - db_page.url = obj.url - db_page.created_at = datetime.utcnow() - db_page.meta = obj.meta - db_page.depth = 0 - - if isinstance(obj, Request): - db_page.headers = obj.headers - db_page.method = to_native_str(obj.method) - db_page.cookies = obj.cookies - elif isinstance(obj, Response): - db_page.headers = obj.request.headers - db_page.method = to_native_str(obj.request.method) - db_page.cookies = obj.request.cookies - db_page.status_code = obj.status_code - return db_page - @retry_and_rollback def update_score(self, batch): for fprint, score, request, schedule in batch: diff --git 
a/frontera/contrib/backends/sqlalchemy/revisiting.py b/frontera/contrib/backends/sqlalchemy/revisiting.py index b2b574715..ccbe056bf 100644 --- a/frontera/contrib/backends/sqlalchemy/revisiting.py +++ b/frontera/contrib/backends/sqlalchemy/revisiting.py @@ -1,27 +1,22 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import import logging -from datetime import datetime, timedelta +from datetime import timedelta from time import time, sleep -from calendar import timegm from sqlalchemy import Column, BigInteger from frontera import Request +from frontera.contrib.backends import CommonRevisitingStorageBackendMixin from frontera.contrib.backends.partitioners import Crc32NamePartitioner from frontera.contrib.backends.sqlalchemy import SQLAlchemyBackend from frontera.contrib.backends.sqlalchemy.models import QueueModelMixin, DeclarativeBase from frontera.core.components import Queue as BaseQueue, States -from frontera.utils.misc import get_crc32 +from frontera.utils.misc import get_crc32, utcnow_timestamp from frontera.utils.url import parse_domain_from_url_fast from six.moves import range -def utcnow_timestamp(): - d = datetime.utcnow() - return timegm(d.timetuple()) - - class RevisitingQueueModel(QueueModelMixin, DeclarativeBase): __tablename__ = 'revisiting_queue' @@ -103,30 +98,10 @@ def count(self): return self.session.query(self.queue_model).count() -class Backend(SQLAlchemyBackend): +class Backend(CommonRevisitingStorageBackendMixin, SQLAlchemyBackend): def _create_queue(self, settings): self.interval = settings.get("SQLALCHEMYBACKEND_REVISIT_INTERVAL") assert isinstance(self.interval, timedelta) self.interval = self.interval.total_seconds() return RevisitingQueue(self.session_cls, RevisitingQueueModel, settings.get('SPIDER_FEED_PARTITIONS')) - - def _schedule(self, requests): - batch = [] - for request in requests: - if request.meta[b'state'] in [States.NOT_CRAWLED]: - request.meta[b'crawl_at'] = utcnow_timestamp() - elif request.meta[b'state'] in 
[States.CRAWLED, States.ERROR]: - request.meta[b'crawl_at'] = utcnow_timestamp() + self.interval - else: - continue # QUEUED - batch.append((request.meta[b'fingerprint'], self._get_score(request), request, True)) - self.queue.schedule(batch) - self.metadata.update_score(batch) - self.queue_size += len(batch) - - def page_crawled(self, response): - super(Backend, self).page_crawled(response) - self.states.set_states(response.request) - self._schedule([response.request]) - self.states.update_cache(response.request) diff --git a/frontera/settings/default_settings.py b/frontera/settings/default_settings.py index b049e7bdc..9b6b3886a 100644 --- a/frontera/settings/default_settings.py +++ b/frontera/settings/default_settings.py @@ -7,7 +7,22 @@ BC_MIN_REQUESTS = 64 BC_MIN_HOSTS = 24 BC_MAX_REQUESTS_PER_HOST = 128 + CANONICAL_SOLVER = 'frontera.contrib.canonicalsolvers.Basic' +CASSANDRABACKEND_CACHE_SIZE = 10000 +CASSANDRABACKEND_CLUSTER_HOSTS = ['127.0.0.1'] +CASSANDRABACKEND_CLUSTER_PORT = 9042 +CASSANDRABACKEND_DROP_ALL_TABLES = False +CASSANDRABACKEND_KEYSPACE = 'crawler' +CASSANDRABACKEND_MODELS = { + 'MetadataModel': 'frontera.contrib.backends.cassandra.models.MetadataModel', + 'StateModel': 'frontera.contrib.backends.cassandra.models.StateModel', + 'QueueModel': 'frontera.contrib.backends.cassandra.models.QueueModel', + 'FifoOrLIfoQueueModel': 'frontera.contrib.backends.cassandra.models.FifoOrLIfoQueueModel', +} +CASSANDRABACKEND_REQUEST_TIMEOUT = 60 +CASSANDRABACKEND_REVISIT_INTERVAL = timedelta(days=1) + DELAY_ON_EMPTY = 5.0 DOMAIN_FINGERPRINT_FUNCTION = 'frontera.utils.fingerprint.sha1' @@ -77,4 +92,4 @@ SCORING_LOG_DBW_GROUP = "dbw-scoring-log" SPIDER_FEED_GROUP = "fetchers-spider-feed" -KAFKA_CODEC = None \ No newline at end of file +KAFKA_CODEC = None diff --git a/frontera/utils/misc.py b/frontera/utils/misc.py index 15731195f..4de40dd7c 100644 --- a/frontera/utils/misc.py +++ b/frontera/utils/misc.py @@ -1,5 +1,7 @@ from __future__ import absolute_import 
from importlib import import_module +from calendar import timegm +from datetime import datetime from zlib import crc32 from six.moves import range from w3lib.util import to_bytes @@ -72,4 +74,9 @@ def dict_to_unicode(obj): if isinstance(obj, list): return map(dict_to_unicode, obj) else: - return obj \ No newline at end of file + return obj + + +def utcnow_timestamp(): + d = datetime.utcnow() + return timegm(d.timetuple()) diff --git a/requirements/tests.txt b/requirements/tests.txt index 0ac170f54..015988671 100644 --- a/requirements/tests.txt +++ b/requirements/tests.txt @@ -13,3 +13,4 @@ happybase>=1.0.0 mock boto>=2.42.0 -r logging.txt +cassandra-driver==3.7.0 diff --git a/setup.py b/setup.py index 87f423e18..23f5a7698 100644 --- a/setup.py +++ b/setup.py @@ -71,6 +71,10 @@ ], 'distributed': [ 'Twisted' + ], + 'cassandra': [ + 'cassandra-driver==3.7.0', + 'cachetools' ] }, tests_require=[ @@ -85,6 +89,7 @@ "mock", "boto>=2.42.0", "colorlog>=2.4.0", - "python-json-logger>=0.1.5" + "python-json-logger>=0.1.5", + "cassandra-driver==3.7.0" ] ) diff --git a/tests/contrib/backends/cassandra/test_backend_cassandra.py b/tests/contrib/backends/cassandra/test_backend_cassandra.py new file mode 100644 index 000000000..56e9e03be --- /dev/null +++ b/tests/contrib/backends/cassandra/test_backend_cassandra.py @@ -0,0 +1,302 @@ +import unittest +import uuid +from datetime import datetime, timedelta +from time import time + +import six +from cassandra.cqlengine import connection +from cassandra.cqlengine.management import (create_keyspace_simple, + drop_keyspace, drop_table, + sync_table) + +from frontera.contrib.backends.cassandra import CassandraBackend, Distributed +from frontera.contrib.backends.cassandra.models import (FifoOrLIfoQueueModel, + MetadataModel, + QueueModel, StateModel) +from frontera.core.components import States +from frontera.core.models import Request, Response +from frontera.settings import Settings +from tests import backends +from 
tests.test_revisiting_backend import RevisitingBackendTest + + +r1 = Request('https://www.example.com', meta={b'fingerprint': b'10', + b'domain': {b'name': b'www.example.com', b'fingerprint': b'81'}}) +r2 = Request('http://example.com/some/page/', meta={b'fingerprint': b'11', + b'domain': {b'name': b'example.com', b'fingerprint': b'82'}}) +r3 = Request('http://www.scrapy.org', meta={b'fingerprint': b'12', + b'domain': {b'name': b'www.scrapy.org', b'fingerprint': b'83'}}) +r4 = r3.copy() + + +class CassandraConfig(object): + + def setUp(self): + settings = Settings() + self.hosts = ['127.0.0.1'] + self.port = 9042 + self.manager = type('manager', (object,), {}) + self.manager.settings = settings + self.keyspace = settings.CASSANDRABACKEND_KEYSPACE + self.timeout = settings.CASSANDRABACKEND_REQUEST_TIMEOUT + self._set_global_connection(self.hosts, self.port, self.timeout) + create_keyspace_simple(self.keyspace, 1) + connection.session.set_keyspace(self.keyspace) + + def tearDown(self): + self._set_global_connection(self.hosts, self.port, self.timeout) + drop_keyspace(self.keyspace) + + def _set_global_connection(self, hosts, port, timeout): + if not connection.cluster: + connection.setup(hosts, self.keyspace, port=port) + connection.session.default_timeout = timeout + + +class TestCassandraBackendModels(CassandraConfig, unittest.TestCase): + + def test_pickled_fields(self): + sync_table(MetadataModel) + m = MetadataModel(fingerprint='fingerprint', + url='http://example.com', + depth=0, + created_at=datetime.now()) + meta = {b'fingerprint': b'10', + b'scrapy_meta': {'non_binary': 'www.example.com', + 'number': 81, + 'list': ['str', b'bytes', u'unicode']} + } + m.meta = meta + m.save() + stored_meta = m.get(fingerprint='fingerprint').meta + self.assertDictEqual(meta, stored_meta) + + def test_metadata_model(self): + fields = { + 'fingerprint': 'fingerprint', + 'url': 'http://example.com', + 'depth': 0, + 'created_at': datetime.now(), + 'fetched_at': datetime.now(), + 
'status_code': 400, + 'score': 0.9, + 'error': 'Bad Request', + 'meta': {'meta': 'meta'}, + 'headers': {'headers': 'headers'}, + 'cookies': {'cookies': 'cookies'}, + 'method': 'GET', + } + self.assert_db_values(MetadataModel, {'fingerprint': fields['fingerprint']}, fields) + + def test_state_model(self): + fields = { + 'fingerprint': 'fingerprint', + 'state': 1 + } + self.assert_db_values(StateModel, {'fingerprint': fields['fingerprint']}, fields) + + def test_queue_model(self): + fields = { + 'id': uuid.uuid4(), + 'partition_id': 0, + 'score': 0.8, + 'url': 'http://example.com', + 'fingerprint': 'fingerprint', + 'host_crc32': 1234, + 'meta': {'meta': 'meta'}, + 'headers': {'headers': 'headers'}, + 'cookies': {'cookies': 'cookies'}, + 'method': 'GET', + 'created_at': int(time()*1E+6), + 'depth': 0, + } + for model in [FifoOrLIfoQueueModel, QueueModel]: + self.assert_db_values(model, {'id': fields['id']}, fields) + drop_table(model) + + def assert_db_values(self, model, _filter, fields): + sync_table(model) + m = model(**fields) + m.save() + stored_obj = m.objects.allow_filtering().get(**_filter) + for field, original_value in six.iteritems(fields): + stored_value = getattr(stored_obj, field) + if isinstance(original_value, dict): + self.assertDictEqual(stored_value, original_value) + elif isinstance(original_value, datetime): + self.assertEqual(stored_value.ctime(), original_value.ctime()) + elif isinstance(original_value, float): + self.assertAlmostEquals(stored_value, original_value) + else: + self.assertEqual(stored_value, original_value) + + +class TestCassandraBackend(CassandraConfig, unittest.TestCase): + + def init_backend(self): + self.backend = CassandraBackend(self.manager) + + @property + def metadata(self): + self.init_backend() + return self.backend.metadata + + @property + def states(self): + self.init_backend() + return self.backend.states + + @property + def queue(self): + self.init_backend() + return self.backend.queue + + def _get_tables(self): + 
query = 'SELECT table_name FROM system_schema.tables WHERE keyspace_name = \'{}\''.format(self.keyspace) + result = connection.execute(query) + return [row['table_name'] for row in result.current_rows] + + def test_tables_created(self): + tables_before = self._get_tables() + self.assertEqual(tables_before, []) + self.init_backend() + tables_after = self._get_tables() + self.assertEqual(set(tables_after), set(['metadata', 'states', 'queue'])) + + def test_tables_droped_and_created(self): + def _get_state_data(): + return StateModel.all() + + models = [MetadataModel, StateModel, QueueModel] + for model in models: + sync_table(model) + tables_before = self._get_tables() + self.assertEqual(set(tables_before), set(['metadata', 'states', 'queue'])) + StateModel.create(fingerprint='fingerprint', state=200) + rows_before = _get_state_data() + self.assertEqual(rows_before.count(), 1) + self.manager.settings.CASSANDRABACKEND_DROP_ALL_TABLES = True + self.init_backend() + tables_after = self._get_tables() + self.assertEqual(set(tables_after), set(['metadata', 'states', 'queue'])) + rows_after = _get_state_data() + self.assertEqual(rows_after.count(), 0) + + def test_metadata(self): + metadata = self.metadata + metadata.add_seeds([r1, r2, r3]) + meta_qs = MetadataModel.objects.all() + self.assertEqual(set([r1.url, r2.url, r3.url]), set([m.url for m in meta_qs])) + resp = Response('https://www.example.com', request=r1) + metadata.page_crawled(resp) + stored_response = meta_qs.get(fingerprint='10') + self.assertEqual(stored_response.status_code, 200) + metadata.request_error(r3, 'error') + stored_error = meta_qs.get(fingerprint='12') + self.assertEqual(stored_error.error, 'error') + batch = {r2.meta[b'fingerprint']: [0.8, r2.url, False]} + metadata.update_score(batch) + stored_score = meta_qs.get(fingerprint='11') + self.assertAlmostEquals(stored_score.score, 0.8) + self.assertEqual(meta_qs.count(), 3) + + def test_state(self): + state = self.states + state.set_states([r1, r2, 
r3]) + self.assertEqual([r.meta[b'state'] for r in [r1, r2, r3]], [States.NOT_CRAWLED] * 3) + state.update_cache([r1, r2, r3]) + self.assertDictEqual(state._cache, {b'10': States.NOT_CRAWLED, + b'11': States.NOT_CRAWLED, + b'12': States.NOT_CRAWLED}) + r1.meta[b'state'] = States.CRAWLED + r2.meta[b'state'] = States.CRAWLED + r3.meta[b'state'] = States.CRAWLED + state.update_cache([r1, r2, r3]) + state.flush(True) + self.assertDictEqual(state._cache, {}) + state.fetch([b'10', b'11', b'12']) + self.assertDictEqual(state._cache, {b'10': States.CRAWLED, + b'11': States.CRAWLED, + b'12': States.CRAWLED}) + r4.meta[b'state'] = States.ERROR + state.set_states([r1, r2, r4]) + self.assertEqual(r4.meta[b'state'], States.CRAWLED) + state.flush(True) + self.assertEqual(state._cache, {}) + + def test_queue(self): + self.manager.settings.SPIDER_FEED_PARTITIONS = 2 + queue = self.queue + batch = [('10', 0.5, r1, True), ('11', 0.6, r2, True), + ('12', 0.7, r3, True)] + queue.schedule(batch) + self.assertEqual(queue.count(), 3) + self.assertEqual(set([r.url for r in queue.get_next_requests(10, 0, + min_requests=3, + min_hosts=1, + max_requests_per_host=10)]), + set([r3.url])) + self.assertEqual(set([r.url for r in queue.get_next_requests(10, 1, + min_requests=3, + min_hosts=1, + max_requests_per_host=10)]), + set([r1.url, r2.url])) + self.assertEqual(queue.count(), 0) + + +class TestCassandraDistributedBackend(TestCassandraBackend): + + def init_backend(self): + self.backend = Distributed(self.manager) + self.strategy_worker = self.backend.strategy_worker(self.manager) + self.db_worker = self.backend.db_worker(self.manager) + + @property + def metadata(self): + self.init_backend() + return self.db_worker.metadata + + @property + def states(self): + self.init_backend() + return self.strategy_worker.states + + @property + def queue(self): + self.init_backend() + return self.db_worker.queue + + +class BaseCassandraIntegrationTests(object): + obj = CassandraConfig() + + def 
setup_backend(self, method): + self.obj.setUp() + + def teardown_backend(self, method): + self.obj.tearDown() + + +class TestCassandraFIFOBackend(BaseCassandraIntegrationTests, backends.FIFOBackendTest): + backend_class = 'frontera.contrib.backends.cassandra.FIFO' + + +class TestCassandraLIFOBackend(BaseCassandraIntegrationTests, backends.LIFOBackendTest): + backend_class = 'frontera.contrib.backends.cassandra.LIFO' + + +class TestCassandraDFSBackend(BaseCassandraIntegrationTests, backends.DFSBackendTest): + backend_class = 'frontera.contrib.backends.cassandra.DFS' + + +class TestCassandraBFSBackend(BaseCassandraIntegrationTests, backends.BFSBackendTest): + backend_class = 'frontera.contrib.backends.cassandra.BFS' + + +class TestCassandraRevisiting(BaseCassandraIntegrationTests, RevisitingBackendTest): + backend_class = 'frontera.contrib.backends.cassandra.revisiting.Backend' + + def get_settings(self): + settings = super(TestCassandraRevisiting, self).get_settings() + settings.CASSANDRABACKEND_REVISIT_INTERVAL = timedelta(seconds=0) + return settings diff --git a/tests/contrib/backends/cassandra/wait_for_cluster_up.py b/tests/contrib/backends/cassandra/wait_for_cluster_up.py new file mode 100644 index 000000000..f63fe4eb2 --- /dev/null +++ b/tests/contrib/backends/cassandra/wait_for_cluster_up.py @@ -0,0 +1,30 @@ +from time import time + +from cassandra.cluster import Cluster, NoHostAvailable + +_cluster_ips = ['127.0.0.1'] +_port = 9042 +_timeout = 36000 # 10 minutes + + +def _is_cluster_up(): + cluster = Cluster(_cluster_ips, _port) + try: + cluster.connect() + return True + except NoHostAvailable: + return False + + +def _wait_for_cluster_to_start(): + print('waiting for cassandra cluster to setup...') + start = time() + while not _is_cluster_up(): + time_taken = time() - start + if time_taken > _timeout: + raise TimeoutError('Cassandra node could not start within the timeout.') + time_taken = time() - start + print('cassandra cluster is up! 
Waited for %s seconds.' % int(time_taken)) + +if __name__ == '__main__': + _wait_for_cluster_to_start() diff --git a/tests/test_utils_misc.py b/tests/test_utils_misc.py index af6f6d992..a217fffe9 100644 --- a/tests/test_utils_misc.py +++ b/tests/test_utils_misc.py @@ -1,8 +1,11 @@ from __future__ import absolute_import import hashlib import pytest -from frontera.utils.misc import load_object, get_crc32, chunks, to_signed32 import six +from datetime import datetime + +from frontera.utils.misc import load_object, get_crc32, chunks, to_signed32, utcnow_timestamp +from tests import mock class TestGetCRC32(object): @@ -82,3 +85,13 @@ def test_name_error(self): load_object('tests.mocks.load_objects.non_existent_object') assert str(info.value) == ("Module 'tests.mocks.load_objects' doesn't define" " any object named 'non_existent_object'") + + +class TestUtcNowTimestamp(object): + + def test(self): + udt = datetime(2016, 11, 11, 0, 0, 0) + with mock.patch('frontera.utils.misc.datetime') as mocked_datetime: + mocked_datetime.utcnow = mock.Mock(return_value=udt) + utc_tstamp = utcnow_timestamp() + assert utc_tstamp == 1478822400