diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py index fc4219134..bb60f2a96 100644 --- a/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py +++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py @@ -8,6 +8,7 @@ import requests from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler +from airbyte_cdk.sources.streams.http.error_handlers.backoff_strategy import BackoffStrategy from airbyte_cdk.sources.streams.http.error_handlers.response_models import ( ErrorResolution, ResponseAction, @@ -77,3 +78,24 @@ def interpret_response( return matched_error_resolution return create_fallback_error_resolution(response_or_exception) + + @property + def backoff_strategies(self) -> Optional[List[BackoffStrategy]]: + """ + Combines backoff strategies from all child error handlers into a single flattened list. + + When used with HttpRequester, note the following behavior: + - In HttpRequester.__post_init__, the entire list of backoff strategies is assigned to the error handler + - However, the error handler's backoff_time() method only ever uses the first non-None strategy in the list + - This means that if any backoff strategies are present, the first non-None strategy becomes the default + - This applies to both user-defined response filters and errors from DEFAULT_ERROR_MAPPING + - The list structure is not used to map different strategies to different error conditions + - Therefore, subsequent strategies in the list will not be used + + Returns None if no handlers have strategies defined, which will result in HttpRequester using its default backoff strategy. + """ + all_strategies = [] + for handler in self.error_handlers: + if hasattr(handler, "backoff_strategies") and handler.backoff_strategies: + all_strategies.extend(handler.backoff_strategies) + return all_strategies if all_strategies else None diff --git a/unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py b/unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py index 7a4e46ed3..765b879de 100644 --- a/unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py +++ b/unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py @@ -9,6 +9,9 @@ from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.requesters.error_handlers import HttpResponseFilter +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import ( + ConstantBackoffStrategy, +) from airbyte_cdk.sources.declarative.requesters.error_handlers.composite_error_handler import ( CompositeErrorHandler, ) @@ -272,3 +275,77 @@ def test_max_time_is_max_of_underlying_handlers(test_name, max_times, expected_m max_time = composite_error_handler.max_time assert max_time == expected_max_time + + +@pytest.mark.parametrize( + "test_name, handler_strategies, expected_strategies", + [ + ("test_empty_strategies", [None, None], None), + ( + "test_single_handler_with_strategy", + [[ConstantBackoffStrategy(5, {}, {})], None], + [ConstantBackoffStrategy(5, {}, {})], + ), + ( + "test_multiple_handlers_with_strategies", + [[ConstantBackoffStrategy(5, {}, {})], [ConstantBackoffStrategy(10, {}, {})]], + [ConstantBackoffStrategy(5, {}, {}), ConstantBackoffStrategy(10, {}, {})], + ), + ( + "test_some_handlers_without_strategies", + [[ConstantBackoffStrategy(5, {}, {})], None, [ConstantBackoffStrategy(10, {}, {})]], + [ConstantBackoffStrategy(5, {}, {}), ConstantBackoffStrategy(10, {}, {})], + ), + ], +) +def test_composite_error_handler_backoff_strategies( + test_name, handler_strategies, expected_strategies +): + parameters = {} + config = {} + + error_handlers = [ + DefaultErrorHandler(backoff_strategies=strategies, parameters=parameters, config=config) + for strategies in handler_strategies + ] + + composite_handler = CompositeErrorHandler(error_handlers=error_handlers, parameters=parameters) + + assert composite_handler.backoff_strategies == expected_strategies + + +def test_composite_error_handler_always_uses_first_strategy(): + first_handler = DefaultErrorHandler( + backoff_strategies=[ConstantBackoffStrategy(5, {}, {})], + parameters={}, + config={}, + response_filters=[ + HttpResponseFilter( + action=ResponseAction.RETRY, http_codes={429}, config={}, parameters={} + ) + ], + ) + second_handler = DefaultErrorHandler( + backoff_strategies=[ConstantBackoffStrategy(10, {}, {})], + parameters={}, + config={}, + response_filters=[ + HttpResponseFilter( + action=ResponseAction.RETRY, http_codes={500}, config={}, parameters={} + ) + ], + ) + + composite_handler = CompositeErrorHandler( + error_handlers=[first_handler, second_handler], parameters={} + ) + + # Test that even for a 500 error (which matches second handler's filter), + # we still get both strategies with first handler's coming first + response_mock = create_response(500) + assert first_handler.backoff_strategies[0].backoff_time(response_mock, 1) == 5 + + # Verify we get both strategies in the composite handler + assert len(composite_handler.backoff_strategies) == 2 + assert isinstance(composite_handler.backoff_strategies[0], ConstantBackoffStrategy) + assert composite_handler.backoff_strategies[1], ConstantBackoffStrategy diff --git a/unit_tests/sources/declarative/requesters/test_http_requester.py b/unit_tests/sources/declarative/requesters/test_http_requester.py index 28ea0cb9b..8e63aa21e 100644 --- a/unit_tests/sources/declarative/requesters/test_http_requester.py +++ b/unit_tests/sources/declarative/requesters/test_http_requester.py @@ -13,6 +13,7 @@ from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.requesters.error_handlers import HttpResponseFilter from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import ( ConstantBackoffStrategy, ExponentialBackoffStrategy, @@ -26,6 +27,7 @@ InterpolatedRequestOptionsProvider, ) from airbyte_cdk.sources.message import MessageRepository +from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction from airbyte_cdk.sources.streams.http.exceptions import ( RequestBodyException, UserDefinedBackoffException, @@ -901,3 +903,34 @@ def test_request_attempt_count_with_exponential_backoff_strategy(http_requester_ http_requester._http_client._request_attempt_count.get(request_mock) == http_requester._http_client._max_retries + 1 ) + + +@pytest.mark.usefixtures("mock_sleep") +def test_backoff_strategy_from_manifest_is_respected(http_requester_factory: Any) -> None: + backoff_strategy = ConstantBackoffStrategy( + parameters={}, config={}, backoff_time_in_seconds=0.1 + ) + error_handler = DefaultErrorHandler( + parameters={}, config={}, max_retries=1, backoff_strategies=[backoff_strategy] + ) + + request_mock = MagicMock(spec=requests.PreparedRequest) + request_mock.headers = {} + request_mock.url = "https://orksy.com/orks_rule_humies_drule" + request_mock.method = "GET" + request_mock.body = {} + + http_requester = http_requester_factory(error_handler=error_handler) + http_requester._http_client._session.send = MagicMock() + + response = requests.Response() + response.status_code = 500 + http_requester._http_client._session.send.return_value = response + + with pytest.raises(UserDefinedBackoffException): + http_requester._http_client._send_with_retry(request=request_mock, request_kwargs={}) + + assert ( + http_requester._http_client._request_attempt_count.get(request_mock) + == http_requester._http_client._max_retries + 1 + )