Skip to content

mark3labs/flyt

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

81 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Flyt Logo

Flyt

Norwegian for "flow" • Pronounced "fleet"

A minimalist workflow framework for Go with zero dependencies inspired by Pocket Flow.

Table of Contents

Installation

go get github.com/mark3labs/flyt

Getting Started

Using the Project Template (Recommended)

The fastest way to start a new Flyt project is using the official template:

# Create a new project from the template
git clone https://github.com/mark3labs/flyt-project-template my-flyt-project
cd my-flyt-project

# Remove the template git history and start fresh
rm -rf .git
git init

# Install dependencies
go mod tidy

# Run the example
go run main.go

The template provides a starting point for your Flyt project with a basic structure and example code.

Manual Setup

package main

import (
    "context"
    "fmt"
    "github.com/mark3labs/flyt"
)

func main() {
    // Create a simple node using the helper
    node := flyt.NewNode(
        flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
            fmt.Println("Hello, Flyt!")
            return flyt.R("done"), nil
        }),
    )

    // Run it
    ctx := context.Background()
    shared := flyt.NewSharedStore()
    
    action, err := flyt.Run(ctx, node, shared)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Completed with action: %s\n", action)
}

Builder Pattern

Flyt supports a fluent builder pattern for creating nodes:

node := flyt.NewNode().
    WithMaxRetries(3).
    WithWait(time.Second).
    WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
        fmt.Println("Hello from builder pattern!")
        return flyt.R("done"), nil
    })

// NodeBuilder directly implements Node interface, so you can use it as-is
// node := flyt.NewNode().WithExecFunc(fn)

You can mix traditional and builder patterns:

// Start with traditional options
node := flyt.NewNode(
    flyt.WithMaxRetries(3),
    flyt.WithWait(time.Second),
)

// Continue with builder pattern
node = node.
    WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
        return flyt.R("processed"), nil
    }).
    WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult flyt.Result) (flyt.Action, error) {
        shared.Set("result", execResult.Value())
        return flyt.DefaultAction, nil
    })

Core Concepts

Nodes

Nodes are the building blocks. Each node has three phases:

  1. Prep - Read from shared store and prepare data
  2. Exec - Execute main logic (can be retried)
  3. Post - Process results and decide next action
// Simple node with type-safe Result handling
node := flyt.NewNode(
    flyt.WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) (flyt.Result, error) {
        // Use type-safe getters to retrieve data
        input := shared.GetString("input")
        return flyt.R(input), nil
    }),
    flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
        // Use Result's type-safe accessors
        input := prepResult.AsStringOr("")
        // Process data
        return flyt.R("result"), nil
    }),
    flyt.WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult flyt.Result) (flyt.Action, error) {
        shared.Set("output", execResult.Value())
        return flyt.DefaultAction, nil
    }),
)

// Working with structured data
type ProcessRequest struct {
    UserID    int      `json:"user_id"`
    Operation string   `json:"operation"`
    Resources []string `json:"resources"`
}

processNode := flyt.NewNode(
    flyt.WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) (flyt.Result, error) {
        // Bind structured data from shared store
        var request ProcessRequest
        if err := shared.Bind("request", &request); err != nil {
            return flyt.R(nil), fmt.Errorf("invalid request: %w", err)
        }
        return flyt.R(request), nil
    }),
    flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
        // Use Result's Bind method for type-safe access
        var request ProcessRequest
        prepResult.MustBind(&request)  // Or use Bind() with error handling
        
        // Process the structured request
        result := processUserRequest(request.UserID, request.Operation, request.Resources)
        return flyt.R(result), nil
    }),
    flyt.WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult flyt.Result) (flyt.Action, error) {
        shared.Set("process_result", execResult.Value())
        return flyt.DefaultAction, nil
    }),
)

Actions

Actions are strings returned by a node's Post phase that determine what happens next:

func (n *MyNode) Post(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult any) (flyt.Action, error) {
    // Convert to Result for type-safe access
    result := flyt.R(execResult)
    if result.AsBoolOr(false) {
        return "success", nil  // Go to node connected with "success"
    }
    return "retry", nil       // Go to node connected with "retry"
}

The default action is flyt.DefaultAction (value: "default"). If no connection exists for an action, the flow ends.

Flows

Connect nodes to create workflows:

// Create nodes
validateNode := createValidateNode()
processNode := createProcessNode()
errorNode := createErrorNode()

// Build flow with action-based routing
flow := flyt.NewFlow(validateNode)
flow.Connect(validateNode, "valid", processNode)    // If validation succeeds
flow.Connect(validateNode, "invalid", errorNode)    // If validation fails
flow.Connect(processNode, "done", nil)              // End flow after processing

// Run flow
err := flow.Run(ctx, shared)

Shared Store

Thread-safe data sharing between nodes with type-safe helpers:

shared := flyt.NewSharedStore()

// Set values
shared.Set("name", "Alice")
shared.Set("count", 42)
shared.Set("price", 19.99)
shared.Set("enabled", true)
shared.Set("items", []string{"apple", "banana"})
shared.Set("config", map[string]any{"timeout": 30})

// Type-safe getters (return zero values if not found or wrong type)
str := shared.GetString("name")           // Returns "Alice"
num := shared.GetInt("count")             // Returns 42
price := shared.GetFloat64("price")       // Returns 19.99
enabled := shared.GetBool("enabled")      // Returns true
items := shared.GetSlice("items")         // Returns []any{"apple", "banana"}
config := shared.GetMap("config")         // Returns map[string]any{"timeout": 30}

// Type-safe getters with custom defaults
str = shared.GetStringOr("missing", "anonymous")     // Returns "anonymous"
num = shared.GetIntOr("missing", -1)                 // Returns -1
price = shared.GetFloat64Or("missing", 99.99)        // Returns 99.99
enabled = shared.GetBoolOr("missing", true)          // Returns true

// Bind complex types (similar to Echo framework)
type User struct {
    ID    int      `json:"id"`
    Name  string   `json:"name"`
    Email string   `json:"email"`
    Tags  []string `json:"tags"`
}

// Store a typed struct - it gets stored as-is
user := User{
    ID:    123,
    Name:  "Alice",
    Email: "[email protected]",
    Tags:  []string{"admin", "developer"},
}
shared.Set("user", user)

// Later, in a node's Prep function, bind it back to a struct
func (n *MyNode) Prep(ctx context.Context, shared *flyt.SharedStore) (any, error) {
    var user User
    err := shared.Bind("user", &user)  // Binds stored data to struct
    if err != nil {
        return nil, err
    }
    // Or use MustBind (panics on failure - use for required data)
    // shared.MustBind("user", &user)
    
    return user, nil
}

// Utility methods
exists := shared.Has("key")       // Check if key exists
shared.Delete("key")               // Remove a key
keys := shared.Keys()              // Get all keys
length := shared.Len()             // Get number of items
shared.Clear()                     // Remove all items

// Get all data as a map (returns a copy)
allData := shared.GetAll()

// Merge multiple values at once
shared.Merge(map[string]any{
    "user_id": 123,
    "config": map[string]any{"timeout": 30},
})

The type-safe getters handle numeric conversions automatically:

  • GetInt() converts from int8, int16, int32, int64, uint variants, and float types
  • GetFloat64() converts from all numeric types including int and float32
  • GetSlice() uses the same conversion logic as ToSlice() utility

Intermediate Patterns

Creating Nodes with Builder Pattern

The builder pattern provides a fluent interface for creating nodes:

node := flyt.NewNode().
    WithMaxRetries(3).
    WithWait(time.Second).
    WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) (flyt.Result, error) {
        // Read input data
        data := shared.Get("input")
        return flyt.NewResult(data), nil
    }).
    WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
        // Process data
        result := processData(prepResult.Value())
        return flyt.NewResult(result), nil
    }).
    WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult flyt.Result) (flyt.Action, error) {
        // Store result
        shared.Set("output", execResult.Value())
        return flyt.DefaultAction, nil
    })

Configuration via Closures

Pass configuration to nodes using closures:

func createAPINode(apiKey string, baseURL string) flyt.Node {
    return flyt.NewNode(
        flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
            // apiKey and baseURL are captured in the closure
            url := fmt.Sprintf("%s/data", baseURL)
            req, _ := http.NewRequest("GET", url, nil)
            req.Header.Set("Authorization", apiKey)
            // ... make request
            return flyt.R(data), nil
        }),
    )
}

// Usage
node := createAPINode("secret-key", "https://api.example.com")

Error Handling & Retries

Add retry logic to handle transient failures:

node := flyt.NewNode(
    flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
        // This will be retried up to 3 times
        data, err := callFlakeyAPI()
        return flyt.R(data), err
    }),
    flyt.WithMaxRetries(3),
    flyt.WithWait(time.Second),
    flyt.WithExecFallbackFunc(func(prepResult flyt.Result, err error) (flyt.Result, error) {
        // Called after all retries fail
        return flyt.R("default-value"), nil
    }),
)

Fallback on Failure

Handle failures gracefully by implementing the FallbackNode interface:

type CachedAPINode struct {
    *flyt.BaseNode
    cache map[string]any
}

func (n *CachedAPINode) ExecFallback(prepResult any, err error) (any, error) {
    // Return cached data when API fails
    result := flyt.R(prepResult)
    key := result.MustString()
    if cached, ok := n.cache[key]; ok {
        return cached, nil
    }
    // Return default value if no cache
    return map[string]any{"status": "unavailable"}, nil
}

func (n *CachedAPINode) Exec(ctx context.Context, prepResult any) (any, error) {
    result := flyt.R(prepResult)
    key := result.MustString()
    data, err := callAPI(key)
    if err == nil {
        n.cache[key] = data // Update cache on success
    }
    return data, err
}

The ExecFallback method is called automatically after all retries are exhausted, allowing you to provide degraded functionality, cached results, or default values.

Conditional Branching

Control flow based on results:

decisionNode := flyt.NewNode(
    flyt.WithExecFunc(func(ctx context.Context, prepResult flyt.Result) (flyt.Result, error) {
        value := prepResult.MustInt()
        return flyt.R(value > 100), nil
    }),
    flyt.WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, prepResult, execResult flyt.Result) (flyt.Action, error) {
        if execResult.MustBool() {
            return "high", nil
        }
        return "low", nil
    }),
)

flow := flyt.NewFlow(decisionNode)
flow.Connect(decisionNode, "high", highNode)
flow.Connect(decisionNode, "low", lowNode)

Advanced Usage

Custom Node Types

For complex nodes with state, create custom types:

type RateLimitedNode struct {
    *flyt.BaseNode
    limiter *rate.Limiter
}

func NewRateLimitedNode(rps int) *RateLimitedNode {
    return &RateLimitedNode{
        BaseNode: flyt.NewBaseNode(),
        limiter:  rate.NewLimiter(rate.Limit(rps), 1),
    }
}

func (n *RateLimitedNode) Exec(ctx context.Context, prepResult any) (any, error) {
    if err := n.limiter.Wait(ctx); err != nil {
        return nil, err
    }
    // Process with rate limiting
    data, err := process(prepResult)
    return data, err
}

RetryableNode Interface

For custom retry logic, implement the RetryableNode interface:

type CustomRetryNode struct {
    *flyt.BaseNode
    attempts int
}

func (n *CustomRetryNode) GetMaxRetries() int {
    // Dynamic retry count based on state
    if n.attempts > 5 {
        return 0 // Stop retrying after 5 total attempts
    }
    return 3
}

func (n *CustomRetryNode) GetWait() time.Duration {
    // Exponential backoff
    return time.Duration(n.attempts) * time.Second
}

func (n *CustomRetryNode) Exec(ctx context.Context, prepResult any) (any, error) {
    n.attempts++
    data, err := callAPI(prepResult)
    return data, err
}

Batch Processing

BatchNode simplifies batch processing by working almost exactly like regular nodes. The framework automatically detects batch processing when Prep returns []Result and handles all the complexity of iteration.

// Create a batch node with the simplified API
batchNode := flyt.NewBatchNode().
    WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) ([]flyt.Result, error) {
        // Return []Result to indicate batch processing
        items := shared.GetSlice("items")
        results := make([]flyt.Result, len(items))
        for i, item := range items {
            results[i] = flyt.NewResult(item)
        }
        return results, nil
    }).
    WithExecFunc(func(ctx context.Context, item flyt.Result) (flyt.Result, error) {
        // Process individual item - called N times automatically
        data := item.Value().(string)
        processed := strings.ToUpper(data)
        return flyt.NewResult(processed), nil
    }).
    WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, items, results []flyt.Result) (flyt.Action, error) {
        // Called ONCE with all results aggregated
        var successful []any
        var failed int
        
        for i, result := range results {
            if result.IsError() {
                log.Printf("Item %d failed: %v", i, result.Error())
                failed++
            } else {
                successful = append(successful, result.Value())
            }
        }
        
        shared.Set("processed", successful)
        shared.Set("failed_count", failed)
        
        if failed > 0 {
            return "partial_success", nil
        }
        return flyt.DefaultAction, nil
    })

// Set items and run
shared := flyt.NewSharedStore()
shared.Set("items", []string{"item1", "item2", "item3"})
action, err := flyt.Run(ctx, batchNode, shared)

Advanced Batch Configuration

Configure concurrency and error handling:

batchNode := flyt.NewBatchNode().
    WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) ([]flyt.Result, error) {
        // Convert items to []Result for processing
        users := shared.GetSlice("users")
        results := make([]flyt.Result, len(users))
        for i, user := range users {
            results[i] = flyt.NewResult(user)
        }
        return results, nil
    }).
    WithExecFunc(func(ctx context.Context, user flyt.Result) (flyt.Result, error) {
        // Process each user - automatically called N times
        userData := user.AsMapOr(nil)
        if userData == nil {
            return flyt.Result{}, fmt.Errorf("invalid user data")
        }
        
        processed, err := processUser(userData)
        if err != nil {
            // Return error for this specific item
            return flyt.Result{}, err
        }
        return flyt.NewResult(processed), nil
    }).
    WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, users, results []flyt.Result) (flyt.Action, error) {
        // Aggregate all results
        successCount := 0
        for _, result := range results {
            if !result.IsError() {
                successCount++
            }
        }
        
        log.Printf("Processed %d/%d users successfully", successCount, len(users))
        shared.Set("success_count", successCount)
        
        return flyt.DefaultAction, nil
    }).
    WithBatchConcurrency(10).           // Process up to 10 items concurrently
    WithBatchErrorHandling(true).       // Continue even if some items fail
    WithMaxRetries(3).                  // Retry each item up to 3 times
    WithWait(time.Second)                // Wait between retries

Batch Error Handling

The API provides clean error handling per item:

batchNode := flyt.NewBatchNode().
    WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) ([]flyt.Result, error) {
        // Some items may already be errors
        return []flyt.Result{
            flyt.NewResult("valid1"),
            flyt.NewErrorResult(errors.New("invalid input")),
            flyt.NewResult("valid2"),
        }, nil
    }).
    WithExecFunc(func(ctx context.Context, item flyt.Result) (flyt.Result, error) {
        if item.IsError() {
            // Pass through existing errors
            return item, nil
        }
        
        // Process valid items
        value := item.Value().(string)
        if value == "fail" {
            return flyt.Result{}, errors.New("processing failed")
        }
        return flyt.NewResult(strings.ToUpper(value)), nil
    }).
    WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, items, results []flyt.Result) (flyt.Action, error) {
        // Handle mixed success/failure results
        var errors []error
        var successes []any
        
        for i, result := range results {
            if result.IsError() {
                errors = append(errors, result.Error())
                log.Printf("Item %d: %v", i, result.Error())
            } else {
                successes = append(successes, result.Value())
            }
        }
        
        if len(errors) > 0 && len(successes) == 0 {
            return "all_failed", nil
        } else if len(errors) > 0 {
            return "partial_success", nil
        }
        return "all_success", nil
    }).
    WithBatchErrorHandling(true)  // Continue processing despite errors

// The Result type now supports error tracking
result := flyt.NewErrorResult(errors.New("something failed"))
if result.IsError() {
    fmt.Printf("Error: %v\n", result.Error())
}

Key Benefits

  1. Simplicity: BatchNode works almost exactly like regular nodes
  2. Automatic Handling: Framework detects []Result and handles iteration
  3. Clean Error Tracking: Each Result can carry its own error state
  4. Flexible Configuration: Concurrency and error handling are configurable
  5. Type Safety: Strong typing with Result type throughout

Batch Flows

Use BatchNode to run flows with different parameters:

// Use BatchNode to process multiple flow inputs
batchNode := flyt.NewBatchNode().
    WithPrepFunc(func(ctx context.Context, shared *flyt.SharedStore) ([]flyt.Result, error) {
        // Prepare multiple parameter sets
        return []flyt.Result{
            flyt.NewResult(map[string]any{"user_id": 1, "email": "[email protected]"}),
            flyt.NewResult(map[string]any{"user_id": 2, "email": "[email protected]"}),
            flyt.NewResult(map[string]any{"user_id": 3, "email": "[email protected]"}),
        }, nil
    }).
    WithExecFunc(func(ctx context.Context, params flyt.Result) (flyt.Result, error) {
        // Process each parameter set
        data := params.MustMap()
        userID := data["user_id"].(int)
        email := data["email"].(string)
        
        // Run your flow logic here
        result := processUser(userID, email)
        return flyt.NewResult(result), nil
    }).
    WithPostFunc(func(ctx context.Context, shared *flyt.SharedStore, params, results []flyt.Result) (flyt.Action, error) {
        // Aggregate results from all parameter sets
        var allResults []any
        for _, r := range results {
            if !r.IsError() {
                allResults = append(allResults, r.Value())
            }
        }
        shared.Set("all_results", allResults)
        return flyt.DefaultAction, nil
    }).
    WithBatchConcurrency(3)  // Process 3 parameter sets concurrently

// Create a flow that starts with the batch node
flow := flyt.NewFlow(batchNode)
flow.Connect(batchNode, flyt.DefaultAction, aggregateNode)

// Run the flow
err := flow.Run(ctx, shared)

Nested Flows

Compose flows for complex workflows:

// Sub-flow for data validation
validationFlow := createValidationFlow()

// Main flow
mainFlow := flyt.NewFlow(fetchNode)
mainFlow.Connect(fetchNode, "validate", validationFlow)
mainFlow.Connect(validationFlow, flyt.DefaultAction, processNode)

Flow as Node

Flows implement the Node interface and can be used anywhere a node is expected:

// Create a reusable flow
func createProcessingFlow() *flyt.Flow {
    validateNode := createValidateNode()
    transformNode := createTransformNode()
    
    flow := flyt.NewFlow(validateNode)
    flow.Connect(validateNode, "valid", transformNode)
    return flow
}

// Use the flow as a node in another flow
processingFlow := createProcessingFlow()
mainFlow := flyt.NewFlow(fetchNode)
mainFlow.Connect(fetchNode, flyt.DefaultAction, processingFlow) // Flow used as node
mainFlow.Connect(processingFlow, flyt.DefaultAction, saveNode)

Worker Pool

For custom concurrent task management:

// Create a worker pool with 10 workers
pool := flyt.NewWorkerPool(10)

// Submit tasks
for _, item := range items {
    item := item // Capture loop variable
    pool.Submit(func() {
        // Process item
        result := processItem(item)
        // Store result safely
        mu.Lock()
        results = append(results, result)
        mu.Unlock()
    })
}

// Wait for all tasks to complete
pool.Wait()

// Clean up
pool.Close()

Utility Functions

ToSlice

Convert various types to slices for batch processing:

// Convert different types to []any
items1 := flyt.ToSlice([]string{"a", "b", "c"})
items2 := flyt.ToSlice([]int{1, 2, 3})
items3 := flyt.ToSlice("single item") // Returns []any{"single item"}

// Useful for batch processing with mixed types
shared.Set("items", flyt.ToSlice(data))

Best Practices

  1. Single Responsibility: Each node should do one thing well
  2. Idempotency: Nodes should be idempotent when possible
  3. Error Handling: Always handle errors appropriately
  4. Context Awareness: Respect context cancellation
  5. Concurrency Safety: Don't share node instances across flows

Examples

Check out the cookbook directory for complete, real-world examples:

  • Agent - AI agent with web search capabilities using LLM and search providers
  • Chat - Interactive chat application with conversation history
  • LLM Streaming - Real-time streaming of LLM responses with OpenAI SSE
  • MCP - Model Context Protocol integration with OpenAI function calling
  • Summarize - Text summarization with error handling and retries
  • Tracing - Distributed tracing with Langfuse for observability

License

MIT