Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
language: python
python: 2.7
env:
- TOXENV=py27
- TOXENV=py27 KAFKA_VERSION=0.8.2.2
- TOXENV=flake8
before_install:
- frontera/tests/kafka_integration.sh
install:
- pip install -U tox wheel
- pip install -r requirements/tests.txt
Expand All @@ -12,6 +14,10 @@ before_script:
- mysql -u root -e "set global innodb_file_per_table=true;"
- frontera/tests/run_zmq_broker.sh
script: tox
cache:
directories:
- $HOME/.cache/pip
- servers/
deploy:
provider: pypi
distributions: sdist bdist_wheel
Expand Down
24 changes: 24 additions & 0 deletions docs/source/topics/frontera-settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,26 @@ KAFKA_LOCATION

Hostname and port of Kafka broker, separated with ``:``. Can be a string with multiple ``hostname:port`` pairs separated with commas (``,``).

.. setting:: KAFKA_COMPRESSION

KAFKA_COMPRESSION
-----------------

Default: ``None``

Kafka's producer compression type string, see `kafka-python documentation`_ for more details.

.. setting:: KAFKA_USE_SIMPLE_CONSUMER

KAFKA_USE_SIMPLE_CONSUMER
-------------------------

Default: ``False``

If set to ``True``, the Kafka :term:`message bus` will use the deprecated Simple* interfaces from the ``kafka-python``
package. For older (<0.9.0) Kafka versions this allows enabling consumer offsets auto commit, and therefore having a
working flow control in the :term:`db worker`. On the other hand, older versions don't support automatic consumer rebalancing.

.. setting:: FRONTIER_GROUP

FRONTIER_GROUP
Expand Down Expand Up @@ -614,6 +634,8 @@ A group used by strategy workers for spider log reading. Needs to be different t
SCORING_TOPIC
-------------

Default: ``frontier-score``

Kafka topic used for :term:`scoring log` stream.


Expand All @@ -623,3 +645,5 @@ Default settings
If no settings are specified, frontier will use the built-in default ones. For a complete list of default values see:
:ref:`Built-in settings reference <frontier-built-in-frontier-settings>`. All default settings can be overridden.


.. _`kafka-python documentation`: http://kafka-python.readthedocs.io/en/1.1.1/apidoc/KafkaProducer.html
8 changes: 7 additions & 1 deletion frontera/contrib/backends/partitioners.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ def partition_by_hash(self, value, partitions):
idx = value % size
return partitions[idx]

def __call__(self, key, all_partitions, available):
    """Adapter so the partitioner can be used as a callable by kafka-python.

    Delegates to :meth:`partition` over ``all_partitions``; the ``available``
    argument (currently-alive partitions) is accepted for interface
    compatibility but ignored.
    """
    return self.partition(key, all_partitions)


class FingerprintPartitioner(Partitioner):
def partition(self, key, partitions=None):
Expand All @@ -26,4 +29,7 @@ def partition(self, key, partitions=None):
digest = unhexlify(key[0:2] + key[5:7] + key[10:12] + key[15:17])
value = unpack("<I", digest)
idx = value[0] % len(partitions)
return partitions[idx]
return partitions[idx]

def __call__(self, key, all_partitions, available):
    """Adapter so the partitioner can be used as a callable by kafka-python.

    Delegates to :meth:`partition` over ``all_partitions``; the ``available``
    argument (currently-alive partitions) is accepted for interface
    compatibility but ignored.
    """
    return self.partition(key, all_partitions)
226 changes: 0 additions & 226 deletions frontera/contrib/backends/remote/kafka.py

This file was deleted.

15 changes: 9 additions & 6 deletions frontera/contrib/messagebus/kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
from collections import namedtuple
from logging import getLogger

from kafka.common import OffsetRequest, check_error, OffsetFetchRequest, UnknownTopicOrPartitionError
from kafka import KafkaClient
from kafka.common import OffsetRequestPayload, check_error, OffsetFetchRequestPayload, UnknownTopicOrPartitionError

logger = getLogger("offset-fetcher")
OffsetsStruct = namedtuple("OffsetsStruct", ["commit", "produced"])


class OffsetsFetcher(object):
def __init__(self, client, topic, group_id):
self._client = client
def __init__(self, location, topic, group_id):
self._client = KafkaClient(location)
self._topic = topic
self._group_id = group_id
self._client.load_metadata_for_topics()
Expand All @@ -29,7 +30,7 @@ def _update_produced_offsets(self):
the earliest offset will always return you a single element.
"""
for partition in self._client.get_partition_ids_for_topic(self._topic):
reqs = [OffsetRequest(self._topic, partition, -1, 1)]
reqs = [OffsetRequestPayload(self._topic, partition, -1, 1)]

(resp,) = self._client.send_offset_request(reqs)

Expand All @@ -43,11 +44,12 @@ def _update_group_offsets(self):
for partition in self._client.get_partition_ids_for_topic(self._topic):
(resp,) = self._client.send_offset_fetch_request(
self._group_id,
[OffsetFetchRequest(self._topic, partition)],
[OffsetFetchRequestPayload(self._topic, partition)],
fail_on_error=False)
try:
check_error(resp)
except UnknownTopicOrPartitionError:
except Exception, exc:
logger.error(exc)
pass

if resp.offset == -1:
Expand All @@ -66,5 +68,6 @@ def get(self):
for partition in self._client.get_partition_ids_for_topic(self._topic):
produced = self._offsets.produced[partition]
lag = produced - self._offsets.commit[partition] if self._offsets.commit[partition] else 0.0
logger.debug("%s (%s): %s, %s, %s", self._topic, partition, produced, self._offsets.commit[partition], lag)
lags[partition] = lag
return lags
Loading