diff --git a/.github/ISSUE_TEMPLATE/bug.yml b/.github/ISSUE_TEMPLATE/bug.yml index 3b12aa28..bd567007 100644 --- a/.github/ISSUE_TEMPLATE/bug.yml +++ b/.github/ISSUE_TEMPLATE/bug.yml @@ -50,7 +50,7 @@ body: - Documentation (part:docs) - Unit, integration and performance tests (part:tests) - Build script, CI, dependencies, etc. (part:tooling) - - Channels, `Broadcast`, `Bidirectional`, etc. (part:channels) + - Channels, `Broadcast`, `Anycast`, etc. (part:channels) - Select (part:select) - Utility receivers, `Merge`, etc. (part:receivers) validations: diff --git a/.github/labeler.yml b/.github/labeler.yml index edfdd7a2..4507b9a8 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -6,7 +6,7 @@ # For more details on the configuration please see: # https://github.com/marketplace/actions/labeler -"part:docs": +"part:docs": - "**/*.md" - "docs/**" - "examples/**" @@ -31,14 +31,15 @@ - noxfile.py "part:channels": - - any: - - "src/frequenz/channels/**" - - "!src/frequenz/channels/util/**" + - "src/frequenz/channels/_anycast.py" + - "src/frequenz/channels/_broadcast.py" "part:receivers": - - any: - - "src/frequenz/channels/util/**" - - "!src/frequenz/channels/util/_select.py" + - "src/frequenz/channels/_merge.py" + - "src/frequenz/channels/_merge_named.py" + - "src/frequenz/channels/event.py" + - "src/frequenz/channels/file_watcher.py" + - "src/frequenz/channels/timer.py" "part:select": - - "src/frequenz/channels/util/_select.py" + - "src/frequenz/channels/_select.py" diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 08db0eae..fd869421 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -2,7 +2,6 @@ ## Summary -The `Timer` now can be started with a delay. ## Upgrading @@ -12,11 +11,9 @@ The `Timer` now can be started with a delay. You should instantiate using `Anycast(name=..., limit=...)` (or `Anycast(name=...)` if the default `limit` is enough) instead of `Anycast(...)` or `Anycast(maxsize=...)`. -* `Bidirectional` - - - The `client_id` and `service_id` arguments were merged into a keyword-only `name`. + - `new_sender` and `new_receiver`: They now return a base `Sender` and `Receiver` class (respectively) instead of a channel-specific `Sender` or `Receiver` subclass. - You should instantiate using `Bidirectional(name=...)` instead of `Bidirectional(..., ...)` or `Bidirectional(client_id=..., service_id=...)`. + This means users now don't have access to the internals to the channel-specific `Sender` and `Receiver` subclasses. * `Broadcast` @@ -28,16 +25,71 @@ The `Timer` now can be started with a delay. You should use `.new_receiver(name=name, limit=limit)` (or `.new_receiver()` if the defaults are enough) instead of `.new_receiver(name)` or `.new_receiver(name, maxsize)`. + - `new_sender` and `new_receiver` now return a base `Sender` and `Receiver` class (respectively) instead of a channel-specific `Sender` or `Receiver` subclass. + + This means users now don't have access to the internals to the channel-specific `Sender` and `Receiver` subclasses. + * `Event` - `__init__`: The `name` argument was made keyword-only. The default was changed to a more readable version of `id(self)`. You should instantiate using `Event(name=...)` instead of `Event(...)`. + - Moved from `frequenz.channels.util` to `frequenz.channels.event`. + +* `FileWatcher` + + - Moved from `frequenz.channels.util` to `frequenz.channels.file_watcher`. + + - Support classes are no longer nested inside `FileWatcher`. They are now top-level classes within the new `frequenz.channels.file_watcher` module (e.g., `frequenz.channels.util.FileWatcher.EventType` -> `frequenz.channels.file_watcher.EventType`, `frequenz.channels.util.FileWatcher.Event` -> `frequenz.channels.file_watcher.Event`). + +* `Timer` and support classes + + - Moved from `frequenz.channels.util` to `frequenz.channels.timer`. + * All exceptions that took `Any` as the `message` argument now take `str` instead. If you were passing a non-`str` value to an exception, you should convert it using `str(value)` before passing it to the exception. +* The following symbols were moved to the top-level `frequenz.channels` package: + + - `Merge` + - `MergeNamed` + - `Selected` + - `SelectError` + - `SelectErrorGroup` + - `UnhandledSelectedError` + - `select` + - `selected_from` + +### Removals + +* `Bidirectional` + + This channel was removed as it is not recommended practice and was a niche use case. If you need to use it, you can set up two channels or copy the `Bidirectional` class from the previous version to your project. + +* `Peekable` + + This class was removed because it was merely a shortcut to a receiver that caches the last value received. It did not fit the channel abstraction well and was infrequently used. + + You can replace it with a task that receives and retains the last value. + +* `Broadcast.new_peekable()` + + This was removed alongside `Peekable`. + +* `Receiver.into_peekable()` + + This was removed alongside `Peekable`. + +* `ReceiverInvalidatedError` + + This was removed alongside `Peekable` (it was only raised when using a `Receiver` that was converted into a `Peekable`). + +* `util` + + The entire `util` package was removed and its symbols were either moved to the top-level package or to their own public modules (as noted above). + ## New Features * `Anycast` diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index 3489452b..dbf60751 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -6,35 +6,32 @@ This package contains [channel](https://en.wikipedia.org/wiki/Channel_(programming)) implementations. +Base classes: + +* [Receiver][frequenz.channels.Receiver]: An object that can wait for and + consume messages from a channel. + +* [Sender][frequenz.channels.Sender]: An object that can send messages to + a channel. + Channels: * [Anycast][frequenz.channels.Anycast]: A channel that supports multiple senders and multiple receivers. A message sent through a sender will be received by exactly one receiver. -* [Bidirectional][frequenz.channels.Bidirectional]: A channel providing - a `client` and a `service` handle to send and receive bidirectionally. - * [Broadcast][frequenz.channels.Broadcast]: A channel to broadcast messages from multiple senders to multiple receivers. Each message sent through any of the senders is received by all of the receivers. -Other base classes: +Utilities to work with channels: -* [Peekable][frequenz.channels.Peekable]: An object to allow users to get - a peek at the latest value in the channel, without consuming anything. +* [Merge][frequenz.channels.Merge] and [MergeNamed][frequenz.channels.MergeNamed]: + [Receivers][frequenz.channels.Receiver] that merge messages coming from multiple + receivers into a single stream. -* [Receiver][frequenz.channels.Receiver]: An object that can wait for and - consume messages from a channel. - -* [Sender][frequenz.channels.Sender]: An object that can send messages to - a channel. - -Utilities: - -* [util][frequenz.channels.util]: A module with utilities, like special - receivers that implement timers, file watchers, merge receivers, or wait for - messages in multiple channels. +* [select][frequenz.channels.select]: Iterate over the values of all + [receivers][frequenz.channels.Receiver] as new values become available. Exception classes: @@ -56,39 +53,61 @@ * [ReceiverStoppedError][frequenz.channels.ReceiverStoppedError]: A receiver stopped producing messages. -* [ReceiverInvalidatedError][frequenz.channels.ReceiverInvalidatedError]: - A receiver is not longer valid (for example if it was converted into - a peekable. +* [SelectError][frequenz.channels.SelectError]: Base class for all errors + related to [select][frequenz.channels.select]. + +* [SelectErrorGroup][frequenz.channels.SelectErrorGroup]: A group of errors + raised by [select][frequenz.channels.select]. + +* [UnhandledSelectedError][frequenz.channels.UnhandledSelectedError]: An error + raised by [select][frequenz.channels.select] that was not handled by the + user. + +Extra utility receivers: + +* [Event][frequenz.channels.event.Event]: A receiver that generates a message when + an event is set. + +* [FileWatcher][frequenz.channels.file_watcher.FileWatcher]: A receiver that + generates a message when a file is added, modified or deleted. + +* [Timer][frequenz.channels.timer.Timer]: A receiver that generates a message after a + given amount of time. """ -from . import util from ._anycast import Anycast -from ._base_classes import Peekable, Receiver, Sender -from ._bidirectional import Bidirectional from ._broadcast import Broadcast -from ._exceptions import ( - ChannelClosedError, - ChannelError, - Error, - ReceiverError, - ReceiverInvalidatedError, - ReceiverStoppedError, - SenderError, +from ._exceptions import ChannelClosedError, ChannelError, Error +from ._merge import Merge +from ._merge_named import MergeNamed +from ._receiver import Receiver, ReceiverError, ReceiverStoppedError +from ._select import ( + Selected, + SelectError, + SelectErrorGroup, + UnhandledSelectedError, + select, + selected_from, ) +from ._sender import Sender, SenderError __all__ = [ "Anycast", - "Bidirectional", "Broadcast", "ChannelClosedError", "ChannelError", "Error", - "Peekable", + "Merge", + "MergeNamed", "Receiver", "ReceiverError", - "ReceiverInvalidatedError", "ReceiverStoppedError", + "SelectError", + "SelectErrorGroup", + "Selected", "Sender", "SenderError", - "util", + "UnhandledSelectedError", + "select", + "selected_from", ] diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 21293b94..952aa12f 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -8,17 +8,18 @@ import logging from asyncio import Condition from collections import deque -from typing import Generic +from typing import Generic, TypeVar -from ._base_classes import Receiver as BaseReceiver -from ._base_classes import Sender as BaseSender -from ._base_classes import T -from ._exceptions import ChannelClosedError, ReceiverStoppedError, SenderError +from ._exceptions import ChannelClosedError +from ._receiver import Receiver, ReceiverStoppedError +from ._sender import Sender, SenderError _logger = logging.getLogger(__name__) +_T = TypeVar("_T") -class Anycast(Generic[T]): + +class Anycast(Generic[_T]): """A channel for sending data across async tasks. Anycast channels support multiple senders and multiple receivers. A message sent @@ -37,9 +38,9 @@ class Anycast(Generic[T]): thread-safe. When there are multiple channel receivers, they can be awaited - simultaneously using [select][frequenz.channels.util.select], - [Merge][frequenz.channels.util.Merge] or - [MergeNamed][frequenz.channels.util.MergeNamed]. + simultaneously using [select][frequenz.channels.select], + [Merge][frequenz.channels.Merge] or + [MergeNamed][frequenz.channels.MergeNamed]. Example: ``` python @@ -87,7 +88,7 @@ def __init__(self, *, name: str, limit: int = 10) -> None: of the channel. """ - self._deque: deque[T] = deque(maxlen=limit) + self._deque: deque[_T] = deque(maxlen=limit) """The channel's buffer.""" self._send_cv: Condition = Condition() @@ -157,21 +158,21 @@ async def close(self) -> None: async with self._recv_cv: self._recv_cv.notify_all() - def new_sender(self) -> Sender[T]: + def new_sender(self) -> Sender[_T]: """Create a new sender. Returns: A Sender instance attached to the Anycast channel. """ - return Sender(self) + return _Sender(self) - def new_receiver(self) -> Receiver[T]: + def new_receiver(self) -> Receiver[_T]: """Create a new receiver. Returns: A Receiver instance attached to the Anycast channel. """ - return Receiver(self) + return _Receiver(self) def __str__(self) -> str: """Return a string representation of this channel.""" @@ -185,23 +186,23 @@ def __repr__(self) -> str: ) -class Sender(BaseSender[T]): +class _Sender(Sender[_T]): """A sender to send messages to an Anycast channel. Should not be created directly, but through the `Anycast.new_sender()` method. """ - def __init__(self, chan: Anycast[T]) -> None: + def __init__(self, chan: Anycast[_T]) -> None: """Create a channel sender. Args: chan: A reference to the channel that this sender belongs to. """ - self._chan: Anycast[T] = chan + self._chan: Anycast[_T] = chan """The channel that this sender belongs to.""" - async def send(self, msg: T) -> None: + async def send(self, msg: _T) -> None: """Send a message across the channel. To send, this method inserts the message into the Anycast channel's @@ -253,23 +254,23 @@ class _Empty: """A sentinel value to indicate that a value has not been set.""" -class Receiver(BaseReceiver[T]): +class _Receiver(Receiver[_T]): """A receiver to receive messages from an Anycast channel. Should not be created directly, but through the `Anycast.new_receiver()` method. """ - def __init__(self, chan: Anycast[T]) -> None: + def __init__(self, chan: Anycast[_T]) -> None: """Create a channel receiver. Args: chan: A reference to the channel that this receiver belongs to. """ - self._chan: Anycast[T] = chan + self._chan: Anycast[_T] = chan """The channel that this receiver belongs to.""" - self._next: T | type[_Empty] = _Empty + self._next: _T | type[_Empty] = _Empty async def ready(self) -> bool: """Wait until the receiver is ready with a value or an error. @@ -298,7 +299,7 @@ async def ready(self) -> bool: # pylint: enable=protected-access return True - def consume(self) -> T: + def consume(self) -> _T: """Return the latest value once `ready()` is complete. Returns: @@ -318,7 +319,7 @@ def consume(self) -> T: ), "`consume()` must be preceded by a call to `ready()`" # mypy doesn't understand that the assert above ensures that self._next is not # _Sentinel. So we have to use a type ignore here. - next_val: T = self._next # type: ignore[assignment] + next_val: _T = self._next # type: ignore[assignment] self._next = _Empty return next_val diff --git a/src/frequenz/channels/_bidirectional.py b/src/frequenz/channels/_bidirectional.py deleted file mode 100644 index d535705b..00000000 --- a/src/frequenz/channels/_bidirectional.py +++ /dev/null @@ -1,206 +0,0 @@ -# License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -"""An abstraction to provide bi-directional communication between actors.""" - -from __future__ import annotations - -from typing import Generic, TypeVar - -from ._base_classes import Receiver, Sender, T, U -from ._broadcast import Broadcast -from ._exceptions import ChannelError, ReceiverError, SenderError - -V = TypeVar("V") -W = TypeVar("W") - - -class Bidirectional(Generic[T, U]): - """A wrapper class for simulating bidirectional channels.""" - - class Handle(Sender[V], Receiver[W]): - """A handle to a [Bidirectional][frequenz.channels.Bidirectional] instance. - - It can be used to send/receive values between the client and service. - """ - - def __init__( - self, - channel: Bidirectional[V, W] | Bidirectional[W, V], - sender: Sender[V], - receiver: Receiver[W], - ) -> None: - """Create a `Bidirectional.Handle` instance. - - Args: - channel: The underlying channel. - sender: A sender to send values with. - receiver: A receiver to receive values from. - """ - self._chan: Bidirectional[V, W] | Bidirectional[W, V] = channel - """The underlying channel.""" - - self._sender: Sender[V] = sender - """The sender to send values with.""" - - self._receiver: Receiver[W] = receiver - """The receiver to receive values from.""" - - async def send(self, msg: V) -> None: - """Send a value to the other side. - - Args: - msg: The value to send. - - Raises: - SenderError: if the underlying channel was closed. - A [ChannelClosedError][frequenz.channels.ChannelClosedError] - is set as the cause. - """ - try: - await self._sender.send(msg) - except SenderError as err: - # If this comes from a channel error, then we inject another - # ChannelError having the information about the Bidirectional - # channel to hide (at least partially) the underlying - # Broadcast channels we use. - if isinstance(err.__cause__, ChannelError): - this_chan_error = ChannelError( - f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}", - self._chan, # pylint: disable=protected-access - ) - this_chan_error.__cause__ = err.__cause__ - err.__cause__ = this_chan_error - raise err - - async def ready(self) -> bool: - """Wait until the receiver is ready with a value or an error. - - Once a call to `ready()` has finished, the value should be read with - a call to `consume()` (`receive()` or iterated over). The receiver will - remain ready (this method will return immediately) until it is - consumed. - - Returns: - Whether the receiver is still active. - """ - return await self._receiver.ready() # pylint: disable=protected-access - - def consume(self) -> W: - """Return the latest value once `_ready` is complete. - - Returns: - The next value that was received. - - Raises: - ReceiverStoppedError: if there is some problem with the receiver. - ReceiverError: if there is some problem with the receiver. - """ - try: - return self._receiver.consume() # pylint: disable=protected-access - except ReceiverError as err: - # If this comes from a channel error, then we inject another - # ChannelError having the information about the Bidirectional - # channel to hide (at least partially) the underlying - # Broadcast channels we use. - if isinstance(err.__cause__, ChannelError): - this_chan_error = ChannelError( - f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}", - self._chan, # pylint: disable=protected-access - ) - this_chan_error.__cause__ = err.__cause__ - err.__cause__ = this_chan_error - raise err - - def __str__(self) -> str: - """Return a string representation of this handle.""" - return f"{type(self).__name__}:{self._chan}" - - def __repr__(self) -> str: - """Return a string representation of this handle.""" - return ( - f"{type(self).__name__}(channel={self._chan!r}, " - f"sender={self._sender!r}, receiver={self._receiver!r})" - ) - - def __init__(self, *, name: str) -> None: - """Create a `Bidirectional` instance. - - Args: - name: The name of the channel. This is used for logging, and it is used - in the string representation and to name the underlying channels. - """ - self._name: str = name - """The name for the client, used to name the channels.""" - - self._request_channel: Broadcast[T] = Broadcast(name=f"{self._name}:request") - """The channel to send requests.""" - - self._response_channel: Broadcast[U] = Broadcast(name=f"{self._name}:response") - """The channel to send responses.""" - - self._client_handle: Bidirectional.Handle[T, U] = Bidirectional.Handle( - self, - self._request_channel.new_sender(), - self._response_channel.new_receiver(), - ) - """The handle for the client side to send/receive values.""" - - self._service_handle: Bidirectional.Handle[U, T] = Bidirectional.Handle( - self, - self._response_channel.new_sender(), - self._request_channel.new_receiver(), - ) - """The handle for the service side to send/receive values.""" - - @property - def name(self) -> str: - """The name of this channel. - - This is for logging purposes, and it will be shown in the string representation - of this channel. - """ - return self._name - - @property - def is_closed(self) -> bool: - """Whether this channel is closed. - - Any further attempts to use this channel after it is closed will result in an - exception. - - As long as there is a way to send or receive data, the channel is considered - open, even if the other side is closed, so this returns `False` if only both - underlying channels are closed. - """ - return self._request_channel.is_closed and self._response_channel.is_closed - - @property - def client_handle(self) -> Bidirectional.Handle[T, U]: - """Get a `Handle` for the client side to use. - - Returns: - Object to send/receive messages with. - """ - return self._client_handle - - @property - def service_handle(self) -> Bidirectional.Handle[U, T]: - """Get a `Handle` for the service side to use. - - Returns: - Object to send/receive messages with. - """ - return self._service_handle - - def __str__(self) -> str: - """Return a string representation of this channel.""" - return f"{type(self).__name__}:{self._name}" - - def __repr__(self) -> str: - """Return a string representation of this channel.""" - return ( - f"{type(self).__name__}(name={self._name!r}):<" - f"request_channel={self._request_channel!r}, " - f"response_channel={self._response_channel!r}>" - ) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index b7ca9d56..327d3a0c 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -9,23 +9,18 @@ import weakref from asyncio import Condition from collections import deque -from typing import Generic - -from ._base_classes import Peekable as BasePeekable -from ._base_classes import Receiver as BaseReceiver -from ._base_classes import Sender as BaseSender -from ._base_classes import T -from ._exceptions import ( - ChannelClosedError, - ReceiverInvalidatedError, - ReceiverStoppedError, - SenderError, -) +from typing import Generic, TypeVar + +from ._exceptions import ChannelClosedError +from ._receiver import Receiver, ReceiverStoppedError +from ._sender import Sender, SenderError _logger = logging.Logger(__name__) +_T = TypeVar("_T") + -class Broadcast(Generic[T]): +class Broadcast(Generic[_T]): """A channel to broadcast messages to multiple receivers. `Broadcast` channels can have multiple senders and multiple receivers. Each @@ -37,9 +32,9 @@ class Broadcast(Generic[T]): are thread-safe. Because of this, `Broadcast` channels are thread-safe. When there are multiple channel receivers, they can be awaited - simultaneously using [select][frequenz.channels.util.select], - [Merge][frequenz.channels.util.Merge] or - [MergeNamed][frequenz.channels.util.MergeNamed]. + simultaneously using [select][frequenz.channels.select], + [Merge][frequenz.channels.Merge] or + [MergeNamed][frequenz.channels.MergeNamed]. Example: ``` python @@ -93,13 +88,13 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: self._recv_cv: Condition = Condition() """The condition to wait for data in the channel's buffer.""" - self._receivers: dict[int, weakref.ReferenceType[Receiver[T]]] = {} + self._receivers: dict[int, weakref.ReferenceType[_Receiver[_T]]] = {} """The receivers attached to the channel, indexed by their hash().""" self._closed: bool = False """Whether the channel is closed.""" - self._latest: T | None = None + self._latest: _T | None = None """The latest value sent to the channel.""" self.resend_latest: bool = resend_latest @@ -148,15 +143,15 @@ async def close(self) -> None: async with self._recv_cv: self._recv_cv.notify_all() - def new_sender(self) -> Sender[T]: + def new_sender(self) -> Sender[_T]: """Create a new broadcast sender. Returns: A Sender instance attached to the broadcast channel. """ - return Sender(self) + return _Sender(self) - def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[T]: + def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[_T]: """Create a new broadcast receiver. Broadcast receivers have their own buffer, and when messages are not @@ -170,24 +165,12 @@ def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[ Returns: A Receiver instance attached to the broadcast channel. """ - recv: Receiver[T] = Receiver(name, limit, self) + recv: _Receiver[_T] = _Receiver(name, limit, self) self._receivers[hash(recv)] = weakref.ref(recv) if self.resend_latest and self._latest is not None: recv.enqueue(self._latest) return recv - def new_peekable(self) -> Peekable[T]: - """Create a new Peekable for the broadcast channel. - - A Peekable provides a [peek()][frequenz.channels.Peekable.peek] method - that allows the user to get a peek at the latest value in the channel, - without consuming anything. - - Returns: - A Peekable to peek into the broadcast channel with. - """ - return Peekable(self) - def __str__(self) -> str: """Return a string representation of this receiver.""" return f"{type(self).__name__}:{self._name}" @@ -203,7 +186,7 @@ def __repr__(self) -> str: ) -class Sender(BaseSender[T]): +class _Sender(Sender[_T]): """A sender to send messages to the broadcast channel. Should not be created directly, but through the @@ -211,16 +194,16 @@ class Sender(BaseSender[T]): method. """ - def __init__(self, chan: Broadcast[T]) -> None: + def __init__(self, chan: Broadcast[_T]) -> None: """Create a Broadcast sender. Args: chan: A reference to the broadcast channel this sender belongs to. """ - self._chan: Broadcast[T] = chan + self._chan: Broadcast[_T] = chan """The broadcast channel this sender belongs to.""" - async def send(self, msg: T) -> None: + async def send(self, msg: _T) -> None: """Send a message to all broadcast receivers. Args: @@ -259,7 +242,7 @@ def __repr__(self) -> str: return f"{type(self).__name__}({self._chan!r})" -class Receiver(BaseReceiver[T]): +class _Receiver(Receiver[_T]): """A receiver to receive messages from the broadcast channel. Should not be created directly, but through the @@ -267,7 +250,7 @@ class Receiver(BaseReceiver[T]): method. """ - def __init__(self, name: str | None, limit: int, chan: Broadcast[T]) -> None: + def __init__(self, name: str | None, limit: int, chan: Broadcast[_T]) -> None: """Create a broadcast receiver. Broadcast receivers have their own buffer, and when messages are not @@ -289,20 +272,13 @@ def __init__(self, name: str | None, limit: int, chan: Broadcast[T]) -> None: Only used for debugging purposes. """ - self._chan: Broadcast[T] = chan + self._chan: Broadcast[_T] = chan """The broadcast channel that this receiver belongs to.""" - self._q: deque[T] = deque(maxlen=limit) + self._q: deque[_T] = deque(maxlen=limit) """The receiver's internal message queue.""" - self._active: bool = True - """Whether the receiver is still active. - - If this receiver is converted into a Peekable, it will neither be - considered valid nor active. - """ - - def enqueue(self, msg: T) -> None: + def enqueue(self, msg: _T) -> None: """Put a message into this receiver's queue. To be called by broadcast senders. If the receiver's queue is already @@ -343,10 +319,6 @@ async def ready(self) -> bool: if self._q: return True - # if it is not longer active, return immediately - if not self._active: - return False - # Use a while loop here, to handle spurious wakeups of condition variables. # # The condition also makes sure that if there are already messages ready to be @@ -360,16 +332,7 @@ async def ready(self) -> bool: return True # pylint: enable=protected-access - def _deactivate(self) -> None: - """Set the receiver as inactive and remove it from the channel.""" - self._active = False - # pylint: disable=protected-access - _hash = hash(self) - if _hash in self._chan._receivers: - del self._chan._receivers[_hash] - # pylint: enable=protected-access - - def consume(self) -> T: + def consume(self) -> _T: """Return the latest value once `ready` is complete. Returns: @@ -377,34 +340,13 @@ def consume(self) -> T: Raises: ReceiverStoppedError: if there is some problem with the receiver. - ReceiverInvalidatedError: if the receiver was converted into - a peekable. """ - if not self._q and not self._active: - raise ReceiverInvalidatedError( - "This receiver was converted into a Peekable so it is not longer valid.", - self, - ) - if not self._q and self._chan._closed: # pylint: disable=protected-access raise ReceiverStoppedError(self) from ChannelClosedError(self._chan) assert self._q, "`consume()` must be preceded by a call to `ready()`" return self._q.popleft() - def into_peekable(self) -> Peekable[T]: - """Convert the `Receiver` implementation into a `Peekable`. - - Once this function has been called, the receiver will no longer be - usable, and calling [receive()][frequenz.channels.Receiver.receive] on - the receiver will raise an exception. - - Returns: - A `Peekable` instance. - """ - self._deactivate() - return Peekable(self._chan) - def __str__(self) -> str: """Return a string representation of this receiver.""" return f"{self._chan}:{type(self).__name__}" @@ -415,41 +357,5 @@ def __repr__(self) -> str: assert limit is not None return ( f"{type(self).__name__}(name={self._name!r}, limit={limit!r}, " - f"{self._chan!r}):" + f"{self._chan!r}):" ) - - -class Peekable(BasePeekable[T]): - """A Peekable to peek into broadcast channels. - - A Peekable provides a [peek()][frequenz.channels.Peekable] method that - allows the user to get a peek at the latest value in the channel, without - consuming anything. - """ - - def __init__(self, chan: Broadcast[T]) -> None: - """Create a `Peekable` instance. - - Args: - chan: The broadcast channel this Peekable will try to peek into. - """ - self._chan: Broadcast[T] = chan - """The broadcast channel this Peekable will try to peek into.""" - - def peek(self) -> T | None: - """Return the latest value that was sent to the channel. - - Returns: - The latest value received by the channel, and `None`, if nothing - has been sent to the channel yet, or if the channel is closed. - """ - return self._chan._latest # pylint: disable=protected-access - - def __str__(self) -> str: - """Return a string representation of this receiver.""" - return f"{self._chan}:{type(self).__name__}" - - def __repr__(self) -> str: - """Return a string representation of this receiver.""" - return f"{type(self).__name__}({self._chan!r}):" diff --git a/src/frequenz/channels/_exceptions.py b/src/frequenz/channels/_exceptions.py index 921c6237..559b2df7 100644 --- a/src/frequenz/channels/_exceptions.py +++ b/src/frequenz/channels/_exceptions.py @@ -1,16 +1,9 @@ # License: MIT # Copyright © 2022 Frequenz Energy-as-a-Service GmbH -"""Exception classes.""" +"""Base exception classes.""" -from __future__ import annotations - -from typing import TYPE_CHECKING, Any, Generic, TypeVar - -if TYPE_CHECKING: - from . import _base_classes - -T = TypeVar("T") +from typing import Any class Error(RuntimeError): @@ -56,63 +49,3 @@ def __init__(self, channel: Any): channel: A reference to the channel that was closed. """ super().__init__(f"Channel {channel} was closed", channel) - - -class SenderError(Error, Generic[T]): - """An error produced in a [Sender][frequenz.channels.Sender]. - - All exceptions generated by senders inherit from this exception. - """ - - def __init__(self, message: str, sender: _base_classes.Sender[T]): - """Create an instance. - - Args: - message: An error message. - sender: The [Sender][frequenz.channels.Sender] where the error - happened. - """ - super().__init__(message) - self.sender: _base_classes.Sender[T] = sender - """The sender where the error happened.""" - - -class ReceiverError(Error, Generic[T]): - """An error produced in a [Receiver][frequenz.channels.Receiver]. - - All exceptions generated by receivers inherit from this exception. - """ - - def __init__(self, message: str, receiver: _base_classes.Receiver[T]): - """Create an instance. - - Args: - message: An error message. - receiver: The [Receiver][frequenz.channels.Receiver] where the - error happened. - """ - super().__init__(message) - self.receiver: _base_classes.Receiver[T] = receiver - """The receiver where the error happened.""" - - -class ReceiverStoppedError(ReceiverError[T]): - """The [Receiver][frequenz.channels.Receiver] stopped producing messages.""" - - def __init__(self, receiver: _base_classes.Receiver[T]): - """Create an instance. - - Args: - receiver: The [Receiver][frequenz.channels.Receiver] where the - error happened. - """ - super().__init__(f"Receiver {receiver} was stopped", receiver) - - -class ReceiverInvalidatedError(ReceiverError[T]): - """The [Receiver][frequenz.channels.Receiver] was invalidated. - - This happens when the Receiver is converted - [into][frequenz.channels.Receiver.into_peekable] - a [Peekable][frequenz.channels.Peekable]. - """ diff --git a/src/frequenz/channels/util/_merge.py b/src/frequenz/channels/_merge.py similarity index 93% rename from src/frequenz/channels/util/_merge.py rename to src/frequenz/channels/_merge.py index 5baeea31..00461ed9 100644 --- a/src/frequenz/channels/util/_merge.py +++ b/src/frequenz/channels/_merge.py @@ -6,13 +6,14 @@ import asyncio import itertools from collections import deque -from typing import Any +from typing import Any, TypeVar -from .._base_classes import Receiver, T -from .._exceptions import ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError +_T = TypeVar("_T") -class Merge(Receiver[T]): + +class Merge(Receiver[_T]): """Merge messages coming from multiple channels into a single stream. Example: @@ -38,20 +39,20 @@ class Merge(Receiver[T]): `self.stop()` method. This will cleanup any internal pending async tasks. """ - def __init__(self, *args: Receiver[T]) -> None: + def __init__(self, *args: Receiver[_T]) -> None: """Create a `Merge` instance. Args: *args: sequence of channel receivers. """ - self._receivers: dict[str, Receiver[T]] = { + self._receivers: dict[str, Receiver[_T]] = { str(id): recv for id, recv in enumerate(args) } self._pending: set[asyncio.Task[Any]] = { asyncio.create_task(anext(recv), name=name) for name, recv in self._receivers.items() } - self._results: deque[T] = deque(maxlen=len(self._receivers)) + self._results: deque[_T] = deque(maxlen=len(self._receivers)) def __del__(self) -> None: """Cleanup any pending tasks.""" @@ -102,7 +103,7 @@ async def ready(self) -> bool: asyncio.create_task(anext(self._receivers[name]), name=name) ) - def consume(self) -> T: + def consume(self) -> _T: """Return the latest value once `ready` is complete. Returns: diff --git a/src/frequenz/channels/util/_merge_named.py b/src/frequenz/channels/_merge_named.py similarity index 91% rename from src/frequenz/channels/util/_merge_named.py rename to src/frequenz/channels/_merge_named.py index 271f72da..fb08c867 100644 --- a/src/frequenz/channels/util/_merge_named.py +++ b/src/frequenz/channels/_merge_named.py @@ -5,26 +5,27 @@ import asyncio import itertools from collections import deque -from typing import Any +from typing import Any, TypeVar -from .._base_classes import Receiver, T -from .._exceptions import ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError +_T = TypeVar("_T") -class MergeNamed(Receiver[tuple[str, T]]): + +class MergeNamed(Receiver[tuple[str, _T]]): """Merge messages coming from multiple named channels into a single stream. When `MergeNamed` is no longer needed, then it should be stopped using `self.stop()` method. This will cleanup any internal pending async tasks. """ - def __init__(self, **kwargs: Receiver[T]) -> None: + def __init__(self, **kwargs: Receiver[_T]) -> None: """Create a `MergeNamed` instance. Args: **kwargs: sequence of channel receivers. """ - self._receivers: dict[str, Receiver[T]] = kwargs + self._receivers: dict[str, Receiver[_T]] = kwargs """The sequence of channel receivers to get the messages to merge.""" self._pending: set[asyncio.Task[Any]] = { @@ -33,7 +34,7 @@ def __init__(self, **kwargs: Receiver[T]) -> None: } """The set of pending tasks to merge messages.""" - self._results: deque[tuple[str, T]] = deque(maxlen=len(self._receivers)) + self._results: deque[tuple[str, _T]] = deque(maxlen=len(self._receivers)) """The internal buffer of merged messages.""" def __del__(self) -> None: @@ -86,7 +87,7 @@ async def ready(self) -> bool: asyncio.create_task(anext(self._receivers[name]), name=name) ) - def consume(self) -> tuple[str, T]: + def consume(self) -> tuple[str, _T]: """Return the latest value once `ready` is complete. Returns: diff --git a/src/frequenz/channels/_base_classes.py b/src/frequenz/channels/_receiver.py similarity index 70% rename from src/frequenz/channels/_base_classes.py rename to src/frequenz/channels/_receiver.py index 55a13644..d3308837 100644 --- a/src/frequenz/channels/_base_classes.py +++ b/src/frequenz/channels/_receiver.py @@ -1,39 +1,24 @@ # License: MIT # Copyright © 2022 Frequenz Energy-as-a-Service GmbH -"""Base classes for Channel Sender and Receiver.""" +"""Channel receiver and associated exceptions.""" from __future__ import annotations from abc import ABC, abstractmethod from collections.abc import Callable -from typing import Generic, TypeVar +from typing import Generic, Self, TypeVar -from ._exceptions import ReceiverStoppedError +from ._exceptions import Error -T = TypeVar("T") -U = TypeVar("U") +_T = TypeVar("_T") +_U = TypeVar("_U") -class Sender(ABC, Generic[T]): - """A channel Sender.""" - - @abstractmethod - async def send(self, msg: T) -> None: - """Send a message to the channel. - - Args: - msg: The message to be sent. - - Raises: - SenderError: if there was an error sending the message. - """ - - -class Receiver(ABC, Generic[T]): +class Receiver(ABC, Generic[_T]): """A channel Receiver.""" - async def __anext__(self) -> T: + async def __anext__(self) -> _T: """Await the next value in the async iteration over received values. Returns: @@ -63,7 +48,7 @@ async def ready(self) -> bool: """ @abstractmethod - def consume(self) -> T: + def consume(self) -> _T: """Return the latest value once `ready()` is complete. `ready()` must be called before each call to `consume()`. @@ -76,7 +61,7 @@ def consume(self) -> T: ReceiverError: if there is some problem with the receiver. """ - def __aiter__(self) -> Receiver[T]: + def __aiter__(self) -> Self: """Initialize the async iterator over received values. Returns: @@ -84,7 +69,7 @@ def __aiter__(self) -> Receiver[T]: """ return self - async def receive(self) -> T: + async def receive(self) -> _T: """Receive a message from the channel. Returns: @@ -111,7 +96,7 @@ async def receive(self) -> T: raise ReceiverStoppedError(self) from exc return received - def map(self, call: Callable[[T], U]) -> Receiver[U]: + def map(self, call: Callable[[_T], _U]) -> Receiver[_U]: """Return a receiver with `call` applied on incoming messages. Args: @@ -122,42 +107,40 @@ def map(self, call: Callable[[T], U]) -> Receiver[U]: """ return _Map(self, call) - def into_peekable(self) -> Peekable[T]: - """Convert the `Receiver` implementation into a `Peekable`. - Once this function has been called, the receiver will no longer be - usable, and calling `receive` on the receiver will raise an exception. +class ReceiverError(Error, Generic[_T]): + """An error produced in a [Receiver][frequenz.channels.Receiver]. - Returns: - A `Peekable` that can be used to peek at the latest value in the - channel. + All exceptions generated by receivers inherit from this exception. + """ - Raises: - NotImplementedError: when a `Receiver` implementation doesn't have - a custom `into_peekable` implementation. + def __init__(self, message: str, receiver: Receiver[_T]): + """Create an instance. + + Args: + message: An error message. + receiver: The [Receiver][frequenz.channels.Receiver] where the + error happened. """ - raise NotImplementedError("This receiver does not implement `into_peekable`") + super().__init__(message) + self.receiver: Receiver[_T] = receiver + """The receiver where the error happened.""" -class Peekable(ABC, Generic[T]): - """A channel peekable. +class ReceiverStoppedError(ReceiverError[_T]): + """The [Receiver][frequenz.channels.Receiver] stopped producing messages.""" - A Peekable provides a [peek()][frequenz.channels.Peekable] method that - allows the user to get a peek at the latest value in the channel, without - consuming anything. - """ + def __init__(self, receiver: Receiver[_T]): + """Create an instance. - @abstractmethod - def peek(self) -> T | None: - """Return the latest value that was sent to the channel. - - Returns: - The latest value received by the channel, and `None`, if nothing - has been sent to the channel yet. + Args: + receiver: The [Receiver][frequenz.channels.Receiver] where the + error happened. """ + super().__init__(f"Receiver {receiver} was stopped", receiver) -class _Map(Receiver[U], Generic[T, U]): +class _Map(Receiver[_U], Generic[_T, _U]): """Apply a transform function on a channel receiver. Has two generic types: @@ -166,17 +149,17 @@ class _Map(Receiver[U], Generic[T, U]): - The output type: return type of the transform method. """ - def __init__(self, receiver: Receiver[T], transform: Callable[[T], U]) -> None: + def __init__(self, receiver: Receiver[_T], transform: Callable[[_T], _U]) -> None: """Create a `Transform` instance. Args: receiver: The input receiver. transform: The function to run on the input data. """ - self._receiver: Receiver[T] = receiver + self._receiver: Receiver[_T] = receiver """The input receiver.""" - self._transform: Callable[[T], U] = transform + self._transform: Callable[[_T], _U] = transform """The function to run on the input data.""" async def ready(self) -> bool: @@ -194,7 +177,7 @@ async def ready(self) -> bool: # We need a noqa here because the docs have a Raises section but the code doesn't # explicitly raise anything. - def consume(self) -> U: # noqa: DOC502 + def consume(self) -> _U: # noqa: DOC502 """Return a transformed value once `ready()` is complete. Returns: diff --git a/src/frequenz/channels/util/_select.py b/src/frequenz/channels/_select.py similarity index 88% rename from src/frequenz/channels/util/_select.py rename to src/frequenz/channels/_select.py index ac6e59ad..a7ec4c90 100644 --- a/src/frequenz/channels/util/_select.py +++ b/src/frequenz/channels/_select.py @@ -12,43 +12,43 @@ from collections.abc import AsyncIterator from typing import Any, Generic, TypeGuard, TypeVar -from .._base_classes import Receiver -from .._exceptions import ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError _T = TypeVar("_T") +class _EmptyResult: + """A sentinel value to distinguish between None and empty result. + + We need a sentinel because a result can also be `None`. + """ + + def __repr__(self) -> str: + return "" + + class Selected(Generic[_T]): - """A result of a [`select()`][frequenz.channels.util.select] iteration. + """A result of a [`select()`][frequenz.channels.select] iteration. The selected receiver is consumed immediately and the received value is stored in the instance, unless there was an exception while receiving the value, in which case the exception is stored instead. `Selected` instances should be used in conjunction with the - [`selected_from()`][frequenz.channels.util.selected_from] function to determine + [`selected_from()`][frequenz.channels.selected_from] function to determine which receiver was selected. - Please see [`select()`][frequenz.channels.util.select] for an example. + Please see [`select()`][frequenz.channels.select] for an example. """ - class _EmptyResult: - """A sentinel value to distinguish between None and empty result. - - We need a sentinel because a result can also be `None`. - """ - - def __repr__(self) -> str: - return "" - def __init__(self, receiver: Receiver[_T]) -> None: """Create a new instance. The receiver is consumed immediately when creating the instance and the received value is stored in the instance for later use as - [`value`][frequenz.channels.util.Selected.value]. If there was an exception + [`value`][frequenz.channels.Selected.value]. If there was an exception while receiving the value, then the exception is stored in the instance instead - (as [`exception`][frequenz.channels.util.Selected.exception]). + (as [`exception`][frequenz.channels.Selected.exception]). Args: receiver: The receiver that was selected. @@ -56,7 +56,7 @@ def __init__(self, receiver: Receiver[_T]) -> None: self._recv: Receiver[_T] = receiver """The receiver that was selected.""" - self._value: _T | Selected._EmptyResult = Selected._EmptyResult() + self._value: _T | _EmptyResult = _EmptyResult() """The value that was received. If there was an exception while receiving the value, then this will be `None`. @@ -87,7 +87,7 @@ def value(self) -> _T: """ if self._exception is not None: raise self._exception - assert not isinstance(self._value, Selected._EmptyResult) + assert not isinstance(self._value, _EmptyResult) return self._value @property @@ -140,16 +140,16 @@ def __repr__(self) -> str: def selected_from( selected: Selected[Any], receiver: Receiver[_T] ) -> TypeGuard[Selected[_T]]: - """Check if the given receiver was selected by [`select()`][frequenz.channels.util.select]. + """Check if the given receiver was selected by [`select()`][frequenz.channels.select]. This function is used in conjunction with the - [`Selected`][frequenz.channels.util.Selected] class to determine which receiver was + [`Selected`][frequenz.channels.Selected] class to determine which receiver was selected in `select()` iteration. It also works as a [type guard][typing.TypeGuard] to narrow the type of the `Selected` instance to the type of the receiver. - Please see [`select()`][frequenz.channels.util.select] for an example. + Please see [`select()`][frequenz.channels.select] for an example. Args: selected: The result of a `select()` iteration. @@ -164,21 +164,21 @@ def selected_from( class SelectError(BaseException): - """A base exception for [`select()`][frequenz.channels.util.select]. + """A base exception for [`select()`][frequenz.channels.select]. This exception is raised when a `select()` iteration fails. It is raised as a single exception when one receiver fails during normal operation (while calling `ready()` for example). It is raised as a group exception - ([`SelectErrorGroup`][frequenz.channels.util.SelectErrorGroup]) when a `select` loop + ([`SelectErrorGroup`][frequenz.channels.SelectErrorGroup]) when a `select` loop is cleaning up after it's done. """ class UnhandledSelectedError(SelectError, Generic[_T]): - """A receiver was not handled in a [`select()`][frequenz.channels.util.select] loop. + """A receiver was not handled in a [`select()`][frequenz.channels.select] loop. This exception is raised when a `select()` iteration finishes without a call to - [`selected_from()`][frequenz.channels.util.selected_from] for the selected receiver. + [`selected_from()`][frequenz.channels.selected_from] for the selected receiver. """ def __init__(self, selected: Selected[_T]) -> None: @@ -194,7 +194,7 @@ def __init__(self, selected: Selected[_T]) -> None: class SelectErrorGroup(BaseExceptionGroup[BaseException], SelectError): - """An exception group for [`select()`][frequenz.channels.util.select] operation. + """An exception group for [`select()`][frequenz.channels.select] operation. This exception group is raised when a `select()` loops fails while cleaning up running tests to check for ready receivers. @@ -243,8 +243,8 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]: This function is used to iterate over the values of all receivers as they receive new values. It is used in conjunction with the - [`Selected`][frequenz.channels.util.Selected] class and the - [`selected_from()`][frequenz.channels.util.selected_from] function to determine + [`Selected`][frequenz.channels.Selected] class and the + [`selected_from()`][frequenz.channels.selected_from] function to determine which function to determine which receiver was selected in a select operation. An exhaustiveness check is performed at runtime to make sure all selected receivers @@ -258,8 +258,8 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]: receivers from a select loop, there are a few alternatives. Depending on your use case, one or the other could work better for you: - * Use [`Merge`][frequenz.channels.util.Merge] or - [`MergeNamed`][frequenz.channels.util.MergeNamed]: this is useful when you + * Use [`Merge`][frequenz.channels.Merge] or + [`MergeNamed`][frequenz.channels.MergeNamed]: this is useful when you have and unknown number of receivers of the same type that can be handled as a group. * Use tasks to manage each receiver individually: this is better if there are no @@ -273,8 +273,8 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]: import datetime from typing import assert_never - from frequenz.channels import ReceiverStoppedError - from frequenz.channels.util import select, selected_from, Timer + from frequenz.channels import ReceiverStoppedError, select, selected_from + from frequenz.channels.timer import Timer timer1 = Timer.periodic(datetime.timedelta(seconds=1)) timer2 = Timer.timeout(datetime.timedelta(seconds=0.5)) diff --git a/src/frequenz/channels/_sender.py b/src/frequenz/channels/_sender.py new file mode 100644 index 00000000..8cf00f86 --- /dev/null +++ b/src/frequenz/channels/_sender.py @@ -0,0 +1,45 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Channel sender and associated exceptions.""" + +from abc import ABC, abstractmethod +from typing import Generic, TypeVar + +from ._exceptions import Error + +_T = TypeVar("_T") + + +class Sender(ABC, Generic[_T]): + """A channel Sender.""" + + @abstractmethod + async def send(self, msg: _T) -> None: + """Send a message to the channel. + + Args: + msg: The message to be sent. + + Raises: + SenderError: if there was an error sending the message. + """ + + +class SenderError(Error, Generic[_T]): + """An error produced in a [Sender][frequenz.channels.Sender]. + + All exceptions generated by senders inherit from this exception. + """ + + def __init__(self, message: str, sender: Sender[_T]): + """Create an instance. + + Args: + message: An error message. + sender: The [Sender][frequenz.channels.Sender] where the error + happened. + """ + super().__init__(message) + self.sender: Sender[_T] = sender + """The sender where the error happened.""" diff --git a/src/frequenz/channels/util/_event.py b/src/frequenz/channels/event.py similarity index 89% rename from src/frequenz/channels/util/_event.py rename to src/frequenz/channels/event.py index ca43e262..f546555e 100644 --- a/src/frequenz/channels/util/_event.py +++ b/src/frequenz/channels/event.py @@ -6,25 +6,25 @@ import asyncio as _asyncio -from frequenz.channels import _base_classes, _exceptions +from frequenz.channels import _receiver -class Event(_base_classes.Receiver[None]): +class Event(_receiver.Receiver[None]): """A receiver that can be made ready through an event. - The receiver (the [`ready()`][frequenz.channels.util.Event.ready] method) will wait - until [`set()`][frequenz.channels.util.Event.set] is called. At that point the + The receiver (the [`ready()`][frequenz.channels.event.Event.ready] method) will wait + until [`set()`][frequenz.channels.event.Event.set] is called. At that point the receiver will wait again after the event is [`consume()`][frequenz.channels.Receiver.consume]d. The receiver can be completely stopped by calling - [`stop()`][frequenz.channels.util.Event.stop]. + [`stop()`][frequenz.channels.event.Event.stop]. Example: ```python import asyncio - from frequenz.channels import Receiver - from frequenz.channels.util import Event, select, selected_from + from frequenz.channels import Receiver, select, selected_from + from frequenz.channels.event import Event other_receiver: Receiver[int] = ... exit_event = Event() @@ -134,7 +134,7 @@ def consume(self) -> None: ReceiverStoppedError: If this receiver is stopped. """ if not self._is_set and self._is_stopped: - raise _exceptions.ReceiverStoppedError(self) + raise _receiver.ReceiverStoppedError(self) assert self._is_set, "calls to `consume()` must be follow a call to `ready()`" diff --git a/src/frequenz/channels/util/_file_watcher.py b/src/frequenz/channels/file_watcher.py similarity index 81% rename from src/frequenz/channels/util/_file_watcher.py rename to src/frequenz/channels/file_watcher.py index 5291a0f9..64bc62e1 100644 --- a/src/frequenz/channels/util/_file_watcher.py +++ b/src/frequenz/channels/file_watcher.py @@ -3,8 +3,6 @@ """A Channel receiver for watching for new, modified or deleted files.""" -from __future__ import annotations - import asyncio import pathlib from collections import abc @@ -14,33 +12,34 @@ from watchfiles import Change, awatch from watchfiles.main import FileChange -from .._base_classes import Receiver -from .._exceptions import ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError -class FileWatcher(Receiver["FileWatcher.Event"]): - """A channel receiver that watches for file events.""" +class EventType(Enum): + """Available types of changes to watch for.""" + + CREATE = Change.added + """A new file was created.""" - class EventType(Enum): - """Available types of changes to watch for.""" + MODIFY = Change.modified + """An existing file was modified.""" - CREATE = Change.added - """A new file was created.""" + DELETE = Change.deleted + """An existing file was deleted.""" - MODIFY = Change.modified - """An existing file was modified.""" - DELETE = Change.deleted - """An existing file was deleted.""" +@dataclass(frozen=True) +class Event: + """A file change event.""" - @dataclass(frozen=True) - class Event: - """A file change event.""" + type: EventType + """The type of change that was observed.""" + path: pathlib.Path + """The path where the change was observed.""" - type: FileWatcher.EventType - """The type of change that was observed.""" - path: pathlib.Path - """The path where the change was observed.""" + +class FileWatcher(Receiver[Event]): + """A channel receiver that watches for file events.""" def __init__( self, @@ -54,7 +53,7 @@ def __init__( event_types: Types of events to watch for. Defaults to watch for all event types. """ - self.event_types: frozenset[FileWatcher.EventType] = frozenset(event_types) + self.event_types: frozenset[EventType] = frozenset(event_types) """The types of events to watch for.""" self._stop_event: asyncio.Event = asyncio.Event() @@ -134,9 +133,7 @@ def consume(self) -> Event: assert self._changes, "`consume()` must be preceded by a call to `ready()`" # Tuple of (Change, path) returned by watchfiles change, path_str = self._changes.pop() - return FileWatcher.Event( - type=FileWatcher.EventType(change), path=pathlib.Path(path_str) - ) + return Event(type=EventType(change), path=pathlib.Path(path_str)) def __str__(self) -> str: """Return a string representation of this receiver.""" diff --git a/src/frequenz/channels/util/_timer.py b/src/frequenz/channels/timer.py similarity index 95% rename from src/frequenz/channels/util/_timer.py rename to src/frequenz/channels/timer.py index 9b0fd712..18db2beb 100644 --- a/src/frequenz/channels/util/_timer.py +++ b/src/frequenz/channels/timer.py @@ -17,8 +17,7 @@ import asyncio from datetime import timedelta -from .._base_classes import Receiver -from .._exceptions import ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError def _to_microseconds(time: float | timedelta) -> int: @@ -271,7 +270,7 @@ class Timer(Receiver[timedelta]): """A timer receiver that triggers every `interval` time. The timer has microseconds resolution, so the - [`interval`][frequenz.channels.util.Timer.interval] must be at least + [`interval`][frequenz.channels.timer.Timer.interval] must be at least 1 microsecond. The message it produces is a [`timedelta`][datetime.timedelta] containing the drift @@ -284,34 +283,34 @@ class Timer(Receiver[timedelta]): as the timer uses [`asyncio`][asyncio]s loop monotonic clock. If the timer is delayed too much, then it will behave according to the - [`missed_tick_policy`][frequenz.channels.util.Timer.missed_tick_policy]. Missing + [`missed_tick_policy`][frequenz.channels.timer.Timer.missed_tick_policy]. Missing ticks might or might not trigger a message and the drift could be accumulated or not depending on the chosen policy. These are the currently built-in available policies: - * [`SkipMissedAndDrift`][frequenz.channels.util.SkipMissedAndDrift] - * [`SkipMissedAndResync`][frequenz.channels.util.SkipMissedAndResync] - * [`TriggerAllMissed`][frequenz.channels.util.TriggerAllMissed] + * [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] + * [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync] + * [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] For the most common cases, a specialized constructor is provided: - * [`periodic()`][frequenz.channels.util.Timer.periodic] (uses the - [`TriggerAllMissed`][frequenz.channels.util.TriggerAllMissed] or - [`SkipMissedAndResync`][frequenz.channels.util.SkipMissedAndResync] policy) - * [`timeout()`][frequenz.channels.util.Timer.timeout] (uses the - [`SkipMissedAndDrift`][frequenz.channels.util.SkipMissedAndDrift] policy) + * [`periodic()`][frequenz.channels.timer.Timer.periodic] (uses the + [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] or + [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync] policy) + * [`timeout()`][frequenz.channels.timer.Timer.timeout] (uses the + [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] policy) - The timer accepts an optional [`loop`][frequenz.channels.util.Timer.loop], which + The timer accepts an optional [`loop`][frequenz.channels.timer.Timer.loop], which will be used to track the time. If `loop` is `None`, then the running loop will be used (if there is no running loop most calls will raise a [`RuntimeError`][RuntimeError]). Starting the timer can be delayed if necessary by using `auto_start=False` (for example until we have a running loop). A call to - [`reset()`][frequenz.channels.util.Timer.reset], - [`ready()`][frequenz.channels.util.Timer.ready], - [`receive()`][frequenz.channels.util.Timer.receive] or the async iterator interface + [`reset()`][frequenz.channels.timer.Timer.reset], + [`ready()`][frequenz.channels.timer.Timer.ready], + [`receive()`][frequenz.channels.timer.Timer.receive] or the async iterator interface to await for a new message will start the timer. Example: Periodic timer example @@ -320,12 +319,11 @@ class Timer(Receiver[timedelta]): print(f"The timer has triggered {drift=}") ``` - But you can also use a [`select`][frequenz.channels.util.select] to combine + But you can also use a [`select`][frequenz.channels.select] to combine it with other receivers, and even start it (semi) manually: ```python - from frequenz.channels.util import select, selected_from - from frequenz.channels import Broadcast + from frequenz.channels import Broadcast, select, selected_from timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False) chan = Broadcast[int](name="input-chan") @@ -347,8 +345,7 @@ class Timer(Receiver[timedelta]): Example: Timeout example ```python - from frequenz.channels.util import select, selected_from - from frequenz.channels import Broadcast + from frequenz.channels import Broadcast, select, selected_from def process_data(data: int): print(f"Processing data: {data}") diff --git a/src/frequenz/channels/util/__init__.py b/src/frequenz/channels/util/__init__.py deleted file mode 100644 index 515e1ac2..00000000 --- a/src/frequenz/channels/util/__init__.py +++ /dev/null @@ -1,66 +0,0 @@ -# License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -"""Channel utilities. - -A module with several utilities to work with channels: - -* [Event][frequenz.channels.util.Event]: - A [receiver][frequenz.channels.Receiver] that can be made ready through an event. - -* [FileWatcher][frequenz.channels.util.FileWatcher]: - A [receiver][frequenz.channels.Receiver] that watches for file events. - -* [Merge][frequenz.channels.util.Merge]: - A [receiver][frequenz.channels.Receiver] that merge messages coming from - multiple receivers into a single stream. - -* [MergeNamed][frequenz.channels.util.MergeNamed]: - A [receiver][frequenz.channels.Receiver] that merge messages coming from - multiple receivers into a single named stream, allowing to identify the - origin of each message. - -* [Timer][frequenz.channels.util.Timer]: - A [receiver][frequenz.channels.Receiver] that ticks at certain intervals. - -* [select][frequenz.channels.util.select]: Iterate over the values of all - [receivers][frequenz.channels.Receiver] as new values become available. -""" - -from ._event import Event -from ._file_watcher import FileWatcher -from ._merge import Merge -from ._merge_named import MergeNamed -from ._select import ( - Selected, - SelectError, - SelectErrorGroup, - UnhandledSelectedError, - select, - selected_from, -) -from ._timer import ( - MissedTickPolicy, - SkipMissedAndDrift, - SkipMissedAndResync, - Timer, - TriggerAllMissed, -) - -__all__ = [ - "Event", - "FileWatcher", - "Merge", - "MergeNamed", - "MissedTickPolicy", - "SelectError", - "SelectErrorGroup", - "Selected", - "SkipMissedAndDrift", - "SkipMissedAndResync", - "Timer", - "TriggerAllMissed", - "UnhandledSelectedError", - "select", - "selected_from", -] diff --git a/tests/test_bidirectional.py b/tests/test_bidirectional.py deleted file mode 100644 index 30d59335..00000000 --- a/tests/test_bidirectional.py +++ /dev/null @@ -1,83 +0,0 @@ -# License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -"""Tests for the RequestResponse implementation.""" - -import asyncio - -import pytest - -from frequenz.channels import ( - Bidirectional, - ChannelClosedError, - ChannelError, - ReceiverError, - SenderError, -) - - -async def test_request_response() -> None: - """Ensure bi-directional communication is possible.""" - req_resp: Bidirectional[int, str] = Bidirectional(name="test_service") - - async def service(handle: Bidirectional.Handle[str, int]) -> None: - while True: - num = await handle.receive() - if num is None: - break - if num == 42: - break - if num >= 0: - await handle.send("positive") - else: - await handle.send("negative") - - service_task = asyncio.create_task( - service(req_resp.service_handle), - ) - - client_handle: Bidirectional.Handle[int, str] = req_resp.client_handle - - for ctr in range(-5, 5): - await client_handle.send(ctr) - ret = await client_handle.receive() - if ctr < 0: - assert ret == "negative" - else: - assert ret == "positive" - - await client_handle.send(42) # Stop the service task - await service_task - - -async def test_sender_error_chaining() -> None: - """Ensure bi-directional communication is possible.""" - req_resp: Bidirectional[int, str] = Bidirectional(name="test_service") - - await req_resp._response_channel.close() # pylint: disable=protected-access - - with pytest.raises(SenderError, match="The channel was closed") as exc_info: - await req_resp.service_handle.send("I'm closed!") - - err = exc_info.value - cause = err.__cause__ - assert isinstance(cause, ChannelError) - assert cause.args[0].startswith("Error in the underlying channel") - assert isinstance(cause.__cause__, ChannelClosedError) - - -async def test_consume_error_chaining() -> None: - """Ensure bi-directional communication is possible.""" - req_resp: Bidirectional[int, str] = Bidirectional(name="test_service") - - await req_resp._request_channel.close() # pylint: disable=protected-access - - await req_resp.service_handle.ready() - with pytest.raises(ReceiverError, match="Receiver .* was stopped") as exc_info: - _ = req_resp.service_handle.consume() - - err = exc_info.value - cause = err.__cause__ - assert isinstance(cause, ChannelError) - assert cause.args[0].startswith("Error in the underlying channel") - assert isinstance(cause.__cause__, ChannelClosedError) diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 8d1f4aec..5319d4bf 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -12,7 +12,6 @@ Broadcast, ChannelClosedError, Receiver, - ReceiverInvalidatedError, ReceiverStoppedError, Sender, SenderError, @@ -105,6 +104,10 @@ async def test_broadcast_after_close() -> None: async def test_broadcast_overflow() -> None: """Ensure messages sent to full broadcast receivers get dropped.""" + from frequenz.channels._broadcast import ( # pylint: disable=import-outside-toplevel + _Receiver, + ) + bcast: Broadcast[int] = Broadcast(name="meter_5") big_recv_size = 10 @@ -112,7 +115,9 @@ async def test_broadcast_overflow() -> None: sender = bcast.new_sender() big_receiver = bcast.new_receiver(name="named-recv", limit=big_recv_size) + assert isinstance(big_receiver, _Receiver) small_receiver = bcast.new_receiver(limit=small_recv_size) + assert isinstance(small_receiver, _Receiver) async def drain_receivers() -> tuple[int, int]: big_sum = 0 @@ -187,32 +192,6 @@ async def test_broadcast_no_resend_latest() -> None: assert await new_recv.receive() == 100 -async def test_broadcast_peek() -> None: - """Ensure we are able to peek into broadcast channels.""" - bcast: Broadcast[int] = Broadcast(name="peek-test") - receiver = bcast.new_receiver() - peekable = receiver.into_peekable() - sender = bcast.new_sender() - - with pytest.raises(ReceiverInvalidatedError): - await receiver.receive() - - assert peekable.peek() is None - - for val in range(0, 10): - await sender.send(val) - - assert peekable.peek() == 9 - assert peekable.peek() == 9 - - await sender.send(20) - - assert peekable.peek() == 20 - - await bcast.close() - assert peekable.peek() is None - - async def test_broadcast_async_iterator() -> None: """Check that the broadcast receiver works as an async iterator.""" bcast: Broadcast[int] = Broadcast(name="iter_test") diff --git a/tests/utils/test_event.py b/tests/test_event.py similarity index 96% rename from tests/utils/test_event.py rename to tests/test_event.py index 0cda9d23..950720d0 100644 --- a/tests/utils/test_event.py +++ b/tests/test_event.py @@ -8,7 +8,7 @@ import pytest as _pytest from frequenz.channels import ReceiverStoppedError -from frequenz.channels.util import Event +from frequenz.channels.event import Event async def test_event() -> None: diff --git a/tests/utils/test_file_watcher.py b/tests/test_file_watcher.py similarity index 85% rename from tests/utils/test_file_watcher.py rename to tests/test_file_watcher.py index bed75bcb..c1a65838 100644 --- a/tests/utils/test_file_watcher.py +++ b/tests/test_file_watcher.py @@ -15,7 +15,7 @@ from watchfiles import Change from watchfiles.main import FileChange -from frequenz.channels.util import FileWatcher +from frequenz.channels.file_watcher import Event, EventType, FileWatcher class _FakeAwatch: @@ -52,7 +52,7 @@ def fake_awatch() -> Iterator[_FakeAwatch]: """Fixture to mock the awatch function.""" fake = _FakeAwatch() with mock.patch( - "frequenz.channels.util._file_watcher.awatch", + "frequenz.channels.file_watcher.awatch", autospec=True, side_effect=fake.fake_awatch, ): @@ -74,14 +74,14 @@ async def test_file_watcher_receive_updates( for change in changes: recv_changes = await file_watcher.receive() - event_type = FileWatcher.EventType(change[0]) + event_type = EventType(change[0]) path = pathlib.Path(change[1]) - assert recv_changes == FileWatcher.Event(type=event_type, path=path) + assert recv_changes == Event(type=event_type, path=path) -@hypothesis.given(event_types=st.sets(st.sampled_from(FileWatcher.EventType))) +@hypothesis.given(event_types=st.sets(st.sampled_from(EventType))) async def test_file_watcher_filter_events( - event_types: set[FileWatcher.EventType], + event_types: set[EventType], ) -> None: """Test the file watcher events filtering.""" good_path = "good-file" @@ -89,7 +89,7 @@ async def test_file_watcher_filter_events( # We need to reset the mock explicitly because hypothesis runs all the produced # inputs in the same context. with mock.patch( - "frequenz.channels.util._file_watcher.awatch", autospec=True + "frequenz.channels.file_watcher.awatch", autospec=True ) as awatch_mock: file_watcher = FileWatcher(paths=[good_path], event_types=event_types) @@ -100,7 +100,7 @@ async def test_file_watcher_filter_events( pathlib.Path(good_path), stop_event=mock.ANY, watch_filter=filter_events ) ] - for event_type in FileWatcher.EventType: + for event_type in EventType: assert filter_events(event_type.value, good_path) == ( event_type in event_types ) diff --git a/tests/utils/test_integration.py b/tests/test_file_watcher_integration.py similarity index 87% rename from tests/utils/test_integration.py rename to tests/test_file_watcher_integration.py index e61cb620..754aca5f 100644 --- a/tests/utils/test_integration.py +++ b/tests/test_file_watcher_integration.py @@ -9,7 +9,9 @@ import pytest -from frequenz.channels.util import FileWatcher, Timer, select, selected_from +from frequenz.channels import select, selected_from +from frequenz.channels.file_watcher import Event, EventType, FileWatcher +from frequenz.channels.timer import Timer @pytest.mark.integration @@ -31,12 +33,8 @@ async def test_file_watcher(tmp_path: pathlib.Path) -> None: if selected_from(selected, timer): filename.write_text(f"{selected.value}") elif selected_from(selected, file_watcher): - event_type = ( - FileWatcher.EventType.CREATE - if number_of_writes == 0 - else FileWatcher.EventType.MODIFY - ) - assert selected.value == FileWatcher.Event(type=event_type, path=filename) + event_type = EventType.CREATE if number_of_writes == 0 else EventType.MODIFY + assert selected.value == Event(type=event_type, path=filename) number_of_writes += 1 # After receiving a write 3 times, unsubscribe from the writes channel if number_of_writes == expected_number_of_writes: @@ -56,9 +54,7 @@ async def test_file_watcher_deletes(tmp_path: pathlib.Path) -> None: tmp_path: A tmp directory to run the file watcher on. Created by pytest. """ filename = tmp_path / "test-file" - file_watcher = FileWatcher( - paths=[str(tmp_path)], event_types={FileWatcher.EventType.DELETE} - ) + file_watcher = FileWatcher(paths=[str(tmp_path)], event_types={EventType.DELETE}) write_timer = Timer.timeout(timedelta(seconds=0.1)) deletion_timer = Timer.timeout(timedelta(seconds=0.25)) diff --git a/tests/test_merge.py b/tests/test_merge.py index c0d1c420..5f6b14a5 100644 --- a/tests/test_merge.py +++ b/tests/test_merge.py @@ -5,8 +5,7 @@ import asyncio -from frequenz.channels import Anycast, Sender -from frequenz.channels.util import Merge +from frequenz.channels import Anycast, Merge, Sender async def test_merge() -> None: diff --git a/tests/test_mergenamed.py b/tests/test_mergenamed.py index 565264ef..03d272ea 100644 --- a/tests/test_mergenamed.py +++ b/tests/test_mergenamed.py @@ -5,8 +5,7 @@ import asyncio -from frequenz.channels import Anycast, Sender -from frequenz.channels.util import MergeNamed +from frequenz.channels import Anycast, MergeNamed, Sender async def test_mergenamed() -> None: diff --git a/tests/utils/test_select.py b/tests/test_select.py similarity index 93% rename from tests/utils/test_select.py rename to tests/test_select.py index a9a46921..9eb001c5 100644 --- a/tests/utils/test_select.py +++ b/tests/test_select.py @@ -7,8 +7,7 @@ import pytest -from frequenz.channels import Receiver, ReceiverStoppedError -from frequenz.channels.util import Selected, selected_from +from frequenz.channels import Receiver, ReceiverStoppedError, Selected, selected_from class TestSelected: diff --git a/tests/utils/test_select_integration.py b/tests/test_select_integration.py similarity index 99% rename from tests/utils/test_select_integration.py rename to tests/test_select_integration.py index a4984f62..6d676528 100644 --- a/tests/utils/test_select_integration.py +++ b/tests/test_select_integration.py @@ -15,14 +15,15 @@ class at a time. import async_solipsism import pytest -from frequenz.channels import Receiver, ReceiverStoppedError -from frequenz.channels.util import ( - Event, +from frequenz.channels import ( + Receiver, + ReceiverStoppedError, Selected, UnhandledSelectedError, select, selected_from, ) +from frequenz.channels.event import Event @pytest.mark.integration diff --git a/tests/utils/test_timer.py b/tests/test_timer.py similarity index 99% rename from tests/utils/test_timer.py rename to tests/test_timer.py index dd5e5109..73fe28f6 100644 --- a/tests/utils/test_timer.py +++ b/tests/test_timer.py @@ -14,7 +14,7 @@ import pytest from hypothesis import strategies as st -from frequenz.channels.util import ( +from frequenz.channels.timer import ( SkipMissedAndDrift, SkipMissedAndResync, Timer,