diff --git a/.travis.yml b/.travis.yml index 61fc76a2e..2990a0a45 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,10 @@ language: python python: 2.7 env: - - TOXENV=py27 + - TOXENV=py27 KAFKA_VERSION=0.8.2.2 - TOXENV=flake8 +before_install: + - frontera/tests/kafka_integration.sh install: - pip install -U tox wheel - pip install -r requirements/tests.txt @@ -12,6 +14,10 @@ before_script: - mysql -u root -e "set global innodb_file_per_table=true;" - frontera/tests/run_zmq_broker.sh script: tox +cache: + directories: + - $HOME/.cache/pip + - servers/ deploy: provider: pypi distributions: sdist bdist_wheel diff --git a/docs/source/topics/frontera-settings.rst b/docs/source/topics/frontera-settings.rst index e2e799a94..41f1b1766 100644 --- a/docs/source/topics/frontera-settings.rst +++ b/docs/source/topics/frontera-settings.rst @@ -570,6 +570,26 @@ KAFKA_LOCATION Hostname and port of kafka broker, separated with :. Can be a string with hostname:port pair separated with commas(,). +.. setting:: KAFKA_COMPRESSION + +KAFKA_COMPRESSION +----------------- + +Default: ``None`` + +Kafka producer compression type string; see the `kafka-python documentation`_ for more details. + +.. setting:: KAFKA_USE_SIMPLE_CONSUMER + +KAFKA_USE_SIMPLE_CONSUMER +------------------------- + +Default: ``False`` + +When set to ``True``, the Kafka :term:`message bus` will use the deprecated Simple* interfaces from the ``kafka-python`` package. For +older (<0.9.0) Kafka versions this makes it possible to enable consumer offset auto-commit, and therefore to have working flow +control in the :term:`db worker`. On the other hand, older versions don't support automatic consumer rebalancing. + .. setting:: FRONTIER_GROUP FRONTIER_GROUP @@ -614,6 +634,8 @@ A group used by strategy workers for spider log reading. Needs to be different t SCORING_TOPIC ------------- +Default: ``frontier-score`` + Kafka topic used for :term:`scoring log` stream. @@ -623,3 +645,5 @@ Default settings If no settings are specified, frontier will use the built-in default ones. For a complete list of default values see: :ref:`Built-in settings reference `. All default settings can be overridden. + +..
_`kafka-python documentation`: http://kafka-python.readthedocs.io/en/1.1.1/apidoc/KafkaProducer.html diff --git a/frontera/contrib/backends/partitioners.py b/frontera/contrib/backends/partitioners.py index 466ffd310..8d3873c03 100644 --- a/frontera/contrib/backends/partitioners.py +++ b/frontera/contrib/backends/partitioners.py @@ -18,6 +18,9 @@ def partition_by_hash(self, value, partitions): idx = value % size return partitions[idx] + def __call__(self, key, all_partitions, available): + return self.partition(key, all_partitions) + class FingerprintPartitioner(Partitioner): def partition(self, key, partitions=None): @@ -26,4 +29,7 @@ def partition(self, key, partitions=None): digest = unhexlify(key[0:2] + key[5:7] + key[10:12] + key[15:17]) value = unpack(" 0: + try: + m = next(self._consumer) + result.append(m.value) + count -= 1 + except StopIteration: + break + return result + + def get_offset(self): + return 0 + + class SimpleProducer(BaseStreamProducer): - def __init__(self, connection, topic): - self._connection = connection + def __init__(self, location, topic, compression): + self._location = location self._topic = topic + self._compression = compression self._create() def _create(self): - self._producer = KafkaSimpleProducer(self._connection, codec=CODEC_SNAPPY) + self._producer = KafkaProducer(bootstrap_servers=self._location, retries=5, + compression_type=self._compression) def send(self, key, *messages): - self._producer.send_messages(self._topic, *messages) + for msg in messages: + self._producer.send(self._topic, value=msg) def flush(self): - self._producer.stop() - del self._producer - self._create() - - def get_offset(self, partition_id): - # Kafka has it's own offset management - raise KeyError + self._producer.flush() class KeyedProducer(BaseStreamProducer): - def __init__(self, connection, topic_done, partitioner_cls): - self._prod = None - self._conn = connection + def __init__(self, location, topic_done, partitioner, compression): + self._location = location self._topic_done = topic_done - self._partitioner_cls = partitioner_cls - - def _connect_producer(self): - if self._prod is None: - try: - self._prod = KafkaKeyedProducer(self._conn, partitioner=self._partitioner_cls, codec=CODEC_SNAPPY) - except BrokerResponseError: - self._prod = None - logger.warning("Could not connect producer to Kafka server") - return False - return True + self._partitioner = partitioner + self._compression = compression + self._producer = KafkaProducer(bootstrap_servers=self._location, partitioner=partitioner, retries=5, + compression_type=self._compression) def send(self, key, *messages): - success = False - max_tries = 5 - if self._connect_producer(): - n_tries = 0 - while not success and n_tries < max_tries: - try: - self._prod.send_messages(self._topic_done, key, *messages) - success = True - except MessageSizeTooLargeError, e: - logger.error(str(e)) - break - except BrokerResponseError: - n_tries += 1 - logger.warning( - "Could not send message. 
Try {0}/{1}".format( - n_tries, max_tries) - ) - sleep(1.0) - return success + for msg in messages: + self._producer.send(self._topic_done, key=key, value=msg) def flush(self): - if self._prod is not None: - self._prod.stop() + self._producer.flush() def get_offset(self, partition_id): - # Kafka has it's own offset management - raise KeyError + pass class SpiderLogStream(BaseSpiderLogStream): def __init__(self, messagebus): - self._conn = messagebus.conn + self._location = messagebus.kafka_location self._db_group = messagebus.general_group self._sw_group = messagebus.sw_group self._topic_done = messagebus.topic_done + self._compression_type = messagebus.compression_type + self._partitions = messagebus.spider_log_partitions + self._consumer_cls = DeprecatedConsumer if messagebus.use_simple_consumer else Consumer def producer(self): - return KeyedProducer(self._conn, self._topic_done, FingerprintPartitioner) + return KeyedProducer(self._location, self._topic_done, FingerprintPartitioner(self._partitions), + self._compression_type) def consumer(self, partition_id, type): """ @@ -155,20 +172,23 @@ def consumer(self, partition_id, type): :return: """ group = self._sw_group if type == 'sw' else self._db_group - return Consumer(self._conn, self._topic_done, group, partition_id) + return self._consumer_cls(self._location, self._topic_done, group, partition_id) class SpiderFeedStream(BaseSpiderFeedStream): def __init__(self, messagebus): - self._conn = messagebus.conn + self._location = messagebus.kafka_location self._general_group = messagebus.general_group self._topic = messagebus.topic_todo self._max_next_requests = messagebus.max_next_requests self._hostname_partitioning = messagebus.hostname_partitioning - self._offset_fetcher = OffsetsFetcher(self._conn, self._topic, self._general_group) + self._offset_fetcher = OffsetsFetcher(self._location, self._topic, self._general_group) + self._compression_type = messagebus.compression_type + self._partitions = messagebus.spider_feed_partitions + self._consumer_cls = DeprecatedConsumer if messagebus.use_simple_consumer else Consumer def consumer(self, partition_id): - return Consumer(self._conn, self._topic, self._general_group, partition_id) + return self._consumer_cls(self._location, self._topic, self._general_group, partition_id) def available_partitions(self): partitions = [] @@ -179,36 +199,41 @@ def available_partitions(self): return partitions def producer(self): - partitioner = Crc32NamePartitioner if self._hostname_partitioning else FingerprintPartitioner - return KeyedProducer(self._conn, self._topic, partitioner) + partitioner = Crc32NamePartitioner(self._partitions) if self._hostname_partitioning \ + else FingerprintPartitioner(self._partitions) + return KeyedProducer(self._location, self._topic, partitioner, self._compression_type) class ScoringLogStream(BaseScoringLogStream): def __init__(self, messagebus): self._topic = messagebus.topic_scoring self._group = messagebus.general_group - self._conn = messagebus.conn + self._location = messagebus.kafka_location + self._compression_type = messagebus.compression_type + self._consumer_cls = DeprecatedConsumer if messagebus.use_simple_consumer else Consumer def consumer(self): - return Consumer(self._conn, self._topic, self._group, partition_id=None) + return self._consumer_cls(self._location, self._topic, self._group, partition_id=None) def producer(self): - return SimpleProducer(self._conn, self._topic) + return SimpleProducer(self._location, self._topic, self._compression_type) class 
MessageBus(BaseMessageBus): def __init__(self, settings): - server = settings.get('KAFKA_LOCATION') - self.topic_todo = settings.get('OUTGOING_TOPIC', "frontier-todo") - self.topic_done = settings.get('INCOMING_TOPIC', "frontier-done") + self.topic_todo = settings.get('OUTGOING_TOPIC') + self.topic_done = settings.get('INCOMING_TOPIC') self.topic_scoring = settings.get('SCORING_TOPIC') - self.general_group = settings.get('FRONTIER_GROUP', "general") - self.sw_group = settings.get('SCORING_GROUP', "strategy-workers") + self.general_group = settings.get('FRONTIER_GROUP') + self.sw_group = settings.get('SCORING_GROUP') self.spider_partition_id = settings.get('SPIDER_PARTITION_ID') self.max_next_requests = settings.MAX_NEXT_REQUESTS self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING') - - self.conn = KafkaClient(server) + self.compression_type = settings.get('KAFKA_COMPRESSION') + self.kafka_location = settings.get('KAFKA_LOCATION') + self.spider_log_partitions = settings.get('SPIDER_LOG_PARTITIONS') + self.spider_feed_partitions = settings.get('SPIDER_FEED_PARTITIONS') + self.use_simple_consumer = settings.get('KAFKA_USE_SIMPLE_CONSUMER') def spider_log(self): return SpiderLogStream(self) diff --git a/frontera/core/messagebus.py b/frontera/core/messagebus.py index e690561e5..69d67ae05 100644 --- a/frontera/core/messagebus.py +++ b/frontera/core/messagebus.py @@ -38,7 +38,6 @@ def send(self, key, *messages): """ raise NotImplementedError - @abstractmethod def flush(self): """ Flushes all internal buffers. @@ -46,7 +45,6 @@ def flush(self): """ raise NotImplementedError - @abstractmethod def get_offset(self, partition_id): """ Returns producer offset for partition. Raises KeyError, if partition isn't available or doesn't exist. diff --git a/frontera/settings/default_settings.py b/frontera/settings/default_settings.py index 88370b23d..5b1c94c93 100644 --- a/frontera/settings/default_settings.py +++ b/frontera/settings/default_settings.py @@ -7,6 +7,7 @@ CONSUMER_BATCH_SIZE = 512 DELAY_ON_EMPTY = 5.0 DOMAIN_FINGERPRINT_FUNCTION = 'frontera.utils.fingerprint.sha1' +EVENT_LOG_MANAGER = 'frontera.logger.events.EventLogManager' HBASE_THRIFT_HOST = 'localhost' HBASE_THRIFT_PORT = 9090 @@ -19,6 +20,7 @@ HBASE_STATE_CACHE_SIZE_LIMIT = 3000000 HBASE_QUEUE_TABLE = 'queue' KAFKA_GET_TIMEOUT = 5.0 +LOGGING_CONFIG = 'logging.conf' MAX_NEXT_REQUESTS = 64 MAX_REQUESTS = 0 MESSAGE_BUS = 'frontera.contrib.messagebus.zeromq.MessageBus' @@ -53,4 +55,14 @@ ZMQ_ADDRESS = '127.0.0.1' ZMQ_BASE_PORT = 5550 -LOGGING_CONFIG = 'logging.conf' \ No newline at end of file + +#-------------------------------------------------------- +# Kafka +#-------------------------------------------------------- +KAFKA_COMPRESSION = None +OUTGOING_TOPIC = "frontier-todo" +INCOMING_TOPIC = "frontier-done" +SCORING_TOPIC = "frontier-score" +FRONTIER_GROUP = "general" +SCORING_GROUP = "strategy-workers" +KAFKA_USE_SIMPLE_CONSUMER = False \ No newline at end of file diff --git a/frontera/tests/kafka_integration.sh b/frontera/tests/kafka_integration.sh new file mode 100755 index 000000000..6b8d62e17 --- /dev/null +++ b/frontera/tests/kafka_integration.sh @@ -0,0 +1,85 @@ +#!/bin/bash -x + +# Versions available for testing via binary distributions +OFFICIAL_RELEASES="0.8.1.1 0.8.2.2 0.9.0.1" + +# Useful configuration vars, with sensible defaults +if [ -z "$SCALA_VERSION" ]; then + SCALA_VERSION=2.10 +fi + +# On travis CI, empty KAFKA_VERSION means skip integration tests +# so we dont try to get binaries +# Otherwise it means 
test all official releases, so we get all of them! +if [ -z "$KAFKA_VERSION" -a -z "$TRAVIS" ]; then + KAFKA_VERSION=$OFFICIAL_RELEASES +fi + +# By default look for binary releases at archive.apache.org +if [ -z "$DIST_BASE_URL" ]; then + DIST_BASE_URL="https://archive.apache.org/dist/kafka/" +fi + +# When testing against source builds, use this git repo +if [ -z "$KAFKA_SRC_GIT" ]; then + KAFKA_SRC_GIT="https://github.com/apache/kafka.git" +fi + +mkdir -p servers +pushd servers + mkdir -p dist + pushd dist + for kafka in $KAFKA_VERSION; do + if [ "$kafka" == "trunk" ]; then + if [ ! -d "$kafka" ]; then + git clone $KAFKA_SRC_GIT $kafka + fi + pushd $kafka + git pull + ./gradlew -PscalaVersion=$SCALA_VERSION -Pversion=$kafka releaseTarGz -x signArchives + popd + # Not sure how to construct the .tgz name accurately, so use a wildcard (ugh) + tar xzvf $kafka/core/build/distributions/kafka_*.tgz -C ../$kafka/ + rm $kafka/core/build/distributions/kafka_*.tgz + rm -rf ../$kafka/kafka-bin + mv ../$kafka/kafka_* ../$kafka/kafka-bin + else + echo "-------------------------------------" + echo "Checking kafka binaries for ${kafka}" + echo + # kafka 0.8.0 is only available w/ scala 2.8.0 + if [ "$kafka" == "0.8.0" ]; then + KAFKA_ARTIFACT="kafka_2.8.0-${kafka}" + else + KAFKA_ARTIFACT="kafka_${SCALA_VERSION}-${kafka}" + fi + if [ ! -f "../$kafka/kafka-bin/bin/kafka-run-class.sh" ]; then + echo "Downloading kafka ${kafka} tarball" + if hash wget 2>/dev/null; then + wget -N https://archive.apache.org/dist/kafka/$kafka/${KAFKA_ARTIFACT}.tgz || wget -N https://archive.apache.org/dist/kafka/$kafka/${KAFKA_ARTIFACT}.tar.gz + else + echo "wget not found... using curl" + if [ -f "${KAFKA_ARTIFACT}.tar.gz" ]; then + echo "Using cached artifact: ${KAFKA_ARTIFACT}.tar.gz" + else + curl -f https://archive.apache.org/dist/kafka/$kafka/${KAFKA_ARTIFACT}.tgz -o ${KAFKA_ARTIFACT}.tar.gz || curl -f https://archive.apache.org/dist/kafka/$kafka/${KAFKA_ARTIFACT}.tar.gz -o ${KAFKA_ARTIFACT}.tar.gz + fi + fi + echo + echo "Extracting kafka ${kafka} binaries" + mkdir -p ../$kafka/ + tar xzvf ${KAFKA_ARTIFACT}.t* -C ../$kafka/ + rm -rf ../$kafka/kafka-bin + mv ../$kafka/${KAFKA_ARTIFACT} ../$kafka/kafka-bin + if [ ! -f "../$kafka/kafka-bin/bin/kafka-run-class.sh" ]; then + echo "Extraction Failed ($kafka/kafka-bin/bin/kafka-run-class.sh does not exist)!" 
+ exit 1 + fi + else + echo "$kafka is already installed in servers/$kafka/ -- skipping" + fi + fi + echo + done + popd +popd diff --git a/frontera/tests/kafka_utils/__init__.py b/frontera/tests/kafka_utils/__init__.py new file mode 100644 index 000000000..7c68785e9 --- /dev/null +++ b/frontera/tests/kafka_utils/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- \ No newline at end of file diff --git a/frontera/tests/kafka_utils/case.py b/frontera/tests/kafka_utils/case.py new file mode 100644 index 000000000..9b108db80 --- /dev/null +++ b/frontera/tests/kafka_utils/case.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +import unittest +import os +import uuid +import random +from string import ascii_letters +from kafka import SimpleClient +from kafka.structs import OffsetRequestPayload + + +def random_string(l): + return "".join(random.choice(ascii_letters) for i in xrange(l)) + + +class KafkaIntegrationTestCase(unittest.TestCase): + create_client = True + topic = None + zk = None + server = None + + def setUp(self): + super(KafkaIntegrationTestCase, self).setUp() + if not os.environ.get('KAFKA_VERSION'): + self.skipTest('Integration test requires KAFKA_VERSION') + + if not self.topic: + topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10)) + self.topic = topic + + if self.create_client: + self.client = SimpleClient('%s:%d' % (self.server.host, self.server.port)) + + self.client.ensure_topic_exists(self.topic) + + self._messages = {} + + def tearDown(self): + super(KafkaIntegrationTestCase, self).tearDown() + if not os.environ.get('KAFKA_VERSION'): + return + + if self.create_client: + self.client.close() + + def current_offset(self, topic, partition): + try: + offsets, = self.client.send_offset_request([OffsetRequestPayload(topic, partition, -1, 1)]) + except: + # XXX: We've seen some UnknownErrors here and cant debug w/o server logs + self.zk.child.dump_logs() + self.server.child.dump_logs() + raise + else: + return offsets.offsets[0] + + def msgs(self, iterable): + return [self.msg(x) for x in iterable] + + def msg(self, s): + if s not in self._messages: + self._messages[s] = '%s-%s-%s' % (s, self.id(), str(uuid.uuid4())) + + return self._messages[s].encode('utf-8') + + def key(self, k): + return k.encode('utf-8') \ No newline at end of file diff --git a/frontera/tests/kafka_utils/configs/0.8.1.1/resources/kafka.properties b/frontera/tests/kafka_utils/configs/0.8.1.1/resources/kafka.properties new file mode 100644 index 000000000..685aed15e --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.1.1/resources/kafka.properties @@ -0,0 +1,124 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id={broker_id} + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port={port} + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +host.name={host} + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name= + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port= + +# The number of threads handling network requests +num.network.threads=2 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma seperated list of directories under which to store log files +log.dirs={tmp_dir}/data + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions={partitions} +default.replication.factor={replicas} + +## Short Replica Lag -- Drops failed brokers out of ISR +replica.lag.time.max.ms=1000 +replica.socket.timeout.ms=1000 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. 
Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=60000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=1000000 +# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly +zookeeper.session.timeout.ms=500 diff --git a/frontera/tests/kafka_utils/configs/0.8.1.1/resources/log4j.properties b/frontera/tests/kafka_utils/configs/0.8.1.1/resources/log4j.properties new file mode 100644 index 000000000..b0b76aa79 --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.1.1/resources/log4j.properties @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, stdout, logfile + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.logfile=org.apache.log4j.FileAppender +log4j.appender.logfile.File=${kafka.logs.dir}/server.log +log4j.appender.logfile.layout=org.apache.log4j.PatternLayout +log4j.appender.logfile.layout.ConversionPattern=[%d] %p %m (%c)%n diff --git a/frontera/tests/kafka_utils/configs/0.8.1.1/resources/zookeeper.properties b/frontera/tests/kafka_utils/configs/0.8.1.1/resources/zookeeper.properties new file mode 100644 index 000000000..e3fd09742 --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.1.1/resources/zookeeper.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. +dataDir={tmp_dir} +# the port at which the clients will connect +clientPort={port} +clientPortAddress={host} +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 diff --git a/frontera/tests/kafka_utils/configs/0.8.2.1/resources/kafka.properties b/frontera/tests/kafka_utils/configs/0.8.2.1/resources/kafka.properties new file mode 100644 index 000000000..685aed15e --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.2.1/resources/kafka.properties @@ -0,0 +1,124 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id={broker_id} + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port={port} + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +host.name={host} + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name= + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port= + +# The number of threads handling network requests +num.network.threads=2 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma seperated list of directories under which to store log files +log.dirs={tmp_dir}/data + +# The default number of log partitions per topic. 
More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions={partitions} +default.replication.factor={replicas} + +## Short Replica Lag -- Drops failed brokers out of ISR +replica.lag.time.max.ms=1000 +replica.socket.timeout.ms=1000 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=60000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. 
+zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=1000000 +# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly +zookeeper.session.timeout.ms=500 diff --git a/frontera/tests/kafka_utils/configs/0.8.2.1/resources/log4j.properties b/frontera/tests/kafka_utils/configs/0.8.2.1/resources/log4j.properties new file mode 100644 index 000000000..b0b76aa79 --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.2.1/resources/log4j.properties @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, stdout, logfile + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.logfile=org.apache.log4j.FileAppender +log4j.appender.logfile.File=${kafka.logs.dir}/server.log +log4j.appender.logfile.layout=org.apache.log4j.PatternLayout +log4j.appender.logfile.layout.ConversionPattern=[%d] %p %m (%c)%n diff --git a/frontera/tests/kafka_utils/configs/0.8.2.1/resources/zookeeper.properties b/frontera/tests/kafka_utils/configs/0.8.2.1/resources/zookeeper.properties new file mode 100644 index 000000000..e3fd09742 --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.2.1/resources/zookeeper.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. 
+dataDir={tmp_dir} +# the port at which the clients will connect +clientPort={port} +clientPortAddress={host} +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 diff --git a/frontera/tests/kafka_utils/configs/0.8.2.2/resources/kafka.properties b/frontera/tests/kafka_utils/configs/0.8.2.2/resources/kafka.properties new file mode 100644 index 000000000..685aed15e --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.2.2/resources/kafka.properties @@ -0,0 +1,124 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id={broker_id} + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port={port} + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +host.name={host} + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name= + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port= + +# The number of threads handling network requests +num.network.threads=2 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma seperated list of directories under which to store log files +log.dirs={tmp_dir}/data + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions={partitions} +default.replication.factor={replicas} + +## Short Replica Lag -- Drops failed brokers out of ISR +replica.lag.time.max.ms=1000 +replica.socket.timeout.ms=1000 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. 
Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=60000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=1000000 +# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly +zookeeper.session.timeout.ms=500 diff --git a/frontera/tests/kafka_utils/configs/0.8.2.2/resources/log4j.properties b/frontera/tests/kafka_utils/configs/0.8.2.2/resources/log4j.properties new file mode 100644 index 000000000..b0b76aa79 --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.2.2/resources/log4j.properties @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, stdout, logfile + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.logfile=org.apache.log4j.FileAppender +log4j.appender.logfile.File=${kafka.logs.dir}/server.log +log4j.appender.logfile.layout=org.apache.log4j.PatternLayout +log4j.appender.logfile.layout.ConversionPattern=[%d] %p %m (%c)%n diff --git a/frontera/tests/kafka_utils/configs/0.8.2.2/resources/zookeeper.properties b/frontera/tests/kafka_utils/configs/0.8.2.2/resources/zookeeper.properties new file mode 100644 index 000000000..e3fd09742 --- /dev/null +++ b/frontera/tests/kafka_utils/configs/0.8.2.2/resources/zookeeper.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. 
+dataDir={tmp_dir} +# the port at which the clients will connect +clientPort={port} +clientPortAddress={host} +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 diff --git a/frontera/tests/kafka_utils/fixtures.py b/frontera/tests/kafka_utils/fixtures.py new file mode 100644 index 000000000..1b8d73ca8 --- /dev/null +++ b/frontera/tests/kafka_utils/fixtures.py @@ -0,0 +1,335 @@ +import atexit +import logging +import os +import os.path +import shutil +import subprocess +import tempfile +import time +import uuid + +from six.moves import urllib +from six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401 + +from service import ExternalService, SpawnedService + + +log = logging.getLogger(__name__) + + +class Fixture(object): + kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0') + scala_version = os.environ.get("SCALA_VERSION", '2.8.0') + project_root = os.environ.get('PROJECT_ROOT', os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, 'servers', kafka_version, "kafka-bin")) + ivy_root = os.environ.get('IVY_ROOT', os.path.expanduser("~/.ivy2/cache")) + + @classmethod + def download_official_distribution(cls, + kafka_version=None, + scala_version=None, + output_dir=None): + if not kafka_version: + kafka_version = cls.kafka_version + if not scala_version: + scala_version = cls.scala_version + if not output_dir: + output_dir = os.path.join(cls.project_root, 'servers', 'dist') + + distfile = 'kafka_%s-%s' % (scala_version, kafka_version,) + url_base = 'https://archive.apache.org/dist/kafka/%s/' % (kafka_version,) + output_file = os.path.join(output_dir, distfile + '.tgz') + + if os.path.isfile(output_file): + log.info("Found file already on disk: %s", output_file) + return output_file + + # New tarballs are .tgz, older ones are sometimes .tar.gz + try: + url = url_base + distfile + '.tgz' + log.info("Attempting to download %s", url) + response = urllib.request.urlopen(url) + except urllib.error.HTTPError: + log.exception("HTTP Error") + url = url_base + distfile + '.tar.gz' + log.info("Attempting to download %s", url) + response = urllib.request.urlopen(url) + + log.info("Saving distribution file to %s", output_file) + with open(output_file, 'w') as output_file_fd: + output_file_fd.write(response.read()) + + return output_file + + @classmethod + def test_resource(cls, filename): + return os.path.join(cls.project_root, "frontera", "tests", "kafka_utils", "configs", + cls.kafka_version, "resources", filename) + + @classmethod + def kafka_run_class_args(cls, *args): + result = [os.path.join(cls.kafka_root, 'bin', 'kafka-run-class.sh')] + result.extend(args) + return result + + def kafka_run_class_env(self): + env = os.environ.copy() + env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % self.test_resource("log4j.properties") + return env + + @classmethod + def render_template(cls, source_file, target_file, binding): + log.info('Rendering %s from template %s', target_file, source_file) + with open(source_file, "r") as handle: + template = handle.read() + assert len(template) > 0, 'Empty template %s' % source_file + with open(target_file, "w") as handle: + handle.write(template.format(**binding)) + handle.flush() + os.fsync(handle) + + # fsync directory for durability + # https://blog.gocept.com/2013/07/15/reliable-file-updates-with-python/ + dirfd = os.open(os.path.dirname(target_file), os.O_DIRECTORY) + os.fsync(dirfd) + os.close(dirfd) 
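# A minimal usage sketch, assuming KAFKA_VERSION is exported and the matching broker
# tarball was unpacked by kafka_integration.sh: the two fixture classes defined below are
# combined in tests roughly as follows (the real wiring is in test_message_bus.py later in
# this diff; the chroot name here is only a placeholder).
#
#   zk = ZookeeperFixture.instance()                  # spawns a local ZooKeeper (127.0.0.1:2181)
#   broker = KafkaFixture.instance(0, zk.host, zk.port,
#                                  zk_chroot='example-chroot', partitions=1)
#   ...                                               # point clients at broker.host:broker.port
#   broker.close()
#   zk.close()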
+ + +class ZookeeperFixture(Fixture): + @classmethod + def instance(cls): + if "ZOOKEEPER_URI" in os.environ: + parse = urlparse(os.environ["ZOOKEEPER_URI"]) + (host, port) = (parse.hostname, parse.port) + fixture = ExternalService(host, port) + else: + (host, port) = ("127.0.0.1", 2181) + fixture = cls(host, port) + + fixture.open() + return fixture + + def __init__(self, host, port): + self.host = host + self.port = port + + self.tmp_dir = None + self.child = None + + def kafka_run_class_env(self): + env = super(ZookeeperFixture, self).kafka_run_class_env() + env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs') + return env + + def out(self, message): + log.info("*** Zookeeper [%s:%d]: %s", self.host, self.port, message) + + def open(self): + self.tmp_dir = tempfile.mkdtemp() + self.out("Running local instance...") + log.info(" host = %s", self.host) + log.info(" port = %s", self.port) + log.info(" tmp_dir = %s", self.tmp_dir) + + # Generate configs + template = self.test_resource("zookeeper.properties") + properties = os.path.join(self.tmp_dir, "zookeeper.properties") + self.render_template(template, properties, vars(self)) + + # Configure Zookeeper child process + args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties) + env = self.kafka_run_class_env() + + # Party! + timeout = 5 + max_timeout = 30 + backoff = 1 + end_at = time.time() + max_timeout + tries = 1 + while time.time() < end_at: + self.out('Attempting to start (try #%d)' % tries) + try: + os.stat(properties) + except: + log.warning('Config %s not found -- re-rendering', properties) + self.render_template(template, properties, vars(self)) + self.child = SpawnedService(args, env) + self.child.start() + timeout = min(timeout, max(end_at - time.time(), 0)) + if self.child.wait_for(r"binding to port", timeout=timeout): + break + self.child.stop() + timeout *= 2 + time.sleep(backoff) + tries += 1 + else: + raise Exception('Failed to start Zookeeper before max_timeout') + self.out("Done!") + atexit.register(self.close) + + def close(self): + if self.child is None: + return + self.out("Stopping...") + self.child.stop() + self.child = None + self.out("Done!") + shutil.rmtree(self.tmp_dir) + + def __del__(self): + self.close() + + +class KafkaFixture(Fixture): + @classmethod + def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None, port=None, + transport='PLAINTEXT', replicas=1, partitions=2): + if zk_chroot is None: + zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_") + if "KAFKA_URI" in os.environ: + parse = urlparse(os.environ["KAFKA_URI"]) + (host, port) = (parse.hostname, parse.port) + fixture = ExternalService(host, port) + else: + if port is None: + port = 9092 + # force IPv6 here because of a confusing point: + # + # - if the string "localhost" is passed, Kafka will *only* bind to the IPv4 address of localhost + # (127.0.0.1); however, kafka-python will attempt to connect on ::1 and fail + # + # - if the address literal 127.0.0.1 is passed, the metadata request during bootstrap will return + # the name "localhost" and we'll go back to the first case. This is odd! + # + # Ideally, Kafka would bind to all loopback addresses when we tell it to listen on "localhost" the + # way it makes an IPv6 socket bound to both 0.0.0.0/0 and ::/0 when we tell it to bind to "" (that is + # to say, when we make a listener of PLAINTEXT://:port. 
+ # + # Note that even though we specify the bind host in bracket notation, Kafka responds to the bootstrap + # metadata request without square brackets later. + host = "127.0.0.1" + fixture = KafkaFixture(host, port, broker_id, + zk_host, zk_port, zk_chroot, + transport=transport, + replicas=replicas, partitions=partitions) + fixture.open() + return fixture + + def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, + replicas=1, partitions=2, transport='PLAINTEXT'): + self.host = host + self.port = port + + self.broker_id = broker_id + self.transport = transport.upper() + self.ssl_dir = self.test_resource('ssl') + + self.zk_host = zk_host + self.zk_port = zk_port + self.zk_chroot = zk_chroot + + self.replicas = replicas + self.partitions = partitions + + self.tmp_dir = None + self.child = None + self.running = False + + def kafka_run_class_env(self): + env = super(KafkaFixture, self).kafka_run_class_env() + env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs') + return env + + def out(self, message): + log.info("*** Kafka [%s:%d]: %s", self.host, self.port, message) + + def open(self): + if self.running: + self.out("Instance already running") + return + + self.tmp_dir = tempfile.mkdtemp() + self.out("Running local instance...") + log.info(" host = %s", self.host) + log.info(" port = %s", self.port) + log.info(" transport = %s", self.transport) + log.info(" broker_id = %s", self.broker_id) + log.info(" zk_host = %s", self.zk_host) + log.info(" zk_port = %s", self.zk_port) + log.info(" zk_chroot = %s", self.zk_chroot) + log.info(" replicas = %s", self.replicas) + log.info(" partitions = %s", self.partitions) + log.info(" tmp_dir = %s", self.tmp_dir) + + # Create directories + os.mkdir(os.path.join(self.tmp_dir, "logs")) + os.mkdir(os.path.join(self.tmp_dir, "data")) + + # Generate configs + template = self.test_resource("kafka.properties") + properties = os.path.join(self.tmp_dir, "kafka.properties") + self.render_template(template, properties, vars(self)) + + # Party! 
+ self.out("Creating Zookeeper chroot node...") + args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain", + "-server", "%s:%d" % (self.zk_host, self.zk_port), + "create", + "/%s" % self.zk_chroot, + "kafka-python") + env = self.kafka_run_class_env() + proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + if proc.wait() != 0: + self.out("Failed to create Zookeeper chroot node") + self.out(proc.stdout.read()) + self.out(proc.stderr.read()) + raise RuntimeError("Failed to create Zookeeper chroot node") + self.out("Done!") + + # Configure Kafka child process + args = self.kafka_run_class_args("kafka.Kafka", properties) + env = self.kafka_run_class_env() + + timeout = 5 + max_timeout = 30 + backoff = 1 + end_at = time.time() + max_timeout + tries = 1 + while time.time() < end_at: + self.out('Attempting to start (try #%d)' % tries) + try: + os.stat(properties) + except: + log.warning('Config %s not found -- re-rendering', properties) + self.render_template(template, properties, vars(self)) + self.child = SpawnedService(args, env) + self.child.start() + timeout = min(timeout, max(end_at - time.time(), 0)) + if self.child.wait_for(r"\[Kafka Server %d\], Started" % + self.broker_id, timeout=timeout): + break + self.child.stop() + timeout *= 2 + time.sleep(backoff) + tries += 1 + else: + raise Exception('Failed to start KafkaInstance before max_timeout') + self.out("Done!") + self.running = True + atexit.register(self.close) + + def __del__(self): + self.close() + + def close(self): + if not self.running: + self.out("Instance already stopped") + return + + self.out("Stopping...") + self.child.stop() + self.child = None + self.out("Done!") + shutil.rmtree(self.tmp_dir) + self.running = False diff --git a/frontera/tests/kafka_utils/service.py b/frontera/tests/kafka_utils/service.py new file mode 100644 index 000000000..bfbe77ee4 --- /dev/null +++ b/frontera/tests/kafka_utils/service.py @@ -0,0 +1,117 @@ +import logging +import os +import re +import select +import subprocess +import sys +import threading +import time + +__all__ = [ + 'ExternalService', + 'SpawnedService', +] + +log = logging.getLogger(__name__) + + +class ExternalService(object): + def __init__(self, host, port): + log.info("Using already running service at %s:%d", host, port) + self.host = host + self.port = port + + def open(self): + pass + + def close(self): + pass + + +class SpawnedService(threading.Thread): + def __init__(self, args=None, env=None): + super(SpawnedService, self).__init__() + + if args is None: + raise TypeError("args parameter is required") + self.args = args + self.env = env + self.captured_stdout = [] + self.captured_stderr = [] + + self.should_die = threading.Event() + self.child = None + self.alive = False + self.daemon = True + + def _spawn(self): + if self.alive: + return + if self.child and self.child.poll() is None: + return + + self.child = subprocess.Popen( + self.args, + preexec_fn=os.setsid, # to avoid propagating signals + env=self.env, + bufsize=1, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.alive = True + + def _despawn(self): + if self.child.poll() is None: + self.child.terminate() + self.alive = False + for _ in range(50): + if self.child.poll() is not None: + self.child = None + break + time.sleep(0.1) + else: + self.child.kill() + + def run(self): + self._spawn() + while True: + (rds, _, _) = select.select([self.child.stdout, self.child.stderr], [], [], 1) + + if self.child.stdout in rds: + line = self.child.stdout.readline() + 
self.captured_stdout.append(line.decode('utf-8').rstrip()) + + if self.child.stderr in rds: + line = self.child.stderr.readline() + self.captured_stderr.append(line.decode('utf-8').rstrip()) + + if self.child.poll() is not None: + self.dump_logs() + break + + if self.should_die.is_set(): + self._despawn() + break + + def dump_logs(self): + sys.stderr.write('\n'.join(self.captured_stderr)) + sys.stdout.write('\n'.join(self.captured_stdout)) + + def wait_for(self, pattern, timeout=30): + start = time.time() + while True: + elapsed = time.time() - start + if elapsed >= timeout: + log.error("Waiting for %r timed out after %d seconds", pattern, timeout) + return False + + if re.search(pattern, '\n'.join(self.captured_stdout), re.IGNORECASE) is not None: + log.info("Found pattern %r in %d seconds via stdout", pattern, elapsed) + return True + if re.search(pattern, '\n'.join(self.captured_stderr), re.IGNORECASE) is not None: + log.info("Found pattern %r in %d seconds via stderr", pattern, elapsed) + return True + time.sleep(0.1) + + def stop(self): + self.should_die.set() + self.join() diff --git a/frontera/tests/test_kafka_import.py b/frontera/tests/test_kafka_import.py deleted file mode 100644 index 7aaff4c30..000000000 --- a/frontera/tests/test_kafka_import.py +++ /dev/null @@ -1,7 +0,0 @@ -# -*- coding: utf-8 -*- - - -def test_kafka_messagebus_import(): - import frontera.contrib.messagebus.kafka - import frontera.contrib.messagebus.kafkabus - pass \ No newline at end of file diff --git a/frontera/tests/test_message_bus.py b/frontera/tests/test_message_bus.py index 2428e46d8..1334fdbb7 100644 --- a/frontera/tests/test_message_bus.py +++ b/frontera/tests/test_message_bus.py @@ -1,16 +1,21 @@ # -*- coding: utf-8 -*- from frontera.settings import Settings -from frontera.contrib.messagebus.zeromq import MessageBus +from frontera.contrib.messagebus.zeromq import MessageBus as ZeroMQMessageBus +from frontera.contrib.messagebus.kafkabus import MessageBus as KafkaMessageBus from frontera.utils.fingerprint import sha1 from random import randint from time import sleep +import os +from kafka_utils.case import KafkaIntegrationTestCase, random_string +from kafka_utils.fixtures import ZookeeperFixture, KafkaFixture +import logging class MessageBusTester(object): - def __init__(self, settings=Settings()): + def __init__(self, cls, settings=Settings()): settings.set('SPIDER_FEED_PARTITIONS', 1) settings.set('QUEUE_HOSTNAME_PARTITIONING', True) - self.messagebus = MessageBus(settings) + self.messagebus = cls(settings) spiderlog = self.messagebus.spider_log() # sw @@ -41,6 +46,7 @@ def spider_log_activity(self, messages): self.sp_sl_p.send(sha1(str(randint(1, 1000))), 'http://helloworld.com/way/to/the/sun/' + str(0)) else: self.sp_sl_p.send(sha1(str(randint(1, 1000))), 'http://way.to.the.sun' + str(0)) + self.sp_sl_p.flush() def spider_feed_activity(self): sf_c = 0 @@ -50,10 +56,13 @@ def spider_feed_activity(self): def sw_activity(self): c = 0 + p = 0 for m in self.sw_sl_c.get_messages(timeout=1.0, count=512): if m.startswith('http://helloworld.com/'): + p += 1 self.sw_us_p.send(None, 'message' + str(0) + "," + str(c)) c += 1 + assert p > 0 return c def db_activity(self, messages): @@ -70,7 +79,7 @@ def db_activity(self, messages): self.db_sf_p.send("newhost", "http://newhost/new/url/to/crawl") else: self.db_sf_p.send("someotherhost", "http://newhost223/new/url/to/crawl") - + self.db_sf_p.flush() return (sl_c, us_c) @@ -90,9 +99,51 @@ def test_zmq_message_bus(): """ Test MessageBus with default settings, IPv6 
and Star as ZMQ_ADDRESS """ - tester = MessageBusTester() - + tester = MessageBusTester(ZeroMQMessageBus) tester.spider_log_activity(64) assert tester.sw_activity() == 64 assert tester.db_activity(128) == (64, 32) assert tester.spider_feed_activity() == 128 + + +class KafkaMessageBusTestCase(KafkaIntegrationTestCase): + @classmethod + def setUpClass(cls): + #logging.basicConfig(level=logging.DEBUG) + #logger = logging.getLogger("frontera.tests.kafka_utils.service") + #logger.addHandler(logging.StreamHandler()) + if not os.environ.get('KAFKA_VERSION'): + return + + cls.zk = ZookeeperFixture.instance() + chroot = random_string(10) + cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port, + zk_chroot=chroot, partitions=1) + + @classmethod + def tearDownClass(cls): + if not os.environ.get('KAFKA_VERSION'): + return + + cls.server.close() + cls.zk.close() + + def test_kafka_message_bus_integration(self): + """ + Test MessageBus with default settings, IPv6 and Star as ZMQ_ADDRESS + """ + self.client.ensure_topic_exists("frontier-todo") + self.client.ensure_topic_exists("frontier-done") + self.client.ensure_topic_exists("frontier-score") + + #logging.basicConfig(level=logging.INFO) + #kafkabus = logging.getLogger("kafkabus") + #kafkabus.addHandler(logging.StreamHandler()) + settings = Settings() + settings.set('KAFKA_LOCATION', '%s:%s' % (self.server.host, self.server.port)) + settings.set('FRONTIER_GROUP', 'frontier2') + tester = MessageBusTester(KafkaMessageBus, settings) + tester.spider_log_activity(64) + assert tester.sw_activity() == 64 + assert tester.db_activity(128) == (64, 32) + assert tester.spider_feed_activity() == 128 diff --git a/frontera/worker/db.py b/frontera/worker/db.py index a83cadc0b..fd71da28f 100644 --- a/frontera/worker/db.py +++ b/frontera/worker/db.py @@ -147,9 +147,8 @@ def consume_incoming(self, *args, **kwargs): self._backend.request_error(request, error) if type == 'offset': _, partition_id, offset = msg - try: - producer_offset = self.spider_feed_producer.get_offset(partition_id) - except KeyError: + producer_offset = self.spider_feed_producer.get_offset(partition_id) + if producer_offset is None: continue else: lag = producer_offset - offset diff --git a/requirements/tests.txt b/requirements/tests.txt index 485f80ba0..1cd7850ef 100644 --- a/requirements/tests.txt +++ b/requirements/tests.txt @@ -8,5 +8,5 @@ SQLAlchemy>=1.0.0 cachetools pyzmq msgpack-python -kafka-python<=0.9.5 +kafka-python>=1.0.0 diff --git a/setup.py b/setup.py index 5ed41879a..4c8f3055b 100644 --- a/setup.py +++ b/setup.py @@ -62,7 +62,7 @@ 'msgpack-python' ], 'kafka': [ - 'kafka-python<=0.9.5', + 'kafka-python>=1.0.0', 'python-snappy' ], 'distributed': [ diff --git a/tox.ini b/tox.ini index bd06a1693..0f006f6ce 100644 --- a/tox.ini +++ b/tox.ini @@ -14,6 +14,9 @@ deps = -r{toxinidir}/requirements/tests.txt commands = py.test -s -v {envsitepackagesdir}/frontera +setenv = + PROJECT_ROOT = {toxinidir} +passenv = KAFKA_VERSION [testenv:flake8] changedir = {toxinidir} @@ -28,5 +31,5 @@ exclude = frontera/_version.py,versioneer.py,docs/source/conf.py,frontera/contri # Options for pytest [pytest] -addopts = -rsvXf +addopts = -rsvxXf ignore=requirements
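For reference, a minimal Frontera settings sketch wiring a crawl to the new Kafka message bus, based on the settings introduced in this change. The broker address and partition counts are illustrative placeholders; the topic and group names simply restate the new defaults added to default_settings.py:

    # settings.py -- minimal sketch; only MESSAGE_BUS and KAFKA_LOCATION are strictly required.
    MESSAGE_BUS = 'frontera.contrib.messagebus.kafkabus.MessageBus'
    KAFKA_LOCATION = 'localhost:9092'         # "host:port", or several pairs separated by commas
    KAFKA_COMPRESSION = None                  # or e.g. 'snappy'/'gzip'; passed to KafkaProducer(compression_type=...)
    KAFKA_USE_SIMPLE_CONSUMER = False         # True only for pre-0.9.0 brokers (deprecated Simple* consumer API)

    OUTGOING_TOPIC = 'frontier-todo'          # spider feed
    INCOMING_TOPIC = 'frontier-done'          # spider log
    SCORING_TOPIC = 'frontier-score'          # scoring log
    FRONTIER_GROUP = 'general'
    SCORING_GROUP = 'strategy-workers'

    SPIDER_LOG_PARTITIONS = 1                 # illustrative; match the partition counts of your topics
    SPIDER_FEED_PARTITIONS = 1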