3030from tfx .orchestration import metadata
3131from tfx .orchestration import node_proto_view
3232from tfx .orchestration .experimental .core import constants
33+ from tfx .orchestration .experimental .core import env
3334from tfx .orchestration .experimental .core import mlmd_state
3435from tfx .orchestration .experimental .core import task as task_lib
3536from tfx .orchestration import mlmd_connection_manager as mlmd_cm
@@ -548,21 +549,35 @@ def register_executions_from_existing_executions(
548549 contexts = metadata_handle .store .get_contexts_by_execution (
549550 existing_executions [0 ].id
550551 )
551- return execution_lib .put_executions (
552+ executions = execution_lib .put_executions (
552553 metadata_handle ,
553554 new_executions ,
554555 contexts ,
555556 input_artifacts_maps = input_artifacts ,
556557 )
557558
559+ pipeline_asset = metadata_handle .store .pipeline_asset
560+ if pipeline_asset :
561+ env .get_env ().create_pipeline_run_node_executions (
562+ pipeline_asset .owner ,
563+ pipeline_asset .name ,
564+ pipeline ,
565+ node .node_info .id ,
566+ executions ,
567+ )
568+
569+ return executions
570+
558571
559572def register_executions (
560573 metadata_handle : metadata .Metadata ,
561574 execution_type : metadata_store_pb2 .ExecutionType ,
562575 contexts : Sequence [metadata_store_pb2 .Context ],
563576 input_and_params : Sequence [InputAndParam ],
577+ pipeline : Optional [pipeline_pb2 .Pipeline ] = None ,
578+ node_id : Optional [str ] = None ,
564579) -> Sequence [metadata_store_pb2 .Execution ]:
565- """Registers multiple executions in MLMD.
580+ """Registers multiple executions in MLMD and Tflex backends .
566581
567582 Along with the execution:
568583 - the input artifacts will be linked to the executions.
@@ -575,6 +590,8 @@ def register_executions(
575590 input_and_params: A list of InputAndParams, which includes input_dicts
576591 (dictionaries of artifacts. One execution will be registered for each of
577592 the input_dict) and corresponding exec_properties.
593+ pipeline: Optional. The pipeline proto.
594+ node_id: Optional. The node id of the executions to be registered.
578595
579596 Returns:
580597 A list of MLMD executions that are registered in MLMD, with id populated.
@@ -603,21 +620,35 @@ def register_executions(
603620 executions .append (execution )
604621
605622 if len (executions ) == 1 :
606- return [
623+ new_executions = [
607624 execution_lib .put_execution (
608625 metadata_handle ,
609626 executions [0 ],
610627 contexts ,
611628 input_artifacts = input_and_params [0 ].input_artifacts ,
612629 )
613630 ]
631+ else :
632+ new_executions = execution_lib .put_executions (
633+ metadata_handle ,
634+ executions ,
635+ contexts ,
636+ [
637+ input_and_param .input_artifacts
638+ for input_and_param in input_and_params
639+ ],
640+ )
614641
615- return execution_lib .put_executions (
616- metadata_handle ,
617- executions ,
618- contexts ,
619- [input_and_param .input_artifacts for input_and_param in input_and_params ],
620- )
642+ pipeline_asset = metadata_handle .store .pipeline_asset
643+ if pipeline_asset and pipeline and node_id :
644+ env .get_env ().create_pipeline_run_node_executions (
645+ pipeline_asset .owner ,
646+ pipeline_asset .name ,
647+ pipeline ,
648+ node_id ,
649+ new_executions ,
650+ )
651+ return new_executions
621652
622653
623654def update_external_artifact_type (
0 commit comments