Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@


class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
# By default, we defer to a value of 1 which represents running a connector using the Concurrent CDK engine on only one thread.
SINGLE_THREADED_CONCURRENCY_LEVEL = 1
# By default, we defer to a value of 2. A value lower than than could cause a PartitionEnqueuer to be stuck in a state of deadlock
# because it has hit the limit of futures but not partition reader is consuming them.
_LOWEST_SAFE_CONCURRENCY_LEVEL = 2

def __init__(
self,
Expand Down Expand Up @@ -107,8 +108,8 @@ def __init__(
concurrency_level // 2, 1
) # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a dead lock during start up
else:
concurrency_level = self.SINGLE_THREADED_CONCURRENCY_LEVEL
initial_number_of_partitions_to_generate = self.SINGLE_THREADED_CONCURRENCY_LEVEL
concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

self._concurrent_source = ConcurrentSource.create(
num_workers=concurrency_level,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ definitions:
additionalProperties: true
ConcurrencyLevel:
title: Concurrency Level
description: Defines the amount of parallelization for the streams that are being synced. The factor of parallelization is how many partitions or streams are synced at the same time. For example, with a concurrency_level of 10, ten streams or partitions of data will processed at the same time.
description: Defines the amount of parallelization for the streams that are being synced. The factor of parallelization is how many partitions or streams are synced at the same time. For example, with a concurrency_level of 10, ten streams or partitions of data will processed at the same time. Note that a value of 1 could create deadlock if a stream has a very high number of partitions.
type: object
required:
- default_concurrency
Expand Down
Loading