Skip to content

Commit 4b0f968

Browse files
authored
Merge pull request confluentinc#4 from mapr/mapr-25766
Mapr 25766
2 parents 3d3bfed + 41ec172 commit 4b0f968

11 files changed

+409
-48
lines changed

confluent_kafka/kafkatest/verifiable_consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ def to_dict (self):
233233
parser = argparse.ArgumentParser(description='Verifiable Python Consumer')
234234
parser.add_argument('--topic', action='append', type=str, required=True)
235235
parser.add_argument('--group-id', dest='group.id', required=True)
236-
parser.add_argument('--broker-list', dest='bootstrap.servers', required=True)
236+
parser.add_argument('--broker-list', dest='bootstrap.servers', required=False)
237237
parser.add_argument('--session-timeout', type=int, dest='session.timeout.ms', default=6000)
238238
parser.add_argument('--enable-autocommit', action='store_true', dest='enable.auto.commit', default=False)
239239
parser.add_argument('--max-messages', type=int, dest='max_messages', default=-1)

confluent_kafka/kafkatest/verifiable_producer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def dr_cb (self, err, msg):
6666
parser = argparse.ArgumentParser(description='Verifiable Python Producer')
6767
parser.add_argument('--topic', type=str, required=True)
6868
parser.add_argument('--throughput', type=int, default=0)
69-
parser.add_argument('--broker-list', dest='bootstrap.servers', required=True)
69+
parser.add_argument('--broker-list', dest='bootstrap.servers', required=False)
7070
parser.add_argument('--max-messages', type=int, dest='max_msgs', default=1000000) # avoid infinite
7171
parser.add_argument('--value-prefix', dest='value_prefix', type=str, default=None)
7272
parser.add_argument('--acks', type=int, dest='topic.request.required.acks', default=-1)

examples/consumer.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,17 @@
2323
import sys
2424

2525
if __name__ == '__main__':
26-
if len(sys.argv) < 4:
27-
sys.stderr.write('Usage: %s <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % sys.argv[0])
26+
if len(sys.argv) < 3:
27+
sys.stderr.write('Usage: %s <group> <topic1> [<topic2> [..]]\n' % sys.argv[0])
2828
sys.exit(1)
2929

30-
broker = sys.argv[1]
31-
group = sys.argv[2]
32-
topics = sys.argv[3:]
30+
group = sys.argv[1]
31+
topics = sys.argv[2:]
3332

3433
# Consumer configuration
3534
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
36-
conf = {'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000,
37-
'default.topic.config': {'auto.offset.reset': 'smallest'}}
35+
conf = {'group.id': group, 'session.timeout.ms': 6000,
36+
'default.topic.config': {'auto.offset.reset': 'earliest'}}
3837

3938

4039
# Create Consumer instance

examples/integration_test.py

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,11 @@ def verify_producer():
115115

116116
def verify_producer_performance(with_dr_cb=True):
117117
""" Time how long it takes to produce and delivery X messages """
118-
conf = {'bootstrap.servers': bootstrap_servers,
119-
'error_cb': error_cb}
118+
conf = {'error_cb': error_cb}
120119

121120
p = confluent_kafka.Producer(**conf)
122121

123-
topic = 'test'
122+
topic = '/test_stream:topic3'
124123
msgcnt = 1000000
125124
msgsize = 100
126125
msg_pattern = 'test.py performance'
@@ -141,9 +140,9 @@ def verify_producer_performance(with_dr_cb=True):
141140
for i in range(0, msgcnt):
142141
try:
143142
if with_dr_cb:
144-
p.produce('test', value=msg_payload, callback=dr.delivery)
143+
p.produce(topic, value=msg_payload, callback=dr.delivery)
145144
else:
146-
p.produce('test', value=msg_payload)
145+
p.produce(topic, value=msg_payload)
147146
except BufferError as e:
148147
# Local queue is full (slow broker connection?)
149148
msgs_backpressure += 1
@@ -277,8 +276,7 @@ def verify_consumer():
277276
def verify_consumer_performance():
278277
""" Verify Consumer performance """
279278

280-
conf = {'bootstrap.servers': bootstrap_servers,
281-
'group.id': uuid.uuid1(),
279+
conf = {'group.id': uuid.uuid1(),
282280
'session.timeout.ms': 6000,
283281
'error_cb': error_cb,
284282
'default.topic.config': {
@@ -291,15 +289,13 @@ def my_on_assign (consumer, partitions):
291289
print('on_assign:', len(partitions), 'partitions:')
292290
for p in partitions:
293291
print(' %s [%d] @ %d' % (p.topic, p.partition, p.offset))
294-
consumer.assign(partitions)
295292

296293
def my_on_revoke (consumer, partitions):
297294
print('on_revoke:', len(partitions), 'partitions:')
298295
for p in partitions:
299296
print(' %s [%d] @ %d' % (p.topic, p.partition, p.offset))
300-
consumer.unassign()
301297

302-
c.subscribe(["test"], on_assign=my_on_assign, on_revoke=my_on_revoke)
298+
c.subscribe(["/test_stream:topic3"], on_assign=my_on_assign, on_revoke=my_on_revoke)
303299

304300
max_msgcnt = 1000000
305301
bytecnt = 0
@@ -362,12 +358,6 @@ def my_on_revoke (consumer, partitions):
362358
print('Using confluent_kafka module version %s (0x%x)' % confluent_kafka.version())
363359
print('Using librdkafka version %s (0x%x)' % confluent_kafka.libversion())
364360

365-
print('=' * 30, 'Verifying Producer', '=' * 30)
366-
verify_producer()
367-
368-
print('=' * 30, 'Verifying Consumer', '=' * 30)
369-
verify_consumer()
370-
371361
print('=' * 30, 'Verifying Producer performance (with dr_cb)', '=' * 30)
372362
verify_producer_performance(with_dr_cb=True)
373363

examples/producer.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,16 @@
2424
import sys
2525

2626
if __name__ == '__main__':
27-
if len(sys.argv) != 3:
28-
sys.stderr.write('Usage: %s <bootstrap-brokers> <topic>\n' % sys.argv[0])
27+
if len(sys.argv) != 2:
28+
sys.stderr.write('Usage: %s <topic>\n' % sys.argv[0])
2929
sys.exit(1)
3030

31-
broker = sys.argv[1]
32-
topic = sys.argv[2]
31+
topic = sys.argv[1]
3332

3433
# Producer configuration
3534
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
36-
conf = {'bootstrap.servers': broker}
35+
# Remove bootstrap.servers config
36+
conf = {}
3737

3838
# Create Producer instance
3939
p = Producer(**conf)
@@ -49,11 +49,23 @@ def delivery_callback (err, msg):
4949
(msg.topic(), msg.partition()))
5050

5151
# Read lines from stdin, produce each line to Kafka
52-
for line in sys.stdin:
52+
# Empty lines are ignored
53+
while True:
54+
try:
55+
line = sys.stdin.readline()
56+
except KeyboardInterrupt:
57+
# CTRL + C
58+
break
59+
60+
if not line:
61+
break
62+
63+
if line == '\n':
64+
continue
65+
5366
try:
5467
# Produce line (without newline)
5568
p.produce(topic, line.rstrip(), callback=delivery_callback)
56-
5769
except BufferError as e:
5870
sys.stderr.write('%% Local producer queue is full ' \
5971
'(%d messages awaiting delivery): try again\n' %

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
'confluent_kafka/src/Producer.c',
1111
'confluent_kafka/src/Consumer.c'])
1212

13-
setup(name='confluent-kafka',
13+
setup(name='mapr-streams-python',
1414
version='0.9.2',
1515
description='Confluent\'s Apache Kafka client for Python',
1616
author='Confluent Inc',

tests/test_Consumer.py

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,28 @@
11
#!/usr/bin/env python
22

33
from confluent_kafka import Consumer, TopicPartition, KafkaError, KafkaException
4+
import pytest
5+
import subprocess
6+
import utils as u
7+
8+
@pytest.fixture(scope="module", autouse=True)
9+
def create_stream(request):
10+
u.new_stream('/stream', checked=True)
11+
print("stream created")
12+
def delete_stream():
13+
u.delete_stream('/stream', checked=True)
14+
print("stream deleted")
15+
request.addfinalizer(delete_stream)
16+
17+
18+
@pytest.fixture(autouse=True)
19+
def resource_setup(request):
20+
u.create_topic('/stream', 'topic1')
21+
print("resource_setup")
22+
def resource_teardown():
23+
u.delete_topic('/stream', 'topic1')
24+
print("resource_teardown")
25+
request.addfinalizer(resource_teardown)
426

527

628
def test_basic_api():
@@ -19,13 +41,13 @@ def dummy_commit_cb (err, partitions):
1941
'session.timeout.ms': 1000, # Avoid close() blocking too long
2042
'on_commit': dummy_commit_cb})
2143

22-
kc.subscribe(["test"])
44+
kc.subscribe(["/stream:topic1"])
2345
kc.unsubscribe()
2446

2547
def dummy_assign_revoke (consumer, partitions):
2648
pass
2749

28-
kc.subscribe(["test"], on_assign=dummy_assign_revoke, on_revoke=dummy_assign_revoke)
50+
kc.subscribe(["/stream:topic1"], on_assign=dummy_assign_revoke, on_revoke=dummy_assign_revoke)
2951
kc.unsubscribe()
3052

3153
msg = kc.poll(timeout=0.001)
@@ -36,10 +58,9 @@ def dummy_assign_revoke (consumer, partitions):
3658
else:
3759
print('OK: consumed message')
3860

39-
partitions = list(map(lambda p: TopicPartition("test", p), range(0,100,3)))
61+
partitions = list(map(lambda p: TopicPartition("/stream:topic1", p), range(0,100,3)))
4062
kc.assign(partitions)
4163

42-
kc.unassign()
4364

4465
kc.commit(async=True)
4566

@@ -49,7 +70,12 @@ def dummy_assign_revoke (consumer, partitions):
4970
assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._NO_OFFSET)
5071

5172
# Get current position, should all be invalid.
52-
kc.position(partitions)
73+
try:
74+
kc.position(partitions)
75+
except KafkaException as e:
76+
print e
77+
assert e.args[0].code() == KafkaError.TOPIC_EXCEPTION
78+
5379
assert len([p for p in partitions if p.offset == -1001]) == len(partitions)
5480

5581
try:

tests/test_Producer.py

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,32 @@
11
#!/usr/bin/env python
22

3-
from confluent_kafka import Producer, KafkaError, KafkaException
3+
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException, TopicPartition
4+
import mock
5+
import pytest
6+
import subprocess
7+
import utils as u
48

9+
@pytest.fixture(scope="module", autouse=True)
10+
def create_stream(request):
11+
u.new_stream('/test_stream', checked=True)
12+
print("stream created")
13+
def delete_stream():
14+
u.delete_stream('/test_stream', checked=True)
15+
print("stream deleted")
16+
request.addfinalizer(delete_stream)
17+
18+
19+
@pytest.fixture(autouse=True)
20+
def resource_setup(request):
21+
u.create_topic('/test_stream', 'topic1')
22+
print("resource_setup")
23+
def resource_teardown():
24+
u.delete_topic('/test_stream', 'topic1', checked=True)
25+
print("resource_teardown")
26+
request.addfinalizer(resource_teardown)
527

6-
def test_basic_api():
7-
""" Basic API tests, these wont really do anything since there is no
8-
broker configured. """
928

29+
def test_basic_api():
1030
try:
1131
p = Producer()
1232
except TypeError as e:
@@ -18,21 +38,58 @@ def error_cb (err):
1838

1939
p = Producer({'socket.timeout.ms':10,
2040
'error_cb': error_cb,
21-
'default.topic.config': {'message.timeout.ms': 10}})
41+
'default.topic.config': {'message.timeout.ms': 10, 'auto.offset.reset': 'earliest'}})
42+
43+
p.produce('/test_stream:topic1')
44+
p.produce('/test_stream:topic1', value='somedata', key='a key')
45+
try:
46+
p.produce(None)
47+
except TypeError as e:
48+
assert str(e) == "argument 1 must be string, not None"
2249

23-
p.produce('mytopic')
24-
p.produce('mytopic', value='somedata', key='a key')
2550

2651
def on_delivery(err,msg):
2752
print('delivery', str)
28-
# Since there is no broker, produced messages should time out.
29-
assert err.code() == KafkaError._MSG_TIMED_OUT
3053

31-
p.produce(topic='another_topic', value='testing', partition=9,
54+
p.produce(topic='/test_stream:topic1', value='testing', partition=9,
3255
callback=on_delivery)
3356

3457
p.poll(0.001)
3558

3659
p.flush()
3760

3861

62+
def test_producer_on_delivery():
63+
p = Producer({'socket.timeout.ms':10,
64+
'default.topic.config': {'message.timeout.ms': 10, 'auto.offset.reset': 'earliest'}})
65+
on_delivery = mock.Mock()
66+
p.produce(topic='/test_stream:topic1', value='testing', partition=0,
67+
callback=on_delivery)
68+
p.flush()
69+
assert on_delivery.called
70+
71+
72+
def test_producer_partition():
73+
p = Producer({'socket.timeout.ms':10,
74+
'default.topic.config': {'message.timeout.ms': 10, 'auto.offset.reset': 'earliest'}})
75+
p.produce(topic='/test_stream:topic2', value='testing', partition=0)
76+
p.poll(1)
77+
kc = Consumer({'group.id':'test', 'socket.timeout.ms':'100','enable.auto.commit': False,
78+
'session.timeout.ms': 1000, 'default.topic.config':{'auto.offset.reset': 'earliest'}})
79+
kc.assign([TopicPartition("/test_stream:topic2", 0)])
80+
msg = kc.poll()
81+
assert msg.value() == "testing"
82+
kc.close()
83+
84+
85+
def test_producer_default_stream():
86+
p = Producer({'socket.timeout.ms':10, 'streams.producer.default.stream': '/test_stream',
87+
'default.topic.config': {'message.timeout.ms': 10, 'auto.offset.reset': 'earliest'}})
88+
p.produce(topic='topic1', value='TestDefaultStream')
89+
p.poll(1)
90+
kc = Consumer({'group.id':'test', 'socket.timeout.ms':'100','enable.auto.commit': False,
91+
'session.timeout.ms': 1000, 'default.topic.config':{'auto.offset.reset': 'earliest'}})
92+
kc.assign([TopicPartition("/test_stream:topic1", 0)])
93+
msg = kc.poll()
94+
assert msg.value() == "TestDefaultStream"
95+
kc.close()

0 commit comments

Comments
 (0)