diff --git a/docs/subscriptions.rst b/docs/subscriptions.rst index 4c0b5e3de4..6f853688e2 100644 --- a/docs/subscriptions.rst +++ b/docs/subscriptions.rst @@ -49,7 +49,11 @@ As of v7.7.0, web3.py includes some additional convenient subscription managemen 1.) The subscription_manager ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -First, your ``w3`` (``AsyncWeb3``) instance now includes a new module, ``subscription_manager``. While you may still use the ``w3.eth.subscribe`` method from the previous example, the ``subscription_manager`` offers an additional way to start one or more subscriptions. We're going to pass in a list of events we want to subscribe to within the ``w3.subscription_manager.subscribe`` method. +The ``w3`` (``AsyncWeb3``) instance has a ``subscription_manager`` module. While you may +still use the ``w3.eth.subscribe`` method from the previous example, the +``subscription_manager`` offers an additional way to start one or more subscriptions and +provides better management of those subscriptions. We're going to pass in a list of +events we want to subscribe to within the ``w3.subscription_manager.subscribe`` method. .. code-block:: python @@ -71,13 +75,15 @@ To aid in defining those subscriptions, subscription type classes have been intr sub1 = NewHeadsSubscription( label="new-heads-mainnet", # optional label - handler=new_heads_handler + handler=new_heads_handler, ) sub2 = PendingTxSubscription( label="pending-tx-mainnet", # optional label full_transactions=True, handler=pending_tx_handler, + # optional parallelization flag (see Parallelizing subscriptions section below) + parallelize=True, ) sub3 = LogsSubscription( @@ -215,6 +221,92 @@ Let's put all the pieces together. This example will subscribe to new block head asyncio.run(sub_manager()) +Parallelizing subscriptions +--------------------------- + +.. important:: + + Parallelizing subscriptions does not guarantee that events will be processed in the + order they are received. Most events should still be processed in the order they are + received, but if a particular handler takes a long time to execute, newer events may + be processed first. It is recommended to set the ``parallelize`` flag to ``False`` + (default behavior) for subscriptions that depend on the order of events. + + +If you have subscriptions that can be processed in parallel, you can set the +``parallelize`` flag to ``True`` either globally on the subscription manager or +individually on each subscription. This allows the subscription manager to run +subscription handlers concurrently, which can improve performance when subscriptions +are independent of one another and do not rely on shared external state (i.e. no race +conditions are present). + +Global parallelization is off by default, meaning all subscriptions will be processed +sequentially unless you set the ``parallelize`` flag to ``True`` on the subscription +manager or individual subscriptions. + +.. 
code-block:: python + + sub1 = NewHeadsSubscription( + label="new-heads-mainnet", + handler=new_heads_handler, + parallelize=True, # process this subscription in parallel + ) + + sub2 = LogsSubscription( + label="WETH transfers", + address=weth_contract.address, + topics=[weth_contract.events.Transfer().topic], + handler=log_handler, + parallelize=False, # process sequentially (this is the default behavior) + ) + + sub3 = LogsSubscription( + label="WETH approvals", + address=weth_contract.address, + topics=[weth_contract.events.Approval().topic], + handler=approval_handler, + parallelize=True, # process this subscription in parallel + ) + + await w3.subscription_manager.subscribe([sub1, sub2, sub3]) + +Global parallelization can also be set on the subscription manager, which will apply to +all subscriptions unless overridden by an individual subscription's ``parallelize`` +flag: + +.. code-block:: python + + # set the parallelize flag globally on the subscription manager: + w3.subscription_manager.parallelize = True + + # parallelize is set globally, so this will be processed in parallel + sub1 = NewHeadsSubscription( + label="new-heads-mainnet", + handler=new_heads_handler, + ) + + # this will be processed sequentially since ``parallelize`` is set to ``False``, + # overriding the global setting + sub2 = LogsSubscription( + label="WETH transfers", + address=weth_contract.address, + topics=[weth_contract.events.Transfer().topic], + handler=log_handler, + parallelize=False, # process sequentially + ) + + # this will also be processed in parallel + sub3 = LogsSubscription( + label="WETH approvals", + address=weth_contract.address, + topics=[weth_contract.events.Approval().topic], + handler=approval_handler, + ) + + await w3.subscription_manager.subscribe([sub1, sub2, sub3]) + + FAQ --- diff --git a/newsfragments/3709.feature.rst b/newsfragments/3709.feature.rst new file mode 100644 index 0000000000..23ca7e0e32 --- /dev/null +++ b/newsfragments/3709.feature.rst @@ -0,0 +1 @@ +Support parallelization of subscription handling globally via the subscription manager ``parallelize`` flag, and on a per-subscription basis via the ``parallelize`` flag on the subscription itself. 
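For readers jumping straight to the new documentation section, the snippets above reuse the ``weth_contract`` instance and the handler coroutines from the earlier subscription examples in ``docs/subscriptions.rst``. A minimal sketch of what such handlers might look like follows; the handler bodies are illustrative assumptions, and the context object is assumed to expose the subscription payload as ``.result``:

.. code-block:: python

    async def new_heads_handler(handler_context) -> None:
        # called once per new block header
        print(f"new block header: {handler_context.result}")

    async def log_handler(handler_context) -> None:
        # called once per Transfer log matching the subscription filter
        print(f"transfer log: {handler_context.result}")

    async def approval_handler(handler_context) -> None:
        # called once per Approval log; unsubscribe after the first one
        print(f"approval log: {handler_context.result}")
        await handler_context.subscription.unsubscribe()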
diff --git a/tests/core/subscriptions/test_subscription_manager.py b/tests/core/subscriptions/test_subscription_manager.py index 52e2b7f334..f29859c361 100644 --- a/tests/core/subscriptions/test_subscription_manager.py +++ b/tests/core/subscriptions/test_subscription_manager.py @@ -1,5 +1,10 @@ import pytest +import asyncio import itertools +import time +from typing import ( + cast, +) from unittest.mock import ( AsyncMock, ) @@ -14,10 +19,14 @@ PersistentConnectionProvider, ) from web3.exceptions import ( + SubscriptionHandlerTaskException, Web3ValueError, ) -from web3.providers.persistent.subscription_manager import ( - SubscriptionManager, +from web3.providers.persistent.request_processor import ( + TaskReliantQueue, +) +from web3.types import ( + RPCResponse, ) from web3.utils.subscriptions import ( LogsSubscription, @@ -34,12 +43,23 @@ class MockProvider(PersistentConnectionProvider): @pytest_asyncio.fixture async def subscription_manager(): countr = itertools.count() - _w3 = AsyncWeb3(MockProvider()) - _w3.eth._subscribe = AsyncMock() - _w3.eth._subscribe.side_effect = lambda *_: f"0x{str(next(countr))}" - _w3.eth._unsubscribe = AsyncMock() - _w3.eth._unsubscribe.return_value = True - yield SubscriptionManager(_w3) + w3 = AsyncWeb3(MockProvider()) + w3.eth._subscribe = AsyncMock() + w3.eth._subscribe.side_effect = lambda *_: f"0x{str(next(countr))}" + w3.eth._unsubscribe = AsyncMock() + w3.eth._unsubscribe.return_value = True + yield w3.subscription_manager + + +def create_subscription_message(sub_id): + return cast( + RPCResponse, + { + "jsonrpc": "2.0", + "method": "eth_subscription", + "params": {"subscription": sub_id, "result": "0x0"}, + }, + ) @pytest.mark.asyncio @@ -213,3 +233,299 @@ async def test_unsubscribe_with_subscriptions_reference_does_not_mutate_the_list await subscription_manager.unsubscribe_all() assert subscription_manager.subscriptions == [] + + +@pytest.mark.asyncio +async def test_high_throughput_subscription_with_parallelize( + subscription_manager, +) -> None: + provider = subscription_manager._w3.provider + num_msgs = 5_000 + + provider._request_processor._handler_subscription_queue = TaskReliantQueue( + maxsize=num_msgs + ) + + # Turn on task-based processing. This test should fail the time constraint if this + # is not set to ``True`` (not task-based processing). 
+ subscription_manager.parallelize = True + + class Counter: + val: int = 0 + + counter = Counter() + + async def high_throughput_handler(handler_context) -> None: + # if we awaited all `num_msgs`, we would sleep at least 5 seconds total + await asyncio.sleep(5 / num_msgs) + + handler_context.counter.val += 1 + if handler_context.counter.val == num_msgs: + await handler_context.subscription.unsubscribe() + + # build a meaningless subscription since we are fabricating the messages + sub_id = await subscription_manager.subscribe( + NewHeadsSubscription( + handler=high_throughput_handler, handler_context={"counter": counter} + ), + ) + provider._request_processor.cache_request_information( + request_id=sub_id, + method="eth_subscribe", + params=[], + response_formatters=((), (), ()), + ) + + # put `num_msgs` messages in the queue + for _ in range(num_msgs): + provider._request_processor._handler_subscription_queue.put_nowait( + create_subscription_message(sub_id) + ) + + start = time.time() + await subscription_manager.handle_subscriptions() + stop = time.time() + + assert counter.val == num_msgs + + assert subscription_manager.total_handler_calls == num_msgs + assert stop - start < 3 + + +@pytest.mark.asyncio +async def test_parallelize_with_error_propagation( + subscription_manager, +) -> None: + provider = subscription_manager._w3.provider + subscription_manager.parallelize = True + + async def high_throughput_handler(_handler_context) -> None: + raise ValueError("Test error msg.") + + # build a meaningless subscription since we are fabricating the messages + sub_id = await subscription_manager.subscribe( + NewHeadsSubscription(handler=high_throughput_handler) + ) + provider._request_processor.cache_request_information( + request_id=sub_id, + method="eth_subscribe", + params=[], + response_formatters=((), (), ()), + ) + provider._request_processor._handler_subscription_queue.put_nowait( + create_subscription_message(sub_id) + ) + + with pytest.raises( + SubscriptionHandlerTaskException, + match="Test error msg.", + ): + await subscription_manager.handle_subscriptions() + + +@pytest.mark.asyncio +async def test_subscription_parallelize_false_overrides_manager_true( + subscription_manager, +) -> None: + provider = subscription_manager._w3.provider + subscription_manager.parallelize = True # manager parallelizing + + async def test_handler(context) -> None: + # assert not parallelized + assert context.subscription.parallelize is False + assert subscription_manager._tasks == set() + await context.subscription.unsubscribe() + + sub_id = await subscription_manager.subscribe( + # parallelize=False overrides manager's parallelization setting + NewHeadsSubscription(handler=test_handler, parallelize=False) + ) + + provider._request_processor.cache_request_information( + request_id=sub_id, + method="eth_subscribe", + params=[], + response_formatters=((), (), ()), + ) + + provider._request_processor._handler_subscription_queue.put_nowait( + create_subscription_message(sub_id) + ) + + await subscription_manager.handle_subscriptions() + + assert subscription_manager.total_handler_calls == 1 + + +@pytest.mark.asyncio +async def test_subscription_parallelize_true_overrides_manager_default_false( + subscription_manager, +) -> None: + provider = subscription_manager._w3.provider + assert subscription_manager.parallelize is False + + async def test_handler(context) -> None: + # check that the subscription is parallelized + assert context.subscription.parallelize is True + assert 
len(context.async_w3.subscription_manager._tasks) == 1 + assert asyncio.current_task() in context.async_w3.subscription_manager._tasks + await context.subscription.unsubscribe() + + sub_id = await subscription_manager.subscribe( + NewHeadsSubscription(handler=test_handler, parallelize=True) + ) + + provider._request_processor.cache_request_information( + request_id=sub_id, + method="eth_subscribe", + params=[], + response_formatters=((), (), ()), + ) + + provider._request_processor._handler_subscription_queue.put_nowait( + create_subscription_message(sub_id) + ) + + await subscription_manager.handle_subscriptions() + + assert subscription_manager.total_handler_calls == 1 + assert len(subscription_manager._tasks) == 0 # assert cleaned up + + +@pytest.mark.asyncio +async def test_mixed_subscription_parallelization_settings( + subscription_manager, +) -> None: + provider = subscription_manager._w3.provider + subscription_manager.parallelize = True # manager wants parallel + + completion_order = [] + + async def fast_parallel_handler(_ctx) -> None: + await asyncio.sleep(0.05) + completion_order.append("fast_parallel") + assert asyncio.current_task() in subscription_manager._tasks + + async def slow_sequential_handler(_ctx) -> None: + await asyncio.sleep(0.15) + completion_order.append("slow_sequential") + assert asyncio.current_task() not in subscription_manager._tasks + + async def medium_default_handler(_ctx) -> None: + await asyncio.sleep(0.10) + completion_order.append("medium_default") + assert asyncio.current_task() in subscription_manager._tasks + + # we assume this should be the last task to complete so unsubscribe only here + await subscription_manager.unsubscribe_all() + + # subscriptions with different settings + fast_sub_id = await subscription_manager.subscribe( + NewHeadsSubscription(handler=fast_parallel_handler, parallelize=True) + ) + slow_sub_id = await subscription_manager.subscribe( + NewHeadsSubscription(handler=slow_sequential_handler, parallelize=False) + ) + medium_sub_id = await subscription_manager.subscribe( + # uses the manager default parallelization setting (True) + NewHeadsSubscription(handler=medium_default_handler) + ) + + for sub_id in {slow_sub_id, fast_sub_id, medium_sub_id}: + provider._request_processor.cache_request_information( + request_id=sub_id, + method="eth_subscribe", + params=[], + response_formatters=((), (), ()), + ) + + # send messages in order of slow (but main loop), fast (parallel), medium (parallel) + for sub_id in [slow_sub_id, fast_sub_id, medium_sub_id]: + provider._request_processor._handler_subscription_queue.put_nowait( + create_subscription_message(sub_id) + ) + + await subscription_manager.handle_subscriptions() + + # `slow_sequential` should complete first despite taking longest because it + # blocks the main loop. The next two run in parallel after, so the fastest of the + # two should complete next, leaving the `medium_default` last. 
+ assert len(completion_order) == 3 + assert completion_order[0] == "slow_sequential" + assert "fast_parallel" == completion_order[1] + assert "medium_default" == completion_order[2] + + +@pytest.mark.asyncio +async def test_performance_difference_with_subscription_overrides( + subscription_manager, +) -> None: + provider = subscription_manager._w3.provider + assert subscription_manager.parallelize is False + + manager_tasks = subscription_manager._tasks + + async def parallel_handler(_ctx) -> None: + await asyncio.sleep(0.1) + assert asyncio.current_task() in manager_tasks + if len(manager_tasks) >= 3: + await subscription_manager.unsubscribe_all() + + # create 3 subscriptions, override all to parallel despite manager default False + for _ in range(3): + sub_id = await subscription_manager.subscribe( + NewHeadsSubscription(handler=parallel_handler, parallelize=True) + ) + provider._request_processor.cache_request_information( + request_id=sub_id, + method="eth_subscribe", + params=[], + response_formatters=((), (), ()), + ) + provider._request_processor._handler_subscription_queue.put_nowait( + create_subscription_message(sub_id) + ) + + await subscription_manager.handle_subscriptions() + + assert subscription_manager.total_handler_calls == 3 + assert len(manager_tasks) == 0 # all tasks cleaned up + + +@pytest.mark.asyncio +async def test_eth_subscribe_api_call_with_all_kwargs(subscription_manager): + async_w3 = subscription_manager._w3 + provider = subscription_manager._w3.provider + + label = "test_subscription" + test_ctx = "test context" + + async def parallel_handler(context) -> None: + assert asyncio.current_task() in subscription_manager._tasks + assert context.test_ctx == test_ctx + sub = subscription_manager.get_by_id(context.subscription.id) + assert sub.label == label + + await context.subscription.unsubscribe() + + sub_id = await async_w3.eth.subscribe( + "newHeads", + handler=parallel_handler, + handler_context={"test_ctx": test_ctx}, + label=label, + parallelize=True, + ) + provider._request_processor.cache_request_information( + request_id=sub_id, + method="eth_subscribe", + params=[], + response_formatters=((), (), ()), + ) + provider._request_processor._handler_subscription_queue.put_nowait( + create_subscription_message(sub_id) + ) + + await subscription_manager.handle_subscriptions() + + assert subscription_manager.total_handler_calls == 1 + assert len(async_w3.subscription_manager._tasks) == 0 diff --git a/web3/eth/async_eth.py b/web3/eth/async_eth.py index 2339ac3b38..603db9b3bd 100644 --- a/web3/eth/async_eth.py +++ b/web3/eth/async_eth.py @@ -720,19 +720,6 @@ async def uninstall_filter(self, filter_id: HexStr) -> bool: mungers=[default_root_munger], ) - _subscribe_with_args: Method[ - Callable[ - [ - SubscriptionType, - Optional[Union[LogsSubscriptionArg, bool]], - ], - Awaitable[HexStr], - ] - ] = Method( - RPC.eth_subscribe, - mungers=[default_root_munger], - ) - async def subscribe( self, subscription_type: SubscriptionType, @@ -745,6 +732,7 @@ async def subscribe( handler: Optional[EthSubscriptionHandler] = None, handler_context: Optional[Dict[str, Any]] = None, label: Optional[str] = None, + parallelize: Optional[bool] = None, ) -> HexStr: if not isinstance(self.w3.provider, PersistentConnectionProvider): raise MethodNotSupported( @@ -757,6 +745,7 @@ async def subscribe( handler=handler, handler_context=handler_context or {}, label=label, + parallelize=parallelize, ) return await self.w3.subscription_manager.subscribe(sub) diff --git a/web3/exceptions.py 
b/web3/exceptions.py index 05c9f18825..1bdbd7709c 100644 --- a/web3/exceptions.py +++ b/web3/exceptions.py @@ -353,13 +353,20 @@ class PersistentConnectionClosedOK(PersistentConnectionError): """ -class SubscriptionProcessingFinished(Web3Exception): +class SubscriptionProcessingFinished(PersistentConnectionError): """ Raised to alert the subscription manager that the processing of subscriptions has finished. """ +class SubscriptionHandlerTaskException(TaskNotRunning): + """ + Raised to alert the subscription manager that an exception occurred in the + subscription processing task. + """ + + class Web3RPCError(Web3Exception): """ Raised when a JSON-RPC response contains an error field. diff --git a/web3/manager.py b/web3/manager.py index 7b5e989c7c..dc3d663bf9 100644 --- a/web3/manager.py +++ b/web3/manager.py @@ -550,13 +550,10 @@ async def _message_stream( else: # if not an active sub, skip processing and continue continue - except TaskNotRunning: + except TaskNotRunning as e: await asyncio.sleep(0) self._provider._handle_listener_task_exceptions() - self.logger.error( - "Message listener background task has stopped unexpectedly. " - "Stopping message stream." - ) + self.logger.error("Stopping message stream: %s", e.message) return async def _process_response( diff --git a/web3/providers/persistent/persistent.py b/web3/providers/persistent/persistent.py index d0eb076d9d..7406daba58 100644 --- a/web3/providers/persistent/persistent.py +++ b/web3/providers/persistent/persistent.py @@ -164,7 +164,7 @@ async def send_batch_func( if cache_key != self._send_batch_func_cache[0]: async def send_func( - requests: List[Tuple[RPCEndpoint, Any]] + requests: List[Tuple[RPCEndpoint, Any]], ) -> List[RPCRequest]: for mw in middleware: initialized = mw(async_w3) @@ -376,11 +376,12 @@ def _message_listener_callback( ) -> None: # Puts a `TaskNotRunning` in appropriate queues to signal the end of the # listener task to any listeners relying on the queues. + message = "Message listener task has ended." 
self._request_processor._subscription_response_queue.put_nowait( - TaskNotRunning(message_listener_task) + TaskNotRunning(message_listener_task, message=message) ) self._request_processor._handler_subscription_queue.put_nowait( - TaskNotRunning(message_listener_task) + TaskNotRunning(message_listener_task, message=message) ) def _raise_stray_errors_from_cache(self) -> None: diff --git a/web3/providers/persistent/subscription_manager.py b/web3/providers/persistent/subscription_manager.py index 4b8e4e8121..242b059765 100644 --- a/web3/providers/persistent/subscription_manager.py +++ b/web3/providers/persistent/subscription_manager.py @@ -5,6 +5,7 @@ Any, List, Sequence, + Set, Union, cast, overload, @@ -15,6 +16,7 @@ ) from web3.exceptions import ( + SubscriptionHandlerTaskException, SubscriptionProcessingFinished, TaskNotRunning, Web3TypeError, @@ -50,19 +52,26 @@ class SubscriptionManager: logger: logging.Logger = logging.getLogger( "web3.providers.persistent.subscription_manager" ) - total_handler_calls: int = 0 def __init__(self, w3: "AsyncWeb3") -> None: self._w3 = w3 self._provider = cast("PersistentConnectionProvider", w3.provider) self._subscription_container = SubscriptionContainer() + # parallelize all subscription handler calls + self.parallelize = False + self.task_timeout = 1 + # TODO: can remove quotes from type hints once Python 3.8 support is dropped + self._tasks: Set["asyncio.Task[None]"] = set() + # share the subscription container with the request processor so it can separate # subscriptions into different queues based on ``sub._handler`` presence self._provider._request_processor._subscription_container = ( self._subscription_container ) + self.total_handler_calls: int = 0 + def _add_subscription(self, subscription: EthSubscription[Any]) -> None: self._subscription_container.add_subscription(subscription) @@ -86,6 +95,35 @@ def _validate_and_normalize_label(self, subscription: EthSubscription[Any]) -> N f"labels.\n label: {subscription._label}" ) + # TODO: can remove quotes from type hints once Python 3.8 support is dropped + def _handler_task_callback(self, task: "asyncio.Task[None]") -> None: + """ + Callback when a handler task completes. Similar to _message_listener_callback. + Puts handler exceptions into the queue to be raised in the main loop, else + removes the task from the set of active tasks. 
+ """ + if task.done() and not task.cancelled(): + try: + task.result() + self._tasks.discard(task) + except Exception as e: + self.logger.exception("Subscription handler task raised an exception.") + self._provider._request_processor._handler_subscription_queue.put_nowait( # noqa: E501 + SubscriptionHandlerTaskException(task, message=str(e)) + ) + + async def _cleanup_remaining_tasks(self) -> None: + """Cancel and clean up all remaining tasks.""" + if not self._tasks: + return + + self.logger.debug("Cleaning up %d remaining tasks...", len(self._tasks)) + for task in self._tasks: + if not task.done(): + task.cancel() + + self._tasks.clear() + @property def subscriptions(self) -> List[EthSubscription[Any]]: return self._subscription_container.subscriptions @@ -281,14 +319,23 @@ async def handle_subscriptions(self, run_forever: bool = False) -> None: sub_id ) if sub: - await sub._handler( - EthSubscriptionContext( - self._w3, - sub, - formatted_sub_response["result"], - **sub._handler_context, - ) + sub_context = EthSubscriptionContext( + self._w3, + sub, + formatted_sub_response["result"], + **sub._handler_context, ) + if sub.parallelize is True or ( + sub.parallelize is None and self.parallelize + ): + # run the handler in a task to allow parallel processing + task = asyncio.create_task(sub._handler(sub_context)) + self._tasks.add(task) + task.add_done_callback(self._handler_task_callback) + else: + # await the handler in the main loop to ensure order + await sub._handler(sub_context) + except SubscriptionProcessingFinished: if not run_forever: self.logger.info( @@ -296,14 +343,20 @@ async def handle_subscriptions(self, run_forever: bool = False) -> None: "Stopping subscription handling." ) break - except TaskNotRunning: - await asyncio.sleep(0) - self._provider._handle_listener_task_exceptions() + except SubscriptionHandlerTaskException: self.logger.error( - "Message listener background task for the provider has stopped " - "unexpectedly. Stopping subscription handling." + "An exception occurred in a subscription handler task. " + "Stopping subscription handling." 
) + await self._cleanup_remaining_tasks() + raise + except TaskNotRunning as e: + self.logger.error("Stopping subscription handling: %s", e.message) + self._provider._handle_listener_task_exceptions() break # no active handler subscriptions, clear the handler subscription queue self._provider._request_processor._reset_handler_subscription_queue() + + if self._tasks: + await self._cleanup_remaining_tasks() diff --git a/web3/utils/subscriptions.py b/web3/utils/subscriptions.py index 4e1c62c3f5..d0cddd7f35 100644 --- a/web3/utils/subscriptions.py +++ b/web3/utils/subscriptions.py @@ -110,11 +110,14 @@ def __init__( handler: Optional[EthSubscriptionHandler] = None, handler_context: Optional[Dict[str, Any]] = None, label: Optional[str] = None, + parallelize: Optional[bool] = None, ) -> None: self._subscription_params = subscription_params self._handler = handler_wrapper(handler) self._handler_context = handler_context or {} self._label = label + + self.parallelize = parallelize self.handler_call_count = 0 @property @@ -128,6 +131,7 @@ def _create_type_aware_subscription( handler: Optional[EthSubscriptionHandler] = None, handler_context: Optional[Dict[str, Any]] = None, label: Optional[str] = None, + parallelize: Optional[bool] = None, ) -> "EthSubscription[Any]": subscription_type = subscription_params[0] subscription_arg = ( @@ -135,7 +139,10 @@ def _create_type_aware_subscription( ) if subscription_type == "newHeads": return NewHeadsSubscription( - handler=handler, handler_context=handler_context, label=label + handler=handler, + handler_context=handler_context, + label=label, + parallelize=parallelize, ) elif subscription_type == "logs": subscription_arg = subscription_arg or {} @@ -144,6 +151,7 @@ def _create_type_aware_subscription( handler=handler, handler_context=handler_context, label=label, + parallelize=parallelize, ) elif subscription_type == "newPendingTransactions": subscription_arg = subscription_arg or False @@ -152,10 +160,14 @@ def _create_type_aware_subscription( handler=handler, handler_context=handler_context, label=label, + parallelize=parallelize, ) elif subscription_type == "syncing": return SyncingSubscription( - handler=handler, handler_context=handler_context, label=label + handler=handler, + handler_context=handler_context, + label=label, + parallelize=parallelize, ) else: params = ( @@ -168,6 +180,7 @@ def _create_type_aware_subscription( handler=handler, handler_context=handler_context, label=label, + parallelize=parallelize, ) @property @@ -206,6 +219,7 @@ def __init__( handler: LogsSubscriptionHandler = None, handler_context: Optional[Dict[str, Any]] = None, label: Optional[str] = None, + parallelize: Optional[bool] = None, ) -> None: self.address = address self.topics = topics @@ -222,6 +236,7 @@ def __init__( handler=handler, handler_context=handler_context, label=label, + parallelize=parallelize, ) @@ -237,12 +252,14 @@ def __init__( label: Optional[str] = None, handler: Optional[NewHeadsSubscriptionHandler] = None, handler_context: Optional[Dict[str, Any]] = None, + parallelize: Optional[bool] = None, ) -> None: super().__init__( subscription_params=("newHeads",), handler=handler, handler_context=handler_context, label=label, + parallelize=parallelize, ) @@ -261,6 +278,7 @@ def __init__( label: Optional[str] = None, handler: Optional[PendingTxSubscriptionHandler] = None, handler_context: Optional[Dict[str, Any]] = None, + parallelize: Optional[bool] = None, ) -> None: self.full_transactions = full_transactions super().__init__( @@ -268,6 +286,7 @@ def 
__init__( handler=handler, handler_context=handler_context, label=label, + parallelize=parallelize, ) @@ -283,10 +302,12 @@ def __init__( label: Optional[str] = None, handler: Optional[SyncingSubscriptionHandler] = None, handler_context: Optional[Dict[str, Any]] = None, + parallelize: Optional[bool] = None, ) -> None: super().__init__( subscription_params=("syncing",), handler=handler, handler_context=handler_context, label=label, + parallelize=parallelize, )
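As the error-propagation test above shows, when a parallelized handler raises, the exception is wrapped in ``SubscriptionHandlerTaskException``, remaining handler tasks are cleaned up, and the error is re-raised from ``handle_subscriptions()``. Below is a minimal sketch of how calling code might surface that; the endpoint URL, handler body, and ``.result`` attribute access are assumptions for illustration:

.. code-block:: python

    import asyncio

    from web3 import AsyncWeb3, WebSocketProvider
    from web3.exceptions import SubscriptionHandlerTaskException
    from web3.utils.subscriptions import NewHeadsSubscription

    async def main() -> None:
        async with AsyncWeb3(WebSocketProvider("ws://127.0.0.1:8546")) as w3:
            # parallelize all handler calls unless a subscription overrides it
            w3.subscription_manager.parallelize = True

            async def new_heads_handler(handler_context) -> None:
                # any exception raised here surfaces below as a
                # SubscriptionHandlerTaskException when parallelized
                print(f"new block header: {handler_context.result}")

            await w3.subscription_manager.subscribe(
                [NewHeadsSubscription(handler=new_heads_handler, parallelize=True)]
            )

            try:
                await w3.subscription_manager.handle_subscriptions()
            except SubscriptionHandlerTaskException:
                # a handler task failed; the manager has already cancelled and
                # cleared any remaining handler tasks
                ...

    asyncio.run(main())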