@@ -624,17 +624,15 @@ def on_response(response):
624624 ``open()``ed yet.
625625 on_response (Callable[[protobuf.Message], None]): The callback to
626626 be called for every response on the stream.
627- reraise_exceptions (bool): Whether to reraise exceptions during
628- the lifetime of the consumer, generally those that are not
629- handled by `BidiRpc`'s `should_recover` or `should_terminate`.
630- Default `False`.
627+ on_fatal_exception (Callable[[Exception], None]): The callback to
628+ be called on fatal errors during consumption. Default None.
631629 """
632630
633- def __init__ (self , bidi_rpc , on_response , reraise_exceptions = False ):
631+ def __init__ (self , bidi_rpc , on_response , on_fatal_exception = None ):
634632 self ._bidi_rpc = bidi_rpc
635633 self ._on_response = on_response
636634 self ._paused = False
637- self ._reraise_exceptions = reraise_exceptions
635+ self ._on_fatal_exception = on_fatal_exception
638636 self ._wake = threading .Condition ()
639637 self ._thread = None
640638 self ._operational_lock = threading .Lock ()
@@ -681,17 +679,17 @@ def _thread_main(self, ready):
681679 exc ,
682680 exc_info = True ,
683681 )
684- if self ._reraise_exceptions :
685- raise
682+ if self ._on_fatal_exception is not None :
683+ self . _on_fatal_exception ( exc )
686684
687685 except Exception as exc :
688686 _LOGGER .exception (
689687 "%s caught unexpected exception %s and will exit." ,
690688 _BIDIRECTIONAL_CONSUMER_NAME ,
691689 exc ,
692690 )
693- if self ._reraise_exceptions :
694- raise
691+ if self ._on_fatal_exception is not None :
692+ self . _on_fatal_exception ( exc )
695693
696694 _LOGGER .info ("%s exiting" , _BIDIRECTIONAL_CONSUMER_NAME )
697695
@@ -734,6 +732,7 @@ def stop(self):
734732
735733 self ._thread = None
736734 self ._on_response = None
735+ self ._on_fatal_exception = None
737736
738737 @property
739738 def is_active (self ):
0 commit comments