Skip to content

Commit d357f67

Browse files
committed
Try fixing sqlite errors
1 parent 50d6f21 commit d357f67

File tree

2 files changed

+13
-7
lines changed

2 files changed

+13
-7
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,9 @@
5656

5757

5858
class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
59-
# By default, we defer to a value of 1 which represents running a connector using the Concurrent CDK engine on only one thread.
60-
SINGLE_THREADED_CONCURRENCY_LEVEL = 1
59+
# By default, we defer to a value of 2. A value lower than that could cause a PartitionEnqueuer to be stuck in a state of deadlock
60+
# because it has hit the limit of futures but no partition reader is consuming them.
61+
SINGLE_THREADED_CONCURRENCY_LEVEL = 2
6162

6263
def __init__(
6364
self,
@@ -121,7 +122,7 @@ def __init__(
121122
) # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a deadlock during startup
122123
else:
123124
concurrency_level = self.SINGLE_THREADED_CONCURRENCY_LEVEL
124-
initial_number_of_partitions_to_generate = self.SINGLE_THREADED_CONCURRENCY_LEVEL
125+
initial_number_of_partitions_to_generate = self.SINGLE_THREADED_CONCURRENCY_LEVEL // 2
125126

126127
self._concurrent_source = ConcurrentSource.create(
127128
num_workers=concurrency_level,

airbyte_cdk/sources/streams/http/http_client.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ def __init__(
9595
):
9696
self._name = name
9797
self._api_budget: APIBudget = api_budget or APIBudget(policies=[])
98+
self._logger = logger
9899
if session:
99100
self._session = session
100101
else:
@@ -108,7 +109,6 @@ def __init__(
108109
)
109110
if isinstance(authenticator, AuthBase):
110111
self._session.auth = authenticator
111-
self._logger = logger
112112
self._error_handler = error_handler or HttpStatusErrorHandler(self._logger)
113113
if backoff_strategy is not None:
114114
if isinstance(backoff_strategy, list):
@@ -140,10 +140,12 @@ def _request_session(self) -> requests.Session:
140140
# Use in-memory cache if cache_dir is not set
141141
# This is a non-obvious interface, but it ensures we don't write sql files when running unit tests
142142
if cache_dir:
143+
self._logger.info(f"Using path {cache_dir} for HTTP cache") # TODO: remove
143144
sqlite_path = str(Path(cache_dir) / self.cache_filename)
144145
else:
146+
self._logger.info("Using memory for cache") # TODO: remove
145147
sqlite_path = "file::memory:?cache=shared"
146-
backend = SkipFailureSQLiteCache(sqlite_path)
148+
backend = SkipFailureSQLiteCache(self._name, sqlite_path) # TODO maybe add a busy timeout
147149
return CachedLimiterSession(
148150
sqlite_path, backend=backend, api_budget=self._api_budget, match_headers=True
149151
) # type: ignore # there are no typeshed stubs for requests_cache
@@ -541,18 +543,21 @@ def _write(self, key: str, value: str) -> None:
541543
class SkipFailureSQLiteCache(requests_cache.backends.sqlite.SQLiteCache):
542544
def __init__( # type: ignore # ignoring as lib is not typed
543545
self,
546+
table_name="response",
544547
db_path="http_cache",
545548
serializer=None,
546549
**kwargs,
547550
) -> None:
548551
super().__init__(db_path, serializer, **kwargs)
549552
skwargs = {"serializer": serializer, **kwargs} if serializer else kwargs
550553
self.responses: requests_cache.backends.sqlite.SQLiteDict = SkipFailureSQLiteDict(
551-
db_path, table_name="responses", **skwargs
554+
db_path, table_name=table_name, fast_save=True, wal=True, **skwargs
552555
)
553556
self.redirects: requests_cache.backends.sqlite.SQLiteDict = SkipFailureSQLiteDict(
554557
db_path,
555-
table_name="redirects",
558+
table_name=f"redirects_{table_name}",
559+
fast_save=True,
560+
wal=True,
556561
lock=self.responses._lock,
557562
serializer=None,
558563
**kwargs,

0 commit comments

Comments
 (0)