From 09c8ef420385c6820a3ba29cbe9f4c0bff1b829c Mon Sep 17 00:00:00 2001 From: taskbot Date: Mon, 17 Nov 2025 15:10:38 +0100 Subject: [PATCH 1/2] schema and config changes for vMCP: Composition - Output Aggregation for Multi-Step Workflows Successfully added the OutputFormat / outputFormat field to all configuration layers for vMCP composite tool workflows. This PR lays the foundation for implementing output aggregation in multi-step workflows as described in issue #174. --- ...virtualmcpcompositetooldefinition_types.go | 26 ++++++ deploy/charts/operator-crds/Chart.yaml | 2 +- deploy/charts/operator-crds/README.md | 2 +- ...ev_virtualmcpcompositetooldefinitions.yaml | 25 ++++++ docs/operator/crd-api.md | 1 + pkg/vmcp/composer/composer.go | 11 +++ pkg/vmcp/config/config.go | 24 ++++++ pkg/vmcp/server/workflow_converter.go | 13 +-- pkg/vmcp/server/workflow_converter_test.go | 86 +++++++++++++++++++ 9 files changed, 182 insertions(+), 8 deletions(-) diff --git a/cmd/thv-operator/api/v1alpha1/virtualmcpcompositetooldefinition_types.go b/cmd/thv-operator/api/v1alpha1/virtualmcpcompositetooldefinition_types.go index b8214c975..adff56749 100644 --- a/cmd/thv-operator/api/v1alpha1/virtualmcpcompositetooldefinition_types.go +++ b/cmd/thv-operator/api/v1alpha1/virtualmcpcompositetooldefinition_types.go @@ -46,6 +46,32 @@ type VirtualMCPCompositeToolDefinitionSpec struct { // +kubebuilder:default=abort // +optional FailureMode string `json:"failureMode,omitempty"` + + // OutputFormat is an optional Go template for constructing the workflow output. + // If specified, the template is evaluated with access to: + // - .params.* - Input parameters + // - .steps.*.output - Step outputs + // - .steps.*.status - Step status + // - .workflow.* - Workflow metadata (id, duration, timestamps) + // + // The template must produce valid JSON. If omitted, defaults to returning + // the last step's output (backward compatible behavior). + // + // Example: + // outputFormat: | + // { + // "data": { + // "logs": {{.steps.fetch_logs.output}}, + // "metrics": {{.steps.fetch_metrics.output}} + // }, + // "metadata": { + // "workflow_id": "{{.workflow.id}}", + // "duration_ms": {{.workflow.duration_ms}} + // } + // } + // + // +optional + OutputFormat string `json:"outputFormat,omitempty"` } // VirtualMCPCompositeToolDefinitionStatus defines the observed state of VirtualMCPCompositeToolDefinition diff --git a/deploy/charts/operator-crds/Chart.yaml b/deploy/charts/operator-crds/Chart.yaml index 8fce5a988..abb1ad542 100644 --- a/deploy/charts/operator-crds/Chart.yaml +++ b/deploy/charts/operator-crds/Chart.yaml @@ -2,5 +2,5 @@ apiVersion: v2 name: toolhive-operator-crds description: A Helm chart for installing the ToolHive Operator CRDs into Kubernetes. type: application -version: 0.0.56 +version: 0.0.57 appVersion: "0.0.1" diff --git a/deploy/charts/operator-crds/README.md b/deploy/charts/operator-crds/README.md index 4f1ad81b3..05cf6811c 100644 --- a/deploy/charts/operator-crds/README.md +++ b/deploy/charts/operator-crds/README.md @@ -1,7 +1,7 @@ # ToolHive Operator CRDs Helm Chart -![Version: 0.0.56](https://img.shields.io/badge/Version-0.0.56-informational?style=flat-square) +![Version: 0.0.57](https://img.shields.io/badge/Version-0.0.57-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) A Helm chart for installing the ToolHive Operator CRDs into Kubernetes. diff --git a/deploy/charts/operator-crds/crds/toolhive.stacklok.dev_virtualmcpcompositetooldefinitions.yaml b/deploy/charts/operator-crds/crds/toolhive.stacklok.dev_virtualmcpcompositetooldefinitions.yaml index 7141bb4c9..7b8bda73d 100644 --- a/deploy/charts/operator-crds/crds/toolhive.stacklok.dev_virtualmcpcompositetooldefinitions.yaml +++ b/deploy/charts/operator-crds/crds/toolhive.stacklok.dev_virtualmcpcompositetooldefinitions.yaml @@ -92,6 +92,31 @@ spec: minLength: 1 pattern: ^[a-z0-9]([a-z0-9_-]*[a-z0-9])?$ type: string + outputFormat: + description: |- + OutputFormat is an optional Go template for constructing the workflow output. + If specified, the template is evaluated with access to: + - .params.* - Input parameters + - .steps.*.output - Step outputs + - .steps.*.status - Step status + - .workflow.* - Workflow metadata (id, duration, timestamps) + + The template must produce valid JSON. If omitted, defaults to returning + the last step's output (backward compatible behavior). + + Example: + outputFormat: | + { + "data": { + "logs": {{.steps.fetch_logs.output}}, + "metrics": {{.steps.fetch_metrics.output}} + }, + "metadata": { + "workflow_id": "{{.workflow.id}}", + "duration_ms": {{.workflow.duration_ms}} + } + } + type: string parameters: additionalProperties: description: ParameterSpec defines a parameter for a composite tool diff --git a/docs/operator/crd-api.md b/docs/operator/crd-api.md index 62b239041..8556dc815 100644 --- a/docs/operator/crd-api.md +++ b/docs/operator/crd-api.md @@ -1813,6 +1813,7 @@ _Appears in:_ | `steps` _[WorkflowStep](#workflowstep) array_ | Steps defines the workflow step definitions
Steps are executed sequentially in Phase 1
Phase 2 will support DAG execution via dependsOn | | MinItems: 1
Required: \{\}
| | `timeout` _string_ | Timeout is the overall workflow timeout
Defaults to 30m if not specified | 30m | Pattern: `^([0-9]+(\.[0-9]+)?(ms\|s\|m\|h))+$`
| | `failureMode` _string_ | FailureMode defines the failure handling strategy
- abort: Stop execution on first failure (default)
- continue: Continue executing remaining steps
- best_effort: Try all steps, report partial success | abort | Enum: [abort continue best_effort]
| +| `outputFormat` _string_ | OutputFormat is an optional Go template for constructing the workflow output.
If specified, the template is evaluated with access to:
- .params.* - Input parameters
- .steps.*.output - Step outputs
- .steps.*.status - Step status
- .workflow.* - Workflow metadata (id, duration, timestamps)
The template must produce valid JSON. If omitted, defaults to returning
the last step's output (backward compatible behavior).
Example:
outputFormat: \|
\{
"data": \{
"logs": \{\{.steps.fetch_logs.output\}\},
"metrics": \{\{.steps.fetch_metrics.output\}\}
\},
"metadata": \{
"workflow_id": "\{\{.workflow.id\}\}",
"duration_ms": \{\{.workflow.duration_ms\}\}
\}
\} | | | #### VirtualMCPCompositeToolDefinitionStatus diff --git a/pkg/vmcp/composer/composer.go b/pkg/vmcp/composer/composer.go index 031159b55..febdfafe2 100644 --- a/pkg/vmcp/composer/composer.go +++ b/pkg/vmcp/composer/composer.go @@ -59,6 +59,17 @@ type WorkflowDefinition struct { // Options: "abort" (default), "continue", "best_effort" FailureMode string + // OutputFormat is an optional Go template for constructing the workflow output. + // If specified, the template is evaluated with access to: + // - .params.* - Input parameters + // - .steps.*.output - Step outputs + // - .steps.*.status - Step status + // - .workflow.* - Workflow metadata (id, duration, timestamps) + // + // The template must produce valid JSON. If omitted, defaults to returning + // the last step's output (backward compatible behavior). + OutputFormat string + // Metadata stores additional workflow information. Metadata map[string]string } diff --git a/pkg/vmcp/config/config.go b/pkg/vmcp/config/config.go index 56de900cc..e4ce03ec4 100644 --- a/pkg/vmcp/config/config.go +++ b/pkg/vmcp/config/config.go @@ -339,6 +339,30 @@ type CompositeToolConfig struct { // Steps are the workflow steps to execute. Steps []*WorkflowStepConfig `json:"steps"` + + // OutputFormat is an optional template for constructing the workflow output. + // If specified, the template is evaluated with access to: + // - .params.* - Input parameters + // - .steps.*.output - Step outputs + // - .steps.*.status - Step status + // - .workflow.* - Workflow metadata (id, duration, timestamps) + // + // The template must produce valid JSON. If omitted, defaults to returning + // the last step's output (backward compatible behavior). + // + // Example: + // output_format: | + // { + // "data": { + // "logs": {{.steps.fetch_logs.output}}, + // "metrics": {{.steps.fetch_metrics.output}} + // }, + // "metadata": { + // "workflow_id": "{{.workflow.id}}", + // "duration_ms": {{.workflow.duration_ms}} + // } + // } + OutputFormat string `json:"output_format,omitempty" yaml:"output_format,omitempty"` } // ParameterSchema defines a workflow parameter. diff --git a/pkg/vmcp/server/workflow_converter.go b/pkg/vmcp/server/workflow_converter.go index f8d0fb5d9..30d4381aa 100644 --- a/pkg/vmcp/server/workflow_converter.go +++ b/pkg/vmcp/server/workflow_converter.go @@ -75,12 +75,13 @@ func ConvertConfigToWorkflowDefinitions( // Create workflow definition def := &composer.WorkflowDefinition{ - Name: ct.Name, - Description: ct.Description, - Parameters: params, - Steps: steps, - Timeout: timeout, - Metadata: make(map[string]string), + Name: ct.Name, + Description: ct.Description, + Parameters: params, + Steps: steps, + Timeout: timeout, + OutputFormat: ct.OutputFormat, + Metadata: make(map[string]string), } workflowDefs[ct.Name] = def diff --git a/pkg/vmcp/server/workflow_converter_test.go b/pkg/vmcp/server/workflow_converter_test.go index 595ac2327..0142734bb 100644 --- a/pkg/vmcp/server/workflow_converter_test.go +++ b/pkg/vmcp/server/workflow_converter_test.go @@ -264,3 +264,89 @@ func TestConvertSteps_ComplexWorkflow(t *testing.T) { assert.NotEmpty(t, result[2].Condition) assert.Equal(t, []string{"confirm"}, result[2].DependsOn) } + +func TestConvertConfigToWorkflowDefinitions_WithOutputFormat(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + input []*config.CompositeToolConfig + wantOutputFormat string + }{ + { + name: "workflow without output_format", + input: []*config.CompositeToolConfig{{ + Name: "simple", + Steps: []*config.WorkflowStepConfig{ + {ID: "s1", Type: "tool", Tool: "backend.tool"}, + }, + }}, + wantOutputFormat: "", + }, + { + name: "workflow with output_format", + input: []*config.CompositeToolConfig{{ + Name: "aggregated", + Steps: []*config.WorkflowStepConfig{ + {ID: "fetch_logs", Type: "tool", Tool: "splunk.fetch"}, + {ID: "fetch_metrics", Type: "tool", Tool: "datadog.fetch"}, + }, + OutputFormat: `{ + "logs": {{.steps.fetch_logs.output}}, + "metrics": {{.steps.fetch_metrics.output}} + }`, + }}, + wantOutputFormat: `{ + "logs": {{.steps.fetch_logs.output}}, + "metrics": {{.steps.fetch_metrics.output}} + }`, + }, + { + name: "workflow with complex output_format", + input: []*config.CompositeToolConfig{{ + Name: "investigation", + Steps: []*config.WorkflowStepConfig{ + {ID: "fetch_data", Type: "tool", Tool: "backend.fetch"}, + {ID: "analyze", Type: "tool", Tool: "backend.analyze"}, + }, + OutputFormat: `{ + "data": {{.steps.fetch_data.output}}, + "analysis": {{.steps.analyze.output}}, + "metadata": { + "workflow_id": "{{.workflow.id}}", + "duration_ms": {{.workflow.duration_ms}} + } + }`, + }}, + wantOutputFormat: `{ + "data": {{.steps.fetch_data.output}}, + "analysis": {{.steps.analyze.output}}, + "metadata": { + "workflow_id": "{{.workflow.id}}", + "duration_ms": {{.workflow.duration_ms}} + } + }`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + result, err := ConvertConfigToWorkflowDefinitions(tt.input) + + require.NoError(t, err) + require.Len(t, result, 1) + + // Get the first (and only) workflow definition + var workflowDef *composer.WorkflowDefinition + for _, def := range result { + workflowDef = def + break + } + + require.NotNil(t, workflowDef) + assert.Equal(t, tt.wantOutputFormat, workflowDef.OutputFormat) + }) + } +} From b2e7e3281dc60cc2d5fad579d8fc2c20f4235087 Mon Sep 17 00:00:00 2001 From: taskbot Date: Mon, 17 Nov 2025 16:05:41 +0100 Subject: [PATCH 2/2] fix conflict --- deploy/charts/operator-crds/Chart.yaml | 2 +- deploy/charts/operator-crds/README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/deploy/charts/operator-crds/Chart.yaml b/deploy/charts/operator-crds/Chart.yaml index abb1ad542..beabb7094 100644 --- a/deploy/charts/operator-crds/Chart.yaml +++ b/deploy/charts/operator-crds/Chart.yaml @@ -2,5 +2,5 @@ apiVersion: v2 name: toolhive-operator-crds description: A Helm chart for installing the ToolHive Operator CRDs into Kubernetes. type: application -version: 0.0.57 +version: 0.0.58 appVersion: "0.0.1" diff --git a/deploy/charts/operator-crds/README.md b/deploy/charts/operator-crds/README.md index 05cf6811c..039775afc 100644 --- a/deploy/charts/operator-crds/README.md +++ b/deploy/charts/operator-crds/README.md @@ -1,7 +1,7 @@ # ToolHive Operator CRDs Helm Chart -![Version: 0.0.57](https://img.shields.io/badge/Version-0.0.57-informational?style=flat-square) +![Version: 0.0.58](https://img.shields.io/badge/Version-0.0.58-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) A Helm chart for installing the ToolHive Operator CRDs into Kubernetes.