Skip to content
Merged
Changes from 24 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
707a6c6
Add API Budget
tolik0 Feb 5, 2025
b6bcdd7
Refactor to move api_budget to root level
tolik0 Feb 6, 2025
040ff9e
Format
tolik0 Feb 6, 2025
824d2c6
Merge branch 'main' into tolik0/add-api-budget
tolik0 Feb 7, 2025
15f830c
Update for backward compatibility
tolik0 Feb 7, 2025
1285668
Add unit tests
tolik0 Feb 9, 2025
7be9842
Add FixedWindowCallRatePolicy unit test
tolik0 Feb 9, 2025
8d3bfce
Change the partitions limit to 1000
tolik0 Feb 10, 2025
509ea05
Refactored switching logic
tolik0 Feb 10, 2025
8d44150
Increase the limit for number of partitions in memory
tolik0 Feb 10, 2025
b3f9897
Merge branch 'tolik0/add-api-budget-limit-1000' into tolik0/refactor-…
tolik0 Feb 11, 2025
342375c
Refactor ConcurrentPerPartitionCursor to not use ConcurrentCursor wit…
tolik0 Feb 12, 2025
05f4db7
Delete code from another branch
tolik0 Feb 12, 2025
c0bc645
Fix cursor value from record
tolik0 Feb 12, 2025
52b95e3
Add throttling for state emitting in ConcurrentPerPartitionCursor
tolik0 Feb 13, 2025
1166a7a
Fix unit tests
tolik0 Feb 17, 2025
4a7d9ec
Move switching to global logic
tolik0 Feb 17, 2025
19ad269
Revert test limits
tolik0 Feb 17, 2025
667700f
Merge branch 'main' into tolik0/refactor-concurrent-global-cursor
tolik0 Feb 17, 2025
6498528
Fix format
tolik0 Feb 17, 2025
d3e7fe2
Add parent state updates
tolik0 Feb 17, 2025
7b4964e
Move acquiring the semaphore
tolik0 Feb 17, 2025
8617cc8
Merge branch 'tolik0/refactor-concurrent-global-cursor' into tolik0/c…
tolik0 Feb 17, 2025
a8db6b6
Merge branch 'main' into tolik0/concurrent-perpartition-add-parent-st…
tolik0 Feb 18, 2025
203c131
Refactor to store only unique states
tolik0 Feb 18, 2025
671fab4
Add intermediate states validation to unit tests
tolik0 Feb 18, 2025
a1d98fb
Fix format
tolik0 Feb 18, 2025
eff25ec
Add unit tests
tolik0 Feb 19, 2025
c51f840
Update unit tests
tolik0 Feb 21, 2025
4a18954
Add deleting finished semaphores
tolik0 Feb 21, 2025
a7ece97
Delete testing prints
tolik0 Feb 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ def __init__(
# the oldest partitions can be efficiently removed, maintaining the most recent partitions.
self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict()
self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()

# Parent-state tracking: store each partition’s parent state in creation order
self._partition_parent_state_map: OrderedDict[str, Mapping[str, Any]] = OrderedDict()

self._finished_partitions: set[str] = set()
self._lock = threading.Lock()
self._timer = Timer()
Expand Down Expand Up @@ -155,7 +159,29 @@ def close_partition(self, partition: Partition) -> None:
and self._semaphore_per_partition[partition_key]._value == 0
):
self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])
self._emit_state_message()

self._check_and_update_parent_state()

self._emit_state_message()

def _check_and_update_parent_state(self) -> None:
    """
    Advance ``_parent_state`` past the leading run of fully closed partitions.

    Walks ``_partition_parent_state_map`` in insertion order. A partition counts
    as closed when its key is in ``_finished_partitions`` and its semaphore (if
    one is still registered) has a value of 0. Each closed partition at the
    front of the map is removed; the walk stops at the first partition that is
    still open. If at least one entry was removed, ``_parent_state`` is set to
    the parent state of the last entry removed.

    A partition with no entry in ``_semaphore_per_partition`` is treated as
    having no outstanding slices instead of raising ``KeyError``.
    NOTE(review): confirm this matches how semaphore entries are cleaned up
    elsewhere in the class.
    """
    last_closed_state = None

    # Iterate over a snapshot of the keys because entries are removed in the loop.
    for p_key in list(self._partition_parent_state_map.keys()):
        # Not every slice of this partition has been generated yet: stop here.
        if p_key not in self._finished_partitions:
            break

        # `.get` avoids a KeyError when the semaphore entry is no longer present.
        # `_value` is a private attribute of threading.Semaphore; the surrounding
        # code reads it the same way.
        semaphore = self._semaphore_per_partition.get(p_key)
        if semaphore is not None and semaphore._value != 0:
            break

        # The loop stops at the first open partition, so p_key is always the
        # oldest remaining entry; popping by key removes exactly what was checked.
        last_closed_state = self._partition_parent_state_map.pop(p_key)

    # Only overwrite the parent state when at least one partition was popped.
    if last_closed_state is not None:
        self._parent_state = last_closed_state

def ensure_at_least_one_state_emitted(self) -> None:
"""
Expand Down Expand Up @@ -201,13 +227,17 @@ def stream_slices(self) -> Iterable[StreamSlice]:

slices = self._partition_router.stream_slices()
self._timer.start()
for partition in slices:
yield from self._generate_slices_from_partition(partition)
for partition, last, parent_state in iterate_with_last_flag_and_state(
slices, self._partition_router.get_stream_state
):
yield from self._generate_slices_from_partition(partition, parent_state)

def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
def _generate_slices_from_partition(self, partition: StreamSlice, parent_state: Mapping[str, Any]) -> Iterable[StreamSlice]:
# Ensure the maximum number of partitions is not exceeded
self._ensure_partition_limit()

partition_key = self._to_partition_key(partition.partition)

cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
if not cursor:
cursor = self._create_cursor(
Expand All @@ -216,18 +246,21 @@ def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[St
)
with self._lock:
self._number_of_partitions += 1
self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
self._semaphore_per_partition[self._to_partition_key(partition.partition)] = (
self._cursor_per_partition[partition_key] = cursor
self._semaphore_per_partition[partition_key] = (
threading.Semaphore(0)
)

with self._lock:
self._partition_parent_state_map[partition_key] = deepcopy(parent_state)

for cursor_slice, is_last_slice, _ in iterate_with_last_flag_and_state(
cursor.stream_slices(),
lambda: None,
):
self._semaphore_per_partition[self._to_partition_key(partition.partition)].release()
self._semaphore_per_partition[partition_key].release()
if is_last_slice:
self._finished_partitions.add(self._to_partition_key(partition.partition))
self._finished_partitions.add(partition_key)
yield StreamSlice(
partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
)
Expand All @@ -254,6 +287,7 @@ def _ensure_partition_limit(self) -> None:
self._use_global_cursor = True

with self._lock:
self._number_of_partitions += 1
while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
# Try removing finished partitions first
for partition_key in list(self._cursor_per_partition.keys()):
Expand Down
Loading