Tee and avoiding memory leaks #183
Unanswered
tsoos99dev
asked this question in
Q&A
Replies: 1 comment
-
The You're probably best off writing your own group-of-iterators class where each iterator tracks their index and can compare it against its peers. For example, a (draft, untested) utility class would look like this: from typing import AsyncIterator
class CohesiveAsyncIterators[T]:
def __init__(self, gap_limit: int) -> None:
self.gap_limit = gap_limit
self._peers: dict[object, int] = {}
async def attach(
self, peer: AsyncIterator[T], /, index: int = 0
) -> AsyncIterator[T]:
token = object()
self._peers[token] = index
try:
async for item in peer:
self._peers[token] += 1
yield item
# check that other peers are not too far behind before proceeding
if self._peers[token] > min(self._peers.values()) - self.gap_limit:
# could also be a warning or simply waiting for an Event to show we are back in sync
raise RuntimeError(f"Iterator {peer} advanced more than {self.gap_limit} from peers")
finally:
del self._peers[token] One would just import asyncstdlib as a
def bounded_tee(iterable, n: int, bound: int):
return map(CohesiveAsyncIterators(gap_limit =bound).attach, a.tee(iterable, n=n)) |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
Hi. What would be a good way to make sure the iterators returned by tee don't get too far out of sync? I'd love to have a version where I can supply the max number of items the most and least advanced iterators can differ by.
Beta Was this translation helpful? Give feedback.
All reactions