Skip to content

Commit 6245533

Browse files
committed
integration_test.py: require broker to be configured, allow topic name conf
1 parent 24fbc52 commit 6245533

File tree

2 files changed

+26
-19
lines changed

2 files changed

+26
-19
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ Tests
115115

116116
**Run integration tests:**
117117

118-
$ examples/integration_test.py <kafka-broker>
118+
$ examples/integration_test.py <kafka-broker> [<test-topic>]
119119

120120
**WARNING**: These tests require an active Kafka cluster and will make use of a topic named 'test' (or the topic name passed as the optional second argument).
121121

examples/integration_test.py

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@
3333
with_progress = False
3434

3535
# Kafka bootstrap server(s)
36-
bootstrap_servers = 'localhost'
36+
bootstrap_servers = None
3737

38+
# Topic to use
39+
topic = 'test'
3840

3941
# global variable to be set by stats_cb call back function
4042
good_stats_cb_result = False
@@ -90,22 +92,22 @@ def verify_producer():
9092
print('producer at %s' % p)
9193

9294
# Produce some messages
93-
p.produce('test', 'Hello Python!')
94-
p.produce('test', key='Just a key')
95-
p.produce('test', partition=1, value='Strictly for partition 1',
95+
p.produce(topic, 'Hello Python!')
96+
p.produce(topic, key='Just a key')
97+
p.produce(topic, partition=1, value='Strictly for partition 1',
9698
key='mykey')
9799

98100
# Produce more messages, now with delivery report callbacks in various forms.
99101
mydr = MyTestDr()
100-
p.produce('test', value='This one has a dr callback',
102+
p.produce(topic, value='This one has a dr callback',
101103
callback=mydr.delivery)
102-
p.produce('test', value='This one has a lambda',
104+
p.produce(topic, value='This one has a lambda',
103105
callback=lambda err, msg: MyTestDr._delivery(err, msg))
104-
p.produce('test', value='This one has neither')
106+
p.produce(topic, value='This one has neither')
105107

106108
# Produce even more messages
107109
for i in range(0, 10):
108-
p.produce('test', value='Message #%d' % i, key=str(i),
110+
p.produce(topic, value='Message #%d' % i, key=str(i),
109111
callback=mydr.delivery)
110112
p.poll(0)
111113

@@ -123,7 +125,6 @@ def verify_producer_performance(with_dr_cb=True):
123125

124126
p = confluent_kafka.Producer(**conf)
125127

126-
topic = 'test'
127128
msgcnt = 1000000
128129
msgsize = 100
129130
msg_pattern = 'test.py performance'
@@ -144,9 +145,9 @@ def verify_producer_performance(with_dr_cb=True):
144145
for i in range(0, msgcnt):
145146
try:
146147
if with_dr_cb:
147-
p.produce('test', value=msg_payload, callback=dr.delivery)
148+
p.produce(topic, value=msg_payload, callback=dr.delivery)
148149
else:
149-
p.produce('test', value=msg_payload)
150+
p.produce(topic, value=msg_payload)
150151
except BufferError as e:
151152
# Local queue is full (slow broker connection?)
152153
msgs_backpressure += 1
@@ -224,7 +225,7 @@ def verify_consumer():
224225
c = confluent_kafka.Consumer(**conf)
225226

226227
# Subscribe to a list of topics
227-
c.subscribe(["test"])
228+
c.subscribe([topic])
228229

229230
max_msgcnt = 100
230231
msgcnt = 0
@@ -270,7 +271,7 @@ def verify_consumer():
270271

271272
# Start a new client and get the committed offsets
272273
c = confluent_kafka.Consumer(**conf)
273-
offsets = c.committed(list(map(lambda p: confluent_kafka.TopicPartition("test", p), range(0,3))))
274+
offsets = c.committed(list(map(lambda p: confluent_kafka.TopicPartition(topic, p), range(0,3))))
274275
for tp in offsets:
275276
print(tp)
276277

@@ -304,7 +305,7 @@ def my_on_revoke (consumer, partitions):
304305
print(' %s [%d] @ %d' % (p.topic, p.partition, p.offset))
305306
consumer.unassign()
306307

307-
c.subscribe(["test"], on_assign=my_on_assign, on_revoke=my_on_revoke)
308+
c.subscribe([topic], on_assign=my_on_assign, on_revoke=my_on_revoke)
308309

309310
max_msgcnt = 1000000
310311
bytecnt = 0
@@ -364,10 +365,11 @@ def verify_stats_cb():
364365
def stats_cb(stats_json_str):
365366
global good_stats_cb_result
366367
stats_json = json.loads(stats_json_str)
367-
if 'test' in stats_json['topics']:
368-
app_offset = stats_json['topics']['test']['partitions']['0']['app_offset']
368+
if topic in stats_json['topics']:
369+
app_offset = stats_json['topics'][topic]['partitions']['0']['app_offset']
369370
if app_offset > 0:
370-
print("# app_offset stats for topic test partition 0: %d" % app_offset)
371+
print("# app_offset stats for topic %s partition 0: %d" % \
372+
(topic, app_offset))
371373
good_stats_cb_result = True
372374

373375
conf = {'bootstrap.servers': bootstrap_servers,
@@ -381,7 +383,7 @@ def stats_cb(stats_json_str):
381383
}}
382384

383385
c = confluent_kafka.Consumer(**conf)
384-
c.subscribe(["test"])
386+
c.subscribe([topic])
385387

386388
max_msgcnt = 1000000
387389
bytecnt = 0
@@ -439,6 +441,11 @@ def stats_cb(stats_json_str):
439441

440442
if len(sys.argv) > 1:
441443
bootstrap_servers = sys.argv[1]
444+
if len(sys.argv) > 2:
445+
topic = sys.argv[2]
446+
else:
447+
print('Usage: %s <broker> [<topic>]' % sys.argv[0])
448+
sys.exit(1)
442449

443450
print('Using confluent_kafka module version %s (0x%x)' % confluent_kafka.version())
444451
print('Using librdkafka version %s (0x%x)' % confluent_kafka.libversion())

0 commit comments

Comments
 (0)