From b66e709cefb2767552b81bb17c8ffc88a820e59d Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Fri, 22 Apr 2016 16:09:08 +0200 Subject: [PATCH 01/14] testing both message buses --- frontera/contrib/messagebus/kafkabus.py | 2 ++ frontera/tests/test_message_bus.py | 32 +++++++++++++++++++++---- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/frontera/contrib/messagebus/kafkabus.py b/frontera/contrib/messagebus/kafkabus.py index f31a788e7..fce31f7a0 100644 --- a/frontera/contrib/messagebus/kafkabus.py +++ b/frontera/contrib/messagebus/kafkabus.py @@ -38,6 +38,7 @@ def _connect_consumer(self): partitions=self._partition_ids, buffer_size=1048576, max_buffer_size=10485760) + self._cons.seek(0, 2) except BrokerResponseError: self._cons = None logger.warning("Could not connect consumer to Kafka server") @@ -131,6 +132,7 @@ def send(self, key, *messages): def flush(self): if self._prod is not None: self._prod.stop() + self._connect_producer() def get_offset(self, partition_id): # Kafka has it's own offset management diff --git a/frontera/tests/test_message_bus.py b/frontera/tests/test_message_bus.py index 2428e46d8..39ddde787 100644 --- a/frontera/tests/test_message_bus.py +++ b/frontera/tests/test_message_bus.py @@ -1,16 +1,18 @@ # -*- coding: utf-8 -*- from frontera.settings import Settings -from frontera.contrib.messagebus.zeromq import MessageBus +from frontera.contrib.messagebus.zeromq import MessageBus as ZeroMQMessageBus +from frontera.contrib.messagebus.kafkabus import MessageBus as KafkaMessageBus from frontera.utils.fingerprint import sha1 from random import randint from time import sleep +import logging class MessageBusTester(object): - def __init__(self, settings=Settings()): + def __init__(self, cls, settings=Settings()): settings.set('SPIDER_FEED_PARTITIONS', 1) settings.set('QUEUE_HOSTNAME_PARTITIONING', True) - self.messagebus = MessageBus(settings) + self.messagebus = cls(settings) spiderlog = self.messagebus.spider_log() # sw @@ -50,10 +52,15 @@ def spider_feed_activity(self): def sw_activity(self): c = 0 + p = 0 for m in self.sw_sl_c.get_messages(timeout=1.0, count=512): + + if m.startswith('http://helloworld.com/'): + p += 1 self.sw_us_p.send(None, 'message' + str(0) + "," + str(c)) c += 1 + assert p > 0 return c def db_activity(self, messages): @@ -90,9 +97,26 @@ def test_zmq_message_bus(): """ Test MessageBus with default settings, IPv6 and Star as ZMQ_ADDRESS """ - tester = MessageBusTester() + tester = MessageBusTester(ZeroMQMessageBus) tester.spider_log_activity(64) assert tester.sw_activity() == 64 assert tester.db_activity(128) == (64, 32) assert tester.spider_feed_activity() == 128 + + +def test_kafka_message_bus(): + """ + Test MessageBus with Kafka implementation and default settings + """ + logging.basicConfig(level=logging.INFO) + kafkabus = logging.getLogger("kafkabus") + kafkabus.addHandler(logging.StreamHandler()) + settings = Settings() + settings.set('KAFKA_LOCATION', 'localhost:9092') + settings.set('FRONTIER_GROUP', 'frontier2') + tester = MessageBusTester(KafkaMessageBus, settings) + tester.spider_log_activity(64) + assert tester.sw_activity() == 64 + assert tester.db_activity(128) == (64, 32) + assert tester.spider_feed_activity() == 128 From 23c1b0bbb4488f5e5bcc5cf67981cef8eb154b59 Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Thu, 28 Apr 2016 13:33:46 +0200 Subject: [PATCH 02/14] Kafka codec setting, moving of default values to default_settings --- docs/source/topics/frontera-settings.rst | 12 +++++++++ 
frontera/contrib/messagebus/kafkabus.py | 31 +++++++++++++++--------- frontera/settings/default_settings.py | 14 ++++++++++- 3 files changed, 44 insertions(+), 13 deletions(-) diff --git a/docs/source/topics/frontera-settings.rst b/docs/source/topics/frontera-settings.rst index e2e799a94..85b52bc2b 100644 --- a/docs/source/topics/frontera-settings.rst +++ b/docs/source/topics/frontera-settings.rst @@ -570,6 +570,16 @@ KAFKA_LOCATION Hostname and port of kafka broker, separated with :. Can be a string with hostname:port pair separated with commas(,). +.. setting:: KAFKA_CODEC +KAFKA_CODEC +___________ + +Default: ``CODEC_NONE`` + +Kafka protocol compression codec; see the kafka-python documentation for more details. Please use symbols from the kafka-python +package. + .. setting:: FRONTIER_GROUP FRONTIER_GROUP @@ -614,6 +624,8 @@ A group used by strategy workers for spider log reading. Needs to be different t SCORING_TOPIC ------------- +Default: ``frontier-score`` + Kafka topic used for :term:`scoring log` stream. diff --git a/frontera/contrib/messagebus/kafkabus.py b/frontera/contrib/messagebus/kafkabus.py index fce31f7a0..126e22682 100644 --- a/frontera/contrib/messagebus/kafkabus.py +++ b/frontera/contrib/messagebus/kafkabus.py @@ -5,7 +5,7 @@ from kafka import KafkaClient, SimpleConsumer, KeyedProducer as KafkaKeyedProducer, SimpleProducer as KafkaSimpleProducer from kafka.common import BrokerResponseError, MessageSizeTooLargeError -from kafka.protocol import CODEC_SNAPPY +from kafka.protocol import CODEC_NONE from frontera.contrib.backends.partitioners import FingerprintPartitioner, Crc32NamePartitioner from frontera.contrib.messagebus.kafka import OffsetsFetcher @@ -70,13 +70,14 @@ def get_offset(self): class SimpleProducer(BaseStreamProducer): - def __init__(self, connection, topic): + def __init__(self, connection, topic, codec): self._connection = connection self._topic = topic + self._codec = codec self._create() def _create(self): - self._producer = KafkaSimpleProducer(self._connection, codec=CODEC_SNAPPY) + self._producer = KafkaSimpleProducer(self._connection, codec=self._codec) def send(self, key, *messages): self._producer.send_messages(self._topic, *messages) @@ -92,16 +93,17 @@ def get_offset(self, partition_id): class KeyedProducer(BaseStreamProducer): - def __init__(self, connection, topic_done, partitioner_cls): + def __init__(self, connection, topic_done, partitioner_cls, codec): self._prod = None self._conn = connection self._topic_done = topic_done self._partitioner_cls = partitioner_cls + self._codec = codec def _connect_producer(self): if self._prod is None: try: - self._prod = KafkaKeyedProducer(self._conn, partitioner=self._partitioner_cls, codec=CODEC_SNAPPY) + self._prod = KafkaKeyedProducer(self._conn, partitioner=self._partitioner_cls, codec=self._codec) except BrokerResponseError: self._prod = None logger.warning("Could not connect producer to Kafka server") @@ -145,9 +147,10 @@ def __init__(self, messagebus): self._db_group = messagebus.general_group self._sw_group = messagebus.sw_group self._topic_done = messagebus.topic_done + self._codec = messagebus.codec def producer(self): - return KeyedProducer(self._conn, self._topic_done, FingerprintPartitioner) + return KeyedProducer(self._conn, self._topic_done, FingerprintPartitioner, self._codec) def consumer(self, partition_id, type): """ @@ -168,6 +171,7 @@ def __init__(self, messagebus): self._max_next_requests = messagebus.max_next_requests self._hostname_partitioning = messagebus.hostname_partitioning 
self._offset_fetcher = OffsetsFetcher(self._conn, self._topic, self._general_group) + self._codec = messagebus.codec def consumer(self, partition_id): return Consumer(self._conn, self._topic, self._general_group, partition_id) @@ -182,7 +186,7 @@ def available_partitions(self): def producer(self): partitioner = Crc32NamePartitioner if self._hostname_partitioning else FingerprintPartitioner - return KeyedProducer(self._conn, self._topic, partitioner) + return KeyedProducer(self._conn, self._topic, partitioner, self._codec) class ScoringLogStream(BaseScoringLogStream): @@ -190,25 +194,28 @@ def __init__(self, messagebus): self._topic = messagebus.topic_scoring self._group = messagebus.general_group self._conn = messagebus.conn + self._codec = messagebus.codec def consumer(self): return Consumer(self._conn, self._topic, self._group, partition_id=None) def producer(self): - return SimpleProducer(self._conn, self._topic) + return SimpleProducer(self._conn, self._topic, self._codec) class MessageBus(BaseMessageBus): def __init__(self, settings): server = settings.get('KAFKA_LOCATION') - self.topic_todo = settings.get('OUTGOING_TOPIC', "frontier-todo") - self.topic_done = settings.get('INCOMING_TOPIC', "frontier-done") + self.topic_todo = settings.get('OUTGOING_TOPIC') + self.topic_done = settings.get('INCOMING_TOPIC') self.topic_scoring = settings.get('SCORING_TOPIC') - self.general_group = settings.get('FRONTIER_GROUP', "general") - self.sw_group = settings.get('SCORING_GROUP', "strategy-workers") + self.general_group = settings.get('FRONTIER_GROUP') + self.sw_group = settings.get('SCORING_GROUP') self.spider_partition_id = settings.get('SPIDER_PARTITION_ID') self.max_next_requests = settings.MAX_NEXT_REQUESTS self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING') + codec = settings.get('KAFKA_CODEC') + self.codec = codec if codec else CODEC_NONE self.conn = KafkaClient(server) diff --git a/frontera/settings/default_settings.py b/frontera/settings/default_settings.py index 88370b23d..034bd642a 100644 --- a/frontera/settings/default_settings.py +++ b/frontera/settings/default_settings.py @@ -7,6 +7,7 @@ CONSUMER_BATCH_SIZE = 512 DELAY_ON_EMPTY = 5.0 DOMAIN_FINGERPRINT_FUNCTION = 'frontera.utils.fingerprint.sha1' +EVENT_LOG_MANAGER = 'frontera.logger.events.EventLogManager' HBASE_THRIFT_HOST = 'localhost' HBASE_THRIFT_PORT = 9090 @@ -18,7 +19,9 @@ HBASE_BATCH_SIZE = 9216 HBASE_STATE_CACHE_SIZE_LIMIT = 3000000 HBASE_QUEUE_TABLE = 'queue' +KAFKA_CODEC = None KAFKA_GET_TIMEOUT = 5.0 +LOGGING_CONFIG = 'logging.conf' MAX_NEXT_REQUESTS = 64 MAX_REQUESTS = 0 MESSAGE_BUS = 'frontera.contrib.messagebus.zeromq.MessageBus' @@ -53,4 +56,13 @@ ZMQ_ADDRESS = '127.0.0.1' ZMQ_BASE_PORT = 5550 -LOGGING_CONFIG = 'logging.conf' \ No newline at end of file + +#-------------------------------------------------------- +# Kafka +#-------------------------------------------------------- + +OUTGOING_TOPIC = "frontier-todo" +INCOMING_TOPIC = "frontier-done" +SCORING_TOPIC = "frontier-score" +FRONTIER_GROUP = "general" +SCORING_GROUP = "strategy-workers" \ No newline at end of file From 47c320a4631c2f527b5f2982f431a47a005b5ac7 Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Thu, 28 Apr 2016 15:08:45 +0200 Subject: [PATCH 03/14] removed obsolete KafkaBackend --- frontera/contrib/backends/remote/kafka.py | 226 ---------------------- 1 file changed, 226 deletions(-) delete mode 100644 frontera/contrib/backends/remote/kafka.py diff --git a/frontera/contrib/backends/remote/kafka.py 
b/frontera/contrib/backends/remote/kafka.py deleted file mode 100644 index b1027ad23..000000000 --- a/frontera/contrib/backends/remote/kafka.py +++ /dev/null @@ -1,226 +0,0 @@ -from __future__ import absolute_import -import time -from logging import getLogger, StreamHandler - -from kafka import KafkaClient, SimpleConsumer, KeyedProducer -from kafka.common import BrokerResponseError, OffsetOutOfRangeError, MessageSizeTooLargeError -from kafka.protocol import CODEC_SNAPPY -from frontera.core import OverusedBuffer - -from frontera.contrib.backends.remote.codecs.msgpack import Encoder, Decoder -from frontera import Backend, Settings -from frontera.contrib.backends.partitioners import FingerprintPartitioner - - -class TestManager(object): - class Nothing(object): - pass - - def __init__(self): - def log(msg): - print "Test Manager: ", msg - - self.logger = TestManager.Nothing() - self.settings = Settings() - self.logger.backend = TestManager.Nothing() - for log_level in ( - 'info' - 'debug', - 'warning', - 'error'): - setattr(self.logger.backend, log_level, log) - - -class KafkaBackend(Backend): - def __init__(self, manager): - self._manager = manager - settings = manager.settings - - # Kafka connection parameters - self._server = settings.get('KAFKA_LOCATION') - self._topic_todo = settings.get('OUTGOING_TOPIC', "frontier-todo") - self._topic_done = settings.get('INCOMING_TOPIC', "frontier-done") - self._group = settings.get('FRONTIER_GROUP', "scrapy-crawler") - self._get_timeout = float(settings.get('KAFKA_GET_TIMEOUT', 5.0)) - self._partition_id = settings.get('SPIDER_PARTITION_ID') - - # Kafka setup - self._conn = KafkaClient(self._server) - self._prod = None - self._cons = None - - logger = getLogger("kafka") - handler = StreamHandler() - logger.addHandler(handler) - - self._connect_consumer() - self._connect_producer() - - store_content = settings.get('STORE_CONTENT') - self._encoder = Encoder(manager.request_model, send_body=store_content) - self._decoder = Decoder(manager.request_model, manager.response_model) - - def _connect_producer(self): - """If producer is not connected try to connect it now. - - :returns: bool -- True if producer is connected - """ - if self._prod is None: - try: - self._prod = KeyedProducer(self._conn, partitioner=FingerprintPartitioner, codec=CODEC_SNAPPY) - except BrokerResponseError: - self._prod = None - if self._manager is not None: - self._manager.logger.backend.warning( - "Could not connect producer to Kafka server") - return False - - return True - - def _connect_consumer(self): - """If consumer is not connected try to connect it now. - - :returns: bool -- True if consumer is connected - """ - if self._cons is None: - try: - self._cons = SimpleConsumer( - self._conn, - self._group, - self._topic_todo, - partitions=[self._partition_id], - buffer_size=131072, - max_buffer_size=1048576) - except BrokerResponseError: - self._cons = None - if self._manager is not None: - self._manager.logger.backend.warning( - "Could not connect consumer to Kafka server") - return False - - return True - - @classmethod - def from_manager(clas, manager): - return clas(manager) - - def frontier_start(self): - if self._connect_consumer(): - self._manager.logger.backend.info( - "Successfully connected consumer to " + self._topic_todo) - else: - self._manager.logger.backend.warning( - "Could not connect consumer to {0}. 
I will try latter.".format( - self._topic_todo)) - - def frontier_stop(self): - # flush everything if a batch is incomplete - self._prod.stop() - - def _send_message(self, encoded_message, key, fail_wait_time=1.0, max_tries=5): - start = time.clock() - success = False - if self._connect_producer(): - n_tries = 0 - while not success and n_tries < max_tries: - try: - self._prod.send_messages(self._topic_done, key, encoded_message) - success = True - except MessageSizeTooLargeError, e: - self._manager.logger.backend.error(str(e)) - self._manager.logger.backend.debug("Message: %s" % encoded_message) - break - except BrokerResponseError: - n_tries += 1 - if self._manager is not None: - self._manager.logger.backend.warning( - "Could not send message. Try {0}/{1}".format( - n_tries, max_tries) - ) - - time.sleep(fail_wait_time) - - self._manager.logger.backend.debug("_send_message: {0}".format(time.clock() - start)) - return success - - def add_seeds(self, seeds): - self._send_message(self._encoder.encode_add_seeds(seeds), seeds[0].meta['fingerprint']) - - def page_crawled(self, response, links): - self._send_message(self._encoder.encode_page_crawled(response, links), response.meta['fingerprint']) - - def request_error(self, page, error): - self._send_message(self._encoder.encode_request_error(page, error), page.meta['fingerprint']) - - def get_next_requests(self, max_n_requests, **kwargs): - start = time.clock() - requests = [] - - if not self._connect_consumer(): - return [] - - while True: - try: - success = False - for offmsg in self._cons.get_messages( - max_n_requests, - timeout=self._get_timeout): - success = True - try: - request = self._decoder.decode_request(offmsg.message.value) - requests.append(request) - except ValueError: - self._manager.logger.backend.warning( - "Could not decode {0} message: {1}".format( - self._topic_todo, - offmsg.message.value)) - - if not success: - self._manager.logger.backend.warning( - "Timeout ({0} seconds) while trying to get {1} requests".format( - self._get_timeout, - max_n_requests) - ) - break - except OffsetOutOfRangeError, err: - self._manager.logger.backend.warning( - "%s" % (err)) - - # https://github.com/mumrah/kafka-python/issues/263 - self._cons.seek(0, 2) # moving to the tail of the log - continue - - except Exception, err: - self._manager.logger.backend.warning( - "Error %s" % (err)) - break - - self._manager.logger.backend.debug("get_next_requests: {0}".format(time.clock() - start)) - return requests - - def finished(self): - return False - - @property - def metadata(self): - return None - - @property - def states(self): - return None - - @property - def queue(self): - return None - - -class KafkaOverusedBackend(KafkaBackend): - component_name = 'Kafka Backend taking into account overused slots' - - def __init__(self, manager): - super(KafkaOverusedBackend, self).__init__(manager) - self._buffer = OverusedBuffer(super(KafkaOverusedBackend, self).get_next_requests, - manager.logger.manager.debug) - - def get_next_requests(self, max_n_requests, **kwargs): - return self._buffer.get_next_requests(max_n_requests, **kwargs) From 87884108eed6e21338fccb6caf880d1eeba5b11d Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Thu, 28 Apr 2016 15:05:36 +0200 Subject: [PATCH 04/14] forcing kafka 1.0.x series --- requirements/tests.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/tests.txt b/requirements/tests.txt index 485f80ba0..1cd7850ef 100644 --- a/requirements/tests.txt +++ b/requirements/tests.txt @@ 
-8,5 +8,5 @@ SQLAlchemy>=1.0.0 cachetools pyzmq msgpack-python -kafka-python<=0.9.5 +kafka-python>=1.0.0 From f3fa22d99af55a311ead8d7d9aeac73508f10a91 Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Thu, 28 Apr 2016 14:03:20 +0200 Subject: [PATCH 05/14] kafka integration test case (from kafka-python) --- .travis.yml | 8 +- frontera/tests/kafka_integration.sh | 85 +++++ frontera/tests/kafka_utils/__init__.py | 1 + frontera/tests/kafka_utils/case.py | 66 ++++ .../0.8.1.1/resources/kafka.properties | 124 +++++++ .../0.8.1.1/resources/log4j.properties | 25 ++ .../0.8.1.1/resources/zookeeper.properties | 21 ++ .../0.8.2.1/resources/kafka.properties | 124 +++++++ .../0.8.2.1/resources/log4j.properties | 25 ++ .../0.8.2.1/resources/zookeeper.properties | 21 ++ .../0.8.2.2/resources/kafka.properties | 124 +++++++ .../0.8.2.2/resources/log4j.properties | 25 ++ .../0.8.2.2/resources/zookeeper.properties | 21 ++ frontera/tests/kafka_utils/fixtures.py | 335 ++++++++++++++++++ frontera/tests/kafka_utils/service.py | 117 ++++++ frontera/tests/test_message_bus.py | 62 +++- tox.ini | 5 +- 17 files changed, 1169 insertions(+), 20 deletions(-) create mode 100755 frontera/tests/kafka_integration.sh create mode 100644 frontera/tests/kafka_utils/__init__.py create mode 100644 frontera/tests/kafka_utils/case.py create mode 100644 frontera/tests/kafka_utils/configs/0.8.1.1/resources/kafka.properties create mode 100644 frontera/tests/kafka_utils/configs/0.8.1.1/resources/log4j.properties create mode 100644 frontera/tests/kafka_utils/configs/0.8.1.1/resources/zookeeper.properties create mode 100644 frontera/tests/kafka_utils/configs/0.8.2.1/resources/kafka.properties create mode 100644 frontera/tests/kafka_utils/configs/0.8.2.1/resources/log4j.properties create mode 100644 frontera/tests/kafka_utils/configs/0.8.2.1/resources/zookeeper.properties create mode 100644 frontera/tests/kafka_utils/configs/0.8.2.2/resources/kafka.properties create mode 100644 frontera/tests/kafka_utils/configs/0.8.2.2/resources/log4j.properties create mode 100644 frontera/tests/kafka_utils/configs/0.8.2.2/resources/zookeeper.properties create mode 100644 frontera/tests/kafka_utils/fixtures.py create mode 100644 frontera/tests/kafka_utils/service.py diff --git a/.travis.yml b/.travis.yml index 61fc76a2e..2990a0a45 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,10 @@ language: python python: 2.7 env: - - TOXENV=py27 + - TOXENV=py27 KAFKA_VERSION=0.8.2.2 - TOXENV=flake8 +before_install: + - frontera/tests/kafka_integration.sh install: - pip install -U tox wheel - pip install -r requirements/tests.txt @@ -12,6 +14,10 @@ before_script: - mysql -u root -e "set global innodb_file_per_table=true;" - frontera/tests/run_zmq_broker.sh script: tox +cache: + directories: + - $HOME/.cache/pip + - servers/ deploy: provider: pypi distributions: sdist bdist_wheel diff --git a/frontera/tests/kafka_integration.sh b/frontera/tests/kafka_integration.sh new file mode 100755 index 000000000..6b8d62e17 --- /dev/null +++ b/frontera/tests/kafka_integration.sh @@ -0,0 +1,85 @@ +#!/bin/bash -x + +# Versions available for testing via binary distributions +OFFICIAL_RELEASES="0.8.1.1 0.8.2.2 0.9.0.1" + +# Useful configuration vars, with sensible defaults +if [ -z "$SCALA_VERSION" ]; then + SCALA_VERSION=2.10 +fi + +# On travis CI, empty KAFKA_VERSION means skip integration tests +# so we dont try to get binaries +# Otherwise it means test all official releases, so we get all of them! 
+if [ -z "$KAFKA_VERSION" -a -z "$TRAVIS" ]; then + KAFKA_VERSION=$OFFICIAL_RELEASES +fi + +# By default look for binary releases at archive.apache.org +if [ -z "$DIST_BASE_URL" ]; then + DIST_BASE_URL="https://archive.apache.org/dist/kafka/" +fi + +# When testing against source builds, use this git repo +if [ -z "$KAFKA_SRC_GIT" ]; then + KAFKA_SRC_GIT="https://github.com/apache/kafka.git" +fi + +mkdir -p servers +pushd servers + mkdir -p dist + pushd dist + for kafka in $KAFKA_VERSION; do + if [ "$kafka" == "trunk" ]; then + if [ ! -d "$kafka" ]; then + git clone $KAFKA_SRC_GIT $kafka + fi + pushd $kafka + git pull + ./gradlew -PscalaVersion=$SCALA_VERSION -Pversion=$kafka releaseTarGz -x signArchives + popd + # Not sure how to construct the .tgz name accurately, so use a wildcard (ugh) + tar xzvf $kafka/core/build/distributions/kafka_*.tgz -C ../$kafka/ + rm $kafka/core/build/distributions/kafka_*.tgz + rm -rf ../$kafka/kafka-bin + mv ../$kafka/kafka_* ../$kafka/kafka-bin + else + echo "-------------------------------------" + echo "Checking kafka binaries for ${kafka}" + echo + # kafka 0.8.0 is only available w/ scala 2.8.0 + if [ "$kafka" == "0.8.0" ]; then + KAFKA_ARTIFACT="kafka_2.8.0-${kafka}" + else + KAFKA_ARTIFACT="kafka_${SCALA_VERSION}-${kafka}" + fi + if [ ! -f "../$kafka/kafka-bin/bin/kafka-run-class.sh" ]; then + echo "Downloading kafka ${kafka} tarball" + if hash wget 2>/dev/null; then + wget -N https://archive.apache.org/dist/kafka/$kafka/${KAFKA_ARTIFACT}.tgz || wget -N https://archive.apache.org/dist/kafka/$kafka/${KAFKA_ARTIFACT}.tar.gz + else + echo "wget not found... using curl" + if [ -f "${KAFKA_ARTIFACT}.tar.gz" ]; then + echo "Using cached artifact: ${KAFKA_ARTIFACT}.tar.gz" + else + curl -f https://archive.apache.org/dist/kafka/$kafka/${KAFKA_ARTIFACT}.tgz -o ${KAFKA_ARTIFACT}.tar.gz || curl -f https://archive.apache.org/dist/kafka/$kafka/${KAFKA_ARTIFACT}.tar.gz -o ${KAFKA_ARTIFACT}.tar.gz + fi + fi + echo + echo "Extracting kafka ${kafka} binaries" + mkdir -p ../$kafka/ + tar xzvf ${KAFKA_ARTIFACT}.t* -C ../$kafka/ + rm -rf ../$kafka/kafka-bin + mv ../$kafka/${KAFKA_ARTIFACT} ../$kafka/kafka-bin + if [ ! -f "../$kafka/kafka-bin/bin/kafka-run-class.sh" ]; then + echo "Extraction Failed ($kafka/kafka-bin/bin/kafka-run-class.sh does not exist)!" 
+ exit 1 + fi + else + echo "$kafka is already installed in servers/$kafka/ -- skipping" + fi + fi + echo + done + popd +popd diff --git a/frontera/tests/kafka_utils/__init__.py b/frontera/tests/kafka_utils/__init__.py new file mode 100644 index 000000000..7c68785e9 --- /dev/null +++ b/frontera/tests/kafka_utils/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- \ No newline at end of file diff --git a/frontera/tests/kafka_utils/case.py b/frontera/tests/kafka_utils/case.py new file mode 100644 index 000000000..9b108db80 --- /dev/null +++ b/frontera/tests/kafka_utils/case.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +import unittest +import os +import uuid +import random +from string import ascii_letters +from kafka import SimpleClient +from kafka.structs import OffsetRequestPayload + + +def random_string(l): + return "".join(random.choice(ascii_letters) for i in xrange(l)) + + +class KafkaIntegrationTestCase(unittest.TestCase): + create_client = True + topic = None + zk = None + server = None + + def setUp(self): + super(KafkaIntegrationTestCase, self).setUp() + if not os.environ.get('KAFKA_VERSION'): + self.skipTest('Integration test requires KAFKA_VERSION') + + if not self.topic: + topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10)) + self.topic = topic + + if self.create_client: + self.client = SimpleClient('%s:%d' % (self.server.host, self.server.port)) + + self.client.ensure_topic_exists(self.topic) + + self._messages = {} + + def tearDown(self): + super(KafkaIntegrationTestCase, self).tearDown() + if not os.environ.get('KAFKA_VERSION'): + return + + if self.create_client: + self.client.close() + + def current_offset(self, topic, partition): + try: + offsets, = self.client.send_offset_request([OffsetRequestPayload(topic, partition, -1, 1)]) + except: + # XXX: We've seen some UnknownErrors here and cant debug w/o server logs + self.zk.child.dump_logs() + self.server.child.dump_logs() + raise + else: + return offsets.offsets[0] + + def msgs(self, iterable): + return [self.msg(x) for x in iterable] + + def msg(self, s): + if s not in self._messages: + self._messages[s] = '%s-%s-%s' % (s, self.id(), str(uuid.uuid4())) + + return self._messages[s].encode('utf-8') + + def key(self, k): + return k.encode('utf-8') \ No newline at end of file diff --git a/frontera/tests/kafka_utils/configs/0.8.1.1/resources/kafka.properties b/frontera/tests/kafka_utils/configs/0.8.1.1/resources/kafka.properties new file mode 100644 index 000000000..685aed15e --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.1.1/resources/kafka.properties @@ -0,0 +1,124 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id={broker_id} + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port={port} + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +host.name={host} + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name= + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port= + +# The number of threads handling network requests +num.network.threads=2 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma seperated list of directories under which to store log files +log.dirs={tmp_dir}/data + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions={partitions} +default.replication.factor={replicas} + +## Short Replica Lag -- Drops failed brokers out of ISR +replica.lag.time.max.ms=1000 +replica.socket.timeout.ms=1000 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. 
Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=60000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=1000000 +# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly +zookeeper.session.timeout.ms=500 diff --git a/frontera/tests/kafka_utils/configs/0.8.1.1/resources/log4j.properties b/frontera/tests/kafka_utils/configs/0.8.1.1/resources/log4j.properties new file mode 100644 index 000000000..b0b76aa79 --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.1.1/resources/log4j.properties @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, stdout, logfile + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.logfile=org.apache.log4j.FileAppender +log4j.appender.logfile.File=${kafka.logs.dir}/server.log +log4j.appender.logfile.layout=org.apache.log4j.PatternLayout +log4j.appender.logfile.layout.ConversionPattern=[%d] %p %m (%c)%n diff --git a/frontera/tests/kafka_utils/configs/0.8.1.1/resources/zookeeper.properties b/frontera/tests/kafka_utils/configs/0.8.1.1/resources/zookeeper.properties new file mode 100644 index 000000000..e3fd09742 --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.1.1/resources/zookeeper.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. +dataDir={tmp_dir} +# the port at which the clients will connect +clientPort={port} +clientPortAddress={host} +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 diff --git a/frontera/tests/kafka_utils/configs/0.8.2.1/resources/kafka.properties b/frontera/tests/kafka_utils/configs/0.8.2.1/resources/kafka.properties new file mode 100644 index 000000000..685aed15e --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.2.1/resources/kafka.properties @@ -0,0 +1,124 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id={broker_id} + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port={port} + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +host.name={host} + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name= + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port= + +# The number of threads handling network requests +num.network.threads=2 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma seperated list of directories under which to store log files +log.dirs={tmp_dir}/data + +# The default number of log partitions per topic. 
More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions={partitions} +default.replication.factor={replicas} + +## Short Replica Lag -- Drops failed brokers out of ISR +replica.lag.time.max.ms=1000 +replica.socket.timeout.ms=1000 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=60000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. 
+zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=1000000 +# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly +zookeeper.session.timeout.ms=500 diff --git a/frontera/tests/kafka_utils/configs/0.8.2.1/resources/log4j.properties b/frontera/tests/kafka_utils/configs/0.8.2.1/resources/log4j.properties new file mode 100644 index 000000000..b0b76aa79 --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.2.1/resources/log4j.properties @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, stdout, logfile + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.logfile=org.apache.log4j.FileAppender +log4j.appender.logfile.File=${kafka.logs.dir}/server.log +log4j.appender.logfile.layout=org.apache.log4j.PatternLayout +log4j.appender.logfile.layout.ConversionPattern=[%d] %p %m (%c)%n diff --git a/frontera/tests/kafka_utils/configs/0.8.2.1/resources/zookeeper.properties b/frontera/tests/kafka_utils/configs/0.8.2.1/resources/zookeeper.properties new file mode 100644 index 000000000..e3fd09742 --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.2.1/resources/zookeeper.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. 
+dataDir={tmp_dir} +# the port at which the clients will connect +clientPort={port} +clientPortAddress={host} +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 diff --git a/frontera/tests/kafka_utils/configs/0.8.2.2/resources/kafka.properties b/frontera/tests/kafka_utils/configs/0.8.2.2/resources/kafka.properties new file mode 100644 index 000000000..685aed15e --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.2.2/resources/kafka.properties @@ -0,0 +1,124 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id={broker_id} + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port={port} + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +host.name={host} + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name= + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port= + +# The number of threads handling network requests +num.network.threads=2 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma seperated list of directories under which to store log files +log.dirs={tmp_dir}/data + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions={partitions} +default.replication.factor={replicas} + +## Short Replica Lag -- Drops failed brokers out of ISR +replica.lag.time.max.ms=1000 +replica.socket.timeout.ms=1000 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. 
Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=60000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=1000000 +# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly +zookeeper.session.timeout.ms=500 diff --git a/frontera/tests/kafka_utils/configs/0.8.2.2/resources/log4j.properties b/frontera/tests/kafka_utils/configs/0.8.2.2/resources/log4j.properties new file mode 100644 index 000000000..b0b76aa79 --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.2.2/resources/log4j.properties @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, stdout, logfile + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.logfile=org.apache.log4j.FileAppender +log4j.appender.logfile.File=${kafka.logs.dir}/server.log +log4j.appender.logfile.layout=org.apache.log4j.PatternLayout +log4j.appender.logfile.layout.ConversionPattern=[%d] %p %m (%c)%n diff --git a/frontera/tests/kafka_utils/configs/0.8.2.2/resources/zookeeper.properties b/frontera/tests/kafka_utils/configs/0.8.2.2/resources/zookeeper.properties new file mode 100644 index 000000000..e3fd09742 --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.2.2/resources/zookeeper.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. 
+dataDir={tmp_dir} +# the port at which the clients will connect +clientPort={port} +clientPortAddress={host} +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 diff --git a/frontera/tests/kafka_utils/fixtures.py b/frontera/tests/kafka_utils/fixtures.py new file mode 100644 index 000000000..1b8d73ca8 --- /dev/null +++ b/frontera/tests/kafka_utils/fixtures.py @@ -0,0 +1,335 @@ +import atexit +import logging +import os +import os.path +import shutil +import subprocess +import tempfile +import time +import uuid + +from six.moves import urllib +from six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401 + +from service import ExternalService, SpawnedService + + +log = logging.getLogger(__name__) + + +class Fixture(object): + kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0') + scala_version = os.environ.get("SCALA_VERSION", '2.8.0') + project_root = os.environ.get('PROJECT_ROOT', os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, 'servers', kafka_version, "kafka-bin")) + ivy_root = os.environ.get('IVY_ROOT', os.path.expanduser("~/.ivy2/cache")) + + @classmethod + def download_official_distribution(cls, + kafka_version=None, + scala_version=None, + output_dir=None): + if not kafka_version: + kafka_version = cls.kafka_version + if not scala_version: + scala_version = cls.scala_version + if not output_dir: + output_dir = os.path.join(cls.project_root, 'servers', 'dist') + + distfile = 'kafka_%s-%s' % (scala_version, kafka_version,) + url_base = 'https://archive.apache.org/dist/kafka/%s/' % (kafka_version,) + output_file = os.path.join(output_dir, distfile + '.tgz') + + if os.path.isfile(output_file): + log.info("Found file already on disk: %s", output_file) + return output_file + + # New tarballs are .tgz, older ones are sometimes .tar.gz + try: + url = url_base + distfile + '.tgz' + log.info("Attempting to download %s", url) + response = urllib.request.urlopen(url) + except urllib.error.HTTPError: + log.exception("HTTP Error") + url = url_base + distfile + '.tar.gz' + log.info("Attempting to download %s", url) + response = urllib.request.urlopen(url) + + log.info("Saving distribution file to %s", output_file) + with open(output_file, 'w') as output_file_fd: + output_file_fd.write(response.read()) + + return output_file + + @classmethod + def test_resource(cls, filename): + return os.path.join(cls.project_root, "frontera", "tests", "kafka_utils", "configs", + cls.kafka_version, "resources", filename) + + @classmethod + def kafka_run_class_args(cls, *args): + result = [os.path.join(cls.kafka_root, 'bin', 'kafka-run-class.sh')] + result.extend(args) + return result + + def kafka_run_class_env(self): + env = os.environ.copy() + env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % self.test_resource("log4j.properties") + return env + + @classmethod + def render_template(cls, source_file, target_file, binding): + log.info('Rendering %s from template %s', target_file, source_file) + with open(source_file, "r") as handle: + template = handle.read() + assert len(template) > 0, 'Empty template %s' % source_file + with open(target_file, "w") as handle: + handle.write(template.format(**binding)) + handle.flush() + os.fsync(handle) + + # fsync directory for durability + # https://blog.gocept.com/2013/07/15/reliable-file-updates-with-python/ + dirfd = os.open(os.path.dirname(target_file), os.O_DIRECTORY) + os.fsync(dirfd) + os.close(dirfd) 
+ + +class ZookeeperFixture(Fixture): + @classmethod + def instance(cls): + if "ZOOKEEPER_URI" in os.environ: + parse = urlparse(os.environ["ZOOKEEPER_URI"]) + (host, port) = (parse.hostname, parse.port) + fixture = ExternalService(host, port) + else: + (host, port) = ("127.0.0.1", 2181) + fixture = cls(host, port) + + fixture.open() + return fixture + + def __init__(self, host, port): + self.host = host + self.port = port + + self.tmp_dir = None + self.child = None + + def kafka_run_class_env(self): + env = super(ZookeeperFixture, self).kafka_run_class_env() + env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs') + return env + + def out(self, message): + log.info("*** Zookeeper [%s:%d]: %s", self.host, self.port, message) + + def open(self): + self.tmp_dir = tempfile.mkdtemp() + self.out("Running local instance...") + log.info(" host = %s", self.host) + log.info(" port = %s", self.port) + log.info(" tmp_dir = %s", self.tmp_dir) + + # Generate configs + template = self.test_resource("zookeeper.properties") + properties = os.path.join(self.tmp_dir, "zookeeper.properties") + self.render_template(template, properties, vars(self)) + + # Configure Zookeeper child process + args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties) + env = self.kafka_run_class_env() + + # Party! + timeout = 5 + max_timeout = 30 + backoff = 1 + end_at = time.time() + max_timeout + tries = 1 + while time.time() < end_at: + self.out('Attempting to start (try #%d)' % tries) + try: + os.stat(properties) + except: + log.warning('Config %s not found -- re-rendering', properties) + self.render_template(template, properties, vars(self)) + self.child = SpawnedService(args, env) + self.child.start() + timeout = min(timeout, max(end_at - time.time(), 0)) + if self.child.wait_for(r"binding to port", timeout=timeout): + break + self.child.stop() + timeout *= 2 + time.sleep(backoff) + tries += 1 + else: + raise Exception('Failed to start Zookeeper before max_timeout') + self.out("Done!") + atexit.register(self.close) + + def close(self): + if self.child is None: + return + self.out("Stopping...") + self.child.stop() + self.child = None + self.out("Done!") + shutil.rmtree(self.tmp_dir) + + def __del__(self): + self.close() + + +class KafkaFixture(Fixture): + @classmethod + def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None, port=None, + transport='PLAINTEXT', replicas=1, partitions=2): + if zk_chroot is None: + zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_") + if "KAFKA_URI" in os.environ: + parse = urlparse(os.environ["KAFKA_URI"]) + (host, port) = (parse.hostname, parse.port) + fixture = ExternalService(host, port) + else: + if port is None: + port = 9092 + # force IPv6 here because of a confusing point: + # + # - if the string "localhost" is passed, Kafka will *only* bind to the IPv4 address of localhost + # (127.0.0.1); however, kafka-python will attempt to connect on ::1 and fail + # + # - if the address literal 127.0.0.1 is passed, the metadata request during bootstrap will return + # the name "localhost" and we'll go back to the first case. This is odd! + # + # Ideally, Kafka would bind to all loopback addresses when we tell it to listen on "localhost" the + # way it makes an IPv6 socket bound to both 0.0.0.0/0 and ::/0 when we tell it to bind to "" (that is + # to say, when we make a listener of PLAINTEXT://:port. 
+ # + # Note that even though we specify the bind host in bracket notation, Kafka responds to the bootstrap + # metadata request without square brackets later. + host = "127.0.0.1" + fixture = KafkaFixture(host, port, broker_id, + zk_host, zk_port, zk_chroot, + transport=transport, + replicas=replicas, partitions=partitions) + fixture.open() + return fixture + + def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, + replicas=1, partitions=2, transport='PLAINTEXT'): + self.host = host + self.port = port + + self.broker_id = broker_id + self.transport = transport.upper() + self.ssl_dir = self.test_resource('ssl') + + self.zk_host = zk_host + self.zk_port = zk_port + self.zk_chroot = zk_chroot + + self.replicas = replicas + self.partitions = partitions + + self.tmp_dir = None + self.child = None + self.running = False + + def kafka_run_class_env(self): + env = super(KafkaFixture, self).kafka_run_class_env() + env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs') + return env + + def out(self, message): + log.info("*** Kafka [%s:%d]: %s", self.host, self.port, message) + + def open(self): + if self.running: + self.out("Instance already running") + return + + self.tmp_dir = tempfile.mkdtemp() + self.out("Running local instance...") + log.info(" host = %s", self.host) + log.info(" port = %s", self.port) + log.info(" transport = %s", self.transport) + log.info(" broker_id = %s", self.broker_id) + log.info(" zk_host = %s", self.zk_host) + log.info(" zk_port = %s", self.zk_port) + log.info(" zk_chroot = %s", self.zk_chroot) + log.info(" replicas = %s", self.replicas) + log.info(" partitions = %s", self.partitions) + log.info(" tmp_dir = %s", self.tmp_dir) + + # Create directories + os.mkdir(os.path.join(self.tmp_dir, "logs")) + os.mkdir(os.path.join(self.tmp_dir, "data")) + + # Generate configs + template = self.test_resource("kafka.properties") + properties = os.path.join(self.tmp_dir, "kafka.properties") + self.render_template(template, properties, vars(self)) + + # Party! 
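+        # Bring-up happens in two steps below: first create the Zookeeper chroot node this broker
+        # will live under, then spawn the broker process and wait for its "Started" log line,
+        # retrying with a doubled timeout if it does not come up in time.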
+ self.out("Creating Zookeeper chroot node...") + args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain", + "-server", "%s:%d" % (self.zk_host, self.zk_port), + "create", + "/%s" % self.zk_chroot, + "kafka-python") + env = self.kafka_run_class_env() + proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + if proc.wait() != 0: + self.out("Failed to create Zookeeper chroot node") + self.out(proc.stdout.read()) + self.out(proc.stderr.read()) + raise RuntimeError("Failed to create Zookeeper chroot node") + self.out("Done!") + + # Configure Kafka child process + args = self.kafka_run_class_args("kafka.Kafka", properties) + env = self.kafka_run_class_env() + + timeout = 5 + max_timeout = 30 + backoff = 1 + end_at = time.time() + max_timeout + tries = 1 + while time.time() < end_at: + self.out('Attempting to start (try #%d)' % tries) + try: + os.stat(properties) + except: + log.warning('Config %s not found -- re-rendering', properties) + self.render_template(template, properties, vars(self)) + self.child = SpawnedService(args, env) + self.child.start() + timeout = min(timeout, max(end_at - time.time(), 0)) + if self.child.wait_for(r"\[Kafka Server %d\], Started" % + self.broker_id, timeout=timeout): + break + self.child.stop() + timeout *= 2 + time.sleep(backoff) + tries += 1 + else: + raise Exception('Failed to start KafkaInstance before max_timeout') + self.out("Done!") + self.running = True + atexit.register(self.close) + + def __del__(self): + self.close() + + def close(self): + if not self.running: + self.out("Instance already stopped") + return + + self.out("Stopping...") + self.child.stop() + self.child = None + self.out("Done!") + shutil.rmtree(self.tmp_dir) + self.running = False diff --git a/frontera/tests/kafka_utils/service.py b/frontera/tests/kafka_utils/service.py new file mode 100644 index 000000000..bfbe77ee4 --- /dev/null +++ b/frontera/tests/kafka_utils/service.py @@ -0,0 +1,117 @@ +import logging +import os +import re +import select +import subprocess +import sys +import threading +import time + +__all__ = [ + 'ExternalService', + 'SpawnedService', +] + +log = logging.getLogger(__name__) + + +class ExternalService(object): + def __init__(self, host, port): + log.info("Using already running service at %s:%d", host, port) + self.host = host + self.port = port + + def open(self): + pass + + def close(self): + pass + + +class SpawnedService(threading.Thread): + def __init__(self, args=None, env=None): + super(SpawnedService, self).__init__() + + if args is None: + raise TypeError("args parameter is required") + self.args = args + self.env = env + self.captured_stdout = [] + self.captured_stderr = [] + + self.should_die = threading.Event() + self.child = None + self.alive = False + self.daemon = True + + def _spawn(self): + if self.alive: + return + if self.child and self.child.poll() is None: + return + + self.child = subprocess.Popen( + self.args, + preexec_fn=os.setsid, # to avoid propagating signals + env=self.env, + bufsize=1, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.alive = True + + def _despawn(self): + if self.child.poll() is None: + self.child.terminate() + self.alive = False + for _ in range(50): + if self.child.poll() is not None: + self.child = None + break + time.sleep(0.1) + else: + self.child.kill() + + def run(self): + self._spawn() + while True: + (rds, _, _) = select.select([self.child.stdout, self.child.stderr], [], [], 1) + + if self.child.stdout in rds: + line = self.child.stdout.readline() + 
self.captured_stdout.append(line.decode('utf-8').rstrip()) + + if self.child.stderr in rds: + line = self.child.stderr.readline() + self.captured_stderr.append(line.decode('utf-8').rstrip()) + + if self.child.poll() is not None: + self.dump_logs() + break + + if self.should_die.is_set(): + self._despawn() + break + + def dump_logs(self): + sys.stderr.write('\n'.join(self.captured_stderr)) + sys.stdout.write('\n'.join(self.captured_stdout)) + + def wait_for(self, pattern, timeout=30): + start = time.time() + while True: + elapsed = time.time() - start + if elapsed >= timeout: + log.error("Waiting for %r timed out after %d seconds", pattern, timeout) + return False + + if re.search(pattern, '\n'.join(self.captured_stdout), re.IGNORECASE) is not None: + log.info("Found pattern %r in %d seconds via stdout", pattern, elapsed) + return True + if re.search(pattern, '\n'.join(self.captured_stderr), re.IGNORECASE) is not None: + log.info("Found pattern %r in %d seconds via stderr", pattern, elapsed) + return True + time.sleep(0.1) + + def stop(self): + self.should_die.set() + self.join() diff --git a/frontera/tests/test_message_bus.py b/frontera/tests/test_message_bus.py index 39ddde787..99ae5299e 100644 --- a/frontera/tests/test_message_bus.py +++ b/frontera/tests/test_message_bus.py @@ -5,6 +5,9 @@ from frontera.utils.fingerprint import sha1 from random import randint from time import sleep +import os +from kafka_utils.case import KafkaIntegrationTestCase, random_string +from kafka_utils.fixtures import ZookeeperFixture, KafkaFixture import logging @@ -54,8 +57,6 @@ def sw_activity(self): c = 0 p = 0 for m in self.sw_sl_c.get_messages(timeout=1.0, count=512): - - if m.startswith('http://helloworld.com/'): p += 1 self.sw_us_p.send(None, 'message' + str(0) + "," + str(c)) @@ -98,25 +99,50 @@ def test_zmq_message_bus(): Test MessageBus with default settings, IPv6 and Star as ZMQ_ADDRESS """ tester = MessageBusTester(ZeroMQMessageBus) - tester.spider_log_activity(64) assert tester.sw_activity() == 64 assert tester.db_activity(128) == (64, 32) assert tester.spider_feed_activity() == 128 -def test_kafka_message_bus(): - """ - Test MessageBus with default settings, IPv6 and Star as ZMQ_ADDRESS - """ - logging.basicConfig(level=logging.INFO) - kafkabus = logging.getLogger("kafkabus") - kafkabus.addHandler(logging.StreamHandler()) - settings = Settings() - settings.set('KAFKA_LOCATION', 'localhost:9092') - settings.set('FRONTIER_GROUP', 'frontier2') - tester = MessageBusTester(KafkaMessageBus, settings) - tester.spider_log_activity(64) - assert tester.sw_activity() == 64 - assert tester.db_activity(128) == (64, 32) - assert tester.spider_feed_activity() == 128 +class KafkaMessageBusTestCase(KafkaIntegrationTestCase): + @classmethod + def setUpClass(cls): + logging.basicConfig(level=logging.DEBUG) + logger = logging.getLogger("frontera.tests.kafka_utils.service") + logger.addHandler(logging.StreamHandler()) + if not os.environ.get('KAFKA_VERSION'): + return + + cls.zk = ZookeeperFixture.instance() + chroot = random_string(10) + cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port, + zk_chroot=chroot, partitions=1) + + @classmethod + def tearDownClass(cls): + if not os.environ.get('KAFKA_VERSION'): + return + + cls.server.close() + cls.zk.close() + + def test_kafka_message_bus(self): + """ + Test MessageBus with default settings, IPv6 and Star as ZMQ_ADDRESS + """ + self.client.ensure_topic_exists("frontier-todo") + self.client.ensure_topic_exists("frontier-done") + 
self.client.ensure_topic_exists("frontier-score") + + #logging.basicConfig(level=logging.INFO) + #kafkabus = logging.getLogger("kafkabus") + #kafkabus.addHandler(logging.StreamHandler()) + settings = Settings() + settings.set('KAFKA_LOCATION', '%s:%s' % (self.server.host, self.server.port)) + settings.set('FRONTIER_GROUP', 'frontier2') + tester = MessageBusTester(KafkaMessageBus, settings) + tester.spider_log_activity(64) + assert tester.sw_activity() == 64 + assert tester.db_activity(128) == (64, 32) + assert tester.spider_feed_activity() == 128 \ No newline at end of file diff --git a/tox.ini b/tox.ini index bd06a1693..2d811d6dd 100644 --- a/tox.ini +++ b/tox.ini @@ -14,6 +14,9 @@ deps = -r{toxinidir}/requirements/tests.txt commands = py.test -s -v {envsitepackagesdir}/frontera +setenv = + PROJECT_ROOT = {toxinidir} +passenv = KAFKA_VERSION [testenv:flake8] changedir = {toxinidir} @@ -28,5 +31,5 @@ exclude = frontera/_version.py,versioneer.py,docs/source/conf.py,frontera/contri # Options for pytest [pytest] -addopts = -rsvXf +addopts = -rsvxXf -k test_kafka ignore=requirements From 664d3ada320fe10cea36aec955beadf26a1559c0 Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Tue, 10 May 2016 16:17:22 +0200 Subject: [PATCH 06/14] dedicated client per producer/consumer --- frontera/contrib/messagebus/kafka/__init__.py | 11 ++-- frontera/contrib/messagebus/kafkabus.py | 55 ++++++++----------- 2 files changed, 29 insertions(+), 37 deletions(-) diff --git a/frontera/contrib/messagebus/kafka/__init__.py b/frontera/contrib/messagebus/kafka/__init__.py index 2256481a1..d22ec9f62 100644 --- a/frontera/contrib/messagebus/kafka/__init__.py +++ b/frontera/contrib/messagebus/kafka/__init__.py @@ -2,15 +2,16 @@ from collections import namedtuple from logging import getLogger -from kafka.common import OffsetRequest, check_error, OffsetFetchRequest, UnknownTopicOrPartitionError +from kafka import KafkaClient +from kafka.common import OffsetRequestPayload, check_error, OffsetFetchRequestPayload, UnknownTopicOrPartitionError logger = getLogger("offset-fetcher") OffsetsStruct = namedtuple("OffsetsStruct", ["commit", "produced"]) class OffsetsFetcher(object): - def __init__(self, client, topic, group_id): - self._client = client + def __init__(self, location, topic, group_id): + self._client = KafkaClient(location) self._topic = topic self._group_id = group_id self._client.load_metadata_for_topics() @@ -29,7 +30,7 @@ def _update_produced_offsets(self): the earliest offset will always return you a single element. """ for partition in self._client.get_partition_ids_for_topic(self._topic): - reqs = [OffsetRequest(self._topic, partition, -1, 1)] + reqs = [OffsetRequestPayload(self._topic, partition, -1, 1)] (resp,) = self._client.send_offset_request(reqs) @@ -43,7 +44,7 @@ def _update_group_offsets(self): for partition in self._client.get_partition_ids_for_topic(self._topic): (resp,) = self._client.send_offset_fetch_request( self._group_id, - [OffsetFetchRequest(self._topic, partition)], + [OffsetFetchRequestPayload(self._topic, partition)], fail_on_error=False) try: check_error(resp) diff --git a/frontera/contrib/messagebus/kafkabus.py b/frontera/contrib/messagebus/kafkabus.py index 126e22682..1ee5772a3 100644 --- a/frontera/contrib/messagebus/kafkabus.py +++ b/frontera/contrib/messagebus/kafkabus.py @@ -19,8 +19,8 @@ class Consumer(BaseStreamConsumer): """ Used in DB and SW worker. SW consumes per partition. 
""" - def __init__(self, conn, topic, group, partition_id): - self._conn = conn + def __init__(self, location, topic, group, partition_id): + self._location = location self._group = group self._topic = topic self._partition_ids = [partition_id] if partition_id is not None else None @@ -31,8 +31,9 @@ def __init__(self, conn, topic, group, partition_id): def _connect_consumer(self): if self._cons is None: try: + self._client = KafkaClient(self._location) self._cons = SimpleConsumer( - self._conn, + self._client, self._group, self._topic, partitions=self._partition_ids, @@ -70,32 +71,28 @@ def get_offset(self): class SimpleProducer(BaseStreamProducer): - def __init__(self, connection, topic, codec): - self._connection = connection + def __init__(self, location, topic, codec): + self._location = location self._topic = topic self._codec = codec self._create() def _create(self): - self._producer = KafkaSimpleProducer(self._connection, codec=self._codec) + self._client = KafkaClient(self._location) + self._producer = KafkaSimpleProducer(self._client, codec=self._codec) def send(self, key, *messages): self._producer.send_messages(self._topic, *messages) - def flush(self): - self._producer.stop() - del self._producer - self._create() - def get_offset(self, partition_id): # Kafka has it's own offset management raise KeyError class KeyedProducer(BaseStreamProducer): - def __init__(self, connection, topic_done, partitioner_cls, codec): + def __init__(self, location, topic_done, partitioner_cls, codec): self._prod = None - self._conn = connection + self._location = location self._topic_done = topic_done self._partitioner_cls = partitioner_cls self._codec = codec @@ -103,7 +100,8 @@ def __init__(self, connection, topic_done, partitioner_cls, codec): def _connect_producer(self): if self._prod is None: try: - self._prod = KafkaKeyedProducer(self._conn, partitioner=self._partitioner_cls, codec=self._codec) + self._client = KafkaClient(self._location) + self._prod = KafkaKeyedProducer(self._client, partitioner=self._partitioner_cls, codec=self._codec) except BrokerResponseError: self._prod = None logger.warning("Could not connect producer to Kafka server") @@ -131,11 +129,6 @@ def send(self, key, *messages): sleep(1.0) return success - def flush(self): - if self._prod is not None: - self._prod.stop() - self._connect_producer() - def get_offset(self, partition_id): # Kafka has it's own offset management raise KeyError @@ -143,14 +136,14 @@ def get_offset(self, partition_id): class SpiderLogStream(BaseSpiderLogStream): def __init__(self, messagebus): - self._conn = messagebus.conn + self._location = messagebus.kafka_location self._db_group = messagebus.general_group self._sw_group = messagebus.sw_group self._topic_done = messagebus.topic_done self._codec = messagebus.codec def producer(self): - return KeyedProducer(self._conn, self._topic_done, FingerprintPartitioner, self._codec) + return KeyedProducer(self._location, self._topic_done, FingerprintPartitioner, self._codec) def consumer(self, partition_id, type): """ @@ -160,21 +153,21 @@ def consumer(self, partition_id, type): :return: """ group = self._sw_group if type == 'sw' else self._db_group - return Consumer(self._conn, self._topic_done, group, partition_id) + return Consumer(self._location, self._topic_done, group, partition_id) class SpiderFeedStream(BaseSpiderFeedStream): def __init__(self, messagebus): - self._conn = messagebus.conn + self._location = messagebus.kafka_location self._general_group = messagebus.general_group self._topic = 
messagebus.topic_todo self._max_next_requests = messagebus.max_next_requests self._hostname_partitioning = messagebus.hostname_partitioning - self._offset_fetcher = OffsetsFetcher(self._conn, self._topic, self._general_group) + self._offset_fetcher = OffsetsFetcher(self._location, self._topic, self._general_group) self._codec = messagebus.codec def consumer(self, partition_id): - return Consumer(self._conn, self._topic, self._general_group, partition_id) + return Consumer(self._location, self._topic, self._general_group, partition_id) def available_partitions(self): partitions = [] @@ -186,26 +179,25 @@ def available_partitions(self): def producer(self): partitioner = Crc32NamePartitioner if self._hostname_partitioning else FingerprintPartitioner - return KeyedProducer(self._conn, self._topic, partitioner, self._codec) + return KeyedProducer(self._location, self._topic, partitioner, self._codec) class ScoringLogStream(BaseScoringLogStream): def __init__(self, messagebus): self._topic = messagebus.topic_scoring self._group = messagebus.general_group - self._conn = messagebus.conn + self._location = messagebus.kafka_location self._codec = messagebus.codec def consumer(self): - return Consumer(self._conn, self._topic, self._group, partition_id=None) + return Consumer(self._location, self._topic, self._group, partition_id=None) def producer(self): - return SimpleProducer(self._conn, self._topic, self._codec) + return SimpleProducer(self._location, self._topic, self._codec) class MessageBus(BaseMessageBus): def __init__(self, settings): - server = settings.get('KAFKA_LOCATION') self.topic_todo = settings.get('OUTGOING_TOPIC') self.topic_done = settings.get('INCOMING_TOPIC') self.topic_scoring = settings.get('SCORING_TOPIC') @@ -216,8 +208,7 @@ def __init__(self, settings): self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING') codec = settings.get('KAFKA_CODEC') self.codec = codec if codec else CODEC_NONE - - self.conn = KafkaClient(server) + self.kafka_location = settings.get('KAFKA_LOCATION') def spider_log(self): return SpiderLogStream(self) From ad50442705619c7237d7913e27c407622a441c5a Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Tue, 10 May 2016 16:20:07 +0200 Subject: [PATCH 07/14] removing obsolete test case --- frontera/tests/test_kafka_import.py | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 frontera/tests/test_kafka_import.py diff --git a/frontera/tests/test_kafka_import.py b/frontera/tests/test_kafka_import.py deleted file mode 100644 index 7aaff4c30..000000000 --- a/frontera/tests/test_kafka_import.py +++ /dev/null @@ -1,7 +0,0 @@ -# -*- coding: utf-8 -*- - - -def test_kafka_messagebus_import(): - import frontera.contrib.messagebus.kafka - import frontera.contrib.messagebus.kafkabus - pass \ No newline at end of file From 2d567f8a217b288c52c1ee3e3e704488d0a73095 Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Wed, 11 May 2016 12:30:40 +0200 Subject: [PATCH 08/14] removing abstract declaration --- frontera/core/messagebus.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/frontera/core/messagebus.py b/frontera/core/messagebus.py index e690561e5..69d67ae05 100644 --- a/frontera/core/messagebus.py +++ b/frontera/core/messagebus.py @@ -38,7 +38,6 @@ def send(self, key, *messages): """ raise NotImplementedError - @abstractmethod def flush(self): """ Flushes all internal buffers. 
@@ -46,7 +45,6 @@ def flush(self): """ raise NotImplementedError - @abstractmethod def get_offset(self, partition_id): """ Returns producer offset for partition. Raises KeyError, if partition isn't available or doesn't exist. From 5df38b30768b6bf90e35735102372ad592f9dc2d Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Wed, 11 May 2016 12:37:21 +0200 Subject: [PATCH 09/14] using new kafka-python interfaces --- docs/source/topics/frontera-settings.rst | 13 +- frontera/contrib/backends/partitioners.py | 8 +- frontera/contrib/messagebus/kafkabus.py | 143 ++++++++++++---------- frontera/settings/default_settings.py | 3 +- frontera/tests/test_message_bus.py | 13 +- tox.ini | 2 +- 6 files changed, 101 insertions(+), 81 deletions(-) diff --git a/docs/source/topics/frontera-settings.rst b/docs/source/topics/frontera-settings.rst index 85b52bc2b..63ee4355e 100644 --- a/docs/source/topics/frontera-settings.rst +++ b/docs/source/topics/frontera-settings.rst @@ -570,15 +570,14 @@ KAFKA_LOCATION Hostname and port of kafka broker, separated with :. Can be a string with hostname:port pair separated with commas(,). -.. setting:: KAFKA_CODEC +.. setting:: KAFKA_COMPRESSION -KAFKA_CODEC -___________ +KAFKA_COMPRESSION +----------------- -Default:: ``CODEC_NONE`` +Default:: ``None`` -Kafka protocol compression codec, see kafka-python documentation for more details. Please use symbols from kafka-python -package. +Kafka's producer compression type string, see `kafka-python documentation`_ for more details. .. setting:: FRONTIER_GROUP @@ -635,3 +634,5 @@ Default settings If no settings are specified, frontier will use the built-in default ones. For a complete list of default values see: :ref:`Built-in settings reference `. All default settings can be overridden. + +.. 
_`kafka-python documentation`: http://kafka-python.readthedocs.io/en/1.1.1/apidoc/KafkaProducer.html
diff --git a/frontera/contrib/backends/partitioners.py b/frontera/contrib/backends/partitioners.py
index 466ffd310..8d3873c03 100644
--- a/frontera/contrib/backends/partitioners.py
+++ b/frontera/contrib/backends/partitioners.py
@@ -18,6 +18,9 @@ def partition_by_hash(self, value, partitions):
         idx = value % size
         return partitions[idx]
 
+    def __call__(self, key, all_partitions, available):
+        return self.partition(key, all_partitions)
+
 
 class FingerprintPartitioner(Partitioner):
     def partition(self, key, partitions=None):
@@ -26,4 +29,7 @@ def partition(self, key, partitions=None):
         digest = unhexlify(key[0:2] + key[5:7] + key[10:12] + key[15:17])
         value = unpack("<I", digest)

From: Alexander Sibiryakov
Date: Wed, 25 May 2016 17:31:33 +0200
Subject: [PATCH 10/14] proper consumer initialization

---
 frontera/contrib/messagebus/kafkabus.py | 44 +++++++++++++++----------
 1 file changed, 27 insertions(+), 17 deletions(-)

diff --git a/frontera/contrib/messagebus/kafkabus.py b/frontera/contrib/messagebus/kafkabus.py
index bc0b7d881..216f17f3e 100644
--- a/frontera/contrib/messagebus/kafkabus.py
+++ b/frontera/contrib/messagebus/kafkabus.py
@@ -79,28 +79,34 @@ def __init__(self, location, topic, group, partition_id):
         self._location = location
         self._group = group
         self._topic = topic
-        self._partition_ids = [partition_id] if partition_id is not None else None
-
         self._consumer = KafkaConsumer(
             bootstrap_servers=self._location,
             group_id=self._group,
-            max_partition_fetch_bytes=10485760)
-        if self._partition_ids:
-            self._consumer.assign([TopicPartition(self._topic, pid) for pid in self._partition_ids])
+            max_partition_fetch_bytes=10485760,
+            consumer_timeout_ms=100,
+            client_id="%s-%s" % (self._topic, str(partition_id) if partition_id else "all")
+        )
+        if partition_id:
+            self._partition_ids = [TopicPartition(self._topic, partition_id)]
+            self._consumer.assign(self._partition_ids)
         else:
-            self._consumer.subscribe(self._topic)
+            self._partition_ids = [TopicPartition(self._topic, pid) for pid in self._consumer.partitions_for_topic(self._topic)]
+            self._consumer.subscribe(topics=[self._topic])
+
+        for tp in self._partition_ids:
+            self._consumer.committed(tp)
+        self._consumer._update_fetch_positions(self._partition_ids)
 
     def get_messages(self, timeout=0.1, count=1):
-        while True:
+        result = []
+        while count > 0:
             try:
-                batch = self._consumer.poll(timeout_ms=timeout)
-                for _, records in batch.iteritems():
-                    for record in records:
-                        yield record.value
-            except Exception, err:
-                logger.warning("Error %s" % err)
-            finally:
+                m = next(self._consumer)
+                result.append(m.value)
+                count -= 1
+            except StopIteration:
                 break
+        return result
 
     def get_offset(self):
         return 0
@@ -163,7 +169,8 @@ def consumer(self, partition_id, type):
         :return:
         """
         group = self._sw_group if type == 'sw' else self._db_group
-        return DeprecatedConsumer(self._location, self._topic_done, group, partition_id)
+        #return DeprecatedConsumer(self._location, self._topic_done, group, partition_id)
+        return Consumer(self._location, self._topic_done, group, partition_id)
 
 
 class SpiderFeedStream(BaseSpiderFeedStream):
@@ -178,7 +185,8 @@ def __init__(self, messagebus):
         self._partitions = messagebus.spider_feed_partitions
 
     def consumer(self, partition_id):
-        return DeprecatedConsumer(self._location, self._topic, self._general_group, partition_id)
+        #return DeprecatedConsumer(self._location, self._topic, self._general_group, partition_id)
+        return Consumer(self._location, self._topic, self._general_group, 
partition_id)
 
     def available_partitions(self):
         partitions = []
@@ -202,7 +210,9 @@ def __init__(self, messagebus):
         self._compression_type = messagebus.compression_type
 
     def consumer(self):
-        return DeprecatedConsumer(self._location, self._topic, self._group, partition_id=None)
+        #return DeprecatedConsumer(self._location, self._topic, self._group, partition_id=None)
+        return Consumer(self._location, self._topic, self._group, partition_id=None)
+
 
     def producer(self):
         return SimpleProducer(self._location, self._topic, self._compression_type)

From 8ea81dd3750f564e862a269c57f6aa8058e58f9c Mon Sep 17 00:00:00 2001
From: Alexander Sibiryakov
Date: Thu, 26 May 2016 11:53:37 +0200
Subject: [PATCH 11/14] log errors, and offset metadata

---
 frontera/contrib/messagebus/kafka/__init__.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/frontera/contrib/messagebus/kafka/__init__.py b/frontera/contrib/messagebus/kafka/__init__.py
index d22ec9f62..bee926202 100644
--- a/frontera/contrib/messagebus/kafka/__init__.py
+++ b/frontera/contrib/messagebus/kafka/__init__.py
@@ -48,7 +48,8 @@ def _update_group_offsets(self):
                 fail_on_error=False)
             try:
                 check_error(resp)
-            except UnknownTopicOrPartitionError:
+            except Exception, exc:
+                logger.error(exc)
                 pass
 
             if resp.offset == -1:
@@ -67,5 +68,6 @@ def get(self):
         for partition in self._client.get_partition_ids_for_topic(self._topic):
             produced = self._offsets.produced[partition]
             lag = produced - self._offsets.commit[partition] if self._offsets.commit[partition] else 0.0
+            logger.debug("%s (%s): %s, %s, %s", self._topic, partition, produced, self._offsets.commit[partition], lag)
             lags[partition] = lag
         return lags
\ No newline at end of file

From 6d3c5e58b24a263014f8bbdec4993abd9d7b11b6 Mon Sep 17 00:00:00 2001
From: Alexander Sibiryakov
Date: Thu, 26 May 2016 14:05:14 +0200
Subject: [PATCH 12/14] crash fix, KAFKA_USE_SIMPLE_CONSUMER setting

---
 docs/source/topics/frontera-settings.rst | 11 +++++++++++
 frontera/contrib/messagebus/kafkabus.py  | 18 ++++++++++--------
 frontera/settings/default_settings.py    |  3 ++-
 frontera/worker/db.py                    |  5 ++---
 4 files changed, 25 insertions(+), 12 deletions(-)

diff --git a/docs/source/topics/frontera-settings.rst b/docs/source/topics/frontera-settings.rst
index 63ee4355e..41f1b1766 100644
--- a/docs/source/topics/frontera-settings.rst
+++ b/docs/source/topics/frontera-settings.rst
@@ -579,6 +579,17 @@ Default:: ``None``
 
 Kafka's producer compression type string, see `kafka-python documentation`_ for more details.
 
+.. setting:: KAFKA_USE_SIMPLE_CONSUMER
+
+KAFKA_USE_SIMPLE_CONSUMER
+-------------------------
+
+Default:: ``False``
+
+When set to ``True`` the Kafka :term:`message bus` uses the deprecated Simple* interfaces from the ``kafka-python`` package.
+For older (<0.9.0) Kafka versions this makes it possible to enable consumer offset auto-commit, and therefore to have working
+flow control in the :term:`db worker`. On the other hand, those older versions don't support automatic consumer rebalancing.
+
 ..
setting:: FRONTIER_GROUP FRONTIER_GROUP diff --git a/frontera/contrib/messagebus/kafkabus.py b/frontera/contrib/messagebus/kafkabus.py index 216f17f3e..2a6ae98ee 100644 --- a/frontera/contrib/messagebus/kafkabus.py +++ b/frontera/contrib/messagebus/kafkabus.py @@ -6,7 +6,6 @@ from kafka import KafkaClient, SimpleConsumer from kafka import KafkaConsumer, KafkaProducer, TopicPartition from kafka.common import BrokerResponseError -from kafka.protocol import CODEC_NONE from frontera.contrib.backends.partitioners import FingerprintPartitioner, Crc32NamePartitioner from frontera.contrib.messagebus.kafka import OffsetsFetcher @@ -147,6 +146,9 @@ def send(self, key, *messages): def flush(self): self._producer.flush() + def get_offset(self, partition_id): + pass + class SpiderLogStream(BaseSpiderLogStream): def __init__(self, messagebus): @@ -156,6 +158,7 @@ def __init__(self, messagebus): self._topic_done = messagebus.topic_done self._compression_type = messagebus.compression_type self._partitions = messagebus.spider_log_partitions + self._consumer_cls = DeprecatedConsumer if messagebus.use_simple_consumer else Consumer def producer(self): return KeyedProducer(self._location, self._topic_done, FingerprintPartitioner(self._partitions), @@ -169,8 +172,7 @@ def consumer(self, partition_id, type): :return: """ group = self._sw_group if type == 'sw' else self._db_group - #return DeprecatedConsumer(self._location, self._topic_done, group, partition_id) - return Consumer(self._location, self._topic_done, group, partition_id) + return self._consumer_cls(self._location, self._topic_done, group, partition_id) class SpiderFeedStream(BaseSpiderFeedStream): @@ -183,10 +185,10 @@ def __init__(self, messagebus): self._offset_fetcher = OffsetsFetcher(self._location, self._topic, self._general_group) self._compression_type = messagebus.compression_type self._partitions = messagebus.spider_feed_partitions + self._consumer_cls = DeprecatedConsumer if messagebus.use_simple_consumer else Consumer def consumer(self, partition_id): - #return DeprecatedConsumer(self._location, self._topic, self._general_group, partition_id) - return Consumer(self._location, self._topic, self._general_group, partition_id) + return self._consumer_cls(self._location, self._topic, self._general_group, partition_id) def available_partitions(self): partitions = [] @@ -208,11 +210,10 @@ def __init__(self, messagebus): self._group = messagebus.general_group self._location = messagebus.kafka_location self._compression_type = messagebus.compression_type + self._consumer_cls = DeprecatedConsumer if messagebus.use_simple_consumer else Consumer def consumer(self): - #return DeprecatedConsumer(self._location, self._topic, self._group, partition_id=None) - return Consumer(self._location, self._topic, self._group, partition_id=None) - + return self._consumer_cls(self._location, self._topic, self._group, partition_id=None) def producer(self): return SimpleProducer(self._location, self._topic, self._compression_type) @@ -232,6 +233,7 @@ def __init__(self, settings): self.kafka_location = settings.get('KAFKA_LOCATION') self.spider_log_partitions = settings.get('SPIDER_LOG_PARTITIONS') self.spider_feed_partitions = settings.get('SPIDER_FEED_PARTITIONS') + self.use_simple_consumer = settings.get('KAFKA_USE_SIMPLE_CONSUMER') def spider_log(self): return SpiderLogStream(self) diff --git a/frontera/settings/default_settings.py b/frontera/settings/default_settings.py index e7cb3a02a..5b1c94c93 100644 --- a/frontera/settings/default_settings.py +++ 
b/frontera/settings/default_settings.py @@ -64,4 +64,5 @@ INCOMING_TOPIC = "frontier-done" SCORING_TOPIC = "frontier-score" FRONTIER_GROUP = "general" -SCORING_GROUP = "strategy-workers" \ No newline at end of file +SCORING_GROUP = "strategy-workers" +KAFKA_USE_SIMPLE_CONSUMER = False \ No newline at end of file diff --git a/frontera/worker/db.py b/frontera/worker/db.py index a83cadc0b..1b166e8d1 100644 --- a/frontera/worker/db.py +++ b/frontera/worker/db.py @@ -147,9 +147,8 @@ def consume_incoming(self, *args, **kwargs): self._backend.request_error(request, error) if type == 'offset': _, partition_id, offset = msg - try: - producer_offset = self.spider_feed_producer.get_offset(partition_id) - except KeyError: + producer_offset = self.spider_feed_producer.get_offset(partition_id) + if producer_offset == None: continue else: lag = producer_offset - offset From 800131fe7e899fa0cf2c2326690aa36805369f2c Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Thu, 26 May 2016 14:06:27 +0200 Subject: [PATCH 13/14] changing requirements for kafka-python --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 5ed41879a..4c8f3055b 100644 --- a/setup.py +++ b/setup.py @@ -62,7 +62,7 @@ 'msgpack-python' ], 'kafka': [ - 'kafka-python<=0.9.5', + 'kafka-python>=1.0.0', 'python-snappy' ], 'distributed': [ From 145d0b58da0ce5175ba2795ea76f63f1e2a12184 Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Thu, 26 May 2016 14:27:07 +0200 Subject: [PATCH 14/14] style fix --- frontera/worker/db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontera/worker/db.py b/frontera/worker/db.py index 1b166e8d1..fd71da28f 100644 --- a/frontera/worker/db.py +++ b/frontera/worker/db.py @@ -148,7 +148,7 @@ def consume_incoming(self, *args, **kwargs): if type == 'offset': _, partition_id, offset = msg producer_offset = self.spider_feed_producer.get_offset(partition_id) - if producer_offset == None: + if producer_offset is None: continue else: lag = producer_offset - offset
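
Taken together, the series moves the Kafka message bus onto the new kafka-python interfaces and exposes its knobs as ordinary Frontera settings. A rough sketch of a crawl configuration using them follows; the MESSAGE_BUS key is assumed from Frontera's usual way of selecting a bus implementation, and the broker address, compression codec and partition counts are placeholder values rather than anything these patches mandate:

# settings.py -- illustrative sketch only, not part of the patch series
MESSAGE_BUS = 'frontera.contrib.messagebus.kafkabus.MessageBus'   # assumed setting name
KAFKA_LOCATION = 'localhost:9092'        # hostname:port; comma-separated pairs are accepted
KAFKA_COMPRESSION = 'snappy'             # producer compression type string passed to kafka-python
KAFKA_USE_SIMPLE_CONSUMER = False        # True only for pre-0.9.0 brokers
SPIDER_LOG_PARTITIONS = 1                # partition counts used by the partitioners
SPIDER_FEED_PARTITIONS = 1
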