Skip to content

SIGSEGV, Objects/unicodeobject.c: No such file or directory. randomly on consumer re-connecting to broker #874

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
4 of 7 tasks
danyc opened this issue May 18, 2020 · 4 comments
Assignees
Labels

Comments

@danyc
Copy link

danyc commented May 18, 2020

Description

I have a consumer implemented like this:

`

class KafkaReader:

	def __init__(self, topics: List[str], config: Dict, timeout: float = 1):
		self._consumer = Consumer(config)
		self._consumer.subscribe(topics)
		self._topics = topics
		self._timeout = timeout

	def read(self):
		while True:
			message = self._consumer.poll(timeout=self._timeout)
			if message is None:
				continue
			self._consumer.store_offsets(message)
			if message.error():
				logger.error(
					"Kafka error", topic=message.topic(), error=message.error()
				)
				continue
			yield KafkaMessage(...)

`

It randomly fails with SIGSEGV when trying to re-connect to broker. I've captured the following log using gdb (edited for anonymization). It shows both successful re-connection and failure with segmentation fault.

`

%7|1589814027.729|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id " is rebalancing in state up (join-state started) with assignment: group is rebalancing
%7|1589814027.729|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id ": new assignment of 0 partition(s) in join state wait-revoke-rebalance_cb
%7|1589814027.729|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id ": unassigning 5 partition(s) (v6)
%7|1589814027.729|JOIN|rdkafka#consumer-1| [thrd:main]: 10.61.32.88:9092/1004: Joining group "kafka-group-id " with 1 subscribed topic(s)
%7|1589814028.733|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id ": new assignment of 5 partition(s) in join state wait-assign-rebalance_cb
%7|1589814028.734|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1004: Fetch committed offsets for 5/5 partition(s)
%7|1589814028.734|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [0] start fetching at offset 176905
%7|1589814028.734|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [1] start fetching at offset 177036
%7|1589814028.734|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [2] start fetching at offset 183135
%7|1589814028.734|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [3] start fetching at offset 175680
%7|1589814028.734|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [4] start fetching at offset 177818


%7|1589815172.579|BROKERFAIL|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection reset by peer)
%7|1589815172.579|STATE|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Broker changed state UP -> DOWN
%7|1589815172.579|REQERR|rdkafka#consumer-1| [thrd:main]: broker.test.com:9092/bootstrap: MetadataRequest failed: Local: Broker transport failure: actions Retry
%7|1589815172.579|STATE|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Broker changed state DOWN -> INIT
%7|1589815172.679|STATE|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1589815172.679|RETRY|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Moved 1 retry buffer(s) to output queue
%7|1589815173.579|CONNECT|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1589815173.579|STATE|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1589815173.604|CONNECT|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Connecting to ipv4#127.0.0.1:9092 (plaintext) with socket 9
%7|1589815173.605|CONNECT|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Connected to ipv4#127.0.0.1:9092
%7|1589815173.605|CONNECTED|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Connected (#2)
%7|1589815173.605|STATE|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1589815173.606|PROTOERR|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Protocol parse failure for ApiVersion v3 at 3/6 (rd_kafka_handle_ApiVersion:1911) (incorrect broker.version.fallback?)
%7|1589815173.606|PROTOERR|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: ApiArrayCnt -1 out of range
%7|1589815173.606|APIVERSION|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: ApiVersionRequest v3 failed due to UNSUPPORTED_VERSION: retrying with v0
%7|1589815173.606|STATE|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Broker changed state APIVERSION_QUERY - > UP
%7|1589815776.829|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id " is rebalancing in state up (join-state started) with assignment: group is rebalancing
%7|1589815776.829|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id ": new assignment of 0 partition(s) in join state wait-revoke-rebalance_cb
%7|1589815776.829|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id ": unassigning 5 partition(s) (v11)
%7|1589815776.829|JOIN|rdkafka#consumer-1| [thrd:main]: 10.61.32.88:9092/1004: Joining group "kafka-group-id " with 1 subscribed topic(s)
%7|1589815776.831|ASSIGNOR|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id ": "range" assignor run for 3 member(s)
%7|1589815776.833|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id ": new assignment of 5 partition(s) in join state wait-assign-rebalance_cb
%7|1589815776.833|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1004: Fetch committed offsets for 5/5 partition(s)
%7|1589815776.834|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [0] start fetching at offset 176905
%7|1589815776.834|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [1] start fetching at offset 177036
%7|1589815776.834|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [2] start fetching at offset 183135
%7|1589815776.834|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [3] start fetching at offset 175680
%7|1589815776.834|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [4] start fetching at offset 177818


%7|1589816373.600|BROKERFAIL|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection reset by peer)
%7|1589816373.600|STATE|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Broker changed state UP -> DOWN
%7|1589816373.600|STATE|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Broker changed state DOWN -> INIT
3818 Objects/unicodeobject.c: No such file or directory.
Thread 1 "python" received signal SIGSEGV, Segmentation fault.
0x00007ffff7dc441a in PyUnicode_AsUTF8AndSize (unicode=0x0, psize=0x0)
at Objects/unicodeobject.c:3818
(gdb) quit

`

Is there something that I have to configure additionally to avoid this failure?

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
    "confluent_kafka.version(): ('1.4.0', 17039360)"
    "confluent_kafka.libversion(): ('1.4.0', 17039615)"
  • Apache Kafka broker version:
  • Client configuration:
    `

{
"bootstrap.servers": ...,
"group.id": ...,
"enable.auto.offset.store": False,
"auto.offset.reset": "latest",
"security.protocol": ...,
"debug": "consumer,broker",
}

`

  • Operating system:
    PRETTY_NAME="Debian GNU/Linux 10 (buster)"
    NAME="Debian GNU/Linux"
    VERSION_ID="10"
    VERSION="10 (buster)"
    VERSION_CODENAME=buster
    ID=debian
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
@edenhill
Copy link
Contributor

Interesting, can you provide the full backtrace from gdb with bt full ?

@danyc
Copy link
Author

danyc commented May 19, 2020

Hello,

Did some changes in the meantime:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):

"confluent_kafka.version(): ('1.4.1', 17039616)"
"confluent_kafka.libversion(): ('1.4.0', 17039615)"

  • Client configuration:

`

{
"bootstrap.servers": ...,
"group.id": ...,
"enable.auto.offset.store": False,
"auto.offset.reset": "latest",
"security.protocol": ...,
"log.connection.close": True,
"socket.keepalive.enable": True,
"debug": "generic,broker,consumer",
"logger": structlog.get_logger(name),
}
`


bt full:
`
Thread 1 "python" received signal SIGSEGV, Segmentation fault.
0x00007ffff7dc441a in PyUnicode_AsUTF8AndSize (unicode=0x0, psize=0x0) at Objects/unicodeobject.c:3818
3818 Objects/unicodeobject.c: No such file or directory.

#0 0x00007ffff7dc441a in PyUnicode_AsUTF8AndSize (unicode=0x0, psize=0x0) at Objects/unicodeobject.c:3818
#1 0x00007ffff6ff54cf in cfl_PyUnistr_AsUTF8 (uobjp=, o=) at /tmp/pip-req-build-a2556ujq/confluent_kafka/src/confluent_kafka.h:135
#2 Consumer_store_offsets (self=0x7ffff21cc050, args=, kwargs=) at /tmp/pip-req-build-a2556ujq/confluent_kafka/src/Consumer.c:551
#3 0x00007ffff7d7d69c in _PyMethodDef_RawFastCallKeywords (method=0x7ffff7205de0 <Consumer_methods+224>, self=self@entry=0x7ffff21cc050, args=args@entry=0x7fffd8003018, nargs=nargs@entry=1, kwnames=kwnames@entry=0x0) at Objects/call.c:694
#4 0x00007ffff7d830ed in _PyMethodDescr_FastCallKeywords (descrobj=0x7ffff7439d70, args=args@entry=0x7fffd8003010, nargs=nargs@entry=2, kwnames=kwnames@entry=0x0) at Objects/descrobject.c:288
#5 0x00007ffff7df2955 in call_function (kwnames=0x0, oparg=2, pp_stack=) at Python/ceval.c:4593
#6 _PyEval_EvalFrameDefault (f=, throwflag=) at Python/ceval.c:3110
#7 0x00007ffff7d84340 in gen_send_ex (closing=0, exc=0, arg=0x0, gen=0x7ffff248de50) at Objects/genobject.c:221
#8 gen_iternext (gen=0x7ffff248de50) at Objects/genobject.c:542
#9 0x00007ffff7def15b in _PyEval_EvalFrameDefault (f=, throwflag=) at Python/ceval.c:2809
#10 0x00007ffff7d7e58a in function_code_fastcall (globals=, nargs=1, args=, co=) at Objects/call.c:283
#11 _PyFunction_FastCallKeywords (func=, stack=0x7ffff2dd7b80, nargs=1, kwnames=) at Objects/call.c:408
#12 0x00007ffff7deed62 in call_function (kwnames=0x0, oparg=, pp_stack=) at Python/ceval.c:4616
#13 _PyEval_EvalFrameDefault (f=, throwflag=) at Python/ceval.c:3110
#14 0x00007ffff7d7e58a in function_code_fastcall (globals=, nargs=0, args=, co=) at Objects/call.c:283
#15 _PyFunction_FastCallKeywords (func=, stack=0x5555555d7d80, nargs=0, kwnames=) at Objects/call.c:408
#16 0x00007ffff7deef81 in call_function (kwnames=0x0, oparg=, pp_stack=) at Python/ceval.c:4616
#17 _PyEval_EvalFrameDefault (f=, throwflag=) at Python/ceval.c:3124
#18 0x00007ffff7dedbf1 in _PyEval_EvalCodeWithName (_co=, globals=, locals=, args=, argcount=, kwnames=0x0, kwargs=0x0, kwcount=, kwstep=2, defs=0x0, defcount=0, kwdefs=0x0, closure=0x0, name=0x0, qualname=0x0) at Python/ceval.c:3930
#19 0x00007ffff7ded8f9 in PyEval_EvalCodeEx (_co=, globals=, locals=, args=, argcount=, kws=, kwcount=0, defs=0x0, defcount=0, kwdefs=0x0, closure=0x0) at Python/ceval.c:3959
#20 0x00007ffff7ded8bb in PyEval_EvalCode (co=co@entry=0x7ffff75616f0, globals=globals@entry=0x7ffff7688280, locals=locals@entry=0x7ffff7688280) at Python/ceval.c:524
#21 0x00007ffff7e718de in run_mod (mod=mod@entry=0x555555639000, filename=filename@entry=0x7ffff757e210, globals=globals@entry=0x7ffff7688280, locals=locals@entry=0x7ffff7688280, flags=flags@entry=0x7fffffffcfcc, arena=arena@entry=0x7ffff7698390) at Python/pythonrun.c:1035
#22 0x00007ffff7e70c97 in PyRun_FileExFlags (fp=0x5555555e77b0, filename_str=, start=, globals=0x7ffff7688280, locals=0x7ffff7688280, closeit=1, flags=0x7fffffffcfcc) at Python/pythonrun.c:988
#23 0x00007ffff7e70abe in PyRun_SimpleFileExFlags (fp=0x5555555e77b0, filename=, closeit=1, flags=0x7fffffffcfcc) at Python/pythonrun.c:429
#24 0x00007ffff7e76068 in pymain_run_file (p_cf=0x7fffffffcfcc, filename=0x55555555c130 L"kafka_importer_worker/worker.py", fp=0x5555555e77b0) at Modules/main.c:428
#25 pymain_run_filename (cf=0x7fffffffcfcc, pymain=0x7fffffffd0a0) at Modules/main.c:1607
#26 pymain_run_python (pymain=0x7fffffffd0a0) at Modules/main.c:2868
#27 pymain_main (pymain=0x7fffffffd0a0) at Modules/main.c:3029
#28 0x00007ffff7e75d5e in _Py_UnixMain (argc=, argv=) at Modules/main.c:3064
#29 0x00007ffff78df09b in __libc_start_main () from /lib/x86_64-linux-gnu/libc.so.6
#30 0x000055555555508a in _start () `

@edenhill
Copy link
Contributor

This crash is a bug in the consumer code that we will fix, but it is triggered by trying to store_offsets() of a message object that represents a consumer error.

consumer.poll() may return three things:

  • None - on timeout
  • msg.error() is not None - a consumer error occurred. These are generally informational and there isn't much the applicaiton needs to do other than to perhaps log it.
  • msg.error() is None - a proper message. These are the only messages that may be committed/offset_stored.

So you will need to modify your consume loop to accomodate the message errors and in that case not do processing or offset stores.

@danyc
Copy link
Author

danyc commented May 19, 2020

Thank you for the fast support!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants