Skip to content

Commit 0b4195b

Browse files
authored
fix: manifest normalization (#599)
1 parent c357b4c commit 0b4195b

File tree

4 files changed

+455
-664
lines changed

4 files changed

+455
-664
lines changed

airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py

Lines changed: 110 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,7 @@ def normalize(self) -> ManifestType:
108108
ManifestNormalizationException: Caught internally and handled by returning the original manifest.
109109
"""
110110
try:
111-
self._deduplicate_minifest()
112-
self._reference_schemas()
111+
self._deduplicate_manifest()
113112

114113
return self._normalized_manifest
115114
except ManifestNormalizationException:
@@ -131,7 +130,7 @@ def _get_manifest_streams(self) -> Iterable[Dict[str, Any]]:
131130

132131
yield from []
133132

134-
def _deduplicate_minifest(self) -> None:
133+
def _deduplicate_manifest(self) -> None:
135134
"""
136135
Find commonalities in the input JSON structure and refactor it to avoid redundancy.
137136
"""
@@ -141,9 +140,117 @@ def _deduplicate_minifest(self) -> None:
141140
self._prepare_definitions()
142141
# replace duplicates with references, if any
143142
self._handle_duplicates(self._collect_duplicates())
143+
# replace parent streams with $refs
144+
self._replace_parent_streams_with_refs()
145+
# clean dangling fields after resolving $refs
146+
self._clean_dangling_fields()
144147
except Exception as e:
145148
raise ManifestNormalizationException(str(e))
146149

150+
def _replace_parent_streams_with_refs(self) -> None:
151+
"""
152+
For each stream in the manifest, if it has a retriever.partition_router with parent_stream_configs,
153+
replace any 'stream' fields in those configs that are dicts and deeply equal to another stream object
154+
with a $ref to the correct stream index.
155+
"""
156+
streams = self._normalized_manifest.get(STREAMS_TAG, [])
157+
158+
# Build a hash-to-index mapping for O(1) lookups
159+
stream_hash_to_index = {}
160+
for idx, stream in enumerate(streams):
161+
stream_hash = self._hash_object(stream)
162+
stream_hash_to_index[stream_hash] = idx
163+
164+
for idx, stream in enumerate(streams):
165+
retriever = stream.get("retriever")
166+
if not retriever:
167+
continue
168+
partition_router = retriever.get("partition_router")
169+
routers = (
170+
partition_router
171+
if isinstance(partition_router, list)
172+
else [partition_router]
173+
if partition_router
174+
else []
175+
)
176+
for router in routers:
177+
if not isinstance(router, dict):
178+
continue
179+
if router.get("type") != "SubstreamPartitionRouter":
180+
continue
181+
parent_stream_configs = router.get("parent_stream_configs", [])
182+
for parent_config in parent_stream_configs:
183+
if not isinstance(parent_config, dict):
184+
continue
185+
stream_ref = parent_config.get("stream")
186+
# Only replace if it's a dict and matches any stream in the manifest
187+
if stream_ref is not None and isinstance(stream_ref, dict):
188+
stream_ref_hash = self._hash_object(stream_ref)
189+
if stream_ref_hash in stream_hash_to_index:
190+
parent_config["stream"] = {
191+
"$ref": f"#/streams/{stream_hash_to_index[stream_ref_hash]}"
192+
}
193+
194+
def _clean_dangling_fields(self) -> None:
195+
"""
196+
Clean the manifest by removing unused definitions and schemas.
197+
This method removes any definitions or schemas that are not referenced by any $ref in the manifest.
198+
"""
199+
200+
def find_all_refs(obj: Dict[str, Any], refs: List[str]) -> None:
201+
"""
202+
Recursively find all $ref paths in the object.
203+
204+
Args:
205+
obj: The object to search through
206+
refs: List to store found reference paths
207+
"""
208+
if not isinstance(obj, dict):
209+
return
210+
211+
for key, value in obj.items():
212+
if key == "$ref" and isinstance(value, str):
213+
# Remove the leading #/ from the ref path
214+
refs.append(value[2:])
215+
elif isinstance(value, dict):
216+
find_all_refs(value, refs)
217+
elif isinstance(value, list):
218+
for item in value:
219+
if isinstance(item, dict):
220+
find_all_refs(item, refs)
221+
222+
def clean_section(section: Dict[str, Any], section_path: str) -> None:
223+
"""
224+
Clean a section by removing unreferenced fields.
225+
226+
Args:
227+
section: The section to clean
228+
section_path: The path to this section in the manifest
229+
"""
230+
for key in list(section.keys()):
231+
current_path = f"{section_path}/{key}"
232+
# Check if this path is referenced or is a parent of a referenced path
233+
if not any(ref.startswith(current_path) for ref in all_refs):
234+
del section[key]
235+
236+
# Find all references in the manifest
237+
all_refs: List[str] = []
238+
find_all_refs(self._normalized_manifest, all_refs)
239+
240+
# Clean definitions
241+
if DEF_TAG in self._normalized_manifest:
242+
clean_section(self._normalized_manifest[DEF_TAG], DEF_TAG)
243+
# Remove empty definitions section
244+
if not self._normalized_manifest[DEF_TAG]:
245+
del self._normalized_manifest[DEF_TAG]
246+
247+
# Clean schemas
248+
if SCHEMAS_TAG in self._normalized_manifest:
249+
clean_section(self._normalized_manifest[SCHEMAS_TAG], SCHEMAS_TAG)
250+
# Remove empty schemas section
251+
if not self._normalized_manifest[SCHEMAS_TAG]:
252+
del self._normalized_manifest[SCHEMAS_TAG]
253+
147254
def _prepare_definitions(self) -> None:
148255
"""
149256
Clean the definitions in the manifest by removing unnecessary properties.
@@ -163,43 +270,6 @@ def _prepare_definitions(self) -> None:
163270
if key != LINKED_TAG:
164271
self._normalized_manifest[DEF_TAG].pop(key, None)
165272

166-
def _extract_stream_schema(self, stream: Dict[str, Any]) -> None:
167-
"""
168-
Extract the schema from the stream and add it to the `schemas` tag.
169-
"""
170-
171-
stream_name = stream["name"]
172-
# copy the value of the SCHEMA_TAG to the SCHEMAS_TAG with the stream name as key
173-
schema = stream.get(SCHEMA_LOADER_TAG, {}).get(SCHEMA_TAG)
174-
if not SCHEMAS_TAG in self._normalized_manifest.keys():
175-
self._normalized_manifest[SCHEMAS_TAG] = {}
176-
# add stream schema to the SCHEMAS_TAG
177-
if not stream_name in self._normalized_manifest[SCHEMAS_TAG].keys():
178-
# add the schema to the SCHEMAS_TAG with the stream name as key
179-
self._normalized_manifest[SCHEMAS_TAG][stream_name] = schema
180-
181-
def _reference_schemas(self) -> None:
182-
"""
183-
Set the schema reference for the given stream in the manifest.
184-
This function modifies the manifest in place.
185-
"""
186-
187-
# reference the stream schema for the stream to where it's stored
188-
if SCHEMAS_TAG in self._normalized_manifest.keys():
189-
for stream in self._get_manifest_streams():
190-
self._extract_stream_schema(stream)
191-
self._set_stream_schema_ref(stream)
192-
193-
def _set_stream_schema_ref(self, stream: Dict[str, Any]) -> None:
194-
"""
195-
Set the schema reference for the given stream in the manifest.
196-
This function modifies the manifest in place.
197-
"""
198-
stream_name = stream["name"]
199-
if SCHEMAS_TAG in self._normalized_manifest.keys():
200-
if stream_name in self._normalized_manifest[SCHEMAS_TAG]:
201-
stream[SCHEMA_LOADER_TAG][SCHEMA_TAG] = self._create_schema_ref(stream_name)
202-
203273
def _replace_duplicates_with_refs(self, duplicates: DuplicatesType) -> None:
204274
"""
205275
Process duplicate objects and replace them with references.
@@ -447,16 +517,3 @@ def _create_linked_definition_ref(self, type_key: str, key: str) -> Dict[str, st
447517
"""
448518

449519
return {"$ref": f"#/{DEF_TAG}/{LINKED_TAG}/{type_key}/{key}"}
450-
451-
def _create_schema_ref(self, key: str) -> Dict[str, str]:
452-
"""
453-
Create a reference object for stream schema using the specified key.
454-
455-
Args:
456-
key: The reference key to use
457-
458-
Returns:
459-
A reference object in the proper format
460-
"""
461-
462-
return {"$ref": f"#/{SCHEMAS_TAG}/{key}"}

0 commit comments

Comments
 (0)