
Commit b94f6c0

using new kafka-python interfaces

1 parent aa2b18c

File tree

6 files changed (+101, -82 lines)


docs/source/topics/frontera-settings.rst

Lines changed: 7 additions & 8 deletions
@@ -577,15 +577,14 @@ KAFKA_LOCATION
 
 Hostname and port of kafka broker, separated with :. Can be a string with hostname:port pair separated with commas(,).
 
-.. setting:: KAFKA_CODEC
+.. setting:: KAFKA_COMPRESSION
 
-KAFKA_CODEC
-___________
+KAFKA_COMPRESSION
+-----------------
 
-Default:: ``CODEC_NONE``
+Default:: ``None``
 
-Kafka protocol compression codec, see kafka-python documentation for more details. Please use symbols from kafka-python
-package.
+Kafka's producer compression type string, see `kafka-python documentation`_ for more details.
 
 .. setting:: FRONTIER_GROUP
 
@@ -636,8 +635,6 @@ Default: ``frontier-score``
 Kafka topic used for :term:`scoring log` stream.
 
 
-
-
 Default settings
 ================
 
@@ -681,3 +678,5 @@ Values::
 
 EVENT_LOG_MANAGER = 'frontera.logger.events.EventLogManager'
 
+
+.. _`kafka-python documentation`: http://kafka-python.readthedocs.io/en/1.1.1/apidoc/KafkaProducer.html
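
The renamed setting is handed straight to KafkaProducer's compression_type argument. A minimal sketch of how a crawler's settings module might set it (the broker address is an illustrative assumption):

    # Hypothetical Frontera settings module; the broker address is illustrative.
    KAFKA_LOCATION = 'localhost:9092'
    # Forwarded to KafkaProducer(compression_type=...); valid values in
    # kafka-python 1.x are 'gzip', 'snappy', 'lz4' or None (no compression).
    KAFKA_COMPRESSION = 'snappy'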

frontera/contrib/backends/partitioners.py

Lines changed: 7 additions & 1 deletion
@@ -18,6 +18,9 @@ def partition_by_hash(self, value, partitions):
         idx = value % size
         return partitions[idx]
 
+    def __call__(self, key, all_partitions, available):
+        return self.partition(key, all_partitions)
+
 
 class FingerprintPartitioner(Partitioner):
     def partition(self, key, partitions=None):
@@ -26,4 +29,7 @@ def partition(self, key, partitions=None):
         digest = unhexlify(key[0:2] + key[5:7] + key[10:12] + key[15:17])
         value = unpack("<I", digest)
         idx = value[0] % len(partitions)
-        return partitions[idx]
+        return partitions[idx]
+
+    def __call__(self, key, all_partitions, available):
+        return self.partition(key, all_partitions)
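
The added __call__ methods exist because kafka-python's KafkaProducer invokes its partitioner as a plain callable with the signature (key_bytes, all_partitions, available_partitions); with them, an instantiated FingerprintPartitioner or Crc32NamePartitioner can be passed directly as the producer's partitioner. A minimal sketch of that protocol, with broker and topic names as assumptions:

    # Sketch of kafka-python's partitioner protocol (kafka-python 1.x).
    # Broker address and topic name are illustrative assumptions.
    from kafka import KafkaProducer

    def toy_partitioner(key_bytes, all_partitions, available_partitions):
        # Same shape as Partitioner.__call__ above: map the serialized key
        # to one partition id chosen from the full partition list.
        return all_partitions[sum(bytearray(key_bytes)) % len(all_partitions)]

    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             partitioner=toy_partitioner)
    producer.send('frontier-done', key=b'example.com', value=b'payload')
    producer.flush()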

frontera/contrib/messagebus/kafkabus.py

Lines changed: 78 additions & 65 deletions
@@ -1,21 +1,22 @@
 # -*- coding: utf-8 -*-
 from __future__ import absolute_import
-from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseSpiderFeedStream, \
-    BaseStreamConsumer, BaseScoringLogStream, BaseStreamProducer
 
-from kafka import KafkaClient, SimpleConsumer, KeyedProducer as KafkaKeyedProducer, SimpleProducer as KafkaSimpleProducer
-from kafka.common import BrokerResponseError, MessageSizeTooLargeError
+from logging import getLogger
+
+from kafka import KafkaClient, SimpleConsumer
+from kafka import KafkaConsumer, KafkaProducer, TopicPartition
+from kafka.common import BrokerResponseError
 from kafka.protocol import CODEC_NONE
 
 from frontera.contrib.backends.partitioners import FingerprintPartitioner, Crc32NamePartitioner
 from frontera.contrib.messagebus.kafka import OffsetsFetcher
-from logging import getLogger
-from time import sleep
+from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseSpiderFeedStream, \
+    BaseStreamConsumer, BaseScoringLogStream, BaseStreamProducer
 
 logger = getLogger("kafkabus")
 
 
-class Consumer(BaseStreamConsumer):
+class DeprecatedConsumer(BaseStreamConsumer):
     """
     Used in DB and SW worker. SW consumes per partition.
     """
@@ -70,68 +71,75 @@ def get_offset(self):
         return 0
 
 
+class Consumer(BaseStreamConsumer):
+    """
+    Used in DB and SW worker. SW consumes per partition.
+    """
+    def __init__(self, location, topic, group, partition_id):
+        self._location = location
+        self._group = group
+        self._topic = topic
+        self._partition_ids = [partition_id] if partition_id is not None else None
+
+        self._consumer = KafkaConsumer(
+            bootstrap_servers=self._location,
+            group_id=self._group,
+            max_partition_fetch_bytes=10485760)
+        if self._partition_ids:
+            self._consumer.assign([TopicPartition(self._topic, pid) for pid in self._partition_ids])
+        else:
+            self._consumer.subscribe(self._topic)
+
+    def get_messages(self, timeout=0.1, count=1):
+        while True:
+            try:
+                batch = self._consumer.poll(timeout_ms=timeout)
+                for _, records in batch.iteritems():
+                    for record in records:
+                        yield record.value
+            except Exception, err:
+                logger.warning("Error %s" % err)
+            finally:
+                break
+
+    def get_offset(self):
+        return 0
+
+
 class SimpleProducer(BaseStreamProducer):
-    def __init__(self, location, topic, codec):
+    def __init__(self, location, topic, compression):
         self._location = location
         self._topic = topic
-        self._codec = codec
+        self._compression = compression
         self._create()
 
     def _create(self):
-        self._client = KafkaClient(self._location)
-        self._producer = KafkaSimpleProducer(self._client, codec=self._codec)
+        self._producer = KafkaProducer(bootstrap_servers=self._location, retries=5,
+                                       compression_type=self._compression)
 
     def send(self, key, *messages):
-        self._producer.send_messages(self._topic, *messages)
+        for msg in messages:
+            self._producer.send(self._topic, value=msg)
 
-    def get_offset(self, partition_id):
-        # Kafka has it's own offset management
-        raise KeyError
+    def flush(self):
+        self._producer.flush()
 
 
 class KeyedProducer(BaseStreamProducer):
-    def __init__(self, location, topic_done, partitioner_cls, codec):
-        self._prod = None
+    def __init__(self, location, topic_done, partitioner, compression):
         self._location = location
         self._topic_done = topic_done
-        self._partitioner_cls = partitioner_cls
-        self._codec = codec
-
-    def _connect_producer(self):
-        if self._prod is None:
-            try:
-                self._client = KafkaClient(self._location)
-                self._prod = KafkaKeyedProducer(self._client, partitioner=self._partitioner_cls, codec=self._codec)
-            except BrokerResponseError:
-                self._prod = None
-                logger.warning("Could not connect producer to Kafka server")
-                return False
-        return True
+        self._partitioner = partitioner
+        self._compression = compression
+        self._producer = KafkaProducer(bootstrap_servers=self._location, partitioner=partitioner, retries=5,
+                                       compression_type=self._compression)
 
     def send(self, key, *messages):
-        success = False
-        max_tries = 5
-        if self._connect_producer():
-            n_tries = 0
-            while not success and n_tries < max_tries:
-                try:
-                    self._prod.send_messages(self._topic_done, key, *messages)
-                    success = True
-                except MessageSizeTooLargeError, e:
-                    logger.error(str(e))
-                    break
-                except BrokerResponseError:
-                    n_tries += 1
-                    logger.warning(
-                        "Could not send message. Try {0}/{1}".format(
-                            n_tries, max_tries)
-                    )
-                    sleep(1.0)
-        return success
-
-    def get_offset(self, partition_id):
-        # Kafka has it's own offset management
-        raise KeyError
+        for msg in messages:
+            self._producer.send(self._topic_done, key=key, value=msg)
+
+    def flush(self):
+        self._producer.flush()
 
 
 class SpiderLogStream(BaseSpiderLogStream):
@@ -140,10 +148,12 @@ def __init__(self, messagebus):
         self._db_group = messagebus.general_group
         self._sw_group = messagebus.sw_group
         self._topic_done = messagebus.topic_done
-        self._codec = messagebus.codec
+        self._compression_type = messagebus.compression_type
+        self._partitions = messagebus.spider_log_partitions
 
     def producer(self):
-        return KeyedProducer(self._location, self._topic_done, FingerprintPartitioner, self._codec)
+        return KeyedProducer(self._location, self._topic_done, FingerprintPartitioner(self._partitions),
+                             self._compression_type)
 
     def consumer(self, partition_id, type):
         """
@@ -153,7 +163,7 @@ def consumer(self, partition_id, type):
         :return:
         """
         group = self._sw_group if type == 'sw' else self._db_group
-        return Consumer(self._location, self._topic_done, group, partition_id)
+        return DeprecatedConsumer(self._location, self._topic_done, group, partition_id)
 
 
 class SpiderFeedStream(BaseSpiderFeedStream):
@@ -164,10 +174,11 @@ def __init__(self, messagebus):
         self._max_next_requests = messagebus.max_next_requests
         self._hostname_partitioning = messagebus.hostname_partitioning
         self._offset_fetcher = OffsetsFetcher(self._location, self._topic, self._general_group)
-        self._codec = messagebus.codec
+        self._compression_type = messagebus.compression_type
+        self._partitions = messagebus.spider_feed_partitions
 
     def consumer(self, partition_id):
-        return Consumer(self._location, self._topic, self._general_group, partition_id)
+        return DeprecatedConsumer(self._location, self._topic, self._general_group, partition_id)
 
     def available_partitions(self):
         partitions = []
@@ -178,22 +189,23 @@ def available_partitions(self):
         return partitions
 
     def producer(self):
-        partitioner = Crc32NamePartitioner if self._hostname_partitioning else FingerprintPartitioner
-        return KeyedProducer(self._location, self._topic, partitioner, self._codec)
+        partitioner = Crc32NamePartitioner(self._partitions) if self._hostname_partitioning \
+            else FingerprintPartitioner(self._partitions)
+        return KeyedProducer(self._location, self._topic, partitioner, self._compression_type)
 
 
 class ScoringLogStream(BaseScoringLogStream):
     def __init__(self, messagebus):
         self._topic = messagebus.topic_scoring
         self._group = messagebus.general_group
         self._location = messagebus.kafka_location
-        self._codec = messagebus.codec
+        self._compression_type = messagebus.compression_type
 
     def consumer(self):
-        return Consumer(self._location, self._topic, self._group, partition_id=None)
+        return DeprecatedConsumer(self._location, self._topic, self._group, partition_id=None)
 
     def producer(self):
-        return SimpleProducer(self._location, self._topic, self._codec)
+        return SimpleProducer(self._location, self._topic, self._compression_type)
 
 
 class MessageBus(BaseMessageBus):
@@ -206,9 +218,10 @@ def __init__(self, settings):
         self.spider_partition_id = settings.get('SPIDER_PARTITION_ID')
         self.max_next_requests = settings.MAX_NEXT_REQUESTS
         self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING')
-        codec = settings.get('KAFKA_CODEC')
-        self.codec = codec if codec else CODEC_NONE
+        self.compression_type = settings.get('KAFKA_COMPRESSION')
         self.kafka_location = settings.get('KAFKA_LOCATION')
+        self.spider_log_partitions = settings.get('SPIDER_LOG_PARTITIONS')
+        self.spider_feed_partitions = settings.get('SPIDER_FEED_PARTITIONS')
 
     def spider_log(self):
         return SpiderLogStream(self)
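
The rewrite swaps the deprecated KafkaClient/SimpleConsumer/KeyedProducer stack for kafka-python's high-level KafkaConsumer and KafkaProducer. A condensed sketch of the consume/produce pattern the new classes wrap (broker, topic, and group names are assumptions, not values taken from the commit):

    from kafka import KafkaConsumer, KafkaProducer, TopicPartition

    # A partition-bound worker pins its partition explicitly with assign();
    # an unbound worker would join the group via subscribe(['frontier-done']).
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             group_id='frontier',
                             max_partition_fetch_bytes=10485760)
    consumer.assign([TopicPartition('frontier-done', 0)])

    producer = KafkaProducer(bootstrap_servers='localhost:9092', retries=5,
                             compression_type=None)
    producer.send('frontier-done', key=b'fingerprint', value=b'message')
    producer.flush()  # send() only enqueues; flush() blocks until delivery

    # poll() returns a dict mapping TopicPartition to a list of ConsumerRecords
    for tp, records in consumer.poll(timeout_ms=100).items():
        for record in records:
            print(record.value)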

frontera/settings/default_settings.py

Lines changed: 1 addition & 1 deletion
@@ -58,7 +58,7 @@
 #--------------------------------------------------------
 # Kafka
 #--------------------------------------------------------
-KAFKA_CODEC = None
+KAFKA_COMPRESSION = None
 OUTGOING_TOPIC = "frontier-todo"
 INCOMING_TOPIC = "frontier-done"
 SCORING_TOPIC = "frontier-score"

frontera/tests/test_message_bus.py

Lines changed: 7 additions & 6 deletions
@@ -46,6 +46,7 @@ def spider_log_activity(self, messages):
                 self.sp_sl_p.send(sha1(str(randint(1, 1000))), 'http://helloworld.com/way/to/the/sun/' + str(0))
             else:
                 self.sp_sl_p.send(sha1(str(randint(1, 1000))), 'http://way.to.the.sun' + str(0))
+        self.sp_sl_p.flush()
 
     def spider_feed_activity(self):
         sf_c = 0
@@ -78,7 +79,7 @@ def db_activity(self, messages):
                 self.db_sf_p.send("newhost", "http://newhost/new/url/to/crawl")
             else:
                 self.db_sf_p.send("someotherhost", "http://newhost223/new/url/to/crawl")
-
+        self.db_sf_p.flush()
         return (sl_c, us_c)
 
 
@@ -108,9 +109,9 @@ def test_zmq_message_bus():
 class KafkaMessageBusTestCase(KafkaIntegrationTestCase):
     @classmethod
     def setUpClass(cls):
-        logging.basicConfig(level=logging.DEBUG)
-        logger = logging.getLogger("frontera.tests.kafka_utils.service")
-        logger.addHandler(logging.StreamHandler())
+        #logging.basicConfig(level=logging.DEBUG)
+        #logger = logging.getLogger("frontera.tests.kafka_utils.service")
+        #logger.addHandler(logging.StreamHandler())
         if not os.environ.get('KAFKA_VERSION'):
             return
 
@@ -127,7 +128,7 @@ def tearDownClass(cls):
         cls.server.close()
         cls.zk.close()
 
-    def test_kafka_message_bus(self):
+    def test_kafka_message_bus_integration(self):
         """
         Test MessageBus with default settings, IPv6 and Star as ZMQ_ADDRESS
         """
@@ -145,4 +146,4 @@
         tester.spider_log_activity(64)
         assert tester.sw_activity() == 64
         assert tester.db_activity(128) == (64, 32)
-        assert tester.spider_feed_activity() == 128
+        assert tester.spider_feed_activity() == 128
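
The added flush() calls matter because KafkaProducer.send() is asynchronous: it appends the record to an internal buffer and returns a future, so a test asserting on delivered counts immediately after send() could race the background sender. A minimal sketch (broker and topic names are assumptions):

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092')  # assumed broker
    future = producer.send('frontier-todo', key=b'host', value=b'url')
    producer.flush()                   # drain the buffer before asserting
    metadata = future.get(timeout=10)  # or block on a single record's future
    print("%s %d %d" % (metadata.topic, metadata.partition, metadata.offset))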

tox.ini

Lines changed: 1 addition & 1 deletion
@@ -31,5 +31,5 @@ exclude = frontera/_version.py,versioneer.py,docs/source/conf.py,frontera/contri
 
 # Options for pytest
 [pytest]
-addopts = -rsvxXf -k test_kafka
+addopts = -rsvxXf
 ignore=requirements
