Skip to content

Commit 97d81ff

Browse files
committed
removing lock from concurrent cursor
1 parent 20ae208 commit 97d81ff

File tree

3 files changed

+32
-24
lines changed

3 files changed

+32
-24
lines changed

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,12 @@ def on_partition(self, partition: Partition) -> None:
9696
"""
9797
stream_name = partition.stream_name()
9898
self._streams_to_running_partitions[stream_name].add(partition)
99-
cursor = self._stream_name_to_instance[stream_name].cursor
10099
if self._slice_logger.should_log_slice_message(self._logger):
101100
self._message_repository.emit_message(
102101
self._slice_logger.create_slice_log_message(partition.to_slice())
103102
)
104103
self._thread_pool_manager.submit(
105-
self._partition_reader.process_partition, partition, cursor
104+
self._partition_reader.process_partition, partition
106105
)
107106

108107
def on_partition_complete_sentinel(
@@ -116,16 +115,26 @@ def on_partition_complete_sentinel(
116115
"""
117116
partition = sentinel.partition
118117

119-
partitions_running = self._streams_to_running_partitions[partition.stream_name()]
120-
if partition in partitions_running:
121-
partitions_running.remove(partition)
122-
# If all partitions were generated and this was the last one, the stream is done
123-
if (
124-
partition.stream_name() not in self._streams_currently_generating_partitions
125-
and len(partitions_running) == 0
126-
):
127-
yield from self._on_stream_is_done(partition.stream_name())
128-
yield from self._message_repository.consume_queue()
118+
try:
119+
if sentinel.is_successful:
120+
stream = self._stream_name_to_instance[partition.stream_name()]
121+
stream.cursor.close_partition(partition)
122+
except Exception as exception:
123+
self._flag_exception(partition.stream_name(), exception)
124+
yield AirbyteTracedException.from_exception(
125+
exception, stream_descriptor=StreamDescriptor(name=partition.stream_name())
126+
).as_sanitized_airbyte_message()
127+
finally:
128+
partitions_running = self._streams_to_running_partitions[partition.stream_name()]
129+
if partition in partitions_running:
130+
partitions_running.remove(partition)
131+
# If all partitions were generated and this was the last one, the stream is done
132+
if (
133+
partition.stream_name() not in self._streams_currently_generating_partitions
134+
and len(partitions_running) == 0
135+
):
136+
yield from self._on_stream_is_done(partition.stream_name())
137+
yield from self._message_repository.consume_queue()
129138

130139
def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
131140
"""
@@ -154,6 +163,7 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
154163
stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
155164
)
156165
self._record_counter[stream.name] += 1
166+
stream.cursor.observe(record)
157167
yield message
158168
yield from self._message_repository.consume_queue()
159169

airbyte_cdk/sources/streams/concurrent/cursor.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ def __init__(
198198
# not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
199199
# possible for one partition to update concurrent_state after a second partition has already read
200200
# the previous state. This can lead to the second partition overwriting the previous one's state.
201-
self._lock = threading.Lock()
201+
# self._lock = threading.Lock()
202202

203203
@property
204204
def state(self) -> MutableMapping[str, Any]:
@@ -273,14 +273,14 @@ def _extract_cursor_value(self, record: Record) -> Any:
273273
return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
274274

275275
def close_partition(self, partition: Partition) -> None:
276-
with self._lock:
277-
slice_count_before = len(self._concurrent_state.get("slices", []))
278-
self._add_slice_to_state(partition)
279-
if slice_count_before < len(
280-
self._concurrent_state["slices"]
281-
): # only emit if at least one slice has been processed
282-
self._merge_partitions()
283-
self._emit_state_message()
276+
#with self._lock:
277+
slice_count_before = len(self._concurrent_state.get("slices", []))
278+
self._add_slice_to_state(partition)
279+
if slice_count_before < len(
280+
self._concurrent_state["slices"]
281+
): # only emit if at least one slice has been processed
282+
self._merge_partitions()
283+
self._emit_state_message()
284284
self._has_closed_at_least_one_slice = True
285285

286286
def _add_slice_to_state(self, partition: Partition) -> None:

airbyte_cdk/sources/streams/concurrent/partition_reader.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def __init__(
6060
self._queue = queue
6161
self._partition_logger = partition_logger
6262

63-
def process_partition(self, partition: Partition, cursor: Cursor) -> None:
63+
def process_partition(self, partition: Partition) -> None:
6464
"""
6565
Process a partition and put the records in the output queue.
6666
When all the records of the partition have been added to the queue, a sentinel is added to the queue to indicate that the partition has been fully read.
@@ -78,8 +78,6 @@ def process_partition(self, partition: Partition, cursor: Cursor) -> None:
7878

7979
for record in partition.read():
8080
self._queue.put(record)
81-
cursor.observe(record)
82-
cursor.close_partition(partition)
8381
self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
8482
except Exception as e:
8583
self._queue.put(StreamThreadException(e, partition.stream_name()))

0 commit comments

Comments
 (0)