Commit c95eabe
Merge pull request #1 from confluentinc/kafkatest
kafkatest, refactoring, and bug fixes

2 parents: 5a579d8 + f853b46

16 files changed: +741 -70 lines

MANIFEST.in

Lines changed: 1 addition & 1 deletion

@@ -1,3 +1,3 @@
 include README.md
-include *.c *.h
+include confluent_kafka/src/*.[ch]

confluent_kafka/__init__.py

Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
__all__ = ['cimpl','kafkatest']
from .cimpl import *
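
The re-export in this __init__.py is what lets code import the C-implemented classes directly from the top-level package, as verifiable_consumer.py further down in this diff does. A minimal sketch (broker address and group id are placeholders; assumes the package is installed):

from confluent_kafka import Consumer, KafkaError, KafkaException

consumer = Consumer(**{'bootstrap.servers': 'localhost:9092',  # placeholder broker
                       'group.id': 'example-group'})           # placeholder group id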

confluent_kafka/kafkatest/README

Lines changed: 8 additions & 0 deletions

@@ -0,0 +1,8 @@
FIXME: Instructions on how to use this.


Usage:

 python -m confluent_kafka.kafkatest.verifiable_consumer <options>

 python -m confluent_kafka.kafkatest.verifiable_producer <options>
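
For example, based on the argparse options defined in verifiable_consumer.py further down in this diff, a consumer run might look like the following (topic, group id and broker address are placeholders):

 python -m confluent_kafka.kafkatest.verifiable_consumer --topic test-topic --group-id example-group --broker-list localhost:9092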

confluent_kafka/kafkatest/__init__.py

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
""" Python client implementations of the official Kafka tests/kafkatest clients. """

confluent_kafka/kafkatest/verifiable_client.py

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
# Copyright 2016 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import signal, socket, os, sys, time, json, re, datetime


class VerifiableClient(object):
    """
    Generic base class for a kafkatest verifiable client.
    Implements the common kafkatest protocol and semantics.
    """
    def __init__ (self, conf):
        """
        """
        super(VerifiableClient, self).__init__()
        self.conf = conf
        self.conf['client.id'] = 'python@' + socket.gethostname()
        self.run = True
        signal.signal(signal.SIGTERM, self.sig_term)
        self.dbg('Pid is %d' % os.getpid())

    def sig_term (self, sig, frame):
        self.dbg('SIGTERM')
        self.run = False

    @staticmethod
    def _timestamp ():
        return time.strftime('%H:%M:%S', time.localtime())

    def dbg (self, s):
        """ Debugging printout """
        sys.stderr.write('%% %s DEBUG: %s\n' % (self._timestamp(), s))

    def err (self, s, term=False):
        """ Error printout, if term=True the process will terminate immediately. """
        sys.stderr.write('%% %s ERROR: %s\n' % (self._timestamp(), s))
        if term:
            sys.stderr.write('%% FATAL ERROR ^\n')
            sys.exit(1)

    def send (self, d):
        """ Send dict as JSON to stdout for consumption by kafkatest handler """
        d['_time'] = str(datetime.datetime.now())
        self.dbg('SEND: %s' % json.dumps(d))
        sys.stdout.write('%s\n' % json.dumps(d))
        sys.stdout.flush()


    @staticmethod
    def set_config (conf, args):
        """ Set client config properties using args dict. """
        for n,v in args.iteritems():
            if v is None:
                continue
            # Things to ignore
            if '.' not in n:
                # App config, skip
                continue
            if n.startswith('topic.'):
                # Set "topic.<...>" properties on default topic conf dict
                conf['default.topic.config'][n[6:]] = v
            elif n == 'partition.assignment.strategy':
                # Convert Java class name to config value.
                # "org.apache.kafka.clients.consumer.RangeAssignor" -> "range"
                conf[n] = re.sub(r'org.apache.kafka.clients.consumer.(\w+)Assignor',
                                 lambda x: x.group(1).lower(), v)
            else:
                conf[n] = v
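
To illustrate how set_config() translates kafkatest-style arguments into a client configuration, here is a hypothetical usage sketch. The argument names mirror the argparse dest values used in verifiable_consumer.py below, the broker address and group id are placeholders, and the import path is assumed from the package layout (note the module itself uses Python 2 idioms such as iteritems()):

# Hypothetical sketch; argument names taken from verifiable_consumer.py's argparse dests.
from confluent_kafka.kafkatest.verifiable_client import VerifiableClient

args = {'bootstrap.servers': 'localhost:9092',      # placeholder broker list
        'group.id': 'example-group',                # placeholder group id
        'session.timeout.ms': 6000,
        'topic.auto.offset.reset': 'earliest',      # "topic." prefix -> default.topic.config
        'partition.assignment.strategy':
            'org.apache.kafka.clients.consumer.RangeAssignor',
        'max_messages': 100}                        # no '.' -> app-level option, skipped

conf = {'default.topic.config': dict()}
VerifiableClient.set_config(conf, args)
# conf now holds roughly:
#   {'default.topic.config': {'auto.offset.reset': 'earliest'},
#    'bootstrap.servers': 'localhost:9092', 'group.id': 'example-group',
#    'session.timeout.ms': 6000, 'partition.assignment.strategy': 'range'}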

confluent_kafka/kafkatest/verifiable_consumer.py

Lines changed: 288 additions & 0 deletions
@@ -0,0 +1,288 @@
#!/usr/bin/env python
#
# Copyright 2016 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse, sys
from confluent_kafka import Consumer, KafkaError, KafkaException
from verifiable_client import VerifiableClient

class VerifiableConsumer(VerifiableClient):
    """
    confluent-kafka-python backed VerifiableConsumer class for use with
    Kafka's kafkatests client tests.
    """
    def __init__ (self, conf):
        """
        \p conf is a config dict passed to confluent_kafka.Consumer()
        """
        super(VerifiableConsumer, self).__init__(conf)
        self.conf['on_commit'] = self.on_commit
        self.consumer = Consumer(**conf)
        self.consumed_msgs = 0
        self.consumed_msgs_last_reported = 0
        self.consumed_msgs_at_last_commit = 0
        self.use_auto_commit = False
        self.use_async_commit = False
        self.max_msgs = -1
        self.assignment = []
        self.assignment_dict = dict()


    def find_assignment (self, topic, partition):
        """ Find and return existing assignment based on \p topic and \p partition,
            or None on miss. """
        skey = '%s %d' % (topic, partition)
        return self.assignment_dict.get(skey)

    def send_records_consumed (self, immediate=False):
        """ Send records_consumed, every 100 messages, on timeout,
            or if immediate is set. """
        if (self.consumed_msgs <= self.consumed_msgs_last_reported +
            (0 if immediate else 100)):
            return

        if len(self.assignment) == 0:
            return

        d = {'name': 'records_consumed',
             'count': self.consumed_msgs - self.consumed_msgs_last_reported,
             'partitions': []}

        for a in self.assignment:
            if a.min_offset == -1:
                # Skip partitions that haven't had any messages since last time.
                # This is to circumvent some minOffset checks in kafkatest.
                continue
            d['partitions'].append(a.to_dict())
            a.min_offset = -1

        self.send(d)
        self.consumed_msgs_last_reported = self.consumed_msgs

    def send_assignment (self, evtype, partitions):
        """ Send assignment update, \p evtype is either 'assigned' or 'revoked' """
        d = { 'name': 'partitions_' + evtype,
              'partitions': [{'topic': x.topic, 'partition': x.partition} for x in partitions]}
        self.send(d)

    def on_assign (self, consumer, partitions):
        """ Rebalance on_assign callback """
        old_assignment = self.assignment
        self.assignment = [AssignedPartition(p.topic, p.partition) for p in partitions]
        self.assignment_dict = {a.skey: a for a in self.assignment}
        # Move over our last seen offsets so that we can report a proper
        # minOffset even after a rebalance loop.
        # (Look up against the newly built assignment dict so the offsets
        #  actually carry over; partitions no longer assigned are skipped.)
        for a in old_assignment:
            b = self.find_assignment(a.topic, a.partition)
            if b is not None:
                b.min_offset = a.min_offset

        self.send_assignment('assigned', partitions)

    def on_revoke (self, consumer, partitions):
        """ Rebalance on_revoke callback """
        # Send final consumed records prior to rebalancing to make sure
        # the latest consumed position is on par with what is going to be committed.
        self.send_records_consumed(immediate=True)
        self.assignment = list()
        self.assignment_dict = dict()
        self.send_assignment('revoked', partitions)
        self.do_commit(immediate=True)

    def on_commit (self, err, partitions):
        """ Offsets Committed callback """
        if err is not None and err.code() == KafkaError._NO_OFFSET:
            self.dbg('on_commit(): no offsets to commit')
            return

        # Report consumed messages to make sure consumed position >= committed position
        self.send_records_consumed(immediate=True)

        d = {'name': 'offsets_committed',
             'offsets': []}

        if err is not None:
            d['success'] = False
            d['error'] = str(err)
        else:
            d['success'] = True
            d['error'] = ''

        for p in partitions:
            pd = {'topic': p.topic, 'partition': p.partition,
                  'offset': p.offset, 'error': str(p.error)}
            d['offsets'].append(pd)

        self.send(d)


    def do_commit (self, immediate=False, async=None):
        """ Commit every 1000 messages or whenever there is a consume timeout
            or immediate. """
        if (self.use_auto_commit or
            self.consumed_msgs_at_last_commit + (0 if immediate else 1000) >
            self.consumed_msgs):
            return

        # Make sure we report consumption before commit,
        # otherwise tests may fail because of commit > consumed
        if self.consumed_msgs_at_last_commit < self.consumed_msgs:
            self.send_records_consumed(immediate=True)

        if async is None:
            async_mode = self.use_async_commit
        else:
            async_mode = async

        self.dbg('Committing %d messages (Async=%s)' %
                 (self.consumed_msgs - self.consumed_msgs_at_last_commit,
                  async_mode))

        try:
            self.consumer.commit(async=async_mode)
        except KafkaException as e:
            if e.args[0].code() == KafkaError._WAIT_COORD:
                self.dbg('Ignoring commit failure, still waiting for coordinator')
            elif e.args[0].code() == KafkaError._NO_OFFSET:
                self.dbg('No offsets to commit')
            else:
                raise

        self.consumed_msgs_at_last_commit = self.consumed_msgs


    def msg_consume (self, msg):
        """ Handle consumed message (or error event) """
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # ignore EOF
                pass
            else:
                self.err('Consume failed: %s' % msg.error(), term=True)
            return

        if False:
            self.dbg('Read msg from %s [%d] @ %d' % \
                     (msg.topic(), msg.partition(), msg.offset()))

        if self.max_msgs >= 0 and self.consumed_msgs >= self.max_msgs:
            return # ignore extra messages

        # Find assignment.
        a = self.find_assignment(msg.topic(), msg.partition())
        if a is None:
            self.err('Received message on unassigned partition %s [%d] @ %d' %
                     (msg.topic(), msg.partition(), msg.offset()), term=True)

        a.consumed_msgs += 1
        if a.min_offset == -1:
            a.min_offset = msg.offset()
        if a.max_offset < msg.offset():
            a.max_offset = msg.offset()

        self.consumed_msgs += 1

        self.send_records_consumed(immediate=False)
        self.do_commit(immediate=False)


class AssignedPartition(object):
    """ Local state container for assigned partition. """
    def __init__ (self, topic, partition):
        super(AssignedPartition, self).__init__()
        self.topic = topic
        self.partition = partition
        self.skey = '%s %d' % (self.topic, self.partition)
        self.consumed_msgs = 0
        self.min_offset = -1
        self.max_offset = 0

    def to_dict (self):
        """ Return a dict of this partition's state """
        return {'topic': self.topic, 'partition': self.partition,
                'minOffset': self.min_offset, 'maxOffset': self.max_offset}


if __name__ == '__main__':

    parser = argparse.ArgumentParser(description='Verifiable Python Consumer')
    parser.add_argument('--topic', action='append', type=str, required=True)
    parser.add_argument('--group-id', dest='group.id', required=True)
    parser.add_argument('--broker-list', dest='bootstrap.servers', required=True)
    parser.add_argument('--session-timeout', type=int, dest='session.timeout.ms', default=6000)
    parser.add_argument('--enable-autocommit', action='store_true', dest='enable.auto.commit', default=False)
    parser.add_argument('--max-messages', type=int, dest='max_messages', default=-1)
    parser.add_argument('--assignment-strategy', dest='partition.assignment.strategy')
    parser.add_argument('--reset-policy', dest='topic.auto.offset.reset', default='earliest')
    parser.add_argument('--consumer.config', dest='consumer_config')
    args = vars(parser.parse_args())

    conf = {'broker.version.fallback': '0.9.0',
            'default.topic.config': dict()}

    VerifiableClient.set_config(conf, args)

    vc = VerifiableConsumer(conf)
    vc.use_auto_commit = args['enable.auto.commit']
    vc.max_msgs = args['max_messages']

    vc.dbg('Using config: %s' % conf)

    vc.dbg('Subscribing to %s' % args['topic'])
    vc.consumer.subscribe(args['topic'],
                          on_assign=vc.on_assign, on_revoke=vc.on_revoke)


    try:
        while vc.run:
            msg = vc.consumer.poll(timeout=1.0)
            if msg is None:
                # Timeout.
                # Try reporting consumed messages
                vc.send_records_consumed(immediate=True)
                # Commit every poll() timeout instead of on every message.
                # Also commit on every 1000 messages, whichever comes first.
                vc.do_commit(immediate=True)
                continue

            # Handle message (or error event)
            vc.msg_consume(msg)

    except KeyboardInterrupt:
        pass

    vc.dbg('Closing consumer')
    vc.send_records_consumed(immediate=True)
    if not vc.use_auto_commit:
        vc.do_commit(immediate=True, async=False)

    vc.consumer.close()

    vc.send({'name': 'shutdown_complete'})

    vc.dbg('All done')
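
Tying the pieces together: every event the consumer reports to the kafkatest harness goes through VerifiableClient.send(), which writes one JSON object per line to stdout. As a rough sketch of the shape of two such events (the field names come from send_records_consumed() and send_assignment() above; all values below are made-up placeholders, and _time is added by send()):

# Illustrative only: approximate shape of two events emitted on stdout.
records_consumed_event = {
    'name': 'records_consumed',
    'count': 100,                                   # messages since last report
    'partitions': [{'topic': 'test-topic', 'partition': 0,
                    'minOffset': 0, 'maxOffset': 99}],
    '_time': '2016-01-01 12:00:00.000000',          # added by VerifiableClient.send()
}

partitions_assigned_event = {
    'name': 'partitions_assigned',
    'partitions': [{'topic': 'test-topic', 'partition': 0}],
    '_time': '2016-01-01 12:00:00.000000',
}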
