|
22 | 22 | MultipleExceptionError, |
23 | 23 | NoPublicConstructor, |
24 | 24 | final, |
25 | | - generic_function, |
26 | 25 | raise_single_exception_from_group, |
27 | 26 | ) |
28 | 27 |
|
|
45 | 44 | P = ParamSpec("P") |
46 | 45 |
|
47 | 46 |
|
48 | | -def _open_memory_channel( |
49 | | - max_buffer_size: int | float, # noqa: PYI041 |
50 | | -) -> tuple[MemorySendChannel[T], MemoryReceiveChannel[T]]: |
| 47 | +# written as a class so you can say open_memory_channel[int](5) |
| 48 | +@final |
| 49 | +class open_memory_channel(tuple["MemorySendChannel[T]", "MemoryReceiveChannel[T]"]): |
51 | 50 | """Open a channel for passing objects between tasks within a process. |
52 | 51 |
|
53 | 52 | Memory channels are lightweight, cheap to allocate, and entirely |
@@ -97,38 +96,24 @@ def _open_memory_channel( |
97 | 96 | channel (summing over all clones). |
98 | 97 | * ``tasks_waiting_receive``: The number of tasks blocked in ``receive`` on |
99 | 98 | this channel (summing over all clones). |
100 | | -
|
101 | 99 | """ |
102 | | - if max_buffer_size != inf and not isinstance(max_buffer_size, int): |
103 | | - raise TypeError("max_buffer_size must be an integer or math.inf") |
104 | | - if max_buffer_size < 0: |
105 | | - raise ValueError("max_buffer_size must be >= 0") |
106 | | - state: MemoryChannelState[T] = MemoryChannelState(max_buffer_size) |
107 | | - return ( |
108 | | - MemorySendChannel[T]._create(state), |
109 | | - MemoryReceiveChannel[T]._create(state), |
110 | | - ) |
111 | | - |
112 | | - |
113 | | -# This workaround requires python3.9+, once older python versions are not supported |
114 | | -# or there's a better way of achieving type-checking on a generic factory function, |
115 | | -# it could replace the normal function header |
116 | | -if TYPE_CHECKING: |
117 | | - # written as a class so you can say open_memory_channel[int](5) |
118 | | - class open_memory_channel(tuple["MemorySendChannel[T]", "MemoryReceiveChannel[T]"]): |
119 | | - def __new__( # type: ignore[misc] # "must return a subtype" |
120 | | - cls, |
121 | | - max_buffer_size: int | float, # noqa: PYI041 |
122 | | - ) -> tuple[MemorySendChannel[T], MemoryReceiveChannel[T]]: |
123 | | - return _open_memory_channel(max_buffer_size) |
124 | | - |
125 | | - def __init__(self, max_buffer_size: int | float) -> None: # noqa: PYI041 |
126 | | - ... |
127 | | - |
128 | | -else: |
129 | | - # apply the generic_function decorator to make open_memory_channel indexable |
130 | | - # so it's valid to say e.g. ``open_memory_channel[bytes](5)`` at runtime |
131 | | - open_memory_channel = generic_function(_open_memory_channel) |
| 100 | + |
| 101 | + def __new__( # type: ignore[misc] # "must return a subtype" |
| 102 | + cls, |
| 103 | + max_buffer_size: int | float, # noqa: PYI041 |
| 104 | + ) -> tuple[MemorySendChannel[T], MemoryReceiveChannel[T]]: |
| 105 | + if max_buffer_size != inf and not isinstance(max_buffer_size, int): |
| 106 | + raise TypeError("max_buffer_size must be an integer or math.inf") |
| 107 | + if max_buffer_size < 0: |
| 108 | + raise ValueError("max_buffer_size must be >= 0") |
| 109 | + state: MemoryChannelState[T] = MemoryChannelState(max_buffer_size) |
| 110 | + return ( |
| 111 | + MemorySendChannel[T]._create(state), |
| 112 | + MemoryReceiveChannel[T]._create(state), |
| 113 | + ) |
| 114 | + |
| 115 | + def __init__(self, max_buffer_size: int | float) -> None: # noqa: PYI041 |
| 116 | + ... |
132 | 117 |
|
133 | 118 |
|
134 | 119 | @attrs.frozen |
|
0 commit comments