19
19
20
20
""" Test script for confluent_kafka module """
21
21
22
- import confluent_kafka
22
+ import mapr_streams_python
23
23
import re
24
24
import time
25
25
import uuid
@@ -83,7 +83,7 @@ def verify_producer():
83
83
'default.topic.config' :{'produce.offset.report' : True }}
84
84
85
85
# Create producer
86
- p = confluent_kafka .Producer (** conf )
86
+ p = mapr_streams_python .Producer (** conf )
87
87
print ('producer at %s' % p )
88
88
89
89
# Produce some messages
@@ -117,7 +117,7 @@ def verify_producer_performance(with_dr_cb=True):
117
117
""" Time how long it takes to produce and delivery X messages """
118
118
conf = {'error_cb' : error_cb }
119
119
120
- p = confluent_kafka .Producer (** conf )
120
+ p = mapr_streams_python .Producer (** conf )
121
121
122
122
topic = '/test_stream:topic3'
123
123
msgcnt = 1000000
@@ -216,7 +216,7 @@ def verify_consumer():
216
216
}}
217
217
218
218
# Create consumer
219
- c = confluent_kafka .Consumer (** conf )
219
+ c = mapr_streams_python .Consumer (** conf )
220
220
221
221
# Subscribe to a list of topics
222
222
c .subscribe (["test" ])
@@ -233,7 +233,7 @@ def verify_consumer():
233
233
raise Exception ('Got timeout from poll() without a timeout set: %s' % msg )
234
234
235
235
if msg .error ():
236
- if msg .error ().code () == confluent_kafka .KafkaError ._PARTITION_EOF :
236
+ if msg .error ().code () == mapr_streams_python .KafkaError ._PARTITION_EOF :
237
237
print ('Reached end of %s [%d] at offset %d' % \
238
238
(msg .topic (), msg .partition (), msg .offset ()))
239
239
break
@@ -263,8 +263,8 @@ def verify_consumer():
263
263
264
264
265
265
# Start a new client and get the committed offsets
266
- c = confluent_kafka .Consumer (** conf )
267
- offsets = c .committed (list (map (lambda p : confluent_kafka .TopicPartition ("test" , p ), range (0 ,3 ))))
266
+ c = mapr_streams_python .Consumer (** conf )
267
+ offsets = c .committed (list (map (lambda p : mapr_streams_python .TopicPartition ("test" , p ), range (0 , 3 ))))
268
268
for tp in offsets :
269
269
print (tp )
270
270
@@ -283,7 +283,7 @@ def verify_consumer_performance():
283
283
'auto.offset.reset' : 'earliest'
284
284
}}
285
285
286
- c = confluent_kafka .Consumer (** conf )
286
+ c = mapr_streams_python .Consumer (** conf )
287
287
288
288
def my_on_assign (consumer , partitions ):
289
289
print ('on_assign:' , len (partitions ), 'partitions:' )
@@ -318,11 +318,11 @@ def my_on_revoke (consumer, partitions):
318
318
(msgcnt , max_msgcnt ))
319
319
320
320
if msg .error ():
321
- if msg .error ().code () == confluent_kafka .KafkaError ._PARTITION_EOF :
321
+ if msg .error ().code () == mapr_streams_python .KafkaError ._PARTITION_EOF :
322
322
# Reached EOF for a partition, ignore.
323
323
continue
324
324
else :
325
- raise confluent_kafka .KafkaException (msg .error ())
325
+ raise mapr_streams_python .KafkaException (msg .error ())
326
326
327
327
328
328
bytecnt += len (msg )
@@ -355,8 +355,8 @@ def my_on_revoke (consumer, partitions):
355
355
if len (sys .argv ) > 1 :
356
356
bootstrap_servers = sys .argv [1 ]
357
357
358
- print ('Using confluent_kafka module version %s (0x%x)' % confluent_kafka .version ())
359
- print ('Using librdkafka version %s (0x%x)' % confluent_kafka .libversion ())
358
+ print ('Using confluent_kafka module version %s (0x%x)' % mapr_streams_python .version ())
359
+ print ('Using librdkafka version %s (0x%x)' % mapr_streams_python .libversion ())
360
360
361
361
print ('=' * 30 , 'Verifying Producer performance (with dr_cb)' , '=' * 30 )
362
362
verify_producer_performance (with_dr_cb = True )
0 commit comments