Standardized processing operations output. #7

Merged — 1 commit, merged Jun 25, 2025
docs/openapi.yaml — 70 changes: 62 additions & 8 deletions
@@ -37,10 +37,7 @@ paths:
content:
application/json:
schema:
description: List of processing logs
type: array
items:
type: string
$ref: '#/components/schemas/processing_results'
'500':
description: Processing error
content:
@@ -95,10 +92,7 @@ paths:
content:
application/json:
schema:
description: List of processing logs
type: array
items:
type: string
$ref: '#/components/schemas/processing_results'
'404':
description: Collection not found
content:
@@ -292,6 +286,65 @@ components:
type: array
items:
description: Aggregate pipelines are defined at https://www.mongodb.com/docs/manual/aggregation/
processing_results:
description: Array of processing operation results
type: array
items:
type: object
description: Result of a single processing operation
required:
- operation
- status
properties:
operation:
description: Type of operation performed
type: string
enum:
- evaluate_version
- remove_schema
- drop_index
- run_migration
- create_index
- apply_schema
- load_test_data
- update_version
- collection_processing
- version_processing
- overall_status
status:
description: Operation status
type: string
enum:
- success
- error
- skipped
collection:
description: Collection name affected by the operation
type: string
message:
description: Human-readable status message
type: string
details_type:
description: Type of details for complex operations
type: string
enum:
- schema
- index
- migration
- test_data
- version
- error
- overall
details:
description: Operation-specific details (object or array)
oneOf:
- type: object
additionalProperties: true
- type: array
items:
type: object
additionalProperties: true
additionalProperties: true
config:
type: object
properties:
@@ -335,3 +388,4 @@ components:
type: array
items:
type: string
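
For orientation, a response body conforming to the new processing_results schema might look like the sketch below, shown as a Python literal. This is an illustrative example only: the field names mirror the schema above, but the collection name, test data file name, counts, and messages are invented.

# Hypothetical processing_results payload; values are invented for illustration,
# only the shape follows the processing_results schema defined above.
example_processing_results = [
    {
        "operation": "apply_schema",
        "collection": "users",            # hypothetical collection name
        "message": "Applying schema for users",
        "status": "success",
    },
    {
        "operation": "load_test_data",
        "collection": "users",
        "message": "Test data loaded successfully from users.1.0.0.json",
        "details_type": "test_data",
        "details": {
            "documents_loaded": 25,       # invented count
            "acknowledged": True,
        },
        "status": "success",
    },
    {
        "operation": "overall_status",
        "message": "All collections processed successfully",
        "details_type": "overall",
        "details": {
            "collections_processed": 3,   # invented count
            "collections_failed": 0,
        },
        "status": "success",
    },
]
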

stage0_mongodb_api/managers/config_manager.py — 160 changes: 127 additions & 33 deletions
@@ -185,7 +185,11 @@ def process_all_collections(self) -> Dict[str, List[Dict]]:
results[collection_name] = [{
"operation": "collection_processing",
"collection": collection_name,
"error": str(e),
"message": f"Error processing collection: {str(e)}",
"details_type": "error",
"details": {
"error": str(e)
},
"status": "error"
}]
any_collection_failed = True
@@ -198,12 +202,15 @@ def process_all_collections(self) -> Dict[str, List[Dict]]:
for collection_name in results.keys():
results[collection_name].append({
"operation": "overall_status",
"status": overall_status,
"message": overall_message,
"collections_processed": len(self.collection_configs),
"collections_failed": sum(1 for result in results.values()
if any(isinstance(op, dict) and op.get("status") == "error"
for op in result))
"details_type": "overall",
"details": {
"collections_processed": len(self.collection_configs),
"collections_failed": sum(1 for result in results.values()
if any(isinstance(op, dict) and op.get("status") == "error"
for op in result))
},
"status": overall_status
})

return results
@@ -241,7 +248,17 @@ def process_collection_versions(self, collection_name: str) -> List[Dict]:
for version_config in versions:
current_version = VersionManager.get_current_version(collection_name)
version_number = VersionNumber(version_config.get("version"))
operations.append(f"Evaluating version {version_number}")
operations.append({
"operation": "evaluate_version",
"collection": collection_name,
"message": f"Evaluating version {version_number}",
"details_type": "version",
"details": {
"version": str(version_number),
"current_version": current_version
},
"status": "success"
})

# Only process versions greater than current version
if version_number > current_version:
@@ -257,14 +274,30 @@ def process_collection_versions(self, collection_name: str) -> List[Dict]:
current_version = VersionNumber(self.version_manager.get_current_version(collection_name))
else:
logger.info(f"Skipping version {str(version_number)} for {collection_name} - already processed")
operations.append({
"operation": "evaluate_version",
"collection": collection_name,
"message": f"Skipping version {version_number} - already processed",
"details_type": "version",
"details": {
"version": str(version_number),
"current_version": current_version,
"skipped": True
},
"status": "skipped"
})

except Exception as e:
logger.error(f"Error during version processing for {collection_name}: {str(e)}")
operations.append({
"operation": "version_processing",
"collection": collection_name,
"version": "unknown",
"error": f"Error during version processing: {str(e)}",
"message": f"Error during version processing: {str(e)}",
"details_type": "error",
"details": {
"error": str(e),
"version": "unknown"
},
"status": "error"
})

@@ -284,54 +317,100 @@ def _process_version(self, collection_name: str, version_config: Dict) -> List[Dict]:

try:
# Required: Remove existing schema validation
operations.append(f"Removing schema validation for {collection_name}")
operations.append(self.schema_manager.remove_schema(collection_name))
operations.append({
"operation": "remove_schema",
"collection": collection_name,
"message": f"Removing schema validation for {collection_name}",
"status": "success"
})
remove_result = self.schema_manager.remove_schema(collection_name)
operations.append(remove_result)
self._assert_no_errors(operations)

# Optional: Process drop_indexes if present
if "drop_indexes" in version_config:
for index in version_config["drop_indexes"]:
operations.append(f"Dropping index {index} for {collection_name}")
operations.append(self.index_manager.drop_index(collection_name, index))
operations.append({
"operation": "drop_index",
"collection": collection_name,
"message": f"Dropping index {index} for {collection_name}",
"status": "success"
})
drop_result = self.index_manager.drop_index(collection_name, index)
operations.append(drop_result)
self._assert_no_errors(operations)

# Optional: Process aggregations if present
if "aggregations" in version_config:
for migration in version_config["aggregations"]:
pipeline_name = migration.get("name", "unnamed_pipeline")
operations.append(f"Running Aggregation Pipeline '{pipeline_name}' for {collection_name}")
operations.append(self.migration_manager.run_migration(collection_name, migration))
operations.append({
"operation": "run_migration",
"collection": collection_name,
"message": f"Running Aggregation Pipeline '{pipeline_name}' for {collection_name}",
"status": "success"
})
migration_result = self.migration_manager.run_migration(collection_name, migration)
operations.append(migration_result)
self._assert_no_errors(operations)

# Optional: Process add_indexes if present
if "add_indexes" in version_config:
operations.append(f"Creating indexes for {collection_name}")
operations.append(self.index_manager.create_index(collection_name, version_config["add_indexes"]))
operations.append({
"operation": "create_index",
"collection": collection_name,
"message": f"Creating indexes for {collection_name}",
"status": "success"
})
create_result = self.index_manager.create_index(collection_name, version_config["add_indexes"])
operations.append(create_result)
self._assert_no_errors(operations)

# Required: Apply schema validation
operations.append(f"Applying schema for {collection_name}")
operations.append(self.schema_manager.apply_schema(f"{collection_name}.{version_config.get('version')}"))
operations.append({
"operation": "apply_schema",
"collection": collection_name,
"message": f"Applying schema for {collection_name}",
"status": "success"
})
apply_result = self.schema_manager.apply_schema(f"{collection_name}.{version_config.get('version')}")
operations.append(apply_result)
self._assert_no_errors(operations)

# Optional: Load test data if enabled and present
if "test_data" in version_config and self.config.LOAD_TEST_DATA:
operations.append(f"Loading test data for {collection_name} - {version_config['test_data']}")
operations.append(self._load_test_data(collection_name, version_config["test_data"]))
operations.append({
"operation": "load_test_data",
"collection": collection_name,
"message": f"Loading test data for {collection_name} - {version_config['test_data']}",
"status": "success"
})
test_data_result = self._load_test_data(collection_name, version_config["test_data"])
operations.append(test_data_result)
self._assert_no_errors(operations)

# Update version if version string is present
operations.append(f"Updating version for {collection_name}")
operations.append(self.version_manager.update_version(collection_name, version_config["version"]))
operations.append({
"operation": "update_version",
"collection": collection_name,
"message": f"Updating version for {collection_name}",
"status": "success"
})
version_result = self.version_manager.update_version(collection_name, version_config["version"])
operations.append(version_result)
self._assert_no_errors(operations)

except Exception as e:
logger.error(f"Error processing version for {collection_name}: {str(e)}")
operations.append({
"status": "error",
"operation": "version_processing",
"collection": collection_name,
"error": str(e)
"message": f"Error processing version: {str(e)}",
"details_type": "error",
"details": {
"error": str(e)
},
"status": "error"
})

return operations
@@ -344,7 +423,7 @@ def _load_test_data(self, collection_name: str, test_data_file: str) -> Dict:
test_data_file: Name of the test data file

Returns:
Dict containing operation result with proper error handling for bulk write errors
Dict containing operation result in consistent format
"""
from stage0_py_utils.mongo_utils.mongo_io import TestDataLoadError
try:
@@ -354,18 +433,29 @@ def _load_test_data(self, collection_name: str, test_data_file: str) -> Dict:
return {
"operation": "load_test_data",
"collection": collection_name,
"test_data": str(data_file),
"results": results,
"message": f"Test data loaded successfully from {test_data_file}",
"details_type": "test_data",
"details": {
"test_data_file": str(data_file),
"results": results,
"documents_loaded": results.get("documents_loaded", 0),
"inserted_ids": results.get("inserted_ids", []),
"acknowledged": results.get("acknowledged", False)
},
"status": "success"
}

except TestDataLoadError as e:
return {
"operation": "load_test_data",
"collection": collection_name,
"test_data": str(data_file),
"error": str(e),
"details": e.details,
"message": str(e),
"details_type": "error",
"details": {
"error": str(e),
"test_data_file": str(data_file),
"details": e.details
},
"status": "error"
}
except Exception as e:
@@ -374,8 +464,12 @@ def _load_test_data(self, collection_name: str, test_data_file: str) -> Dict:
return {
"operation": "load_test_data",
"collection": collection_name,
"test_data": str(data_file),
"error": error_message,
"message": error_message,
"details_type": "error",
"details": {
"error": error_message,
"test_data_file": str(data_file)
},
"status": "error"
}
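
To illustrate how a caller might consume the standardized output, here is a minimal sketch that walks the dictionary returned by process_all_collections and reports failures by checking each operation's status field, the same check the overall_status entry uses above. The summarize_processing helper and the config_manager variable are hypothetical and not part of this PR, and the class name ConfigManager is assumed from the module name.

# Hypothetical consumer of the standardized results (a sketch, not part of this diff).
from typing import Dict, List

def summarize_processing(results: Dict[str, List[Dict]]) -> None:
    """Print a one-line summary per collection from standardized operation results."""
    for collection_name, operations in results.items():
        failed = [op for op in operations
                  if isinstance(op, dict) and op.get("status") == "error"]
        if failed:
            print(f"{collection_name}: {len(failed)} failed operation(s)")
            for op in failed:
                print(f"  - {op.get('operation')}: {op.get('message')}")
        else:
            print(f"{collection_name}: all operations succeeded")

# Usage sketch (assumes an initialized ConfigManager instance named config_manager):
# summarize_processing(config_manager.process_all_collections())
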
