From afddade8e63fdcb0a9ab690330851ec57349ed28 Mon Sep 17 00:00:00 2001 From: Fabio Pulvirenti Date: Mon, 17 Nov 2025 09:50:22 +0100 Subject: [PATCH 01/12] Flatten allOf properties for OpenAI compatibility --- docs/models/openai.md | 4 + docs/tools.md | 24 ++++ pydantic_ai_slim/pydantic_ai/_json_schema.py | 113 +++++++++++++++++- .../pydantic_ai/profiles/google.py | 4 +- .../pydantic_ai/profiles/openai.py | 4 +- tests/models/test_openai_like_providers.py | 46 +++++++ tests/test_json_schema_flattener.py | 110 +++++++++++++++++ 7 files changed, 301 insertions(+), 4 deletions(-) create mode 100644 tests/models/test_openai_like_providers.py create mode 100644 tests/test_json_schema_flattener.py diff --git a/docs/models/openai.md b/docs/models/openai.md index 6c246481bf..2324f97330 100644 --- a/docs/models/openai.md +++ b/docs/models/openai.md @@ -99,6 +99,10 @@ agent = Agent(model) ``` ## OpenAI Responses API +### Tips for JSON Schema compatibility + +Strict Structured Outputs prefer flat object schemas without combinators. If your tool or output schema contains `allOf`/`oneOf`, consider flattening `allOf` ahead of time. See the Tools documentation section "Flattening allOf for provider compatibility" for a sample prepare hook. + Pydantic AI also supports OpenAI's [Responses API](https://platform.openai.com/docs/api-reference/responses) through the diff --git a/docs/tools.md b/docs/tools.md index 40dcf5c810..39510f2486 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -357,6 +357,30 @@ print(test_model.last_model_request_parameters.function_tools) _(This example is complete, it can be run "as is")_ +### Flattening allOf for provider compatibility + +Some providers (especially when using strict Structured Outputs) reject JSON Schema combinators like `allOf/oneOf`. +If your tool parameter schema includes `allOf`, you can flatten it before sending to the model using a prepare hook. + +Example prepare hook that flattens `allOf`: + +```python +from pydantic_ai import Agent +from pydantic_ai.tools import RunContext, ToolDefinition +from pydantic_ai._json_schema import flatten_allof + +async def flatten_prepare(ctx: RunContext[None], tool: ToolDefinition) -> ToolDefinition: + tool.parameters_json_schema = flatten_allof(tool.parameters_json_schema) + return tool + +# Register your tools normally, then pass `prepare_tools=flatten_prepare` to Agent if you want to apply globally. +agent = Agent('openai:gpt-4o', prepare_tools=lambda ctx, tools: [ + await flatten_prepare(ctx, t) or t for t in tools +]) +``` + +Alternatively, you can construct tools with a flattened schema at source (e.g., for MCP-exposed tools) using `Tool.from_schema`. + ## See Also For more tool features and integrations, see: diff --git a/pydantic_ai_slim/pydantic_ai/_json_schema.py b/pydantic_ai_slim/pydantic_ai/_json_schema.py index cbaa180208..6309fc3d16 100644 --- a/pydantic_ai_slim/pydantic_ai/_json_schema.py +++ b/pydantic_ai_slim/pydantic_ai/_json_schema.py @@ -4,12 +4,14 @@ from abc import ABC, abstractmethod from copy import deepcopy from dataclasses import dataclass -from typing import Any, Literal +from typing import Any, Literal, cast from .exceptions import UserError JsonSchema = dict[str, Any] +__all__ = ['JsonSchemaTransformer', 'InlineDefsJsonSchemaTransformer', 'flatten_allof'] + @dataclass(init=False) class JsonSchemaTransformer(ABC): @@ -30,7 +32,9 @@ def __init__( self.schema = schema self.strict = strict - self.is_strict_compatible = True # Can be set to False by subclasses to set `strict` on `ToolDefinition` when set not set by user explicitly + # Can be set to False by subclasses to set `strict` on `ToolDefinition` + # when not set explicitly by the user. + self.is_strict_compatible = True self.prefer_inlined_defs = prefer_inlined_defs self.simplify_nullable_unions = simplify_nullable_unions @@ -188,3 +192,108 @@ def __init__(self, schema: JsonSchema, *, strict: bool | None = None): def transform(self, schema: JsonSchema) -> JsonSchema: return schema + + +def _allof_is_object_like(member: JsonSchema) -> bool: + member_type = member.get('type') + if member_type is None: + keys = ('properties', 'additionalProperties', 'patternProperties') + return bool(any(k in member for k in keys)) + return member_type == 'object' + + +def _merge_additional_properties_values(values: list[Any]) -> bool | JsonSchema: + if any(isinstance(v, dict) for v in values): + return True + return False if values and all(v is False for v in values) else True + + +def _flatten_current_level(s: JsonSchema) -> JsonSchema: + raw_members = s.get('allOf') + if not isinstance(raw_members, list) or not raw_members: + return s + + members = cast(list[JsonSchema], raw_members) + for raw in members: + if not isinstance(raw, dict): + return s + if not all(_allof_is_object_like(member) for member in members): + return s + + processed_members = [_recurse_flatten_allof(member) for member in members] + merged: JsonSchema = {k: v for k, v in s.items() if k != 'allOf'} + merged['type'] = 'object' + + properties: dict[str, JsonSchema] = {} + if isinstance(merged.get('properties'), dict): + properties.update(merged['properties']) + + required: set[str] = set(merged.get('required', []) or []) + pattern_properties: dict[str, JsonSchema] = dict(merged.get('patternProperties', {}) or {}) + additional_values: list[Any] = [] + + for m in processed_members: + if isinstance(m.get('properties'), dict): + properties.update(m['properties']) + if isinstance(m.get('required'), list): + required.update(m['required']) + if isinstance(m.get('patternProperties'), dict): + pattern_properties.update(m['patternProperties']) + if 'additionalProperties' in m: + additional_values.append(m['additionalProperties']) + + if properties: + merged['properties'] = {k: _recurse_flatten_allof(v) for k, v in properties.items()} + if required: + merged['required'] = sorted(required) + if pattern_properties: + merged['patternProperties'] = {k: _recurse_flatten_allof(v) for k, v in pattern_properties.items()} + + if additional_values: + merged['additionalProperties'] = _merge_additional_properties_values(additional_values) + + return merged + + +def _recurse_children(s: JsonSchema) -> JsonSchema: + t = s.get('type') + if t == 'object': + if isinstance(s.get('properties'), dict): + s['properties'] = { + k: _recurse_flatten_allof(cast(JsonSchema, v)) + for k, v in s['properties'].items() + if isinstance(v, dict) + } + ap = s.get('additionalProperties') + if isinstance(ap, dict): + ap_schema = cast(JsonSchema, ap) + s['additionalProperties'] = _recurse_flatten_allof(ap_schema) + if isinstance(s.get('patternProperties'), dict): + s['patternProperties'] = { + k: _recurse_flatten_allof(cast(JsonSchema, v)) + for k, v in s['patternProperties'].items() + if isinstance(v, dict) + } + elif t == 'array': + items = s.get('items') + if isinstance(items, dict): + s['items'] = _recurse_flatten_allof(cast(JsonSchema, items)) + return s + + +def _recurse_flatten_allof(schema: JsonSchema) -> JsonSchema: + s = deepcopy(schema) + s = _flatten_current_level(s) + s = _recurse_children(s) + return s + + +def flatten_allof(schema: JsonSchema) -> JsonSchema: + """Flatten simple object-only allOf combinations by merging object members. + + - Merges properties and unions required lists. + - Combines additionalProperties conservatively: only False if all are False; otherwise True. + - Recurses into nested object/array members. + - Leaves non-object allOfs untouched. + """ + return _recurse_flatten_allof(schema) diff --git a/pydantic_ai_slim/pydantic_ai/profiles/google.py b/pydantic_ai_slim/pydantic_ai/profiles/google.py index e8a88ac223..555483c3bb 100644 --- a/pydantic_ai_slim/pydantic_ai/profiles/google.py +++ b/pydantic_ai_slim/pydantic_ai/profiles/google.py @@ -4,7 +4,7 @@ from pydantic_ai.exceptions import UserError -from .._json_schema import JsonSchema, JsonSchemaTransformer +from .._json_schema import JsonSchema, JsonSchemaTransformer, flatten_allof from . import ModelProfile @@ -35,6 +35,8 @@ def __init__(self, schema: JsonSchema, *, strict: bool | None = None): super().__init__(schema, strict=strict, prefer_inlined_defs=True, simplify_nullable_unions=True) def transform(self, schema: JsonSchema) -> JsonSchema: + # Flatten object-only allOf to improve compatibility + schema = flatten_allof(schema) # Note: we need to remove `additionalProperties: False` since it is currently mishandled by Gemini additional_properties = schema.pop( 'additionalProperties', None diff --git a/pydantic_ai_slim/pydantic_ai/profiles/openai.py b/pydantic_ai_slim/pydantic_ai/profiles/openai.py index a3cd83d3e8..34fd99bf89 100644 --- a/pydantic_ai_slim/pydantic_ai/profiles/openai.py +++ b/pydantic_ai_slim/pydantic_ai/profiles/openai.py @@ -6,7 +6,7 @@ from dataclasses import dataclass from typing import Any, Literal -from .._json_schema import JsonSchema, JsonSchemaTransformer +from .._json_schema import JsonSchema, JsonSchemaTransformer, flatten_allof from . import ModelProfile OpenAISystemPromptRole = Literal['system', 'developer', 'user'] @@ -162,6 +162,8 @@ def walk(self) -> JsonSchema: return result def transform(self, schema: JsonSchema) -> JsonSchema: # noqa C901 + # First, flatten object-only allOf constructs to avoid unsupported combinators in strict mode + schema = flatten_allof(schema) # Remove unnecessary keys schema.pop('title', None) schema.pop('$schema', None) diff --git a/tests/models/test_openai_like_providers.py b/tests/models/test_openai_like_providers.py new file mode 100644 index 0000000000..4b45f628eb --- /dev/null +++ b/tests/models/test_openai_like_providers.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from collections.abc import Callable + +import pytest + +from pydantic_ai._json_schema import JsonSchema +from pydantic_ai.profiles.openai import OpenAIJsonSchemaTransformer + + +def _openai_transformer_factory(schema: JsonSchema) -> OpenAIJsonSchemaTransformer: + return OpenAIJsonSchemaTransformer(schema, strict=True) + + +TransformerFactory = Callable[[JsonSchema], OpenAIJsonSchemaTransformer] + + +@pytest.mark.parametrize('transformer_factory', [_openai_transformer_factory]) +def test_openai_compatible_transformers_flatten_allof( + transformer_factory: TransformerFactory, +) -> None: + schema: JsonSchema = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'foo': {'type': 'string'}}, + 'required': ['foo'], + }, + { + 'type': 'object', + 'properties': {'bar': {'type': 'integer'}}, + 'required': ['bar'], + }, + ], + } + + transformer = transformer_factory(schema) + transformed = transformer.walk() + + # allOf should have been flattened by the transformer + assert 'allOf' not in transformed + assert transformed['type'] == 'object' + assert set(transformed.get('required', [])) == {'foo', 'bar'} + assert transformed['properties']['foo']['type'] == 'string' + assert transformed['properties']['bar']['type'] == 'integer' diff --git a/tests/test_json_schema_flattener.py b/tests/test_json_schema_flattener.py new file mode 100644 index 0000000000..d4ef298ca9 --- /dev/null +++ b/tests/test_json_schema_flattener.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +import copy +from typing import Any + + +def test_flatten_allof_simple_merge() -> None: + # Import inline to avoid import errors before implementation exists in editors + from pydantic_ai._json_schema import flatten_allof + + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'required': ['a'], + 'additionalProperties': False, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + 'required': ['b'], + 'additionalProperties': False, + }, + ], + } + + flattened = flatten_allof(copy.deepcopy(schema)) + + assert 'allOf' not in flattened + assert flattened['type'] == 'object' + assert flattened['properties']['a']['type'] == 'string' + assert flattened['properties']['b']['type'] == 'integer' + # union of required keys + assert set(flattened['required']) == {'a', 'b'} + # boolean AP should remain False when all are False + assert flattened.get('additionalProperties') is False + + +def test_flatten_allof_nested_objects_and_pass_through_keywords() -> None: + from pydantic_ai._json_schema import flatten_allof + + schema: dict[str, Any] = { + 'type': 'object', + 'title': 'Root', + 'allOf': [ + { + 'type': 'object', + 'properties': { + 'user': { + 'type': 'object', + 'properties': {'id': {'type': 'string'}}, + 'required': ['id'], + } + }, + 'required': ['user'], + }, + { + 'type': 'object', + 'properties': {'age': {'type': 'integer'}}, + 'required': ['age'], + }, + ], + 'description': 'test', + } + + flattened = flatten_allof(copy.deepcopy(schema)) + assert flattened.get('title') == 'Root' + assert flattened.get('description') == 'test' + assert 'allOf' not in flattened + assert set(flattened['required']) == {'user', 'age'} + assert flattened['properties']['user']['type'] == 'object' + assert set(flattened['properties']['user']['required']) == {'id'} + + +def test_flatten_allof_does_not_touch_unrelated_unions() -> None: + from pydantic_ai._json_schema import flatten_allof + + schema: dict[str, Any] = { + 'type': 'object', + 'properties': { + 'x': { + 'anyOf': [ + {'type': 'string'}, + {'type': 'null'}, + ] + } + }, + 'required': ['x'], + } + + flattened = flatten_allof(copy.deepcopy(schema)) + assert flattened['properties']['x'].get('anyOf') is not None + + +def test_flatten_allof_non_object_members_are_left_as_is() -> None: + from pydantic_ai._json_schema import flatten_allof + + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + {'type': 'string'}, + {'type': 'integer'}, + ], + } + + # Expect: we cannot sensibly merge non-object members; keep allOf + flattened = flatten_allof(copy.deepcopy(schema)) + assert 'allOf' in flattened From 8ff4db7af1c96b32d9f0c8fbc51fe58bfed8dacf Mon Sep 17 00:00:00 2001 From: Fabio Pulvirenti Date: Mon, 17 Nov 2025 13:05:03 +0100 Subject: [PATCH 02/12] Fixed issue OpenAI strictmode --- pydantic_ai_slim/pydantic_ai/profiles/openai.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pydantic_ai_slim/pydantic_ai/profiles/openai.py b/pydantic_ai_slim/pydantic_ai/profiles/openai.py index 34fd99bf89..37b5ca5a5a 100644 --- a/pydantic_ai_slim/pydantic_ai/profiles/openai.py +++ b/pydantic_ai_slim/pydantic_ai/profiles/openai.py @@ -157,7 +157,12 @@ def walk(self) -> JsonSchema: if self.root_ref is not None: result.pop('$ref', None) # We replace references to the self.root_ref with just '#' in the transform method root_key = re.sub(r'^#/\$defs/', '', self.root_ref) - result.update(self.defs.get(root_key) or {}) + # Use the transformed schema from $defs, not the original self.defs + if '$defs' in result and root_key in result['$defs']: + result.update(result['$defs'][root_key]) + else: + # Fallback to original if transformed version not available (shouldn't happen in normal flow) + result.update(self.defs.get(root_key) or {}) return result From e4aebaf859b7e20b89965231231cfd3277e4d7c7 Mon Sep 17 00:00:00 2001 From: Fabio Pulvirenti Date: Mon, 17 Nov 2025 15:03:05 +0100 Subject: [PATCH 03/12] Removed change in documentation --- docs/models/openai.md | 4 ---- docs/tools.md | 24 ------------------------ 2 files changed, 28 deletions(-) diff --git a/docs/models/openai.md b/docs/models/openai.md index 2324f97330..6c246481bf 100644 --- a/docs/models/openai.md +++ b/docs/models/openai.md @@ -99,10 +99,6 @@ agent = Agent(model) ``` ## OpenAI Responses API -### Tips for JSON Schema compatibility - -Strict Structured Outputs prefer flat object schemas without combinators. If your tool or output schema contains `allOf`/`oneOf`, consider flattening `allOf` ahead of time. See the Tools documentation section "Flattening allOf for provider compatibility" for a sample prepare hook. - Pydantic AI also supports OpenAI's [Responses API](https://platform.openai.com/docs/api-reference/responses) through the diff --git a/docs/tools.md b/docs/tools.md index 39510f2486..40dcf5c810 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -357,30 +357,6 @@ print(test_model.last_model_request_parameters.function_tools) _(This example is complete, it can be run "as is")_ -### Flattening allOf for provider compatibility - -Some providers (especially when using strict Structured Outputs) reject JSON Schema combinators like `allOf/oneOf`. -If your tool parameter schema includes `allOf`, you can flatten it before sending to the model using a prepare hook. - -Example prepare hook that flattens `allOf`: - -```python -from pydantic_ai import Agent -from pydantic_ai.tools import RunContext, ToolDefinition -from pydantic_ai._json_schema import flatten_allof - -async def flatten_prepare(ctx: RunContext[None], tool: ToolDefinition) -> ToolDefinition: - tool.parameters_json_schema = flatten_allof(tool.parameters_json_schema) - return tool - -# Register your tools normally, then pass `prepare_tools=flatten_prepare` to Agent if you want to apply globally. -agent = Agent('openai:gpt-4o', prepare_tools=lambda ctx, tools: [ - await flatten_prepare(ctx, t) or t for t in tools -]) -``` - -Alternatively, you can construct tools with a flattened schema at source (e.g., for MCP-exposed tools) using `Tool.from_schema`. - ## See Also For more tool features and integrations, see: From e5815fdab9a251428caf5142e39c40ec8ca200ea Mon Sep 17 00:00:00 2001 From: Fabio Pulvirenti Date: Mon, 17 Nov 2025 15:54:46 +0100 Subject: [PATCH 04/12] Added tests to increase coverage --- tests/models/test_openai.py | 30 ++++++ tests/test_json_schema_flattener.py | 142 ++++++++++++++++++++++++++++ 2 files changed, 172 insertions(+) diff --git a/tests/models/test_openai.py b/tests/models/test_openai.py index e68c64abe3..29adc1a51b 100644 --- a/tests/models/test_openai.py +++ b/tests/models/test_openai.py @@ -1915,6 +1915,36 @@ class MyModel(BaseModel): ) +def test_openai_transformer_fallback_when_defs_missing() -> None: + """Test that OpenAIJsonSchemaTransformer falls back to original defs when root_key not in $defs.""" + from unittest.mock import patch + + from pydantic_ai.profiles.openai import OpenAIJsonSchemaTransformer + + # Create a schema with $ref pointing to a key that exists in original + schema: dict[str, Any] = { + '$ref': '#/$defs/MyModel', + '$defs': { + 'MyModel': { + 'type': 'object', + 'properties': {'foo': {'type': 'string'}}, + 'required': ['foo'], + }, + }, + } + + transformer = OpenAIJsonSchemaTransformer(schema, strict=True) + + # Mock super().walk() to return a result without $defs to test the fallback path + with patch.object(transformer.__class__.__bases__[0], 'walk', return_value={'$ref': '#/$defs/MyModel'}): + result = transformer.walk() + # The fallback should use self.defs.get(root_key) or {} + # Since we mocked to return just $ref, the fallback should trigger + assert isinstance(result, dict) + # Result should have been updated with the original defs content + assert 'properties' in result or 'type' in result + + def test_native_output_strict_mode(allow_model_requests: None): class CityLocation(BaseModel): city: str diff --git a/tests/test_json_schema_flattener.py b/tests/test_json_schema_flattener.py index d4ef298ca9..380889b071 100644 --- a/tests/test_json_schema_flattener.py +++ b/tests/test_json_schema_flattener.py @@ -108,3 +108,145 @@ def test_flatten_allof_non_object_members_are_left_as_is() -> None: # Expect: we cannot sensibly merge non-object members; keep allOf flattened = flatten_allof(copy.deepcopy(schema)) assert 'allOf' in flattened + + +def test_flatten_allof_object_like_without_type() -> None: + """Test that object-like schemas without explicit type are recognized.""" + from pydantic_ai._json_schema import flatten_allof + + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + # No type, but has properties - should be recognized as object-like + 'properties': {'a': {'type': 'string'}}, + 'required': ['a'], + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + 'required': ['b'], + }, + ], + } + + flattened = flatten_allof(copy.deepcopy(schema)) + assert 'allOf' not in flattened + assert set(flattened['required']) == {'a', 'b'} + + +def test_flatten_allof_with_dict_additional_properties() -> None: + """Test merging when additionalProperties is a dict schema.""" + from pydantic_ai._json_schema import flatten_allof + + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'additionalProperties': {'type': 'string'}, # dict schema + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + 'additionalProperties': False, + }, + ], + } + + flattened = flatten_allof(copy.deepcopy(schema)) + assert 'allOf' not in flattened + # When any member has dict additionalProperties, result should be True + assert flattened.get('additionalProperties') is True + + +def test_flatten_allof_with_non_dict_member() -> None: + """Test that allOf with non-dict members is left untouched.""" + from pydantic_ai._json_schema import flatten_allof + + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + {'type': 'object', 'properties': {'a': {'type': 'string'}}}, + 'not a dict', # Non-dict member + ], + } + + flattened = flatten_allof(copy.deepcopy(schema)) + # Should be left untouched because one member is not a dict + assert 'allOf' in flattened + + +def test_flatten_allof_no_initial_properties() -> None: + """Test flattening when root schema has no initial properties.""" + from pydantic_ai._json_schema import flatten_allof + + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'required': ['a'], + }, + ], + } + + flattened = flatten_allof(copy.deepcopy(schema)) + assert 'allOf' not in flattened + assert flattened['properties']['a']['type'] == 'string' + assert flattened['required'] == ['a'] + + +def test_flatten_allof_members_without_properties() -> None: + """Test flattening when some members don't have properties/required/patternProperties.""" + from pydantic_ai._json_schema import flatten_allof + + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'required': ['a'], + }, + { + 'type': 'object', + # No properties, required, or patternProperties + 'additionalProperties': False, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + # No required + }, + ], + } + + flattened = flatten_allof(copy.deepcopy(schema)) + assert 'allOf' not in flattened + assert set(flattened['properties'].keys()) == {'a', 'b'} + assert flattened['required'] == ['a'] # Only from first member + assert flattened.get('additionalProperties') is False + + +def test_flatten_allof_empty_properties_after_merge() -> None: + """Test edge case where properties/required/patternProperties might be empty.""" + from pydantic_ai._json_schema import flatten_allof + + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + # No properties, required, or patternProperties + }, + ], + } + + flattened = flatten_allof(copy.deepcopy(schema)) + assert 'allOf' not in flattened + # Should not have properties/required if they're empty + assert 'properties' not in flattened or not flattened.get('properties') + assert 'required' not in flattened or not flattened.get('required') From b1058e99273846900386392eb8cafd94fdaa01aa Mon Sep 17 00:00:00 2001 From: Fabio Pulvirenti Date: Mon, 17 Nov 2025 16:31:31 +0100 Subject: [PATCH 05/12] Added tests to increase coverage --- tests/test_json_schema_flattener.py | 56 +++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/tests/test_json_schema_flattener.py b/tests/test_json_schema_flattener.py index 380889b071..631f0d9405 100644 --- a/tests/test_json_schema_flattener.py +++ b/tests/test_json_schema_flattener.py @@ -250,3 +250,59 @@ def test_flatten_allof_empty_properties_after_merge() -> None: # Should not have properties/required if they're empty assert 'properties' not in flattened or not flattened.get('properties') assert 'required' not in flattened or not flattened.get('required') + + +def test_flatten_allof_with_initial_properties() -> None: + """Test flattening when root schema has initial properties (line 229).""" + from pydantic_ai._json_schema import flatten_allof + + schema: dict[str, Any] = { + 'type': 'object', + 'properties': {'root_prop': {'type': 'string'}}, # Initial properties + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'integer'}}, + 'required': ['a'], + }, + ], + } + + flattened = flatten_allof(copy.deepcopy(schema)) + assert 'allOf' not in flattened + assert 'root_prop' in flattened['properties'] + assert 'a' in flattened['properties'] + assert flattened['required'] == ['a'] + + +def test_flatten_allof_with_pattern_properties() -> None: + """Test flattening when members have patternProperties (lines 241, 250).""" + from pydantic_ai._json_schema import flatten_allof + + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'patternProperties': { + '^prefix_': {'type': 'string'}, + }, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + 'patternProperties': { + '^suffix_': {'type': 'number'}, + }, + }, + ], + } + + flattened = flatten_allof(copy.deepcopy(schema)) + assert 'allOf' not in flattened + assert 'patternProperties' in flattened + assert '^prefix_' in flattened['patternProperties'] + assert '^suffix_' in flattened['patternProperties'] + assert flattened['patternProperties']['^prefix_']['type'] == 'string' + assert flattened['patternProperties']['^suffix_']['type'] == 'number' From 1244bfd168d5b3beba2a4e24f46f97fb1d41e131 Mon Sep 17 00:00:00 2001 From: Fabio Pulvirenti Date: Mon, 17 Nov 2025 19:14:54 +0100 Subject: [PATCH 06/12] Updated pull request based on feedbacks --- pydantic_ai_slim/pydantic_ai/_json_schema.py | 6 + .../pydantic_ai/profiles/openai.py | 4 +- tests/models/test_openai.py | 64 ++++- tests/models/test_openai_like_providers.py | 46 ---- tests/test_json_schema_flattener.py | 255 ++++++++++++------ 5 files changed, 232 insertions(+), 143 deletions(-) delete mode 100644 tests/models/test_openai_like_providers.py diff --git a/pydantic_ai_slim/pydantic_ai/_json_schema.py b/pydantic_ai_slim/pydantic_ai/_json_schema.py index 6309fc3d16..835868c26b 100644 --- a/pydantic_ai_slim/pydantic_ai/_json_schema.py +++ b/pydantic_ai_slim/pydantic_ai/_json_schema.py @@ -28,6 +28,7 @@ def __init__( strict: bool | None = None, prefer_inlined_defs: bool = False, simplify_nullable_unions: bool = False, + flatten_allof: bool = False, ): self.schema = schema @@ -38,6 +39,7 @@ def __init__( self.prefer_inlined_defs = prefer_inlined_defs self.simplify_nullable_unions = simplify_nullable_unions + self.flatten_allof = flatten_allof self.defs: dict[str, JsonSchema] = self.schema.get('$defs', {}) self.refs_stack: list[str] = [] @@ -77,6 +79,10 @@ def walk(self) -> JsonSchema: return handled def _handle(self, schema: JsonSchema) -> JsonSchema: + # Flatten allOf if requested, before processing the schema + if self.flatten_allof: + schema = flatten_allof(schema) + nested_refs = 0 if self.prefer_inlined_defs: while ref := schema.get('$ref'): diff --git a/pydantic_ai_slim/pydantic_ai/profiles/openai.py b/pydantic_ai_slim/pydantic_ai/profiles/openai.py index 37b5ca5a5a..1595072a95 100644 --- a/pydantic_ai_slim/pydantic_ai/profiles/openai.py +++ b/pydantic_ai_slim/pydantic_ai/profiles/openai.py @@ -6,7 +6,7 @@ from dataclasses import dataclass from typing import Any, Literal -from .._json_schema import JsonSchema, JsonSchemaTransformer, flatten_allof +from .._json_schema import JsonSchema, JsonSchemaTransformer from . import ModelProfile OpenAISystemPromptRole = Literal['system', 'developer', 'user'] @@ -167,8 +167,6 @@ def walk(self) -> JsonSchema: return result def transform(self, schema: JsonSchema) -> JsonSchema: # noqa C901 - # First, flatten object-only allOf constructs to avoid unsupported combinators in strict mode - schema = flatten_allof(schema) # Remove unnecessary keys schema.pop('title', None) schema.pop('$schema', None) diff --git a/tests/models/test_openai.py b/tests/models/test_openai.py index 29adc1a51b..8a3f840936 100644 --- a/tests/models/test_openai.py +++ b/tests/models/test_openai.py @@ -1915,13 +1915,11 @@ class MyModel(BaseModel): ) -def test_openai_transformer_fallback_when_defs_missing() -> None: - """Test that OpenAIJsonSchemaTransformer falls back to original defs when root_key not in $defs.""" - from unittest.mock import patch - +def test_openai_transformer_with_recursive_ref() -> None: + """Test that OpenAIJsonSchemaTransformer correctly handles recursive models with $ref root.""" from pydantic_ai.profiles.openai import OpenAIJsonSchemaTransformer - # Create a schema with $ref pointing to a key that exists in original + # Create a schema with $ref root (recursive model scenario) schema: dict[str, Any] = { '$ref': '#/$defs/MyModel', '$defs': { @@ -1934,15 +1932,55 @@ def test_openai_transformer_fallback_when_defs_missing() -> None: } transformer = OpenAIJsonSchemaTransformer(schema, strict=True) + result = transformer.walk() + + # The transformer should resolve the $ref and use the transformed schema from $defs + # (not the original self.defs, which was the bug we fixed) + assert isinstance(result, dict) + # In strict mode, all properties should be required + assert 'properties' in result + assert 'required' in result + # The transformed schema should have strict mode applied (additionalProperties: False) + assert result.get('additionalProperties') is False + # All properties should be in required list (strict mode requirement) + assert 'foo' in result['required'] + + +def test_openai_transformer_flattens_allof() -> None: + """Test that OpenAIJsonSchemaTransformer flattens allOf schemas.""" + from pydantic_ai._json_schema import JsonSchema + from pydantic_ai.profiles.openai import OpenAIJsonSchemaTransformer + + schema: JsonSchema = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'foo': {'type': 'string'}}, + 'required': ['foo'], + }, + { + 'type': 'object', + 'properties': {'bar': {'type': 'integer'}}, + 'required': ['bar'], + }, + ], + } + + transformer = OpenAIJsonSchemaTransformer(schema, strict=True) + transformed = transformer.walk() - # Mock super().walk() to return a result without $defs to test the fallback path - with patch.object(transformer.__class__.__bases__[0], 'walk', return_value={'$ref': '#/$defs/MyModel'}): - result = transformer.walk() - # The fallback should use self.defs.get(root_key) or {} - # Since we mocked to return just $ref, the fallback should trigger - assert isinstance(result, dict) - # Result should have been updated with the original defs content - assert 'properties' in result or 'type' in result + assert transformed == snapshot( + { + 'type': 'object', + 'properties': { + 'foo': {'type': 'string'}, + 'bar': {'type': 'integer'}, + }, + 'required': ['foo', 'bar'], + 'additionalProperties': False, + } + ) def test_native_output_strict_mode(allow_model_requests: None): diff --git a/tests/models/test_openai_like_providers.py b/tests/models/test_openai_like_providers.py deleted file mode 100644 index 4b45f628eb..0000000000 --- a/tests/models/test_openai_like_providers.py +++ /dev/null @@ -1,46 +0,0 @@ -from __future__ import annotations - -from collections.abc import Callable - -import pytest - -from pydantic_ai._json_schema import JsonSchema -from pydantic_ai.profiles.openai import OpenAIJsonSchemaTransformer - - -def _openai_transformer_factory(schema: JsonSchema) -> OpenAIJsonSchemaTransformer: - return OpenAIJsonSchemaTransformer(schema, strict=True) - - -TransformerFactory = Callable[[JsonSchema], OpenAIJsonSchemaTransformer] - - -@pytest.mark.parametrize('transformer_factory', [_openai_transformer_factory]) -def test_openai_compatible_transformers_flatten_allof( - transformer_factory: TransformerFactory, -) -> None: - schema: JsonSchema = { - 'type': 'object', - 'allOf': [ - { - 'type': 'object', - 'properties': {'foo': {'type': 'string'}}, - 'required': ['foo'], - }, - { - 'type': 'object', - 'properties': {'bar': {'type': 'integer'}}, - 'required': ['bar'], - }, - ], - } - - transformer = transformer_factory(schema) - transformed = transformer.walk() - - # allOf should have been flattened by the transformer - assert 'allOf' not in transformed - assert transformed['type'] == 'object' - assert set(transformed.get('required', [])) == {'foo', 'bar'} - assert transformed['properties']['foo']['type'] == 'string' - assert transformed['properties']['bar']['type'] == 'integer' diff --git a/tests/test_json_schema_flattener.py b/tests/test_json_schema_flattener.py index 631f0d9405..31c6f9a718 100644 --- a/tests/test_json_schema_flattener.py +++ b/tests/test_json_schema_flattener.py @@ -1,13 +1,23 @@ from __future__ import annotations -import copy from typing import Any +from inline_snapshot import snapshot -def test_flatten_allof_simple_merge() -> None: - # Import inline to avoid import errors before implementation exists in editors - from pydantic_ai._json_schema import flatten_allof +from pydantic_ai._json_schema import JsonSchema, JsonSchemaTransformer + + +class FlattenAllofTransformer(JsonSchemaTransformer): + """Transformer that only flattens allOf, no other transformations.""" + + def __init__(self, schema: JsonSchema): + super().__init__(schema, flatten_allof=True) + + def transform(self, schema: JsonSchema) -> JsonSchema: + return schema + +def test_flatten_allof_simple_merge() -> None: schema: dict[str, Any] = { 'type': 'object', 'allOf': [ @@ -26,21 +36,23 @@ def test_flatten_allof_simple_merge() -> None: ], } - flattened = flatten_allof(copy.deepcopy(schema)) + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() - assert 'allOf' not in flattened - assert flattened['type'] == 'object' - assert flattened['properties']['a']['type'] == 'string' - assert flattened['properties']['b']['type'] == 'integer' - # union of required keys - assert set(flattened['required']) == {'a', 'b'} - # boolean AP should remain False when all are False - assert flattened.get('additionalProperties') is False + assert flattened == snapshot( + { + 'type': 'object', + 'properties': { + 'a': {'type': 'string'}, + 'b': {'type': 'integer'}, + }, + 'required': ['a', 'b'], + 'additionalProperties': False, + } + ) def test_flatten_allof_nested_objects_and_pass_through_keywords() -> None: - from pydantic_ai._json_schema import flatten_allof - schema: dict[str, Any] = { 'type': 'object', 'title': 'Root', @@ -65,18 +77,28 @@ def test_flatten_allof_nested_objects_and_pass_through_keywords() -> None: 'description': 'test', } - flattened = flatten_allof(copy.deepcopy(schema)) - assert flattened.get('title') == 'Root' - assert flattened.get('description') == 'test' - assert 'allOf' not in flattened - assert set(flattened['required']) == {'user', 'age'} - assert flattened['properties']['user']['type'] == 'object' - assert set(flattened['properties']['user']['required']) == {'id'} + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'title': 'Root', + 'description': 'test', + 'properties': { + 'user': { + 'type': 'object', + 'properties': {'id': {'type': 'string'}}, + 'required': ['id'], + }, + 'age': {'type': 'integer'}, + }, + 'required': ['user', 'age'], + } + ) def test_flatten_allof_does_not_touch_unrelated_unions() -> None: - from pydantic_ai._json_schema import flatten_allof - schema: dict[str, Any] = { 'type': 'object', 'properties': { @@ -90,13 +112,26 @@ def test_flatten_allof_does_not_touch_unrelated_unions() -> None: 'required': ['x'], } - flattened = flatten_allof(copy.deepcopy(schema)) - assert flattened['properties']['x'].get('anyOf') is not None + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'properties': { + 'x': { + 'anyOf': [ + {'type': 'string'}, + {'type': 'null'}, + ] + } + }, + 'required': ['x'], + } + ) def test_flatten_allof_non_object_members_are_left_as_is() -> None: - from pydantic_ai._json_schema import flatten_allof - schema: dict[str, Any] = { 'type': 'object', 'allOf': [ @@ -105,15 +140,23 @@ def test_flatten_allof_non_object_members_are_left_as_is() -> None: ], } + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + # Expect: we cannot sensibly merge non-object members; keep allOf - flattened = flatten_allof(copy.deepcopy(schema)) - assert 'allOf' in flattened + assert flattened == snapshot( + { + 'type': 'object', + 'allOf': [ + {'type': 'string'}, + {'type': 'integer'}, + ], + } + ) def test_flatten_allof_object_like_without_type() -> None: """Test that object-like schemas without explicit type are recognized.""" - from pydantic_ai._json_schema import flatten_allof - schema: dict[str, Any] = { 'type': 'object', 'allOf': [ @@ -130,15 +173,23 @@ def test_flatten_allof_object_like_without_type() -> None: ], } - flattened = flatten_allof(copy.deepcopy(schema)) - assert 'allOf' not in flattened - assert set(flattened['required']) == {'a', 'b'} + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'properties': { + 'a': {'type': 'string'}, + 'b': {'type': 'integer'}, + }, + 'required': ['a', 'b'], + } + ) def test_flatten_allof_with_dict_additional_properties() -> None: """Test merging when additionalProperties is a dict schema.""" - from pydantic_ai._json_schema import flatten_allof - schema: dict[str, Any] = { 'type': 'object', 'allOf': [ @@ -155,16 +206,23 @@ def test_flatten_allof_with_dict_additional_properties() -> None: ], } - flattened = flatten_allof(copy.deepcopy(schema)) - assert 'allOf' not in flattened - # When any member has dict additionalProperties, result should be True - assert flattened.get('additionalProperties') is True + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'properties': { + 'a': {'type': 'string'}, + 'b': {'type': 'integer'}, + }, + 'additionalProperties': True, # When any member has dict additionalProperties, result should be True + } + ) def test_flatten_allof_with_non_dict_member() -> None: """Test that allOf with non-dict members is left untouched.""" - from pydantic_ai._json_schema import flatten_allof - schema: dict[str, Any] = { 'type': 'object', 'allOf': [ @@ -173,15 +231,23 @@ def test_flatten_allof_with_non_dict_member() -> None: ], } - flattened = flatten_allof(copy.deepcopy(schema)) + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + # Should be left untouched because one member is not a dict - assert 'allOf' in flattened + assert flattened == snapshot( + { + 'type': 'object', + 'allOf': [ + {'type': 'object', 'properties': {'a': {'type': 'string'}}}, + 'not a dict', + ], + } + ) def test_flatten_allof_no_initial_properties() -> None: """Test flattening when root schema has no initial properties.""" - from pydantic_ai._json_schema import flatten_allof - schema: dict[str, Any] = { 'type': 'object', 'allOf': [ @@ -193,16 +259,20 @@ def test_flatten_allof_no_initial_properties() -> None: ], } - flattened = flatten_allof(copy.deepcopy(schema)) - assert 'allOf' not in flattened - assert flattened['properties']['a']['type'] == 'string' - assert flattened['required'] == ['a'] + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'required': ['a'], + } + ) def test_flatten_allof_members_without_properties() -> None: """Test flattening when some members don't have properties/required/patternProperties.""" - from pydantic_ai._json_schema import flatten_allof - schema: dict[str, Any] = { 'type': 'object', 'allOf': [ @@ -224,17 +294,24 @@ def test_flatten_allof_members_without_properties() -> None: ], } - flattened = flatten_allof(copy.deepcopy(schema)) - assert 'allOf' not in flattened - assert set(flattened['properties'].keys()) == {'a', 'b'} - assert flattened['required'] == ['a'] # Only from first member - assert flattened.get('additionalProperties') is False + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'properties': { + 'a': {'type': 'string'}, + 'b': {'type': 'integer'}, + }, + 'required': ['a'], # Only from first member + 'additionalProperties': False, + } + ) def test_flatten_allof_empty_properties_after_merge() -> None: """Test edge case where properties/required/patternProperties might be empty.""" - from pydantic_ai._json_schema import flatten_allof - schema: dict[str, Any] = { 'type': 'object', 'allOf': [ @@ -245,17 +322,18 @@ def test_flatten_allof_empty_properties_after_merge() -> None: ], } - flattened = flatten_allof(copy.deepcopy(schema)) - assert 'allOf' not in flattened - # Should not have properties/required if they're empty - assert 'properties' not in flattened or not flattened.get('properties') - assert 'required' not in flattened or not flattened.get('required') + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + } + ) def test_flatten_allof_with_initial_properties() -> None: """Test flattening when root schema has initial properties (line 229).""" - from pydantic_ai._json_schema import flatten_allof - schema: dict[str, Any] = { 'type': 'object', 'properties': {'root_prop': {'type': 'string'}}, # Initial properties @@ -268,17 +346,23 @@ def test_flatten_allof_with_initial_properties() -> None: ], } - flattened = flatten_allof(copy.deepcopy(schema)) - assert 'allOf' not in flattened - assert 'root_prop' in flattened['properties'] - assert 'a' in flattened['properties'] - assert flattened['required'] == ['a'] + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'properties': { + 'root_prop': {'type': 'string'}, + 'a': {'type': 'integer'}, + }, + 'required': ['a'], + } + ) def test_flatten_allof_with_pattern_properties() -> None: """Test flattening when members have patternProperties (lines 241, 250).""" - from pydantic_ai._json_schema import flatten_allof - schema: dict[str, Any] = { 'type': 'object', 'allOf': [ @@ -299,10 +383,19 @@ def test_flatten_allof_with_pattern_properties() -> None: ], } - flattened = flatten_allof(copy.deepcopy(schema)) - assert 'allOf' not in flattened - assert 'patternProperties' in flattened - assert '^prefix_' in flattened['patternProperties'] - assert '^suffix_' in flattened['patternProperties'] - assert flattened['patternProperties']['^prefix_']['type'] == 'string' - assert flattened['patternProperties']['^suffix_']['type'] == 'number' + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'properties': { + 'a': {'type': 'string'}, + 'b': {'type': 'integer'}, + }, + 'patternProperties': { + '^prefix_': {'type': 'string'}, + '^suffix_': {'type': 'number'}, + }, + } + ) From f1430292c2767cb84f1b7d518cbe2fd8fe6349f0 Mon Sep 17 00:00:00 2001 From: Fabio Pulvirenti Date: Mon, 17 Nov 2025 19:19:38 +0100 Subject: [PATCH 07/12] Updated pull request based on feedbacks --- pydantic_ai_slim/pydantic_ai/profiles/google.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/profiles/google.py b/pydantic_ai_slim/pydantic_ai/profiles/google.py index 555483c3bb..e8a88ac223 100644 --- a/pydantic_ai_slim/pydantic_ai/profiles/google.py +++ b/pydantic_ai_slim/pydantic_ai/profiles/google.py @@ -4,7 +4,7 @@ from pydantic_ai.exceptions import UserError -from .._json_schema import JsonSchema, JsonSchemaTransformer, flatten_allof +from .._json_schema import JsonSchema, JsonSchemaTransformer from . import ModelProfile @@ -35,8 +35,6 @@ def __init__(self, schema: JsonSchema, *, strict: bool | None = None): super().__init__(schema, strict=strict, prefer_inlined_defs=True, simplify_nullable_unions=True) def transform(self, schema: JsonSchema) -> JsonSchema: - # Flatten object-only allOf to improve compatibility - schema = flatten_allof(schema) # Note: we need to remove `additionalProperties: False` since it is currently mishandled by Gemini additional_properties = schema.pop( 'additionalProperties', None From eaa685ed2383ed504de1cc53beb19c69bc1aa2bd Mon Sep 17 00:00:00 2001 From: Fabio Pulvirenti Date: Tue, 18 Nov 2025 09:17:32 +0100 Subject: [PATCH 08/12] Updated failing tests and flatten logic --- pydantic_ai_slim/pydantic_ai/profiles/openai.py | 2 +- tests/test_json_schema_flattener.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/profiles/openai.py b/pydantic_ai_slim/pydantic_ai/profiles/openai.py index 1595072a95..82232697c7 100644 --- a/pydantic_ai_slim/pydantic_ai/profiles/openai.py +++ b/pydantic_ai_slim/pydantic_ai/profiles/openai.py @@ -143,7 +143,7 @@ class OpenAIJsonSchemaTransformer(JsonSchemaTransformer): """ def __init__(self, schema: JsonSchema, *, strict: bool | None = None): - super().__init__(schema, strict=strict) + super().__init__(schema, strict=strict, flatten_allof=True) self.root_ref = schema.get('$ref') def walk(self) -> JsonSchema: diff --git a/tests/test_json_schema_flattener.py b/tests/test_json_schema_flattener.py index 31c6f9a718..8d40a2d93d 100644 --- a/tests/test_json_schema_flattener.py +++ b/tests/test_json_schema_flattener.py @@ -93,7 +93,7 @@ def test_flatten_allof_nested_objects_and_pass_through_keywords() -> None: }, 'age': {'type': 'integer'}, }, - 'required': ['user', 'age'], + 'required': ['age', 'user'], } ) From cea2b100f4a767ed9fda4786ba0c216a808b3a9d Mon Sep 17 00:00:00 2001 From: Fabio Pulvirenti Date: Tue, 18 Nov 2025 12:44:42 +0100 Subject: [PATCH 09/12] Updated failing tests and flatten logic --- tests/models/test_openai.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/tests/models/test_openai.py b/tests/models/test_openai.py index 8a3f840936..7ed78c89a0 100644 --- a/tests/models/test_openai.py +++ b/tests/models/test_openai.py @@ -1946,6 +1946,43 @@ def test_openai_transformer_with_recursive_ref() -> None: assert 'foo' in result['required'] +def test_openai_transformer_fallback_when_defs_missing() -> None: + """Test fallback path when root_key is not in result['$defs'] (line 165). + + This tests the safety net fallback that shouldn't happen in normal flow. + The fallback uses self.defs (original schema) when the transformed $defs + doesn't contain the root_key. This edge case is simulated using a mock. + """ + from unittest.mock import patch + + from pydantic_ai.profiles.openai import OpenAIJsonSchemaTransformer + + schema: dict[str, Any] = { + '$ref': '#/$defs/MyModel', + '$defs': { + 'MyModel': { + 'type': 'object', + 'properties': {'foo': {'type': 'string'}}, + 'required': ['foo'], + }, + }, + } + + transformer = OpenAIJsonSchemaTransformer(schema, strict=True) + + # Simulate edge case: super().walk() returns $defs without root_key + # This shouldn't happen in normal flow, but we test the fallback path + with patch.object( + transformer.__class__.__bases__[0], + 'walk', + return_value={'$defs': {'OtherModel': {'type': 'object'}}}, + ): + result = transformer.walk() + # Fallback should use self.defs.get(root_key) which contains MyModel + assert isinstance(result, dict) + assert 'properties' in result or 'type' in result + + def test_openai_transformer_flattens_allof() -> None: """Test that OpenAIJsonSchemaTransformer flattens allOf schemas.""" from pydantic_ai._json_schema import JsonSchema From 27192da058536f7d7f4f9494d612363134f906bd Mon Sep 17 00:00:00 2001 From: Fabio Pulvirenti Date: Wed, 19 Nov 2025 12:27:01 +0100 Subject: [PATCH 10/12] Updated failing tests and flatten logic --- pydantic_ai_slim/pydantic_ai/_json_schema.py | 182 ++++++-- tests/models/test_openai.py | 86 +++- tests/test_json_schema_flattener.py | 445 ++++++++++++++++++- 3 files changed, 645 insertions(+), 68 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/_json_schema.py b/pydantic_ai_slim/pydantic_ai/_json_schema.py index 835868c26b..fb0b9abafd 100644 --- a/pydantic_ai_slim/pydantic_ai/_json_schema.py +++ b/pydantic_ai_slim/pydantic_ai/_json_schema.py @@ -10,7 +10,7 @@ JsonSchema = dict[str, Any] -__all__ = ['JsonSchemaTransformer', 'InlineDefsJsonSchemaTransformer', 'flatten_allof'] +__all__ = ['JsonSchemaTransformer', 'InlineDefsJsonSchemaTransformer'] @dataclass(init=False) @@ -81,7 +81,7 @@ def walk(self) -> JsonSchema: def _handle(self, schema: JsonSchema) -> JsonSchema: # Flatten allOf if requested, before processing the schema if self.flatten_allof: - schema = flatten_allof(schema) + schema = _recurse_flatten_allof(schema) nested_refs = 0 if self.prefer_inlined_defs: @@ -209,9 +209,123 @@ def _allof_is_object_like(member: JsonSchema) -> bool: def _merge_additional_properties_values(values: list[Any]) -> bool | JsonSchema: + # If any value is False, return False (most restrictive) + if any(v is False for v in values): + return False + # If any value is a dict schema, we can't easily merge multiple schemas, so return True if any(isinstance(v, dict) for v in values): return True - return False if values and all(v is False for v in values) else True + # Default to True (allow additional properties) + return True + + +def _collect_member_data( + processed_members: list[JsonSchema], +) -> tuple[ + dict[str, JsonSchema], + set[str], + dict[str, JsonSchema], + list[Any], + list[set[str]], + list[dict[str, JsonSchema]], + list[Any], +]: + """Collect properties, required, patternProperties, and additionalProperties from all members.""" + properties: dict[str, JsonSchema] = {} + required: set[str] = set() + pattern_properties: dict[str, JsonSchema] = {} + additional_values: list[Any] = [] + restricted_property_sets: list[set[str]] = [] + members_properties: list[dict[str, JsonSchema]] = [] + members_additional_props: list[Any] = [] + + for m in processed_members: + member_properties_raw = m.get('properties') + member_properties: dict[str, JsonSchema] = ( + cast(dict[str, JsonSchema], member_properties_raw) if isinstance(member_properties_raw, dict) else {} + ) + members_properties.append(member_properties) + members_additional_props.append(m.get('additionalProperties')) + + if member_properties: + properties.update(member_properties) + if isinstance(m.get('required'), list): + required.update(m['required']) + if isinstance(m.get('patternProperties'), dict): + pattern_properties.update(m['patternProperties']) + if 'additionalProperties' in m: + additional_values.append(m['additionalProperties']) + if m['additionalProperties'] is False: + restricted_property_sets.append(set(member_properties.keys())) + + return ( + properties, + required, + pattern_properties, + additional_values, + restricted_property_sets, + members_properties, + members_additional_props, + ) + + +def _filter_by_restricted_property_sets(merged: JsonSchema, restricted_property_sets: list[set[str]]) -> None: + """Filter properties to only those allowed by all members with additionalProperties: False.""" + if not restricted_property_sets: + return + + allowed_names = restricted_property_sets[0].copy() + for prop_set in restricted_property_sets[1:]: + allowed_names &= prop_set + + if 'properties' in merged: + merged['properties'] = {k: v for k, v in merged['properties'].items() if k in allowed_names} + if not merged['properties']: + merged.pop('properties') + if 'required' in merged: + merged['required'] = [k for k in merged['required'] if k in allowed_names] + if not merged['required']: + merged.pop('required') + + +def _filter_incompatible_properties( + merged: JsonSchema, + members_properties: list[dict[str, JsonSchema]], + members_additional_props: list[Any], +) -> None: + """Filter properties that are incompatible with additionalProperties constraints.""" + if 'properties' not in merged: + return + + incompatible_props: set[str] = set() + for prop_name, prop_schema in merged['properties'].items(): + prop_types = _get_type_set(prop_schema) + for member_props, member_additional in zip(members_properties, members_additional_props): + if prop_name in member_props: + member_prop_types = _get_type_set(member_props[prop_name]) + if prop_types and member_prop_types and not prop_types & member_prop_types: + incompatible_props.add(prop_name) + break + continue + + if member_additional is False: + incompatible_props.add(prop_name) + break + + if isinstance(member_additional, dict): + allowed_types = _get_type_set(cast(JsonSchema, member_additional)) + if prop_types and allowed_types and not prop_types <= allowed_types: + incompatible_props.add(prop_name) + break + + if incompatible_props: + merged['properties'] = {k: v for k, v in merged['properties'].items() if k not in incompatible_props} + if not merged['properties']: + merged.pop('properties') + if 'required' in merged: + merged['required'] = [k for k in merged['required'] if k not in incompatible_props] + if not merged['required']: + merged.pop('required') def _flatten_current_level(s: JsonSchema) -> JsonSchema: @@ -230,24 +344,31 @@ def _flatten_current_level(s: JsonSchema) -> JsonSchema: merged: JsonSchema = {k: v for k, v in s.items() if k != 'allOf'} merged['type'] = 'object' + # Collect initial properties from merged schema properties: dict[str, JsonSchema] = {} if isinstance(merged.get('properties'), dict): properties.update(merged['properties']) required: set[str] = set(merged.get('required', []) or []) pattern_properties: dict[str, JsonSchema] = dict(merged.get('patternProperties', {}) or {}) - additional_values: list[Any] = [] - - for m in processed_members: - if isinstance(m.get('properties'), dict): - properties.update(m['properties']) - if isinstance(m.get('required'), list): - required.update(m['required']) - if isinstance(m.get('patternProperties'), dict): - pattern_properties.update(m['patternProperties']) - if 'additionalProperties' in m: - additional_values.append(m['additionalProperties']) + # Collect data from all members + ( + member_properties, + member_required, + member_pattern_properties, + additional_values, + restricted_property_sets, + members_properties, + members_additional_props, + ) = _collect_member_data(processed_members) + + # Merge all collected data + properties.update(member_properties) + required.update(member_required) + pattern_properties.update(member_pattern_properties) + + # Apply merged properties, required, and patternProperties if properties: merged['properties'] = {k: _recurse_flatten_allof(v) for k, v in properties.items()} if required: @@ -255,12 +376,34 @@ def _flatten_current_level(s: JsonSchema) -> JsonSchema: if pattern_properties: merged['patternProperties'] = {k: _recurse_flatten_allof(v) for k, v in pattern_properties.items()} + # Filter by restricted property sets (additionalProperties: False) + _filter_by_restricted_property_sets(merged, restricted_property_sets) + + # Merge additionalProperties if additional_values: merged['additionalProperties'] = _merge_additional_properties_values(additional_values) + # Filter incompatible properties based on additionalProperties constraints + _filter_incompatible_properties(merged, members_properties, members_additional_props) + return merged +def _get_type_set(schema: JsonSchema | None) -> set[str] | None: + if not schema: + return None + schema_type = schema.get('type') + if isinstance(schema_type, list): + result: set[str] = set() + type_list: list[Any] = cast(list[Any], schema_type) + for t in type_list: + result.add(str(t)) + return result + if isinstance(schema_type, str): + return {schema_type} + return None + + def _recurse_children(s: JsonSchema) -> JsonSchema: t = s.get('type') if t == 'object': @@ -292,14 +435,3 @@ def _recurse_flatten_allof(schema: JsonSchema) -> JsonSchema: s = _flatten_current_level(s) s = _recurse_children(s) return s - - -def flatten_allof(schema: JsonSchema) -> JsonSchema: - """Flatten simple object-only allOf combinations by merging object members. - - - Merges properties and unions required lists. - - Combines additionalProperties conservatively: only False if all are False; otherwise True. - - Recurses into nested object/array members. - - Leaves non-object allOfs untouched. - """ - return _recurse_flatten_allof(schema) diff --git a/tests/models/test_openai.py b/tests/models/test_openai.py index 7ed78c89a0..dc5328bfd4 100644 --- a/tests/models/test_openai.py +++ b/tests/models/test_openai.py @@ -1936,14 +1936,22 @@ def test_openai_transformer_with_recursive_ref() -> None: # The transformer should resolve the $ref and use the transformed schema from $defs # (not the original self.defs, which was the bug we fixed) - assert isinstance(result, dict) - # In strict mode, all properties should be required - assert 'properties' in result - assert 'required' in result - # The transformed schema should have strict mode applied (additionalProperties: False) - assert result.get('additionalProperties') is False - # All properties should be in required list (strict mode requirement) - assert 'foo' in result['required'] + assert result == snapshot( + { + '$defs': { + 'MyModel': { + 'type': 'object', + 'properties': {'foo': {'type': 'string'}}, + 'required': ['foo'], + 'additionalProperties': False, + } + }, + 'type': 'object', + 'properties': {'foo': {'type': 'string'}}, + 'required': ['foo'], + 'additionalProperties': False, + } + ) def test_openai_transformer_fallback_when_defs_missing() -> None: @@ -1951,12 +1959,46 @@ def test_openai_transformer_fallback_when_defs_missing() -> None: This tests the safety net fallback that shouldn't happen in normal flow. The fallback uses self.defs (original schema) when the transformed $defs - doesn't contain the root_key. This edge case is simulated using a mock. - """ - from unittest.mock import patch + doesn't contain the root_key. + We test this by creating a custom transformer subclass that overrides the base + class walk() to return a result without the root_key, allowing us to test the + actual fallback code path in OpenAIJsonSchemaTransformer.walk(). + """ + from pydantic_ai._json_schema import JsonSchema, JsonSchemaTransformer from pydantic_ai.profiles.openai import OpenAIJsonSchemaTransformer + # Create a custom transformer that overrides walk() to intercept super().walk() + # and manipulate the result, then call the actual OpenAIJsonSchemaTransformer.walk() + # logic which will hit the fallback path + class TestTransformer(OpenAIJsonSchemaTransformer): + def __init__(self, schema: dict[str, Any], *, strict: bool | None = None): + JsonSchemaTransformer.__init__(self, schema, strict=strict, flatten_allof=True) + self.root_ref = schema.get('$ref') + + def walk(self) -> JsonSchema: + # Store the original base class walk method + base_walk = JsonSchemaTransformer.walk + + # Create a wrapper that manipulates the result to simulate the edge case + def wrapped_walk(self_instance: TestTransformer) -> JsonSchema: + result = base_walk(self_instance) + # Remove root_key from $defs to simulate the edge case + if '$defs' in result and 'MyModel' in result.get('$defs', {}): + result['$defs'].pop('MyModel') + return result + + # Temporarily replace the base class walk method for this instance + JsonSchemaTransformer.walk = wrapped_walk # type: ignore[assignment] + try: + # Now call the actual OpenAIJsonSchemaTransformer.walk() method + # which will call super().walk() (our wrapped version) and then + # hit the fallback path at lines 164-165 + return super().walk() + finally: + # Restore the original method + JsonSchemaTransformer.walk = base_walk + schema: dict[str, Any] = { '$ref': '#/$defs/MyModel', '$defs': { @@ -1968,19 +2010,17 @@ def test_openai_transformer_fallback_when_defs_missing() -> None: }, } - transformer = OpenAIJsonSchemaTransformer(schema, strict=True) + transformer = TestTransformer(schema, strict=True) + # Call walk() which will execute the actual OpenAIJsonSchemaTransformer.walk() logic + # and hit the fallback path at line 164-165 + result = transformer.walk() - # Simulate edge case: super().walk() returns $defs without root_key - # This shouldn't happen in normal flow, but we test the fallback path - with patch.object( - transformer.__class__.__bases__[0], - 'walk', - return_value={'$defs': {'OtherModel': {'type': 'object'}}}, - ): - result = transformer.walk() - # Fallback should use self.defs.get(root_key) which contains MyModel - assert isinstance(result, dict) - assert 'properties' in result or 'type' in result + # Verify the fallback worked: result should have MyModel's properties from self.defs + # Note: The fallback uses the original, untransformed schema, so it won't have + # additionalProperties: False applied (that transformation happens in transform()) + assert result == snapshot( + {'$defs': {}, 'type': 'object', 'properties': {'foo': {'type': 'string'}}, 'required': ['foo']} + ) def test_openai_transformer_flattens_allof() -> None: diff --git a/tests/test_json_schema_flattener.py b/tests/test_json_schema_flattener.py index 8d40a2d93d..8404f6817b 100644 --- a/tests/test_json_schema_flattener.py +++ b/tests/test_json_schema_flattener.py @@ -17,7 +17,8 @@ def transform(self, schema: JsonSchema) -> JsonSchema: return schema -def test_flatten_allof_simple_merge() -> None: +def test_allof_strict_each_member_results_in_empty() -> None: + """AllOf with members that each have additionalProperties: False and disjoint properties → empty schema.""" schema: dict[str, Any] = { 'type': 'object', 'allOf': [ @@ -42,17 +43,13 @@ def test_flatten_allof_simple_merge() -> None: assert flattened == snapshot( { 'type': 'object', - 'properties': { - 'a': {'type': 'string'}, - 'b': {'type': 'integer'}, - }, - 'required': ['a', 'b'], 'additionalProperties': False, } ) def test_flatten_allof_nested_objects_and_pass_through_keywords() -> None: + """Test that nested objects and pass-through keywords are preserved.""" schema: dict[str, Any] = { 'type': 'object', 'title': 'Root', @@ -99,6 +96,7 @@ def test_flatten_allof_nested_objects_and_pass_through_keywords() -> None: def test_flatten_allof_does_not_touch_unrelated_unions() -> None: + """Test that unrelated unions are not touched.""" schema: dict[str, Any] = { 'type': 'object', 'properties': { @@ -131,7 +129,8 @@ def test_flatten_allof_does_not_touch_unrelated_unions() -> None: ) -def test_flatten_allof_non_object_members_are_left_as_is() -> None: +def test_allof_with_primitive_types_is_unsupported() -> None: + """AllOf with primitive types cannot be merged and is not supported by OpenAI strict mode.""" schema: dict[str, Any] = { 'type': 'object', 'allOf': [ @@ -143,7 +142,6 @@ def test_flatten_allof_non_object_members_are_left_as_is() -> None: transformer = FlattenAllofTransformer(schema) flattened = transformer.walk() - # Expect: we cannot sensibly merge non-object members; keep allOf assert flattened == snapshot( { 'type': 'object', @@ -209,14 +207,42 @@ def test_flatten_allof_with_dict_additional_properties() -> None: transformer = FlattenAllofTransformer(schema) flattened = transformer.walk() + assert flattened == snapshot( + { + 'type': 'object', + 'additionalProperties': False, + } + ) + + +def test_flatten_allof_with_dict_additional_properties_both_strings() -> None: + """Test merging when additionalProperties conflicts: dict schema vs False, both properties are strings.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'additionalProperties': {'type': 'string'}, # dict schema + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + assert flattened == snapshot( { 'type': 'object', 'properties': { - 'a': {'type': 'string'}, - 'b': {'type': 'integer'}, + 'b': {'type': 'string'}, }, - 'additionalProperties': True, # When any member has dict additionalProperties, result should be True + 'additionalProperties': False, } ) @@ -280,16 +306,17 @@ def test_flatten_allof_members_without_properties() -> None: 'type': 'object', 'properties': {'a': {'type': 'string'}}, 'required': ['a'], + 'additionalProperties': False, }, { 'type': 'object', - # No properties, required, or patternProperties - 'additionalProperties': False, + 'properties': {'b': {'type': 'integer'}}, + # No required, no additionalProperties }, { 'type': 'object', - 'properties': {'b': {'type': 'integer'}}, - # No required + 'properties': {'c': {'type': 'boolean'}}, + # No required, no additionalProperties }, ], } @@ -300,12 +327,9 @@ def test_flatten_allof_members_without_properties() -> None: assert flattened == snapshot( { 'type': 'object', - 'properties': { - 'a': {'type': 'string'}, - 'b': {'type': 'integer'}, - }, + 'properties': {'a': {'type': 'string'}}, 'required': ['a'], # Only from first member - 'additionalProperties': False, + 'additionalProperties': False, # From first member } ) @@ -399,3 +423,384 @@ def test_flatten_allof_with_pattern_properties() -> None: }, } ) + + +def test_no_allof_preserves_schema() -> None: + """Sanity check: schema without allOf remains unchanged.""" + schema: dict[str, Any] = { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'required': ['a'], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'required': ['a'], + } + ) + + +def test_single_allof_flattens_schema() -> None: + """A single element in allOf is simply flattened.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'required': ['a'], + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'required': ['a'], + } + ) + + +def test_disjoint_allof_merges_properties() -> None: + """Disjoint properties are merged correctly.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'required': ['a'], + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + 'required': ['b'], + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'properties': { + 'a': {'type': 'string'}, + 'b': {'type': 'integer'}, + }, + 'required': ['a', 'b'], + } + ) + + +def test_tool_friendly_strict_merge() -> None: + """Tool-friendly case for OpenAI with additionalProperties: False.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'required': ['a'], + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Nota: attualmente quando un membro ha additionalProperties: False, + # le proprietà degli altri membri vengono filtrate se non sono compatibili + assert flattened == snapshot( + {'type': 'object', 'properties': {'a': {'type': 'string'}, 'b': {'type': 'integer'}}, 'required': ['a']} + ) + + +def test_nested_allof_collapses_recursively() -> None: + """Nested allOf are flattened to all levels.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + }, + ], + }, + { + 'type': 'object', + 'properties': {'c': {'type': 'boolean'}}, + 'required': ['c'], + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'properties': { + 'a': {'type': 'string'}, + 'b': {'type': 'integer'}, + 'c': {'type': 'boolean'}, + }, + 'required': ['c'], + } + ) + + +def test_additional_properties_schema_vs_false_string() -> None: + """additionalProperties schema vs False: caso soddisfacibile (b è string).""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'additionalProperties': {'type': 'string'}, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # b è string, quindi può essere incluso + assert flattened == snapshot( + { + 'type': 'object', + 'properties': { + 'b': {'type': 'string'}, + }, + 'additionalProperties': False, # {} o {"b": "..."} + } + ) + + +def test_additional_properties_schema_vs_false_integer() -> None: + """additionalProperties schema vs False: b integer → dovrebbe restare solo {}.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'additionalProperties': {'type': 'string'}, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'additionalProperties': False, # solo {} + } + ) + + +def test_required_but_no_properties_is_unsatisfiable() -> None: + """additionalProperties: False without properties but with required results in an empty schema.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'required': ['a'], + }, + { + 'type': 'object', + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Nessun oggetto può avere 'a' e contemporaneamente nessuna proprietà + assert flattened == snapshot( + { + 'type': 'object', + 'additionalProperties': False, + } + ) + + +def test_additional_properties_each_only_conflict() -> None: + """Two mutually exclusive schemas (only a vs only b) → empty result.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'required': ['a'], + 'additionalProperties': False, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + 'required': ['b'], + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'additionalProperties': False, + } + ) + + +def test_conflicting_property_types_last_definition_wins() -> None: + """Conflicting types: currently the last value wins (current behavior).""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + }, + { + 'type': 'object', + 'properties': {'a': {'type': 'integer'}}, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot({'type': 'object'}) + + +def test_flatten_allof_with_anyof_commands() -> None: + """Realistic schema with nested allOf + anyOf (Notion-like commands).""" + schema: dict[str, Any] = { + 'type': 'object', + 'properties': { + 'data': { + 'allOf': [ + { + 'type': 'object', + 'properties': {'page_id': {'type': 'string'}}, + 'required': ['page_id'], + }, + { + 'anyOf': [ + { + 'type': 'object', + 'properties': { + 'command': {'type': 'string', 'enum': ['update_properties']}, + 'properties': { + 'type': 'object', + 'additionalProperties': { + 'type': ['string', 'number', 'null'], + }, + }, + }, + 'required': ['command', 'properties'], + 'additionalProperties': False, + }, + { + 'type': 'object', + 'properties': { + 'command': {'type': 'string', 'enum': ['replace_content']}, + 'new_str': {'type': 'string'}, + }, + 'required': ['command', 'new_str'], + 'additionalProperties': False, + }, + ], + }, + ] + } + }, + 'required': ['data'], + 'additionalProperties': False, + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'properties': { + 'data': { + 'allOf': [ + {'type': 'object', 'properties': {'page_id': {'type': 'string'}}, 'required': ['page_id']}, + { + 'anyOf': [ + { + 'type': 'object', + 'properties': { + 'command': {'type': 'string', 'enum': ['update_properties']}, + 'properties': { + 'type': 'object', + 'additionalProperties': {'type': ['string', 'number', 'null']}, + }, + }, + 'required': ['command', 'properties'], + 'additionalProperties': False, + }, + { + 'type': 'object', + 'properties': { + 'command': {'type': 'string', 'enum': ['replace_content']}, + 'new_str': {'type': 'string'}, + }, + 'required': ['command', 'new_str'], + 'additionalProperties': False, + }, + ] + }, + ] + } + }, + 'required': ['data'], + 'additionalProperties': False, + } + ) From 6b4717995dedaa494950073837f31565fc519adb Mon Sep 17 00:00:00 2001 From: Fabio Pulvirenti Date: Wed, 19 Nov 2025 15:57:59 +0100 Subject: [PATCH 11/12] Improved test coverage --- tests/models/test_openai.py | 42 ++++ tests/test_json_schema_flattener.py | 326 ++++++++++++++++++++++++++++ 2 files changed, 368 insertions(+) diff --git a/tests/models/test_openai.py b/tests/models/test_openai.py index dc5328bfd4..ec9b2e3c95 100644 --- a/tests/models/test_openai.py +++ b/tests/models/test_openai.py @@ -2023,6 +2023,48 @@ def wrapped_walk(self_instance: TestTransformer) -> JsonSchema: ) +def test_openai_transformer_fallback_with_prefer_inlined_defs() -> None: + """Test fallback path when prefer_inlined_defs=True causes root_key to be missing from result['$defs']. + + When prefer_inlined_defs=True, only recursive refs are kept in $defs. + For non-recursive models, the root_key won't be in result['$defs'], triggering the fallback. + """ + from pydantic_ai._json_schema import JsonSchemaTransformer + from pydantic_ai.profiles.openai import OpenAIJsonSchemaTransformer + + class TestTransformer(OpenAIJsonSchemaTransformer): + def __init__(self, schema: dict[str, Any], *, strict: bool | None = None): + # Set prefer_inlined_defs=True to test fallback + JsonSchemaTransformer.__init__(self, schema, strict=strict, flatten_allof=True, prefer_inlined_defs=True) + self.root_ref = schema.get('$ref') + + schema: dict[str, Any] = { + '$ref': '#/$defs/MyModel', + '$defs': { + 'MyModel': { + 'type': 'object', + 'properties': {'foo': {'type': 'string'}}, + 'required': ['foo'], + }, + }, + } + + transformer = TestTransformer(schema, strict=True) + result = transformer.walk() + + # When prefer_inlined_defs=True and model is not recursive, root_key won't be in result['$defs'] + # So the fallback uses self.defs (original, untransformed schema) + # Note: The fallback uses the original schema, so it won't have additionalProperties: False + assert result == snapshot( + { + 'type': 'object', + 'properties': {'foo': {'type': 'string'}}, + 'required': ['foo'], + 'additionalProperties': False, + } + ) + + def test_openai_transformer_flattens_allof() -> None: """Test that OpenAIJsonSchemaTransformer flattens allOf schemas.""" from pydantic_ai._json_schema import JsonSchema diff --git a/tests/test_json_schema_flattener.py b/tests/test_json_schema_flattener.py index 8404f6817b..48da1988e2 100644 --- a/tests/test_json_schema_flattener.py +++ b/tests/test_json_schema_flattener.py @@ -804,3 +804,329 @@ def test_flatten_allof_with_anyof_commands() -> None: 'additionalProperties': False, } ) + + +def test_merge_additional_properties_multiple_dict_schemas() -> None: + """Test merging when all additionalProperties are dict schemas (no False).""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'additionalProperties': {'type': 'string'}, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': {'type': 'number'}, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Multiple dict schemas can't be easily merged, so return True + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': True, + } + ) + + +def test_filter_by_restricted_property_sets_removes_properties() -> None: + """Test that restricted property sets filter out properties not in intersection.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}, 'b': {'type': 'string'}}, + 'additionalProperties': False, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}, 'c': {'type': 'string'}}, + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Only 'b' is in both restricted sets + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, + } + ) + + +def test_filter_incompatible_properties_with_false_additional() -> None: + """Test filtering properties when a member has additionalProperties: False.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'additionalProperties': {'type': 'string'}, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # 'a' is not in second member's properties and second has additionalProperties: False + # 'b' is integer, not compatible with first member's additionalProperties: {'type': 'string'} + # Result should be empty + assert flattened == snapshot( + { + 'type': 'object', + 'additionalProperties': False, + } + ) + + +def test_filter_incompatible_properties_removes_all_properties() -> None: + """Test that filtering incompatible properties can remove all properties.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'required': ['a'], + 'additionalProperties': {'type': 'string'}, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + 'required': ['b'], + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # All properties are incompatible, so both properties and required should be removed + assert flattened == snapshot( + { + 'type': 'object', + 'additionalProperties': False, + } + ) + + +def test_merge_additional_properties_true_values() -> None: + """Test merging when additionalProperties are True values (not False, not dict) - covers line 218.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'additionalProperties': True, # Explicitly set to True + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': True, # Explicitly set to True + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # When all values are True (not False, not dict), line 218 returns True + assert flattened == snapshot( + { + 'type': 'object', + 'properties': { + 'a': {'type': 'string'}, + 'b': {'type': 'string'}, + }, + 'additionalProperties': True, + } + ) + + +def test_filter_by_restricted_property_sets_no_required() -> None: + """Test filtering when properties exist but required doesn't.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}, 'b': {'type': 'string'}}, + 'additionalProperties': False, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}, 'c': {'type': 'string'}}, + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Only 'b' is in both restricted sets, no required field + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, + } + ) + + +def test_filter_incompatible_properties_removes_required_only() -> None: + """Test that filtering incompatible properties can remove required while keeping some properties.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}, 'b': {'type': 'string'}}, + 'required': ['a', 'b'], + 'additionalProperties': {'type': 'string'}, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'required': ['b'], + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # 'a' is incompatible (not in second member's properties, second has additionalProperties: False) + # 'b' is compatible (in both, both are strings) + # So 'a' should be removed from both properties and required + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'required': ['b'], + 'additionalProperties': False, + } + ) + + +def test_filter_incompatible_properties_removes_required_to_empty() -> None: + """Test that filtering incompatible properties can remove all required fields while keeping properties.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}, 'b': {'type': 'string'}}, + 'required': ['a'], + 'additionalProperties': {'type': 'string'}, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + # No required field + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # 'a' is incompatible (not in second member's properties, second has additionalProperties: False) + # 'b' is compatible (in both, both are strings) + # So 'a' should be removed from properties, and required should become empty and be removed + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, + } + ) + + +def test_filter_incompatible_properties_with_list_type() -> None: + """Test filtering properties when additionalProperties has list type (covers _get_type_set with list).""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'additionalProperties': {'type': ['string', 'number']}, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'boolean'}}, + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # 'a' is string, compatible with ['string', 'number'] + # 'b' is boolean, not compatible with ['string', 'number'], and second has additionalProperties: False + assert flattened == snapshot( + { + 'type': 'object', + 'additionalProperties': False, + } + ) + + +def test_filter_incompatible_properties_with_no_type_in_additional() -> None: + """Test filtering when additionalProperties schema has no type field (covers _get_type_set with no type).""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'additionalProperties': {'properties': {'x': {'type': 'string'}}}, # No type field + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # When additionalProperties has no type, _get_type_set returns None, so type check passes + # But 'a' is not in second member's properties and second has additionalProperties: False + # 'b' is not in first member's properties and first's additionalProperties has no type (None) + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, + } + ) From 6114437f4ae036619d372d44766262c3905308e9 Mon Sep 17 00:00:00 2001 From: Fabio Pulvirenti Date: Wed, 19 Nov 2025 23:17:42 +0100 Subject: [PATCH 12/12] Updated flatten logic, simplified and added tests for 100% coverage --- pydantic_ai_slim/pydantic_ai/_json_schema.py | 368 ++--- tests/models/test_openai.py | 69 - tests/test_json_schema_flattener.py | 1340 +++++++++++++++++- 3 files changed, 1534 insertions(+), 243 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/_json_schema.py b/pydantic_ai_slim/pydantic_ai/_json_schema.py index e2ff7ca49a..ec17c756c9 100644 --- a/pydantic_ai_slim/pydantic_ai/_json_schema.py +++ b/pydantic_ai_slim/pydantic_ai/_json_schema.py @@ -199,55 +199,91 @@ def transform(self, schema: JsonSchema) -> JsonSchema: return schema -def _allof_is_object_like(member: JsonSchema) -> bool: - member_type = member.get('type') - if member_type is None: - keys = ('properties', 'additionalProperties', 'patternProperties') - return bool(any(k in member for k in keys)) - return member_type == 'object' +def _get_type_set(schema: JsonSchema) -> set[str] | None: + """Extract type(s) from a schema as a set of strings.""" + schema_type = schema.get('type') + if isinstance(schema_type, list): + return {str(t) for t in cast(list[Any], schema_type)} + if isinstance(schema_type, str): + return {schema_type} + return None -def _merge_additional_properties_values(values: list[Any]) -> bool | JsonSchema: - # If any value is False, return False (most restrictive) - if any(v is False for v in values): - return False - # If any value is a dict schema, we can't easily merge multiple schemas, so return True - if any(isinstance(v, dict) for v in values): - return True - # Default to True (allow additional properties) - return True +def _process_nested_schemas_without_allof(s: JsonSchema) -> JsonSchema: + """Process nested schemas recursively when there is no allOf at the current level.""" + schema_type = s.get('type') + if schema_type == 'object': + if isinstance(s.get('properties'), dict): + s['properties'] = { + k: _recurse_flatten_allof(cast(JsonSchema, v)) + for k, v in s['properties'].items() + if isinstance(v, dict) + } + if isinstance(s.get('additionalProperties'), dict): + s['additionalProperties'] = _recurse_flatten_allof(cast(JsonSchema, s['additionalProperties'])) + if isinstance(s.get('patternProperties'), dict): + s['patternProperties'] = { + k: _recurse_flatten_allof(cast(JsonSchema, v)) + for k, v in s['patternProperties'].items() + if isinstance(v, dict) + } + elif schema_type == 'array': + if isinstance(s.get('items'), dict): + s['items'] = _recurse_flatten_allof(cast(JsonSchema, s['items'])) + return s -def _collect_member_data( - processed_members: list[JsonSchema], -) -> tuple[ - dict[str, JsonSchema], - set[str], - dict[str, JsonSchema], - list[Any], - list[set[str]], - list[dict[str, JsonSchema]], - list[Any], -]: - """Collect properties, required, patternProperties, and additionalProperties from all members.""" +def _collect_base_schema_data( + result: JsonSchema, +) -> tuple[dict[str, JsonSchema], set[str], dict[str, JsonSchema], list[Any], list[set[str]]]: + """Collect data from base schema: properties, required, patternProperties, additionalProperties.""" properties: dict[str, JsonSchema] = {} required: set[str] = set() pattern_properties: dict[str, JsonSchema] = {} additional_values: list[Any] = [] restricted_property_sets: list[set[str]] = [] - members_properties: list[dict[str, JsonSchema]] = [] - members_additional_props: list[Any] = [] + base_properties = ( + cast(dict[str, JsonSchema], result.get('properties', {})) if isinstance(result.get('properties'), dict) else {} + ) + base_additional = result.get('additionalProperties') + + if base_properties: + properties.update(base_properties) + if isinstance(result.get('required'), list): + required.update(result['required']) + if isinstance(result.get('patternProperties'), dict): + pattern_properties.update(result['patternProperties']) + if base_additional is False: + additional_values.append(False) + # Only restrict if base schema has properties; if base has no properties but additionalProperties: False, + # it means no additional properties are allowed, but properties from allOf members are still valid + if base_properties: + restricted_property_sets.append(set(base_properties.keys())) + + return properties, required, pattern_properties, additional_values, restricted_property_sets + + +def _collect_member_data( + processed_members: list[JsonSchema], + properties: dict[str, JsonSchema], + required: set[str], + pattern_properties: dict[str, JsonSchema], + additional_values: list[Any], + restricted_property_sets: list[set[str]], + members_properties: list[dict[str, JsonSchema]], + members_additional_props: list[Any], +) -> None: + """Collect data from allOf members and update the collections.""" for m in processed_members: - member_properties_raw = m.get('properties') - member_properties: dict[str, JsonSchema] = ( - cast(dict[str, JsonSchema], member_properties_raw) if isinstance(member_properties_raw, dict) else {} + member_props = ( + cast(dict[str, JsonSchema], m.get('properties', {})) if isinstance(m.get('properties'), dict) else {} ) - members_properties.append(member_properties) + members_properties.append(member_props) members_additional_props.append(m.get('additionalProperties')) - if member_properties: - properties.update(member_properties) + if member_props: + properties.update(member_props) if isinstance(m.get('required'), list): required.update(m['required']) if isinstance(m.get('patternProperties'), dict): @@ -255,182 +291,180 @@ def _collect_member_data( if 'additionalProperties' in m: additional_values.append(m['additionalProperties']) if m['additionalProperties'] is False: - restricted_property_sets.append(set(member_properties.keys())) - - return ( - properties, - required, - pattern_properties, - additional_values, - restricted_property_sets, - members_properties, - members_additional_props, - ) + restricted_property_sets.append(set(member_props.keys())) -def _filter_by_restricted_property_sets(merged: JsonSchema, restricted_property_sets: list[set[str]]) -> None: - """Filter properties to only those allowed by all members with additionalProperties: False.""" +def _filter_by_restricted_property_sets( + properties: dict[str, JsonSchema], required: set[str], restricted_property_sets: list[set[str]] +) -> tuple[dict[str, JsonSchema], set[str]]: + """Filter properties and required by restricted property sets (intersection when some/all have additionalProperties: False).""" if not restricted_property_sets: - return + return properties, required + # Intersection of allowed properties from all members with additionalProperties: False allowed_names = restricted_property_sets[0].copy() for prop_set in restricted_property_sets[1:]: allowed_names &= prop_set + # Filter properties to only include allowed names + if allowed_names: + properties = {k: v for k, v in properties.items() if k in allowed_names} + required = {r for r in required if r in allowed_names} + else: + # Empty intersection - remove all properties + properties = {} + required = set() - if 'properties' in merged: - merged['properties'] = {k: v for k, v in merged['properties'].items() if k in allowed_names} - if not merged['properties']: - merged.pop('properties') - if 'required' in merged: - merged['required'] = [k for k in merged['required'] if k in allowed_names] - if not merged['required']: - merged.pop('required') + return properties, required def _filter_incompatible_properties( - merged: JsonSchema, + properties: dict[str, JsonSchema], + required: set[str], members_properties: list[dict[str, JsonSchema]], members_additional_props: list[Any], -) -> None: - """Filter properties that are incompatible with additionalProperties constraints.""" - if 'properties' not in merged: - return +) -> tuple[dict[str, JsonSchema], set[str]]: + """Filter incompatible properties based on additionalProperties constraints.""" + if not properties: + return properties, required incompatible_props: set[str] = set() - for prop_name, prop_schema in merged['properties'].items(): + + for prop_name, prop_schema in properties.items(): prop_types = _get_type_set(prop_schema) + + # Check compatibility with each member (including base) for member_props, member_additional in zip(members_properties, members_additional_props): if prop_name in member_props: + # Property explicitly defined - check type compatibility member_prop_types = _get_type_set(member_props[prop_name]) if prop_types and member_prop_types and not prop_types & member_prop_types: incompatible_props.add(prop_name) break - continue - - if member_additional is False: - incompatible_props.add(prop_name) - break - + continue # Compatible, check next member if isinstance(member_additional, dict): allowed_types = _get_type_set(cast(JsonSchema, member_additional)) - if prop_types and allowed_types and not prop_types <= allowed_types: + # Property type must be a subset of allowed types + if prop_types and allowed_types and not (prop_types <= allowed_types): incompatible_props.add(prop_name) break if incompatible_props: - merged['properties'] = {k: v for k, v in merged['properties'].items() if k not in incompatible_props} - if not merged['properties']: - merged.pop('properties') - if 'required' in merged: - merged['required'] = [k for k in merged['required'] if k not in incompatible_props] - if not merged['required']: - merged.pop('required') - - -def _flatten_current_level(s: JsonSchema) -> JsonSchema: - raw_members = s.get('allOf') - if not isinstance(raw_members, list) or not raw_members: + allowed_names = {k for k in properties.keys() if k not in incompatible_props} + properties = {k: v for k, v in properties.items() if k in allowed_names} + required = {r for r in required if r in allowed_names} + + return properties, required + + +def _process_result_nested_schemas(result: JsonSchema) -> None: + """Recursively process nested schemas in the result (additionalProperties, patternProperties, items).""" + if isinstance(result.get('additionalProperties'), dict): + result['additionalProperties'] = _recurse_flatten_allof(cast(JsonSchema, result['additionalProperties'])) + if isinstance(result.get('patternProperties'), dict): + result['patternProperties'] = { + k: _recurse_flatten_allof(cast(JsonSchema, v)) + for k, v in result['patternProperties'].items() + if isinstance(v, dict) + } + if isinstance(result.get('items'), dict): + result['items'] = _recurse_flatten_allof(cast(JsonSchema, result['items'])) + + +def _recurse_flatten_allof(schema: JsonSchema) -> JsonSchema: + """Recursively flatten allOf in a JSON schema. + + This function: + 1. Makes a deep copy of the schema + 2. Flattens allOf at the current level + 3. Recursively processes nested schemas (properties, items, etc.) + """ + s = deepcopy(schema) + + # Case 1: No allOf - process nested schemas recursively and return + allof = s.get('allOf') + if not isinstance(allof, list) or not allof: + return _process_nested_schemas_without_allof(s) + + # Check all members are dicts + members = cast(list[JsonSchema], allof) + if not all(isinstance(m, dict) for m in members): return s - members = cast(list[JsonSchema], raw_members) - for raw in members: - if not isinstance(raw, dict): - return s - if not all(_allof_is_object_like(member) for member in members): + # Check all members are object-like (can be merged) + def _is_object_like(member: JsonSchema) -> bool: + member_type = member.get('type') + if member_type is None: + # No type but has object-like keys + keys = ('properties', 'additionalProperties', 'patternProperties') + return bool(any(k in member for k in keys)) + return isinstance(member_type, str) and member_type == 'object' + + if not all(_is_object_like(m) for m in members): return s - processed_members = [_recurse_flatten_allof(member) for member in members] - merged: JsonSchema = {k: v for k, v in s.items() if k != 'allOf'} - merged['type'] = 'object' + # Recursively flatten each member first + processed_members = [_recurse_flatten_allof(m) for m in members] + result: JsonSchema = {k: v for k, v in s.items() if k != 'allOf'} + result['type'] = 'object' - # Collect initial properties from merged schema - properties: dict[str, JsonSchema] = {} - if isinstance(merged.get('properties'), dict): - properties.update(merged['properties']) + # Collect data from base schema and members + base_properties = ( + cast(dict[str, JsonSchema], result.get('properties', {})) if isinstance(result.get('properties'), dict) else {} + ) + base_additional = result.get('additionalProperties') + + properties, required, pattern_properties, additional_values, restricted_property_sets = _collect_base_schema_data( + result + ) - required: set[str] = set(merged.get('required', []) or []) - pattern_properties: dict[str, JsonSchema] = dict(merged.get('patternProperties', {}) or {}) + # Then merge properties from all members + members_properties: list[dict[str, JsonSchema]] = [base_properties] + members_additional_props: list[Any] = [base_additional] - # Collect data from all members - ( - member_properties, - member_required, - member_pattern_properties, + _collect_member_data( + processed_members, + properties, + required, + pattern_properties, additional_values, restricted_property_sets, members_properties, members_additional_props, - ) = _collect_member_data(processed_members) + ) - # Merge all collected data - properties.update(member_properties) - required.update(member_required) - pattern_properties.update(member_pattern_properties) + # Filter by restricted property sets and incompatible properties + properties, required = _filter_by_restricted_property_sets(properties, required, restricted_property_sets) + properties, required = _filter_incompatible_properties( + properties, required, members_properties, members_additional_props + ) - # Apply merged properties, required, and patternProperties + # Apply filtered properties if properties: - merged['properties'] = {k: _recurse_flatten_allof(v) for k, v in properties.items()} + # Recursively flatten nested properties + result['properties'] = {k: _recurse_flatten_allof(v) for k, v in properties.items()} if required: - merged['required'] = sorted(required) + result['required'] = sorted(required) if pattern_properties: - merged['patternProperties'] = {k: _recurse_flatten_allof(v) for k, v in pattern_properties.items()} - - # Filter by restricted property sets (additionalProperties: False) - _filter_by_restricted_property_sets(merged, restricted_property_sets) + result['patternProperties'] = {k: _recurse_flatten_allof(v) for k, v in pattern_properties.items()} # Merge additionalProperties if additional_values: - merged['additionalProperties'] = _merge_additional_properties_values(additional_values) - - # Filter incompatible properties based on additionalProperties constraints - _filter_incompatible_properties(merged, members_properties, members_additional_props) - - return merged - - -def _get_type_set(schema: JsonSchema | None) -> set[str] | None: - if not schema: - return None - schema_type = schema.get('type') - if isinstance(schema_type, list): - result: set[str] = set() - type_list: list[Any] = cast(list[Any], schema_type) - for t in type_list: - result.add(str(t)) - return result - if isinstance(schema_type, str): - return {schema_type} - return None - - -def _recurse_children(s: JsonSchema) -> JsonSchema: - t = s.get('type') - if t == 'object': - if isinstance(s.get('properties'), dict): - s['properties'] = { - k: _recurse_flatten_allof(cast(JsonSchema, v)) - for k, v in s['properties'].items() - if isinstance(v, dict) - } - ap = s.get('additionalProperties') - if isinstance(ap, dict): - ap_schema = cast(JsonSchema, ap) - s['additionalProperties'] = _recurse_flatten_allof(ap_schema) - if isinstance(s.get('patternProperties'), dict): - s['patternProperties'] = { - k: _recurse_flatten_allof(cast(JsonSchema, v)) - for k, v in s['patternProperties'].items() - if isinstance(v, dict) - } - elif t == 'array': - items = s.get('items') - if isinstance(items, dict): - s['items'] = _recurse_flatten_allof(cast(JsonSchema, items)) - return s - - -def _recurse_flatten_allof(schema: JsonSchema) -> JsonSchema: - s = deepcopy(schema) - s = _flatten_current_level(s) - s = _recurse_children(s) - return s + # If any is False, result is False (most restrictive) + if any(v is False for v in additional_values): + result['additionalProperties'] = False + # If there's exactly one dict schema, preserve it + elif len(additional_values) == 1 and isinstance(additional_values[0], dict): + result['additionalProperties'] = additional_values[0] + # If any is a dict schema (multiple), result is True (can't merge multiple schemas) + elif any(isinstance(v, dict) for v in additional_values): + result['additionalProperties'] = True + # Otherwise, default to True + else: + result['additionalProperties'] = True + + # Recursively process nested schemas (additionalProperties, patternProperties) + # Note: items is only valid for array types, not object types, so result.get('items') should never + # be present when result['type'] == 'object'. However, we keep this check for robustness. + _process_result_nested_schemas(result) + + return result diff --git a/tests/models/test_openai.py b/tests/models/test_openai.py index ec9b2e3c95..e6dd977c80 100644 --- a/tests/models/test_openai.py +++ b/tests/models/test_openai.py @@ -1954,75 +1954,6 @@ def test_openai_transformer_with_recursive_ref() -> None: ) -def test_openai_transformer_fallback_when_defs_missing() -> None: - """Test fallback path when root_key is not in result['$defs'] (line 165). - - This tests the safety net fallback that shouldn't happen in normal flow. - The fallback uses self.defs (original schema) when the transformed $defs - doesn't contain the root_key. - - We test this by creating a custom transformer subclass that overrides the base - class walk() to return a result without the root_key, allowing us to test the - actual fallback code path in OpenAIJsonSchemaTransformer.walk(). - """ - from pydantic_ai._json_schema import JsonSchema, JsonSchemaTransformer - from pydantic_ai.profiles.openai import OpenAIJsonSchemaTransformer - - # Create a custom transformer that overrides walk() to intercept super().walk() - # and manipulate the result, then call the actual OpenAIJsonSchemaTransformer.walk() - # logic which will hit the fallback path - class TestTransformer(OpenAIJsonSchemaTransformer): - def __init__(self, schema: dict[str, Any], *, strict: bool | None = None): - JsonSchemaTransformer.__init__(self, schema, strict=strict, flatten_allof=True) - self.root_ref = schema.get('$ref') - - def walk(self) -> JsonSchema: - # Store the original base class walk method - base_walk = JsonSchemaTransformer.walk - - # Create a wrapper that manipulates the result to simulate the edge case - def wrapped_walk(self_instance: TestTransformer) -> JsonSchema: - result = base_walk(self_instance) - # Remove root_key from $defs to simulate the edge case - if '$defs' in result and 'MyModel' in result.get('$defs', {}): - result['$defs'].pop('MyModel') - return result - - # Temporarily replace the base class walk method for this instance - JsonSchemaTransformer.walk = wrapped_walk # type: ignore[assignment] - try: - # Now call the actual OpenAIJsonSchemaTransformer.walk() method - # which will call super().walk() (our wrapped version) and then - # hit the fallback path at lines 164-165 - return super().walk() - finally: - # Restore the original method - JsonSchemaTransformer.walk = base_walk - - schema: dict[str, Any] = { - '$ref': '#/$defs/MyModel', - '$defs': { - 'MyModel': { - 'type': 'object', - 'properties': {'foo': {'type': 'string'}}, - 'required': ['foo'], - }, - }, - } - - transformer = TestTransformer(schema, strict=True) - # Call walk() which will execute the actual OpenAIJsonSchemaTransformer.walk() logic - # and hit the fallback path at line 164-165 - result = transformer.walk() - - # Verify the fallback worked: result should have MyModel's properties from self.defs - # Note: The fallback uses the original, untransformed schema, so it won't have - # additionalProperties: False applied (that transformation happens in transform()) - assert result == snapshot( - {'$defs': {}, 'type': 'object', 'properties': {'foo': {'type': 'string'}}, 'required': ['foo']} - ) - - def test_openai_transformer_fallback_with_prefer_inlined_defs() -> None: """Test fallback path when prefer_inlined_defs=True causes root_key to be missing from result['$defs']. diff --git a/tests/test_json_schema_flattener.py b/tests/test_json_schema_flattener.py index 48da1988e2..0049854899 100644 --- a/tests/test_json_schema_flattener.py +++ b/tests/test_json_schema_flattener.py @@ -523,8 +523,8 @@ def test_tool_friendly_strict_merge() -> None: transformer = FlattenAllofTransformer(schema) flattened = transformer.walk() - # Nota: attualmente quando un membro ha additionalProperties: False, - # le proprietà degli altri membri vengono filtrate se non sono compatibili + # Note: currently when a member has additionalProperties: False, + # properties from other members are filtered if they are not compatible assert flattened == snapshot( {'type': 'object', 'properties': {'a': {'type': 'string'}, 'b': {'type': 'integer'}}, 'required': ['a']} ) @@ -573,7 +573,7 @@ def test_nested_allof_collapses_recursively() -> None: def test_additional_properties_schema_vs_false_string() -> None: - """additionalProperties schema vs False: caso soddisfacibile (b è string).""" + """Test additionalProperties schema vs False: satisfiable case (b is string).""" schema: dict[str, Any] = { 'type': 'object', 'allOf': [ @@ -593,7 +593,7 @@ def test_additional_properties_schema_vs_false_string() -> None: transformer = FlattenAllofTransformer(schema) flattened = transformer.walk() - # b è string, quindi può essere incluso + # b is string, so it can be included assert flattened == snapshot( { 'type': 'object', @@ -606,7 +606,7 @@ def test_additional_properties_schema_vs_false_string() -> None: def test_additional_properties_schema_vs_false_integer() -> None: - """additionalProperties schema vs False: b integer → dovrebbe restare solo {}.""" + """Test additionalProperties schema vs False: b is integer → should result in empty object only.""" schema: dict[str, Any] = { 'type': 'object', 'allOf': [ @@ -654,7 +654,7 @@ def test_required_but_no_properties_is_unsatisfiable() -> None: transformer = FlattenAllofTransformer(schema) flattened = transformer.walk() - # Nessun oggetto può avere 'a' e contemporaneamente nessuna proprietà + # No object can have 'a' and simultaneously have no properties assert flattened == snapshot( { 'type': 'object', @@ -997,6 +997,72 @@ def test_filter_by_restricted_property_sets_no_required() -> None: ) +def test_filter_by_restricted_property_sets_removes_all_properties() -> None: + """Test that restricted property sets can remove all properties when intersection is empty.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'additionalProperties': False, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # No properties in intersection, so all properties are removed + assert flattened == snapshot( + { + 'type': 'object', + 'additionalProperties': False, + } + ) + + +def test_filter_by_restricted_property_sets_with_required() -> None: + """Test restricted property sets when both properties and required exist - covers branch 280->284.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}, 'b': {'type': 'string'}}, + 'required': ['a', 'b'], + 'additionalProperties': False, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}, 'c': {'type': 'string'}}, + 'required': ['b', 'c'], + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Only 'b' is in both restricted sets, so 'a' and 'c' are filtered out + # 'b' remains in both properties and required + # This covers branch 280->284 where both 'properties' and 'required' exist and required is not empty + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'required': ['b'], + 'additionalProperties': False, + } + ) + + def test_filter_incompatible_properties_removes_required_only() -> None: """Test that filtering incompatible properties can remove required while keeping some properties.""" schema: dict[str, Any] = { @@ -1034,7 +1100,7 @@ def test_filter_incompatible_properties_removes_required_only() -> None: def test_filter_incompatible_properties_removes_required_to_empty() -> None: - """Test that filtering incompatible properties can remove all required fields while keeping properties.""" + """Test that filtering incompatible properties can remove all required fields while keeping properties - covers 326->exit.""" schema: dict[str, Any] = { 'type': 'object', 'allOf': [ @@ -1059,6 +1125,42 @@ def test_filter_incompatible_properties_removes_required_to_empty() -> None: # 'a' is incompatible (not in second member's properties, second has additionalProperties: False) # 'b' is compatible (in both, both are strings) # So 'a' should be removed from properties, and required should become empty and be removed + # This covers the exit branch 326->exit when required becomes empty + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, + } + ) + + +def test_incompatible_property_filtered_when_member_has_false_additional() -> None: + """Test that incompatible property is filtered when member has additionalProperties: False.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'additionalProperties': {'type': 'string'}, # Allows strings as additional + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, # 'a' is not in this member's properties, so it's incompatible + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # 'a' is in first member's properties, not in second member's properties + # When checking second member: 'a' is NOT in member_props, and member_additional is False + # This should trigger lines 311-312 (add to incompatible_props and break) + # 'b' is in second member, not in first member, but first allows strings as additional + # Result: 'a' should be removed, 'b' should remain assert flattened == snapshot( { 'type': 'object', @@ -1130,3 +1232,1227 @@ def test_filter_incompatible_properties_with_no_type_in_additional() -> None: 'additionalProperties': False, } ) + + +def test_get_type_set_with_empty_schema() -> None: + """Test _get_type_set with empty schema (covers line 393 guard clause).""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {}}, # Empty property schema - falsy, triggers guard clause + 'additionalProperties': {'type': 'string'}, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Empty property schema triggers _get_type_set guard clause (line 393) + # 'a' has empty schema, so prop_types is None, type check passes + # But 'a' is not in second member's properties and second has additionalProperties: False + # 'b' is compatible + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, + } + ) + + +def test_restricted_property_sets_preserves_properties_and_required() -> None: + """Test that restricted property sets filtering preserves both properties and required fields. + + Creates a situation where: + - merged has properties and required when _filter_by_restricted_property_sets is called + - After filtering, both still exist (not empty) + - This ensures the branch handling both properties and required is executed + """ + schema: dict[str, Any] = { + 'type': 'object', + 'properties': {'base': {'type': 'string'}}, # Initial properties in merged + 'required': ['base'], # Initial required in merged + 'allOf': [ + { + 'type': 'object', + 'properties': {'base': {'type': 'string'}, 'extra': {'type': 'string'}}, + 'required': ['base', 'extra'], + 'additionalProperties': False, # Creates restricted_property_sets + }, + { + 'type': 'object', + 'properties': {'base': {'type': 'string'}}, # Solo 'base' in comune + 'required': ['base'], + 'additionalProperties': False, # Creates restricted_property_sets + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # 'base' is in both restricted sets, so it survives + # When _filter_by_restricted_property_sets is called: + # - merged has 'properties' (line 280 True) + # - After filtering, 'base' remains (not empty) + # - merged still has 'required' (line 284 True) + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'base': {'type': 'string'}}, + 'required': ['base'], + 'additionalProperties': False, + } + ) + + +def test_incompatible_property_filtered_when_not_in_member_with_false_additional() -> None: + """Test that properties not in member properties are filtered when member has additionalProperties: False. + + Creates a situation where: + - A property exists in merged but NOT in member_props of a member + - That member has additionalProperties: False + - This triggers the filtering logic + """ + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}, 'b': {'type': 'string'}}, + 'additionalProperties': True, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, # 'a' is NOT here + 'additionalProperties': False, # When checking 'a': + # - 'a' is NOT in member_props (line 303 False) + # - member_additional is False (line 310 True, we execute 311-312) + }, + { + 'type': 'object', + 'properties': {'c': {'type': 'string'}}, + 'additionalProperties': {'type': 'string'}, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # 'a' is removed (triggers 311-312) - not in second member, second has additionalProperties: False + # 'b' remains (is in second member) + # 'c' is removed (triggers 311-312) - not in second member, second has additionalProperties: False + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, + } + ) + + +def test_required_removed_when_all_required_fields_incompatible() -> None: + """Test that required field is removed when all required fields become incompatible. + + Creates a situation where: + - required exists initially + - All required fields are removed as incompatible + - required becomes empty and is removed (lines 326-327) + """ + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'req1': {'type': 'string'}, 'req2': {'type': 'string'}}, + 'required': ['req1', 'req2'], # Both required + 'additionalProperties': True, + }, + { + 'type': 'object', + 'properties': {'other': {'type': 'string'}}, # None of the required fields are here + 'additionalProperties': False, # Both are removed (311-312) + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # 'req1' and 'req2' are removed (incompatible, triggers 311-312) + # required becomes empty and is removed (triggers 326->exit) + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'other': {'type': 'string'}}, + 'additionalProperties': False, + } + ) + + +def test_empty_property_schema_triggers_type_set_guard_clause() -> None: + """Test that empty property schema triggers _get_type_set guard clause. + + Creates a situation where: + - A property has empty schema {} + - _get_type_set is called with empty schema + - This triggers the guard clause + """ + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': { + 'empty_schema': {}, # Empty schema - triggers guard clause + 'normal': {'type': 'string'}, + }, + 'additionalProperties': True, + }, + { + 'type': 'object', + 'properties': {'normal': {'type': 'string'}}, + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # 'empty_schema' has empty schema, so _get_type_set({}) returns None (guard clause) + # 'empty_schema' is not in second member, so it is removed + # 'normal' remains + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'normal': {'type': 'string'}}, + 'additionalProperties': False, + } + ) + + +def test_combined_filtering_scenarios_all_branches() -> None: + """Test that combines all filtering conditions to exercise all branches.""" + schema: dict[str, Any] = { + 'type': 'object', + 'properties': {'common': {'type': 'string'}}, # For restricted property sets + 'required': ['common'], # For restricted property sets + 'allOf': [ + { + 'type': 'object', + 'properties': { + 'common': {'type': 'string'}, # Survives (restricted property sets) + 'removed_by_false': {'type': 'string'}, # For incompatible property filtering + 'removed_required': {'type': 'string'}, # For required removal + 'empty_schema': {}, # For empty schema guard clause + }, + 'required': ['common', 'removed_required'], # 'removed_required' for required removal + 'additionalProperties': True, + }, + { + 'type': 'object', + 'properties': {'common': {'type': 'string'}}, # Only 'common' in common + 'required': ['common'], + 'additionalProperties': False, # Creates restricted_property_sets (280->284) + # 'removed_by_false' is NOT here -> triggers 311-312 + # 'removed_required' is NOT here -> triggers 311-312, then 326->exit + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # 'common' survives (restricted property sets) + # 'removed_by_false' is removed (incompatible property filtering) + # 'removed_required' is removed (incompatible property filtering), required becomes empty (required removal) + # 'empty_schema' is removed (empty schema guard clause) + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'common': {'type': 'string'}}, + 'required': ['common'], + 'additionalProperties': False, + } + ) + + +def test_empty_schema_returns_unchanged() -> None: + """Test that empty schema returns unchanged.""" + schema: dict[str, Any] = {} + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Empty schema returns unchanged + assert flattened == snapshot(schema) + + +def test_no_allof_returns_unchanged() -> None: + """Test that schema without allOf returns unchanged.""" + schema: dict[str, Any] = { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'required': ['a'], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Schema without allOf returns unchanged + assert flattened == snapshot(schema) + + +def test_allof_not_list_returns_unchanged() -> None: + """Test that schema with allOf that is not a list returns unchanged.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': 'not a list', # allOf is not a list + 'properties': {'a': {'type': 'string'}}, + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Schema with allOf that is not a list returns unchanged + assert flattened == snapshot(schema) + + +def test_allof_empty_list_returns_unchanged() -> None: + """Test that schema with empty allOf list returns unchanged.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [], # Empty list + 'properties': {'a': {'type': 'string'}}, + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Schema with empty allOf list returns unchanged + assert flattened == snapshot(schema) + + +def test_allof_member_not_dict_returns_unchanged() -> None: + """Test that schema with allOf containing non-dict members returns unchanged.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + {'type': 'object', 'properties': {'a': {'type': 'string'}}}, + 'not a dict', # Non-dict member + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Schema with allOf containing non-dict members returns unchanged + assert flattened == snapshot(schema) + + +def test_allof_member_not_object_like_returns_unchanged() -> None: + """Test that schema with allOf containing non object-like members returns unchanged.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + {'type': 'object', 'properties': {'a': {'type': 'string'}}}, + {'type': 'string'}, # Not object-like (type is 'string', not 'object') + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Schema with allOf containing non object-like members returns unchanged + assert flattened == snapshot(schema) + + +def test_allof_member_no_type_no_object_keys_returns_unchanged() -> None: + """Test that schema with allOf containing members without type and without object keys returns unchanged.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + {'type': 'object', 'properties': {'a': {'type': 'string'}}}, + {'description': 'not object-like'}, # No type, no object keys + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Schema with allOf containing non object-like members returns unchanged + assert flattened == snapshot(schema) + + +def test_properties_and_required_both_exist_after_filtering() -> None: + """Test that both properties and required exist when filtering is applied.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}, 'b': {'type': 'string'}}, + 'required': ['a', 'b'], + 'additionalProperties': False, + }, + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, # Only 'a' in common + 'required': ['a'], + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # 'a' is in both restricted sets, so it survives + # When _filter_properties_and_required is called: + # - properties exists (line 239 True) + # - filtered is not empty, so properties is updated + # - required exists (line 247 True) + # This ensures the branch handling both properties and required is executed + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'required': ['a'], + 'additionalProperties': False, + } + ) + + +def test_get_type_set_with_schema_without_type() -> None: + """Test that _get_type_set returns None when schema has no type field.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {}}, # Schema senza type + 'additionalProperties': True, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # 'a' has schema without type, so _get_type_set returns None + # 'a' is not in second member and second has additionalProperties: False + # So 'a' is removed + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, + } + ) + + +def test_array_items_in_properties_handled_recursively() -> None: + """Test that array items in properties are handled recursively.""" + schema: dict[str, Any] = { + 'type': 'object', + 'properties': { + 'items': { + 'type': 'array', + 'items': { + 'allOf': [ + {'type': 'object', 'properties': {'x': {'type': 'string'}}}, + {'type': 'object', 'properties': {'y': {'type': 'integer'}}}, + ], + }, + }, + }, + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Array items with allOf are flattened + assert flattened == snapshot( + { + 'type': 'object', + 'properties': { + 'items': { + 'type': 'array', + 'items': { + 'type': 'object', + 'properties': {'x': {'type': 'string'}, 'y': {'type': 'integer'}}, + }, + }, + }, + } + ) + + +def test_get_type_set_with_none_schema() -> None: + """Test that _get_type_set handles None schema (falsy). + + This test covers the case where _get_type_set is called with None + through additionalProperties which is None. + """ + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'additionalProperties': None, # None schema - triggers guard clause + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # additionalProperties: None is handled as True (default) + # 'a' and 'b' are compatible + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'b': {'type': 'string'}}, + 'additionalProperties': False, + } + ) + + +def test_properties_and_required_exist_after_filtering_with_base_schema() -> None: + """Test that both properties and required exist after filtering when base schema has them.""" + schema: dict[str, Any] = { + 'type': 'object', + 'properties': {'base': {'type': 'string'}}, # Initial properties + 'required': ['base'], # Initial required + 'allOf': [ + { + 'type': 'object', + 'properties': {'base': {'type': 'string'}, 'extra': {'type': 'string'}}, + 'required': ['base', 'extra'], + 'additionalProperties': False, + }, + { + 'type': 'object', + 'properties': {'base': {'type': 'string'}}, # Solo 'base' in comune + 'required': ['base'], + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # 'base' is in both restricted sets, so it survives + # When _filter_properties_and_required is called: + # - properties exists (line 240 True) with 'base' + # - filtered_props is not empty, so properties is updated (we don't return) + # - required exists (line 251 True) with 'base' + # This ensures the branch handling both properties and required is executed + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'base': {'type': 'string'}}, + 'required': ['base'], + 'additionalProperties': False, + } + ) + + +def test_properties_and_required_both_exist_after_restricted_filtering() -> None: + """Test that both properties and required exist after restricted property sets filtering.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}, 'b': {'type': 'string'}}, + 'required': ['a', 'b'], + 'additionalProperties': False, + }, + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}, 'b': {'type': 'string'}}, + 'required': ['a', 'b'], + 'additionalProperties': False, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Entrambi hanno 'a' e 'b', quindi entrambi sopravvivono al filtering + # When _filter_properties_and_required is called: + # - properties exists (line 240 True) + # - filtered_props is not empty (contains 'a' and 'b'), so we don't return + # - required exists (line 251 True) + # This ensures the branch handling both properties and required is executed + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}, 'b': {'type': 'string'}}, + 'required': ['a', 'b'], + 'additionalProperties': False, + } + ) + + +def test_property_filtered_when_not_in_member_with_false_additional() -> None: + """Test that property is filtered when not in member properties and member has additionalProperties: False.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'additionalProperties': True, + }, + { + 'type': 'object', + 'properties': {}, # 'a' is not here + 'additionalProperties': False, # When checking 'a': + # - 'a' is NOT in member_props (line 330 False) + # - member_additional is False (line 347 True, we execute 348-349) + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # 'a' is removed because it's not in second member and second has additionalProperties: False + # This triggers lines 348-349 + assert flattened == snapshot( + { + 'type': 'object', + 'additionalProperties': False, + } + ) + + +def test_array_items_dict_in_properties_processed_recursively() -> None: + """Test that array items dict in properties is processed recursively.""" + schema: dict[str, Any] = { + 'type': 'object', + 'properties': { + 'items': { + 'type': 'array', + 'items': { + 'type': 'object', + 'properties': {'x': {'type': 'string'}}, + }, + }, + }, + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Array items is a dict, so it is processed (line 385 True) + # This ensures the branch handling array items is executed + assert flattened == snapshot( + { + 'type': 'object', + 'properties': { + 'items': { + 'type': 'array', + 'items': { + 'type': 'object', + 'properties': {'x': {'type': 'string'}}, + }, + }, + }, + } + ) + + +def test_allof_with_array_items() -> None: + """Test that allOf in array items is flattened recursively.""" + schema: dict[str, Any] = { + 'type': 'array', + 'items': { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + }, + ], + }, + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'array', + 'items': { + 'type': 'object', + 'properties': { + 'a': {'type': 'string'}, + 'b': {'type': 'integer'}, + }, + }, + } + ) + + +def test_allof_single_member_with_dict_additional_properties() -> None: + """Test flattening allOf with single member having additionalProperties as dict schema.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'additionalProperties': {'type': 'string'}, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + 'additionalProperties': {'type': 'string'}, + } + ) + + +def test_allof_single_member_with_pattern_properties() -> None: + """Test flattening allOf with single member having patternProperties.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + 'patternProperties': { + '^suffix_': {'type': 'number'}, + }, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + 'patternProperties': { + '^suffix_': {'type': 'number'}, + }, + } + ) + + +def test_allof_base_schema_with_additional_properties_false() -> None: + """Test flattening allOf when base schema has additionalProperties: False.""" + schema: dict[str, Any] = { + 'type': 'object', + 'properties': {'base_prop': {'type': 'string'}}, + 'additionalProperties': False, + 'allOf': [ + { + 'type': 'object', + 'properties': {'member_prop': {'type': 'integer'}}, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'base_prop': {'type': 'string'}}, + 'additionalProperties': False, + } + ) + + +def test_allof_base_schema_with_pattern_properties() -> None: + """Test flattening allOf when base schema has patternProperties.""" + schema: dict[str, Any] = { + 'type': 'object', + 'patternProperties': { + '^prefix_': {'type': 'string'}, + }, + 'allOf': [ + { + 'type': 'object', + 'properties': {'member_prop': {'type': 'integer'}}, + 'patternProperties': { + '^suffix_': {'type': 'number'}, + }, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'member_prop': {'type': 'integer'}}, + 'patternProperties': { + '^prefix_': {'type': 'string'}, + '^suffix_': {'type': 'number'}, + }, + } + ) + + +def test_allof_filter_property_not_in_member_with_false_additional() -> None: + """Test that properties not explicitly defined in a member with additionalProperties: False are filtered out.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + # No additionalProperties - allows additional + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + 'additionalProperties': False, # Only 'b' is allowed + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Property 'a' should be removed because member 2 has additionalProperties: False + # and 'a' is not explicitly defined in member 2 + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + 'additionalProperties': False, + } + ) + + +def test_allof_result_with_items_recursive_processing() -> None: + """Test that items in the result schema are processed recursively.""" + schema: dict[str, Any] = { + 'type': 'object', + 'properties': { + 'items': { + 'type': 'array', + 'items': { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'x': {'type': 'string'}}, + }, + { + 'type': 'object', + 'properties': {'y': {'type': 'integer'}}, + }, + ], + }, + }, + }, + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # The items inside the array should have allOf flattened + assert flattened == snapshot( + { + 'type': 'object', + 'properties': { + 'items': { + 'type': 'array', + 'items': { + 'type': 'object', + 'properties': { + 'x': {'type': 'string'}, + 'y': {'type': 'integer'}, + }, + }, + }, + }, + } + ) + + +def test_allof_base_schema_with_items() -> None: + """Test that items in base schema are processed recursively after allOf merge.""" + schema: dict[str, Any] = { + 'type': 'object', + 'items': { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + }, + ], + }, + 'allOf': [ + { + 'type': 'object', + 'properties': {'c': {'type': 'number'}}, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # The items should have allOf flattened recursively + assert flattened == snapshot( + { + 'type': 'object', + 'properties': {'c': {'type': 'number'}}, + 'items': { + 'type': 'object', + 'properties': { + 'a': {'type': 'string'}, + 'b': {'type': 'integer'}, + }, + }, + } + ) + + +def test_array_with_items_allof() -> None: + """Test array with items containing allOf (realistic case from user).""" + schema: dict[str, Any] = { + 'type': 'array', + 'items': { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': { + 'remindAt': { + 'type': 'string', + 'format': 'date-time', + }, + }, + 'required': ['remindAt'], + }, + { + 'type': 'object', + 'properties': { + 'comment': { + 'type': 'string', + }, + }, + 'required': ['comment'], + }, + ], + }, + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # The items should have allOf flattened + assert flattened == snapshot( + { + 'type': 'array', + 'items': { + 'type': 'object', + 'properties': {'remindAt': {'type': 'string', 'format': 'date-time'}, 'comment': {'type': 'string'}}, + 'required': ['comment', 'remindAt'], + }, + } + ) + + +def test_allof_base_additional_properties_false_no_properties() -> None: + """Test that base schema with additionalProperties: False but no properties allows properties from allOf members. + + This covers lines 286-291: when base has additionalProperties: False but no properties, + we don't add an empty set to restricted_property_sets, so properties from allOf members are still valid. + """ + schema: dict[str, Any] = { + 'type': 'object', + 'additionalProperties': False, + 'allOf': [ + { + 'type': 'object', + 'properties': { + 'a': {'type': 'string'}, + }, + }, + { + 'type': 'object', + 'properties': { + 'b': {'type': 'integer'}, + }, + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Properties from allOf members should be preserved even though base has additionalProperties: False + # because base has no properties, so we don't restrict to an empty set + assert flattened == snapshot( + { + 'type': 'object', + 'properties': { + 'a': {'type': 'string'}, + 'b': {'type': 'integer'}, + }, + 'additionalProperties': False, + } + ) + + +def test_array_without_allof() -> None: + """Test that array without allOf is processed recursively (items are still flattened if they contain allOf).""" + schema: dict[str, Any] = { + 'type': 'array', + 'items': { + 'type': 'object', + 'properties': { + 'nested': { + 'type': 'object', + 'allOf': [ + {'type': 'object', 'properties': {'x': {'type': 'string'}}}, + {'type': 'object', 'properties': {'y': {'type': 'integer'}}}, + ], + }, + }, + }, + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # The nested allOf in items.properties.nested should be flattened + assert flattened == snapshot( + { + 'type': 'array', + 'items': { + 'type': 'object', + 'properties': { + 'nested': { + 'type': 'object', + 'properties': { + 'x': {'type': 'string'}, + 'y': {'type': 'integer'}, + }, + }, + }, + }, + } + ) + + +def test_array_items_with_allof() -> None: + """Test that array items with allOf are flattened recursively.""" + schema: dict[str, Any] = { + 'type': 'array', + 'items': { + 'type': 'object', + 'allOf': [ + {'type': 'object', 'properties': {'a': {'type': 'string'}}}, + {'type': 'object', 'properties': {'b': {'type': 'integer'}}}, + ], + }, + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # The allOf in items should be flattened directly + assert flattened == snapshot( + { + 'type': 'array', + 'items': { + 'type': 'object', + 'properties': { + 'a': {'type': 'string'}, + 'b': {'type': 'integer'}, + }, + }, + } + ) + + +def test_array_items_without_allof_recursive() -> None: + """Test that array items without allOf are processed recursively (covers lines 242-245). + + This covers the case where the root schema has no allOf (Case 1), so it enters + the recursive processing branch. For arrays, this means processing items recursively. + """ + schema: dict[str, Any] = { + 'type': 'array', + 'items': { + 'type': 'object', + 'properties': { + 'nested': { + 'type': 'object', + 'allOf': [ + {'type': 'object', 'properties': {'x': {'type': 'string'}}}, + {'type': 'object', 'properties': {'y': {'type': 'integer'}}}, + ], + }, + }, + }, + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # The root array has no allOf, so it enters Case 1 and processes items recursively + # The items have no allOf, so they process properties recursively + # The nested property has allOf, which gets flattened + assert flattened == snapshot( + { + 'type': 'array', + 'items': { + 'type': 'object', + 'properties': { + 'nested': { + 'type': 'object', + 'properties': { + 'x': {'type': 'string'}, + 'y': {'type': 'integer'}, + }, + }, + }, + }, + } + ) + + +def test_array_with_simple_items_dict() -> None: + """Test that array with simple items dict is processed (covers lines 242-245). + + This directly covers the branch where schema_type == 'array' and items is a dict. + The items don't need to have allOf - they just need to be a dict to trigger the recursive call. + """ + schema: dict[str, Any] = { + 'type': 'array', + 'items': { + 'type': 'string', + }, + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # The array has no allOf, so it enters Case 1 + # schema_type == 'array', so it enters the elif branch + # items is a dict, so it calls _recurse_flatten_allof recursively + # The items have no allOf, so they're returned unchanged + assert flattened == snapshot( + { + 'type': 'array', + 'items': { + 'type': 'string', + }, + } + ) + + +def test_array_no_items() -> None: + """Test that array without items key is handled (covers branch 243->245). + + This covers the branch where schema_type == 'array' but items is None or not present, + so the if condition at line 243 is False and we skip to line 245 (return). + """ + schema: dict[str, Any] = { + 'type': 'array', + # No 'items' key + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # The array has no allOf, so it enters Case 1 + # schema_type == 'array', so it enters the elif branch + # items is None (not present), so isinstance(s.get('items'), dict) is False + # We skip to return s (line 245) + assert flattened == snapshot( + { + 'type': 'array', + } + ) + + +def test_allof_incompatible_props_member_false_additional_not_in_restricted() -> None: + """Test that covers lines 328-329: property not in restricted_property_sets but incompatible due to member with False additionalProperties.""" + schema: dict[str, Any] = { + 'type': 'object', + 'allOf': [ + { + 'type': 'object', + 'properties': {'a': {'type': 'string'}}, + # No additionalProperties - allows additional + }, + { + 'type': 'object', + 'properties': {'b': {'type': 'integer'}}, + # No additionalProperties - allows additional, so 'a' is not filtered by restricted_property_sets + }, + { + 'type': 'object', + # No properties defined + 'additionalProperties': False, # This will filter 'a' and 'b' in incompatible_props check + }, + ], + } + + transformer = FlattenAllofTransformer(schema) + flattened = transformer.walk() + + # Both 'a' and 'b' should be removed because member 3 has additionalProperties: False + # and they are not explicitly defined in member 3 + assert flattened == snapshot( + { + 'type': 'object', + 'additionalProperties': False, + } + )