|
38 | 38 | # Topic to use
|
39 | 39 | topic = 'test'
|
40 | 40 |
|
| 41 | +# API version requests are only implemented in Kafka broker >=0.10 |
| 42 | +# but the client handles failed API version requests gracefully for older |
| 43 | +# versions as well, except for 0.9.0.x which will stall for about 10s |
| 44 | +# on each connect with this set to True. |
| 45 | +api_version_request = True |
| 46 | + |
41 | 47 | # global variable to be set by stats_cb call back function
|
42 | 48 | good_stats_cb_result = False
|
43 | 49 |
|
@@ -85,6 +91,7 @@ def verify_producer():
|
85 | 91 | # Producer config
|
86 | 92 | conf = {'bootstrap.servers': bootstrap_servers,
|
87 | 93 | 'error_cb': error_cb,
|
| 94 | + 'api.version.request': api_version_request, |
88 | 95 | 'default.topic.config':{'produce.offset.report': True}}
|
89 | 96 |
|
90 | 97 | # Create producer
|
@@ -121,6 +128,7 @@ def verify_producer():
|
121 | 128 | def verify_producer_performance(with_dr_cb=True):
|
122 | 129 | """ Time how long it takes to produce and delivery X messages """
|
123 | 130 | conf = {'bootstrap.servers': bootstrap_servers,
|
| 131 | + 'api.version.request': api_version_request, |
124 | 132 | 'error_cb': error_cb}
|
125 | 133 |
|
126 | 134 | p = confluent_kafka.Producer(**conf)
|
@@ -214,7 +222,7 @@ def verify_consumer():
|
214 | 222 | 'group.id': 'test.py',
|
215 | 223 | 'session.timeout.ms': 6000,
|
216 | 224 | 'enable.auto.commit': False,
|
217 |
| - 'api.version.request': True, |
| 225 | + 'api.version.request': api_version_request, |
218 | 226 | 'on_commit': print_commit_result,
|
219 | 227 | 'error_cb': error_cb,
|
220 | 228 | 'default.topic.config': {
|
@@ -247,11 +255,10 @@ def verify_consumer():
|
247 | 255 | print('Consumer error: %s: ignoring' % msg.error())
|
248 | 256 | break
|
249 | 257 |
|
250 |
| - if False: |
251 |
| - tstype, timestamp = msg.timestamp() |
252 |
| - print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' % \ |
253 |
| - (msg.topic(), msg.partition(), msg.offset(), |
254 |
| - msg.key(), msg.value(), tstype, timestamp)) |
| 258 | + tstype, timestamp = msg.timestamp() |
| 259 | + print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' % \ |
| 260 | + (msg.topic(), msg.partition(), msg.offset(), |
| 261 | + msg.key(), msg.value(), tstype, timestamp)) |
255 | 262 |
|
256 | 263 | if (msg.offset() % 5) == 0:
|
257 | 264 | # Async commit
|
|
0 commit comments