@@ -185,7 +185,11 @@ def process_all_collections(self) -> Dict[str, List[Dict]]:
185
185
results [collection_name ] = [{
186
186
"operation" : "collection_processing" ,
187
187
"collection" : collection_name ,
188
- "error" : str (e ),
188
+ "message" : f"Error processing collection: { str (e )} " ,
189
+ "details_type" : "error" ,
190
+ "details" : {
191
+ "error" : str (e )
192
+ },
189
193
"status" : "error"
190
194
}]
191
195
any_collection_failed = True
@@ -198,12 +202,15 @@ def process_all_collections(self) -> Dict[str, List[Dict]]:
198
202
for collection_name in results .keys ():
199
203
results [collection_name ].append ({
200
204
"operation" : "overall_status" ,
201
- "status" : overall_status ,
202
205
"message" : overall_message ,
203
- "collections_processed" : len (self .collection_configs ),
204
- "collections_failed" : sum (1 for result in results .values ()
205
- if any (isinstance (op , dict ) and op .get ("status" ) == "error"
206
- for op in result ))
206
+ "details_type" : "overall" ,
207
+ "details" : {
208
+ "collections_processed" : len (self .collection_configs ),
209
+ "collections_failed" : sum (1 for result in results .values ()
210
+ if any (isinstance (op , dict ) and op .get ("status" ) == "error"
211
+ for op in result ))
212
+ },
213
+ "status" : overall_status
207
214
})
208
215
209
216
return results
@@ -241,7 +248,17 @@ def process_collection_versions(self, collection_name: str) -> List[Dict]:
241
248
for version_config in versions :
242
249
current_version = VersionManager .get_current_version (collection_name )
243
250
version_number = VersionNumber (version_config .get ("version" ))
244
- operations .append (f"Evaluating version { version_number } " )
251
+ operations .append ({
252
+ "operation" : "evaluate_version" ,
253
+ "collection" : collection_name ,
254
+ "message" : f"Evaluating version { version_number } " ,
255
+ "details_type" : "version" ,
256
+ "details" : {
257
+ "version" : str (version_number ),
258
+ "current_version" : current_version
259
+ },
260
+ "status" : "success"
261
+ })
245
262
246
263
# Only process versions greater than current version
247
264
if version_number > current_version :
@@ -257,14 +274,30 @@ def process_collection_versions(self, collection_name: str) -> List[Dict]:
257
274
current_version = VersionNumber (self .version_manager .get_current_version (collection_name ))
258
275
else :
259
276
logger .info (f"Skipping version { str (version_number )} for { collection_name } - already processed" )
277
+ operations .append ({
278
+ "operation" : "evaluate_version" ,
279
+ "collection" : collection_name ,
280
+ "message" : f"Skipping version { version_number } - already processed" ,
281
+ "details_type" : "version" ,
282
+ "details" : {
283
+ "version" : str (version_number ),
284
+ "current_version" : current_version ,
285
+ "skipped" : True
286
+ },
287
+ "status" : "skipped"
288
+ })
260
289
261
290
except Exception as e :
262
291
logger .error (f"Error during version processing for { collection_name } : { str (e )} " )
263
292
operations .append ({
264
293
"operation" : "version_processing" ,
265
294
"collection" : collection_name ,
266
- "version" : "unknown" ,
267
- "error" : f"Error during version processing: { str (e )} " ,
295
+ "message" : f"Error during version processing: { str (e )} " ,
296
+ "details_type" : "error" ,
297
+ "details" : {
298
+ "error" : str (e ),
299
+ "version" : "unknown"
300
+ },
268
301
"status" : "error"
269
302
})
270
303
@@ -284,54 +317,100 @@ def _process_version(self, collection_name: str, version_config: Dict) -> List[D
284
317
285
318
try :
286
319
# Required: Remove existing schema validation
287
- operations .append (f"Removing schema validation for { collection_name } " )
288
- operations .append (self .schema_manager .remove_schema (collection_name ))
320
+ operations .append ({
321
+ "operation" : "remove_schema" ,
322
+ "collection" : collection_name ,
323
+ "message" : f"Removing schema validation for { collection_name } " ,
324
+ "status" : "success"
325
+ })
326
+ remove_result = self .schema_manager .remove_schema (collection_name )
327
+ operations .append (remove_result )
289
328
self ._assert_no_errors (operations )
290
329
291
330
# Optional: Process drop_indexes if present
292
331
if "drop_indexes" in version_config :
293
332
for index in version_config ["drop_indexes" ]:
294
- operations .append (f"Dropping index { index } for { collection_name } " )
295
- operations .append (self .index_manager .drop_index (collection_name , index ))
333
+ operations .append ({
334
+ "operation" : "drop_index" ,
335
+ "collection" : collection_name ,
336
+ "message" : f"Dropping index { index } for { collection_name } " ,
337
+ "status" : "success"
338
+ })
339
+ drop_result = self .index_manager .drop_index (collection_name , index )
340
+ operations .append (drop_result )
296
341
self ._assert_no_errors (operations )
297
342
298
343
# Optional: Process aggregations if present
299
344
if "aggregations" in version_config :
300
345
for migration in version_config ["aggregations" ]:
301
346
pipeline_name = migration .get ("name" , "unnamed_pipeline" )
302
- operations .append (f"Running Aggregation Pipeline '{ pipeline_name } ' for { collection_name } " )
303
- operations .append (self .migration_manager .run_migration (collection_name , migration ))
347
+ operations .append ({
348
+ "operation" : "run_migration" ,
349
+ "collection" : collection_name ,
350
+ "message" : f"Running Aggregation Pipeline '{ pipeline_name } ' for { collection_name } " ,
351
+ "status" : "success"
352
+ })
353
+ migration_result = self .migration_manager .run_migration (collection_name , migration )
354
+ operations .append (migration_result )
304
355
self ._assert_no_errors (operations )
305
356
306
357
# Optional: Process add_indexes if present
307
358
if "add_indexes" in version_config :
308
- operations .append (f"Creating indexes for { collection_name } " )
309
- operations .append (self .index_manager .create_index (collection_name , version_config ["add_indexes" ]))
359
+ operations .append ({
360
+ "operation" : "create_index" ,
361
+ "collection" : collection_name ,
362
+ "message" : f"Creating indexes for { collection_name } " ,
363
+ "status" : "success"
364
+ })
365
+ create_result = self .index_manager .create_index (collection_name , version_config ["add_indexes" ])
366
+ operations .append (create_result )
310
367
self ._assert_no_errors (operations )
311
368
312
369
# Required: Apply schema validation
313
- operations .append (f"Applying schema for { collection_name } " )
314
- operations .append (self .schema_manager .apply_schema (f"{ collection_name } .{ version_config .get ("version" )} " ))
370
+ operations .append ({
371
+ "operation" : "apply_schema" ,
372
+ "collection" : collection_name ,
373
+ "message" : f"Applying schema for { collection_name } " ,
374
+ "status" : "success"
375
+ })
376
+ apply_result = self .schema_manager .apply_schema (f"{ collection_name } .{ version_config .get ("version" )} " )
377
+ operations .append (apply_result )
315
378
self ._assert_no_errors (operations )
316
379
317
380
# Optional: Load test data if enabled and present
318
381
if "test_data" in version_config and self .config .LOAD_TEST_DATA :
319
- operations .append (f"Loading test data for { collection_name } - { version_config ['test_data' ]} " )
320
- operations .append (self ._load_test_data (collection_name , version_config ["test_data" ]))
382
+ operations .append ({
383
+ "operation" : "load_test_data" ,
384
+ "collection" : collection_name ,
385
+ "message" : f"Loading test data for { collection_name } - { version_config ['test_data' ]} " ,
386
+ "status" : "success"
387
+ })
388
+ test_data_result = self ._load_test_data (collection_name , version_config ["test_data" ])
389
+ operations .append (test_data_result )
321
390
self ._assert_no_errors (operations )
322
391
323
392
# Update version if version string is present
324
- operations .append (f"Updating version for { collection_name } " )
325
- operations .append (self .version_manager .update_version (collection_name , version_config ["version" ]))
393
+ operations .append ({
394
+ "operation" : "update_version" ,
395
+ "collection" : collection_name ,
396
+ "message" : f"Updating version for { collection_name } " ,
397
+ "status" : "success"
398
+ })
399
+ version_result = self .version_manager .update_version (collection_name , version_config ["version" ])
400
+ operations .append (version_result )
326
401
self ._assert_no_errors (operations )
327
402
328
403
except Exception as e :
329
404
logger .error (f"Error processing version for { collection_name } : { str (e )} " )
330
405
operations .append ({
331
- "status" : "error" ,
332
406
"operation" : "version_processing" ,
333
407
"collection" : collection_name ,
334
- "error" : str (e )
408
+ "message" : f"Error processing version: { str (e )} " ,
409
+ "details_type" : "error" ,
410
+ "details" : {
411
+ "error" : str (e )
412
+ },
413
+ "status" : "error"
335
414
})
336
415
337
416
return operations
@@ -344,7 +423,7 @@ def _load_test_data(self, collection_name: str, test_data_file: str) -> Dict:
344
423
test_data_file: Name of the test data file
345
424
346
425
Returns:
347
- Dict containing operation result with proper error handling for bulk write errors
426
+ Dict containing operation result in consistent format
348
427
"""
349
428
from stage0_py_utils .mongo_utils .mongo_io import TestDataLoadError
350
429
try :
@@ -354,18 +433,29 @@ def _load_test_data(self, collection_name: str, test_data_file: str) -> Dict:
354
433
return {
355
434
"operation" : "load_test_data" ,
356
435
"collection" : collection_name ,
357
- "test_data" : str (data_file ),
358
- "results" : results ,
436
+ "message" : f"Test data loaded successfully from { test_data_file } " ,
437
+ "details_type" : "test_data" ,
438
+ "details" : {
439
+ "test_data_file" : str (data_file ),
440
+ "results" : results ,
441
+ "documents_loaded" : results .get ("documents_loaded" , 0 ),
442
+ "inserted_ids" : results .get ("inserted_ids" , []),
443
+ "acknowledged" : results .get ("acknowledged" , False )
444
+ },
359
445
"status" : "success"
360
446
}
361
447
362
448
except TestDataLoadError as e :
363
449
return {
364
450
"operation" : "load_test_data" ,
365
451
"collection" : collection_name ,
366
- "test_data" : str (data_file ),
367
- "error" : str (e ),
368
- "details" : e .details ,
452
+ "message" : str (e ),
453
+ "details_type" : "error" ,
454
+ "details" : {
455
+ "error" : str (e ),
456
+ "test_data_file" : str (data_file ),
457
+ "details" : e .details
458
+ },
369
459
"status" : "error"
370
460
}
371
461
except Exception as e :
@@ -374,8 +464,12 @@ def _load_test_data(self, collection_name: str, test_data_file: str) -> Dict:
374
464
return {
375
465
"operation" : "load_test_data" ,
376
466
"collection" : collection_name ,
377
- "test_data" : str (data_file ),
378
- "error" : error_message ,
467
+ "message" : error_message ,
468
+ "details_type" : "error" ,
469
+ "details" : {
470
+ "error" : error_message ,
471
+ "test_data_file" : str (data_file )
472
+ },
379
473
"status" : "error"
380
474
}
381
475
0 commit comments