Skip to content

Commit f360d91

Browse files
maxi297octavia-squidington-iii
andauthored
fix(manifest server): default concurrency settings (#792)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 8158f0d commit f360d91

File tree

4 files changed

+23
-67
lines changed

4 files changed

+23
-67
lines changed

airbyte_cdk/manifest_server/command_processor/utils.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,7 @@ def build_source(
6464
page_limit: Optional[int] = None,
6565
slice_limit: Optional[int] = None,
6666
) -> ConcurrentDeclarativeSource:
67-
# We enforce a concurrency level of 1 so that the stream is processed on a single thread
68-
# to retain ordering for the grouping of the builder message responses.
6967
definition = copy.deepcopy(manifest)
70-
if "concurrency_level" in definition:
71-
definition["concurrency_level"]["default_concurrency"] = 1
72-
else:
73-
definition["concurrency_level"] = {
74-
"type": "ConcurrencyLevel",
75-
"default_concurrency": 1,
76-
}
7768

7869
should_normalize = should_normalize_manifest(manifest)
7970
if should_normalize:

airbyte_cdk/manifest_server/routers/manifest.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,19 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
8787
"md5": hashlib.md5(request.custom_components_code.encode()).hexdigest()
8888
}
8989

90+
# We enforce a concurrency level of 1 so that the stream is processed on a single thread
91+
# to retain ordering for the grouping of the builder message responses.
92+
manifest = request.manifest.model_dump()
93+
if "concurrency_level" in manifest:
94+
manifest["concurrency_level"]["default_concurrency"] = 1
95+
else:
96+
manifest["concurrency_level"] = {
97+
"type": "ConcurrencyLevel",
98+
"default_concurrency": 1,
99+
}
100+
90101
source = safe_build_source(
91-
request.manifest.model_dump(),
102+
manifest,
92103
config_dict,
93104
catalog,
94105
converted_state,

unit_tests/manifest_server/command_processor/test_utils.py

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -31,61 +31,6 @@ def test_build_catalog_creates_correct_structure(self):
3131
assert configured_stream.sync_mode == SyncMode.incremental
3232
assert configured_stream.destination_sync_mode == DestinationSyncMode.overwrite
3333

34-
@patch("airbyte_cdk.manifest_server.command_processor.utils.ConcurrentDeclarativeSource")
35-
def test_build_source_creates_source(self, mock_source_class):
36-
"""Test that build_source creates a ConcurrentDeclarativeSource with correct parameters."""
37-
# Setup mocks
38-
mock_source = Mock()
39-
mock_source_class.return_value = mock_source
40-
41-
# Test with complex manifest and config structures
42-
manifest = {
43-
"version": "0.1.0",
44-
"definitions": {"selector": {"extractor": {"field_path": ["data"]}}},
45-
"streams": [
46-
{
47-
"name": "users",
48-
"primary_key": "id",
49-
"retriever": {
50-
"requester": {
51-
"url_base": "https://api.example.com",
52-
"path": "/users",
53-
}
54-
},
55-
}
56-
],
57-
"check": {"stream_names": ["users"]},
58-
}
59-
60-
config = {
61-
"api_key": "sk-test-123",
62-
"base_url": "https://api.example.com",
63-
"timeout": 30,
64-
}
65-
66-
# Call build_source with additional parameters
67-
catalog = build_catalog("test_stream")
68-
state = []
69-
result = build_source(manifest, catalog, config, state)
70-
71-
# Verify ConcurrentDeclarativeSource was created with correct parameters
72-
expected_source_config = {
73-
**manifest,
74-
"concurrency_level": {"type": "ConcurrencyLevel", "default_concurrency": 1},
75-
}
76-
mock_source_class.assert_called_once_with(
77-
catalog=catalog,
78-
state=state,
79-
source_config=expected_source_config,
80-
config=config,
81-
normalize_manifest=False, # Default when flag not set
82-
migrate_manifest=False, # Default when flag not set
83-
emit_connector_builder_messages=True,
84-
limits=mock_source_class.call_args[1]["limits"],
85-
)
86-
87-
assert result == mock_source
88-
8934
@patch("airbyte_cdk.manifest_server.command_processor.utils.ConcurrentDeclarativeSource")
9035
def test_build_source_with_normalize_flag(self, mock_source_class):
9136
"""Test build_source when normalize flag is set."""

unit_tests/manifest_server/routers/test_manifest.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,12 @@ def test_test_read_endpoint_success(
109109

110110
assert response.status_code == 200
111111
# Verify build_source was called with correct arguments
112+
expected_source_config = {
113+
**sample_manifest,
114+
"concurrency_level": {"type": "ConcurrencyLevel", "default_concurrency": 1},
115+
}
112116
mock_build_source.assert_called_once_with(
113-
sample_manifest,
117+
expected_source_config,
114118
mock_build_catalog.return_value,
115119
sample_config,
116120
[],
@@ -169,8 +173,13 @@ def test_test_read_with_custom_components(
169173
assert config_arg["__injected_components_py_checksums"]["md5"] == expected_checksum
170174

171175
# Verify other arguments
176+
# Verify build_source was called with correct arguments
177+
expected_source_config = {
178+
**sample_manifest,
179+
"concurrency_level": {"type": "ConcurrencyLevel", "default_concurrency": 1},
180+
}
172181
mock_build_source.assert_called_once_with(
173-
sample_manifest,
182+
expected_source_config,
174183
mock_build_catalog.return_value,
175184
config_arg,
176185
[],

0 commit comments

Comments
 (0)