Skip to content

Commit 4a96a2a

Browse files
maxi297 and octavia-squidington-iii authored
feat: do not reset semaphore when duplicate partitions (#509)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 2fd2492 commit 4a96a2a

File tree

1 file changed

+8
-1
lines changed

1 file changed

+8
-1
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -65,6 +65,7 @@ class ConcurrentPerPartitionCursor(Cursor):
65 65
_NO_CURSOR_STATE: Mapping[str, Any] = {}
66 66
_GLOBAL_STATE_KEY = "state"
67 67
_PERPARTITION_STATE_KEY = "states"
68+
_IS_PARTITION_DUPLICATION_LOGGED = False
68 69
_KEY = 0
69 70
_VALUE = 1
70 71

@@ -279,7 +280,13 @@ def _generate_slices_from_partition(
279 280
with self._lock:
280 281
self._number_of_partitions += 1
281 282
self._cursor_per_partition[partition_key] = cursor
282-
self._semaphore_per_partition[partition_key] = threading.Semaphore(0)
283+
284+
if partition_key in self._semaphore_per_partition:
285+
if not self._IS_PARTITION_DUPLICATION_LOGGED:
286+
logger.warning(f"Partition duplication detected for stream {self._stream_name}")
287+
self._IS_PARTITION_DUPLICATION_LOGGED = True
288+
else:
289+
self._semaphore_per_partition[partition_key] = threading.Semaphore(0)
283 290

284 291
with self._lock:
285 292
if (

0 commit comments

Comments
 (0)